mirror of
https://github.com/python/cpython.git
synced 2025-09-07 01:11:26 +00:00

Use unittest discover instead of manually enumerating all test modules and classes. Also add support for filtering them by pattern.
164 lines
5.6 KiB
Python
164 lines
5.6 KiB
Python
import sqlite3 as sqlite
|
|
import unittest
|
|
|
|
|
|
class BackupTests(unittest.TestCase):
|
|
def setUp(self):
|
|
cx = self.cx = sqlite.connect(":memory:")
|
|
cx.execute('CREATE TABLE foo (key INTEGER)')
|
|
cx.executemany('INSERT INTO foo (key) VALUES (?)', [(3,), (4,)])
|
|
cx.commit()
|
|
|
|
def tearDown(self):
|
|
self.cx.close()
|
|
|
|
def verify_backup(self, bckcx):
|
|
result = bckcx.execute("SELECT key FROM foo ORDER BY key").fetchall()
|
|
self.assertEqual(result[0][0], 3)
|
|
self.assertEqual(result[1][0], 4)
|
|
|
|
def test_bad_target(self):
|
|
with self.assertRaises(TypeError):
|
|
self.cx.backup(None)
|
|
with self.assertRaises(TypeError):
|
|
self.cx.backup()
|
|
|
|
def test_bad_target_filename(self):
|
|
with self.assertRaises(TypeError):
|
|
self.cx.backup('some_file_name.db')
|
|
|
|
def test_bad_target_same_connection(self):
|
|
with self.assertRaises(ValueError):
|
|
self.cx.backup(self.cx)
|
|
|
|
def test_bad_target_closed_connection(self):
|
|
bck = sqlite.connect(':memory:')
|
|
bck.close()
|
|
with self.assertRaises(sqlite.ProgrammingError):
|
|
self.cx.backup(bck)
|
|
|
|
def test_bad_source_closed_connection(self):
|
|
bck = sqlite.connect(':memory:')
|
|
source = sqlite.connect(":memory:")
|
|
source.close()
|
|
with self.assertRaises(sqlite.ProgrammingError):
|
|
source.backup(bck)
|
|
|
|
def test_bad_target_in_transaction(self):
|
|
bck = sqlite.connect(':memory:')
|
|
bck.execute('CREATE TABLE bar (key INTEGER)')
|
|
bck.executemany('INSERT INTO bar (key) VALUES (?)', [(3,), (4,)])
|
|
with self.assertRaises(sqlite.OperationalError) as cm:
|
|
self.cx.backup(bck)
|
|
if sqlite.sqlite_version_info < (3, 8, 8):
|
|
self.assertEqual(str(cm.exception), 'target is in transaction')
|
|
|
|
def test_keyword_only_args(self):
|
|
with self.assertRaises(TypeError):
|
|
with sqlite.connect(':memory:') as bck:
|
|
self.cx.backup(bck, 1)
|
|
|
|
def test_simple(self):
|
|
with sqlite.connect(':memory:') as bck:
|
|
self.cx.backup(bck)
|
|
self.verify_backup(bck)
|
|
|
|
def test_progress(self):
|
|
journal = []
|
|
|
|
def progress(status, remaining, total):
|
|
journal.append(status)
|
|
|
|
with sqlite.connect(':memory:') as bck:
|
|
self.cx.backup(bck, pages=1, progress=progress)
|
|
self.verify_backup(bck)
|
|
|
|
self.assertEqual(len(journal), 2)
|
|
self.assertEqual(journal[0], sqlite.SQLITE_OK)
|
|
self.assertEqual(journal[1], sqlite.SQLITE_DONE)
|
|
|
|
def test_progress_all_pages_at_once_1(self):
|
|
journal = []
|
|
|
|
def progress(status, remaining, total):
|
|
journal.append(remaining)
|
|
|
|
with sqlite.connect(':memory:') as bck:
|
|
self.cx.backup(bck, progress=progress)
|
|
self.verify_backup(bck)
|
|
|
|
self.assertEqual(len(journal), 1)
|
|
self.assertEqual(journal[0], 0)
|
|
|
|
def test_progress_all_pages_at_once_2(self):
|
|
journal = []
|
|
|
|
def progress(status, remaining, total):
|
|
journal.append(remaining)
|
|
|
|
with sqlite.connect(':memory:') as bck:
|
|
self.cx.backup(bck, pages=-1, progress=progress)
|
|
self.verify_backup(bck)
|
|
|
|
self.assertEqual(len(journal), 1)
|
|
self.assertEqual(journal[0], 0)
|
|
|
|
def test_non_callable_progress(self):
|
|
with self.assertRaises(TypeError) as cm:
|
|
with sqlite.connect(':memory:') as bck:
|
|
self.cx.backup(bck, pages=1, progress='bar')
|
|
self.assertEqual(str(cm.exception), 'progress argument must be a callable')
|
|
|
|
def test_modifying_progress(self):
|
|
journal = []
|
|
|
|
def progress(status, remaining, total):
|
|
if not journal:
|
|
self.cx.execute('INSERT INTO foo (key) VALUES (?)', (remaining+1000,))
|
|
self.cx.commit()
|
|
journal.append(remaining)
|
|
|
|
with sqlite.connect(':memory:') as bck:
|
|
self.cx.backup(bck, pages=1, progress=progress)
|
|
self.verify_backup(bck)
|
|
|
|
result = bck.execute("SELECT key FROM foo"
|
|
" WHERE key >= 1000"
|
|
" ORDER BY key").fetchall()
|
|
self.assertEqual(result[0][0], 1001)
|
|
|
|
self.assertEqual(len(journal), 3)
|
|
self.assertEqual(journal[0], 1)
|
|
self.assertEqual(journal[1], 1)
|
|
self.assertEqual(journal[2], 0)
|
|
|
|
def test_failing_progress(self):
|
|
def progress(status, remaining, total):
|
|
raise SystemError('nearly out of space')
|
|
|
|
with self.assertRaises(SystemError) as err:
|
|
with sqlite.connect(':memory:') as bck:
|
|
self.cx.backup(bck, progress=progress)
|
|
self.assertEqual(str(err.exception), 'nearly out of space')
|
|
|
|
def test_database_source_name(self):
|
|
with sqlite.connect(':memory:') as bck:
|
|
self.cx.backup(bck, name='main')
|
|
with sqlite.connect(':memory:') as bck:
|
|
self.cx.backup(bck, name='temp')
|
|
with self.assertRaises(sqlite.OperationalError) as cm:
|
|
with sqlite.connect(':memory:') as bck:
|
|
self.cx.backup(bck, name='non-existing')
|
|
self.assertIn("unknown database", str(cm.exception))
|
|
|
|
self.cx.execute("ATTACH DATABASE ':memory:' AS attached_db")
|
|
self.cx.execute('CREATE TABLE attached_db.foo (key INTEGER)')
|
|
self.cx.executemany('INSERT INTO attached_db.foo (key) VALUES (?)', [(3,), (4,)])
|
|
self.cx.commit()
|
|
with sqlite.connect(':memory:') as bck:
|
|
self.cx.backup(bck, name='attached_db')
|
|
self.verify_backup(bck)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
unittest.main()
|