mirror of
https://github.com/python/cpython.git
synced 2025-09-19 07:00:59 +00:00
[3.11] gh-99357: Close the event loop when it is no longer used in test_uncancel_structured_blocks (GH-99414) (#99424)
(cherry picked from commit 99972dc745
)
Co-authored-by: Xiao Chen <chenxiao_7@163.com>
This commit is contained in:
parent
5971a65d07
commit
90de4b76b9
1 changed files with 23 additions and 20 deletions
|
@ -639,27 +639,30 @@ class BaseTaskTests:
|
||||||
await asyncio.sleep(0)
|
await asyncio.sleep(0)
|
||||||
return timed_out, structured_block_finished, outer_code_reached
|
return timed_out, structured_block_finished, outer_code_reached
|
||||||
|
|
||||||
# Test which timed out.
|
try:
|
||||||
t1 = self.new_task(loop, make_request_with_timeout(sleep=10.0, timeout=0.1))
|
# Test which timed out.
|
||||||
timed_out, structured_block_finished, outer_code_reached = (
|
t1 = self.new_task(loop, make_request_with_timeout(sleep=10.0, timeout=0.1))
|
||||||
loop.run_until_complete(t1)
|
timed_out, structured_block_finished, outer_code_reached = (
|
||||||
)
|
loop.run_until_complete(t1)
|
||||||
self.assertTrue(timed_out)
|
)
|
||||||
self.assertFalse(structured_block_finished) # it was cancelled
|
self.assertTrue(timed_out)
|
||||||
self.assertTrue(outer_code_reached) # task got uncancelled after leaving
|
self.assertFalse(structured_block_finished) # it was cancelled
|
||||||
# the structured block and continued until
|
self.assertTrue(outer_code_reached) # task got uncancelled after leaving
|
||||||
# completion
|
# the structured block and continued until
|
||||||
self.assertEqual(t1.cancelling(), 0) # no pending cancellation of the outer task
|
# completion
|
||||||
|
self.assertEqual(t1.cancelling(), 0) # no pending cancellation of the outer task
|
||||||
|
|
||||||
# Test which did not time out.
|
# Test which did not time out.
|
||||||
t2 = self.new_task(loop, make_request_with_timeout(sleep=0, timeout=10.0))
|
t2 = self.new_task(loop, make_request_with_timeout(sleep=0, timeout=10.0))
|
||||||
timed_out, structured_block_finished, outer_code_reached = (
|
timed_out, structured_block_finished, outer_code_reached = (
|
||||||
loop.run_until_complete(t2)
|
loop.run_until_complete(t2)
|
||||||
)
|
)
|
||||||
self.assertFalse(timed_out)
|
self.assertFalse(timed_out)
|
||||||
self.assertTrue(structured_block_finished)
|
self.assertTrue(structured_block_finished)
|
||||||
self.assertTrue(outer_code_reached)
|
self.assertTrue(outer_code_reached)
|
||||||
self.assertEqual(t2.cancelling(), 0)
|
self.assertEqual(t2.cancelling(), 0)
|
||||||
|
finally:
|
||||||
|
loop.close()
|
||||||
|
|
||||||
def test_cancel(self):
|
def test_cancel(self):
|
||||||
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue