mirror of
https://github.com/python/cpython.git
synced 2025-09-09 18:32:22 +00:00

features in BerkeleyDB not exposed. notably: the DB_MPOOLFILE interface has not yet been wrapped in an object. Adds support for building and installing bsddb3 in python2.3 that has an older version of this module installed as bsddb without conflicts. The pybsddb.sf.net build/packaged version of the module uses a dynamicly loadable module called _pybsddb rather than _bsddb.
144 lines
4.2 KiB
Python
144 lines
4.2 KiB
Python
"""
|
|
TestCases for testing the locking sub-system.
|
|
"""
|
|
|
|
import sys, os, string
|
|
import tempfile
|
|
import time
|
|
from pprint import pprint
|
|
from whrandom import random
|
|
|
|
try:
|
|
from threading import Thread, currentThread
|
|
have_threads = 1
|
|
except ImportError:
|
|
have_threads = 0
|
|
|
|
|
|
import unittest
|
|
from test_all import verbose
|
|
|
|
try:
|
|
# For Pythons w/distutils pybsddb
|
|
from bsddb3 import db
|
|
except ImportError:
|
|
# For Python 2.3
|
|
from bsddb import db
|
|
|
|
|
|
#----------------------------------------------------------------------
|
|
|
|
class LockingTestCase(unittest.TestCase):
|
|
|
|
def setUp(self):
|
|
homeDir = os.path.join(os.path.dirname(sys.argv[0]), 'db_home')
|
|
self.homeDir = homeDir
|
|
try: os.mkdir(homeDir)
|
|
except os.error: pass
|
|
self.env = db.DBEnv()
|
|
self.env.open(homeDir, db.DB_THREAD | db.DB_INIT_MPOOL |
|
|
db.DB_INIT_LOCK | db.DB_CREATE)
|
|
|
|
|
|
def tearDown(self):
|
|
self.env.close()
|
|
import glob
|
|
files = glob.glob(os.path.join(self.homeDir, '*'))
|
|
for file in files:
|
|
os.remove(file)
|
|
|
|
|
|
def test01_simple(self):
|
|
if verbose:
|
|
print '\n', '-=' * 30
|
|
print "Running %s.test01_simple..." % self.__class__.__name__
|
|
|
|
anID = self.env.lock_id()
|
|
if verbose:
|
|
print "locker ID: %s" % anID
|
|
lock = self.env.lock_get(anID, "some locked thing", db.DB_LOCK_WRITE)
|
|
if verbose:
|
|
print "Aquired lock: %s" % lock
|
|
time.sleep(1)
|
|
self.env.lock_put(lock)
|
|
if verbose:
|
|
print "Released lock: %s" % lock
|
|
|
|
|
|
|
|
|
|
def test02_threaded(self):
|
|
if verbose:
|
|
print '\n', '-=' * 30
|
|
print "Running %s.test02_threaded..." % self.__class__.__name__
|
|
|
|
threads = []
|
|
threads.append(Thread(target = self.theThread,
|
|
args=(5, db.DB_LOCK_WRITE)))
|
|
threads.append(Thread(target = self.theThread,
|
|
args=(1, db.DB_LOCK_READ)))
|
|
threads.append(Thread(target = self.theThread,
|
|
args=(1, db.DB_LOCK_READ)))
|
|
threads.append(Thread(target = self.theThread,
|
|
args=(1, db.DB_LOCK_WRITE)))
|
|
threads.append(Thread(target = self.theThread,
|
|
args=(1, db.DB_LOCK_READ)))
|
|
threads.append(Thread(target = self.theThread,
|
|
args=(1, db.DB_LOCK_READ)))
|
|
threads.append(Thread(target = self.theThread,
|
|
args=(1, db.DB_LOCK_WRITE)))
|
|
threads.append(Thread(target = self.theThread,
|
|
args=(1, db.DB_LOCK_WRITE)))
|
|
threads.append(Thread(target = self.theThread,
|
|
args=(1, db.DB_LOCK_WRITE)))
|
|
|
|
for t in threads:
|
|
t.start()
|
|
for t in threads:
|
|
t.join()
|
|
|
|
def test03_set_timeout(self):
|
|
# test that the set_timeout call works
|
|
if hasattr(self.env, 'set_timeout'):
|
|
self.env.set_timeout(0, db.DB_SET_LOCK_TIMEOUT)
|
|
self.env.set_timeout(0, db.DB_SET_TXN_TIMEOUT)
|
|
self.env.set_timeout(123456, db.DB_SET_LOCK_TIMEOUT)
|
|
self.env.set_timeout(7890123, db.DB_SET_TXN_TIMEOUT)
|
|
|
|
def theThread(self, sleepTime, lockType):
|
|
name = currentThread().getName()
|
|
if lockType == db.DB_LOCK_WRITE:
|
|
lt = "write"
|
|
else:
|
|
lt = "read"
|
|
|
|
anID = self.env.lock_id()
|
|
if verbose:
|
|
print "%s: locker ID: %s" % (name, anID)
|
|
|
|
lock = self.env.lock_get(anID, "some locked thing", lockType)
|
|
if verbose:
|
|
print "%s: Aquired %s lock: %s" % (name, lt, lock)
|
|
|
|
time.sleep(sleepTime)
|
|
|
|
self.env.lock_put(lock)
|
|
if verbose:
|
|
print "%s: Released %s lock: %s" % (name, lt, lock)
|
|
|
|
|
|
#----------------------------------------------------------------------
|
|
|
|
def test_suite():
|
|
suite = unittest.TestSuite()
|
|
|
|
if have_threads:
|
|
suite.addTest(unittest.makeSuite(LockingTestCase))
|
|
else:
|
|
suite.addTest(unittest.makeSuite(LockingTestCase, 'test01'))
|
|
|
|
return suite
|
|
|
|
|
|
if __name__ == '__main__':
|
|
unittest.main(defaultTest='test_suite')
|