From a9f27371cf93082bc1163ee32aeca16c456c9b90 Mon Sep 17 00:00:00 2001 From: xu0o0 Date: Fri, 12 Dec 2025 10:02:06 +0800 Subject: [PATCH] acp: replay conversation history in session/load (#5385) --- packages/opencode/src/acp/agent.ts | 240 ++++++++++++++++++++++++++- packages/opencode/src/acp/session.ts | 31 ++++ 2 files changed, 269 insertions(+), 2 deletions(-) diff --git a/packages/opencode/src/acp/agent.ts b/packages/opencode/src/acp/agent.ts index ae4798d96..a1e45e1d2 100644 --- a/packages/opencode/src/acp/agent.ts +++ b/packages/opencode/src/acp/agent.ts @@ -28,7 +28,7 @@ import { Config } from "@/config/config" import { Todo } from "@/session/todo" import { z } from "zod" import { LoadAPIKeyError } from "ai" -import type { OpencodeClient } from "@opencode-ai/sdk/v2" +import type { OpencodeClient, SessionMessageResponse } from "@opencode-ai/sdk/v2" export namespace ACP { const log = Log.create({ service: "acp-agent" }) @@ -386,7 +386,7 @@ export namespace ACP { log.info("creating_session", { sessionId, mcpServers: params.mcpServers.length }) - const load = await this.loadSession({ + const load = await this.loadSessionMode({ cwd: directory, mcpServers: params.mcpServers, sessionId, @@ -412,6 +412,242 @@ export namespace ACP { } async loadSession(params: LoadSessionRequest) { + const directory = params.cwd + const sessionId = params.sessionId + + try { + const model = await defaultModel(this.config, directory) + + // Store ACP session state + const state = await this.sessionManager.load(sessionId, params.cwd, params.mcpServers, model) + + log.info("load_session", { sessionId, mcpServers: params.mcpServers.length }) + + const mode = await this.loadSessionMode({ + cwd: directory, + mcpServers: params.mcpServers, + sessionId, + }) + + this.setupEventSubscriptions(state) + + // Replay session history + const messages = await this.sdk.session + .messages( + { + sessionID: sessionId, + directory, + }, + { throwOnError: true }, + ) + .then((x) => x.data) + .catch((err) => { + log.error("unexpected error when fetching message", { error: err }) + return undefined + }) + + for (const msg of messages ?? []) { + log.debug("replay message", msg) + await this.processMessage(msg) + } + + return mode + } catch (e) { + const error = MessageV2.fromError(e, { + providerID: this.config.defaultModel?.providerID ?? "unknown", + }) + if (LoadAPIKeyError.isInstance(error)) { + throw RequestError.authRequired() + } + throw e + } + } + + private async processMessage(message: SessionMessageResponse) { + log.debug("process message", message) + if (message.info.role !== "assistant" && message.info.role !== "user") return + const sessionId = message.info.sessionID + + for (const part of message.parts) { + if (part.type === "tool") { + switch (part.state.status) { + case "pending": + await this.connection + .sessionUpdate({ + sessionId, + update: { + sessionUpdate: "tool_call", + toolCallId: part.callID, + title: part.tool, + kind: toToolKind(part.tool), + status: "pending", + locations: [], + rawInput: {}, + }, + }) + .catch((err) => { + log.error("failed to send tool pending to ACP", { error: err }) + }) + break + case "running": + await this.connection + .sessionUpdate({ + sessionId, + update: { + sessionUpdate: "tool_call_update", + toolCallId: part.callID, + status: "in_progress", + locations: toLocations(part.tool, part.state.input), + rawInput: part.state.input, + }, + }) + .catch((err) => { + log.error("failed to send tool in_progress to ACP", { error: err }) + }) + break + case "completed": + const kind = toToolKind(part.tool) + const content: ToolCallContent[] = [ + { + type: "content", + content: { + type: "text", + text: part.state.output, + }, + }, + ] + + if (kind === "edit") { + const input = part.state.input + const filePath = typeof input["filePath"] === "string" ? input["filePath"] : "" + const oldText = typeof input["oldString"] === "string" ? input["oldString"] : "" + const newText = + typeof input["newString"] === "string" + ? input["newString"] + : typeof input["content"] === "string" + ? input["content"] + : "" + content.push({ + type: "diff", + path: filePath, + oldText, + newText, + }) + } + + if (part.tool === "todowrite") { + const parsedTodos = z.array(Todo.Info).safeParse(JSON.parse(part.state.output)) + if (parsedTodos.success) { + await this.connection + .sessionUpdate({ + sessionId, + update: { + sessionUpdate: "plan", + entries: parsedTodos.data.map((todo) => { + const status: PlanEntry["status"] = + todo.status === "cancelled" ? "completed" : (todo.status as PlanEntry["status"]) + return { + priority: "medium", + status, + content: todo.content, + } + }), + }, + }) + .catch((err) => { + log.error("failed to send session update for todo", { error: err }) + }) + } else { + log.error("failed to parse todo output", { error: parsedTodos.error }) + } + } + + await this.connection + .sessionUpdate({ + sessionId, + update: { + sessionUpdate: "tool_call_update", + toolCallId: part.callID, + status: "completed", + kind, + content, + title: part.state.title, + rawOutput: { + output: part.state.output, + metadata: part.state.metadata, + }, + }, + }) + .catch((err) => { + log.error("failed to send tool completed to ACP", { error: err }) + }) + break + case "error": + await this.connection + .sessionUpdate({ + sessionId, + update: { + sessionUpdate: "tool_call_update", + toolCallId: part.callID, + status: "failed", + content: [ + { + type: "content", + content: { + type: "text", + text: part.state.error, + }, + }, + ], + rawOutput: { + error: part.state.error, + }, + }, + }) + .catch((err) => { + log.error("failed to send tool error to ACP", { error: err }) + }) + break + } + } else if (part.type === "text") { + if (part.text) { + await this.connection + .sessionUpdate({ + sessionId, + update: { + sessionUpdate: message.info.role === "user" ? "user_message_chunk" : "agent_message_chunk", + content: { + type: "text", + text: part.text, + }, + }, + }) + .catch((err) => { + log.error("failed to send text to ACP", { error: err }) + }) + } + } else if (part.type === "reasoning") { + if (part.text) { + await this.connection + .sessionUpdate({ + sessionId, + update: { + sessionUpdate: "agent_thought_chunk", + content: { + type: "text", + text: part.text, + }, + }, + }) + .catch((err) => { + log.error("failed to send reasoning to ACP", { error: err }) + }) + } + } + } + } + + private async loadSessionMode(params: LoadSessionRequest) { const directory = params.cwd const model = await defaultModel(this.config, directory) const sessionId = params.sessionId diff --git a/packages/opencode/src/acp/session.ts b/packages/opencode/src/acp/session.ts index 6658e4203..70b658347 100644 --- a/packages/opencode/src/acp/session.ts +++ b/packages/opencode/src/acp/session.ts @@ -40,6 +40,37 @@ export class ACPSessionManager { return state } + async load( + sessionId: string, + cwd: string, + mcpServers: McpServer[], + model?: ACPSessionState["model"], + ): Promise { + const session = await this.sdk.session + .get( + { + sessionID: sessionId, + directory: cwd, + }, + { throwOnError: true }, + ) + .then((x) => x.data!) + + const resolvedModel = model + + const state: ACPSessionState = { + id: sessionId, + cwd, + mcpServers, + createdAt: new Date(session.time.created), + model: resolvedModel, + } + log.info("loading_session", { state }) + + this.sessions.set(sessionId, state) + return state + } + get(sessionId: string): ACPSessionState { const session = this.sessions.get(sessionId) if (!session) {