mirror of
https://github.com/python/cpython.git
synced 2025-12-04 08:34:25 +00:00
Make the sqlite tests pass.
This commit is contained in:
parent
bd4a63e091
commit
6d21456137
7 changed files with 28 additions and 31 deletions
|
|
@ -139,32 +139,32 @@ class TextFactoryTests(unittest.TestCase):
|
||||||
self.con = sqlite.connect(":memory:")
|
self.con = sqlite.connect(":memory:")
|
||||||
|
|
||||||
def CheckUnicode(self):
|
def CheckUnicode(self):
|
||||||
austria = str("Österreich", "latin1")
|
austria = "Österreich"
|
||||||
row = self.con.execute("select ?", (austria,)).fetchone()
|
row = self.con.execute("select ?", (austria,)).fetchone()
|
||||||
self.failUnless(type(row[0]) == str, "type of row[0] must be unicode")
|
self.failUnless(type(row[0]) == str, "type of row[0] must be unicode")
|
||||||
|
|
||||||
def CheckString(self):
|
def CheckString(self):
|
||||||
self.con.text_factory = str
|
self.con.text_factory = bytes
|
||||||
austria = str("Österreich", "latin1")
|
austria = "Österreich"
|
||||||
row = self.con.execute("select ?", (austria,)).fetchone()
|
row = self.con.execute("select ?", (austria,)).fetchone()
|
||||||
self.failUnless(type(row[0]) == str, "type of row[0] must be str")
|
self.failUnless(type(row[0]) == bytes, "type of row[0] must be bytes")
|
||||||
self.failUnless(row[0] == austria.encode("utf-8"), "column must equal original data in UTF-8")
|
self.failUnless(row[0] == austria.encode("utf-8"), "column must equal original data in UTF-8")
|
||||||
|
|
||||||
def CheckCustom(self):
|
def CheckCustom(self):
|
||||||
self.con.text_factory = lambda x: str(x, "utf-8", "ignore")
|
self.con.text_factory = lambda x: str(x, "utf-8", "ignore")
|
||||||
austria = str("Österreich", "latin1")
|
austria = "Österreich"
|
||||||
row = self.con.execute("select ?", (austria.encode("latin1"),)).fetchone()
|
row = self.con.execute("select ?", (austria,)).fetchone()
|
||||||
self.failUnless(type(row[0]) == str, "type of row[0] must be unicode")
|
self.failUnless(type(row[0]) == str, "type of row[0] must be unicode")
|
||||||
self.failUnless(row[0].endswith("reich"), "column must contain original data")
|
self.failUnless(row[0].endswith("reich"), "column must contain original data")
|
||||||
|
|
||||||
def CheckOptimizedUnicode(self):
|
def CheckOptimizedUnicode(self):
|
||||||
self.con.text_factory = sqlite.OptimizedUnicode
|
self.con.text_factory = sqlite.OptimizedUnicode
|
||||||
austria = str("Österreich", "latin1")
|
austria = "Österreich"
|
||||||
germany = str("Deutchland")
|
germany = "Deutchland"
|
||||||
a_row = self.con.execute("select ?", (austria,)).fetchone()
|
a_row = self.con.execute("select ?", (austria,)).fetchone()
|
||||||
d_row = self.con.execute("select ?", (germany,)).fetchone()
|
d_row = self.con.execute("select ?", (germany,)).fetchone()
|
||||||
self.failUnless(type(a_row[0]) == str, "type of non-ASCII row must be unicode")
|
self.failUnless(type(a_row[0]) == str, "type of non-ASCII row must be unicode")
|
||||||
self.failUnless(type(d_row[0]) == str, "type of ASCII-only row must be str")
|
self.failUnless(type(d_row[0]) == str8, "type of ASCII-only row must be str8")
|
||||||
|
|
||||||
def tearDown(self):
|
def tearDown(self):
|
||||||
self.con.close()
|
self.con.close()
|
||||||
|
|
|
||||||
|
|
@ -62,7 +62,7 @@ class SqliteTypeTests(unittest.TestCase):
|
||||||
self.failUnlessEqual(row[0], val)
|
self.failUnlessEqual(row[0], val)
|
||||||
|
|
||||||
def CheckBlob(self):
|
def CheckBlob(self):
|
||||||
val = buffer("Guglhupf")
|
val = buffer(b"Guglhupf")
|
||||||
self.cur.execute("insert into test(b) values (?)", (val,))
|
self.cur.execute("insert into test(b) values (?)", (val,))
|
||||||
self.cur.execute("select b from test")
|
self.cur.execute("select b from test")
|
||||||
row = self.cur.fetchone()
|
row = self.cur.fetchone()
|
||||||
|
|
@ -203,7 +203,7 @@ class DeclTypesTests(unittest.TestCase):
|
||||||
|
|
||||||
def CheckBlob(self):
|
def CheckBlob(self):
|
||||||
# default
|
# default
|
||||||
val = buffer("Guglhupf")
|
val = buffer(b"Guglhupf")
|
||||||
self.cur.execute("insert into test(bin) values (?)", (val,))
|
self.cur.execute("insert into test(bin) values (?)", (val,))
|
||||||
self.cur.execute("select bin from test")
|
self.cur.execute("select bin from test")
|
||||||
row = self.cur.fetchone()
|
row = self.cur.fetchone()
|
||||||
|
|
@ -304,7 +304,7 @@ class BinaryConverterTests(unittest.TestCase):
|
||||||
self.con.close()
|
self.con.close()
|
||||||
|
|
||||||
def CheckBinaryInputForConverter(self):
|
def CheckBinaryInputForConverter(self):
|
||||||
testdata = "abcdefg" * 10
|
testdata = b"abcdefg" * 10
|
||||||
result = self.con.execute('select ? as "x [bin]"', (buffer(bz2.compress(testdata)),)).fetchone()[0]
|
result = self.con.execute('select ? as "x [bin]"', (buffer(bz2.compress(testdata)),)).fetchone()[0]
|
||||||
self.failUnlessEqual(testdata, result)
|
self.failUnlessEqual(testdata, result)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -36,7 +36,7 @@ def func_returnfloat():
|
||||||
def func_returnnull():
|
def func_returnnull():
|
||||||
return None
|
return None
|
||||||
def func_returnblob():
|
def func_returnblob():
|
||||||
return buffer("blob")
|
return buffer(b"blob")
|
||||||
def func_raiseexception():
|
def func_raiseexception():
|
||||||
5/0
|
5/0
|
||||||
|
|
||||||
|
|
@ -197,7 +197,7 @@ class FunctionTests(unittest.TestCase):
|
||||||
cur.execute("select returnblob()")
|
cur.execute("select returnblob()")
|
||||||
val = cur.fetchone()[0]
|
val = cur.fetchone()[0]
|
||||||
self.failUnlessEqual(type(val), buffer)
|
self.failUnlessEqual(type(val), buffer)
|
||||||
self.failUnlessEqual(val, buffer("blob"))
|
self.failUnlessEqual(val, buffer(b"blob"))
|
||||||
|
|
||||||
def CheckFuncException(self):
|
def CheckFuncException(self):
|
||||||
cur = self.con.cursor()
|
cur = self.con.cursor()
|
||||||
|
|
|
||||||
|
|
@ -435,10 +435,8 @@ void _pysqlite_set_result(sqlite3_context* context, PyObject* py_val)
|
||||||
} else if (PyString_Check(py_val)) {
|
} else if (PyString_Check(py_val)) {
|
||||||
sqlite3_result_text(context, PyString_AsString(py_val), -1, SQLITE_TRANSIENT);
|
sqlite3_result_text(context, PyString_AsString(py_val), -1, SQLITE_TRANSIENT);
|
||||||
} else if (PyUnicode_Check(py_val)) {
|
} else if (PyUnicode_Check(py_val)) {
|
||||||
stringval = PyUnicode_AsUTF8String(py_val);
|
|
||||||
if (stringval) {
|
if (stringval) {
|
||||||
sqlite3_result_text(context, PyString_AsString(stringval), -1, SQLITE_TRANSIENT);
|
sqlite3_result_text(context, PyUnicode_AsString(stringval), -1, SQLITE_TRANSIENT);
|
||||||
Py_DECREF(stringval);
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
/* TODO: raise error */
|
/* TODO: raise error */
|
||||||
|
|
@ -1094,7 +1092,7 @@ pysqlite_connection_create_collation(pysqlite_Connection* self, PyObject* args)
|
||||||
goto finally;
|
goto finally;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!PyArg_ParseTuple(args, "O!O:create_collation(name, callback)", &PyString_Type, &name, &callable)) {
|
if (!PyArg_ParseTuple(args, "O!O:create_collation(name, callback)", &PyUnicode_Type, &name, &callable)) {
|
||||||
goto finally;
|
goto finally;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -248,7 +248,7 @@ PyObject* _pysqlite_build_column_name(const char* colname)
|
||||||
if ((*pos == '[') && (pos > colname) && (*(pos-1) == ' ')) {
|
if ((*pos == '[') && (pos > colname) && (*(pos-1) == ' ')) {
|
||||||
pos--;
|
pos--;
|
||||||
}
|
}
|
||||||
return PyString_FromStringAndSize(colname, pos - colname);
|
return PyUnicode_FromStringAndSize(colname, pos - colname);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -372,8 +372,10 @@ PyObject* _pysqlite_fetch_one_row(pysqlite_Cursor* self)
|
||||||
}
|
}
|
||||||
} else if (self->connection->text_factory == (PyObject*)&PyString_Type) {
|
} else if (self->connection->text_factory == (PyObject*)&PyString_Type) {
|
||||||
converted = PyString_FromString(val_str);
|
converted = PyString_FromString(val_str);
|
||||||
|
} else if (self->connection->text_factory == (PyObject*)&PyBytes_Type) {
|
||||||
|
converted = PyBytes_FromStringAndSize(val_str, strlen(val_str));
|
||||||
} else {
|
} else {
|
||||||
converted = PyObject_CallFunction(self->connection->text_factory, "s", val_str);
|
converted = PyObject_CallFunction(self->connection->text_factory, "y", val_str);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
/* coltype == SQLITE_BLOB */
|
/* coltype == SQLITE_BLOB */
|
||||||
|
|
@ -746,17 +748,13 @@ PyObject* pysqlite_cursor_executescript(pysqlite_Cursor* self, PyObject* args)
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (PyString_Check(script_obj)) {
|
if (PyUnicode_Check(script_obj)) {
|
||||||
script_cstr = PyString_AsString(script_obj);
|
script_cstr = PyUnicode_AsString(script_obj);
|
||||||
} else if (PyUnicode_Check(script_obj)) {
|
if (!script_cstr) {
|
||||||
script_str = PyUnicode_AsUTF8String(script_obj);
|
|
||||||
if (!script_str) {
|
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
script_cstr = PyString_AsString(script_str);
|
|
||||||
} else {
|
} else {
|
||||||
PyErr_SetString(PyExc_ValueError, "script argument must be unicode or string.");
|
PyErr_SetString(PyExc_ValueError, "script argument must be unicode.");
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -86,8 +86,8 @@ PyObject* pysqlite_row_subscript(pysqlite_Row* self, PyObject* idx)
|
||||||
item = PyTuple_GetItem(self->data, _idx);
|
item = PyTuple_GetItem(self->data, _idx);
|
||||||
Py_XINCREF(item);
|
Py_XINCREF(item);
|
||||||
return item;
|
return item;
|
||||||
} else if (PyString_Check(idx)) {
|
} else if (PyUnicode_Check(idx)) {
|
||||||
key = PyString_AsString(idx);
|
key = PyUnicode_AsString(idx);
|
||||||
|
|
||||||
nitems = PyTuple_Size(self->description);
|
nitems = PyTuple_Size(self->description);
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -113,7 +113,8 @@ int pysqlite_statement_bind_parameter(pysqlite_Statement* self, int pos, PyObjec
|
||||||
rc = sqlite3_bind_text(self->st, pos, string, -1, SQLITE_TRANSIENT);
|
rc = sqlite3_bind_text(self->st, pos, string, -1, SQLITE_TRANSIENT);
|
||||||
} else if PyUnicode_Check(parameter) {
|
} else if PyUnicode_Check(parameter) {
|
||||||
stringval = PyUnicode_AsUTF8String(parameter);
|
stringval = PyUnicode_AsUTF8String(parameter);
|
||||||
string = PyString_AsString(stringval);
|
string = PyBytes_AsString(stringval);
|
||||||
|
|
||||||
rc = sqlite3_bind_text(self->st, pos, string, -1, SQLITE_TRANSIENT);
|
rc = sqlite3_bind_text(self->st, pos, string, -1, SQLITE_TRANSIENT);
|
||||||
Py_DECREF(stringval);
|
Py_DECREF(stringval);
|
||||||
} else {
|
} else {
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue