limbo/testing/cli_tests/test_turso_cli.py

183 lines
6.8 KiB
Python
Executable file

#!/usr/bin/env python3
import os
import select
import subprocess
import time
from pathlib import Path
from time import sleep
from typing import Callable, List, Optional
from cli_tests import console
PIPE_BUF = 4096
class ShellConfig:
def __init__(self, exe_name, flags: str = "-q"):
self.sqlite_exec: str = exe_name
self.sqlite_flags: List[str] = flags.split()
self.cwd = os.getcwd()
self.test_dir: Path = Path("testing")
self.py_folder: Path = Path("cli_tests")
self.test_files: Path = Path("test_files")
class TursoShell:
def __init__(self, config: ShellConfig, init_commands: Optional[str] = None):
self.config = config
self.pipe = self._start_repl(init_commands)
def _start_repl(self, init_commands: Optional[str]) -> subprocess.Popen:
env = os.environ.copy()
env["RUST_BACKTRACE"] = "1"
pipe = subprocess.Popen(
[self.config.sqlite_exec, *self.config.sqlite_flags],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
bufsize=0,
env=env,
)
if init_commands and pipe.stdin is not None:
pipe.stdin.write((init_commands + "\n").encode())
pipe.stdin.flush()
return pipe
def get_test_filepath(self) -> Path:
return self.config.test_dir / "limbo_output.txt"
def execute(self, sql: str) -> str:
end_marker = "END_OF_RESULT"
self._write_to_pipe(sql)
# If we're redirecting output, return so test's don't hang
if sql.strip().startswith(".output"):
return ""
self._write_to_pipe(f"SELECT '{end_marker}';")
output = ""
while True:
ready, _, errors = select.select(
[self.pipe.stdout, self.pipe.stderr],
[],
[self.pipe.stdout, self.pipe.stderr],
)
ready_or_errors = set(ready + errors)
if self.pipe.stderr in ready_or_errors:
fragment = self.pipe.stderr.read(PIPE_BUF).decode()
if not fragment:
console.error(output, end="", _stack_offset=2)
raise RuntimeError("Error encountered in Turso shell.")
output += fragment
if self.pipe.stdout in ready_or_errors:
fragment = self.pipe.stdout.read(PIPE_BUF).decode()
output += fragment
if output.rstrip().endswith(end_marker):
break
return self._clean_output(output, end_marker)
def _write_to_pipe(self, command: str) -> None:
if not self.pipe.stdin:
raise RuntimeError("Failed to start Turso REPL")
self.pipe.stdin.write((command + "\n").encode())
self.pipe.stdin.flush()
@staticmethod
def _clean_output(output: str, marker: str) -> str:
output = output.rstrip().removesuffix(marker)
lines = [line.strip() for line in output.split("\n") if line]
return "\n".join(lines)
def quit(self) -> None:
self._write_to_pipe(".quit")
sleep(0.3)
self.pipe.terminate()
self.pipe.kill()
class TestTursoShell:
def __init__(
self,
init_commands: Optional[str] = None,
init_blobs_table: bool = False,
exec_name: Optional[str] = None,
use_testing_db: bool = False,
flags="",
):
if exec_name is None:
exec_name = os.environ.get("SQLITE_EXEC", "./scripts/limbo-sqlite3")
self.config = ShellConfig(exe_name=exec_name, flags=flags)
if use_testing_db:
self.init_test_db()
init_commands = ".open testing/testing_clone.db"
if init_commands is None:
# Default initialization
init_commands = """
CREATE TABLE users (id INTEGER PRIMARY KEY, first_name TEXT, last_name TEXT, age INTEGER);
CREATE TABLE products (id INTEGER PRIMARY KEY, name TEXT, price INTEGER);
INSERT INTO users VALUES (1, 'Alice', 'Smith', 30), (2, 'Bob', 'Johnson', 25),
(3, 'Charlie', 'Brown', 66), (4, 'David', 'Nichols', 70);
INSERT INTO products VALUES (1, 'Hat', 19.99), (2, 'Shirt', 29.99),
(3, 'Shorts', 39.99), (4, 'Dress', 49.99);
"""
if init_blobs_table:
init_commands += """
CREATE TABLE t (x1, x2, x3, x4);
INSERT INTO t VALUES (zeroblob(1024 - 1), zeroblob(1024 - 2), zeroblob(1024 - 3), zeroblob(1024 - 4));"""
init_commands += "\n.nullvalue TURSO"
self.shell = TursoShell(self.config, init_commands)
def quit(self, cleanup=True):
if cleanup:
self.cleanup_test_db()
self.shell.quit()
def run_test(self, name: str, sql: str, expected: str) -> None:
console.test(f"Running test: {name}", _stack_offset=2)
actual = self.shell.execute(sql)
assert actual == expected, (
f"Test failed: {name}\nSQL: {sql}\nExpected:\n{repr(expected)}\nActual:\n{repr(actual)}"
)
def run_debug(self, sql: str):
console.debug(f"debugging: {sql}", _stack_offset=2)
actual = self.shell.execute(sql)
console.debug(f"OUTPUT:\n{repr(actual)}", _stack_offset=2)
def run_test_fn(self, sql: str, validate: Callable[[str], bool], desc: str = "") -> None:
# Print the test that is executing before executing the sql command
# Printing later confuses the user of the code what test has actually failed
if desc:
console.test(f"Testing: {desc}", _stack_offset=2)
actual = self.shell.execute(sql)
assert validate(actual), f"Test failed\nSQL: {sql}\nActual:\n{repr(actual)}"
def execute_dot(self, dot_command: str) -> None:
self.shell._write_to_pipe(dot_command)
def init_test_db(self) -> None:
self.cleanup_test_db()
path = os.path.join("testing", "testing_clone.db")
if os.path.exists(path):
os.remove(path)
time.sleep(0.1) # Ensure the file is removed before cloning
cmd = "sqlite3 testing/testing.db '.clone testing/testing_clone.db'"
subprocess.run(cmd, shell=True, capture_output=True, text=True)
if not os.path.exists("testing/testing_clone.db"):
raise RuntimeError("Failed to clone testing.db to testing/testing_clone.db")
def cleanup_test_db(self) -> None:
path = os.path.join("testing", "testing_clone.db")
if os.path.exists(path):
os.remove(path)
walpath = os.path.join("testing", "testing.db-wal")
if os.path.exists(walpath):
os.remove(walpath)
# Enables the use of `with` syntax
def __enter__(self):
return self
def __exit__(self, exception_type, exception_value, exception_traceback):
self.cleanup_test_db()
self.quit()