mirror of
https://github.com/python/cpython.git
synced 2025-07-08 03:45:36 +00:00

svn+ssh://pythondev@svn.python.org/python/branches/p3yk ................ r56760 | neal.norwitz | 2007-08-05 18:55:39 -0700 (Sun, 05 Aug 2007) | 178 lines Merged revisions 56477-56759 via svnmerge from svn+ssh://pythondev@svn.python.org/python/trunk ........ r56485 | facundo.batista | 2007-07-21 17:13:00 -0700 (Sat, 21 Jul 2007) | 5 lines Selectively enable tests for asyncore.readwrite based on the presence of poll support in the select module (since this is the only case in which readwrite can be called). [GSoC - Alan McIntyre] ........ r56488 | nick.coghlan | 2007-07-22 03:18:07 -0700 (Sun, 22 Jul 2007) | 1 line Add explicit relative import tests for runpy.run_module ........ r56509 | nick.coghlan | 2007-07-23 06:41:45 -0700 (Mon, 23 Jul 2007) | 5 lines Correctly cleanup sys.modules after executing runpy relative import tests Restore Python 2.4 ImportError when attempting to execute a package (as imports cannot be guaranteed to work properly if you try it) ........ r56519 | nick.coghlan | 2007-07-24 06:07:38 -0700 (Tue, 24 Jul 2007) | 1 line Tweak runpy test to do a better job of confirming that sys has been manipulated correctly ........ r56520 | nick.coghlan | 2007-07-24 06:58:28 -0700 (Tue, 24 Jul 2007) | 1 line Fix an incompatibility between the -i and -m command line switches as reported on python-dev by PJE - runpy.run_module now leaves any changes it makes to the sys module intact after the function terminates ........ r56523 | nick.coghlan | 2007-07-24 07:39:23 -0700 (Tue, 24 Jul 2007) | 1 line Try to get rid of spurious failure in test_resource on the Debian buildbots by changing the file size limit before attempting to close the file ........ r56533 | facundo.batista | 2007-07-24 14:20:42 -0700 (Tue, 24 Jul 2007) | 7 lines New tests for basic behavior of smtplib.SMTP and smtpd.DebuggingServer. Change to use global host & port number variables. Modified the 'server' to take a string to send back in order to vary test server responses. 
Added a test for the reaction of smtplib.SMTP to a non-200 HELO response. [GSoC - Alan McIntyre] ........ r56538 | nick.coghlan | 2007-07-25 05:57:48 -0700 (Wed, 25 Jul 2007) | 1 line More buildbot cleanup - let the OS assign the port for test_urllib2_localnet ........ r56539 | nick.coghlan | 2007-07-25 06:18:58 -0700 (Wed, 25 Jul 2007) | 1 line Add a temporary diagnostic message before a strange failure on the alpha Debian buildbot ........ r56543 | martin.v.loewis | 2007-07-25 09:24:23 -0700 (Wed, 25 Jul 2007) | 2 lines Change location of the package index to pypi.python.org/pypi ........ r56551 | georg.brandl | 2007-07-26 02:36:25 -0700 (Thu, 26 Jul 2007) | 2 lines tabs, newlines and crs are valid XML characters. ........ r56553 | nick.coghlan | 2007-07-26 07:03:00 -0700 (Thu, 26 Jul 2007) | 1 line Add explicit test for a misbehaving math.floor ........ r56561 | mark.hammond | 2007-07-26 21:52:32 -0700 (Thu, 26 Jul 2007) | 3 lines In consultation with Kristjan Jonsson, only define WINVER and _WINNT_WIN32 if (a) we are building Python itself and (b) no one previously defined them ........ r56562 | mark.hammond | 2007-07-26 22:08:54 -0700 (Thu, 26 Jul 2007) | 2 lines Correctly detect AMD64 architecture on VC2003 ........ r56566 | nick.coghlan | 2007-07-27 03:36:30 -0700 (Fri, 27 Jul 2007) | 1 line Make test_math error messages more meaningful for small discrepancies in results ........ r56588 | martin.v.loewis | 2007-07-27 11:28:22 -0700 (Fri, 27 Jul 2007) | 2 lines Bug #978833: Close https sockets by releasing the _ssl object. ........ r56601 | martin.v.loewis | 2007-07-28 00:03:05 -0700 (Sat, 28 Jul 2007) | 3 lines Bug #1704793: Return UTF-16 pair if unicodedata.lookup cannot represent the result in a single character. ........ 
r56604 | facundo.batista | 2007-07-28 07:21:22 -0700 (Sat, 28 Jul 2007) | 9 lines Moved all of the capture_server socket setup code into the try block so that the event gets set if a failure occurs during server setup (otherwise the test will block forever). Changed to let the OS assign the server port number, and client side of test waits for port number assignment before proceeding. The test data in DispatcherWithSendTests is also sent in multiple send() calls instead of one to make sure this works properly. [GSoC - Alan McIntyre] ........ r56611 | georg.brandl | 2007-07-29 01:26:10 -0700 (Sun, 29 Jul 2007) | 2 lines Clarify PEP 343 description. ........ r56614 | georg.brandl | 2007-07-29 02:11:15 -0700 (Sun, 29 Jul 2007) | 2 lines try-except-finally is new in 2.5. ........ r56617 | facundo.batista | 2007-07-29 07:23:08 -0700 (Sun, 29 Jul 2007) | 9 lines Added tests for asynchat classes simple_producer & fifo, and the find_prefix_at_end function. Check behavior of a string given as a producer. Added tests for behavior of asynchat.async_chat when given int, long, and None terminator arguments. Added usepoll attribute to TestAsynchat to allow running the asynchat tests with poll support chosen whether it's available or not (improves coverage of asyncore code). [GSoC - Alan McIntyre] ........ r56620 | georg.brandl | 2007-07-29 10:38:35 -0700 (Sun, 29 Jul 2007) | 2 lines Bug #1763149: use proper slice syntax in docstring. (backport) ........ r56624 | mark.hammond | 2007-07-29 17:45:29 -0700 (Sun, 29 Jul 2007) | 4 lines Correct use of Py_BUILD_CORE - now make sure it is defined before it is referenced, and also fix definition of _WIN32_WINNT. Resolves patch 1761803. ........ r56632 | facundo.batista | 2007-07-30 20:03:34 -0700 (Mon, 30 Jul 2007) | 8 lines When running asynchat tests on OS X (darwin), the test client now overrides asyncore.dispatcher.handle_expt to do nothing, since select.poll gives a POLLHUP error at the completion of these tests. 
Added timeout & count arguments to several asyncore.loop calls to avoid the possibility of a test hanging up a build. [GSoC - Alan McIntyre] ........ r56633 | nick.coghlan | 2007-07-31 06:38:01 -0700 (Tue, 31 Jul 2007) | 1 line Eliminate RLock race condition reported in SF bug #1764059 ........ r56636 | martin.v.loewis | 2007-07-31 12:57:56 -0700 (Tue, 31 Jul 2007) | 2 lines Define _BSD_SOURCE, to get access to POSIX extensions on OpenBSD 4.1+. ........ r56653 | facundo.batista | 2007-08-01 16:18:36 -0700 (Wed, 01 Aug 2007) | 9 lines Allow the OS to select a free port for each test server. For DebuggingServerTests, construct SMTP objects with a localhost argument to avoid abysmally long FQDN lookups (not relevant to items under test) on some machines that would cause the test to fail. Moved server setup code in the server function inside the try block to avoid the possibility of setup failure hanging the test. Minor edits to conform to PEP 8. [GSoC - Alan McIntyre] ........ r56681 | matthias.klose | 2007-08-02 14:33:13 -0700 (Thu, 02 Aug 2007) | 2 lines - Allow Emacs 22 for building the documentation in info format. ........ r56689 | neal.norwitz | 2007-08-02 23:46:29 -0700 (Thu, 02 Aug 2007) | 1 line Py_ssize_t is defined regardless of HAVE_LONG_LONG. Will backport ........ r56727 | hyeshik.chang | 2007-08-03 21:10:18 -0700 (Fri, 03 Aug 2007) | 3 lines Fix gb18030 codec's bug that doesn't map two-byte characters on GB18030 extension in encoding. (bug reported by Bjorn Stabell) ........ r56751 | neal.norwitz | 2007-08-04 20:23:31 -0700 (Sat, 04 Aug 2007) | 7 lines Handle errors when generating a warning. The value is always written to the returned pointer if getting it was successful, even if a warning causes an error. (This probably doesn't matter as the caller will probably discard the value.) Will backport. ........ ................
827 lines
26 KiB
Python
827 lines
26 KiB
Python
"""Thread module emulating a subset of Java's threading model."""
|
|
|
|
import sys as _sys
|
|
|
|
try:
|
|
import thread
|
|
except ImportError:
|
|
del _sys.modules[__name__]
|
|
raise
|
|
|
|
from time import time as _time, sleep as _sleep
|
|
from traceback import format_exc as _format_exc
|
|
from collections import deque
|
|
|
|
# Public names: listing them explicitly keeps "from threading import *" safe.
__all__ = [
    'activeCount', 'Condition', 'currentThread', 'enumerate', 'Event',
    'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Thread',
    'Timer', 'setprofile', 'settrace', 'local', 'stack_size',
]

# Capture the low-level primitives under private names, then drop the
# module reference so that `thread` itself is not left in this namespace.
_start_new_thread = thread.start_new_thread
_allocate_lock = thread.allocate_lock
_get_ident = thread.get_ident
ThreadError = thread.error
del thread
|
|
|
|
|
|
# Debug support (adapted from ihooks.py).
|
|
# All the major classes here derive from _Verbose. We force that to
|
|
# be a new-style class so that all the major classes here are new-style.
|
|
# This helps debugging (type(instance) is more revealing for instances
|
|
# of new-style classes).
|
|
|
|
# Module-wide default used when a _Verbose instance is created without an
# explicit `verbose` argument.
_VERBOSE = False

if not __debug__:
    # Running under "python -O": debug notes are disabled entirely.
    class _Verbose(object):
        """No-op stand-in used when __debug__ is false."""

        def __init__(self, verbose=None):
            pass

        def _note(self, *args):
            pass

else:

    class _Verbose(object):
        """Base class providing optional debug notes on sys.stderr."""

        def __init__(self, verbose=None):
            # None means "use the module-wide default".
            self.__verbose = _VERBOSE if verbose is None else verbose

        def _note(self, format, *args):
            """Write ``format % args`` to stderr, prefixed by the thread name.

            Does nothing unless this instance was created verbose.
            """
            if not self.__verbose:
                return
            message = format % args
            line = "%s: %s\n" % (currentThread().getName(), message)
            _sys.stderr.write(line)
