"""sqlite3 CLI tests.""" import sqlite3 import sys import textwrap import unittest import unittest.mock import os from sqlite3.__main__ import main as cli from test.support.import_helper import import_module from test.support.os_helper import TESTFN, unlink from test.support.pty_helper import run_pty from test.support import ( captured_stdout, captured_stderr, captured_stdin, force_not_colorized_test_class, requires_subprocess, verbose, ) @force_not_colorized_test_class class CommandLineInterface(unittest.TestCase): def _do_test(self, *args, expect_success=True): with ( captured_stdout() as out, captured_stderr() as err, self.assertRaises(SystemExit) as cm ): cli(args) return out.getvalue(), err.getvalue(), cm.exception.code def expect_success(self, *args): out, err, code = self._do_test(*args) self.assertEqual(code, 0, "\n".join([f"Unexpected failure: {args=}", out, err])) self.assertEqual(err, "") return out def expect_failure(self, *args): out, err, code = self._do_test(*args, expect_success=False) self.assertNotEqual(code, 0, "\n".join([f"Unexpected failure: {args=}", out, err])) self.assertEqual(out, "") return err def test_cli_help(self): out = self.expect_success("-h") self.assertIn("usage: ", out) self.assertIn(" [-h] [-v] [filename] [sql]", out) self.assertIn("Python sqlite3 CLI", out) def test_cli_version(self): out = self.expect_success("-v") self.assertIn(sqlite3.sqlite_version, out) def test_cli_execute_sql(self): out = self.expect_success(":memory:", "select 1") self.assertIn("(1,)", out) def test_cli_execute_too_much_sql(self): stderr = self.expect_failure(":memory:", "select 1; select 2") err = "ProgrammingError: You can only execute one statement at a time" self.assertIn(err, stderr) def test_cli_execute_incomplete_sql(self): stderr = self.expect_failure(":memory:", "sel") self.assertIn("OperationalError (SQLITE_ERROR)", stderr) def test_cli_on_disk_db(self): self.addCleanup(unlink, TESTFN) out = self.expect_success(TESTFN, "create table t(t)") self.assertEqual(out, "") out = self.expect_success(TESTFN, "select count(t) from t") self.assertIn("(0,)", out) @force_not_colorized_test_class class InteractiveSession(unittest.TestCase): MEMORY_DB_MSG = "Connected to a transient in-memory database" PS1 = "sqlite> " PS2 = "... " def run_cli(self, *args, commands=()): with ( captured_stdin() as stdin, captured_stdout() as stdout, captured_stderr() as stderr, self.assertRaises(SystemExit) as cm ): for cmd in commands: stdin.write(cmd + "\n") stdin.seek(0) cli(args) out = stdout.getvalue() err = stderr.getvalue() self.assertEqual(cm.exception.code, 0, f"Unexpected failure: {args=}\n{out}\n{err}") return out, err def test_interact(self): out, err = self.run_cli() self.assertIn(self.MEMORY_DB_MSG, err) self.assertIn(self.MEMORY_DB_MSG, err) self.assertEndsWith(out, self.PS1) self.assertEqual(out.count(self.PS1), 1) self.assertEqual(out.count(self.PS2), 0) def test_interact_quit(self): out, err = self.run_cli(commands=(".quit",)) self.assertIn(self.MEMORY_DB_MSG, err) self.assertEndsWith(out, self.PS1) self.assertEqual(out.count(self.PS1), 1) self.assertEqual(out.count(self.PS2), 0) def test_interact_version(self): out, err = self.run_cli(commands=(".version",)) self.assertIn(self.MEMORY_DB_MSG, err) self.assertIn(sqlite3.sqlite_version + "\n", out) self.assertEndsWith(out, self.PS1) self.assertEqual(out.count(self.PS1), 2) self.assertEqual(out.count(self.PS2), 0) self.assertIn(sqlite3.sqlite_version, out) def test_interact_empty_source(self): out, err = self.run_cli(commands=("", " ")) self.assertIn(self.MEMORY_DB_MSG, err) self.assertEndsWith(out, self.PS1) self.assertEqual(out.count(self.PS1), 3) self.assertEqual(out.count(self.PS2), 0) def test_interact_dot_commands_unknown(self): out, err = self.run_cli(commands=(".unknown_command", )) self.assertIn(self.MEMORY_DB_MSG, err) self.assertEndsWith(out, self.PS1) self.assertEqual(out.count(self.PS1), 2) self.assertEqual(out.count(self.PS2), 0) self.assertIn('Error: unknown command: "', err) # test "unknown_command" is pointed out in the error message self.assertIn("unknown_command", err) def test_interact_dot_commands_empty(self): out, err = self.run_cli(commands=(".")) self.assertIn(self.MEMORY_DB_MSG, err) self.assertEndsWith(out, self.PS1) self.assertEqual(out.count(self.PS1), 2) self.assertEqual(out.count(self.PS2), 0) def test_interact_dot_commands_with_whitespaces(self): out, err = self.run_cli(commands=(".version ", ". version")) self.assertIn(self.MEMORY_DB_MSG, err) self.assertEqual(out.count(sqlite3.sqlite_version + "\n"), 2) self.assertEndsWith(out, self.PS1) self.assertEqual(out.count(self.PS1), 3) self.assertEqual(out.count(self.PS2), 0) def test_interact_valid_sql(self): out, err = self.run_cli(commands=("SELECT 1;",)) self.assertIn(self.MEMORY_DB_MSG, err) self.assertIn("(1,)\n", out) self.assertEndsWith(out, self.PS1) self.assertEqual(out.count(self.PS1), 2) self.assertEqual(out.count(self.PS2), 0) def test_interact_incomplete_multiline_sql(self): out, err = self.run_cli(commands=("SELECT 1",)) self.assertIn(self.MEMORY_DB_MSG, err) self.assertEndsWith(out, self.PS2) self.assertEqual(out.count(self.PS1), 1) self.assertEqual(out.count(self.PS2), 1) def test_interact_valid_multiline_sql(self): out, err = self.run_cli(commands=("SELECT 1\n;",)) self.assertIn(self.MEMORY_DB_MSG, err) self.assertIn(self.PS2, out) self.assertIn("(1,)\n", out) self.assertEndsWith(out, self.PS1) self.assertEqual(out.count(self.PS1), 2) self.assertEqual(out.count(self.PS2), 1) def test_interact_invalid_sql(self): out, err = self.run_cli(commands=("sel;",)) self.assertIn(self.MEMORY_DB_MSG, err) self.assertIn("OperationalError (SQLITE_ERROR)", err) self.assertEndsWith(out, self.PS1) self.assertEqual(out.count(self.PS1), 2) self.assertEqual(out.count(self.PS2), 0) def test_interact_on_disk_file(self): self.addCleanup(unlink, TESTFN) out, err = self.run_cli(TESTFN, commands=("CREATE TABLE t(t);",)) self.assertIn(TESTFN, err) self.assertEndsWith(out, self.PS1) out, _ = self.run_cli(TESTFN, commands=("SELECT count(t) FROM t;",)) self.assertIn("(0,)\n", out) def test_color(self): with unittest.mock.patch("_colorize.can_colorize", return_value=True): out, err = self.run_cli(commands="TEXT\n") self.assertIn("\x1b[1;35msqlite> \x1b[0m", out) self.assertIn("\x1b[1;35m ... \x1b[0m\x1b", out) out, err = self.run_cli(commands=("sel;",)) self.assertIn('\x1b[1;35mOperationalError (SQLITE_ERROR)\x1b[0m: ' '\x1b[35mnear "sel": syntax error\x1b[0m', err) @requires_subprocess() @force_not_colorized_test_class class Completion(unittest.TestCase): PS1 = "sqlite> " @classmethod def setUpClass(cls): _sqlite3 = import_module("_sqlite3") if not hasattr(_sqlite3, "SQLITE_KEYWORDS"): raise unittest.SkipTest("unable to determine SQLite keywords") readline = import_module("readline") if readline.backend == "editline": raise unittest.SkipTest("libedit readline is not supported") def write_input(self, input_, env=None): script = textwrap.dedent(""" import readline from sqlite3.__main__ import main readline.parse_and_bind("set colored-completion-prefix off") main() """) return run_pty(script, input_, env) def test_complete_sql_keywords(self): # List candidates starting with 'S', there should be multiple matches. input_ = b"S\t\tEL\t 1;\n.quit\n" output = self.write_input(input_) self.assertIn(b"SELECT", output) self.assertIn(b"SET", output) self.assertIn(b"SAVEPOINT", output) self.assertIn(b"(1,)", output) # Keywords are completed in upper case for even lower case user input. input_ = b"sel\t\t 1;\n.quit\n" output = self.write_input(input_) self.assertIn(b"SELECT", output) self.assertIn(b"(1,)", output) @unittest.skipIf(sys.platform.startswith("freebsd"), "Two actual tabs are inserted when there are no matching" " completions in the pseudo-terminal opened by run_pty()" " on FreeBSD") def test_complete_no_match(self): input_ = b"xyzzy\t\t\b\b\b\b\b\b\b.quit\n" # Set NO_COLOR to disable coloring for self.PS1. output = self.write_input(input_, env={**os.environ, "NO_COLOR": "1"}) lines = output.decode().splitlines() indices = ( i for i, line in enumerate(lines, 1) if line.startswith(f"{self.PS1}xyzzy") ) line_num = next(indices, -1) self.assertNotEqual(line_num, -1) # Completions occupy lines, assert no extra lines when there is nothing # to complete. self.assertEqual(line_num, len(lines)) def test_complete_no_input(self): from _sqlite3 import SQLITE_KEYWORDS script = textwrap.dedent(""" import readline from sqlite3.__main__ import main # Configure readline to ...: # - hide control sequences surrounding each candidate # - hide "Display all xxx possibilities? (y or n)" # - hide "--More--" # - show candidates one per line readline.parse_and_bind("set colored-completion-prefix off") readline.parse_and_bind("set colored-stats off") readline.parse_and_bind("set completion-query-items 0") readline.parse_and_bind("set page-completions off") readline.parse_and_bind("set completion-display-width 0") readline.parse_and_bind("set show-all-if-ambiguous off") readline.parse_and_bind("set show-all-if-unmodified off") main() """) input_ = b"\t\t.quit\n" output = run_pty(script, input_, env={**os.environ, "NO_COLOR": "1"}) try: lines = output.decode().splitlines() indices = [ i for i, line in enumerate(lines) if line.startswith(self.PS1) ] self.assertEqual(len(indices), 2) start, end = indices candidates = [l.strip() for l in lines[start+1:end]] self.assertEqual(candidates, sorted(SQLITE_KEYWORDS)) except: if verbose: print(' PTY output: '.center(30, '-')) print(output.decode(errors='replace')) print(' end PTY output '.center(30, '-')) raise if __name__ == "__main__": unittest.main()