mirror of
https://github.com/tursodatabase/limbo.git
synced 2025-07-07 20:45:01 +00:00
183 lines
6.8 KiB
Python
Executable file
183 lines
6.8 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
import os
|
|
import select
|
|
import subprocess
|
|
import time
|
|
from pathlib import Path
|
|
from time import sleep
|
|
from typing import Callable, List, Optional
|
|
|
|
from cli_tests import console
|
|
|
|
PIPE_BUF = 4096
|
|
|
|
|
|
class ShellConfig:
|
|
def __init__(self, exe_name, flags: str = "-q"):
|
|
self.sqlite_exec: str = exe_name
|
|
self.sqlite_flags: List[str] = flags.split()
|
|
self.cwd = os.getcwd()
|
|
self.test_dir: Path = Path("testing")
|
|
self.py_folder: Path = Path("cli_tests")
|
|
self.test_files: Path = Path("test_files")
|
|
|
|
|
|
class TursoShell:
|
|
def __init__(self, config: ShellConfig, init_commands: Optional[str] = None):
|
|
self.config = config
|
|
self.pipe = self._start_repl(init_commands)
|
|
|
|
def _start_repl(self, init_commands: Optional[str]) -> subprocess.Popen:
|
|
env = os.environ.copy()
|
|
env["RUST_BACKTRACE"] = "1"
|
|
pipe = subprocess.Popen(
|
|
[self.config.sqlite_exec, *self.config.sqlite_flags],
|
|
stdin=subprocess.PIPE,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE,
|
|
bufsize=0,
|
|
env=env,
|
|
)
|
|
if init_commands and pipe.stdin is not None:
|
|
pipe.stdin.write((init_commands + "\n").encode())
|
|
pipe.stdin.flush()
|
|
return pipe
|
|
|
|
def get_test_filepath(self) -> Path:
|
|
return self.config.test_dir / "limbo_output.txt"
|
|
|
|
def execute(self, sql: str) -> str:
|
|
end_marker = "END_OF_RESULT"
|
|
self._write_to_pipe(sql)
|
|
|
|
# If we're redirecting output, return so test's don't hang
|
|
if sql.strip().startswith(".output"):
|
|
return ""
|
|
self._write_to_pipe(f"SELECT '{end_marker}';")
|
|
output = ""
|
|
while True:
|
|
ready, _, errors = select.select(
|
|
[self.pipe.stdout, self.pipe.stderr],
|
|
[],
|
|
[self.pipe.stdout, self.pipe.stderr],
|
|
)
|
|
ready_or_errors = set(ready + errors)
|
|
if self.pipe.stderr in ready_or_errors:
|
|
fragment = self.pipe.stderr.read(PIPE_BUF).decode()
|
|
if not fragment:
|
|
console.error(output, end="", _stack_offset=2)
|
|
raise RuntimeError("Error encountered in Turso shell.")
|
|
output += fragment
|
|
if self.pipe.stdout in ready_or_errors:
|
|
fragment = self.pipe.stdout.read(PIPE_BUF).decode()
|
|
output += fragment
|
|
if output.rstrip().endswith(end_marker):
|
|
break
|
|
return self._clean_output(output, end_marker)
|
|
|
|
def _write_to_pipe(self, command: str) -> None:
|
|
if not self.pipe.stdin:
|
|
raise RuntimeError("Failed to start Turso REPL")
|
|
self.pipe.stdin.write((command + "\n").encode())
|
|
self.pipe.stdin.flush()
|
|
|
|
@staticmethod
|
|
def _clean_output(output: str, marker: str) -> str:
|
|
output = output.rstrip().removesuffix(marker)
|
|
lines = [line.strip() for line in output.split("\n") if line]
|
|
return "\n".join(lines)
|
|
|
|
def quit(self) -> None:
|
|
self._write_to_pipe(".quit")
|
|
sleep(0.3)
|
|
self.pipe.terminate()
|
|
self.pipe.kill()
|
|
|
|
|
|
class TestTursoShell:
|
|
def __init__(
|
|
self,
|
|
init_commands: Optional[str] = None,
|
|
init_blobs_table: bool = False,
|
|
exec_name: Optional[str] = None,
|
|
use_testing_db: bool = False,
|
|
flags="",
|
|
):
|
|
if exec_name is None:
|
|
exec_name = os.environ.get("SQLITE_EXEC", "./scripts/limbo-sqlite3")
|
|
self.config = ShellConfig(exe_name=exec_name, flags=flags)
|
|
if use_testing_db:
|
|
self.init_test_db()
|
|
init_commands = ".open testing/testing_clone.db"
|
|
if init_commands is None:
|
|
# Default initialization
|
|
init_commands = """
|
|
CREATE TABLE users (id INTEGER PRIMARY KEY, first_name TEXT, last_name TEXT, age INTEGER);
|
|
CREATE TABLE products (id INTEGER PRIMARY KEY, name TEXT, price INTEGER);
|
|
INSERT INTO users VALUES (1, 'Alice', 'Smith', 30), (2, 'Bob', 'Johnson', 25),
|
|
(3, 'Charlie', 'Brown', 66), (4, 'David', 'Nichols', 70);
|
|
INSERT INTO products VALUES (1, 'Hat', 19.99), (2, 'Shirt', 29.99),
|
|
(3, 'Shorts', 39.99), (4, 'Dress', 49.99);
|
|
"""
|
|
if init_blobs_table:
|
|
init_commands += """
|
|
CREATE TABLE t (x1, x2, x3, x4);
|
|
INSERT INTO t VALUES (zeroblob(1024 - 1), zeroblob(1024 - 2), zeroblob(1024 - 3), zeroblob(1024 - 4));"""
|
|
|
|
init_commands += "\n.nullvalue TURSO"
|
|
self.shell = TursoShell(self.config, init_commands)
|
|
|
|
def quit(self, cleanup=True):
|
|
if cleanup:
|
|
self.cleanup_test_db()
|
|
self.shell.quit()
|
|
|
|
def run_test(self, name: str, sql: str, expected: str) -> None:
|
|
console.test(f"Running test: {name}", _stack_offset=2)
|
|
actual = self.shell.execute(sql)
|
|
assert actual == expected, (
|
|
f"Test failed: {name}\nSQL: {sql}\nExpected:\n{repr(expected)}\nActual:\n{repr(actual)}"
|
|
)
|
|
|
|
def run_debug(self, sql: str):
|
|
console.debug(f"debugging: {sql}", _stack_offset=2)
|
|
actual = self.shell.execute(sql)
|
|
console.debug(f"OUTPUT:\n{repr(actual)}", _stack_offset=2)
|
|
|
|
def run_test_fn(self, sql: str, validate: Callable[[str], bool], desc: str = "") -> None:
|
|
# Print the test that is executing before executing the sql command
|
|
# Printing later confuses the user of the code what test has actually failed
|
|
if desc:
|
|
console.test(f"Testing: {desc}", _stack_offset=2)
|
|
actual = self.shell.execute(sql)
|
|
assert validate(actual), f"Test failed\nSQL: {sql}\nActual:\n{repr(actual)}"
|
|
|
|
def execute_dot(self, dot_command: str) -> None:
|
|
self.shell._write_to_pipe(dot_command)
|
|
|
|
def init_test_db(self) -> None:
|
|
self.cleanup_test_db()
|
|
path = os.path.join("testing", "testing_clone.db")
|
|
if os.path.exists(path):
|
|
os.remove(path)
|
|
time.sleep(0.1) # Ensure the file is removed before cloning
|
|
cmd = "sqlite3 testing/testing.db '.clone testing/testing_clone.db'"
|
|
subprocess.run(cmd, shell=True, capture_output=True, text=True)
|
|
if not os.path.exists("testing/testing_clone.db"):
|
|
raise RuntimeError("Failed to clone testing.db to testing/testing_clone.db")
|
|
|
|
def cleanup_test_db(self) -> None:
|
|
path = os.path.join("testing", "testing_clone.db")
|
|
if os.path.exists(path):
|
|
os.remove(path)
|
|
walpath = os.path.join("testing", "testing.db-wal")
|
|
if os.path.exists(walpath):
|
|
os.remove(walpath)
|
|
|
|
# Enables the use of `with` syntax
|
|
def __enter__(self):
|
|
return self
|
|
|
|
def __exit__(self, exception_type, exception_value, exception_traceback):
|
|
self.cleanup_test_db()
|
|
self.quit()
|