mirror of
https://github.com/python/cpython.git
synced 2025-11-13 07:26:31 +00:00
Issue #14087: multiprocessing: add Condition.wait_for(). Patch by sbt.
This commit is contained in:
parent
a3f4457b17
commit
c8ce715a82
5 changed files with 113 additions and 0 deletions
|
|
@ -897,6 +897,9 @@ object -- see :ref:`multiprocessing-managers`.
|
||||||
If *lock* is specified then it should be a :class:`Lock` or :class:`RLock`
|
If *lock* is specified then it should be a :class:`Lock` or :class:`RLock`
|
||||||
object from :mod:`multiprocessing`.
|
object from :mod:`multiprocessing`.
|
||||||
|
|
||||||
|
.. versionchanged:: 3.3
|
||||||
|
The :meth:`wait_for` method was added.
|
||||||
|
|
||||||
.. class:: Event()
|
.. class:: Event()
|
||||||
|
|
||||||
A clone of :class:`threading.Event`.
|
A clone of :class:`threading.Event`.
|
||||||
|
|
@ -1281,6 +1284,9 @@ their parent process exits. The manager classes are defined in the
|
||||||
If *lock* is supplied then it should be a proxy for a
|
If *lock* is supplied then it should be a proxy for a
|
||||||
:class:`threading.Lock` or :class:`threading.RLock` object.
|
:class:`threading.Lock` or :class:`threading.RLock` object.
|
||||||
|
|
||||||
|
.. versionchanged:: 3.3
|
||||||
|
The :meth:`wait_for` method was added.
|
||||||
|
|
||||||
.. method:: Event()
|
.. method:: Event()
|
||||||
|
|
||||||
Create a shared :class:`threading.Event` object and return a proxy for it.
|
Create a shared :class:`threading.Event` object and return a proxy for it.
|
||||||
|
|
|
||||||
|
|
@ -48,6 +48,7 @@ from traceback import format_exc
|
||||||
from multiprocessing import Process, current_process, active_children, Pool, util, connection
|
from multiprocessing import Process, current_process, active_children, Pool, util, connection
|
||||||
from multiprocessing.process import AuthenticationString
|
from multiprocessing.process import AuthenticationString
|
||||||
from multiprocessing.forking import exit, Popen, ForkingPickler
|
from multiprocessing.forking import exit, Popen, ForkingPickler
|
||||||
|
from time import time as _time
|
||||||
|
|
||||||
#
|
#
|
||||||
# Register some things for pickling
|
# Register some things for pickling
|
||||||
|
|
@ -996,6 +997,24 @@ class ConditionProxy(AcquirerProxy):
|
||||||
return self._callmethod('notify')
|
return self._callmethod('notify')
|
||||||
def notify_all(self):
|
def notify_all(self):
|
||||||
return self._callmethod('notify_all')
|
return self._callmethod('notify_all')
|
||||||
|
def wait_for(self, predicate, timeout=None):
|
||||||
|
result = predicate()
|
||||||
|
if result:
|
||||||
|
return result
|
||||||
|
if timeout is not None:
|
||||||
|
endtime = _time() + timeout
|
||||||
|
else:
|
||||||
|
endtime = None
|
||||||
|
waittime = None
|
||||||
|
while not result:
|
||||||
|
if endtime is not None:
|
||||||
|
waittime = endtime - _time()
|
||||||
|
if waittime <= 0:
|
||||||
|
break
|
||||||
|
self.wait(waittime)
|
||||||
|
result = predicate()
|
||||||
|
return result
|
||||||
|
|
||||||
|
|
||||||
class EventProxy(BaseProxy):
|
class EventProxy(BaseProxy):
|
||||||
_exposed_ = ('is_set', 'set', 'clear', 'wait')
|
_exposed_ = ('is_set', 'set', 'clear', 'wait')
|
||||||
|
|
|
||||||
|
|
@ -43,6 +43,7 @@ import _multiprocessing
|
||||||
from multiprocessing.process import current_process
|
from multiprocessing.process import current_process
|
||||||
from multiprocessing.util import register_after_fork, debug
|
from multiprocessing.util import register_after_fork, debug
|
||||||
from multiprocessing.forking import assert_spawning, Popen
|
from multiprocessing.forking import assert_spawning, Popen
|
||||||
|
from time import time as _time
|
||||||
|
|
||||||
# Try to import the mp.synchronize module cleanly, if it fails
|
# Try to import the mp.synchronize module cleanly, if it fails
|
||||||
# raise ImportError for platforms lacking a working sem_open implementation.
|
# raise ImportError for platforms lacking a working sem_open implementation.
|
||||||
|
|
@ -290,6 +291,24 @@ class Condition(object):
|
||||||
while self._wait_semaphore.acquire(False):
|
while self._wait_semaphore.acquire(False):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
def wait_for(self, predicate, timeout=None):
|
||||||
|
result = predicate()
|
||||||
|
if result:
|
||||||
|
return result
|
||||||
|
if timeout is not None:
|
||||||
|
endtime = _time() + timeout
|
||||||
|
else:
|
||||||
|
endtime = None
|
||||||
|
waittime = None
|
||||||
|
while not result:
|
||||||
|
if endtime is not None:
|
||||||
|
waittime = endtime - _time()
|
||||||
|
if waittime <= 0:
|
||||||
|
break
|
||||||
|
self.wait(waittime)
|
||||||
|
result = predicate()
|
||||||
|
return result
|
||||||
|
|
||||||
#
|
#
|
||||||
# Event
|
# Event
|
||||||
#
|
#
|
||||||
|
|
|
||||||
|
|
@ -887,6 +887,73 @@ class _TestCondition(BaseTestCase):
|
||||||
self.assertEqual(res, False)
|
self.assertEqual(res, False)
|
||||||
self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
|
self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def _test_waitfor_f(cls, cond, state):
|
||||||
|
with cond:
|
||||||
|
state.value = 0
|
||||||
|
cond.notify()
|
||||||
|
result = cond.wait_for(lambda : state.value==4)
|
||||||
|
if not result or state.value != 4:
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
@unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes')
|
||||||
|
def test_waitfor(self):
|
||||||
|
# based on test in test/lock_tests.py
|
||||||
|
cond = self.Condition()
|
||||||
|
state = self.Value('i', -1)
|
||||||
|
|
||||||
|
p = self.Process(target=self._test_waitfor_f, args=(cond, state))
|
||||||
|
p.daemon = True
|
||||||
|
p.start()
|
||||||
|
|
||||||
|
with cond:
|
||||||
|
result = cond.wait_for(lambda : state.value==0)
|
||||||
|
self.assertTrue(result)
|
||||||
|
self.assertEqual(state.value, 0)
|
||||||
|
|
||||||
|
for i in range(4):
|
||||||
|
time.sleep(0.01)
|
||||||
|
with cond:
|
||||||
|
state.value += 1
|
||||||
|
cond.notify()
|
||||||
|
|
||||||
|
p.join(5)
|
||||||
|
self.assertFalse(p.is_alive())
|
||||||
|
self.assertEqual(p.exitcode, 0)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def _test_waitfor_timeout_f(cls, cond, state, success):
|
||||||
|
with cond:
|
||||||
|
expected = 0.1
|
||||||
|
dt = time.time()
|
||||||
|
result = cond.wait_for(lambda : state.value==4, timeout=expected)
|
||||||
|
dt = time.time() - dt
|
||||||
|
# borrow logic in assertTimeout() from test/lock_tests.py
|
||||||
|
if not result and expected * 0.6 < dt < expected * 10.0:
|
||||||
|
success.value = True
|
||||||
|
|
||||||
|
@unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes')
|
||||||
|
def test_waitfor_timeout(self):
|
||||||
|
# based on test in test/lock_tests.py
|
||||||
|
cond = self.Condition()
|
||||||
|
state = self.Value('i', 0)
|
||||||
|
success = self.Value('i', False)
|
||||||
|
|
||||||
|
p = self.Process(target=self._test_waitfor_timeout_f,
|
||||||
|
args=(cond, state, success))
|
||||||
|
p.daemon = True
|
||||||
|
p.start()
|
||||||
|
|
||||||
|
# Only increment 3 times, so state == 4 is never reached.
|
||||||
|
for i in range(3):
|
||||||
|
time.sleep(0.01)
|
||||||
|
with cond:
|
||||||
|
state.value += 1
|
||||||
|
cond.notify()
|
||||||
|
|
||||||
|
p.join(5)
|
||||||
|
self.assertTrue(success.value)
|
||||||
|
|
||||||
|
|
||||||
class _TestEvent(BaseTestCase):
|
class _TestEvent(BaseTestCase):
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -39,6 +39,8 @@ Core and Builtins
|
||||||
Library
|
Library
|
||||||
-------
|
-------
|
||||||
|
|
||||||
|
- Issue #14087: multiprocessing: add Condition.wait_for(). Patch by sbt.
|
||||||
|
|
||||||
- Issue #14452: SysLogHandler no longer inserts a UTF-8 BOM into the message.
|
- Issue #14452: SysLogHandler no longer inserts a UTF-8 BOM into the message.
|
||||||
|
|
||||||
- Issue #14386: Expose the dict_proxy internal type as types.MappingProxyType.
|
- Issue #14386: Expose the dict_proxy internal type as types.MappingProxyType.
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue