sqlite3: Handle strings with embedded zeros correctly

Closes #13676.
This commit is contained in:
Petri Lehtinen 2012-02-01 22:18:19 +02:00
parent fc3ba6b8fc
commit 023fe334bb
5 changed files with 54 additions and 9 deletions

View file

@ -225,6 +225,13 @@ class CursorTests(unittest.TestCase):
def CheckExecuteArgString(self): def CheckExecuteArgString(self):
self.cu.execute("insert into test(name) values (?)", ("Hugo",)) self.cu.execute("insert into test(name) values (?)", ("Hugo",))
def CheckExecuteArgStringWithZeroByte(self):
self.cu.execute("insert into test(name) values (?)", ("Hu\x00go",))
self.cu.execute("select name from test where id=?", (self.cu.lastrowid,))
row = self.cu.fetchone()
self.assertEqual(row[0], "Hu\x00go")
def CheckExecuteWrongNoOfArgs1(self): def CheckExecuteWrongNoOfArgs1(self):
# too many parameters # too many parameters
try: try:

View file

@ -189,13 +189,48 @@ class TextFactoryTests(unittest.TestCase):
def tearDown(self): def tearDown(self):
self.con.close() self.con.close()
class TextFactoryTestsWithEmbeddedZeroBytes(unittest.TestCase):
def setUp(self):
self.con = sqlite.connect(":memory:")
self.con.execute("create table test (value text)")
self.con.execute("insert into test (value) values (?)", ("a\x00b",))
def CheckString(self):
# text_factory defaults to str
row = self.con.execute("select value from test").fetchone()
self.assertIs(type(row[0]), str)
self.assertEqual(row[0], "a\x00b")
def CheckBytes(self):
self.con.text_factory = bytes
row = self.con.execute("select value from test").fetchone()
self.assertIs(type(row[0]), bytes)
self.assertEqual(row[0], b"a\x00b")
def CheckBytearray(self):
self.con.text_factory = bytearray
row = self.con.execute("select value from test").fetchone()
self.assertIs(type(row[0]), bytearray)
self.assertEqual(row[0], b"a\x00b")
def CheckCustom(self):
# A custom factory should receive a bytes argument
self.con.text_factory = lambda x: x
row = self.con.execute("select value from test").fetchone()
self.assertIs(type(row[0]), bytes)
self.assertEqual(row[0], b"a\x00b")
def tearDown(self):
self.con.close()
def suite(): def suite():
connection_suite = unittest.makeSuite(ConnectionFactoryTests, "Check") connection_suite = unittest.makeSuite(ConnectionFactoryTests, "Check")
cursor_suite = unittest.makeSuite(CursorFactoryTests, "Check") cursor_suite = unittest.makeSuite(CursorFactoryTests, "Check")
row_suite_compat = unittest.makeSuite(RowFactoryTestsBackwardsCompat, "Check") row_suite_compat = unittest.makeSuite(RowFactoryTestsBackwardsCompat, "Check")
row_suite = unittest.makeSuite(RowFactoryTests, "Check") row_suite = unittest.makeSuite(RowFactoryTests, "Check")
text_suite = unittest.makeSuite(TextFactoryTests, "Check") text_suite = unittest.makeSuite(TextFactoryTests, "Check")
return unittest.TestSuite((connection_suite, cursor_suite, row_suite_compat, row_suite, text_suite)) text_zero_bytes_suite = unittest.makeSuite(TextFactoryTestsWithEmbeddedZeroBytes, "Check")
return unittest.TestSuite((connection_suite, cursor_suite, row_suite_compat, row_suite, text_suite, text_zero_bytes_suite))
def test(): def test():
runner = unittest.TextTestRunner() runner = unittest.TextTestRunner()

View file

