mirror of
				https://github.com/python/cpython.git
				synced 2025-11-04 11:49:12 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			954 lines
		
	
	
	
		
			28 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			954 lines
		
	
	
	
		
			28 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
"""
 | 
						|
Basic TestCases for BTree and hash DBs, with and without a DBEnv, with
 | 
						|
various DB flags, etc.
 | 
						|
"""
 | 
						|
 | 
						|
import os
 | 
						|
import sys
 | 
						|
import errno
 | 
						|
import shutil
 | 
						|
import string
 | 
						|
import tempfile
 | 
						|
from pprint import pprint
 | 
						|
import unittest
 | 
						|
import time
 | 
						|
 | 
						|
try:
 | 
						|
    # For Pythons w/distutils pybsddb
 | 
						|
    from bsddb3 import db
 | 
						|
except ImportError:
 | 
						|
    # For Python 2.3
 | 
						|
    from bsddb import db
 | 
						|
 | 
						|
from test_all import verbose
 | 
						|
 | 
						|
DASH = '-'
 | 
						|
 | 
						|
 | 
						|
#----------------------------------------------------------------------
 | 
						|
 | 
						|
class VersionTestCase(unittest.TestCase):
 | 
						|
    def test00_version(self):
 | 
						|
        info = db.version()
 | 
						|
        if verbose:
 | 
						|
            print '\n', '-=' * 20
 | 
						|
            print 'bsddb.db.version(): %s' % (info, )
 | 
						|
            print db.DB_VERSION_STRING
 | 
						|
            print '-=' * 20
 | 
						|
        assert info == (db.DB_VERSION_MAJOR, db.DB_VERSION_MINOR,
 | 
						|
                        db.DB_VERSION_PATCH)
 | 
						|
 | 
						|
#----------------------------------------------------------------------
 | 
						|
 | 
						|
