Issue 10260

Adding the wait_for() method to threading.Condition
This commit is contained in:
Kristján Valur Jónsson 2010-11-18 12:46:39 +00:00
parent bcc4810002
commit 6331520950
3 changed files with 108 additions and 7 deletions

View file

@ -446,6 +446,46 @@ class ConditionTests(BaseTestCase):
# In practice, this implementation has no spurious wakeups.
self.assertFalse(result)
def test_waitfor(self):
cond = self.condtype()
state = 0
def f():
with cond:
result = cond.wait_for(lambda : state==4)
self.assertTrue(result)
self.assertEqual(state, 4)
b = Bunch(f, 1)
b.wait_for_started()
for i in range(5):
time.sleep(0.01)
with cond:
state += 1
cond.notify()
b.wait_for_finished()
def test_waitfor_timeout(self):
cond = self.condtype()
state = 0
success = []
def f():
with cond:
dt = time.time()
result = cond.wait_for(lambda : state==4, timeout=0.1)
dt = time.time() - dt
self.assertFalse(result)
self.assertTimeout(dt, 0.1)
success.append(None)
b = Bunch(f, 1)
b.wait_for_started()
# Only increment 3 times, so state == 4 is never reached.
for i in range(3):
time.sleep(0.01)
with cond:
state += 1
cond.notify()
b.wait_for_finished()
self.assertEqual(len(success), 1)
class BaseSemaphoreTests(BaseTestCase):
"""