From 3d99dc78dbff982cde6dd61a0629b7ee9de1a8f3 Mon Sep 17 00:00:00 2001
From: Dax Raad
Date: Mon, 1 Dec 2025 16:35:03 -0500
Subject: [PATCH] core: reduce latency when loading shared sessions through event compaction

---
Review note (ignored by git am): amended so that Share.remove also deletes the
share_event batches and the share_compaction snapshot introduced by this change.
Assumes removing a missing key succeeds (S3 DeleteObject semantics); confirm for
the storage backend in use.

 packages/enterprise/src/core/share.ts         | 114 +++++--
 packages/enterprise/src/core/storage.ts       |  20 +-
 packages/enterprise/test-debug.ts             |  37 +++
 packages/enterprise/test/core/share.test.ts   | 269 ++++++++++++++++++
 packages/enterprise/test/core/storage.test.ts |  67 +++++
 packages/util/src/identifier.ts               |  48 ++++
 6 files changed, 517 insertions(+), 38 deletions(-)
 create mode 100644 packages/enterprise/test-debug.ts
 create mode 100644 packages/enterprise/test/core/share.test.ts
 create mode 100644 packages/enterprise/test/core/storage.test.ts
 create mode 100644 packages/util/src/identifier.ts

diff --git a/packages/enterprise/src/core/share.ts b/packages/enterprise/src/core/share.ts
index 5c2fbc28c..bf10c4a14 100644
--- a/packages/enterprise/src/core/share.ts
+++ b/packages/enterprise/src/core/share.ts
@@ -1,8 +1,10 @@
 import { FileDiff, Message, Model, Part, Session, SessionStatus } from "@opencode-ai/sdk"
 import { fn } from "@opencode-ai/util/fn"
 import { iife } from "@opencode-ai/util/iife"
+import { Identifier } from "@opencode-ai/util/identifier"
 import z from "zod"
 import { Storage } from "./storage"
+import { Binary } from "@opencode-ai/util/binary"
 
 export namespace Share {
   export const Info = z.object({
@@ -37,15 +39,15 @@ export namespace Share {
   export type Data = z.infer
 
   export const create = fn(z.object({ sessionID: z.string() }), async (body) => {
+    const isTest = process.env.NODE_ENV === "test" || body.sessionID.startsWith("test_")
     const info: Info = {
-      id: body.sessionID.slice(-8),
+      id: (isTest ? "test_" : "") + body.sessionID.slice(-8),
       sessionID: body.sessionID,
       secret: crypto.randomUUID(),
     }
     const exists = await get(info.id)
     if (exists) throw new Errors.AlreadyExists(info.id)
     await Storage.write(["share", info.id], info)
-    console.log("created share", info.id)
     return info
   })
 
@@ -58,35 +60,80 @@ export namespace Share {
     if (!share) throw new Errors.NotFound(body.id)
     if (share.secret !== body.secret) throw new Errors.InvalidSecret(body.id)
     await Storage.remove(["share", body.id])
-    const list = await Storage.list(["share_data", body.id])
-    for (const item of list) {
-      await Storage.remove(item)
+    // sync() writes batches under share_event/<id>/ and data() stores a snapshot at share_compaction/<id>;
+    // delete those too, otherwise a removed share leaves its content behind in storage.
+    await Storage.remove(["share_compaction", body.id])
+    for (const prefix of ["share_data", "share_event"]) {
+      const list = await Storage.list({ prefix: [prefix, body.id] })
+      for (const item of list) {
+        await Storage.remove(item)
+      }
     }
   })
 
-  export async function data(id: string) {
-    let time = Date.now()
-    const list = await Storage.list(["share_data", id])
-    console.log("listing share data", Date.now() - time, list.length)
-    const promises = []
-    time = Date.now()
-    for (const item of list) {
-      promises.push(
-        iife(async () => {
-          const [, , type] = item
-          return {
-            type: type as any,
-            data: await Storage.read(item),
-          } as Data
-        }),
-      )
-    }
-    const result = await Promise.all(promises)
-    console.log("read share data", Date.now() - time, result.length)
-    return result
+  // Verifies the share secret, then stores the batch as a new object under share_event/<id>/.
+  export const sync = fn(
+    z.object({
+      share: Info.pick({ id: true, secret: true }),
+      data: Data.array(),
+    }),
+    async (input) => {
+      const share = await get(input.share.id)
+      if (!share) throw new Errors.NotFound(input.share.id)
+      if (share.secret !== input.share.secret) throw new Errors.InvalidSecret(input.share.id)
+      await Storage.write(["share_event", input.share.id, Identifier.descending()], input.data)
+    },
+  )
+
+  type Compaction = {
+    event?: string
+    data: Data[]
   }
 
-  export const sync = fn(
+  // Merges the listed share_event batches into the stored share_compaction snapshot (entries with the
+  // same key are replaced in iteration order), writes the snapshot back and returns its data.
+  export async function data(shareID: string) {
+    const compaction: Compaction = (await Storage.read(["share_compaction", shareID])) ?? {
+      data: [],
+      event: undefined,
+    }
+
+    const list = await Storage.list({
+      prefix: ["share_event", shareID],
+      end: compaction.event,
+    }).then((x) => x.toReversed())
+
+    const data = await Promise.all(list.map(async (event) => await Storage.read(event))).then((x) => x.flat())
+    for (const item of data) {
+      if (!item) continue
+      const key = (item: Data) => {
+        switch (item.type) {
+          case "session":
+            return "session"
+          case "message":
+            return `message/${item.data.id}`
+          case "part":
+            return `${item.data.messageID}/${item.data.id}`
+          case "session_diff":
+            return "session_diff"
+          case "model":
+            return "model"
+        }
+      }
+      const id = key(item)
+      const result = Binary.search(compaction.data, id, key)
+      if (result.found) {
+        compaction.data[result.index] = item
+      } else {
+        compaction.data.splice(result.index, 0, item)
+      }
+    }
+    compaction.event = list.at(-1)?.at(-1)
+    await Storage.write(["share_compaction", shareID], compaction)
+    return compaction.data
+  }
+
+  export const syncOld = fn(
     z.object({
       share: Info.pick({ id: true, secret: true }),
       data: Data.array(),
@@ -103,15 +150,16 @@ export namespace Share {
           case "session":
             await Storage.write(["share_data", input.share.id, "session"], item.data)
             break
-          case "message":
-            await Storage.write(["share_data", input.share.id, "message", item.data.id], item.data)
+          case "message": {
+            const data = item.data as Message
+            await Storage.write(["share_data", input.share.id, "message", data.id], item.data)
             break
-          case "part":
-            await Storage.write(
-              ["share_data", input.share.id, "part", item.data.messageID, item.data.id],
-              item.data,
-            )
+          }
+          case "part": {
+            const data = item.data as Part
+            await Storage.write(["share_data", input.share.id, "part", data.messageID, data.id], item.data)
             break
+          }
           case "session_diff":
             await Storage.write(["share_data", input.share.id, "session_diff"], item.data)
             break
diff --git a/packages/enterprise/src/core/storage.ts b/packages/enterprise/src/core/storage.ts
index ee711458b..6edbef9ed 100644
--- a/packages/enterprise/src/core/storage.ts
+++ b/packages/enterprise/src/core/storage.ts
@@ -6,7 +6,7 @@ export namespace Storage {
     read(path: string): Promise
     write(path: string, value: string): Promise
     remove(path: string): Promise
-    list(prefix: string): Promise
+    list(options?: { prefix?: string; limit?: number; start?: string; end?: string }): Promise
   }
 
   function createAdapter(client: AwsClient, endpoint: string, bucket: string): Adapter {
@@ -37,8 +37,14 @@ export namespace Storage {
         if (!response.ok) throw new Error(`Failed to remove ${path}: ${response.status}`)
       },
 
-      async list(prefix: string): Promise {
+      async list(options?: { prefix?: string; limit?: number; start?: string; end?: string }): Promise {
+        const prefix = options?.prefix || ""
         const params = new URLSearchParams({ "list-type": "2", prefix })
+        if (options?.limit) params.set("max-keys", options.limit.toString())
+        if (options?.start) {
+          const startPath = prefix + options.start + ".json"
+          params.set("start-after", startPath)
+        }
         const response = await client.fetch(`${base}?${params}`)
         if (!response.ok) throw new Error(`Failed to list ${prefix}: ${response.status}`)
         const xml = await response.text()
@@ -48,6 +54,10 @@ export namespace Storage {
         while ((match = regex.exec(xml)) !== null) {
           keys.push(match[1])
         }
+        if (options?.end) {
+          const endPath = prefix + options.end + ".json"
+          return keys.filter((key) => key <= endPath)
+        }
         return keys
       },
     }
@@ -98,9 +108,9 @@ export namespace Storage {
     return adapter().remove(resolve(key))
   }
 
-  export async function list(prefix: string[]) {
-    const p = prefix.join("/") + (prefix.length ? "/" : "")
-    const result = await adapter().list(p)
+  export async function list(options?: { prefix?: string[]; limit?: number; start?: string; end?: string }) {
+    const p = options?.prefix ? options.prefix.join("/") + (options.prefix.length ? "/" : "") : ""
+    const result = await adapter().list({ prefix: p, limit: options?.limit, start: options?.start, end: options?.end })
     return result.map((x) => x.replace(/\.json$/, "").split("/"))
   }
 
diff --git a/packages/enterprise/test-debug.ts b/packages/enterprise/test-debug.ts
new file mode 100644
index 000000000..fca995297
--- /dev/null
+++ b/packages/enterprise/test-debug.ts
@@ -0,0 +1,37 @@
+import { Share } from "./src/core/share"
+import { Storage } from "./src/core/storage"
+
+async function test() {
+  const shareInfo = await Share.create({ sessionID: "test-debug-" + Date.now() })
+
+  const batch1: Share.Data[] = [
+    { type: "part", data: { id: "part1", sessionID: "session1", messageID: "msg1", type: "text", text: "Hello" } },
+  ]
+
+  const batch2: Share.Data[] = [
+    { type: "part", data: { id: "part1", sessionID: "session1", messageID: "msg1", type: "text", text: "Hello Updated" } },
+  ]
+
+  await Share.sync({
+    share: { id: shareInfo.id, secret: shareInfo.secret },
+    data: batch1,
+  })
+
+  await Share.sync({
+    share: { id: shareInfo.id, secret: shareInfo.secret },
+    data: batch2,
+  })
+
+  const events = await Storage.list({ prefix: ["share_event", shareInfo.id] })
+  console.log("Events (raw):", events)
+  console.log("Events (reversed):", events.toReversed())
+
+  for (const event of events.toReversed()) {
+    const data = await Storage.read(event)
+    console.log("Event data (reversed order):", event, data)
+  }
+
+  await Share.remove({ id: shareInfo.id, secret: shareInfo.secret })
+}
+
+test()
diff --git a/packages/enterprise/test/core/share.test.ts b/packages/enterprise/test/core/share.test.ts
new file mode 100644
index 000000000..d3bf6a2c2
--- /dev/null
+++ b/packages/enterprise/test/core/share.test.ts
@@ -0,0 +1,269 @@
+import { describe, expect, test, afterAll } from "bun:test"
+import { Share } from "../../src/core/share"
+import { Storage } from "../../src/core/storage"
+import { Identifier } from "@opencode-ai/util/identifier"
+
+describe.concurrent("core.share", () => {
+  test("should create a share", async () => {
+    const sessionID = Identifier.descending()
+    const share = await Share.create({ sessionID })
+
+    expect(share.sessionID).toBe(sessionID)
+    expect(share.secret).toBeDefined()
+
+    await Share.remove({ id: share.id, secret: share.secret })
+  })
+
+  test("should sync data to a share", async () => {
+    const sessionID = Identifier.descending()
+    const share = await Share.create({ sessionID })
+
+    const data: Share.Data[] = [
+      {
+        type: "part",
+        data: { id: "part1", sessionID, messageID: "msg1", type: "text", text: "Hello" },
+      },
+    ]
+
+    await Share.sync({
+      share: { id: share.id, secret: share.secret },
+      data,
+    })
+
+    const events = await Storage.list({ prefix: ["share_event", share.id] })
+    expect(events.length).toBe(1)
+
+    await Share.remove({ id: share.id, secret: share.secret })
+  })
+
+  test("should sync multiple batches of data", async () => {
+    const sessionID = Identifier.descending()
+    const share = await Share.create({ sessionID })
+
+    const data1: Share.Data[] = [
+      {
+        type: "part",
+        data: { id: "part1", sessionID, messageID: "msg1", type: "text", text: "Hello" },
+      },
+    ]
+
+    const data2: Share.Data[] = [
+      {
+        type: "part",
+        data: { id: "part2", sessionID, messageID: "msg1", type: "text", text: "World" },
+      },
+    ]
+
+    await Share.sync({
+      share: { id: share.id, secret: share.secret },
+      data: data1,
+    })
+
+    await Share.sync({
+      share: { id: share.id, secret: share.secret },
+      data: data2,
+    })
+
+    const events = await Storage.list({ prefix: ["share_event", share.id] })
+    expect(events.length).toBe(2)
+
+    await Share.remove({ id: share.id, secret: share.secret })
+  })
+
+  test("should retrieve synced data", async () => {
+    const sessionID = Identifier.descending()
+    const share = await Share.create({ sessionID })
+
+    const data: Share.Data[] = [
+      {
+        type: "part",
+        data: { id: "part1", sessionID, messageID: "msg1", type: "text", text: "Hello" },
+      },
+      {
+        type: "part",
+        data: { id: "part2", sessionID, messageID: "msg1", type: "text", text: "World" },
+      },
+    ]
+
+    await Share.sync({
+      share: { id: share.id, secret: share.secret },
+      data,
+    })
+
+    const result = await Share.data(share.id)
+
+    expect(result.length).toBe(2)
+    expect(result[0].type).toBe("part")
+    expect(result[1].type).toBe("part")
+
+    await Share.remove({ id: share.id, secret: share.secret })
+  })
+
+  test("should retrieve data from multiple syncs", async () => {
+    const sessionID = Identifier.descending()
+    const share = await Share.create({ sessionID })
+
+    const data1: Share.Data[] = [
+      {
+        type: "part",
+        data: { id: "part1", sessionID, messageID: "msg1", type: "text", text: "Hello" },
+      },
+    ]
+
+    const data2: Share.Data[] = [
+      {
+        type: "part",
+        data: { id: "part2", sessionID, messageID: "msg2", type: "text", text: "World" },
+      },
+    ]
+
+    const data3: Share.Data[] = [
+      { type: "part", data: { id: "part3", sessionID, messageID: "msg3", type: "text", text: "!" } },
+    ]
+
+    await Share.sync({
+      share: { id: share.id, secret: share.secret },
+      data: data1,
+    })
+
+    await Share.sync({
+      share: { id: share.id, secret: share.secret },
+      data: data2,
+    })
+
+    await Share.sync({
+      share: { id: share.id, secret: share.secret },
+      data: data3,
+    })
+
+    const result = await Share.data(share.id)
+
+    expect(result.length).toBe(3)
+    const parts = result.filter((d) => d.type === "part")
+    expect(parts.length).toBe(3)
+
+    await Share.remove({ id: share.id, secret: share.secret })
+  })
+
+  test("should return latest data when syncing duplicate parts", async () => {
+    const sessionID = Identifier.descending()
+    const share = await Share.create({ sessionID })
+
+    const data1: Share.Data[] = [
+      {
+        type: "part",
+        data: { id: "part1", sessionID, messageID: "msg1", type: "text", text: "Hello" },
+      },
+    ]
+
+    const data2: Share.Data[] = [
+      {
+        type: "part",
+        data: { id: "part1", sessionID, messageID: "msg1", type: "text", text: "Hello Updated" },
+      },
+    ]
+
+    await Share.sync({
+      share: { id: share.id, secret: share.secret },
+      data: data1,
+    })
+
+    await Share.sync({
+      share: { id: share.id, secret: share.secret },
+      data: data2,
+    })
+
+    const result = await Share.data(share.id)
+
+    expect(result.length).toBe(1)
+    const [first] = result
+    expect(first.type).toBe("part")
+    expect(first.type === "part" && first.data.type === "text" && first.data.text).toBe("Hello Updated")
+
+    await Share.remove({ id: share.id, secret: share.secret })
+  })
+
+  test("should return empty array for share with no data", async () => {
+    const sessionID = Identifier.descending()
+    const share = await Share.create({ sessionID })
+
+    const result = await Share.data(share.id)
+
+    expect(result).toEqual([])
+
+    await Share.remove({ id: share.id, secret: share.secret })
+  })
+
+  test("should throw error for invalid secret", async () => {
+    const sessionID = Identifier.descending()
+    const share = await Share.create({ sessionID })
+
+    const data: Share.Data[] = [
+      {
+        type: "part",
+        data: { id: "part1", sessionID, messageID: "msg1", type: "text", text: "Test" },
+      },
+    ]
+
+    expect(async () => {
+      await Share.sync({
+        share: { id: share.id, secret: "invalid-secret" },
+        data,
+      })
+    }).toThrow()
+
+    await Share.remove({ id: share.id, secret: share.secret })
+  })
+
+  test("should throw error for non-existent share", async () => {
+    const sessionID = Identifier.descending()
+    const data: Share.Data[] = [
+      {
+        type: "part",
+        data: { id: "part1", sessionID, messageID: "msg1", type: "text", text: "Test" },
+      },
+    ]
+
+    expect(async () => {
+      await Share.sync({
+        share: { id: "non-existent-id", secret: "some-secret" },
+        data,
+      })
+    }).toThrow()
+  })
+
+  test("should handle different data types", async () => {
+    const sessionID = Identifier.descending()
+    const share = await Share.create({ sessionID })
+
+    const data: Share.Data[] = [
+      { type: "session", data: { id: sessionID, status: "running" } as any },
+      { type: "message", data: { id: "msg1", sessionID } as any },
+      {
+        type: "part",
+        data: { id: "part1", sessionID, messageID: "msg1", type: "text", text: "Hello" },
+      },
+    ]
+
+    await Share.sync({
+      share: { id: share.id, secret: share.secret },
+      data,
+    })
+
+    const result = await Share.data(share.id)
+
+    expect(result.length).toBe(3)
+    expect(result.some((d) => d.type === "session")).toBe(true)
+    expect(result.some((d) => d.type === "message")).toBe(true)
+    expect(result.some((d) => d.type === "part")).toBe(true)
+
+    await Share.remove({ id: share.id, secret: share.secret })
+  })
+
+  afterAll(async () => {
+    const files = await Storage.list()
+    for (const file of files) {
+      await Storage.remove(file)
+    }
+  })
+})
diff --git a/packages/enterprise/test/core/storage.test.ts b/packages/enterprise/test/core/storage.test.ts
new file mode 100644
index 000000000..27e51384c
--- /dev/null
+++ b/packages/enterprise/test/core/storage.test.ts
@@ -0,0 +1,67 @@
+import { describe, expect, test, afterAll } from "bun:test"
+import { Storage } from "../../src/core/storage"
+
+describe("core.storage", () => {
+  test("should list files with start and end range", async () => {
+    await Storage.write(["test", "users", "user1"], { name: "user1" })
+    await Storage.write(["test", "users", "user2"], { name: "user2" })
+    await Storage.write(["test", "users", "user3"], { name: "user3" })
+    await Storage.write(["test", "users", "user4"], { name: "user4" })
+    await Storage.write(["test", "users", "user5"], { name: "user5" })
+
+    const result = await Storage.list({ prefix: ["test", "users"], start: "user2", end: "user4" })
+
+    expect(result).toEqual([
+      ["test", "users", "user3"],
+      ["test", "users", "user4"],
+    ])
+  })
+
+  test("should list files with start only", async () => {
+    const result = await Storage.list({ prefix: ["test", "users"], start: "user3" })
+
+    expect(result).toEqual([
+      ["test", "users", "user4"],
+      ["test", "users", "user5"],
+    ])
+  })
+
+  test("should list files with limit", async () => {
+    const result = await Storage.list({ prefix: ["test", "users"], limit: 3 })
+
+    expect(result).toEqual([
+      ["test", "users", "user1"],
+      ["test", "users", "user2"],
+      ["test", "users", "user3"],
+    ])
+  })
+
+  test("should list all files without prefix", async () => {
+    const result = await Storage.list()
+
+    expect(result.length).toBeGreaterThan(0)
+  })
+
+  test("should list all files with prefix", async () => {
+    const result = await Storage.list({ prefix: ["test", "users"] })
+
+    expect(result).toEqual([
+      ["test", "users", "user1"],
+      ["test", "users", "user2"],
+      ["test", "users", "user3"],
+      ["test", "users", "user4"],
+      ["test", "users", "user5"],
+    ])
+  })
+
+  afterAll(async () => {
+    const testFiles = await Storage.list({ prefix: ["test"] })
+
+    for (const file of testFiles) {
+      await Storage.remove(file)
+    }
+
+    const remainingFiles = await Storage.list({ prefix: ["test"] })
+    expect(remainingFiles).toEqual([])
+  })
+})
diff --git a/packages/util/src/identifier.ts b/packages/util/src/identifier.ts
new file mode 100644
index 000000000..ba28a351b
--- /dev/null
+++ b/packages/util/src/identifier.ts
@@ -0,0 +1,48 @@
+import { randomBytes } from "crypto"
+
+export namespace Identifier {
+  const LENGTH = 26
+
+  // State for monotonic ID generation
+  let lastTimestamp = 0
+  let counter = 0
+
+  export function ascending() {
+    return create(false)
+  }
+
+  export function descending() {
+    return create(true)
+  }
+
+  function randomBase62(length: number): string {
+    const chars = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
+    let result = ""
+    const bytes = randomBytes(length)
+    for (let i = 0; i < length; i++) {
+      result += chars[bytes[i] % 62]
+    }
+    return result
+  }
+
+  export function create(descending: boolean, timestamp?: number): string {
+    const currentTimestamp = timestamp ?? Date.now()
+
+    if (currentTimestamp !== lastTimestamp) {
+      lastTimestamp = currentTimestamp
+      counter = 0
+    }
+    counter++
+
+    let now = BigInt(currentTimestamp) * BigInt(0x1000) + BigInt(counter)
+
+    now = descending ? ~now : now
+
+    const timeBytes = Buffer.alloc(6)
+    for (let i = 0; i < 6; i++) {
+      timeBytes[i] = Number((now >> BigInt(40 - 8 * i)) & BigInt(0xff))
+    }
+
+    return timeBytes.toString("hex") + randomBase62(LENGTH - 12)
+  }
+}