class BasicTestCase(unittest.TestCase):
 | 
						|
    dbtype       = db.DB_UNKNOWN  # must be set in derived class
 | 
						|
    dbopenflags  = 0
 | 
						|
    dbsetflags   = 0
 | 
						|
    dbmode       = 0660
 | 
						|
    dbname       = None
 | 
						|
    useEnv       = 0
 | 
						|
    envflags     = 0
 | 
						|
    envsetflags  = 0
 | 
						|
 | 
						|
    _numKeys      = 1002    # PRIVATE.  NOTE: must be an even value
 | 
						|
 | 
						|
    def setUp(self):
 | 
						|
        if self.useEnv:
 | 
						|
            homeDir = os.path.join(os.path.dirname(sys.argv[0]), 'db_home')
 | 
						|
            self.homeDir = homeDir
 | 
						|
            try:
 | 
						|
                shutil.rmtree(homeDir)
 | 
						|
            except OSError, e:
 | 
						|
                # unix returns ENOENT, windows returns ESRCH
 | 
						|
                if e.errno not in (errno.ENOENT, errno.ESRCH): raise
 | 
						|
            os.mkdir(homeDir)
 | 
						|
            try:
 | 
						|
                self.env = db.DBEnv()
 | 
						|
                self.env.set_lg_max(1024*1024)
 | 
						|
                self.env.set_tx_max(30)
 | 
						|
                self.env.set_tx_timestamp(int(time.time()))
 | 
						|
                self.env.set_flags(self.envsetflags, 1)
 | 
						|
                self.env.open(homeDir, self.envflags | db.DB_CREATE)
 | 
						|
                tempfile.tempdir = homeDir
 | 
						|
                self.filename = os.path.split(tempfile.mktemp())[1]
 | 
						|
                tempfile.tempdir = None
 | 
						|
            # Yes, a bare except is intended, since we're re-raising the exc.
 | 
						|
            except:
 | 
						|
                shutil.rmtree(homeDir)
 | 
						|
                raise
 | 
						|
        else:
 | 
						|
            self.env = None
 | 
						|
            self.filename = tempfile.mktemp()
 | 
						|
 | 
						|
        # create and open the DB
 | 
						|
        self.d = db.DB(self.env)
 | 
						|
        self.d.set_flags(self.dbsetflags)
 | 
						|
        if self.dbname:
 | 
						|
            self.d.open(self.filename, self.dbname, self.dbtype,
 | 
						|
                        self.dbopenflags|db.DB_CREATE, self.dbmode)
 | 
						|
        else:
 | 
						|
            self.d.open(self.filename,   # try out keyword args
 | 
						|
                        mode = self.dbmode,
 | 
						|
                        dbtype = self.dbtype,
 | 
						|
                        flags = self.dbopenflags|db.DB_CREATE)
 | 
						|
 | 
						|
        self.populateDB()
 | 
						|
 | 
						|
 | 
						|
    def tearDown(self):
 | 
						|
        self.d.close()
 | 
						|
        if self.env is not None:
 | 
						|
            self.env.close()
 | 
						|
            shutil.rmtree(self.homeDir)
 | 
						|
            ## Make a new DBEnv to remove the env files from the home dir.
 | 
						|
            ## (It can't be done while the env is open, nor after it has been
 | 
						|
            ## closed, so we make a new one to do it.)
 | 
						|
            #e = db.DBEnv()
 | 
						|
            #e.remove(self.homeDir)
 | 
						|
            #os.remove(os.path.join(self.homeDir, self.filename))
 | 
						|
        else:
 | 
						|
            os.remove(self.filename)
 | 
						|
 | 
						|
 | 
						|
 | 
						|
    def populateDB(self, _txn=None):
 | 
						|
        d = self.d
 | 
						|
 | 
						|
        for x in range(self._numKeys/2):
 | 
						|
            key = '%04d' % (self._numKeys - x)  # insert keys in reverse order
 | 
						|
            data = self.makeData(key)
 | 
						|
            d.put(key, data, _txn)
 | 
						|
 | 
						|
        d.put('empty value', '', _txn)
 | 
						|
 | 
						|
        for x in range(self._numKeys/2-1):
 | 
						|
            key = '%04d' % x  # and now some in forward order
 | 
						|
            data = self.makeData(key)
 | 
						|
            d.put(key, data, _txn)
 | 
						|
 | 
						|
        if _txn:
 | 
						|
            _txn.commit()
 | 
						|
 | 
						|
        num = len(d)
 | 
						|
        if verbose:
 | 
						|
            print "created %d records" % num
 | 
						|
 | 
						|
 | 
						|
    def makeData(self, key):
 | 
						|
        return DASH.join([key] * 5)
 | 
						|
 | 
						|
 | 
						|
 | 
						|
    #----------------------------------------
 | 
						|
 | 
						|
    def test01_GetsAndPuts(self):
 | 
						|
        d = self.d
 | 
						|
 | 
						|
        if verbose:
 | 
						|
            print '\n', '-=' * 30
 | 
						|
            print "Running %s.test01_GetsAndPuts..." % self.__class__.__name__
 | 
						|
 | 
						|
        for key in ['0001', '0100', '0400', '0700', '0999']:
 | 
						|
            data = d.get(key)
 | 
						|
            if verbose:
 | 
						|
                print data
 | 
						|
 | 
						|
        assert d.get('0321') == '0321-0321-0321-0321-0321'
 | 
						|
 | 
						|
        # By default non-existant keys return None...
 | 
						|
        assert d.get('abcd') == None
 | 
						|
 | 
						|
        # ...but they raise exceptions in other situations.  Call
 | 
						|
        # set_get_returns_none() to change it.
 | 
						|
        try:
 | 
						|
            d.delete('abcd')
 | 
						|
        except db.DBNotFoundError, val:
 | 
						|
            assert val[0] == db.DB_NOTFOUND
 | 
						|
            if verbose: print val
 | 
						|
        else:
 | 
						|
            self.fail("expected exception")
 | 
						|
 | 
						|
 | 
						|
        d.put('abcd', 'a new record')
 | 
						|
        assert d.get('abcd') == 'a new record'
 | 
						|
 | 
						|
        d.put('abcd', 'same key')
 | 
						|
        if self.dbsetflags & db.DB_DUP:
 | 
						|
            assert d.get('abcd') == 'a new record'
 | 
						|
        else:
 | 
						|
            assert d.get('abcd') == 'same key'
 | 
						|
 | 
						|
 | 
						|
        try:
 | 
						|
            d.put('abcd', 'this should fail', flags=db.DB_NOOVERWRITE)
 | 
						|
        except db.DBKeyExistError, val:
 | 
						|
            assert val[0] == db.DB_KEYEXIST
 | 
						|
            if verbose: print val
 | 
						|
        else:
 | 
						|
            self.fail("expected exception")
 | 
						|
 | 
						|
        if self.dbsetflags & db.DB_DUP:
 | 
						|
            assert d.get('abcd') == 'a new record'
 | 
						|
        else:
 | 
						|
            assert d.get('abcd') == 'same key'
 | 
						|
 | 
						|
 | 
						|
        d.sync()
 | 
						|
        d.close()
 | 
						|
        del d
 | 
						|
 | 
						|
        self.d = db.DB(self.env)
 | 
						|
        if self.dbname:
 | 
						|
            self.d.open(self.filename, self.dbname)
 | 
						|
        else:
 | 
						|
            self.d.open(self.filename)
 | 
						|
        d = self.d
 | 
						|
 | 
						|
        assert d.get('0321') == '0321-0321-0321-0321-0321'
 | 
						|
        if self.dbsetflags & db.DB_DUP:
 | 
						|
            assert d.get('abcd') == 'a new record'
 | 
						|
        else:
 | 
						|
            assert d.get('abcd') == 'same key'
 | 
						|
 | 
						|
        rec = d.get_both('0555', '0555-0555-0555-0555-0555')
 | 
						|
        if verbose:
 | 
						|
            print rec
 | 
						|
 | 
						|
        assert d.get_both('0555', 'bad data') == None
 | 
						|
 | 
						|
        # test default value
 | 
						|
        data = d.get('bad key', 'bad data')
 | 
						|
        assert data == 'bad data'
 | 
						|
 | 
						|
        # any object can pass through
 | 
						|
        data = d.get('bad key', self)
 | 
						|
        assert data == self
 | 
						|
 | 
						|
        s = d.stat()
 | 
						|
        assert type(s) == type({})
 | 
						|
        if verbose:
 | 
						|
            print 'd.stat() returned this dictionary:'
 | 
						|
            pprint(s)
 | 
						|
 | 
						|
 | 
						|
    #----------------------------------------
 | 
						|
 | 
						|
    def test02_DictionaryMethods(self):
 | 
						|
        d = self.d
 | 
						|
 | 
						|
        if verbose:
 | 
						|
            print '\n', '-=' * 30
 | 
						|
            print "Running %s.test02_DictionaryMethods..." % \
 | 
						|
                  self.__class__.__name__
 | 
						|
 | 
						|
        for key in ['0002', '0101', '0401', '0701', '0998']:
 | 
						|
            data = d[key]
 | 
						|
            assert data == self.makeData(key)
 | 
						|
            if verbose:
 | 
						|
                print data
 | 
						|
 | 
						|
        assert len(d) == self._numKeys
 | 
						|
        keys = d.keys()
 | 
						|
        assert len(keys) == self._numKeys
 | 
						|
        assert type(keys) == type([])
 | 
						|
 | 
						|
        d['new record'] = 'a new record'
 | 
						|
        assert len(d) == self._numKeys+1
 | 
						|
        keys = d.keys()
 | 
						|
        assert len(keys) == self._numKeys+1
 | 
						|
 | 
						|
        d['new record'] = 'a replacement record'
 | 
						|
        assert len(d) == self._numKeys+1
 | 
						|
        keys = d.keys()
 | 
						|
        assert len(keys) == self._numKeys+1
 | 
						|
 | 
						|
        if verbose:
 | 
						|
            print "the first 10 keys are:"
 | 
						|
            pprint(keys[:10])
 | 
						|
 | 
						|
        assert d['new record'] == 'a replacement record'
 | 
						|
 | 
						|
        assert d.has_key('0001') == 1
 | 
						|
        assert d.has_key('spam') == 0
 | 
						|
 | 
						|
        items = d.items()
 | 
						|
        assert len(items) == self._numKeys+1
 | 
						|
        assert type(items) == type([])
 | 
						|
        assert type(items[0]) == type(())
 | 
						|
        assert len(items[0]) == 2
 | 
						|
 | 
						|
        if verbose:
 | 
						|
            print "the first 10 items are:"
 | 
						|
            pprint(items[:10])
 | 
						|
 | 
						|
        values = d.values()
 | 
						|
        assert len(values) == self._numKeys+1
 | 
						|
        assert type(values) == type([])
 | 
						|
 | 
						|
        if verbose:
 | 
						|
            print "the first 10 values are:"
 | 
						|
            pprint(values[:10])
 | 
						|
 | 
						|
 | 
						|
 | 
						|
    #----------------------------------------
 | 
						|
 | 
						|
    def test03_SimpleCursorStuff(self, get_raises_error=0, set_raises_error=0):
 | 
						|
        if verbose:
 | 
						|
            print '\n', '-=' * 30
 | 
						|
            print "Running %s.test03_SimpleCursorStuff (get_error %s, set_error %s)..." % \
 | 
						|
                  (self.__class__.__name__, get_raises_error, set_raises_error)
 | 
						|
 | 
						|
        if self.env and self.dbopenflags & db.DB_AUTO_COMMIT:
 | 
						|
            txn = self.env.txn_begin()
 | 
						|
        else:
 | 
						|
            txn = None
 | 
						|
        c = self.d.cursor(txn=txn)
 | 
						|
 | 
						|
        rec = c.first()
 | 
						|
        count = 0
 | 
						|
        while rec is not None:
 | 
						|
            count = count + 1
 | 
						|
            if verbose and count % 100 == 0:
 | 
						|
                print rec
 | 
						|
            try:
 | 
						|
                rec = c.next()
 | 
						|
            except db.DBNotFoundError, val:
 | 
						|
                if get_raises_error:
 | 
						|
                    assert val[0] == db.DB_NOTFOUND
 | 
						|
                    if verbose: print val
 | 
						|
                    rec = None
 | 
						|
                else:
 | 
						|
                    self.fail("unexpected DBNotFoundError")
 | 
						|
            assert c.get_current_size() == len(c.current()[1]), "%s != len(%r)" % (c.get_current_size(), c.current()[1])
 | 
						|
 | 
						|
        assert count == self._numKeys
 | 
						|
 | 
						|
 | 
						|
        rec = c.last()
 | 
						|
        count = 0
 | 
						|
        while rec is not None:
 | 
						|
            count = count + 1
 | 
						|
            if verbose and count % 100 == 0:
 | 
						|
                print rec
 | 
						|
            try:
 | 
						|
                rec = c.prev()
 | 
						|
            except db.DBNotFoundError, val:
 | 
						|
                if get_raises_error:
 | 
						|
                    assert val[0] == db.DB_NOTFOUND
 | 
						|
                    if verbose: print val
 | 
						|
                    rec = None
 | 
						|
                else:
 | 
						|
                    self.fail("unexpected DBNotFoundError")
 | 
						|
 | 
						|
        assert count == self._numKeys
 | 
						|
 | 
						|
        rec = c.set('0505')
 | 
						|
        rec2 = c.current()
 | 
						|
        assert rec == rec2
 | 
						|
        assert rec[0] == '0505'
 | 
						|
        assert rec[1] == self.makeData('0505')
 | 
						|
        assert c.get_current_size() == len(rec[1])
 | 
						|
 | 
						|
        # make sure we get empty values properly
 | 
						|
        rec = c.set('empty value')
 | 
						|
        assert rec[1] == ''
 | 
						|
        assert c.get_current_size() == 0
 | 
						|
 | 
						|
        try:
 | 
						|
            n = c.set('bad key')
 | 
						|
        except db.DBNotFoundError, val:
 | 
						|
            assert val[0] == db.DB_NOTFOUND
 | 
						|
            if verbose: print val
 | 
						|
        else:
 | 
						|
            if set_raises_error:
 | 
						|
                self.fail("expected exception")
 | 
						|
            if n != None:
 | 
						|
                self.fail("expected None: %r" % (n,))
 | 
						|
 | 
						|
        rec = c.get_both('0404', self.makeData('0404'))
 | 
						|
        assert rec == ('0404', self.makeData('0404'))
 | 
						|
 | 
						|
        try:
 | 
						|
            n = c.get_both('0404', 'bad data')
 | 
						|
        except db.DBNotFoundError, val:
 | 
						|
            assert val[0] == db.DB_NOTFOUND
 | 
						|
            if verbose: print val
 | 
						|
        else:
 | 
						|
            if get_raises_error:
 | 
						|
                self.fail("expected exception")
 | 
						|
            if n != None:
 | 
						|
                self.fail("expected None: %r" % (n,))
 | 
						|
 | 
						|
        if self.d.get_type() == db.DB_BTREE:
 | 
						|
            rec = c.set_range('011')
 | 
						|
            if verbose:
 | 
						|
                print "searched for '011', found: ", rec
 | 
						|
 | 
						|
            rec = c.set_range('011',dlen=0,doff=0)
 | 
						|
            if verbose:
 | 
						|
                print "searched (partial) for '011', found: ", rec
 | 
						|
            if rec[1] != '': self.fail('expected empty data portion')
 | 
						|
 | 
						|
            ev = c.set_range('empty value')
 | 
						|
            if verbose:
 | 
						|
                print "search for 'empty value' returned", ev
 | 
						|
            if ev[1] != '': self.fail('empty value lookup failed')
 | 
						|
 | 
						|
        c.set('0499')
 | 
						|
        c.delete()
 | 
						|
        try:
 | 
						|
            rec = c.current()
 | 
						|
        except db.DBKeyEmptyError, val:
 | 
						|
            if get_raises_error:
 | 
						|
                assert val[0] == db.DB_KEYEMPTY
 | 
						|
                if verbose: print val
 | 
						|
            else:
 | 
						|
                self.fail("unexpected DBKeyEmptyError")
 | 
						|
        else:
 | 
						|
            if get_raises_error:
 | 
						|
                self.fail('DBKeyEmptyError exception expected')
 | 
						|
 | 
						|
        c.next()
 | 
						|
        c2 = c.dup(db.DB_POSITION)
 | 
						|
        assert c.current() == c2.current()
 | 
						|
 | 
						|
        c2.put('', 'a new value', db.DB_CURRENT)
 | 
						|
        assert c.current() == c2.current()
 | 
						|
        assert c.current()[1] == 'a new value'
 | 
						|
 | 
						|
        c2.put('', 'er', db.DB_CURRENT, dlen=0, doff=5)
 | 
						|
        assert c2.current()[1] == 'a newer value'
 | 
						|
 | 
						|
        c.close()
 | 
						|
        c2.close()
 | 
						|
        if txn:
 | 
						|
            txn.commit()
 | 
						|
 | 
						|
        # time to abuse the closed cursors and hope we don't crash
 | 
						|
        methods_to_test = {
 | 
						|
            'current': (),
 | 
						|
            'delete': (),
 | 
						|
            'dup': (db.DB_POSITION,),
 | 
						|
            'first': (),
 | 
						|
            'get': (0,),
 | 
						|
            'next': (),
 | 
						|
            'prev': (),
 | 
						|
            'last': (),
 | 
						|
            'put':('', 'spam', db.DB_CURRENT),
 | 
						|
            'set': ("0505",),
 | 
						|
        }
 | 
						|
        for method, args in methods_to_test.items():
 | 
						|
            try:
 | 
						|
                if verbose:
 | 
						|
                    print "attempting to use a closed cursor's %s method" % \
 | 
						|
                          method
 | 
						|
                # a bug may cause a NULL pointer dereference...
 | 
						|
                getattr(c, method)(*args)
 | 
						|
            except db.DBError, val:
 | 
						|
                assert val[0] == 0
 | 
						|
                if verbose: print val
 | 
						|
            else:
 | 
						|
                self.fail("no exception raised when using a buggy cursor's"
 | 
						|
                          "%s method" % method)
 | 
						|
 | 
						|
        #
 | 
						|
        # free cursor referencing a closed database, it should not barf:
 | 
						|
        #
 | 
						|
        oldcursor = self.d.cursor(txn=txn)
 | 
						|
        self.d.close()
 | 
						|
 | 
						|
        # this would originally cause a segfault when the cursor for a
 | 
						|
        # closed database was cleaned up.  it should not anymore.
 | 
						|
        # SF pybsddb bug id 667343
 | 
						|
        del oldcursor
 | 
						|
 | 
						|
    def test03b_SimpleCursorWithoutGetReturnsNone0(self):
 | 
						|
        # same test but raise exceptions instead of returning None
 | 
						|
        if verbose:
 | 
						|
            print '\n', '-=' * 30
 | 
						|
            print "Running %s.test03b_SimpleCursorStuffWithoutGetReturnsNone..." % \
 | 
						|
                  self.__class__.__name__
 | 
						|
 | 
						|
        old = self.d.set_get_returns_none(0)
 | 
						|
        assert old == 2
 | 
						|
        self.test03_SimpleCursorStuff(get_raises_error=1, set_raises_error=1)
 | 
						|
 | 
						|
    def test03b_SimpleCursorWithGetReturnsNone1(self):
 | 
						|
        # same test but raise exceptions instead of returning None
 | 
						|
        if verbose:
 | 
						|
            print '\n', '-=' * 30
 | 
						|
            print "Running %s.test03b_SimpleCursorStuffWithoutGetReturnsNone..." % \
 | 
						|
                  self.__class__.__name__
 | 
						|
 | 
						|
        old = self.d.set_get_returns_none(1)
 | 
						|
        self.test03_SimpleCursorStuff(get_raises_error=0, set_raises_error=1)
 | 
						|
 | 
						|
 | 
						|
    def test03c_SimpleCursorGetReturnsNone2(self):
 | 
						|
        # same test but raise exceptions instead of returning None
 | 
						|
        if verbose:
 | 
						|
            print '\n', '-=' * 30
 | 
						|
            print "Running %s.test03c_SimpleCursorStuffWithoutSetReturnsNone..." % \
 | 
						|
                  self.__class__.__name__
 | 
						|
 | 
						|
        old = self.d.set_get_returns_none(1)
 | 
						|
        assert old == 2
 | 
						|
        old = self.d.set_get_returns_none(2)
 | 
						|
        assert old == 1
 | 
						|
        self.test03_SimpleCursorStuff(get_raises_error=0, set_raises_error=0)
 | 
						|
 | 
						|
    #----------------------------------------
 | 
						|
 | 
						|
    def test04_PartialGetAndPut(self):
 | 
						|
        d = self.d
 | 
						|
        if verbose:
 | 
						|
            print '\n', '-=' * 30
 | 
						|
            print "Running %s.test04_PartialGetAndPut..." % \
 | 
						|
                  self.__class__.__name__
 | 
						|
 | 
						|
        key = "partialTest"
 | 
						|
        data = "1" * 1000 + "2" * 1000
 | 
						|
        d.put(key, data)
 | 
						|
        assert d.get(key) == data
 | 
						|
        assert d.get(key, dlen=20, doff=990) == ("1" * 10) + ("2" * 10)
 | 
						|
 | 
						|
        d.put("partialtest2", ("1" * 30000) + "robin" )
 | 
						|
        assert d.get("partialtest2", dlen=5, doff=30000) == "robin"
 | 
						|
 | 
						|
        # There seems to be a bug in DB here...  Commented out the test for
 | 
						|
        # now.
 | 
						|
        ##assert d.get("partialtest2", dlen=5, doff=30010) == ""
 | 
						|
 | 
						|
        if self.dbsetflags != db.DB_DUP:
 | 
						|
            # Partial put with duplicate records requires a cursor
 | 
						|
            d.put(key, "0000", dlen=2000, doff=0)
 | 
						|
            assert d.get(key) == "0000"
 | 
						|
 | 
						|
            d.put(key, "1111", dlen=1, doff=2)
 | 
						|
            assert d.get(key) == "0011110"
 | 
						|
 | 
						|
    #----------------------------------------
 | 
						|
 | 
						|
    def test05_GetSize(self):
 | 
						|
        d = self.d
 | 
						|
        if verbose:
 | 
						|
            print '\n', '-=' * 30
 | 
						|
            print "Running %s.test05_GetSize..." % self.__class__.__name__
 | 
						|
 | 
						|
        for i in range(1, 50000, 500):
 | 
						|
            key = "size%s" % i
 | 
						|
            #print "before ", i,
 | 
						|
            d.put(key, "1" * i)
 | 
						|
            #print "after",
 | 
						|
            assert d.get_size(key) == i
 | 
						|
            #print "done"
 | 
						|
 | 
						|
    #----------------------------------------
 | 
						|
 | 
						|
    def test06_Truncate(self):
 | 
						|
        if db.version() < (3,3):
 | 
						|
            # truncate is a feature of BerkeleyDB 3.3 and above
 | 
						|
            return
 | 
						|
 | 
						|
        d = self.d
 | 
						|
        if verbose:
 | 
						|
            print '\n', '-=' * 30
 | 
						|
            print "Running %s.test99_Truncate..." % self.__class__.__name__
 | 
						|
 | 
						|
        d.put("abcde", "ABCDE");
 | 
						|
        num = d.truncate()
 | 
						|
        assert num >= 1, "truncate returned <= 0 on non-empty database"
 | 
						|
        num = d.truncate()
 | 
						|
        assert num == 0, "truncate on empty DB returned nonzero (%r)" % (num,)
 | 
						|
 | 
						|
#----------------------------------------------------------------------
 | 
						|
 | 
						|
 | 
						|
class BasicBTreeTestCase(BasicTestCase):
 | 
						|
    dbtype = db.DB_BTREE
 | 
						|
 | 
						|
 | 
						|
class BasicHashTestCase(BasicTestCase):
 | 
						|
    dbtype = db.DB_HASH
 | 
						|
 | 
						|
 | 
						|
class BasicBTreeWithThreadFlagTestCase(BasicTestCase):
 | 
						|
    dbtype = db.DB_BTREE
 | 
						|
    dbopenflags = db.DB_THREAD
 | 
						|
 | 
						|
 | 
						|
class BasicHashWithThreadFlagTestCase(BasicTestCase):
 | 
						|
    dbtype = db.DB_HASH
 | 
						|
    dbopenflags = db.DB_THREAD
 | 
						|
 | 
						|
 | 
						|
class BasicBTreeWithEnvTestCase(BasicTestCase):
 | 
						|
    dbtype = db.DB_BTREE
 | 
						|
    dbopenflags = db.DB_THREAD
 | 
						|
    useEnv = 1
 | 
						|
    envflags = db.DB_THREAD | db.DB_INIT_MPOOL | db.DB_INIT_LOCK
 | 
						|
 | 
						|
 | 
						|
class BasicHashWithEnvTestCase(BasicTestCase):
 | 
						|
    dbtype = db.DB_HASH
 | 
						|
    dbopenflags = db.DB_THREAD
 | 
						|
    useEnv = 1
 | 
						|
    envflags = db.DB_THREAD | db.DB_INIT_MPOOL | db.DB_INIT_LOCK
 | 
						|
 | 
						|
 | 
						|
#----------------------------------------------------------------------
 | 
						|
 | 
						|
class BasicTransactionTestCase(BasicTestCase):
 | 
						|
    dbopenflags = db.DB_THREAD | db.DB_AUTO_COMMIT
 | 
						|
    useEnv = 1
 | 
						|
    envflags = (db.DB_THREAD | db.DB_INIT_MPOOL | db.DB_INIT_LOCK |
 | 
						|
                db.DB_INIT_TXN)
 | 
						|
    envsetflags = db.DB_AUTO_COMMIT
 | 
						|
 | 
						|
 | 
						|
    def tearDown(self):
 | 
						|
        self.txn.commit()
 | 
						|
        BasicTestCase.tearDown(self)
 | 
						|
 | 
						|
 | 
						|
    def populateDB(self):
 | 
						|
        txn = self.env.txn_begin()
 | 
						|
        BasicTestCase.populateDB(self, _txn=txn)
 | 
						|
 | 
						|
        self.txn = self.env.txn_begin()
 | 
						|
 | 
						|
 | 
						|
    def test06_Transactions(self):
 | 
						|
        d = self.d
 | 
						|
        if verbose:
 | 
						|
            print '\n', '-=' * 30
 | 
						|
            print "Running %s.test06_Transactions..." % self.__class__.__name__
 | 
						|
 | 
						|
        assert d.get('new rec', txn=self.txn) == None
 | 
						|
        d.put('new rec', 'this is a new record', self.txn)
 | 
						|
        assert d.get('new rec', txn=self.txn) == 'this is a new record'
 | 
						|
        self.txn.abort()
 | 
						|
        assert d.get('new rec') == None
 | 
						|
 | 
						|
        self.txn = self.env.txn_begin()
 | 
						|
 | 
						|
        assert d.get('new rec', txn=self.txn) == None
 | 
						|
        d.put('new rec', 'this is a new record', self.txn)
 | 
						|
        assert d.get('new rec', txn=self.txn) == 'this is a new record'
 | 
						|
        self.txn.commit()
 | 
						|
        assert d.get('new rec') == 'this is a new record'
 | 
						|
 | 
						|
        self.txn = self.env.txn_begin()
 | 
						|
        c = d.cursor(self.txn)
 | 
						|
        rec = c.first()
 | 
						|
        count = 0
 | 
						|
        while rec is not None:
 | 
						|
            count = count + 1
 | 
						|
            if verbose and count % 100 == 0:
 | 
						|
                print rec
 | 
						|
            rec = c.next()
 | 
						|
        assert count == self._numKeys+1
 | 
						|
 | 
						|
        c.close()                # Cursors *MUST* be closed before commit!
 | 
						|
        self.txn.commit()
 | 
						|
 | 
						|
        # flush pending updates
 | 
						|
        try:
 | 
						|
            self.env.txn_checkpoint (0, 0, 0)
 | 
						|
        except db.DBIncompleteError:
 | 
						|
            pass
 | 
						|
 | 
						|
        # must have at least one log file present:
 | 
						|
        logs = self.env.log_archive(db.DB_ARCH_ABS | db.DB_ARCH_LOG)
 | 
						|
        assert logs != None
 | 
						|
        for log in logs:
 | 
						|
            if verbose:
 | 
						|
                print 'log file: ' + log
 | 
						|
 | 
						|
        self.txn = self.env.txn_begin()
 | 
						|
 | 
						|
    #----------------------------------------
 | 
						|
 | 
						|
    def test07_TxnTruncate(self):
 | 
						|
        if db.version() < (3,3):
 | 
						|
            # truncate is a feature of BerkeleyDB 3.3 and above
 | 
						|
            return
 | 
						|
 | 
						|
        d = self.d
 | 
						|
        if verbose:
 | 
						|
            print '\n', '-=' * 30
 | 
						|
            print "Running %s.test07_TxnTruncate..." % self.__class__.__name__
 | 
						|
 | 
						|
        d.put("abcde", "ABCDE");
 | 
						|
        txn = self.env.txn_begin()
 | 
						|
        num = d.truncate(txn)
 | 
						|
        assert num >= 1, "truncate returned <= 0 on non-empty database"
 | 
						|
        num = d.truncate(txn)
 | 
						|
        assert num == 0, "truncate on empty DB returned nonzero (%r)" % (num,)
 | 
						|
        txn.commit()
 | 
						|
 | 
						|
    #----------------------------------------
 | 
						|
 | 
						|
    def test08_TxnLateUse(self):
 | 
						|
        txn = self.env.txn_begin()
 | 
						|
        txn.abort()
 | 
						|
        try:
 | 
						|
            txn.abort()
 | 
						|
        except db.DBError, e:
 | 
						|
            pass
 | 
						|
        else:
 | 
						|
            raise RuntimeError, "DBTxn.abort() called after DB_TXN no longer valid w/o an exception"
 | 
						|
 | 
						|
        txn = self.env.txn_begin()
 | 
						|
        txn.commit()
 | 
						|
        try:
 | 
						|
            txn.commit()
 | 
						|
        except db.DBError, e:
 | 
						|
            pass
 | 
						|
        else:
 | 
						|
            raise RuntimeError, "DBTxn.commit() called after DB_TXN no longer valid w/o an exception"
 | 
						|
 | 
						|
 | 
						|
class BTreeTransactionTestCase(BasicTransactionTestCase):
 | 
						|
    dbtype = db.DB_BTREE
 | 
						|
 | 
						|
class HashTransactionTestCase(BasicTransactionTestCase):
 | 
						|
    dbtype = db.DB_HASH
 | 
						|
 | 
						|
 | 
						|
 | 
						|
#----------------------------------------------------------------------
 | 
						|
 | 
						|
class BTreeRecnoTestCase(BasicTestCase):
 | 
						|
    dbtype     = db.DB_BTREE
 | 
						|
    dbsetflags = db.DB_RECNUM
 | 
						|
 | 
						|
    def test07_RecnoInBTree(self):
 | 
						|
        d = self.d
 | 
						|
        if verbose:
 | 
						|
            print '\n', '-=' * 30
 | 
						|
            print "Running %s.test07_RecnoInBTree..." % self.__class__.__name__
 | 
						|
 | 
						|
        rec = d.get(200)
 | 
						|
        assert type(rec) == type(())
 | 
						|
        assert len(rec) == 2
 | 
						|
        if verbose:
 | 
						|
            print "Record #200 is ", rec
 | 
						|
 | 
						|
        c = d.cursor()
 | 
						|
        c.set('0200')
 | 
						|
        num = c.get_recno()
 | 
						|
        assert type(num) == type(1)
 | 
						|
        if verbose:
 | 
						|
            print "recno of d['0200'] is ", num
 | 
						|
 | 
						|
        rec = c.current()
 | 
						|
        assert c.set_recno(num) == rec
 | 
						|
 | 
						|
        c.close()
 | 
						|
 | 
						|
 | 
						|
 | 
						|
