Make some tests more frienly to MemoryError.

Free memory, unlock hanging threads.
This commit is contained in:
Serhiy Storchaka 2015-03-28 20:38:37 +02:00
parent d7aa5248fb
commit 9db55004a1
6 changed files with 43 additions and 16 deletions

View file

@ -3514,11 +3514,16 @@ class SignalsTest(unittest.TestCase):
# received (forcing a first EINTR in write()).
read_results = []
write_finished = False
error = None
def _read():
while not write_finished:
while r in select.select([r], [], [], 1.0)[0]:
s = os.read(r, 1024)
read_results.append(s)
try:
while not write_finished:
while r in select.select([r], [], [], 1.0)[0]:
s = os.read(r, 1024)
read_results.append(s)
except BaseException as exc:
nonlocal error
error = exc
t = threading.Thread(target=_read)
t.daemon = True
def alarm1(sig, frame):
@ -3539,6 +3544,8 @@ class SignalsTest(unittest.TestCase):
wio.flush()
write_finished = True
t.join()
self.assertIsNone(error)
self.assertEqual(N, sum(len(x) for x in read_results))
finally:
write_finished = True