From 4379270ba833f3d136f0cc34a452ea03a6155d61 Mon Sep 17 00:00:00 2001 From: Dax Raad Date: Sun, 16 Nov 2025 18:06:44 -0500 Subject: [PATCH] progress --- packages/opencode/src/session/compaction.ts | 293 +--------- packages/opencode/src/session/message-v2.ts | 188 +++---- packages/opencode/src/session/processor.ts | 579 ++++++++++---------- packages/opencode/src/session/prompt.ts | 222 ++++---- 4 files changed, 513 insertions(+), 769 deletions(-) diff --git a/packages/opencode/src/session/compaction.ts b/packages/opencode/src/session/compaction.ts index f53da40da..eeb424ed4 100644 --- a/packages/opencode/src/session/compaction.ts +++ b/packages/opencode/src/session/compaction.ts @@ -1,9 +1,8 @@ -import { streamText, type ModelMessage, type StreamTextResult, type Tool as AITool } from "ai" +import { streamText, type ModelMessage } from "ai" import { Session } from "." import { Identifier } from "../id/id" import { Instance } from "../project/instance" import { Provider } from "../provider/provider" -import { defer } from "../util/defer" import { MessageV2 } from "./message-v2" import { SystemPrompt } from "./system" import { Bus } from "../bus" @@ -14,9 +13,6 @@ import { Flag } from "../flag/flag" import { Token } from "../util/token" import { Log } from "../util/log" import { ProviderTransform } from "@/provider/transform" -import { SessionRetry } from "./retry" -import { Config } from "@/config/config" -import { Lock } from "../util/lock" import { SessionProcessor } from "./processor" export namespace SessionCompaction { @@ -37,14 +33,12 @@ export namespace SessionCompaction { if (context === 0) return false const count = input.tokens.input + input.tokens.cache.read + input.tokens.output const output = Math.min(input.model.limit.output, SessionPrompt.OUTPUT_TOKEN_MAX) || SessionPrompt.OUTPUT_TOKEN_MAX - // const usable = context - output - const usable = 20_000 + const usable = context - output return count > usable } export const PRUNE_MINIMUM = 20_000 export const PRUNE_PROTECT = 40_000 - const MAX_RETRIES = 10 // goes backwards through parts until there are 40_000 tokens worth of tool // calls. then erases output of previous tool calls. idea is to throw away old @@ -111,6 +105,7 @@ export namespace SessionCompaction { parentID: input.parentID, sessionID: input.sessionID, mode: "build", + summary: true, path: { cwd: Instance.directory, root: Instance.worktree, @@ -128,33 +123,6 @@ export namespace SessionCompaction { created: Date.now(), }, })) as MessageV2.Assistant - const stream = streamText({ - // set to 0, we handle loop - maxRetries: 0, - model: model.language, - providerOptions: ProviderTransform.providerOptions(model.npm, model.providerID, model.info.options), - headers: model.info.headers, - abortSignal: input.abort, - tools: model.info.tool_call ? {} : undefined, - messages: [ - ...system.map( - (x): ModelMessage => ({ - role: "system", - content: x, - }), - ), - ...MessageV2.toModelMessage(input.messages), - { - role: "user", - content: [ - { - type: "text", - text: "Provide a detailed but concise summary of our conversation above. Focus on information that would be helpful for continuing the conversation, including what we did, what we're doing, which files we're working on, and what we're going to do next.", - }, - ], - }, - ], - }) const processor = SessionProcessor.create({ assistantMessage: msg, sessionID: input.sessionID, @@ -162,79 +130,14 @@ export namespace SessionCompaction { model: model.info, abort: input.abort, }) - const result = await processor.process(stream) - return result - } - - export async function run(input: { sessionID: string; providerID: string; modelID: string; signal?: AbortSignal }) { - const signal = input.signal ?? new AbortController().signal - await using lock = input.signal === undefined ? await Lock.write(input.sessionID) : undefined - - await Session.update(input.sessionID, (draft) => { - draft.time.compacting = Date.now() - }) - await using _ = defer(async () => { - await Session.update(input.sessionID, (draft) => { - draft.time.compacting = undefined - }) - }) - const toSummarize = await MessageV2.filterCompacted(MessageV2.stream(input.sessionID)) - const model = await Provider.getModel(input.providerID, input.modelID) - const system = [ - ...SystemPrompt.summarize(model.providerID), - ...(await SystemPrompt.environment()), - ...(await SystemPrompt.custom()), - ] - - const msg = (await Session.updateMessage({ - id: Identifier.ascending("message"), - role: "assistant", - parentID: toSummarize.findLast((m) => m.info.role === "user")?.info.id!, - sessionID: input.sessionID, - mode: "build", - path: { - cwd: Instance.directory, - root: Instance.worktree, - }, - summary: true, - cost: 0, - tokens: { - output: 0, - input: 0, - reasoning: 0, - cache: { read: 0, write: 0 }, - }, - modelID: input.modelID, - providerID: model.providerID, - time: { - created: Date.now(), - }, - })) as MessageV2.Assistant - - const part = (await Session.updatePart({ - type: "text", - sessionID: input.sessionID, - messageID: msg.id, - id: Identifier.ascending("part"), - text: "", - time: { - start: Date.now(), - }, - })) as MessageV2.TextPart - - const doStream = () => + const result = await processor.process(() => streamText({ // set to 0, we handle loop maxRetries: 0, model: model.language, providerOptions: ProviderTransform.providerOptions(model.npm, model.providerID, model.info.options), headers: model.info.headers, - abortSignal: signal, - onError(error) { - log.error("stream error", { - error, - }) - }, + abortSignal: input.abort, tools: model.info.tool_call ? {} : undefined, messages: [ ...system.map( @@ -243,7 +146,7 @@ export namespace SessionCompaction { content: x, }), ), - ...MessageV2.toModelMessage(toSummarize), + ...MessageV2.toModelMessage(input.messages), { role: "user", content: [ @@ -254,168 +157,32 @@ export namespace SessionCompaction { ], }, ], - }) - - // TODO: reduce duplication between compaction.ts & prompt.ts - const process = async ( - stream: StreamTextResult, never>, - retries: { count: number; max: number }, - ) => { - let shouldRetry = false - try { - for await (const value of stream.fullStream) { - signal.throwIfAborted() - switch (value.type) { - case "text-delta": - part.text += value.text - if (value.providerMetadata) part.metadata = value.providerMetadata - if (part.text) - await Session.updatePart({ - part, - delta: value.text, - }) - continue - case "text-end": { - part.text = part.text.trimEnd() - part.time = { - start: Date.now(), - end: Date.now(), - } - if (value.providerMetadata) part.metadata = value.providerMetadata - await Session.updatePart(part) - continue - } - case "finish-step": { - const usage = Session.getUsage({ - model: model.info, - usage: value.usage, - metadata: value.providerMetadata, - }) - msg.cost += usage.cost - msg.tokens = usage.tokens - await Session.updateMessage(msg) - continue - } - case "error": - throw value.error - default: - continue - } - } - } catch (e) { - log.error("compaction error", { - error: e, - }) - const error = MessageV2.fromError(e, { providerID: input.providerID }) - if (retries.count < retries.max && MessageV2.APIError.isInstance(error) && error.data.isRetryable) { - shouldRetry = true - await Session.updatePart({ - id: Identifier.ascending("part"), - messageID: msg.id, - sessionID: msg.sessionID, - type: "retry", - attempt: retries.count + 1, - time: { - created: Date.now(), - }, - error, - }) - } else { - msg.error = error - Bus.publish(Session.Event.Error, { - sessionID: msg.sessionID, - error: msg.error, - }) - } - } - - const parts = await MessageV2.parts(msg.id) - return { - info: msg, - parts, - shouldRetry, - } - } - - let stream = doStream() - const cfg = await Config.get() - const maxRetries = cfg.experimental?.chatMaxRetries ?? MAX_RETRIES - let result = await process(stream, { - count: 0, - max: maxRetries, - }) - if (result.shouldRetry) { - const start = Date.now() - for (let retry = 1; retry < maxRetries; retry++) { - const lastRetryPart = result.parts.findLast((p): p is MessageV2.RetryPart => p.type === "retry") - - if (lastRetryPart) { - const delayMs = SessionRetry.getBoundedDelay({ - error: lastRetryPart.error, - attempt: retry, - startTime: start, - }) - if (!delayMs) { - break - } - - log.info("retrying with backoff", { - attempt: retry, - delayMs, - elapsed: Date.now() - start, - }) - - const stop = await SessionRetry.sleep(delayMs, signal) - .then(() => false) - .catch((error) => { - if (error instanceof DOMException && error.name === "AbortError") { - const err = new MessageV2.AbortedError( - { message: error.message }, - { - cause: error, - }, - ).toObject() - result.info.error = err - Bus.publish(Session.Event.Error, { - sessionID: result.info.sessionID, - error: result.info.error, - }) - return true - } - throw error - }) - - if (stop) break - } - - stream = doStream() - result = await process(stream, { - count: retry, - max: maxRetries, - }) - if (!result.shouldRetry) { - break - } - } - } - - msg.time.completed = Date.now() - - if ( - !msg.error || - (MessageV2.AbortedError.isInstance(msg.error) && - result.parts.some((part): part is MessageV2.TextPart => part.type === "text" && part.text.length > 0)) - ) { - msg.summary = true - Bus.publish(Event.Compacted, { + }), + ) + if (result === "continue") { + const continueMsg = await Session.updateMessage({ + id: Identifier.ascending("message"), + role: "user", sessionID: input.sessionID, + time: { + created: Date.now(), + }, + agent: "build", + model: input.model, + }) + await Session.updatePart({ + id: Identifier.ascending("part"), + messageID: continueMsg.id, + sessionID: input.sessionID, + type: "text", + synthetic: true, + text: "Continue if you have next steps", + time: { + start: Date.now(), + end: Date.now(), + }, }) } - await Session.updateMessage(msg) - - return { - info: msg, - parts: result.parts, - } + return "continue" } } diff --git a/packages/opencode/src/session/message-v2.ts b/packages/opencode/src/session/message-v2.ts index 32f26bd71..c636b5635 100644 --- a/packages/opencode/src/session/message-v2.ts +++ b/packages/opencode/src/session/message-v2.ts @@ -550,116 +550,101 @@ export namespace MessageV2 { if (msg.parts.length === 0) continue if (msg.info.role === "user") { - result.push({ + const userMessage: UIMessage = { id: msg.info.id, role: "user", - parts: msg.parts.flatMap((part): UIMessage["parts"] => { - if (part.type === "text") - return [ - { - type: "text", - text: part.text, - }, - ] - // text/plain and directory files are converted into text parts, ignore them - if (part.type === "file" && part.mime !== "text/plain" && part.mime !== "application/x-directory") - return [ - { - type: "file", - url: part.url, - mediaType: part.mime, - filename: part.filename, - }, - ] + parts: [], + } + result.push(userMessage) + for (const part of msg.parts) { + if (part.type === "text") + userMessage.parts.push({ + type: "text", + text: part.text, + }) + // text/plain and directory files are converted into text parts, ignore them + if (part.type === "file" && part.mime !== "text/plain" && part.mime !== "application/x-directory") + userMessage.parts.push({ + type: "file", + url: part.url, + mediaType: part.mime, + filename: part.filename, + }) - if (part.type === "compaction") { - return [ - { - type: "text", - text: "The user requested a compaction of the session. YOU MUST CONTINUE THE CONVERSATION AFTER THIS MESSAGE.", - }, - ] - } - return [] - }), - }) + if (part.type === "compaction") { + userMessage.parts.push({ + type: "text", + text: "The user requested a compaction of the session.", + }) + } + } } if (msg.info.role === "assistant") { - result.push({ + const assistantMessage: UIMessage = { id: msg.info.id, role: "assistant", - parts: msg.parts.flatMap((part): UIMessage["parts"] => { - if (part.type === "text") - return [ - { - type: "text", - text: part.text, - providerMetadata: part.metadata, - }, - ] - if (part.type === "step-start") - return [ - { - type: "step-start", - }, - ] - if (part.type === "tool") { - if (part.state.status === "completed") { - if (part.state.attachments?.length) { - result.push({ - id: Identifier.ascending("message"), - role: "user", - parts: [ - { - type: "text", - text: `Tool ${part.tool} returned an attachment:`, - }, - ...part.state.attachments.map((attachment) => ({ - type: "file" as const, - url: attachment.url, - mediaType: attachment.mime, - filename: attachment.filename, - })), - ], - }) - } - return [ - { - type: ("tool-" + part.tool) as `tool-${string}`, - state: "output-available", - toolCallId: part.callID, - input: part.state.input, - output: part.state.time.compacted ? "[Old tool result content cleared]" : part.state.output, - callProviderMetadata: part.metadata, - }, - ] + parts: [], + } + result.push(assistantMessage) + for (const part of msg.parts) { + if (part.type === "text") + assistantMessage.parts.push({ + type: "text", + text: part.text, + providerMetadata: part.metadata, + }) + if (part.type === "step-start") + assistantMessage.parts.push({ + type: "step-start", + }) + if (part.type === "tool") { + if (part.state.status === "completed") { + if (part.state.attachments?.length) { + result.push({ + id: Identifier.ascending("message"), + role: "user", + parts: [ + { + type: "text", + text: `Tool ${part.tool} returned an attachment:`, + }, + ...part.state.attachments.map((attachment) => ({ + type: "file" as const, + url: attachment.url, + mediaType: attachment.mime, + filename: attachment.filename, + })), + ], + }) } - if (part.state.status === "error") - return [ - { - type: ("tool-" + part.tool) as `tool-${string}`, - state: "output-error", - toolCallId: part.callID, - input: part.state.input, - errorText: part.state.error, - callProviderMetadata: part.metadata, - }, - ] + assistantMessage.parts.push({ + type: ("tool-" + part.tool) as `tool-${string}`, + state: "output-available", + toolCallId: part.callID, + input: part.state.input, + output: part.state.time.compacted ? "[Old tool result content cleared]" : part.state.output, + callProviderMetadata: part.metadata, + }) } - if (part.type === "reasoning") { - return [ - { - type: "reasoning", - text: part.text, - providerMetadata: part.metadata, - }, - ] - } - - return [] - }), - }) + if (part.state.status === "error") + assistantMessage.parts.push({ + type: ("tool-" + part.tool) as `tool-${string}`, + state: "output-error", + toolCallId: part.callID, + input: part.state.input, + errorText: part.state.error, + callProviderMetadata: part.metadata, + }) + } + if (part.type === "reasoning") { + assistantMessage.parts.push({ + type: "reasoning", + text: part.text, + providerMetadata: part.metadata, + }) + } + } } } @@ -710,8 +695,7 @@ export namespace MessageV2 { msg.parts.some((part) => part.type === "compaction") ) break - if (msg.info.role === "assistant" && msg.info.summary === true) break - if (msg.info.role === "assistant" && msg.info.finish) completed.add(msg.info.id) + if (msg.info.summary) break } result.reverse() return result diff --git a/packages/opencode/src/session/processor.ts b/packages/opencode/src/session/processor.ts index cac169c46..7b3ad90b5 100644 --- a/packages/opencode/src/session/processor.ts +++ b/packages/opencode/src/session/processor.ts @@ -1,6 +1,6 @@ import type { ModelsDev } from "@/provider/models" import { MessageV2 } from "./message-v2" -import type { StreamTextResult, Tool as AITool } from "ai" +import { type StreamTextResult, type Tool as AITool, APICallError } from "ai" import { Log } from "@/util/log" import { Identifier } from "@/id/id" import { Session } from "." @@ -9,6 +9,7 @@ import { Permission } from "@/permission" import { Snapshot } from "@/snapshot" import { SessionSummary } from "./summary" import { Bus } from "@/bus" +import { SessionRetry } from "./retry" export namespace SessionProcessor { const DOOM_LOOP_THRESHOLD = 3 @@ -27,6 +28,7 @@ export namespace SessionProcessor { const toolcalls: Record = {} let snapshot: string | undefined let blocked = false + let attempt = 0 const result = { get message() { @@ -35,314 +37,327 @@ export namespace SessionProcessor { partFromToolCall(toolCallID: string) { return toolcalls[toolCallID] }, - async process(stream: StreamTextResult, never>) { + async process(fn: () => StreamTextResult, never>) { log.info("process") - try { - let currentText: MessageV2.TextPart | undefined - let reasoningMap: Record = {} + while (true) { + try { + let currentText: MessageV2.TextPart | undefined + let reasoningMap: Record = {} + const stream = fn() - for await (const value of stream.fullStream) { - input.abort.throwIfAborted() - switch (value.type) { - case "start": - break + for await (const value of stream.fullStream) { + input.abort.throwIfAborted() + switch (value.type) { + case "start": + break - case "reasoning-start": - if (value.id in reasoningMap) { - continue - } - reasoningMap[value.id] = { - id: Identifier.ascending("part"), - messageID: input.assistantMessage.id, - sessionID: input.assistantMessage.sessionID, - type: "reasoning", - text: "", - time: { - start: Date.now(), - }, - metadata: value.providerMetadata, - } - break - - case "reasoning-delta": - if (value.id in reasoningMap) { - const part = reasoningMap[value.id] - part.text += value.text - if (value.providerMetadata) part.metadata = value.providerMetadata - if (part.text) await Session.updatePart({ part, delta: value.text }) - } - break - - case "reasoning-end": - if (value.id in reasoningMap) { - const part = reasoningMap[value.id] - part.text = part.text.trimEnd() - - part.time = { - ...part.time, - end: Date.now(), + case "reasoning-start": + if (value.id in reasoningMap) { + continue } - if (value.providerMetadata) part.metadata = value.providerMetadata - await Session.updatePart(part) - delete reasoningMap[value.id] - } - break - - case "tool-input-start": - const part = await Session.updatePart({ - id: toolcalls[value.id]?.id ?? Identifier.ascending("part"), - messageID: input.assistantMessage.id, - sessionID: input.assistantMessage.sessionID, - type: "tool", - tool: value.toolName, - callID: value.id, - state: { - status: "pending", - input: {}, - raw: "", - }, - }) - toolcalls[value.id] = part as MessageV2.ToolPart - break - - case "tool-input-delta": - break - - case "tool-input-end": - break - - case "tool-call": { - const match = toolcalls[value.toolCallId] - if (match) { - const part = await Session.updatePart({ - ...match, - tool: value.toolName, - state: { - status: "running", - input: value.input, - time: { - start: Date.now(), - }, + reasoningMap[value.id] = { + id: Identifier.ascending("part"), + messageID: input.assistantMessage.id, + sessionID: input.assistantMessage.sessionID, + type: "reasoning", + text: "", + time: { + start: Date.now(), }, metadata: value.providerMetadata, - }) - toolcalls[value.toolCallId] = part as MessageV2.ToolPart + } + break - const parts = await MessageV2.parts(input.assistantMessage.id) - const lastThree = parts.slice(-DOOM_LOOP_THRESHOLD) - if ( - lastThree.length === DOOM_LOOP_THRESHOLD && - lastThree.every( - (p) => - p.type === "tool" && - p.tool === value.toolName && - p.state.status !== "pending" && - JSON.stringify(p.state.input) === JSON.stringify(value.input), - ) - ) { - const permission = await Agent.get(input.assistantMessage.mode).then((x) => x.permission) - if (permission.doom_loop === "ask") { - await Permission.ask({ - type: "doom_loop", - pattern: value.toolName, - sessionID: input.assistantMessage.sessionID, - messageID: input.assistantMessage.id, - callID: value.toolCallId, - title: `Possible doom loop: "${value.toolName}" called ${DOOM_LOOP_THRESHOLD} times with identical arguments`, - metadata: { - tool: value.toolName, - input: value.input, + case "reasoning-delta": + if (value.id in reasoningMap) { + const part = reasoningMap[value.id] + part.text += value.text + if (value.providerMetadata) part.metadata = value.providerMetadata + if (part.text) await Session.updatePart({ part, delta: value.text }) + } + break + + case "reasoning-end": + if (value.id in reasoningMap) { + const part = reasoningMap[value.id] + part.text = part.text.trimEnd() + + part.time = { + ...part.time, + end: Date.now(), + } + if (value.providerMetadata) part.metadata = value.providerMetadata + await Session.updatePart(part) + delete reasoningMap[value.id] + } + break + + case "tool-input-start": + const part = await Session.updatePart({ + id: toolcalls[value.id]?.id ?? Identifier.ascending("part"), + messageID: input.assistantMessage.id, + sessionID: input.assistantMessage.sessionID, + type: "tool", + tool: value.toolName, + callID: value.id, + state: { + status: "pending", + input: {}, + raw: "", + }, + }) + toolcalls[value.id] = part as MessageV2.ToolPart + break + + case "tool-input-delta": + break + + case "tool-input-end": + break + + case "tool-call": { + const match = toolcalls[value.toolCallId] + if (match) { + const part = await Session.updatePart({ + ...match, + tool: value.toolName, + state: { + status: "running", + input: value.input, + time: { + start: Date.now(), }, - }) + }, + metadata: value.providerMetadata, + }) + toolcalls[value.toolCallId] = part as MessageV2.ToolPart + + const parts = await MessageV2.parts(input.assistantMessage.id) + const lastThree = parts.slice(-DOOM_LOOP_THRESHOLD) + if ( + lastThree.length === DOOM_LOOP_THRESHOLD && + lastThree.every( + (p) => + p.type === "tool" && + p.tool === value.toolName && + p.state.status !== "pending" && + JSON.stringify(p.state.input) === JSON.stringify(value.input), + ) + ) { + const permission = await Agent.get(input.assistantMessage.mode).then((x) => x.permission) + if (permission.doom_loop === "ask") { + await Permission.ask({ + type: "doom_loop", + pattern: value.toolName, + sessionID: input.assistantMessage.sessionID, + messageID: input.assistantMessage.id, + callID: value.toolCallId, + title: `Possible doom loop: "${value.toolName}" called ${DOOM_LOOP_THRESHOLD} times with identical arguments`, + metadata: { + tool: value.toolName, + input: value.input, + }, + }) + } } } + break } - break - } - case "tool-result": { - const match = toolcalls[value.toolCallId] - if (match && match.state.status === "running") { - await Session.updatePart({ - ...match, - state: { - status: "completed", - input: value.input, - output: value.output.output, - metadata: value.output.metadata, - title: value.output.title, - time: { - start: match.state.time.start, - end: Date.now(), - }, - attachments: value.output.attachments, - }, - }) - - delete toolcalls[value.toolCallId] - } - break - } - - case "tool-error": { - const match = toolcalls[value.toolCallId] - if (match && match.state.status === "running") { - await Session.updatePart({ - ...match, - state: { - status: "error", - input: value.input, - error: (value.error as any).toString(), - metadata: value.error instanceof Permission.RejectedError ? value.error.metadata : undefined, - time: { - start: match.state.time.start, - end: Date.now(), - }, - }, - }) - - if (value.error instanceof Permission.RejectedError) { - blocked = true - } - delete toolcalls[value.toolCallId] - } - break - } - case "error": - throw value.error - - case "start-step": - snapshot = await Snapshot.track() - await Session.updatePart({ - id: Identifier.ascending("part"), - messageID: input.assistantMessage.id, - sessionID: input.sessionID, - snapshot, - type: "step-start", - }) - break - - case "finish-step": - const usage = Session.getUsage({ - model: input.model, - usage: value.usage, - metadata: value.providerMetadata, - }) - input.assistantMessage.finish = value.finishReason - input.assistantMessage.cost += usage.cost - input.assistantMessage.tokens = usage.tokens - await Session.updatePart({ - id: Identifier.ascending("part"), - reason: value.finishReason, - snapshot: await Snapshot.track(), - messageID: input.assistantMessage.id, - sessionID: input.assistantMessage.sessionID, - type: "step-finish", - tokens: usage.tokens, - cost: usage.cost, - }) - await Session.updateMessage(input.assistantMessage) - if (snapshot) { - const patch = await Snapshot.patch(snapshot) - if (patch.files.length) { + case "tool-result": { + const match = toolcalls[value.toolCallId] + if (match && match.state.status === "running") { await Session.updatePart({ - id: Identifier.ascending("part"), - messageID: input.assistantMessage.id, - sessionID: input.sessionID, - type: "patch", - hash: patch.hash, - files: patch.files, + ...match, + state: { + status: "completed", + input: value.input, + output: value.output.output, + metadata: value.output.metadata, + title: value.output.title, + time: { + start: match.state.time.start, + end: Date.now(), + }, + attachments: value.output.attachments, + }, }) + + delete toolcalls[value.toolCallId] } - snapshot = undefined + break } - SessionSummary.summarize({ - sessionID: input.sessionID, - messageID: input.assistantMessage.parentID, - }) - break - case "text-start": - currentText = { - id: Identifier.ascending("part"), - messageID: input.assistantMessage.id, - sessionID: input.assistantMessage.sessionID, - type: "text", - text: "", - time: { - start: Date.now(), - }, - metadata: value.providerMetadata, - } - break - - case "text-delta": - if (currentText) { - currentText.text += value.text - if (value.providerMetadata) currentText.metadata = value.providerMetadata - if (currentText.text) + case "tool-error": { + const match = toolcalls[value.toolCallId] + if (match && match.state.status === "running") { await Session.updatePart({ - part: currentText, - delta: value.text, + ...match, + state: { + status: "error", + input: value.input, + error: (value.error as any).toString(), + metadata: value.error instanceof Permission.RejectedError ? value.error.metadata : undefined, + time: { + start: match.state.time.start, + end: Date.now(), + }, + }, }) - } - break - case "text-end": - if (currentText) { - currentText.text = currentText.text.trimEnd() - currentText.time = { - start: Date.now(), - end: Date.now(), + if (value.error instanceof Permission.RejectedError) { + blocked = true + } + delete toolcalls[value.toolCallId] } - if (value.providerMetadata) currentText.metadata = value.providerMetadata - await Session.updatePart(currentText) + break } - currentText = undefined - break + case "error": + throw value.error - case "finish": - input.assistantMessage.time.completed = Date.now() - await Session.updateMessage(input.assistantMessage) - break + case "start-step": + snapshot = await Snapshot.track() + await Session.updatePart({ + id: Identifier.ascending("part"), + messageID: input.assistantMessage.id, + sessionID: input.sessionID, + snapshot, + type: "step-start", + }) + break - default: - log.info("unhandled", { - ...value, - }) - continue + case "finish-step": + const usage = Session.getUsage({ + model: input.model, + usage: value.usage, + metadata: value.providerMetadata, + }) + input.assistantMessage.finish = value.finishReason + input.assistantMessage.cost += usage.cost + input.assistantMessage.tokens = usage.tokens + await Session.updatePart({ + id: Identifier.ascending("part"), + reason: value.finishReason, + snapshot: await Snapshot.track(), + messageID: input.assistantMessage.id, + sessionID: input.assistantMessage.sessionID, + type: "step-finish", + tokens: usage.tokens, + cost: usage.cost, + }) + await Session.updateMessage(input.assistantMessage) + if (snapshot) { + const patch = await Snapshot.patch(snapshot) + if (patch.files.length) { + await Session.updatePart({ + id: Identifier.ascending("part"), + messageID: input.assistantMessage.id, + sessionID: input.sessionID, + type: "patch", + hash: patch.hash, + files: patch.files, + }) + } + snapshot = undefined + } + SessionSummary.summarize({ + sessionID: input.sessionID, + messageID: input.assistantMessage.parentID, + }) + break + + case "text-start": + currentText = { + id: Identifier.ascending("part"), + messageID: input.assistantMessage.id, + sessionID: input.assistantMessage.sessionID, + type: "text", + text: "", + time: { + start: Date.now(), + }, + metadata: value.providerMetadata, + } + break + + case "text-delta": + if (currentText) { + currentText.text += value.text + if (value.providerMetadata) currentText.metadata = value.providerMetadata + if (currentText.text) + await Session.updatePart({ + part: currentText, + delta: value.text, + }) + } + break + + case "text-end": + if (currentText) { + currentText.text = currentText.text.trimEnd() + currentText.time = { + start: Date.now(), + end: Date.now(), + } + if (value.providerMetadata) currentText.metadata = value.providerMetadata + await Session.updatePart(currentText) + } + currentText = undefined + break + + case "finish": + input.assistantMessage.time.completed = Date.now() + await Session.updateMessage(input.assistantMessage) + break + + default: + log.info("unhandled", { + ...value, + }) + continue + } } - } - } catch (e) { - log.error("process", { - error: e, - }) - const error = MessageV2.fromError(e, { providerID: input.providerID }) - input.assistantMessage.error = error - Bus.publish(Session.Event.Error, { - sessionID: input.assistantMessage.sessionID, - error: input.assistantMessage.error, - }) - } - const p = await MessageV2.parts(input.assistantMessage.id) - for (const part of p) { - if (part.type === "tool" && part.state.status !== "completed" && part.state.status !== "error") { - await Session.updatePart({ - ...part, - state: { - ...part.state, - status: "error", - error: "Tool execution aborted", - time: { - start: Date.now(), - end: Date.now(), - }, - }, + } catch (e) { + log.error("process", { + error: e, + }) + const error = MessageV2.fromError(e, { providerID: input.providerID }) + if (error?.name === "APIError" && error.data.isRetryable) { + attempt++ + const delay = SessionRetry.getRetryDelayInMs(error, attempt) + if (delay) { + await SessionRetry.sleep(delay, input.abort) + continue + } + } + input.assistantMessage.error = error + Bus.publish(Session.Event.Error, { + sessionID: input.assistantMessage.sessionID, + error: input.assistantMessage.error, }) } + const p = await MessageV2.parts(input.assistantMessage.id) + for (const part of p) { + if (part.type === "tool" && part.state.status !== "completed" && part.state.status !== "error") { + await Session.updatePart({ + ...part, + state: { + ...part.state, + status: "error", + error: "Tool execution aborted", + time: { + start: Date.now(), + end: Date.now(), + }, + }, + }) + } + } + input.assistantMessage.time.completed = Date.now() + await Session.updateMessage(input.assistantMessage) + if (blocked) return "stop" + if (input.assistantMessage.error) return "stop" + return "continue" } - input.assistantMessage.time.completed = Date.now() - await Session.updateMessage(input.assistantMessage) - return { info: input.assistantMessage, parts: p, blocked } }, } return result diff --git a/packages/opencode/src/session/prompt.ts b/packages/opencode/src/session/prompt.ts index e06d856ab..655cbf66b 100644 --- a/packages/opencode/src/session/prompt.ts +++ b/packages/opencode/src/session/prompt.ts @@ -283,8 +283,8 @@ export namespace SessionPrompt { using _ = defer(() => cancel(sessionID)) let step = 0 - let retries = 0 while (true) { + if (abort.aborted) break let msgs = await MessageV2.filterCompacted(MessageV2.stream(sessionID)) let lastUser: MessageV2.User | undefined @@ -295,10 +295,11 @@ export namespace SessionPrompt { const msg = msgs[i] if (!lastUser && msg.info.role === "user") lastUser = msg.info as MessageV2.User if (!lastAssistant && msg.info.role === "assistant") lastAssistant = msg.info as MessageV2.Assistant - if (msg.info.role === "assistant" && msg.info.finish) lastFinished = msg.info as MessageV2.Assistant + if (!lastFinished && msg.info.role === "assistant" && msg.info.finish) + lastFinished = msg.info as MessageV2.Assistant if (lastUser && lastFinished) break const compaction = msg.parts.find((part) => part.type === "compaction") - if (compaction) { + if (compaction && !lastFinished) { tasks.push(compaction) } } @@ -312,9 +313,25 @@ export namespace SessionPrompt { const model = await Provider.getModel(lastUser.model.providerID, lastUser.model.modelID) const task = tasks.pop() + // pending compaction + if (task?.type === "compaction") { + await SessionCompaction.process({ + messages: msgs, + parentID: lastUser.id, + abort, + model: { + providerID: model.providerID, + modelID: model.modelID, + }, + sessionID, + }) + continue + } + + // context overflow, needs compaction if ( - task?.type !== "compaction" && lastFinished && + lastFinished.summary !== true && SessionCompaction.isOverflow({ tokens: lastFinished.tokens, model: model.info }) ) { const msg = await Session.updateMessage({ @@ -336,97 +353,85 @@ export namespace SessionPrompt { continue } - const result = await iife(async () => { - if (task?.type === "compaction") { - return await SessionCompaction.process({ - messages: msgs, - parentID: lastUser.id, - abort, - model: { - providerID: model.providerID, - modelID: model.modelID, - }, - sessionID, - }) - } - - const agent = await Agent.get(lastUser.agent) - msgs = insertReminders({ - messages: msgs, - agent, - }) - const processor = SessionProcessor.create({ - assistantMessage: (await Session.updateMessage({ - id: Identifier.ascending("message"), - parentID: lastUser.id, - role: "assistant", - mode: agent.name, - path: { - cwd: Instance.directory, - root: Instance.worktree, - }, - cost: 0, - tokens: { - input: 0, - output: 0, - reasoning: 0, - cache: { read: 0, write: 0 }, - }, - modelID: model.modelID, - providerID: model.providerID, - time: { - created: Date.now(), - }, - sessionID, - })) as MessageV2.Assistant, - sessionID: sessionID, - model: model.info, + // normal processing + const agent = await Agent.get(lastUser.agent) + msgs = insertReminders({ + messages: msgs, + agent, + }) + const processor = SessionProcessor.create({ + assistantMessage: (await Session.updateMessage({ + id: Identifier.ascending("message"), + parentID: lastUser.id, + role: "assistant", + mode: agent.name, + path: { + cwd: Instance.directory, + root: Instance.worktree, + }, + cost: 0, + tokens: { + input: 0, + output: 0, + reasoning: 0, + cache: { read: 0, write: 0 }, + }, + modelID: model.modelID, providerID: model.providerID, - abort, - }) - const system = await resolveSystemPrompt({ - providerID: model.providerID, - modelID: model.info.id, - agent, - system: lastUser.system, - }) - const tools = await resolveTools({ - agent, + time: { + created: Date.now(), + }, sessionID, - model: lastUser.model, - tools: lastUser.tools, - processor, + })) as MessageV2.Assistant, + sessionID: sessionID, + model: model.info, + providerID: model.providerID, + abort, + }) + const system = await resolveSystemPrompt({ + providerID: model.providerID, + modelID: model.info.id, + agent, + system: lastUser.system, + }) + const tools = await resolveTools({ + agent, + sessionID, + model: lastUser.model, + tools: lastUser.tools, + processor, + }) + const params = await Plugin.trigger( + "chat.params", + { + sessionID: sessionID, + agent: lastUser.agent, + model: model.info, + provider: await Provider.getProvider(model.providerID), + message: lastUser, + }, + { + temperature: model.info.temperature + ? (agent.temperature ?? ProviderTransform.temperature(model.providerID, model.modelID)) + : undefined, + topP: agent.topP ?? ProviderTransform.topP(model.providerID, model.modelID), + options: { + ...ProviderTransform.options(model.providerID, model.modelID, model.npm ?? "", sessionID), + ...model.info.options, + ...agent.options, + }, + }, + ) + + if (step === 1) { + SessionSummary.summarize({ + sessionID: sessionID, + messageID: lastUser.id, }) - const params = await Plugin.trigger( - "chat.params", - { - sessionID: sessionID, - agent: lastUser.agent, - model: model.info, - provider: await Provider.getProvider(model.providerID), - message: lastUser, - }, - { - temperature: model.info.temperature - ? (agent.temperature ?? ProviderTransform.temperature(model.providerID, model.modelID)) - : undefined, - topP: agent.topP ?? ProviderTransform.topP(model.providerID, model.modelID), - options: { - ...ProviderTransform.options(model.providerID, model.modelID, model.npm ?? "", sessionID), - ...model.info.options, - ...agent.options, - }, - }, - ) + } - if (step === 1) { - SessionSummary.summarize({ - sessionID: sessionID, - messageID: lastUser.id, - }) - } - - const stream = streamText({ + const result = await processor.process(() => + streamText({ onError(error) { log.error("stream error", { error, @@ -514,37 +519,10 @@ export namespace SessionPrompt { }, ], }), - }) - - return await processor.process(stream) - }) - - if (result.blocked) break - if (result.info.error?.name === "APIError" && result.info.error.data.isRetryable) { - retries++ - const delay = SessionRetry.getRetryDelayInMs(result.info.error, retries) - if (!delay) break - state()[sessionID].status = { - type: "retry", - attempt: retries, - message: result.info.error.data.message, - } - Bus.publish(Event.Status, { - sessionID, - status: state()[sessionID].status, - }) - await SessionRetry.sleep(delay, abort).catch(() => {}) - state()[sessionID].status = { - type: "busy", - } - Bus.publish(Event.Status, { - sessionID, - status: state()[sessionID].status, - }) - continue - } - retries = 0 - if (result.info.error) break + }), + ) + if (result === "stop") break + continue } SessionCompaction.prune({ sessionID }) for await (const item of MessageV2.stream(sessionID)) {