Issue #19305: try to fix sporadic test_asyncio failure on FreeBSD 10.0

This commit is contained in:
Antoine Pitrou 2013-10-20 01:51:25 +02:00
parent 0d9eefda34
commit d20afad7d4
2 changed files with 20 additions and 2 deletions

View file

@ -7,6 +7,7 @@ import unittest.mock
import os
import sys
import threading
import time
import unittest
import unittest.mock
from wsgiref.simple_server import make_server, WSGIRequestHandler, WSGIServer
@ -46,6 +47,20 @@ def run_briefly(loop):
gen.close()
def run_until(loop, pred, timeout=None):
if timeout is not None:
deadline = time.time() + timeout
while not pred():
if timeout is not None:
timeout = deadline - time.time()
if timeout <= 0:
return False
loop.run_until_complete(tasks.sleep(timeout, loop=loop))
else:
run_briefly(loop)
return True
def run_once(loop):
"""loop.stop() schedules _raise_stop_error()
and run_forever() runs until _raise_stop_error() callback.