cpython/Lib/test/test_sqlite3/test_dump.py
Erlend E. Aasland e38b43c213
gh-118221: Always use the default row factory in sqlite3.iterdump() (#118223)
sqlite3.iterdump() depends on the row factory returning rows as tuples;
it fails with custom row factories such as, for example, a dict
factory.

With this commit, we explicitly reset the row factory of the cursor used
by iterdump(), so we always get predictable results. This does not
affect the row factory of the parent connection.

Co-authored-by: Mariusz Felisiak <felisiak.mariusz@gmail.com>
Co-authored-by: Serhiy Storchaka <storchaka@gmail.com>
2024-04-25 10:11:45 +02:00

230 lines
8.8 KiB
Python

# Author: Paul Kippes <kippesp@gmail.com>
import unittest
from .util import memory_database
from .util import MemoryDatabaseMixin
class DumpTests(MemoryDatabaseMixin, unittest.TestCase):
    """Tests for the SQL statements produced by ``iterdump()``.

    Every test populates the database through ``self.cu`` and compares the
    statements yielded by ``self.cx.iterdump()`` against an expected list.
    ``self.cx`` and ``self.cu`` are not defined in this class; presumably
    MemoryDatabaseMixin provides them -- verify in ``.util``.
    """

    # NOTE: test purposes are given as comments rather than docstrings, since
    # unittest prints a test's docstring instead of its name in verbose mode.

    def test_table_dump(self):
        # Dump a schema with quoted identifiers, constraints, a trigger and
        # a view, and check that iterdump() reproduces every statement.
        statements = [
            """CREATE TABLE "index"("index" blob);""",
            """INSERT INTO "index" VALUES(X'01');""",
            """CREATE TABLE "quoted""table"("quoted""field" text);""",
            """INSERT INTO "quoted""table" VALUES('quoted''value');""",
            (
                "CREATE TABLE t1(id integer primary key, s1 text, "
                "t1_i1 integer not null, i2 integer, unique (s1), "
                "constraint t1_idx1 unique (i2), "
                "constraint t1_i1_idx1 unique (t1_i1));"
            ),
            "INSERT INTO \"t1\" VALUES(1,'foo',10,20);",
            "INSERT INTO \"t1\" VALUES(2,'foo2',30,30);",
            (
                "CREATE TABLE t2(id integer, t2_i1 integer, "
                "t2_i2 integer, primary key (id),"
                "foreign key(t2_i1) references t1(t1_i1));"
            ),
            # Foreign key violation.
            "INSERT INTO \"t2\" VALUES(1,2,3);",
            (
                "CREATE TRIGGER trigger_1 update of t1_i1 on t1 "
                "begin "
                "update t2 set t2_i1 = new.t1_i1 where t2_i1 = old.t1_i1; "
                "end;"
            ),
            (
                "CREATE VIEW v1 as select * from t1 left join t2 "
                "using (id);"
            ),
        ]
        for statement in statements:
            self.cu.execute(statement)

        actual_sqls = list(self.cx.iterdump())

        # The dump is expected to open with PRAGMA foreign_keys=OFF (the data
        # above contains a foreign key violation) and to wrap the statements
        # in a single transaction.
        expected_sqls = [
            "PRAGMA foreign_keys=OFF;",
            "BEGIN TRANSACTION;",
            *statements,
            "COMMIT;",
        ]
        # Compare the complete lists so that missing or unexpected trailing
        # statements are reported as assertion failures as well.
        self.assertEqual(expected_sqls, actual_sqls)

    def test_table_dump_filter(self):
        # Check that the ``filter`` argument of iterdump() restricts the dump
        # to the objects whose name matches the given pattern.
        all_table_sqls = [
            """CREATE TABLE "some_table_2" ("id_1" INTEGER);""",
            """INSERT INTO "some_table_2" VALUES(3);""",
            """INSERT INTO "some_table_2" VALUES(4);""",
            """CREATE TABLE "test_table_1" ("id_2" INTEGER);""",
            """INSERT INTO "test_table_1" VALUES(1);""",
            """INSERT INTO "test_table_1" VALUES(2);""",
        ]
        all_views_sqls = [
            """CREATE VIEW "view_1" AS SELECT * FROM "some_table_2";""",
            """CREATE VIEW "view_2" AS SELECT * FROM "test_table_1";""",
        ]
        # Create database structure.
        for sql in [*all_table_sqls, *all_views_sqls]:
            self.cu.execute(sql)

        # %_table_% matches all tables.
        dump_sqls = list(self.cx.iterdump(filter="%_table_%"))
        self.assertEqual(
            dump_sqls,
            ["BEGIN TRANSACTION;", *all_table_sqls, "COMMIT;"],
        )

        # view_% matches all views.
        dump_sqls = list(self.cx.iterdump(filter="view_%"))
        self.assertEqual(
            dump_sqls,
            ["BEGIN TRANSACTION;", *all_views_sqls, "COMMIT;"],
        )

        # %_1 matches tables and views with the _1 suffix.
        dump_sqls = list(self.cx.iterdump(filter="%_1"))
        self.assertEqual(
            dump_sqls,
            [
                "BEGIN TRANSACTION;",
                """CREATE TABLE "test_table_1" ("id_2" INTEGER);""",
                """INSERT INTO "test_table_1" VALUES(1);""",
                """INSERT INTO "test_table_1" VALUES(2);""",
                """CREATE VIEW "view_1" AS SELECT * FROM "some_table_2";""",
                "COMMIT;",
            ],
        )

        # some_% matches some_table_2.
        dump_sqls = list(self.cx.iterdump(filter="some_%"))
        self.assertEqual(
            dump_sqls,
            [
                "BEGIN TRANSACTION;",
                """CREATE TABLE "some_table_2" ("id_1" INTEGER);""",
                """INSERT INTO "some_table_2" VALUES(3);""",
                """INSERT INTO "some_table_2" VALUES(4);""",
                "COMMIT;",
            ],
        )

        # Only single object.
        dump_sqls = list(self.cx.iterdump(filter="view_2"))
        self.assertEqual(
            dump_sqls,
            [
                "BEGIN TRANSACTION;",
                """CREATE VIEW "view_2" AS SELECT * FROM "test_table_1";""",
                "COMMIT;",
            ],
        )

        # % matches all objects.
        dump_sqls = list(self.cx.iterdump(filter="%"))
        self.assertEqual(
            dump_sqls,
            ["BEGIN TRANSACTION;", *all_table_sqls, *all_views_sqls, "COMMIT;"],
        )

    def test_dump_autoincrement(self):
        # Check how autoincrement tables and the sqlite_sequence bookkeeping
        # table show up in the dump.
        expected = [
            'CREATE TABLE "t1" (id integer primary key autoincrement);',
            'INSERT INTO "t1" VALUES(NULL);',
            'CREATE TABLE "t2" (id integer primary key autoincrement);',
        ]
        self.cu.executescript("".join(expected))

        # The NULL value should now automatically be set to 1.
        expected[1] = expected[1].replace("NULL", "1")
        expected.insert(0, "BEGIN TRANSACTION;")
        # The sqlite_sequence statements are expected at the end of the
        # transaction, right before COMMIT.
        expected.extend([
            'DELETE FROM "sqlite_sequence";',
            'INSERT INTO "sqlite_sequence" VALUES(\'t1\',1);',
            'COMMIT;',
        ])

        actual = list(self.cx.iterdump())
        self.assertEqual(expected, actual)

    def test_dump_autoincrement_create_new_db(self):
        # Replay a dump into a second database and check that the
        # autoincrement counters recorded in sqlite_sequence are carried over.
        self.cu.execute("BEGIN TRANSACTION")
        self.cu.execute("CREATE TABLE t1 (id integer primary key autoincrement)")
        self.cu.execute("CREATE TABLE t2 (id integer primary key autoincrement)")
        self.cu.executemany("INSERT INTO t1 VALUES(?)", ((None,) for _ in range(9)))
        self.cu.executemany("INSERT INTO t2 VALUES(?)", ((None,) for _ in range(4)))
        self.cx.commit()

        with memory_database() as cx2:
            query = "".join(self.cx.iterdump())
            cx2.executescript(query)
            cu2 = cx2.cursor()

            # (table name, expected "seq" value) -- one per inserted row count.
            dataset = (
                ("t1", 9),
                ("t2", 4),
            )
            for table, seq in dataset:
                with self.subTest(table=table, seq=seq):
                    res = cu2.execute("""
                        SELECT "seq" FROM "sqlite_sequence" WHERE "name" == ?
                    """, (table,))
                    rows = res.fetchall()
                    self.assertEqual(rows[0][0], seq)

    def test_unorderable_row(self):
        # iterdump() should be able to cope with unorderable row types (issue #15545)
        class UnorderableRow:
            # Minimal row type: supports indexing only, no ordering methods.
            def __init__(self, cursor, row):
                self.row = row

            def __getitem__(self, index):
                return self.row[index]

        self.cx.row_factory = UnorderableRow
        CREATE_ALPHA = """CREATE TABLE "alpha" ("one");"""
        CREATE_BETA = """CREATE TABLE "beta" ("two");"""
        expected = [
            "BEGIN TRANSACTION;",
            CREATE_ALPHA,
            CREATE_BETA,
            "COMMIT;",
        ]
        # Create the tables in reverse alphabetical order; the dump is still
        # expected to list "alpha" before "beta".
        self.cu.execute(CREATE_BETA)
        self.cu.execute(CREATE_ALPHA)
        got = list(self.cx.iterdump())
        self.assertEqual(expected, got)

    def test_dump_custom_row_factory(self):
        # gh-118221: iterdump should be able to cope with custom row factories.
        def dict_factory(cu, row):
            fields = [col[0] for col in cu.description]
            return dict(zip(fields, row))

        self.cx.row_factory = dict_factory
        CREATE_TABLE = "CREATE TABLE test(t);"
        expected = ["BEGIN TRANSACTION;", CREATE_TABLE, "COMMIT;"]

        self.cu.execute(CREATE_TABLE)
        actual = list(self.cx.iterdump())
        self.assertEqual(expected, actual)
        # Dumping must leave the connection's own row factory untouched.
        self.assertEqual(self.cx.row_factory, dict_factory)

    def test_dump_virtual_tables(self):
        # gh-64662
        # NOTE(review): this executes CREATE VIRTUAL TABLE ... USING fts4, so
        # it presumably needs an SQLite build with FTS4 enabled -- confirm.
        expected = [
            "BEGIN TRANSACTION;",
            "PRAGMA writable_schema=ON;",
            ("INSERT INTO sqlite_master(type,name,tbl_name,rootpage,sql)"
             "VALUES('table','test','test',0,'CREATE VIRTUAL TABLE test USING fts4(example)');"),
            "CREATE TABLE 'test_content'(docid INTEGER PRIMARY KEY, 'c0example');",
            "CREATE TABLE 'test_docsize'(docid INTEGER PRIMARY KEY, size BLOB);",
            ("CREATE TABLE 'test_segdir'(level INTEGER,idx INTEGER,start_block INTEGER,"
             "leaves_end_block INTEGER,end_block INTEGER,root BLOB,PRIMARY KEY(level, idx));"),
            "CREATE TABLE 'test_segments'(blockid INTEGER PRIMARY KEY, block BLOB);",
            "CREATE TABLE 'test_stat'(id INTEGER PRIMARY KEY, value BLOB);",
            "PRAGMA writable_schema=OFF;",
            "COMMIT;",
        ]
        self.cu.execute("CREATE VIRTUAL TABLE test USING fts4(example)")
        actual = list(self.cx.iterdump())
        self.assertEqual(expected, actual)
# Run the tests in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()