class BTreeRecnoWithThreadFlagTestCase(BTreeRecnoTestCase):
 | 
						|
    dbopenflags = db.DB_THREAD
 | 
						|
 | 
						|
#----------------------------------------------------------------------
 | 
						|
 | 
						|
class BasicDUPTestCase(BasicTestCase):
 | 
						|
    dbsetflags = db.DB_DUP
 | 
						|
 | 
						|
    def test08_DuplicateKeys(self):
 | 
						|
        d = self.d
 | 
						|
        if verbose:
 | 
						|
            print '\n', '-=' * 30
 | 
						|
            print "Running %s.test08_DuplicateKeys..." % \
 | 
						|
                  self.__class__.__name__
 | 
						|
 | 
						|
        d.put("dup0", "before")
 | 
						|
        for x in "The quick brown fox jumped over the lazy dog.".split():
 | 
						|
            d.put("dup1", x)
 | 
						|
        d.put("dup2", "after")
 | 
						|
 | 
						|
        data = d.get("dup1")
 | 
						|
        assert data == "The"
 | 
						|
        if verbose:
 | 
						|
            print data
 | 
						|
 | 
						|
        c = d.cursor()
 | 
						|
        rec = c.set("dup1")
 | 
						|
        assert rec == ('dup1', 'The')
 | 
						|
 | 
						|
        next = c.next()
 | 
						|
        assert next == ('dup1', 'quick')
 | 
						|
 | 
						|
        rec = c.set("dup1")
 | 
						|
        count = c.count()
 | 
						|
        assert count == 9
 | 
						|
 | 
						|
        next_dup = c.next_dup()
 | 
						|
        assert next_dup == ('dup1', 'quick')
 | 
						|
 | 
						|
        rec = c.set('dup1')
 | 
						|
        while rec is not None:
 | 
						|
            if verbose:
 | 
						|
                print rec
 | 
						|
            rec = c.next_dup()
 | 
						|
 | 
						|
        c.set('dup1')
 | 
						|
        rec = c.next_nodup()
 | 
						|
        assert rec[0] != 'dup1'
 | 
						|
        if verbose:
 | 
						|
            print rec
 | 
						|
 | 
						|
        c.close()
 | 
						|
 | 
						|
 | 
						|
 | 
						|
class BTreeDUPTestCase(BasicDUPTestCase):
 | 
						|
    dbtype = db.DB_BTREE
 | 
						|
 | 
						|
class HashDUPTestCase(BasicDUPTestCase):
 | 
						|
    dbtype = db.DB_HASH
 | 
						|
 | 
						|
class BTreeDUPWithThreadTestCase(BasicDUPTestCase):
 | 
						|
    dbtype = db.DB_BTREE
 | 
						|
    dbopenflags = db.DB_THREAD
 | 
						|
 | 
						|
class HashDUPWithThreadTestCase(BasicDUPTestCase):
 | 
						|
    dbtype = db.DB_HASH
 | 
						|
    dbopenflags = db.DB_THREAD
 | 
						|
 | 
						|
 | 
						|
#----------------------------------------------------------------------
 | 
						|
 | 
						|
class BasicMultiDBTestCase(BasicTestCase):
 | 
						|
    dbname = 'first'
 | 
						|
 | 
						|
    def otherType(self):
 | 
						|
        if self.dbtype == db.DB_BTREE:
 | 
						|
            return db.DB_HASH
 | 
						|
        else:
 | 
						|
            return db.DB_BTREE
 | 
						|
 | 
						|
    def test09_MultiDB(self):
 | 
						|
        d1 = self.d
 | 
						|
        if verbose:
 | 
						|
            print '\n', '-=' * 30
 | 
						|
            print "Running %s.test09_MultiDB..." % self.__class__.__name__
 | 
						|
 | 
						|
        d2 = db.DB(self.env)
 | 
						|
        d2.open(self.filename, "second", self.dbtype,
 | 
						|
                self.dbopenflags|db.DB_CREATE)
 | 
						|
        d3 = db.DB(self.env)
 | 
						|
        d3.open(self.filename, "third", self.otherType(),
 | 
						|
                self.dbopenflags|db.DB_CREATE)
 | 
						|
 | 
						|
        for x in "The quick brown fox jumped over the lazy dog".split():
 | 
						|
            d2.put(x, self.makeData(x))
 | 
						|
 | 
						|
        for x in string.letters:
 | 
						|
            d3.put(x, x*70)
 | 
						|
 | 
						|
        d1.sync()
 | 
						|
        d2.sync()
 | 
						|
        d3.sync()
 | 
						|
        d1.close()
 | 
						|
        d2.close()
 | 
						|
        d3.close()
 | 
						|
 | 
						|
        self.d = d1 = d2 = d3 = None
 | 
						|
 | 
						|
        self.d = d1 = db.DB(self.env)
 | 
						|
        d1.open(self.filename, self.dbname, flags = self.dbopenflags)
 | 
						|
        d2 = db.DB(self.env)
 | 
						|
        d2.open(self.filename, "second",  flags = self.dbopenflags)
 | 
						|
        d3 = db.DB(self.env)
 | 
						|
        d3.open(self.filename, "third", flags = self.dbopenflags)
 | 
						|
 | 
						|
        c1 = d1.cursor()
 | 
						|
        c2 = d2.cursor()
 | 
						|
        c3 = d3.cursor()
 | 
						|
 | 
						|
        count = 0
 | 
						|
        rec = c1.first()
 | 
						|
        while rec is not None:
 | 
						|
            count = count + 1
 | 
						|
            if verbose and (count % 50) == 0:
 | 
						|
                print rec
 | 
						|
            rec = c1.next()
 | 
						|
        assert count == self._numKeys
 | 
						|
 | 
						|
        count = 0
 | 
						|
        rec = c2.first()
 | 
						|
        while rec is not None:
 | 
						|
            count = count + 1
 | 
						|
            if verbose:
 | 
						|
                print rec
 | 
						|
            rec = c2.next()
 | 
						|
        assert count == 9
 | 
						|
 | 
						|
        count = 0
 | 
						|
        rec = c3.first()
 | 
						|
        while rec is not None:
 | 
						|
            count = count + 1
 | 
						|
            if verbose:
 | 
						|
                print rec
 | 
						|
            rec = c3.next()
 | 
						|
        assert count == 52
 | 
						|
 | 
						|
 | 
						|
        c1.close()
 | 
						|
        c2.close()
 | 
						|
        c3.close()
 | 
						|
 | 
						|
        d2.close()
 | 
						|
        d3.close()
 | 
						|
 | 
						|
 | 
						|
 | 
						|
