From 625d5593259c13bb5f98c652de5570923aca2122 Mon Sep 17 00:00:00 2001 From: Dax Raad Date: Mon, 21 Jul 2025 14:02:40 -0400 Subject: [PATCH] queueing --- packages/opencode/src/session/index.ts | 191 +++++++++++++++++-------- 1 file changed, 132 insertions(+), 59 deletions(-) diff --git a/packages/opencode/src/session/index.ts b/packages/opencode/src/session/index.ts index 292a604f9..a1a8e3eca 100644 --- a/packages/opencode/src/session/index.ts +++ b/packages/opencode/src/session/index.ts @@ -118,11 +118,21 @@ export namespace Session { const sessions = new Map() const messages = new Map() const pending = new Map() + const queued = new Map< + string, + { + message: MessageV2.User + parts: MessageV2.Part[] + processed: boolean + callback: (input: ReturnType) => void + }[] + >() return { sessions, messages, pending, + queued, } }, async (state) => { @@ -351,64 +361,14 @@ export namespace Session { ]), ), }) + export type ChatInput = z.infer - export async function chat(input: z.infer) { + export async function chat( + input: z.infer, + ): Promise<{ info: MessageV2.Assistant; parts: MessageV2.Part[] }> { const l = log.clone().tag("session", input.sessionID) l.info("chatting") - const model = await Provider.getModel(input.providerID, input.modelID) - let msgs = await messages(input.sessionID) - const session = await get(input.sessionID) - - if (session.revert) { - const trimmed = [] - for (const msg of msgs) { - if ( - msg.info.id > session.revert.messageID || - (msg.info.id === session.revert.messageID && session.revert.part === 0) - ) { - await Storage.remove("session/message/" + input.sessionID + "/" + msg.info.id) - await Bus.publish(MessageV2.Event.Removed, { - sessionID: input.sessionID, - messageID: msg.info.id, - }) - continue - } - - if (msg.info.id === session.revert.messageID) { - if (session.revert.part === 0) break - msg.parts = msg.parts.slice(0, session.revert.part) - } - trimmed.push(msg) - } - msgs = trimmed - await update(input.sessionID, (draft) => { - draft.revert = undefined - }) - } - - const previous = msgs.filter((x) => x.info.role === "assistant").at(-1)?.info as MessageV2.Assistant - const outputLimit = Math.min(model.info.limit.output, OUTPUT_TOKEN_MAX) || OUTPUT_TOKEN_MAX - - // auto summarize if too long - if (previous && previous.tokens) { - const tokens = - previous.tokens.input + previous.tokens.cache.read + previous.tokens.cache.write + previous.tokens.output - if (model.info.limit.context && tokens > Math.max((model.info.limit.context - outputLimit) * 0.9, 0)) { - await summarize({ - sessionID: input.sessionID, - providerID: input.providerID, - modelID: input.modelID, - }) - return chat(input) - } - } - - using abort = lock(input.sessionID) - - const lastSummary = msgs.findLast((msg) => msg.info.role === "assistant" && msg.info.summary === true) - if (lastSummary) msgs = msgs.filter((msg) => msg.info.id >= lastSummary.info.id) - const userMsg: MessageV2.Info = { id: input.messageID ?? Identifier.ascending("message"), role: "user", @@ -533,7 +493,6 @@ export namespace Session { ] }), ).then((x) => x.flat()) - if (input.mode === "plan") userParts.push({ id: Identifier.ascending("part"), @@ -544,6 +503,77 @@ export namespace Session { synthetic: true, }) + await updateMessage(userMsg) + for (const part of userParts) { + await updatePart(part) + } + + if (isLocked(input.sessionID)) { + return new Promise((resolve) => { + const queue = state().queued.get(input.sessionID) ?? [] + queue.push({ + message: userMsg, + parts: userParts, + processed: false, + callback: resolve, + }) + state().queued.set(input.sessionID, queue) + }) + } + + const model = await Provider.getModel(input.providerID, input.modelID) + let msgs = await messages(input.sessionID) + const session = await get(input.sessionID) + + if (session.revert) { + const trimmed = [] + for (const msg of msgs) { + if ( + msg.info.id > session.revert.messageID || + (msg.info.id === session.revert.messageID && session.revert.part === 0) + ) { + await Storage.remove("session/message/" + input.sessionID + "/" + msg.info.id) + await Bus.publish(MessageV2.Event.Removed, { + sessionID: input.sessionID, + messageID: msg.info.id, + }) + continue + } + + if (msg.info.id === session.revert.messageID) { + if (session.revert.part === 0) break + msg.parts = msg.parts.slice(0, session.revert.part) + } + trimmed.push(msg) + } + msgs = trimmed + await update(input.sessionID, (draft) => { + draft.revert = undefined + }) + } + + const previous = msgs.filter((x) => x.info.role === "assistant").at(-1)?.info as MessageV2.Assistant + const outputLimit = Math.min(model.info.limit.output, OUTPUT_TOKEN_MAX) || OUTPUT_TOKEN_MAX + + // auto summarize if too long + if (previous && previous.tokens) { + const tokens = + previous.tokens.input + previous.tokens.cache.read + previous.tokens.cache.write + previous.tokens.output + if (model.info.limit.context && tokens > Math.max((model.info.limit.context - outputLimit) * 0.9, 0)) { + await summarize({ + sessionID: input.sessionID, + providerID: input.providerID, + modelID: input.modelID, + }) + return chat(input) + } + } + + using abort = lock(input.sessionID) + + const lastSummary = msgs.findLast((msg) => msg.info.role === "assistant" && msg.info.summary === true) + if (lastSummary) msgs = msgs.filter((msg) => msg.info.id >= lastSummary.info.id) + if (msgs.length === 0 && !session.parentID) { const small = (await Provider.getSmallModel(input.providerID)) ?? model generateText({ @@ -582,10 +612,6 @@ export namespace Session { }) .catch(() => {}) } - await updateMessage(userMsg) - for (const part of userParts) { - await updatePart(part) - } msgs.push({ info: userMsg, parts: userParts }) const mode = await Mode.get(input.mode ?? "build") @@ -692,6 +718,49 @@ export namespace Session { const stream = streamText({ onError() {}, + async prepareStep({ messages }) { + const queue = (state().queued.get(input.sessionID) ?? []).filter((x) => !x.processed) + if (queue.length) { + for (const item of queue) { + if (item.processed) continue + messages.push( + ...MessageV2.toModelMessage([ + { + info: item.message, + parts: item.parts, + }, + ]), + ) + item.processed = true + } + Object.assign(assistantMsg, { + id: Identifier.ascending("message"), + role: "assistant", + system, + path: { + cwd: app.path.cwd, + root: app.path.root, + }, + cost: 0, + tokens: { + input: 0, + output: 0, + reasoning: 0, + cache: { read: 0, write: 0 }, + }, + modelID: input.modelID, + providerID: input.providerID, + time: { + created: Date.now(), + }, + sessionID: input.sessionID, + }) + await updateMessage(assistantMsg) + } + return { + messages, + } + }, maxRetries: 10, maxOutputTokens: outputLimit, abortSignal: abort.signal, @@ -1087,6 +1156,10 @@ export namespace Session { return result } + function isLocked(sessionID: string) { + return state().pending.has(sessionID) + } + function lock(sessionID: string) { log.info("locking", { sessionID }) if (state().pending.has(sessionID)) throw new BusyError(sessionID)