|
|
|
|
# Support for profile and trace hooks
|
|
|
|
# Hooks consulted by Thread.__bootstrap before run() is called; a false
# value means "install nothing".
_trace_hook = None
_profile_hook = None


def setprofile(func):
    """Remember *func* as the profile hook for threads started from here.

    Thread.__bootstrap passes it to sys.setprofile() in the new thread.
    """
    global _profile_hook
    _profile_hook = func


def settrace(func):
    """Remember *func* as the trace hook for threads started from here.

    Thread.__bootstrap passes it to sys.settrace() in the new thread.
    """
    global _trace_hook
    _trace_hook = func
|
|
|
|
# Synchronization classes
|
|
|
|
Lock = _allocate_lock
|
|
|
|
def RLock(*args, **kwargs):
    """Factory returning a new reentrant lock (an _RLock instance)."""
    return _RLock(*args, **kwargs)


class _RLock(_Verbose):
    """Reentrant lock built on top of a plain thread lock.

    The owning thread may acquire it repeatedly; the underlying lock is
    only released once release() has been called as many times as
    acquire() succeeded.
    """

    def __init__(self, verbose=None):
        _Verbose.__init__(self, verbose)
        self.__block = _allocate_lock()   # the real, non-reentrant lock
        self.__owner = None               # Thread object holding the lock
        self.__count = 0                  # recursion depth of the owner

    def __repr__(self):
        owner = self.__owner
        return "<%s(%s, %d)>" % (
                self.__class__.__name__,
                owner and owner.getName(),
                self.__count)

    def acquire(self, blocking=1):
        """Acquire the lock, recursively if the caller already owns it.

        Returns 1 for a recursive acquisition, otherwise whatever the
        underlying lock's acquire(blocking) returned.
        """
        me = currentThread()
        if self.__owner is me:
            self.__count = self.__count + 1
            if __debug__:
                self._note("%s.acquire(%s): recursive success", self, blocking)
            return 1
        rc = self.__block.acquire(blocking)
        if rc:
            self.__owner = me
            self.__count = 1
            if __debug__:
                self._note("%s.acquire(%s): initial success", self, blocking)
        else:
            if __debug__:
                self._note("%s.acquire(%s): failure", self, blocking)
        return rc

    __enter__ = acquire

    def release(self):
        """Undo one acquire(); free the underlying lock at depth zero.

        Raises RuntimeError if the calling thread is not the owner.
        """
        if self.__owner is not currentThread():
            raise RuntimeError("cannot release un-acquired lock")
        self.__count = count = self.__count - 1
        if not count:
            # Clear the owner *before* releasing the real lock, so another
            # thread that grabs it immediately cannot have its ownership
            # overwritten by us.
            self.__owner = None
            self.__block.release()
            if __debug__:
                self._note("%s.release(): final release", self)
        else:
            if __debug__:
                self._note("%s.release(): non-final release", self)

    def __exit__(self, t, v, tb):
        self.release()

    # Internal methods used by condition variables

    def _acquire_restore(self, state):
        """Re-acquire the lock and restore a (count, owner) pair.

        *state* is the value previously returned by _release_save().
        """
        self.__block.acquire()
        self.__count, self.__owner = state
        if __debug__:
            self._note("%s._acquire_restore()", self)

    def _release_save(self):
        """Fully release the lock, whatever the recursion depth.

        Returns the (count, owner) pair to hand to _acquire_restore().
        """
        if __debug__:
            self._note("%s._release_save()", self)
        count = self.__count
        self.__count = 0
        owner = self.__owner
        self.__owner = None
        self.__block.release()
        return (count, owner)

    def _is_owned(self):
        """Return True if the calling thread currently owns the lock."""
        return self.__owner is currentThread()