@ -113,6 +113,8 @@ Core and Builtins
Library Library
------- -------
- Issue #13676: Handle strings with embedded zeros correctly in sqlite3.
- Issue #13506: Add '' to path for IDLE Shell when started and restarted with Restart Shell. - Issue #13506: Add '' to path for IDLE Shell when started and restarted with Restart Shell.
Original patches by Marco Scataglini and Roger Serwy. Original patches by Marco Scataglini and Roger Serwy.

View file

@ -268,9 +268,9 @@ PyObject* _pysqlite_build_column_name(const char* colname)
} }
} }
PyObject* pysqlite_unicode_from_string(const char* val_str, int optimize) PyObject* pysqlite_unicode_from_string(const char* val_str, Py_ssize_t size, int optimize)
{ {
return PyUnicode_FromString(val_str); return PyUnicode_FromStringAndSize(val_str, size);
} }
/* /*
@ -355,10 +355,11 @@ PyObject* _pysqlite_fetch_one_row(pysqlite_Cursor* self)
converted = PyFloat_FromDouble(sqlite3_column_double(self->statement->st, i)); converted = PyFloat_FromDouble(sqlite3_column_double(self->statement->st, i));
} else if (coltype == SQLITE_TEXT) { } else if (coltype == SQLITE_TEXT) {
val_str = (const char*)sqlite3_column_text(self->statement->st, i); val_str = (const char*)sqlite3_column_text(self->statement->st, i);
nbytes = sqlite3_column_bytes(self->statement->st, i);
if ((self->connection->text_factory == (PyObject*)&PyUnicode_Type) if ((self->connection->text_factory == (PyObject*)&PyUnicode_Type)
|| (self->connection->text_factory == pysqlite_OptimizedUnicode)) { || (self->connection->text_factory == pysqlite_OptimizedUnicode)) {
converted = pysqlite_unicode_from_string(val_str, converted = pysqlite_unicode_from_string(val_str, nbytes,
self->connection->text_factory == pysqlite_OptimizedUnicode ? 1 : 0); self->connection->text_factory == pysqlite_OptimizedUnicode ? 1 : 0);
if (!converted) { if (!converted) {
@ -383,11 +384,11 @@ PyObject* _pysqlite_fetch_one_row(pysqlite_Cursor* self)
} }
} }
} else if (self->connection->text_factory == (PyObject*)&PyBytes_Type) { } else if (self->connection->text_factory == (PyObject*)&PyBytes_Type) {
converted = PyBytes_FromString(val_str); converted = PyBytes_FromStringAndSize(val_str, nbytes);
} else if (self->connection->text_factory == (PyObject*)&PyByteArray_Type) { } else if (self->connection->text_factory == (PyObject*)&PyByteArray_Type) {
converted = PyByteArray_FromStringAndSize(val_str, strlen(val_str)); converted = PyByteArray_FromStringAndSize(val_str, nbytes);
} else { } else {
converted = PyObject_CallFunction(self->connection->text_factory, "y", val_str); converted = PyObject_CallFunction(self->connection->text_factory, "y#", val_str, nbytes);
} }
} else { } else {
/* coltype == SQLITE_BLOB */ /* coltype == SQLITE_BLOB */

View file

@ -129,9 +129,9 @@ int pysqlite_statement_bind_parameter(pysqlite_Statement* self, int pos, PyObjec
rc = sqlite3_bind_double(self->st, pos, PyFloat_AsDouble(parameter)); rc = sqlite3_bind_double(self->st, pos, PyFloat_AsDouble(parameter));
break; break;
case TYPE_UNICODE: case TYPE_UNICODE:
string = _PyUnicode_AsString(parameter); string = _PyUnicode_AsStringAndSize(parameter, &buflen);
if (string != NULL) if (string != NULL)
rc = sqlite3_bind_text(self->st, pos, string, -1, SQLITE_TRANSIENT); rc = sqlite3_bind_text(self->st, pos, string, buflen, SQLITE_TRANSIENT);
else else
rc = -1; rc = -1;
break; break;