mirror of
https://github.com/python/cpython.git
synced 2025-08-03 16:39:00 +00:00
Merged code from pysqlite 2.6.0.
This commit is contained in:
parent
2bb66e03b7
commit
3bbb67273a
25 changed files with 626 additions and 83 deletions
|
@ -1,7 +1,7 @@
|
|||
#-*- coding: ISO-8859-1 -*-
|
||||
# pysqlite2/test/dbapi.py: tests for DB-API compliance
|
||||
#
|
||||
# Copyright (C) 2004-2007 Gerhard Häring <gh@ghaering.de>
|
||||
# Copyright (C) 2004-2010 Gerhard Häring <gh@ghaering.de>
|
||||
#
|
||||
# This file is part of pysqlite.
|
||||
#
|
||||
|
@ -672,13 +672,13 @@ class ExtensionTests(unittest.TestCase):
|
|||
res = cur.fetchone()[0]
|
||||
self.assertEqual(res, 6)
|
||||
|
||||
def CheckScriptErrorIncomplete(self):
|
||||
def CheckScriptSyntaxError(self):
|
||||
con = sqlite.connect(":memory:")
|
||||
cur = con.cursor()
|
||||
raised = False
|
||||
try:
|
||||
cur.executescript("create table test(sadfsadfdsa")
|
||||
except sqlite.ProgrammingError:
|
||||
cur.executescript("create table test(x); asdf; create table test2(x)")
|
||||
except sqlite.OperationalError:
|
||||
raised = True
|
||||
self.assertEqual(raised, True, "should have raised an exception")
|
||||
|
||||
|
@ -711,7 +711,7 @@ class ExtensionTests(unittest.TestCase):
|
|||
result = con.execute("select foo from test").fetchone()[0]
|
||||
self.assertEqual(result, 5, "Basic test of Connection.executescript")
|
||||
|
||||
class ClosedTests(unittest.TestCase):
|
||||
class ClosedConTests(unittest.TestCase):
|
||||
def setUp(self):
|
||||
pass
|
||||
|
||||
|
@ -763,6 +763,102 @@ class ClosedTests(unittest.TestCase):
|
|||
except:
|
||||
self.fail("Should have raised a ProgrammingError")
|
||||
|
||||
def CheckClosedCreateFunction(self):
|
||||
con = sqlite.connect(":memory:")
|
||||
con.close()
|
||||
def f(x): return 17
|
||||
try:
|
||||
con.create_function("foo", 1, f)
|
||||
self.fail("Should have raised a ProgrammingError")
|
||||
except sqlite.ProgrammingError:
|
||||
pass
|
||||
except:
|
||||
self.fail("Should have raised a ProgrammingError")
|
||||
|
||||
def CheckClosedCreateAggregate(self):
|
||||
con = sqlite.connect(":memory:")
|
||||
con.close()
|
||||
class Agg:
|
||||
def __init__(self):
|
||||
pass
|
||||
def step(self, x):
|
||||
pass
|
||||
def finalize(self):
|
||||
return 17
|
||||
try:
|
||||
con.create_aggregate("foo", 1, Agg)
|
||||
self.fail("Should have raised a ProgrammingError")
|
||||
except sqlite.ProgrammingError:
|
||||
pass
|
||||
except:
|
||||
self.fail("Should have raised a ProgrammingError")
|
||||
|
||||
def CheckClosedSetAuthorizer(self):
|
||||
con = sqlite.connect(":memory:")
|
||||
con.close()
|
||||
def authorizer(*args):
|
||||
return sqlite.DENY
|
||||
try:
|
||||
con.set_authorizer(authorizer)
|
||||
self.fail("Should have raised a ProgrammingError")
|
||||
except sqlite.ProgrammingError:
|
||||
pass
|
||||
except:
|
||||
self.fail("Should have raised a ProgrammingError")
|
||||
|
||||
def CheckClosedSetProgressCallback(self):
|
||||
con = sqlite.connect(":memory:")
|
||||
con.close()
|
||||
def progress(): pass
|
||||
try:
|
||||
con.set_progress_handler(progress, 100)
|
||||
self.fail("Should have raised a ProgrammingError")
|
||||
except sqlite.ProgrammingError:
|
||||
pass
|
||||
except:
|
||||
self.fail("Should have raised a ProgrammingError")
|
||||
|
||||
def CheckClosedCall(self):
|
||||
con = sqlite.connect(":memory:")
|
||||
con.close()
|
||||
try:
|
||||
con()
|
||||
self.fail("Should have raised a ProgrammingError")
|
||||
except sqlite.ProgrammingError:
|
||||
pass
|
||||
except:
|
||||
self.fail("Should have raised a ProgrammingError")
|
||||
|
||||
class ClosedCurTests(unittest.TestCase):
|
||||
def setUp(self):
|
||||
pass
|
||||
|
||||
def tearDown(self):
|
||||
pass
|
||||
|
||||
def CheckClosed(self):
|
||||
con = sqlite.connect(":memory:")
|
||||
cur = con.cursor()
|
||||
cur.close()
|
||||
|
||||
for method_name in ("execute", "executemany", "executescript", "fetchall", "fetchmany", "fetchone"):
|
||||
if method_name in ("execute", "executescript"):
|
||||
params = ("select 4 union select 5",)
|
||||
elif method_name == "executemany":
|
||||
params = ("insert into foo(bar) values (?)", [(3,), (4,)])
|
||||
else:
|
||||
params = []
|
||||
|
||||
try:
|
||||
method = getattr(cur, method_name)
|
||||
|
||||
method(*params)
|
||||
self.fail("Should have raised a ProgrammingError: method " + method_name)
|
||||
except sqlite.ProgrammingError:
|
||||
pass
|
||||
except:
|
||||
self.fail("Should have raised a ProgrammingError: " + method_name)
|
||||
|
||||
def suite():
|
||||
module_suite = unittest.makeSuite(ModuleTests, "Check")
|
||||
connection_suite = unittest.makeSuite(ConnectionTests, "Check")
|
||||
|
@ -770,8 +866,9 @@ def suite():
|
|||
thread_suite = unittest.makeSuite(ThreadTests, "Check")
|
||||
constructor_suite = unittest.makeSuite(ConstructorTests, "Check")
|
||||
ext_suite = unittest.makeSuite(ExtensionTests, "Check")
|
||||
closed_suite = unittest.makeSuite(ClosedTests, "Check")
|
||||
return unittest.TestSuite((module_suite, connection_suite, cursor_suite, thread_suite, constructor_suite, ext_suite, closed_suite))
|
||||
closed_con_suite = unittest.makeSuite(ClosedConTests, "Check")
|
||||
closed_cur_suite = unittest.makeSuite(ClosedCurTests, "Check")
|
||||
return unittest.TestSuite((module_suite, connection_suite, cursor_suite, thread_suite, constructor_suite, ext_suite, closed_con_suite, closed_cur_suite))
|
||||
|
||||
def test():
|
||||
runner = unittest.TextTestRunner()
|
||||
|
|
|
@ -70,16 +70,6 @@ class RegressionTests(unittest.TestCase):
|
|||
cur.execute('select 1 as "foo baz"')
|
||||
self.assertEqual(cur.description[0][0], "foo baz")
|
||||
|
||||
def CheckStatementAvailable(self):
|
||||
# pysqlite up to 2.3.2 crashed on this, because the active statement handle was not checked
|
||||
# before trying to fetch data from it. close() destroys the active statement ...
|
||||
con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_DECLTYPES)
|
||||
cur = con.cursor()
|
||||
cur.execute("select 4 union select 5")
|
||||
cur.close()
|
||||
cur.fetchone()
|
||||
cur.fetchone()
|
||||
|
||||
def CheckStatementFinalizationOnCloseDb(self):
|
||||
# pysqlite versions <= 2.3.3 only finalized statements in the statement
|
||||
# cache when closing the database. statements that were still
|
||||
|
@ -167,6 +157,107 @@ class RegressionTests(unittest.TestCase):
|
|||
self.assertRaises(UnicodeEncodeError, setattr, con,
|
||||
"isolation_level", u"\xe9")
|
||||
|
||||
def CheckCursorConstructorCallCheck(self):
|
||||
"""
|
||||
Verifies that cursor methods check wether base class __init__ was called.
|
||||
"""
|
||||
class Cursor(sqlite.Cursor):
|
||||
def __init__(self, con):
|
||||
pass
|
||||
|
||||
con = sqlite.connect(":memory:")
|
||||
cur = Cursor(con)
|
||||
try:
|
||||
cur.execute("select 4+5").fetchall()
|
||||
self.fail("should have raised ProgrammingError")
|
||||
except sqlite.ProgrammingError:
|
||||
pass
|
||||
except:
|
||||
self.fail("should have raised ProgrammingError")
|
||||
|
||||
def CheckConnectionConstructorCallCheck(self):
|
||||
"""
|
||||
Verifies that connection methods check wether base class __init__ was called.
|
||||
"""
|
||||
class Connection(sqlite.Connection):
|
||||
def __init__(self, name):
|
||||
pass
|
||||
|
||||
con = Connection(":memory:")
|
||||
try:
|
||||
cur = con.cursor()
|
||||
self.fail("should have raised ProgrammingError")
|
||||
except sqlite.ProgrammingError:
|
||||
pass
|
||||
except:
|
||||
self.fail("should have raised ProgrammingError")
|
||||
|
||||
def CheckCursorRegistration(self):
|
||||
"""
|
||||
Verifies that subclassed cursor classes are correctly registered with
|
||||
the connection object, too. (fetch-across-rollback problem)
|
||||
"""
|
||||
class Connection(sqlite.Connection):
|
||||
def cursor(self):
|
||||
return Cursor(self)
|
||||
|
||||
class Cursor(sqlite.Cursor):
|
||||
def __init__(self, con):
|
||||
sqlite.Cursor.__init__(self, con)
|
||||
|
||||
con = Connection(":memory:")
|
||||
cur = con.cursor()
|
||||
cur.execute("create table foo(x)")
|
||||
cur.executemany("insert into foo(x) values (?)", [(3,), (4,), (5,)])
|
||||
cur.execute("select x from foo")
|
||||
con.rollback()
|
||||
try:
|
||||
cur.fetchall()
|
||||
self.fail("should have raised InterfaceError")
|
||||
except sqlite.InterfaceError:
|
||||
pass
|
||||
except:
|
||||
self.fail("should have raised InterfaceError")
|
||||
|
||||
def CheckAutoCommit(self):
|
||||
"""
|
||||
Verifies that creating a connection in autocommit mode works.
|
||||
2.5.3 introduced a regression so that these could no longer
|
||||
be created.
|
||||
"""
|
||||
con = sqlite.connect(":memory:", isolation_level=None)
|
||||
|
||||
def CheckPragmaAutocommit(self):
|
||||
"""
|
||||
Verifies that running a PRAGMA statement that does an autocommit does
|
||||
work. This did not work in 2.5.3/2.5.4.
|
||||
"""
|
||||
con = sqlite.connect(":memory:")
|
||||
cur = con.cursor()
|
||||
cur.execute("create table foo(bar)")
|
||||
cur.execute("insert into foo(bar) values (5)")
|
||||
|
||||
cur.execute("pragma page_size")
|
||||
row = cur.fetchone()
|
||||
|
||||
def CheckSetDict(self):
|
||||
"""
|
||||
See http://bugs.python.org/issue7478
|
||||
|
||||
It was possible to successfully register callbacks that could not be
|
||||
hashed. Return codes of PyDict_SetItem were not checked properly.
|
||||
"""
|
||||
class NotHashable:
|
||||
def __call__(self, *args, **kw):
|
||||
pass
|
||||
def __hash__(self):
|
||||
raise TypeError()
|
||||
var = NotHashable()
|
||||
con = sqlite.connect(":memory:")
|
||||
self.assertRaises(TypeError, con.create_function, var)
|
||||
self.assertRaises(TypeError, con.create_aggregate, var)
|
||||
self.assertRaises(TypeError, con.set_authorizer, var)
|
||||
self.assertRaises(TypeError, con.set_progress_handler, var)
|
||||
|
||||
def suite():
|
||||
regression_suite = unittest.makeSuite(RegressionTests, "Check")
|
||||
|
|
|
@ -148,6 +148,26 @@ class TransactionTests(unittest.TestCase):
|
|||
# NO self.con2.rollback() HERE!!!
|
||||
self.con1.commit()
|
||||
|
||||
def CheckRollbackCursorConsistency(self):
|
||||
"""
|
||||
Checks if cursors on the connection are set into a "reset" state
|
||||
when a rollback is done on the connection.
|
||||
"""
|
||||
con = sqlite.connect(":memory:")
|
||||
cur = con.cursor()
|
||||
cur.execute("create table test(x)")
|
||||
cur.execute("insert into test(x) values (5)")
|
||||
cur.execute("select 1 union select 2 union select 3")
|
||||
|
||||
con.rollback()
|
||||
try:
|
||||
cur.fetchall()
|
||||
self.fail("InterfaceError should have been raised")
|
||||
except sqlite.InterfaceError, e:
|
||||
pass
|
||||
except:
|
||||
self.fail("InterfaceError should have been raised")
|
||||
|
||||
class SpecialCommandTests(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.con = sqlite.connect(":memory:")
|
||||
|
|
|
@ -78,6 +78,33 @@ class SqliteTypeTests(unittest.TestCase):
|
|||
row = self.cur.fetchone()
|
||||
self.assertEqual(row[0], u"Österreich")
|
||||
|
||||
def CheckNonUtf8_Default(self):
|
||||
try:
|
||||
self.cur.execute("select ?", (chr(150),))
|
||||
self.fail("should have raised a ProgrammingError")
|
||||
except sqlite.ProgrammingError:
|
||||
pass
|
||||
|
||||
def CheckNonUtf8_TextFactoryString(self):
|
||||
orig_text_factory = self.con.text_factory
|
||||
try:
|
||||
self.con.text_factory = str
|
||||
self.cur.execute("select ?", (chr(150),))
|
||||
finally:
|
||||
self.con.text_factory = orig_text_factory
|
||||
|
||||
def CheckNonUtf8_TextFactoryOptimizedUnicode(self):
|
||||
orig_text_factory = self.con.text_factory
|
||||
try:
|
||||
try:
|
||||
self.con.text_factory = sqlite.OptimizedUnicode
|
||||
self.cur.execute("select ?", (chr(150),))
|
||||
self.fail("should have raised a ProgrammingError")
|
||||
except sqlite.ProgrammingError:
|
||||
pass
|
||||
finally:
|
||||
self.con.text_factory = orig_text_factory
|
||||
|
||||
class DeclTypesTests(unittest.TestCase):
|
||||
class Foo:
|
||||
def __init__(self, _val):
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue