mirror of
https://github.com/python/cpython.git
synced 2025-08-16 23:01:34 +00:00

svn+ssh://pythondev@svn.python.org/python/trunk ........ r78558 | r.david.murray | 2010-03-01 14:14:16 -0500 (Mon, 01 Mar 2010) | 6 lines Issue 3892 again. The bsddb3 replication test still fails randomly. Since this module is unmaintained in the library and gone in py3k, this patch skips the remainder of the replication test if a second timeout occurs, as it randomly does. This should improve buildbot stability. ........
463 lines
18 KiB
Python
463 lines
18 KiB
Python
"""TestCases for distributed transactions.
|
|
"""
|
|
|
|
import os
|
|
import time
|
|
import unittest
|
|
|
|
from test_all import db, test_support, have_threads, verbose, \
|
|
get_new_environment_path, get_new_database_path
|
|
|
|
|
|
#----------------------------------------------------------------------
|
|
|
|
class DBReplicationManager(unittest.TestCase):
|
|
import sys
|
|
if sys.version_info[:3] < (2, 4, 0):
|
|
def assertTrue(self, expr, msg=None):
|
|
self.failUnless(expr,msg=msg)
|
|
|
|
def setUp(self) :
|
|
self.homeDirMaster = get_new_environment_path()
|
|
self.homeDirClient = get_new_environment_path()
|
|
|
|
self.dbenvMaster = db.DBEnv()
|
|
self.dbenvClient = db.DBEnv()
|
|
|
|
# Must use "DB_THREAD" because the Replication Manager will
|
|
# be executed in other threads but will use the same environment.
|
|
# http://forums.oracle.com/forums/thread.jspa?threadID=645788&tstart=0
|
|
self.dbenvMaster.open(self.homeDirMaster, db.DB_CREATE | db.DB_INIT_TXN
|
|
| db.DB_INIT_LOG | db.DB_INIT_MPOOL | db.DB_INIT_LOCK |
|
|
db.DB_INIT_REP | db.DB_RECOVER | db.DB_THREAD, 0666)
|
|
self.dbenvClient.open(self.homeDirClient, db.DB_CREATE | db.DB_INIT_TXN
|
|
| db.DB_INIT_LOG | db.DB_INIT_MPOOL | db.DB_INIT_LOCK |
|
|
db.DB_INIT_REP | db.DB_RECOVER | db.DB_THREAD, 0666)
|
|
|
|
self.confirmed_master=self.client_startupdone=False
|
|
def confirmed_master(a,b,c) :
|
|
if b==db.DB_EVENT_REP_MASTER :
|
|
self.confirmed_master=True
|
|
|
|
def client_startupdone(a,b,c) :
|
|
if b==db.DB_EVENT_REP_STARTUPDONE :
|
|
self.client_startupdone=True
|
|
|
|
self.dbenvMaster.set_event_notify(confirmed_master)
|
|
self.dbenvClient.set_event_notify(client_startupdone)
|
|
|
|
#self.dbenvMaster.set_verbose(db.DB_VERB_REPLICATION, True)
|
|
#self.dbenvMaster.set_verbose(db.DB_VERB_FILEOPS_ALL, True)
|
|
#self.dbenvClient.set_verbose(db.DB_VERB_REPLICATION, True)
|
|
#self.dbenvClient.set_verbose(db.DB_VERB_FILEOPS_ALL, True)
|
|
|
|
self.dbMaster = self.dbClient = None
|
|
|
|
|
|
def tearDown(self):
|
|
if self.dbClient :
|
|
self.dbClient.close()
|
|
if self.dbMaster :
|
|
self.dbMaster.close()
|
|
self.dbenvClient.close()
|
|
self.dbenvMaster.close()
|
|
test_support.rmtree(self.homeDirClient)
|
|
test_support.rmtree(self.homeDirMaster)
|
|
|
|
def test01_basic_replication(self) :
|
|
master_port = test_support.find_unused_port()
|
|
self.dbenvMaster.repmgr_set_local_site("127.0.0.1", master_port)
|
|
client_port = test_support.find_unused_port()
|
|
self.dbenvClient.repmgr_set_local_site("127.0.0.1", client_port)
|
|
self.dbenvMaster.repmgr_add_remote_site("127.0.0.1", client_port)
|
|
self.dbenvClient.repmgr_add_remote_site("127.0.0.1", master_port)
|
|
self.dbenvMaster.rep_set_nsites(2)
|
|
self.dbenvClient.rep_set_nsites(2)
|
|
self.dbenvMaster.rep_set_priority(10)
|
|
self.dbenvClient.rep_set_priority(0)
|
|
|
|
self.dbenvMaster.rep_set_timeout(db.DB_REP_CONNECTION_RETRY,100123)
|
|
self.dbenvClient.rep_set_timeout(db.DB_REP_CONNECTION_RETRY,100321)
|
|
self.assertEquals(self.dbenvMaster.rep_get_timeout(
|
|
db.DB_REP_CONNECTION_RETRY), 100123)
|
|
self.assertEquals(self.dbenvClient.rep_get_timeout(
|
|
db.DB_REP_CONNECTION_RETRY), 100321)
|
|
|
|
self.dbenvMaster.rep_set_timeout(db.DB_REP_ELECTION_TIMEOUT, 100234)
|
|
self.dbenvClient.rep_set_timeout(db.DB_REP_ELECTION_TIMEOUT, 100432)
|
|
self.assertEquals(self.dbenvMaster.rep_get_timeout(
|
|
db.DB_REP_ELECTION_TIMEOUT), 100234)
|
|
self.assertEquals(self.dbenvClient.rep_get_timeout(
|
|
db.DB_REP_ELECTION_TIMEOUT), 100432)
|
|
|
|
self.dbenvMaster.rep_set_timeout(db.DB_REP_ELECTION_RETRY, 100345)
|
|
self.dbenvClient.rep_set_timeout(db.DB_REP_ELECTION_RETRY, 100543)
|
|
self.assertEquals(self.dbenvMaster.rep_get_timeout(
|
|
db.DB_REP_ELECTION_RETRY), 100345)
|
|
self.assertEquals(self.dbenvClient.rep_get_timeout(
|
|
db.DB_REP_ELECTION_RETRY), 100543)
|
|
|
|
self.dbenvMaster.repmgr_set_ack_policy(db.DB_REPMGR_ACKS_ALL)
|
|
self.dbenvClient.repmgr_set_ack_policy(db.DB_REPMGR_ACKS_ALL)
|
|
|
|
self.dbenvMaster.repmgr_start(1, db.DB_REP_MASTER);
|
|
self.dbenvClient.repmgr_start(1, db.DB_REP_CLIENT);
|
|
|
|
self.assertEquals(self.dbenvMaster.rep_get_nsites(),2)
|
|
self.assertEquals(self.dbenvClient.rep_get_nsites(),2)
|
|
self.assertEquals(self.dbenvMaster.rep_get_priority(),10)
|
|
self.assertEquals(self.dbenvClient.rep_get_priority(),0)
|
|
self.assertEquals(self.dbenvMaster.repmgr_get_ack_policy(),
|
|
db.DB_REPMGR_ACKS_ALL)
|
|
self.assertEquals(self.dbenvClient.repmgr_get_ack_policy(),
|
|
db.DB_REPMGR_ACKS_ALL)
|
|
|
|
# The timeout is necessary in BDB 4.5, since DB_EVENT_REP_STARTUPDONE
|
|
# is not generated if the master has no new transactions.
|
|
# This is solved in BDB 4.6 (#15542).
|
|
import time
|
|
timeout = time.time()+60
|
|
while (time.time()<timeout) and not (self.confirmed_master and self.client_startupdone) :
|
|
time.sleep(0.02)
|
|
# self.client_startupdone does not always get set to True within
|
|
# the timeout. On windows this may be a deep issue, on other
|
|
# platforms it is likely just a timing issue, especially on slow
|
|
# virthost buildbots (see issue 3892 for more). Even though
|
|
# the timeout triggers, the rest of this test method usually passes
|
|
# (but not all of it always, see below). So we just note the
|
|
# timeout on stderr and keep soldering on.
|
|
if time.time()>timeout:
|
|
import sys
|
|
print >> sys.stderr, ("XXX: timeout happened before"
|
|
"startup was confirmed - see issue 3892")
|
|
startup_timeout = True
|
|
|
|
d = self.dbenvMaster.repmgr_site_list()
|
|
self.assertEquals(len(d), 1)
|
|
self.assertEquals(d[0][0], "127.0.0.1")
|
|
self.assertEquals(d[0][1], client_port)
|
|
self.assertTrue((d[0][2]==db.DB_REPMGR_CONNECTED) or \
|
|
(d[0][2]==db.DB_REPMGR_DISCONNECTED))
|
|
|
|
d = self.dbenvClient.repmgr_site_list()
|
|
self.assertEquals(len(d), 1)
|
|
self.assertEquals(d[0][0], "127.0.0.1")
|
|
self.assertEquals(d[0][1], master_port)
|
|
self.assertTrue((d[0][2]==db.DB_REPMGR_CONNECTED) or \
|
|
(d[0][2]==db.DB_REPMGR_DISCONNECTED))
|
|
|
|
if db.version() >= (4,6) :
|
|
d = self.dbenvMaster.repmgr_stat(flags=db.DB_STAT_CLEAR);
|
|
self.assertTrue("msgs_queued" in d)
|
|
|
|
self.dbMaster=db.DB(self.dbenvMaster)
|
|
txn=self.dbenvMaster.txn_begin()
|
|
self.dbMaster.open("test", db.DB_HASH, db.DB_CREATE, 0666, txn=txn)
|
|
txn.commit()
|
|
|
|
import time,os.path
|
|
timeout=time.time()+10
|
|
while (time.time()<timeout) and \
|
|
not (os.path.exists(os.path.join(self.homeDirClient,"test"))) :
|
|
time.sleep(0.01)
|
|
|
|
self.dbClient=db.DB(self.dbenvClient)
|
|
while True :
|
|
txn=self.dbenvClient.txn_begin()
|
|
try :
|
|
self.dbClient.open("test", db.DB_HASH, flags=db.DB_RDONLY,
|
|
mode=0666, txn=txn)
|
|
except db.DBRepHandleDeadError :
|
|
txn.abort()
|
|
self.dbClient.close()
|
|
self.dbClient=db.DB(self.dbenvClient)
|
|
continue
|
|
|
|
txn.commit()
|
|
break
|
|
|
|
txn=self.dbenvMaster.txn_begin()
|
|
self.dbMaster.put("ABC", "123", txn=txn)
|
|
txn.commit()
|
|
import time
|
|
timeout=time.time()+10
|
|
v=None
|
|
while (time.time()<timeout) and (v==None) :
|
|
txn=self.dbenvClient.txn_begin()
|
|
v=self.dbClient.get("ABC", txn=txn)
|
|
txn.commit()
|
|
if v==None :
|
|
time.sleep(0.02)
|
|
# If startup did not happen before the timeout above, then this test
|
|
# sometimes fails. This happens randomly, which causes buildbot
|
|
# instability, but all the other bsddb tests pass. Since bsddb3 in the
|
|
# stdlib is currently not getting active maintenance, and is gone in
|
|
# py3k, we just skip the end of the test in that case.
|
|
if time.time()>=timeout and startup_timeout:
|
|
self.skipTest("replication test skipped due to random failure, "
|
|
"see issue 3892")
|
|
self.assertTrue(time.time()<timeout)
|
|
self.assertEquals("123", v)
|
|
|
|
txn=self.dbenvMaster.txn_begin()
|
|
self.dbMaster.delete("ABC", txn=txn)
|
|
txn.commit()
|
|
timeout=time.time()+10
|
|
while (time.time()<timeout) and (v!=None) :
|
|
txn=self.dbenvClient.txn_begin()
|
|
v=self.dbClient.get("ABC", txn=txn)
|
|
txn.commit()
|
|
if v==None :
|
|
time.sleep(0.02)
|
|
self.assertTrue(time.time()<timeout)
|
|
self.assertEquals(None, v)
|
|
|
|
class DBBaseReplication(DBReplicationManager):
|
|
def setUp(self) :
|
|
DBReplicationManager.setUp(self)
|
|
def confirmed_master(a,b,c) :
|
|
if (b == db.DB_EVENT_REP_MASTER) or (b == db.DB_EVENT_REP_ELECTED) :
|
|
self.confirmed_master = True
|
|
|
|
def client_startupdone(a,b,c) :
|
|
if b == db.DB_EVENT_REP_STARTUPDONE :
|
|
self.client_startupdone = True
|
|
|
|
self.dbenvMaster.set_event_notify(confirmed_master)
|
|
self.dbenvClient.set_event_notify(client_startupdone)
|
|
|
|
import Queue
|
|
self.m2c = Queue.Queue()
|
|
self.c2m = Queue.Queue()
|
|
|
|
# There are only two nodes, so we don't need to
|
|
# do any routing decision
|
|
def m2c(dbenv, control, rec, lsnp, envid, flags) :
|
|
self.m2c.put((control, rec))
|
|
|
|
def c2m(dbenv, control, rec, lsnp, envid, flags) :
|
|
self.c2m.put((control, rec))
|
|
|
|
self.dbenvMaster.rep_set_transport(13,m2c)
|
|
self.dbenvMaster.rep_set_priority(10)
|
|
self.dbenvClient.rep_set_transport(3,c2m)
|
|
self.dbenvClient.rep_set_priority(0)
|
|
|
|
self.assertEquals(self.dbenvMaster.rep_get_priority(),10)
|
|
self.assertEquals(self.dbenvClient.rep_get_priority(),0)
|
|
|
|
#self.dbenvMaster.set_verbose(db.DB_VERB_REPLICATION, True)
|
|
#self.dbenvMaster.set_verbose(db.DB_VERB_FILEOPS_ALL, True)
|
|
#self.dbenvClient.set_verbose(db.DB_VERB_REPLICATION, True)
|
|
#self.dbenvClient.set_verbose(db.DB_VERB_FILEOPS_ALL, True)
|
|
|
|
def thread_master() :
|
|
return self.thread_do(self.dbenvMaster, self.c2m, 3,
|
|
self.master_doing_election, True)
|
|
|
|
def thread_client() :
|
|
return self.thread_do(self.dbenvClient, self.m2c, 13,
|
|
self.client_doing_election, False)
|
|
|
|
from threading import Thread
|
|
t_m=Thread(target=thread_master)
|
|
t_c=Thread(target=thread_client)
|
|
import sys
|
|
if sys.version_info[0] < 3 :
|
|
t_m.setDaemon(True)
|
|
t_c.setDaemon(True)
|
|
else :
|
|
t_m.daemon = True
|
|
t_c.daemon = True
|
|
|
|
self.t_m = t_m
|
|
self.t_c = t_c
|
|
|
|
self.dbMaster = self.dbClient = None
|
|
|
|
self.master_doing_election=[False]
|
|
self.client_doing_election=[False]
|
|
|
|
|
|
def tearDown(self):
|
|
if self.dbClient :
|
|
self.dbClient.close()
|
|
if self.dbMaster :
|
|
self.dbMaster.close()
|
|
self.m2c.put(None)
|
|
self.c2m.put(None)
|
|
self.t_m.join()
|
|
self.t_c.join()
|
|
self.dbenvClient.close()
|
|
self.dbenvMaster.close()
|
|
test_support.rmtree(self.homeDirClient)
|
|
test_support.rmtree(self.homeDirMaster)
|
|
|
|
def basic_rep_threading(self) :
|
|
self.dbenvMaster.rep_start(flags=db.DB_REP_MASTER)
|
|
self.dbenvClient.rep_start(flags=db.DB_REP_CLIENT)
|
|
|
|
def thread_do(env, q, envid, election_status, must_be_master) :
|
|
while True :
|
|
v=q.get()
|
|
if v == None : return
|
|
env.rep_process_message(v[0], v[1], envid)
|
|
|
|
self.thread_do = thread_do
|
|
|
|
self.t_m.start()
|
|
self.t_c.start()
|
|
|
|
def test01_basic_replication(self) :
|
|
self.basic_rep_threading()
|
|
|
|
# The timeout is necessary in BDB 4.5, since DB_EVENT_REP_STARTUPDONE
|
|
# is not generated if the master has no new transactions.
|
|
# This is solved in BDB 4.6 (#15542).
|
|
import time
|
|
timeout = time.time()+60
|
|
while (time.time()<timeout) and not (self.confirmed_master and
|
|
self.client_startupdone) :
|
|
time.sleep(0.02)
|
|
self.assertTrue(time.time()<timeout)
|
|
|
|
self.dbMaster=db.DB(self.dbenvMaster)
|
|
txn=self.dbenvMaster.txn_begin()
|
|
self.dbMaster.open("test", db.DB_HASH, db.DB_CREATE, 0666, txn=txn)
|
|
txn.commit()
|
|
|
|
import time,os.path
|
|
timeout=time.time()+10
|
|
while (time.time()<timeout) and \
|
|
not (os.path.exists(os.path.join(self.homeDirClient,"test"))) :
|
|
time.sleep(0.01)
|
|
|
|
self.dbClient=db.DB(self.dbenvClient)
|
|
while True :
|
|
txn=self.dbenvClient.txn_begin()
|
|
try :
|
|
self.dbClient.open("test", db.DB_HASH, flags=db.DB_RDONLY,
|
|
mode=0666, txn=txn)
|
|
except db.DBRepHandleDeadError :
|
|
txn.abort()
|
|
self.dbClient.close()
|
|
self.dbClient=db.DB(self.dbenvClient)
|
|
continue
|
|
|
|
txn.commit()
|
|
break
|
|
|
|
txn=self.dbenvMaster.txn_begin()
|
|
self.dbMaster.put("ABC", "123", txn=txn)
|
|
txn.commit()
|
|
import time
|
|
timeout=time.time()+10
|
|
v=None
|
|
while (time.time()<timeout) and (v==None) :
|
|
txn=self.dbenvClient.txn_begin()
|
|
v=self.dbClient.get("ABC", txn=txn)
|
|
txn.commit()
|
|
if v==None :
|
|
time.sleep(0.02)
|
|
self.assertTrue(time.time()<timeout)
|
|
self.assertEquals("123", v)
|
|
|
|
txn=self.dbenvMaster.txn_begin()
|
|
self.dbMaster.delete("ABC", txn=txn)
|
|
txn.commit()
|
|
timeout=time.time()+10
|
|
while (time.time()<timeout) and (v!=None) :
|
|
txn=self.dbenvClient.txn_begin()
|
|
v=self.dbClient.get("ABC", txn=txn)
|
|
txn.commit()
|
|
if v==None :
|
|
time.sleep(0.02)
|
|
self.assertTrue(time.time()<timeout)
|
|
self.assertEquals(None, v)
|
|
|
|
if db.version() >= (4,7) :
|
|
def test02_test_request(self) :
|
|
self.basic_rep_threading()
|
|
(minimum, maximum) = self.dbenvClient.rep_get_request()
|
|
self.dbenvClient.rep_set_request(minimum-1, maximum+1)
|
|
self.assertEqual(self.dbenvClient.rep_get_request(),
|
|
(minimum-1, maximum+1))
|
|
|
|
if db.version() >= (4,6) :
|
|
def test03_master_election(self) :
|
|
# Get ready to hold an election
|
|
#self.dbenvMaster.rep_start(flags=db.DB_REP_MASTER)
|
|
self.dbenvMaster.rep_start(flags=db.DB_REP_CLIENT)
|
|
self.dbenvClient.rep_start(flags=db.DB_REP_CLIENT)
|
|
|
|
def thread_do(env, q, envid, election_status, must_be_master) :
|
|
while True :
|
|
v=q.get()
|
|
if v == None : return
|
|
r = env.rep_process_message(v[0],v[1],envid)
|
|
if must_be_master and self.confirmed_master :
|
|
self.dbenvMaster.rep_start(flags = db.DB_REP_MASTER)
|
|
must_be_master = False
|
|
|
|
if r[0] == db.DB_REP_HOLDELECTION :
|
|
def elect() :
|
|
while True :
|
|
try :
|
|
env.rep_elect(2, 1)
|
|
election_status[0] = False
|
|
break
|
|
except db.DBRepUnavailError :
|
|
pass
|
|
if not election_status[0] and not self.confirmed_master :
|
|
from threading import Thread
|
|
election_status[0] = True
|
|
t=Thread(target=elect)
|
|
import sys
|
|
if sys.version_info[0] < 3 :
|
|
t.setDaemon(True)
|
|
else :
|
|
t.daemon = True
|
|
t.start()
|
|
|
|
self.thread_do = thread_do
|
|
|
|
self.t_m.start()
|
|
self.t_c.start()
|
|
|
|
self.dbenvMaster.rep_set_timeout(db.DB_REP_ELECTION_TIMEOUT, 50000)
|
|
self.dbenvClient.rep_set_timeout(db.DB_REP_ELECTION_TIMEOUT, 50000)
|
|
self.client_doing_election[0] = True
|
|
while True :
|
|
try :
|
|
self.dbenvClient.rep_elect(2, 1)
|
|
self.client_doing_election[0] = False
|
|
break
|
|
except db.DBRepUnavailError :
|
|
pass
|
|
|
|
self.assertTrue(self.confirmed_master)
|
|
|
|
#----------------------------------------------------------------------
|
|
|
|
def test_suite():
|
|
suite = unittest.TestSuite()
|
|
if db.version() >= (4, 6) :
|
|
dbenv = db.DBEnv()
|
|
try :
|
|
dbenv.repmgr_get_ack_policy()
|
|
ReplicationManager_available=True
|
|
except :
|
|
ReplicationManager_available=False
|
|
dbenv.close()
|
|
del dbenv
|
|
if ReplicationManager_available :
|
|
suite.addTest(unittest.makeSuite(DBReplicationManager))
|
|
|
|
if have_threads :
|
|
suite.addTest(unittest.makeSuite(DBBaseReplication))
|
|
|
|
return suite
|
|
|
|
|
|
if __name__ == '__main__':
|
|
unittest.main(defaultTest='test_suite')
|