mirror of
https://github.com/python/cpython.git
synced 2025-09-12 03:37:09 +00:00

svn+ssh://pythondev@svn.python.org/python/trunk ........ r58822 | brett.cannon | 2007-11-02 23:47:02 -0700 (Fri, 02 Nov 2007) | 2 lines Add a missing quotation mark. ........ r58840 | skip.montanaro | 2007-11-04 07:56:52 -0800 (Sun, 04 Nov 2007) | 2 lines Note change to get_dialect semantics in 2.5. Will backport to 2.5. ........ r58844 | georg.brandl | 2007-11-04 09:43:49 -0800 (Sun, 04 Nov 2007) | 2 lines Fix syntax for versionchanged markup. ........ r58850 | gregory.p.smith | 2007-11-04 18:32:26 -0800 (Sun, 04 Nov 2007) | 9 lines Fixes bug 477182 on pybsddb.sf.net. DB objects now load the flags and pay attention to them when opening an existing database. This means that d[] behaves properly even on databases previously created with DB_DUP or DB_DUPSORT flags to allow duplicate keys. http://sourceforge.net/tracker/index.php?func=detail&aid=477182&group_id=13900&atid=113900 Do not backport, this bugfix could be considered an API change. ........ r58851 | gregory.p.smith | 2007-11-04 18:56:31 -0800 (Sun, 04 Nov 2007) | 3 lines Add the bsddb.db.DBEnv.lock_id_free method. Improve test_lock's tempdir creation and cleanup. ........ r58852 | gregory.p.smith | 2007-11-05 01:06:28 -0800 (Mon, 05 Nov 2007) | 3 lines * db->get_types is only available in BerkeleyDB >= 4.2 * get compiling with older versions of python again for a stand alone release. ........ r58853 | gregory.p.smith | 2007-11-05 01:07:40 -0800 (Mon, 05 Nov 2007) | 2 lines * db->get_flags is only available in BerkeleyDB >= 4.2 ........ r58854 | mark.summerfield | 2007-11-05 01:22:48 -0800 (Mon, 05 Nov 2007) | 3 lines Added cross-references between the various archive file formats. ........ r58857 | mark.summerfield | 2007-11-05 06:38:50 -0800 (Mon, 05 Nov 2007) | 5 lines Clarified the fact that you can have comments for individual archive members even though comments to the archive itself aren't currently supported. ........
141 lines
4.1 KiB
Python
141 lines
4.1 KiB
Python
"""
|
|
TestCases for testing the locking sub-system.
|
|
"""
|
|
|
|
import os
|
|
from pprint import pprint
|
|
import shutil
|
|
import sys
|
|
import tempfile
|
|
import time
|
|
|
|
try:
|
|
from threading import Thread, currentThread
|
|
have_threads = 1
|
|
except ImportError:
|
|
have_threads = 0
|
|
|
|
|
|
import unittest
|
|
from bsddb.test.test_all import verbose
|
|
|
|
try:
|
|
# For Pythons w/distutils pybsddb
|
|
from bsddb3 import db
|
|
except ImportError:
|
|
# For Python 2.3
|
|
from bsddb import db
|
|
|
|
|
|
#----------------------------------------------------------------------
|
|
|
|
class LockingTestCase(unittest.TestCase):
|
|
|
|
def setUp(self):
|
|
self.homeDir = tempfile.mkdtemp('.test_lock')
|
|
self.env = db.DBEnv()
|
|
self.env.open(self.homeDir, db.DB_THREAD | db.DB_INIT_MPOOL |
|
|
db.DB_INIT_LOCK | db.DB_CREATE)
|
|
|
|
|
|
def tearDown(self):
|
|
self.env.close()
|
|
shutil.rmtree(self.homeDir)
|
|
|
|
|
|
def test01_simple(self):
|
|
if verbose:
|
|
print('\n', '-=' * 30)
|
|
print("Running %s.test01_simple..." % self.__class__.__name__)
|
|
|
|
anID = self.env.lock_id()
|
|
if verbose:
|
|
print("locker ID: %s" % anID)
|
|
lock = self.env.lock_get(anID, b"some locked thing", db.DB_LOCK_WRITE)
|
|
if verbose:
|
|
print("Aquired lock: %s" % lock)
|
|
time.sleep(1)
|
|
self.env.lock_put(lock)
|
|
if verbose:
|
|
print("Released lock: %s" % lock)
|
|
if db.version() >= (4,0):
|
|
self.env.lock_id_free(anID)
|
|
|
|
|
|
def test02_threaded(self):
|
|
if verbose:
|
|
print('\n', '-=' * 30)
|
|
print("Running %s.test02_threaded..." % self.__class__.__name__)
|
|
|
|
threads = []
|
|
threads.append(Thread(target = self.theThread,
|
|
args=(5, db.DB_LOCK_WRITE)))
|
|
threads.append(Thread(target = self.theThread,
|
|
args=(1, db.DB_LOCK_READ)))
|
|
threads.append(Thread(target = self.theThread,
|
|
args=(1, db.DB_LOCK_READ)))
|
|
threads.append(Thread(target = self.theThread,
|
|
args=(1, db.DB_LOCK_WRITE)))
|
|
threads.append(Thread(target = self.theThread,
|
|
args=(1, db.DB_LOCK_READ)))
|
|
threads.append(Thread(target = self.theThread,
|
|
args=(1, db.DB_LOCK_READ)))
|
|
threads.append(Thread(target = self.theThread,
|
|
args=(1, db.DB_LOCK_WRITE)))
|
|
threads.append(Thread(target = self.theThread,
|
|
args=(1, db.DB_LOCK_WRITE)))
|
|
threads.append(Thread(target = self.theThread,
|
|
args=(1, db.DB_LOCK_WRITE)))
|
|
|
|
for t in threads:
|
|
t.start()
|
|
for t in threads:
|
|
t.join()
|
|
|
|
def test03_set_timeout(self):
|
|
# test that the set_timeout call works
|
|
if hasattr(self.env, 'set_timeout'):
|
|
self.env.set_timeout(0, db.DB_SET_LOCK_TIMEOUT)
|
|
self.env.set_timeout(0, db.DB_SET_TXN_TIMEOUT)
|
|
self.env.set_timeout(123456, db.DB_SET_LOCK_TIMEOUT)
|
|
self.env.set_timeout(7890123, db.DB_SET_TXN_TIMEOUT)
|
|
|
|
def theThread(self, sleepTime, lockType):
|
|
name = currentThread().getName()
|
|
if lockType == db.DB_LOCK_WRITE:
|
|
lt = "write"
|
|
else:
|
|
lt = "read"
|
|
|
|
anID = self.env.lock_id()
|
|
if verbose:
|
|
print("%s: locker ID: %s" % (name, anID))
|
|
|
|
lock = self.env.lock_get(anID, b"some locked thing", lockType)
|
|
if verbose:
|
|
print("%s: Aquired %s lock: %s" % (name, lt, lock))
|
|
|
|
time.sleep(sleepTime)
|
|
|
|
self.env.lock_put(lock)
|
|
if verbose:
|
|
print("%s: Released %s lock: %s" % (name, lt, lock))
|
|
if db.version() >= (4,0):
|
|
self.env.lock_id_free(anID)
|
|
|
|
|
|
#----------------------------------------------------------------------
|
|
|
|
def test_suite():
|
|
suite = unittest.TestSuite()
|
|
|
|
if have_threads:
|
|
suite.addTest(unittest.makeSuite(LockingTestCase))
|
|
else:
|
|
suite.addTest(unittest.makeSuite(LockingTestCase, 'test01'))
|
|
|
|
return suite
|
|
|
|
|
|
if __name__ == '__main__':
|
|
unittest.main(defaultTest='test_suite')
|