|
|
|
|
|
|
def Condition(*args, **kwargs):
    """Factory returning a new condition variable (a _Condition instance)."""
    return _Condition(*args, **kwargs)


class _Condition(_Verbose):
    """Condition variable associated with a lock.

    If no lock is passed in, a new RLock is created.  The lock's acquire()
    and release() are exported as attributes of the condition itself.
    """

    def __init__(self, lock=None, verbose=None):
        _Verbose.__init__(self, verbose)
        if lock is None:
            lock = RLock()
        self.__lock = lock
        # Export the lock's acquire() and release() methods
        self.acquire = lock.acquire
        self.release = lock.release
        # If the lock defines _release_save() and/or _acquire_restore(),
        # these override the default implementations (which just call
        # release() and acquire() on the lock).  Ditto for _is_owned().
        try:
            self._release_save = lock._release_save
        except AttributeError:
            pass
        try:
            self._acquire_restore = lock._acquire_restore
        except AttributeError:
            pass
        try:
            self._is_owned = lock._is_owned
        except AttributeError:
            pass
        # One freshly allocated lock per thread currently blocked in wait().
        self.__waiters = []

    def __enter__(self):
        return self.__lock.__enter__()

    def __exit__(self, *args):
        return self.__lock.__exit__(*args)

    def __repr__(self):
        return "<Condition(%s, %d)>" % (self.__lock, len(self.__waiters))

    def _release_save(self):
        self.__lock.release()           # No state to save

    def _acquire_restore(self, x):
        self.__lock.acquire()           # Ignore saved state

    def _is_owned(self):
        # Return True if lock is owned by currentThread.
        # This method is called only if __lock doesn't have _is_owned().
        if self.__lock.acquire(0):
            self.__lock.release()
            return False
        else:
            return True

    def wait(self, timeout=None):
        """Release the lock and block until notified or *timeout* expires.

        The lock is re-acquired before returning, even if the wait is
        interrupted by an exception.  Raises RuntimeError if the calling
        thread does not hold the lock.
        """
        if not self._is_owned():
            raise RuntimeError("cannot wait on un-acquired lock")
        # The waiter lock starts out held; notify() releases it to wake us.
        waiter = _allocate_lock()
        waiter.acquire()
        self.__waiters.append(waiter)
        saved_state = self._release_save()
        try:    # restore state no matter what (e.g., KeyboardInterrupt)
            if timeout is None:
                waiter.acquire()
                if __debug__:
                    self._note("%s.wait(): got it", self)
            else:
                # Balancing act:  We can't afford a pure busy loop, so we
                # have to sleep; but if we sleep the whole timeout time,
                # we'll be unresponsive.  The scheme here sleeps very
                # little at first, longer as time goes on, but never longer
                # than 20 times per second (or the timeout time remaining).
                endtime = _time() + timeout
                delay = 0.0005 # 500 us -> initial delay of 1 ms
                while True:
                    gotit = waiter.acquire(0)
                    if gotit:
                        break
                    remaining = endtime - _time()
                    if remaining <= 0:
                        break
                    delay = min(delay * 2, remaining, .05)
                    _sleep(delay)
                if not gotit:
                    if __debug__:
                        self._note("%s.wait(%s): timed out", self, timeout)
                    # notify() may have removed the waiter concurrently.
                    try:
                        self.__waiters.remove(waiter)
                    except ValueError:
                        pass
                else:
                    if __debug__:
                        self._note("%s.wait(%s): got it", self, timeout)
        finally:
            self._acquire_restore(saved_state)

    def notify(self, n=1):
        """Wake up at most *n* threads blocked in wait().

        Raises RuntimeError if the calling thread does not hold the lock.
        """
        if not self._is_owned():
            raise RuntimeError("cannot notify on un-acquired lock")
        all_waiters = self.__waiters
        waiters = all_waiters[:n]
        if not waiters:
            if __debug__:
                self._note("%s.notify(): no waiters", self)
            return
        if __debug__:
            self._note("%s.notify(): notifying %d waiter%s", self, n,
                       "s" if n != 1 else "")
        for waiter in waiters:
            waiter.release()
            # A timed-out wait() may already have removed its own waiter.
            try:
                all_waiters.remove(waiter)
            except ValueError:
                pass

    def notifyAll(self):
        """Wake up every thread currently blocked in wait()."""
        self.notify(len(self.__waiters))
|
|
|
|
|
|
def Semaphore(*args, **kwargs):
    """Factory returning a new counting semaphore (a _Semaphore instance)."""
    return _Semaphore(*args, **kwargs)


class _Semaphore(_Verbose):
    """Counting semaphore guarded by a condition variable.

    After Tim Peters' semaphore class, but not quite the same (no maximum).
    """

    def __init__(self, value=1, verbose=None):
        if value < 0:
            raise ValueError("semaphore initial value must be >= 0")
        _Verbose.__init__(self, verbose)
        self.__cond = Condition(Lock())
        self.__value = value

    def acquire(self, blocking=1):
        """Decrement the counter, waiting while it is zero if *blocking*.

        Returns True on success and False when *blocking* is false and the
        counter was zero.
        """
        rc = False
        # "with" guarantees the condition's lock is released even if wait()
        # is interrupted (e.g. by KeyboardInterrupt).
        with self.__cond:
            while self.__value == 0:
                if not blocking:
                    break
                if __debug__:
                    self._note("%s.acquire(%s): blocked waiting, value=%s",
                               self, blocking, self.__value)
                self.__cond.wait()
            else:
                # Loop ended without break: the counter is non-zero.
                self.__value = self.__value - 1
                if __debug__:
                    self._note("%s.acquire: success, value=%s",
                               self, self.__value)
                rc = True
        return rc

    __enter__ = acquire

    def release(self):
        """Increment the counter and wake one thread blocked in acquire()."""
        with self.__cond:
            self.__value = self.__value + 1
            if __debug__:
                self._note("%s.release: success, value=%s",
                           self, self.__value)
            self.__cond.notify()

    def __exit__(self, t, v, tb):
        self.release()
|
|
|
|
|
|
def BoundedSemaphore(*args, **kwargs):
    """Factory returning a new bounded semaphore (a _BoundedSemaphore)."""
    return _BoundedSemaphore(*args, **kwargs)


class _BoundedSemaphore(_Semaphore):
    """Semaphore that checks that # releases is <= # acquires"""

    def __init__(self, value=1, verbose=None):
        _Semaphore.__init__(self, value, verbose)
        # Upper bound that release() must never push the counter past.
        self._initial_value = value

    def release(self):
        """Release the semaphore.

        Raises ValueError if the counter is already at its initial value.
        """
        if self._Semaphore__value >= self._initial_value:
            # Call form of raise: the "raise E, msg" form is Python 2 only.
            raise ValueError("Semaphore released too many times")
        return _Semaphore.release(self)
|
|
|
|
|
|
def Event(*args, **kwargs):
    """Factory returning a new event object (an _Event instance)."""
    return _Event(*args, **kwargs)


class _Event(_Verbose):
    """A boolean flag that threads can wait on.

    After Tim Peters' event class (without is_posted()).
    """

    def __init__(self, verbose=None):
        _Verbose.__init__(self, verbose)
        self.__cond = Condition(Lock())
        self.__flag = False

    def isSet(self):
        """Return the current value of the flag."""
        return self.__flag

    def set(self):
        """Set the flag and wake every thread blocked in wait()."""
        with self.__cond:
            self.__flag = True
            self.__cond.notifyAll()

    def clear(self):
        """Reset the flag to false."""
        with self.__cond:
            self.__flag = False

    def wait(self, timeout=None):
        """Block until the flag is set or *timeout* expires.

        Returns immediately if the flag is already set.
        """
        with self.__cond:
            if not self.__flag:
                self.__cond.wait(timeout)
|
|
|
|
# Running count of names handed out by _newname().
_counter = 0


def _newname(template="Thread-%d"):
    """Return *template* filled in with the next value of the counter."""
    global _counter
    _counter += 1
    return template % _counter
