gh-79579: Improve DML query detection in sqlite3 (#93623)

The fix involves using pysqlite_check_remaining_sql(), not only to check
for multiple statements, but now also to strip leading comments and
whitespace from SQL statements, so we can improve DML query detection.

pysqlite_check_remaining_sql() is renamed lstrip_sql(), to more
accurately reflect its function, and hardened to handle more SQL comment
corner cases.
This commit is contained in:
Erlend Egeberg Aasland 2022-06-14 13:56:36 +02:00 committed by GitHub
parent e566ce5496
commit 46740073ef
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 103 additions and 84 deletions

View file

@ -746,22 +746,44 @@ class CursorTests(unittest.TestCase):
with self.assertRaises(sqlite.OperationalError):
self.cu.execute("select asdf")
def test_execute_too_much_sql(self):
self.assertRaisesRegex(sqlite.ProgrammingError,
"You can only execute one statement at a time",
self.cu.execute, "select 5+4; select 4+5")
def test_execute_multiple_statements(self):
msg = "You can only execute one statement at a time"
dataset = (
"select 1; select 2",
"select 1; // c++ comments are not allowed",
"select 1; *not a comment",
"select 1; -*not a comment",
"select 1; /* */ a",
"select 1; /**/a",
"select 1; -",
"select 1; /",
"select 1; -\n- select 2",
"""select 1;
-- comment
select 2
""",
)
for query in dataset:
with self.subTest(query=query):
with self.assertRaisesRegex(sqlite.ProgrammingError, msg):
self.cu.execute(query)
def test_execute_too_much_sql2(self):
self.cu.execute("select 5+4; -- foo bar")
def test_execute_too_much_sql3(self):
self.cu.execute("""
def test_execute_with_appended_comments(self):
dataset = (
"select 1; -- foo bar",
"select 1; --",
"select 1; /*", # Unclosed comments ending in \0 are skipped.
"""
select 5+4;
/*
foo
*/
""")
""",
)
for query in dataset:
with self.subTest(query=query):
self.cu.execute(query)
def test_execute_wrong_sql_arg(self):
with self.assertRaises(TypeError):
@ -906,6 +928,30 @@ class CursorTests(unittest.TestCase):
self.assertEqual(self.cu.fetchone()[0], 1)
self.assertEqual(self.cu.rowcount, 1)
def test_rowcount_prefixed_with_comment(self):
# gh-79579: rowcount is updated even if query is prefixed with comments
self.cu.execute("""
-- foo
insert into test(name) values ('foo'), ('foo')
""")
self.assertEqual(self.cu.rowcount, 2)
self.cu.execute("""
/* -- messy *r /* /* ** *- *--
*/
/* one more */ insert into test(name) values ('messy')
""")
self.assertEqual(self.cu.rowcount, 1)
self.cu.execute("/* bar */ update test set name='bar' where name='foo'")
self.assertEqual(self.cu.rowcount, 3)
def test_rowcount_vaccuum(self):
data = ((1,), (2,), (3,))
self.cu.executemany("insert into test(income) values(?)", data)
self.assertEqual(self.cu.rowcount, 3)
self.cx.commit()
self.cu.execute("vacuum")
self.assertEqual(self.cu.rowcount, -1)
def test_total_changes(self):
self.cu.execute("insert into test(name) values ('foo')")
self.cu.execute("insert into test(name) values ('foo')")