mirror of
https://github.com/python/cpython.git
synced 2025-08-11 04:19:06 +00:00

Python 2.6 one, since the intention is to keep an unified 2.x/3.x codebase. The Python code is automatically translated using "2to3". Please, do not update this code in Python 3.0 by hand. Update the 2.6 one and then do "2to3".
142 lines
5.3 KiB
Python
142 lines
5.3 KiB
Python
import unittest
|
|
import os
|
|
|
|
from .test_all import db, test_support, get_new_environment_path, get_new_database_path
|
|
|
|
|
|
class DBSequenceTest(unittest.TestCase):
|
|
import sys
|
|
if sys.version_info[:3] < (2, 4, 0):
|
|
def assertTrue(self, expr, msg=None):
|
|
self.failUnless(expr,msg=msg)
|
|
|
|
def setUp(self):
|
|
self.int_32_max = 0x100000000
|
|
self.homeDir = get_new_environment_path()
|
|
self.filename = "test"
|
|
|
|
self.dbenv = db.DBEnv()
|
|
self.dbenv.open(self.homeDir, db.DB_CREATE | db.DB_INIT_MPOOL, 0o666)
|
|
self.d = db.DB(self.dbenv)
|
|
self.d.open(self.filename, db.DB_BTREE, db.DB_CREATE, 0o666)
|
|
|
|
def tearDown(self):
|
|
if hasattr(self, 'seq'):
|
|
self.seq.close()
|
|
del self.seq
|
|
if hasattr(self, 'd'):
|
|
self.d.close()
|
|
del self.d
|
|
if hasattr(self, 'dbenv'):
|
|
self.dbenv.close()
|
|
del self.dbenv
|
|
|
|
test_support.rmtree(self.homeDir)
|
|
|
|
def test_get(self):
|
|
self.seq = db.DBSequence(self.d, flags=0)
|
|
start_value = 10 * self.int_32_max
|
|
self.assertEqual(0xA00000000, start_value)
|
|
self.assertEquals(None, self.seq.init_value(start_value))
|
|
self.assertEquals(None, self.seq.open(key='id', txn=None, flags=db.DB_CREATE))
|
|
self.assertEquals(start_value, self.seq.get(5))
|
|
self.assertEquals(start_value + 5, self.seq.get())
|
|
|
|
def test_remove(self):
|
|
self.seq = db.DBSequence(self.d, flags=0)
|
|
self.assertEquals(None, self.seq.open(key='foo', txn=None, flags=db.DB_CREATE))
|
|
self.assertEquals(None, self.seq.remove(txn=None, flags=0))
|
|
del self.seq
|
|
|
|
def test_get_key(self):
|
|
self.seq = db.DBSequence(self.d, flags=0)
|
|
key = 'foo'
|
|
self.assertEquals(None, self.seq.open(key=key, txn=None, flags=db.DB_CREATE))
|
|
self.assertEquals(key, self.seq.get_key())
|
|
|
|
def test_get_dbp(self):
|
|
self.seq = db.DBSequence(self.d, flags=0)
|
|
self.assertEquals(None, self.seq.open(key='foo', txn=None, flags=db.DB_CREATE))
|
|
self.assertEquals(self.d, self.seq.get_dbp())
|
|
|
|
def test_cachesize(self):
|
|
self.seq = db.DBSequence(self.d, flags=0)
|
|
cashe_size = 10
|
|
self.assertEquals(None, self.seq.set_cachesize(cashe_size))
|
|
self.assertEquals(None, self.seq.open(key='foo', txn=None, flags=db.DB_CREATE))
|
|
self.assertEquals(cashe_size, self.seq.get_cachesize())
|
|
|
|
def test_flags(self):
|
|
self.seq = db.DBSequence(self.d, flags=0)
|
|
flag = db.DB_SEQ_WRAP;
|
|
self.assertEquals(None, self.seq.set_flags(flag))
|
|
self.assertEquals(None, self.seq.open(key='foo', txn=None, flags=db.DB_CREATE))
|
|
self.assertEquals(flag, self.seq.get_flags() & flag)
|
|
|
|
def test_range(self):
|
|
self.seq = db.DBSequence(self.d, flags=0)
|
|
seq_range = (10 * self.int_32_max, 11 * self.int_32_max - 1)
|
|
self.assertEquals(None, self.seq.set_range(seq_range))
|
|
self.seq.init_value(seq_range[0])
|
|
self.assertEquals(None, self.seq.open(key='foo', txn=None, flags=db.DB_CREATE))
|
|
self.assertEquals(seq_range, self.seq.get_range())
|
|
|
|
def test_stat(self):
|
|
self.seq = db.DBSequence(self.d, flags=0)
|
|
self.assertEquals(None, self.seq.open(key='foo', txn=None, flags=db.DB_CREATE))
|
|
stat = self.seq.stat()
|
|
for param in ('nowait', 'min', 'max', 'value', 'current',
|
|
'flags', 'cache_size', 'last_value', 'wait'):
|
|
self.assertTrue(param in stat, "parameter %s isn't in stat info" % param)
|
|
|
|
if db.version() >= (4,7) :
|
|
# This code checks a crash solved in Berkeley DB 4.7
|
|
def test_stat_crash(self) :
|
|
d=db.DB()
|
|
d.open(None,dbtype=db.DB_HASH,flags=db.DB_CREATE) # In RAM
|
|
seq = db.DBSequence(d, flags=0)
|
|
|
|
self.assertRaises(db.DBNotFoundError, seq.open,
|
|
key='id', txn=None, flags=0)
|
|
|
|
self.assertRaises(db.DBInvalidArgError, seq.stat)
|
|
|
|
d.close()
|
|
|
|
def test_64bits(self) :
|
|
# We don't use both extremes because they are problematic
|
|
value_plus=(1<<63)-2
|
|
self.assertEquals(9223372036854775806,value_plus)
|
|
value_minus=(-1<<63)+1 # Two complement
|
|
self.assertEquals(-9223372036854775807,value_minus)
|
|
self.seq = db.DBSequence(self.d, flags=0)
|
|
self.assertEquals(None, self.seq.init_value(value_plus-1))
|
|
self.assertEquals(None, self.seq.open(key='id', txn=None,
|
|
flags=db.DB_CREATE))
|
|
self.assertEquals(value_plus-1, self.seq.get(1))
|
|
self.assertEquals(value_plus, self.seq.get(1))
|
|
|
|
self.seq.remove(txn=None, flags=0)
|
|
|
|
self.seq = db.DBSequence(self.d, flags=0)
|
|
self.assertEquals(None, self.seq.init_value(value_minus))
|
|
self.assertEquals(None, self.seq.open(key='id', txn=None,
|
|
flags=db.DB_CREATE))
|
|
self.assertEquals(value_minus, self.seq.get(1))
|
|
self.assertEquals(value_minus+1, self.seq.get(1))
|
|
|
|
def test_multiple_close(self):
|
|
self.seq = db.DBSequence(self.d)
|
|
self.seq.close() # You can close a Sequence multiple times
|
|
self.seq.close()
|
|
self.seq.close()
|
|
|
|
def test_suite():
|
|
suite = unittest.TestSuite()
|
|
if db.version() >= (4,3):
|
|
suite.addTest(unittest.makeSuite(DBSequenceTest))
|
|
return suite
|
|
|
|
|
|
if __name__ == '__main__':
|
|
unittest.main(defaultTest='test_suite')
|