|
|
|
|
# Active thread administration.
# _active_limbo_lock is held while _active or _limbo is updated.
_active_limbo_lock = _allocate_lock()
_active = {}    # maps thread id to Thread object
_limbo = {}     # maps Thread object to itself between start() and bootstrap
|
|
|
|
|
|
# Main class for threads
|
|
|
|
class Thread(_Verbose):
    """A thread of control, driven by start(), run() and join().

    The activity is either the *target* callable passed to the constructor
    or an overridden run() method.
    """

    __initialized = False
    # Need to store a reference to sys.exc_info for printing
    # out exceptions when a thread tries to use a global var. during interp.
    # shutdown and thus raises an exception about trying to perform some
    # operation on/with a NoneType
    __exc_info = _sys.exc_info

    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs=None, verbose=None):
        """Set up the thread without starting it.

        *group* must be None.  *target* is called by run() with *args* and
        *kwargs*.  *name* defaults to a generated "Thread-N" name.
        """
        assert group is None, "group argument must be None for now"
        _Verbose.__init__(self, verbose)
        if kwargs is None:
            kwargs = {}
        self.__target = target
        self.__name = str(name or _newname())
        self.__args = args
        self.__kwargs = kwargs
        self.__daemonic = self._set_daemon()
        self.__started = False
        self.__stopped = False
        self.__block = Condition(Lock())
        self.__initialized = True
        # sys.stderr is not stored in the class like
        # sys.exc_info since it can be changed between instances
        self.__stderr = _sys.stderr

    def _set_daemon(self):
        # Overridden in _MainThread and _DummyThread
        return currentThread().isDaemon()

    def __repr__(self):
        assert self.__initialized, "Thread.__init__() was not called"
        status = "initial"
        if self.__started:
            status = "started"
        if self.__stopped:
            status = "stopped"
        if self.__daemonic:
            status = status + " daemon"
        return "<%s(%s, %s)>" % (self.__class__.__name__, self.__name, status)

    def start(self):
        """Start the thread's activity in a new thread of control.

        Raises RuntimeError if __init__() was not called or the thread was
        already started.  Errors from the low-level thread start propagate.
        """
        if not self.__initialized:
            raise RuntimeError("thread.__init__() not called")
        if self.__started:
            raise RuntimeError("thread already started")
        if __debug__:
            self._note("%s.start(): starting thread", self)
        with _active_limbo_lock:
            _limbo[self] = self
        try:
            _start_new_thread(self.__bootstrap, ())
        except Exception:
            # No thread was created, so __bootstrap will never remove us
            # from _limbo; do it here so enumerate()/activeCount() do not
            # report a thread that does not exist.
            with _active_limbo_lock:
                del _limbo[self]
            raise
        self.__started = True
        _sleep(0.000001)    # 1 usec, to let the thread run (Solaris hack)

    def run(self):
        """Call the target, if any, with the stored args and kwargs."""
        if self.__target:
            self.__target(*self.__args, **self.__kwargs)

    def __bootstrap(self):
        """Entry point of the new thread: register, run(), then clean up."""
        try:
            self.__started = True
            # Move from _limbo to _active atomically; "with" makes sure the
            # lock is released even if the bookkeeping fails, otherwise the
            # cleanup in the finally clause below would deadlock on it.
            with _active_limbo_lock:
                _active[_get_ident()] = self
                del _limbo[self]
            if __debug__:
                self._note("%s.__bootstrap(): thread started", self)

            if _trace_hook:
                if __debug__:
                    self._note("%s.__bootstrap(): registering trace hook",
                               self)
                _sys.settrace(_trace_hook)
            if _profile_hook:
                if __debug__:
                    self._note("%s.__bootstrap(): registering profile hook",
                               self)
                _sys.setprofile(_profile_hook)

            try:
                self.run()
            except SystemExit:
                if __debug__:
                    self._note("%s.__bootstrap(): raised SystemExit", self)
            except:
                # Deliberate catch-all: nothing above us can report it.
                if __debug__:
                    self._note("%s.__bootstrap(): unhandled exception", self)
                # If sys.stderr is no more (most likely from interpreter
                # shutdown) use self.__stderr.  Otherwise still use sys (as in
                # _sys) in case sys.stderr was redefined since the creation of
                # self.
                if _sys:
                    _sys.stderr.write("Exception in thread %s:\n%s\n" %
                                      (self.getName(), _format_exc()))
                else:
                    # Do the best job possible w/o a huge amt. of code to
                    # approximate a traceback (code ideas from
                    # Lib/traceback.py)
                    exc_type, exc_value, exc_tb = self.__exc_info()
                    try:
                        print((
                            "Exception in thread " + self.getName() +
                            " (most likely raised during interpreter shutdown):"), file=self.__stderr)
                        print((
                            "Traceback (most recent call last):"), file=self.__stderr)
                        while exc_tb:
                            print((
                                '  File "%s", line %s, in %s' %
                                (exc_tb.tb_frame.f_code.co_filename,
                                    exc_tb.tb_lineno,
                                    exc_tb.tb_frame.f_code.co_name)), file=self.__stderr)
                            exc_tb = exc_tb.tb_next
                        print(("%s: %s" % (exc_type, exc_value)), file=self.__stderr)
                    # Make sure that exc_tb gets deleted since it is a memory
                    # hog; deleting everything else is just for thoroughness
                    finally:
                        del exc_type, exc_value, exc_tb
            else:
                if __debug__:
                    self._note("%s.__bootstrap(): normal return", self)
        finally:
            self.__stop()
            # Best effort: errors while unregistering are ignored on purpose.
            try:
                self.__delete()
            except:
                pass

    def __stop(self):
        """Mark the thread stopped and wake every thread blocked in join()."""
        with self.__block:
            self.__stopped = True
            self.__block.notifyAll()

    def __delete(self):
        "Remove current thread from the dict of currently running threads."

        # Notes about running with dummy_thread:
        #
        # Must take care to not raise an exception if dummy_thread is being
        # used (and thus this module is being used as an instance of
        # dummy_threading).  dummy_thread.get_ident() always returns -1 since
        # there is only one thread if dummy_thread is being used.  Thus
        # len(_active) is always <= 1 here, and any Thread instance created
        # overwrites the (if any) thread currently registered in _active.
        #
        # An instance of _MainThread is always created by 'threading'.  This
        # gets overwritten the instant an instance of Thread is created; both
        # threads return -1 from dummy_thread.get_ident() and thus have the
        # same key in the dict.  So when the _MainThread instance created by
        # 'threading' tries to clean itself up when atexit calls this method
        # it gets a KeyError if another Thread instance was created.
        #
        # This all means that KeyError from trying to delete something from
        # _active if dummy_threading is being used is a red herring.  But
        # since it isn't if dummy_threading is *not* being used then don't
        # hide the exception.

        with _active_limbo_lock:
            try:
                del _active[_get_ident()]
            except KeyError:
                if 'dummy_threading' not in _sys.modules:
                    raise

    def join(self, timeout=None):
        """Wait until the thread stops or *timeout* seconds have passed.

        Raises RuntimeError if __init__() was not called, the thread was
        not started, or a thread tries to join itself.
        """
        if not self.__initialized:
            raise RuntimeError("Thread.__init__() not called")
        if not self.__started:
            raise RuntimeError("cannot join thread before it is started")
        if self is currentThread():
            raise RuntimeError("cannot join current thread")

        if __debug__:
            if not self.__stopped:
                self._note("%s.join(): waiting until thread stops", self)
        self.__block.acquire()
        try:
            if timeout is None:
                while not self.__stopped:
                    self.__block.wait()
                if __debug__:
                    self._note("%s.join(): thread stopped", self)
            else:
                deadline = _time() + timeout
                while not self.__stopped:
                    delay = deadline - _time()
                    if delay <= 0:
                        if __debug__:
                            self._note("%s.join(): timed out", self)
                        break
                    self.__block.wait(delay)
                else:
                    if __debug__:
                        self._note("%s.join(): thread stopped", self)
        finally:
            self.__block.release()

    def getName(self):
        assert self.__initialized, "Thread.__init__() not called"
        return self.__name

    def setName(self, name):
        assert self.__initialized, "Thread.__init__() not called"
        self.__name = str(name)

    def isAlive(self):
        """Return true between the thread being started and its stopping."""
        assert self.__initialized, "Thread.__init__() not called"
        return self.__started and not self.__stopped

    def isDaemon(self):
        assert self.__initialized, "Thread.__init__() not called"
        return self.__daemonic

    def setDaemon(self, daemonic):
        """Set the daemon flag; only allowed before the thread is started."""
        if not self.__initialized:
            raise RuntimeError("Thread.__init__() not called")
        if self.__started:
            raise RuntimeError("cannot set daemon status of active thread")
        self.__daemonic = daemonic
