gh-95051: ensure that timeouts scheduled with asyncio.Timeout that have already expired are deliverered promptly (#95109)

Co-authored-by: Kumar Aditya <59607654+kumaraditya303@users.noreply.github.com>
This commit is contained in:
Thomas Grainger 2022-07-24 21:18:05 +01:00 committed by GitHub
parent eb9c8a8bea
commit 0c6f898005
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 32 additions and 4 deletions

View file

@ -105,6 +105,30 @@ class TimeoutTests(unittest.IsolatedAsyncioTestCase):
self.assertLess(t1-t0, 2)
self.assertTrue(t0 <= cm.when() <= t1)
async def test_timeout_zero_sleep_zero(self):
loop = asyncio.get_running_loop()
t0 = loop.time()
with self.assertRaises(TimeoutError):
async with asyncio.timeout(0) as cm:
await asyncio.sleep(0)
t1 = loop.time()
self.assertTrue(cm.expired())
# 2 sec for slow CI boxes
self.assertLess(t1-t0, 2)
self.assertTrue(t0 <= cm.when() <= t1)
async def test_timeout_in_the_past_sleep_zero(self):
loop = asyncio.get_running_loop()
t0 = loop.time()
with self.assertRaises(TimeoutError):
async with asyncio.timeout(-11) as cm:
await asyncio.sleep(0)
t1 = loop.time()
self.assertTrue(cm.expired())
# 2 sec for slow CI boxes
self.assertLess(t1-t0, 2)
self.assertTrue(t0 >= cm.when() <= t1)
async def test_foreign_exception_passed(self):
with self.assertRaises(KeyError):
async with asyncio.timeout(0.01) as cm: