Updated the sqlite3 module to the external pysqlite 2.2.2 version.

This commit is contained in:
Gerhard Häring 2006-04-23 15:24:26 +00:00
parent 5ef9d9fdb9
commit 3e99c0ad64
22 changed files with 310 additions and 313 deletions

View file

@ -22,6 +22,9 @@
# 3. This notice may not be removed or altered from any source distribution.
import datetime
import time
from _sqlite3 import *
paramstyle = "qmark"
@ -29,10 +32,6 @@ threadsafety = 1
apilevel = "2.0"
from _sqlite3 import *
import datetime, time
Date = datetime.date
Time = datetime.time
@ -40,45 +39,50 @@ Time = datetime.time
Timestamp = datetime.datetime
def DateFromTicks(ticks):
return apply(Date,time.localtime(ticks)[:3])
return apply(Date, time.localtime(ticks)[:3])
def TimeFromTicks(ticks):
return apply(Time,time.localtime(ticks)[3:6])
return apply(Time, time.localtime(ticks)[3:6])
def TimestampFromTicks(ticks):
return apply(Timestamp,time.localtime(ticks)[:6])
return apply(Timestamp, time.localtime(ticks)[:6])
_major, _minor, _micro = version.split(".")
version_info = (int(_major), int(_minor), _micro)
_major, _minor, _micro = sqlite_version.split(".")
sqlite_version_info = (int(_major), int(_minor), _micro)
version_info = tuple([int(x) for x in version.split(".")])
sqlite_version_info = tuple([int(x) for x in sqlite_version.split(".")])
Binary = buffer
def adapt_date(val):
return val.isoformat()
def register_adapters_and_converters():
def adapt_date(val):
return val.isoformat()
def adapt_datetime(val):
return val.isoformat(" ")
def adapt_datetime(val):
return val.isoformat(" ")
def convert_date(val):
return datetime.date(*map(int, val.split("-")))
def convert_date(val):
return datetime.date(*map(int, val.split("-")))
def convert_timestamp(val):
datepart, timepart = val.split(" ")
year, month, day = map(int, datepart.split("-"))
timepart_full = timepart.split(".")
hours, minutes, seconds = map(int, timepart_full[0].split(":"))
if len(timepart_full) == 2:
microseconds = int(float("0." + timepart_full[1]) * 1000000)
else:
microseconds = 0
def convert_timestamp(val):
datepart, timepart = val.split(" ")
year, month, day = map(int, datepart.split("-"))
timepart_full = timepart.split(".")
hours, minutes, seconds = map(int, timepart_full[0].split(":"))
if len(timepart_full) == 2:
microseconds = int(float("0." + timepart_full[1]) * 1000000)
else:
microseconds = 0
val = datetime.datetime(year, month, day, hours, minutes, seconds, microseconds)
return val
val = datetime.datetime(year, month, day, hours, minutes, seconds, microseconds)
return val
register_adapter(datetime.date, adapt_date)
register_adapter(datetime.datetime, adapt_datetime)
register_converter("date", convert_date)
register_converter("timestamp", convert_timestamp)
register_adapter(datetime.date, adapt_date)
register_adapter(datetime.datetime, adapt_datetime)
register_converter("date", convert_date)
register_converter("timestamp", convert_timestamp)
register_adapters_and_converters()
# Clean up namespace
del(register_adapters_and_converters)

View file

@ -22,7 +22,7 @@
# 3. This notice may not be removed or altered from any source distribution.
import os, unittest
import pysqlite2.dbapi2 as sqlite
import sqlite3 as sqlite
class CollationTests(unittest.TestCase):
def setUp(self):
@ -72,7 +72,7 @@ class CollationTests(unittest.TestCase):
result = con.execute(sql).fetchall()
self.fail("should have raised an OperationalError")
except sqlite.OperationalError, e:
self.failUnlessEqual(e.args[0], "no such collation sequence: mycoll")
self.failUnlessEqual(e.args[0].lower(), "no such collation sequence: mycoll")
def CheckCollationRegisterTwice(self):
"""

View file

@ -22,7 +22,7 @@
# 3. This notice may not be removed or altered from any source distribution.
import unittest
import pysqlite2.dbapi2 as sqlite
import sqlite3 as sqlite
class RegressionTests(unittest.TestCase):
def setUp(self):
@ -36,6 +36,31 @@ class RegressionTests(unittest.TestCase):
cur = self.con.cursor()
cur.execute("pragma user_version")
def CheckPragmaSchemaVersion(self):
# This still crashed pysqlite <= 2.2.1
con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_COLNAMES)
try:
cur = self.con.cursor()
cur.execute("pragma schema_version")
finally:
cur.close()
con.close()
def CheckStatementReset(self):
# pysqlite 2.1.0 to 2.2.0 have the problem that not all statements are
# reset before a rollback, but only those that are still in the
# statement cache. The others are not accessible from the connection object.
con = sqlite.connect(":memory:", cached_statements=5)
cursors = [con.cursor() for x in xrange(5)]
cursors[0].execute("create table test(x)")
for i in range(10):
cursors[0].executemany("insert into test(x) values (?)", [(x,) for x in xrange(10)])
for i in range(5):
cursors[i].execute(" " * i + "select x from test")
con.rollback()
def suite():
regression_suite = unittest.makeSuite(RegressionTests, "Check")
return unittest.TestSuite((regression_suite,))

View file

@ -134,6 +134,13 @@ class FunctionTests(unittest.TestCase):
def tearDown(self):
self.con.close()
def CheckFuncErrorOnCreate(self):
try:
self.con.create_function("bla", -100, lambda x: 2*x)
self.fail("should have raised an OperationalError")
except sqlite.OperationalError:
pass
def CheckFuncRefCount(self):
def getfunc():
def f():
@ -251,6 +258,13 @@ class AggregateTests(unittest.TestCase):
#self.con.close()
pass
def CheckAggrErrorOnCreate(self):
try:
self.con.create_function("bla", -100, AggrSum)
self.fail("should have raised an OperationalError")
except sqlite.OperationalError:
pass
def CheckAggrNoStep(self):
cur = self.con.cursor()
cur.execute("select nostep(t) from test")