|
|
|
|
# The timer class was contributed by Itamar Shtull-Trauring
|
|
|
|
def Timer(*args, **kwargs):
    """Factory returning a new timer thread (a _Timer instance)."""
    return _Timer(*args, **kwargs)


class _Timer(Thread):
    """Call a function after a specified number of seconds:

    t = Timer(30.0, f, args=[], kwargs={})
    t.start()
    t.cancel() # stop the timer's action if it's still waiting
    """

    def __init__(self, interval, function, args=None, kwargs=None):
        """Store the delay and the call to make once it has elapsed.

        *args* and *kwargs* default to a fresh empty list and dict.
        """
        Thread.__init__(self)
        self.interval = interval
        self.function = function
        # Build the containers per instance: literal [] / {} defaults would
        # be shared by every timer created without explicit arguments.
        self.args = [] if args is None else args
        self.kwargs = {} if kwargs is None else kwargs
        self.finished = Event()

    def cancel(self):
        """Stop the timer if it hasn't finished yet"""
        self.finished.set()

    def run(self):
        """Wait for the interval, then call the function unless cancelled."""
        self.finished.wait(self.interval)
        if not self.finished.isSet():
            self.function(*self.args, **self.kwargs)
        self.finished.set()
|
|
|
|
# Special thread class to represent the main thread
|
|
# This is garbage collected through an exit handler
|
|
|
|
class _MainThread(Thread):
    """Thread object standing for the thread that imported this module."""

    def __init__(self):
        Thread.__init__(self, name="MainThread")
        # This thread is already running; it never goes through start().
        self._Thread__started = True
        with _active_limbo_lock:
            _active[_get_ident()] = self

    def _set_daemon(self):
        return False

    def _exitfunc(self):
        """Mark this thread stopped, join live non-daemon threads, unregister."""
        self._Thread__stop()
        t = _pickSomeNonDaemonThread()
        if t:
            if __debug__:
                self._note("%s: waiting for other threads", self)
        # Re-query after every join(): new threads may have been started.
        while t:
            t.join()
            t = _pickSomeNonDaemonThread()
        if __debug__:
            self._note("%s: exiting", self)
        self._Thread__delete()
|
|
|
|
def _pickSomeNonDaemonThread():
    """Return a live non-daemon thread from enumerate(), or None."""
    for candidate in enumerate():
        if candidate.isDaemon() or not candidate.isAlive():
            continue
        return candidate
    return None
|
|
|
|
|
|
# Dummy thread class to represent threads not started here.
|
|
# These aren't garbage collected when they die, nor can they be waited for.
|
|
# If they invoke anything in threading.py that calls currentThread(), they
|
|
# leave an entry in the _active dict forever after.
|
|
# Their purpose is to return *something* from currentThread().
|
|
# They are marked as daemon threads so we won't wait for them
|
|
# when we exit (conform previous semantics).
|
|
|
|
class _DummyThread(Thread):
    """Placeholder Thread object for a thread not started by this module."""

    def __init__(self):
        Thread.__init__(self, name=_newname("Dummy-%d"))

        # Thread.__block consumes an OS-level locking primitive, which
        # can never be used by a _DummyThread.  Since a _DummyThread
        # instance is immortal, that's bad, so release this resource.
        del self._Thread__block

        # The thread is already running; register it under its own ident.
        self._Thread__started = True
        with _active_limbo_lock:
            _active[_get_ident()] = self

    def _set_daemon(self):
        return True

    def join(self, timeout=None):
        assert False, "cannot join a dummy thread"
|
|
|
|
|
|
# Global API functions
|
|
|
|
def currentThread():
    """Return the Thread object registered for the calling thread.

    If the caller is not in _active, a _DummyThread is created (it
    registers itself in _active) and returned instead.
    """
    ident = _get_ident()
    try:
        found = _active[ident]
    except KeyError:
        return _DummyThread()
    return found
|
|
|
|
def activeCount():
    """Return the number of Thread objects in _active plus _limbo."""
    # "with" releases the lock even if counting raises.
    with _active_limbo_lock:
        return len(_active) + len(_limbo)
|
|
|
|
def enumerate():
    """Return a list of all Thread objects in _active and _limbo."""
    # Snapshot both tables under the lock; "with" releases it even if
    # building the list raises.
    with _active_limbo_lock:
        return list(_active.values()) + list(_limbo.values())
|
|
|
|
from thread import stack_size
|
|
|
|
# Instantiate the main thread object now (its constructor registers it in
# _active) and expose its exit handler to the interpreter (Py_Main) as
# threading._shutdown.
_shutdown = _MainThread()._exitfunc
|
|
|
|
# get thread-local implementation, either from the thread
|
|
# module, or from the python fallback
|
|
|
|
try:
|
|
from thread import _local as local
|
|
except ImportError:
|
|
from _threading_local import local
|
|
|
|
|
|
# Self-test code
|
|
|
|
def _test():
    """Self-test: NP producers push NI items each through a bounded queue
    to a single consumer, which prints them."""

    class BoundedQueue(_Verbose):
        """FIFO of at most *limit* items guarded by one monitor lock."""

        def __init__(self, limit):
            _Verbose.__init__(self)
            self.mon = RLock()
            self.rc = Condition(self.mon)   # signalled when an item arrives
            self.wc = Condition(self.mon)   # signalled when a slot frees up
            self.limit = limit
            self.queue = deque()

        def put(self, item):
            # "with" releases the monitor even if wait() is interrupted.
            with self.mon:
                while len(self.queue) >= self.limit:
                    self._note("put(%s): queue full", item)
                    self.wc.wait()
                self.queue.append(item)
                self._note("put(%s): appended, length now %d",
                           item, len(self.queue))
                self.rc.notify()

        def get(self):
            with self.mon:
                while not self.queue:
                    self._note("get(): queue empty")
                    self.rc.wait()
                item = self.queue.popleft()
                self._note("get(): got %s, %d left", item, len(self.queue))
                self.wc.notify()
            return item

    class ProducerThread(Thread):

        def __init__(self, queue, quota):
            Thread.__init__(self, name="Producer")
            self.queue = queue
            self.quota = quota

        def run(self):
            from random import random
            counter = 0
            while counter < self.quota:
                counter = counter + 1
                self.queue.put("%s.%d" % (self.getName(), counter))
                _sleep(random() * 0.00001)

    class ConsumerThread(Thread):

        def __init__(self, queue, count):
            Thread.__init__(self, name="Consumer")
            self.queue = queue
            self.count = count

        def run(self):
            while self.count > 0:
                item = self.queue.get()
                print(item)
                self.count = self.count - 1

    NP = 3      # number of producers
    QL = 4      # queue length limit
    NI = 5      # items per producer

    Q = BoundedQueue(QL)
    P = []
    for i in range(NP):
        t = ProducerThread(Q, NI)
        t.setName("Producer-%d" % (i+1))
        P.append(t)
    C = ConsumerThread(Q, NI*NP)
    for t in P:
        t.start()
        _sleep(0.000001)
    C.start()
    for t in P:
        t.join()
    C.join()


if __name__ == '__main__':
    _test()
|