mirror of
https://github.com/python/cpython.git
synced 2025-07-29 22:24:49 +00:00
Merged changes from external pysqlite 2.3.0 release. Documentation updates will
follow in a few hours at the latest. Then we should be ready for beta1.
This commit is contained in:
parent
ea3912b0da
commit
1541ef08af
7 changed files with 376 additions and 96 deletions
|
@ -55,6 +55,9 @@ class AggrNoStep:
|
|||
def __init__(self):
|
||||
pass
|
||||
|
||||
def finalize(self):
|
||||
return 1
|
||||
|
||||
class AggrNoFinalize:
|
||||
def __init__(self):
|
||||
pass
|
||||
|
@ -144,9 +147,12 @@ class FunctionTests(unittest.TestCase):
|
|||
def CheckFuncRefCount(self):
|
||||
def getfunc():
|
||||
def f():
|
||||
return val
|
||||
return 1
|
||||
return f
|
||||
self.con.create_function("reftest", 0, getfunc())
|
||||
f = getfunc()
|
||||
globals()["foo"] = f
|
||||
# self.con.create_function("reftest", 0, getfunc())
|
||||
self.con.create_function("reftest", 0, f)
|
||||
cur = self.con.cursor()
|
||||
cur.execute("select reftest()")
|
||||
|
||||
|
@ -195,9 +201,12 @@ class FunctionTests(unittest.TestCase):
|
|||
|
||||
def CheckFuncException(self):
|
||||
cur = self.con.cursor()
|
||||
cur.execute("select raiseexception()")
|
||||
val = cur.fetchone()[0]
|
||||
self.failUnlessEqual(val, None)
|
||||
try:
|
||||
cur.execute("select raiseexception()")
|
||||
cur.fetchone()
|
||||
self.fail("should have raised OperationalError")
|
||||
except sqlite.OperationalError, e:
|
||||
self.failUnlessEqual(e.args[0], 'user-defined function raised exception')
|
||||
|
||||
def CheckParamString(self):
|
||||
cur = self.con.cursor()
|
||||
|
@ -267,31 +276,47 @@ class AggregateTests(unittest.TestCase):
|
|||
|
||||
def CheckAggrNoStep(self):
|
||||
cur = self.con.cursor()
|
||||
cur.execute("select nostep(t) from test")
|
||||
try:
|
||||
cur.execute("select nostep(t) from test")
|
||||
self.fail("should have raised an AttributeError")
|
||||
except AttributeError, e:
|
||||
self.failUnlessEqual(e.args[0], "AggrNoStep instance has no attribute 'step'")
|
||||
|
||||
def CheckAggrNoFinalize(self):
|
||||
cur = self.con.cursor()
|
||||
cur.execute("select nofinalize(t) from test")
|
||||
val = cur.fetchone()[0]
|
||||
self.failUnlessEqual(val, None)
|
||||
try:
|
||||
cur.execute("select nofinalize(t) from test")
|
||||
val = cur.fetchone()[0]
|
||||
self.fail("should have raised an OperationalError")
|
||||
except sqlite.OperationalError, e:
|
||||
self.failUnlessEqual(e.args[0], "user-defined aggregate's 'finalize' method raised error")
|
||||
|
||||
def CheckAggrExceptionInInit(self):
|
||||
cur = self.con.cursor()
|
||||
cur.execute("select excInit(t) from test")
|
||||
val = cur.fetchone()[0]
|
||||
self.failUnlessEqual(val, None)
|
||||
try:
|
||||
cur.execute("select excInit(t) from test")
|
||||
val = cur.fetchone()[0]
|
||||
self.fail("should have raised an OperationalError")
|
||||
except sqlite.OperationalError, e:
|
||||
self.failUnlessEqual(e.args[0], "user-defined aggregate's '__init__' method raised error")
|
||||
|
||||
def CheckAggrExceptionInStep(self):
|
||||
cur = self.con.cursor()
|
||||
cur.execute("select excStep(t) from test")
|
||||
val = cur.fetchone()[0]
|
||||
self.failUnlessEqual(val, 42)
|
||||
try:
|
||||
cur.execute("select excStep(t) from test")
|
||||
val = cur.fetchone()[0]
|
||||
self.fail("should have raised an OperationalError")
|
||||
except sqlite.OperationalError, e:
|
||||
self.failUnlessEqual(e.args[0], "user-defined aggregate's 'step' method raised error")
|
||||
|
||||
def CheckAggrExceptionInFinalize(self):
|
||||
cur = self.con.cursor()
|
||||
cur.execute("select excFinalize(t) from test")
|
||||
val = cur.fetchone()[0]
|
||||
self.failUnlessEqual(val, None)
|
||||
try:
|
||||
cur.execute("select excFinalize(t) from test")
|
||||
val = cur.fetchone()[0]
|
||||
self.fail("should have raised an OperationalError")
|
||||
except sqlite.OperationalError, e:
|
||||
self.failUnlessEqual(e.args[0], "user-defined aggregate's 'finalize' method raised error")
|
||||
|
||||
def CheckAggrCheckParamStr(self):
|
||||
cur = self.con.cursor()
|
||||
|
@ -331,10 +356,55 @@ class AggregateTests(unittest.TestCase):
|
|||
val = cur.fetchone()[0]
|
||||
self.failUnlessEqual(val, 60)
|
||||
|
||||
def authorizer_cb(action, arg1, arg2, dbname, source):
|
||||
if action != sqlite.SQLITE_SELECT:
|
||||
return sqlite.SQLITE_DENY
|
||||
if arg2 == 'c2' or arg1 == 't2':
|
||||
return sqlite.SQLITE_DENY
|
||||
return sqlite.SQLITE_OK
|
||||
|
||||
class AuthorizerTests(unittest.TestCase):
|
||||
def setUp(self):
|
||||
sqlite.enable_callback_tracebacks(1)
|
||||
self.con = sqlite.connect(":memory:")
|
||||
self.con.executescript("""
|
||||
create table t1 (c1, c2);
|
||||
create table t2 (c1, c2);
|
||||
insert into t1 (c1, c2) values (1, 2);
|
||||
insert into t2 (c1, c2) values (4, 5);
|
||||
""")
|
||||
|
||||
# For our security test:
|
||||
self.con.execute("select c2 from t2")
|
||||
|
||||
self.con.set_authorizer(authorizer_cb)
|
||||
|
||||
def tearDown(self):
|
||||
pass
|
||||
|
||||
def CheckTableAccess(self):
|
||||
try:
|
||||
self.con.execute("select * from t2")
|
||||
except sqlite.DatabaseError, e:
|
||||
if not e.args[0].endswith("prohibited"):
|
||||
self.fail("wrong exception text: %s" % e.args[0])
|
||||
return
|
||||
self.fail("should have raised an exception due to missing privileges")
|
||||
|
||||
def CheckColumnAccess(self):
|
||||
try:
|
||||
self.con.execute("select c2 from t1")
|
||||
except sqlite.DatabaseError, e:
|
||||
if not e.args[0].endswith("prohibited"):
|
||||
self.fail("wrong exception text: %s" % e.args[0])
|
||||
return
|
||||
self.fail("should have raised an exception due to missing privileges")
|
||||
|
||||
def suite():
|
||||
function_suite = unittest.makeSuite(FunctionTests, "Check")
|
||||
aggregate_suite = unittest.makeSuite(AggregateTests, "Check")
|
||||
return unittest.TestSuite((function_suite, aggregate_suite))
|
||||
authorizer_suite = unittest.makeSuite(AuthorizerTests, "Check")
|
||||
return unittest.TestSuite((function_suite, aggregate_suite, authorizer_suite))
|
||||
|
||||
def test():
|
||||
runner = unittest.TextTestRunner()
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue