mirror of
https://github.com/tursodatabase/limbo.git
synced 2025-12-23 08:21:09 +00:00
208 lines
7.7 KiB
Python
208 lines
7.7 KiB
Python
import logging
|
|
import multiprocessing
|
|
import os
|
|
import tempfile
|
|
import time
|
|
|
|
import turso
|
|
import turso.sync
|
|
|
|
from .utils import TursoServer
|
|
|
|
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s", force=True)
|
|
|
|
|
|
def test_bootstrap():
|
|
with TursoServer() as server:
|
|
server.db_sql("CREATE TABLE t(x)")
|
|
server.db_sql("INSERT INTO t VALUES ('hello'), ('turso'), ('sync')")
|
|
server.db_sql("SELECT * FROM t")
|
|
|
|
conn = turso.sync.connect(":memory:", server.db_url())
|
|
rows = conn.execute("SELECT * FROM t").fetchall()
|
|
assert rows == [("hello",), ("turso",), ("sync",)]
|
|
|
|
|
|
def test_pull():
|
|
with TursoServer() as server:
|
|
server.db_sql("CREATE TABLE t(x)")
|
|
server.db_sql("INSERT INTO t VALUES ('hello'), ('turso'), ('sync')")
|
|
server.db_sql("SELECT * FROM t")
|
|
|
|
conn = turso.sync.connect(":memory:", server.db_url())
|
|
rows = conn.execute("SELECT * FROM t").fetchall()
|
|
assert rows == [("hello",), ("turso",), ("sync",)]
|
|
|
|
server.db_sql("INSERT INTO t VALUES ('pull works')")
|
|
|
|
rows = conn.execute("SELECT * FROM t").fetchall()
|
|
assert rows == [("hello",), ("turso",), ("sync",)]
|
|
|
|
assert conn.pull()
|
|
|
|
rows = conn.execute("SELECT * FROM t").fetchall()
|
|
assert rows == [("hello",), ("turso",), ("sync",), ("pull works",)]
|
|
|
|
assert not conn.pull()
|
|
|
|
|
|
def test_push():
|
|
with TursoServer() as server:
|
|
server.db_sql("CREATE TABLE t(x)")
|
|
server.db_sql("INSERT INTO t VALUES ('hello'), ('turso'), ('sync')")
|
|
server.db_sql("SELECT * FROM t")
|
|
|
|
conn = turso.sync.connect(":memory:", server.db_url())
|
|
rows = conn.execute("SELECT * FROM t").fetchall()
|
|
assert rows == [("hello",), ("turso",), ("sync",)]
|
|
|
|
conn.execute("INSERT INTO t VALUES ('push works')")
|
|
conn.commit()
|
|
|
|
r1 = server.db_sql("SELECT * FROM t")
|
|
assert r1 == [["hello"], ["turso"], ["sync"]]
|
|
|
|
conn.push()
|
|
|
|
r2 = server.db_sql("SELECT * FROM t")
|
|
assert r2 == [["hello"], ["turso"], ["sync"], ["push works"]]
|
|
|
|
|
|
def test_checkpoint():
|
|
# turso.setup_logging(level=logging.DEBUG)
|
|
|
|
with TursoServer() as server:
|
|
conn = turso.sync.connect(":memory:", remote_url=server.db_url())
|
|
conn.execute("CREATE TABLE t(x)")
|
|
conn.commit()
|
|
for i in range(1024):
|
|
conn.execute(f"INSERT INTO t VALUES ({i})")
|
|
conn.commit()
|
|
stats1 = conn.stats()
|
|
conn.checkpoint()
|
|
stats2 = conn.stats()
|
|
|
|
assert stats1.main_wal_size > 1024 * 1024
|
|
assert stats1.revert_wal_size == 0
|
|
|
|
assert stats2.main_wal_size == 0
|
|
assert stats2.revert_wal_size < 8 * 1024
|
|
|
|
conn.push()
|
|
|
|
assert server.db_sql("SELECT SUM(x) FROM t") == [[f"{1024 * 1023 // 2}"]]
|
|
|
|
|
|
def test_partial_sync():
|
|
# turso.setup_logging(level=logging.DEBUG)
|
|
|
|
with TursoServer() as server:
|
|
server.db_sql("CREATE TABLE t(x)")
|
|
server.db_sql("INSERT INTO t SELECT randomblob(1024) FROM generate_series(1, 2000)")
|
|
|
|
conn_full = turso.sync.connect(":memory:", remote_url=server.db_url())
|
|
assert conn_full.execute("SELECT LENGTH(x) FROM t LIMIT 1").fetchall() == [(1024,)]
|
|
assert conn_full.stats().network_received_bytes > 2000 * 1024
|
|
assert conn_full.execute("SELECT SUM(LENGTH(x)) FROM t").fetchall() == [(2000 * 1024,)]
|
|
|
|
conn_partial = turso.sync.connect(
|
|
":memory:",
|
|
remote_url=server.db_url(),
|
|
partial_sync_opts=turso.sync.PartialSyncOpts(
|
|
bootstrap_strategy=turso.sync.PartialSyncPrefixBootstrap(length=128 * 1024),
|
|
),
|
|
)
|
|
assert conn_partial.execute("SELECT LENGTH(x) FROM t LIMIT 1").fetchall() == [(1024,)]
|
|
assert conn_partial.stats().network_received_bytes < 256 * (1024 + 10)
|
|
|
|
start = time.time()
|
|
assert conn_partial.execute("SELECT SUM(LENGTH(x)) FROM t").fetchall() == [(2000 * 1024,)]
|
|
print(time.time() - start)
|
|
assert conn_partial.stats().network_received_bytes > 2000 * 1024
|
|
|
|
def test_partial_sync_segment_size():
|
|
# turso.setup_logging(level=logging.DEBUG)
|
|
|
|
with TursoServer() as server:
|
|
server.db_sql("CREATE TABLE t(x)")
|
|
server.db_sql("INSERT INTO t SELECT randomblob(1024) FROM generate_series(1, 256)")
|
|
|
|
conn_full = turso.sync.connect(":memory:", remote_url=server.db_url())
|
|
assert conn_full.execute("SELECT LENGTH(x) FROM t LIMIT 1").fetchall() == [(1024,)]
|
|
assert conn_full.stats().network_received_bytes > 256 * 1024
|
|
assert conn_full.execute("SELECT SUM(LENGTH(x)) FROM t").fetchall() == [(256 * 1024,)]
|
|
|
|
conn_partial = turso.sync.connect(
|
|
":memory:",
|
|
remote_url=server.db_url(),
|
|
partial_sync_opts=turso.sync.PartialSyncOpts(
|
|
bootstrap_strategy=turso.sync.PartialSyncPrefixBootstrap(length=128 * 1024),
|
|
segment_size=4 * 1024,
|
|
),
|
|
)
|
|
assert conn_partial.execute("SELECT LENGTH(x) FROM t LIMIT 1").fetchall() == [(1024,)]
|
|
assert conn_partial.stats().network_received_bytes < 128 * 1024 * 1.5
|
|
|
|
start = time.time()
|
|
assert conn_partial.execute("SELECT SUM(LENGTH(x)) FROM t").fetchall() == [(256 * 1024,)]
|
|
print(time.time() - start)
|
|
assert conn_partial.stats().network_received_bytes > 256 * 1024
|
|
|
|
def test_partial_sync_prefetch():
|
|
# turso.setup_logging(level=logging.DEBUG)
|
|
|
|
with TursoServer() as server:
|
|
server.db_sql("CREATE TABLE t(x)")
|
|
server.db_sql("INSERT INTO t SELECT randomblob(1024) FROM generate_series(1, 2000)")
|
|
|
|
conn_full = turso.sync.connect(":memory:", remote_url=server.db_url())
|
|
assert conn_full.execute("SELECT LENGTH(x) FROM t LIMIT 1").fetchall() == [(1024,)]
|
|
assert conn_full.stats().network_received_bytes > 2000 * 1024
|
|
assert conn_full.execute("SELECT SUM(LENGTH(x)) FROM t").fetchall() == [(2000 * 1024,)]
|
|
|
|
conn_partial = turso.sync.connect(
|
|
":memory:",
|
|
remote_url=server.db_url(),
|
|
partial_sync_opts=turso.sync.PartialSyncOpts(
|
|
bootstrap_strategy=turso.sync.PartialSyncPrefixBootstrap(length=128 * 1024),
|
|
segment_size=4 * 1024,
|
|
prefetch=True,
|
|
),
|
|
)
|
|
assert conn_partial.execute("SELECT LENGTH(x) FROM t LIMIT 1").fetchall() == [(1024,)]
|
|
assert conn_partial.stats().network_received_bytes < 1200 * 1024
|
|
|
|
start = time.time()
|
|
assert conn_partial.execute("SELECT SUM(LENGTH(x)) FROM t").fetchall() == [(2000 * 1024,)]
|
|
print(time.time() - start)
|
|
assert conn_partial.stats().network_received_bytes > 2000 * 1024
|
|
|
|
def run_full(path: str, remote_url: str, barrier: any):
|
|
barrier.wait()
|
|
try:
|
|
print(turso.sync.connect(path, remote_url=remote_url))
|
|
except Exception as e:
|
|
print("valid error", e, type(e), isinstance(e, turso.Error), turso.Error)
|
|
|
|
def test_bootstrap_concurrency():
|
|
# turso.setup_logging(level=logging.DEBUG)
|
|
|
|
with TursoServer() as server:
|
|
server.db_sql("CREATE TABLE t(x)")
|
|
server.db_sql("INSERT INTO t SELECT randomblob(1024) FROM generate_series(1, 2000)")
|
|
|
|
with tempfile.TemporaryDirectory(prefix="pyturso-") as dir:
|
|
path = os.path.join(dir, "local.db")
|
|
print(path)
|
|
barrier = multiprocessing.Barrier(2)
|
|
t1 = multiprocessing.Process(target=run_full, args=(path, server.db_url(), barrier))
|
|
t2 = multiprocessing.Process(target=run_full, args=(path, server.db_url(), barrier))
|
|
|
|
t1.start()
|
|
t2.start()
|
|
|
|
t1.join()
|
|
t2.join()
|
|
|
|
assert t1.exitcode == 0
|
|
assert t2.exitcode == 0
|