This commit is contained in:
Dax Raad 2025-08-31 19:43:11 -04:00
parent ce1b5ee496
commit b51975fd67
6 changed files with 224 additions and 118 deletions

View file

@ -7,13 +7,12 @@ export namespace Bus {
type Subscription = (event: any) => void
const state = Instance.state(() => {
const subscriptions = new Map<any, Subscription[]>()
const subscriptions = new Map<any, Subscription[]>()
return {
subscriptions,
}
},
)
return {
subscriptions,
}
})
export type EventDefinition = ReturnType<typeof event>

View file

@ -2,7 +2,7 @@ import z from "zod"
import { Filesystem } from "../util/filesystem"
import path from "path"
import { $ } from "bun"
import { StorageNext } from "../storage/storage-next"
import { Storage } from "../storage/storage"
import { Log } from "../util/log"
export namespace Project {
@ -37,7 +37,7 @@ export namespace Project {
created: Date.now(),
},
}
await StorageNext.write<Info>(["project", "global"], project)
await Storage.write<Info>(["project", "global"], project)
return project
}
let worktree = path.dirname(git)
@ -69,7 +69,7 @@ export namespace Project {
created: Date.now(),
},
}
await StorageNext.write<Info>(["project", id], project)
await Storage.write<Info>(["project", id], project)
return project
}
if (cache.has(directory)) {
@ -81,13 +81,13 @@ export namespace Project {
}
export async function setInitialized(projectID: string) {
await StorageNext.update<Info>(["project", projectID], (draft) => {
await Storage.update<Info>(["project", projectID], (draft) => {
draft.time.initialized = Date.now()
})
}
export async function list() {
const keys = await StorageNext.list(["project"])
return await Promise.all(keys.map((x) => StorageNext.read<Info>(x)))
const keys = await Storage.list(["project"])
return await Promise.all(keys.map((x) => Storage.read<Info>(x)))
}
}

View file

@ -31,7 +31,7 @@ import { ProviderTransform } from "../provider/transform"
import type { ModelsDev } from "../provider/models"
import { Share } from "../share/share"
import { Snapshot } from "../snapshot"
import { StorageNext } from "../storage/storage-next"
import { Storage } from "../storage/storage"
import { Log } from "../util/log"
import { NamedError } from "../util/error"
import { SystemPrompt } from "./system"
@ -187,7 +187,7 @@ export namespace Session {
},
}
log.info("created", result)
await StorageNext.write(["session", Instance.project.id, result.id], result)
await Storage.write(["session", Instance.project.id, result.id], result)
const cfg = await Config.get()
if (!result.parentID && (Flag.OPENCODE_AUTO_SHARE || cfg.share === "auto"))
share(result.id)
@ -206,12 +206,12 @@ export namespace Session {
}
export async function get(id: string) {
const read = await StorageNext.read<Info>(["session", Instance.project.id, id])
const read = await Storage.read<Info>(["session", Instance.project.id, id])
return read as Info
}
export async function getShare(id: string) {
return StorageNext.read<ShareInfo>(["share", id])
return Storage.read<ShareInfo>(["share", id])
}
export async function share(id: string) {
@ -228,7 +228,7 @@ export namespace Session {
url: share.url,
}
})
await StorageNext.write(["share", id], share)
await Storage.write(["share", id], share)
await Share.sync("session/info/" + id, session)
for (const msg of await messages(id)) {
await Share.sync("session/message/" + id + "/" + msg.info.id, msg.info)
@ -242,7 +242,7 @@ export namespace Session {
export async function unshare(id: string) {
const share = await getShare(id)
if (!share) return
await StorageNext.remove(["share", id])
await Storage.remove(["share", id])
await update(id, (draft) => {
draft.share = undefined
})
@ -251,7 +251,7 @@ export namespace Session {
export async function update(id: string, editor: (session: Info) => void) {
const project = Instance.project
const result = await StorageNext.update<Info>(["session", project.id, id], (draft) => {
const result = await Storage.update<Info>(["session", project.id, id], (draft) => {
editor(draft)
draft.time.updated = Date.now()
})
@ -266,8 +266,8 @@ export namespace Session {
info: MessageV2.Info
parts: MessageV2.Part[]
}[]
for (const p of await StorageNext.list(["message", sessionID])) {
const read = await StorageNext.read<MessageV2.Info>(p)
for (const p of await Storage.list(["message", sessionID])) {
const read = await Storage.read<MessageV2.Info>(p)
result.push({
info: read,
parts: await getParts(read.id),
@ -279,15 +279,15 @@ export namespace Session {
export async function getMessage(sessionID: string, messageID: string) {
return {
info: await StorageNext.read<MessageV2.Info>(["message", sessionID, messageID]),
info: await Storage.read<MessageV2.Info>(["message", sessionID, messageID]),
parts: await getParts(messageID),
}
}
export async function getParts(messageID: string) {
const result = [] as MessageV2.Part[]
for (const item of await StorageNext.list(["part", messageID])) {
const read = await StorageNext.read<MessageV2.Part>(item)
for (const item of await Storage.list(["part", messageID])) {
const read = await Storage.read<MessageV2.Part>(item)
result.push(read)
}
result.sort((a, b) => (a.id > b.id ? 1 : -1))
@ -296,16 +296,16 @@ export namespace Session {
export async function* list() {
const project = Instance.project
for (const item of await StorageNext.list(["session", project.id])) {
yield StorageNext.read<Info>(item)
for (const item of await Storage.list(["session", project.id])) {
yield Storage.read<Info>(item)
}
}
export async function children(parentID: string) {
const project = Instance.project
const result = [] as Session.Info[]
for (const item of await StorageNext.list(["session", project.id])) {
const session = await StorageNext.read<Info>(item)
for (const item of await Storage.list(["session", project.id])) {
const session = await Storage.read<Info>(item)
if (session.parentID !== parentID) continue
result.push(session)
}
@ -332,13 +332,13 @@ export namespace Session {
await remove(child.id, false)
}
await unshare(sessionID).catch(() => {})
for (const msg of await StorageNext.list(["message", sessionID])) {
for (const part of await StorageNext.list(["part", msg.at(-1)!])) {
await StorageNext.remove(part)
for (const msg of await Storage.list(["message", sessionID])) {
for (const part of await Storage.list(["part", msg.at(-1)!])) {
await Storage.remove(part)
}
await StorageNext.remove(msg)
await Storage.remove(msg)
}
await StorageNext.remove(["session", project.id, sessionID])
await Storage.remove(["session", project.id, sessionID])
if (emitEvent) {
Bus.publish(Event.Deleted, {
info: session,
@ -350,14 +350,14 @@ export namespace Session {
}
async function updateMessage(msg: MessageV2.Info) {
await StorageNext.write(["message", msg.sessionID, msg.id], msg)
await Storage.write(["message", msg.sessionID, msg.id], msg)
Bus.publish(MessageV2.Event.Updated, {
info: msg,
})
}
async function updatePart(part: MessageV2.Part) {
await StorageNext.write(["part", part.messageID, part.id], part)
await Storage.write(["part", part.messageID, part.id], part)
Bus.publish(MessageV2.Event.PartUpdated, {
part,
})
@ -425,7 +425,7 @@ export namespace Session {
const [preserve, remove] = splitWhen(msgs, (x) => x.info.id === messageID)
msgs = preserve
for (const msg of remove) {
await StorageNext.remove(["message", input.sessionID, msg.info.id])
await Storage.remove(["message", input.sessionID, msg.info.id])
await Bus.publish(MessageV2.Event.Removed, { sessionID: input.sessionID, messageID: msg.info.id })
}
const last = preserve.at(-1)
@ -434,7 +434,7 @@ export namespace Session {
const [preserveParts, removeParts] = splitWhen(last.parts, (x) => x.id === partID)
last.parts = preserveParts
for (const part of removeParts) {
await StorageNext.remove(["part", last.info.id, part.id])
await Storage.remove(["part", last.info.id, part.id])
await Bus.publish(MessageV2.Event.PartRemoved, {
sessionID: input.sessionID,
messageID: last.info.id,
@ -787,7 +787,7 @@ export namespace Session {
await updateMessage(assistantMsg)
await using _ = defer(async () => {
if (assistantMsg.time.completed) return
await StorageNext.remove(["session", "message", input.sessionID, assistantMsg.id])
await Storage.remove(["session", "message", input.sessionID, assistantMsg.id])
await Bus.publish(MessageV2.Event.Removed, { sessionID: input.sessionID, messageID: assistantMsg.id })
})
const tools: Record<string, AITool> = {}

View file

@ -1,7 +1,7 @@
import { Bus } from "../bus"
import { Installation } from "../installation"
import { Session } from "../session"
import { StorageNext } from "../storage/storage-next"
import { MessageV2 } from "../session/message-v2"
import { Log } from "../util/log"
export namespace Share {
@ -45,7 +45,25 @@ export namespace Share {
})
}
export function init() {}
export function init() {
Bus.subscribe(Session.Event.Updated, async (evt) => {
await sync("session/info/" + evt.properties.info.id, evt.properties.info)
})
Bus.subscribe(MessageV2.Event.Updated, async (evt) => {
await sync("session/message/" + evt.properties.info.sessionID + "/" + evt.properties.info.id, evt.properties.info)
})
Bus.subscribe(MessageV2.Event.PartUpdated, async (evt) => {
await sync(
"session/part/" +
evt.properties.part.sessionID +
"/" +
evt.properties.part.messageID +
"/" +
evt.properties.part.id,
evt.properties.part,
)
})
}
export const URL =
process.env["OPENCODE_API"] ??

View file

@ -1,78 +0,0 @@
import { Log } from "../util/log"
import path from "path"
import fs from "fs/promises"
import { Global } from "../global"
import { lazy } from "../util/lazy"
import { Lock } from "../util/lock"
export namespace StorageNext {
const log = Log.create({ service: "storage" })
type Migration = (dir: string) => Promise<void>
const MIGRATIONS: Migration[] = []
const state = lazy(async () => {
const dir = path.join(Global.Path.data, "storage")
const migration = await Bun.file(path.join(dir, "migration"))
.json()
.then((x) => parseInt(x))
.catch(() => 0)
for (let index = migration; index < MIGRATIONS.length; index++) {
log.info("running migration", { index })
const migration = MIGRATIONS[index]
await migration(dir)
await Bun.write(path.join(dir, "migration"), (index + 1).toString())
}
return {
dir,
}
})
export async function remove(key: string[]) {
const dir = await state().then((x) => x.dir)
const target = path.join(dir, ...key) + ".json"
await fs.unlink(target).catch(() => {})
}
export async function read<T>(key: string[]) {
const dir = await state().then((x) => x.dir)
const target = path.join(dir, ...key) + ".json"
using _ = await Lock.read(target)
return Bun.file(target).json() as Promise<T>
}
export async function update<T>(key: string[], fn: (draft: T) => void) {
const dir = await state().then((x) => x.dir)
const target = path.join(dir, ...key) + ".json"
using _ = await Lock.write("storage")
const content = await Bun.file(target).json()
fn(content)
await Bun.write(target, JSON.stringify(content, null, 2))
return content as T
}
export async function write<T>(key: string[], content: T) {
const dir = await state().then((x) => x.dir)
const target = path.join(dir, ...key) + ".json"
using _ = await Lock.write("storage")
await Bun.write(target, JSON.stringify(content, null, 2))
}
const glob = new Bun.Glob("**/*")
export async function list(prefix: string[]) {
const dir = await state().then((x) => x.dir)
try {
const result = await Array.fromAsync(
glob.scan({
cwd: path.join(dir, ...prefix),
onlyFiles: true,
}),
).then((results) => results.map((x) => [...prefix, ...x.slice(0, -5).split(path.sep)]))
result.sort()
return result
} catch {
return []
}
}
}

View file

@ -0,0 +1,167 @@
import { Log } from "../util/log"
import path from "path"
import fs from "fs/promises"
import { Global } from "../global"
import { lazy } from "../util/lazy"
import { Lock } from "../util/lock"
import { $ } from "bun"
export namespace Storage {
const log = Log.create({ service: "storage" })
type Migration = (dir: string) => Promise<void>
const MIGRATIONS: Migration[] = [
async (dir) => {
const project = path.resolve(dir, "../project")
for await (const projectDir of new Bun.Glob("*").scan({ cwd: project, onlyFiles: false })) {
let projectID = projectDir
const fullProjectDir = path.join(project, projectDir)
let worktree = "/"
if (projectID !== "global") {
for await (const msgFile of new Bun.Glob("storage/session/message/*/*.json").scan({
cwd: path.join(project, projectDir),
absolute: true,
})) {
const json = await Bun.file(msgFile).json()
worktree = json.path?.root
if (worktree) break
}
if (!worktree) continue
const [id] = await $`git rev-list --max-parents=0 --all`
.quiet()
.nothrow()
.cwd(worktree)
.text()
.then((x) =>
x
.split("\n")
.filter(Boolean)
.map((x) => x.trim())
.toSorted(),
)
if (!id) continue
projectID = id
await Bun.write(
path.join(dir, "project", projectID + ".json"),
JSON.stringify({
id,
vcs: "git",
worktree,
time: {
created: Date.now(),
initialized: Date.now(),
},
}),
)
for await (const sessionFile of new Bun.Glob("storage/session/info/*.json").scan({
cwd: fullProjectDir,
absolute: true,
})) {
const dest = path.join(dir, "session", projectID, path.basename(sessionFile))
log.info("copying", {
sessionFile,
dest,
})
const session = await Bun.file(sessionFile).json()
await Bun.write(dest, JSON.stringify(session))
for await (const msgFile of new Bun.Glob(`storage/session/message/${session.id}/*.json`).scan({
cwd: fullProjectDir,
absolute: true,
})) {
const dest = path.join(dir, "message", session.id, path.basename(msgFile))
log.info("copying", {
msgFile,
dest,
})
const message = await Bun.file(msgFile).json()
await Bun.write(dest, JSON.stringify(message))
for await (const partFile of new Bun.Glob(`storage/session/part/${session.id}/${message.id}/*.json`).scan(
{
cwd: fullProjectDir,
absolute: true,
},
)) {
const dest = path.join(dir, "part", message.id, path.basename(partFile))
const part = await Bun.file(partFile).json()
log.info("copying", {
partFile,
dest,
})
await Bun.write(dest, JSON.stringify(part))
}
}
}
}
}
},
]
const state = lazy(async () => {
const dir = path.join(Global.Path.data, "storage")
const migration = await Bun.file(path.join(dir, "migration"))
.json()
.then((x) => parseInt(x))
.catch(() => 0)
for (let index = migration; index < MIGRATIONS.length; index++) {
log.info("running migration", { index })
const migration = MIGRATIONS[index]
await migration(dir)
await Bun.write(path.join(dir, "migration"), (index + 1).toString())
}
return {
dir,
}
})
export async function remove(key: string[]) {
const dir = await state().then((x) => x.dir)
const target = path.join(dir, ...key) + ".json"
await fs.unlink(target).catch(() => {})
}
export async function read<T>(key: string[]) {
const dir = await state().then((x) => x.dir)
const target = path.join(dir, ...key) + ".json"
using _ = await Lock.read(target)
return Bun.file(target).json() as Promise<T>
}
export async function update<T>(key: string[], fn: (draft: T) => void) {
const dir = await state().then((x) => x.dir)
const target = path.join(dir, ...key) + ".json"
using _ = await Lock.write("storage")
const content = await Bun.file(target).json()
fn(content)
await Bun.write(target, JSON.stringify(content, null, 2))
return content as T
}
export async function write<T>(key: string[], content: T) {
const dir = await state().then((x) => x.dir)
const target = path.join(dir, ...key) + ".json"
using _ = await Lock.write("storage")
await Bun.write(target, JSON.stringify(content, null, 2))
}
const glob = new Bun.Glob("**/*")
export async function list(prefix: string[]) {
const dir = await state().then((x) => x.dir)
try {
const result = await Array.fromAsync(
glob.scan({
cwd: path.join(dir, ...prefix),
onlyFiles: true,
}),
).then((results) => results.map((x) => [...prefix, ...x.slice(0, -5).split(path.sep)]))
result.sort()
return result
} catch {
return []
}
}
}