# Strange things happen if you try to use Multiple DBs per file without a
 | 
						|
# DBEnv with MPOOL and LOCKing...
 | 
						|
 | 
						|
class BTreeMultiDBTestCase(BasicMultiDBTestCase):
 | 
						|
    dbtype = db.DB_BTREE
 | 
						|
    dbopenflags = db.DB_THREAD
 | 
						|
    useEnv = 1
 | 
						|
    envflags = db.DB_THREAD | db.DB_INIT_MPOOL | db.DB_INIT_LOCK
 | 
						|
 | 
						|
class HashMultiDBTestCase(BasicMultiDBTestCase):
 | 
						|
    dbtype = db.DB_HASH
 | 
						|
    dbopenflags = db.DB_THREAD
 | 
						|
    useEnv = 1
 | 
						|
    envflags = db.DB_THREAD | db.DB_INIT_MPOOL | db.DB_INIT_LOCK
 | 
						|
 | 
						|
 | 
						|
#----------------------------------------------------------------------
 | 
						|
#----------------------------------------------------------------------
 | 
						|
 | 
						|
def test_suite():
 | 
						|
    suite = unittest.TestSuite()
 | 
						|
 | 
						|
    suite.addTest(unittest.makeSuite(VersionTestCase))
 | 
						|
    suite.addTest(unittest.makeSuite(BasicBTreeTestCase))
 | 
						|
    suite.addTest(unittest.makeSuite(BasicHashTestCase))
 | 
						|
    suite.addTest(unittest.makeSuite(BasicBTreeWithThreadFlagTestCase))
 | 
						|
    suite.addTest(unittest.makeSuite(BasicHashWithThreadFlagTestCase))
 | 
						|
    suite.addTest(unittest.makeSuite(BasicBTreeWithEnvTestCase))
 | 
						|
    suite.addTest(unittest.makeSuite(BasicHashWithEnvTestCase))
 | 
						|
    suite.addTest(unittest.makeSuite(BTreeTransactionTestCase))
 | 
						|
    suite.addTest(unittest.makeSuite(HashTransactionTestCase))
 | 
						|
    suite.addTest(unittest.makeSuite(BTreeRecnoTestCase))
 | 
						|
    suite.addTest(unittest.makeSuite(BTreeRecnoWithThreadFlagTestCase))
 | 
						|
    suite.addTest(unittest.makeSuite(BTreeDUPTestCase))
 | 
						|
    suite.addTest(unittest.makeSuite(HashDUPTestCase))
 | 
						|
    suite.addTest(unittest.makeSuite(BTreeDUPWithThreadTestCase))
 | 
						|
    suite.addTest(unittest.makeSuite(HashDUPWithThreadTestCase))
 | 
						|
    suite.addTest(unittest.makeSuite(BTreeMultiDBTestCase))
 | 
						|
    suite.addTest(unittest.makeSuite(HashMultiDBTestCase))
 | 
						|
 | 
						|
    return suite
 | 
						|
 | 
						|
 | 
						|
if __name__ == '__main__':
 | 
						|
    unittest.main(defaultTest='test_suite')
 |