Dax Raad 2025-11-16 18:06:44 -05:00
parent ec4ce5fe92
commit 4379270ba8
4 changed files with 513 additions and 769 deletions


@@ -1,9 +1,8 @@
import { streamText, type ModelMessage, type StreamTextResult, type Tool as AITool } from "ai"
import { streamText, type ModelMessage } from "ai"
import { Session } from "."
import { Identifier } from "../id/id"
import { Instance } from "../project/instance"
import { Provider } from "../provider/provider"
import { defer } from "../util/defer"
import { MessageV2 } from "./message-v2"
import { SystemPrompt } from "./system"
import { Bus } from "../bus"
@@ -14,9 +13,6 @@ import { Flag } from "../flag/flag"
import { Token } from "../util/token"
import { Log } from "../util/log"
import { ProviderTransform } from "@/provider/transform"
import { SessionRetry } from "./retry"
import { Config } from "@/config/config"
import { Lock } from "../util/lock"
import { SessionProcessor } from "./processor"
export namespace SessionCompaction {
@@ -37,14 +33,12 @@ export namespace SessionCompaction {
if (context === 0) return false
const count = input.tokens.input + input.tokens.cache.read + input.tokens.output
const output = Math.min(input.model.limit.output, SessionPrompt.OUTPUT_TOKEN_MAX) || SessionPrompt.OUTPUT_TOKEN_MAX
// const usable = context - output
const usable = 20_000
const usable = context - output
return count > usable
}
export const PRUNE_MINIMUM = 20_000
export const PRUNE_PROTECT = 40_000
const MAX_RETRIES = 10
// goes backwards through parts until there are 40_000 tokens worth of tool
// calls. then erases output of previous tool calls. idea is to throw away old
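The comment above (cut short by the hunk boundary) describes the pruning strategy behind PRUNE_MINIMUM and PRUNE_PROTECT: walk the parts from newest to oldest, keep the most recent tool output intact, and clear the output of anything older. A minimal standalone sketch of that idea, reconstructed only from the comment and the two constants — the part shapes, the token estimate, and the function name are assumptions, not the project's code:

// Hypothetical reconstruction of the prune idea; shapes and names are assumptions.
type ToolPart = { type: "tool"; output: string; compacted?: boolean }
type TextPart = { type: "text"; text: string }
type SessionPart = ToolPart | TextPart

const PRUNE_MINIMUM = 20_000
const PRUNE_PROTECT = 40_000

// Rough 4-characters-per-token estimate, purely for illustration.
const estimateTokens = (text: string) => Math.ceil(text.length / 4)

function pruneToolOutputs(parts: SessionPart[]): number {
  let protectedTokens = 0
  let reclaimable = 0
  const candidates: ToolPart[] = []
  // Walk backwards: the most recent ~PRUNE_PROTECT tokens of tool output stay
  // intact, anything older becomes a candidate for clearing.
  for (let i = parts.length - 1; i >= 0; i--) {
    const part = parts[i]
    if (part.type !== "tool" || part.compacted) continue
    const tokens = estimateTokens(part.output)
    if (protectedTokens < PRUNE_PROTECT) {
      protectedTokens += tokens
      continue
    }
    reclaimable += tokens
    candidates.push(part)
  }
  // Only bother when enough would be reclaimed to matter.
  if (reclaimable < PRUNE_MINIMUM) return 0
  for (const part of candidates) {
    part.output = "[Old tool result content cleared]"
    part.compacted = true
  }
  return reclaimable
}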
@@ -111,6 +105,7 @@ export namespace SessionCompaction {
parentID: input.parentID,
sessionID: input.sessionID,
mode: "build",
summary: true,
path: {
cwd: Instance.directory,
root: Instance.worktree,
@@ -128,33 +123,6 @@ export namespace SessionCompaction {
created: Date.now(),
},
})) as MessageV2.Assistant
const stream = streamText({
// set to 0, we handle loop
maxRetries: 0,
model: model.language,
providerOptions: ProviderTransform.providerOptions(model.npm, model.providerID, model.info.options),
headers: model.info.headers,
abortSignal: input.abort,
tools: model.info.tool_call ? {} : undefined,
messages: [
...system.map(
(x): ModelMessage => ({
role: "system",
content: x,
}),
),
...MessageV2.toModelMessage(input.messages),
{
role: "user",
content: [
{
type: "text",
text: "Provide a detailed but concise summary of our conversation above. Focus on information that would be helpful for continuing the conversation, including what we did, what we're doing, which files we're working on, and what we're going to do next.",
},
],
},
],
})
const processor = SessionProcessor.create({
assistantMessage: msg,
sessionID: input.sessionID,
@@ -162,79 +130,14 @@ export namespace SessionCompaction {
model: model.info,
abort: input.abort,
})
const result = await processor.process(stream)
return result
}
export async function run(input: { sessionID: string; providerID: string; modelID: string; signal?: AbortSignal }) {
const signal = input.signal ?? new AbortController().signal
await using lock = input.signal === undefined ? await Lock.write(input.sessionID) : undefined
await Session.update(input.sessionID, (draft) => {
draft.time.compacting = Date.now()
})
await using _ = defer(async () => {
await Session.update(input.sessionID, (draft) => {
draft.time.compacting = undefined
})
})
const toSummarize = await MessageV2.filterCompacted(MessageV2.stream(input.sessionID))
const model = await Provider.getModel(input.providerID, input.modelID)
const system = [
...SystemPrompt.summarize(model.providerID),
...(await SystemPrompt.environment()),
...(await SystemPrompt.custom()),
]
const msg = (await Session.updateMessage({
id: Identifier.ascending("message"),
role: "assistant",
parentID: toSummarize.findLast((m) => m.info.role === "user")?.info.id!,
sessionID: input.sessionID,
mode: "build",
path: {
cwd: Instance.directory,
root: Instance.worktree,
},
summary: true,
cost: 0,
tokens: {
output: 0,
input: 0,
reasoning: 0,
cache: { read: 0, write: 0 },
},
modelID: input.modelID,
providerID: model.providerID,
time: {
created: Date.now(),
},
})) as MessageV2.Assistant
const part = (await Session.updatePart({
type: "text",
sessionID: input.sessionID,
messageID: msg.id,
id: Identifier.ascending("part"),
text: "",
time: {
start: Date.now(),
},
})) as MessageV2.TextPart
const doStream = () =>
const result = await processor.process(() =>
streamText({
// set to 0, we handle loop
maxRetries: 0,
model: model.language,
providerOptions: ProviderTransform.providerOptions(model.npm, model.providerID, model.info.options),
headers: model.info.headers,
abortSignal: signal,
onError(error) {
log.error("stream error", {
error,
})
},
abortSignal: input.abort,
tools: model.info.tool_call ? {} : undefined,
messages: [
...system.map(
@@ -243,7 +146,7 @@ export namespace SessionCompaction {
content: x,
}),
),
...MessageV2.toModelMessage(toSummarize),
...MessageV2.toModelMessage(input.messages),
{
role: "user",
content: [
@@ -254,168 +157,32 @@ export namespace SessionCompaction {
],
},
],
})
// TODO: reduce duplication between compaction.ts & prompt.ts
const process = async (
stream: StreamTextResult<Record<string, AITool>, never>,
retries: { count: number; max: number },
) => {
let shouldRetry = false
try {
for await (const value of stream.fullStream) {
signal.throwIfAborted()
switch (value.type) {
case "text-delta":
part.text += value.text
if (value.providerMetadata) part.metadata = value.providerMetadata
if (part.text)
await Session.updatePart({
part,
delta: value.text,
})
continue
case "text-end": {
part.text = part.text.trimEnd()
part.time = {
start: Date.now(),
end: Date.now(),
}
if (value.providerMetadata) part.metadata = value.providerMetadata
await Session.updatePart(part)
continue
}
case "finish-step": {
const usage = Session.getUsage({
model: model.info,
usage: value.usage,
metadata: value.providerMetadata,
})
msg.cost += usage.cost
msg.tokens = usage.tokens
await Session.updateMessage(msg)
continue
}
case "error":
throw value.error
default:
continue
}
}
} catch (e) {
log.error("compaction error", {
error: e,
})
const error = MessageV2.fromError(e, { providerID: input.providerID })
if (retries.count < retries.max && MessageV2.APIError.isInstance(error) && error.data.isRetryable) {
shouldRetry = true
await Session.updatePart({
id: Identifier.ascending("part"),
messageID: msg.id,
sessionID: msg.sessionID,
type: "retry",
attempt: retries.count + 1,
time: {
created: Date.now(),
},
error,
})
} else {
msg.error = error
Bus.publish(Session.Event.Error, {
sessionID: msg.sessionID,
error: msg.error,
})
}
}
const parts = await MessageV2.parts(msg.id)
return {
info: msg,
parts,
shouldRetry,
}
}
let stream = doStream()
const cfg = await Config.get()
const maxRetries = cfg.experimental?.chatMaxRetries ?? MAX_RETRIES
let result = await process(stream, {
count: 0,
max: maxRetries,
})
if (result.shouldRetry) {
const start = Date.now()
for (let retry = 1; retry < maxRetries; retry++) {
const lastRetryPart = result.parts.findLast((p): p is MessageV2.RetryPart => p.type === "retry")
if (lastRetryPart) {
const delayMs = SessionRetry.getBoundedDelay({
error: lastRetryPart.error,
attempt: retry,
startTime: start,
})
if (!delayMs) {
break
}
log.info("retrying with backoff", {
attempt: retry,
delayMs,
elapsed: Date.now() - start,
})
const stop = await SessionRetry.sleep(delayMs, signal)
.then(() => false)
.catch((error) => {
if (error instanceof DOMException && error.name === "AbortError") {
const err = new MessageV2.AbortedError(
{ message: error.message },
{
cause: error,
},
).toObject()
result.info.error = err
Bus.publish(Session.Event.Error, {
sessionID: result.info.sessionID,
error: result.info.error,
})
return true
}
throw error
})
if (stop) break
}
stream = doStream()
result = await process(stream, {
count: retry,
max: maxRetries,
})
if (!result.shouldRetry) {
break
}
}
}
msg.time.completed = Date.now()
if (
!msg.error ||
(MessageV2.AbortedError.isInstance(msg.error) &&
result.parts.some((part): part is MessageV2.TextPart => part.type === "text" && part.text.length > 0))
) {
msg.summary = true
Bus.publish(Event.Compacted, {
}),
)
if (result === "continue") {
const continueMsg = await Session.updateMessage({
id: Identifier.ascending("message"),
role: "user",
sessionID: input.sessionID,
time: {
created: Date.now(),
},
agent: "build",
model: input.model,
})
await Session.updatePart({
id: Identifier.ascending("part"),
messageID: continueMsg.id,
sessionID: input.sessionID,
type: "text",
synthetic: true,
text: "Continue if you have next steps",
time: {
start: Date.now(),
end: Date.now(),
},
})
}
await Session.updateMessage(msg)
return {
info: msg,
parts: result.parts,
}
return "continue"
}
}
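For reference, the isOverflow check this commit restores earlier in the file (usable = context - output, replacing the hardcoded 20_000) is plain arithmetic. A standalone sketch with illustrative numbers — OUTPUT_TOKEN_MAX and the limits in the example call are made up; only the formula mirrors the diff:

// Standalone sketch of the overflow check; the numbers are illustrative,
// the formula follows the hunk above.
const OUTPUT_TOKEN_MAX = 32_000

function isOverflow(
  tokens: { input: number; output: number; cache: { read: number } },
  model: { limit: { context: number; output: number } },
): boolean {
  const context = model.limit.context
  if (context === 0) return false
  const count = tokens.input + tokens.cache.read + tokens.output
  const output = Math.min(model.limit.output, OUTPUT_TOKEN_MAX) || OUTPUT_TOKEN_MAX
  const usable = context - output
  return count > usable
}

// A 200k-context model reserving 32k for output leaves 168k usable, so
// 150k input + 20k cached + 5k output (175k total) triggers compaction.
console.log(
  isOverflow(
    { input: 150_000, output: 5_000, cache: { read: 20_000 } },
    { limit: { context: 200_000, output: 64_000 } },
  ),
) // true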


@@ -550,116 +550,101 @@ export namespace MessageV2 {
if (msg.parts.length === 0) continue
if (msg.info.role === "user") {
result.push({
const userMessage: UIMessage = {
id: msg.info.id,
role: "user",
parts: msg.parts.flatMap((part): UIMessage["parts"] => {
if (part.type === "text")
return [
{
type: "text",
text: part.text,
},
]
// text/plain and directory files are converted into text parts, ignore them
if (part.type === "file" && part.mime !== "text/plain" && part.mime !== "application/x-directory")
return [
{
type: "file",
url: part.url,
mediaType: part.mime,
filename: part.filename,
},
]
parts: [],
}
result.push(userMessage)
for (const part of msg.parts) {
if (part.type === "text")
userMessage.parts.push({
type: "text",
text: part.text,
})
// text/plain and directory files are converted into text parts, ignore them
if (part.type === "file" && part.mime !== "text/plain" && part.mime !== "application/x-directory")
userMessage.parts.push({
type: "file",
url: part.url,
mediaType: part.mime,
filename: part.filename,
})
if (part.type === "compaction") {
return [
{
type: "text",
text: "The user requested a compaction of the session. YOU MUST CONTINUE THE CONVERSATION AFTER THIS MESSAGE.",
},
]
}
return []
}),
})
if (part.type === "compaction") {
userMessage.parts.push({
type: "text",
text: "The user requested a compaction of the session.",
})
}
}
}
if (msg.info.role === "assistant") {
result.push({
const assistantMessage: UIMessage = {
id: msg.info.id,
role: "assistant",
parts: msg.parts.flatMap((part): UIMessage["parts"] => {
if (part.type === "text")
return [
{
type: "text",
text: part.text,
providerMetadata: part.metadata,
},
]
if (part.type === "step-start")
return [
{
type: "step-start",
},
]
if (part.type === "tool") {
if (part.state.status === "completed") {
if (part.state.attachments?.length) {
result.push({
id: Identifier.ascending("message"),
role: "user",
parts: [
{
type: "text",
text: `Tool ${part.tool} returned an attachment:`,
},
...part.state.attachments.map((attachment) => ({
type: "file" as const,
url: attachment.url,
mediaType: attachment.mime,
filename: attachment.filename,
})),
],
})
}
return [
{
type: ("tool-" + part.tool) as `tool-${string}`,
state: "output-available",
toolCallId: part.callID,
input: part.state.input,
output: part.state.time.compacted ? "[Old tool result content cleared]" : part.state.output,
callProviderMetadata: part.metadata,
},
]
parts: [],
}
result.push(assistantMessage)
for (const part of msg.parts) {
if (part.type === "text")
assistantMessage.parts.push({
type: "text",
text: part.text,
providerMetadata: part.metadata,
})
if (part.type === "step-start")
assistantMessage.parts.push({
type: "step-start",
})
if (part.type === "tool") {
if (part.state.status === "completed") {
if (part.state.attachments?.length) {
result.push({
id: Identifier.ascending("message"),
role: "user",
parts: [
{
type: "text",
text: `Tool ${part.tool} returned an attachment:`,
},
...part.state.attachments.map((attachment) => ({
type: "file" as const,
url: attachment.url,
mediaType: attachment.mime,
filename: attachment.filename,
})),
],
})
}
if (part.state.status === "error")
return [
{
type: ("tool-" + part.tool) as `tool-${string}`,
state: "output-error",
toolCallId: part.callID,
input: part.state.input,
errorText: part.state.error,
callProviderMetadata: part.metadata,
},
]
assistantMessage.parts.push({
type: ("tool-" + part.tool) as `tool-${string}`,
state: "output-available",
toolCallId: part.callID,
input: part.state.input,
output: part.state.time.compacted ? "[Old tool result content cleared]" : part.state.output,
callProviderMetadata: part.metadata,
})
}
if (part.type === "reasoning") {
return [
{
type: "reasoning",
text: part.text,
providerMetadata: part.metadata,
},
]
}
return []
}),
})
if (part.state.status === "error")
assistantMessage.parts.push({
type: ("tool-" + part.tool) as `tool-${string}`,
state: "output-error",
toolCallId: part.callID,
input: part.state.input,
errorText: part.state.error,
callProviderMetadata: part.metadata,
})
}
if (part.type === "reasoning") {
assistantMessage.parts.push({
type: "reasoning",
text: part.text,
providerMetadata: part.metadata,
})
}
}
}
}
@@ -710,8 +695,7 @@ export namespace MessageV2 {
msg.parts.some((part) => part.type === "compaction")
)
break
if (msg.info.role === "assistant" && msg.info.summary === true) break
if (msg.info.role === "assistant" && msg.info.finish) completed.add(msg.info.id)
if (msg.info.summary) break
}
result.reverse()
return result
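The hunk above swaps the flatMap-based conversion for an explicit loop that pushes the container message into the result first and then appends its parts one by one, so any extra message pushed while walking the parts (the tool-attachment case) lands after its parent. A trimmed-down sketch of the pattern — the Part and UIMessage shapes here are simplified stand-ins, not the real types:

// Simplified sketch of the push-based conversion; shapes are stand-ins.
type Part =
  | { type: "text"; text: string }
  | { type: "file"; url: string; mime: string; filename?: string }

type UIMessage = {
  id: string
  role: "user" | "assistant"
  parts: Array<Record<string, unknown>>
}

function toUIMessages(messages: Array<{ id: string; role: "user" | "assistant"; parts: Part[] }>): UIMessage[] {
  const result: UIMessage[] = []
  for (const msg of messages) {
    if (msg.parts.length === 0) continue
    // Push the container first, then append parts; because the container is
    // already in `result`, an interleaved message (e.g. one carrying tool
    // attachments) can be pushed here and will appear after its parent.
    const out: UIMessage = { id: msg.id, role: msg.role, parts: [] }
    result.push(out)
    for (const part of msg.parts) {
      if (part.type === "text") out.parts.push({ type: "text", text: part.text })
      if (part.type === "file")
        out.parts.push({ type: "file", url: part.url, mediaType: part.mime, filename: part.filename })
    }
  }
  return result
}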


@@ -1,6 +1,6 @@
import type { ModelsDev } from "@/provider/models"
import { MessageV2 } from "./message-v2"
import type { StreamTextResult, Tool as AITool } from "ai"
import { type StreamTextResult, type Tool as AITool, APICallError } from "ai"
import { Log } from "@/util/log"
import { Identifier } from "@/id/id"
import { Session } from "."
@@ -9,6 +9,7 @@ import { Permission } from "@/permission"
import { Snapshot } from "@/snapshot"
import { SessionSummary } from "./summary"
import { Bus } from "@/bus"
import { SessionRetry } from "./retry"
export namespace SessionProcessor {
const DOOM_LOOP_THRESHOLD = 3
@@ -27,6 +28,7 @@ export namespace SessionProcessor {
const toolcalls: Record<string, MessageV2.ToolPart> = {}
let snapshot: string | undefined
let blocked = false
let attempt = 0
const result = {
get message() {
@@ -35,314 +37,327 @@ export namespace SessionProcessor {
partFromToolCall(toolCallID: string) {
return toolcalls[toolCallID]
},
async process(stream: StreamTextResult<Record<string, AITool>, never>) {
async process(fn: () => StreamTextResult<Record<string, AITool>, never>) {
log.info("process")
try {
let currentText: MessageV2.TextPart | undefined
let reasoningMap: Record<string, MessageV2.ReasoningPart> = {}
while (true) {
try {
let currentText: MessageV2.TextPart | undefined
let reasoningMap: Record<string, MessageV2.ReasoningPart> = {}
const stream = fn()
for await (const value of stream.fullStream) {
input.abort.throwIfAborted()
switch (value.type) {
case "start":
break
for await (const value of stream.fullStream) {
input.abort.throwIfAborted()
switch (value.type) {
case "start":
break
case "reasoning-start":
if (value.id in reasoningMap) {
continue
}
reasoningMap[value.id] = {
id: Identifier.ascending("part"),
messageID: input.assistantMessage.id,
sessionID: input.assistantMessage.sessionID,
type: "reasoning",
text: "",
time: {
start: Date.now(),
},
metadata: value.providerMetadata,
}
break
case "reasoning-delta":
if (value.id in reasoningMap) {
const part = reasoningMap[value.id]
part.text += value.text
if (value.providerMetadata) part.metadata = value.providerMetadata
if (part.text) await Session.updatePart({ part, delta: value.text })
}
break
case "reasoning-end":
if (value.id in reasoningMap) {
const part = reasoningMap[value.id]
part.text = part.text.trimEnd()
part.time = {
...part.time,
end: Date.now(),
case "reasoning-start":
if (value.id in reasoningMap) {
continue
}
if (value.providerMetadata) part.metadata = value.providerMetadata
await Session.updatePart(part)
delete reasoningMap[value.id]
}
break
case "tool-input-start":
const part = await Session.updatePart({
id: toolcalls[value.id]?.id ?? Identifier.ascending("part"),
messageID: input.assistantMessage.id,
sessionID: input.assistantMessage.sessionID,
type: "tool",
tool: value.toolName,
callID: value.id,
state: {
status: "pending",
input: {},
raw: "",
},
})
toolcalls[value.id] = part as MessageV2.ToolPart
break
case "tool-input-delta":
break
case "tool-input-end":
break
case "tool-call": {
const match = toolcalls[value.toolCallId]
if (match) {
const part = await Session.updatePart({
...match,
tool: value.toolName,
state: {
status: "running",
input: value.input,
time: {
start: Date.now(),
},
reasoningMap[value.id] = {
id: Identifier.ascending("part"),
messageID: input.assistantMessage.id,
sessionID: input.assistantMessage.sessionID,
type: "reasoning",
text: "",
time: {
start: Date.now(),
},
metadata: value.providerMetadata,
})
toolcalls[value.toolCallId] = part as MessageV2.ToolPart
}
break
const parts = await MessageV2.parts(input.assistantMessage.id)
const lastThree = parts.slice(-DOOM_LOOP_THRESHOLD)
if (
lastThree.length === DOOM_LOOP_THRESHOLD &&
lastThree.every(
(p) =>
p.type === "tool" &&
p.tool === value.toolName &&
p.state.status !== "pending" &&
JSON.stringify(p.state.input) === JSON.stringify(value.input),
)
) {
const permission = await Agent.get(input.assistantMessage.mode).then((x) => x.permission)
if (permission.doom_loop === "ask") {
await Permission.ask({
type: "doom_loop",
pattern: value.toolName,
sessionID: input.assistantMessage.sessionID,
messageID: input.assistantMessage.id,
callID: value.toolCallId,
title: `Possible doom loop: "${value.toolName}" called ${DOOM_LOOP_THRESHOLD} times with identical arguments`,
metadata: {
tool: value.toolName,
input: value.input,
case "reasoning-delta":
if (value.id in reasoningMap) {
const part = reasoningMap[value.id]
part.text += value.text
if (value.providerMetadata) part.metadata = value.providerMetadata
if (part.text) await Session.updatePart({ part, delta: value.text })
}
break
case "reasoning-end":
if (value.id in reasoningMap) {
const part = reasoningMap[value.id]
part.text = part.text.trimEnd()
part.time = {
...part.time,
end: Date.now(),
}
if (value.providerMetadata) part.metadata = value.providerMetadata
await Session.updatePart(part)
delete reasoningMap[value.id]
}
break
case "tool-input-start":
const part = await Session.updatePart({
id: toolcalls[value.id]?.id ?? Identifier.ascending("part"),
messageID: input.assistantMessage.id,
sessionID: input.assistantMessage.sessionID,
type: "tool",
tool: value.toolName,
callID: value.id,
state: {
status: "pending",
input: {},
raw: "",
},
})
toolcalls[value.id] = part as MessageV2.ToolPart
break
case "tool-input-delta":
break
case "tool-input-end":
break
case "tool-call": {
const match = toolcalls[value.toolCallId]
if (match) {
const part = await Session.updatePart({
...match,
tool: value.toolName,
state: {
status: "running",
input: value.input,
time: {
start: Date.now(),
},
})
},
metadata: value.providerMetadata,
})
toolcalls[value.toolCallId] = part as MessageV2.ToolPart
const parts = await MessageV2.parts(input.assistantMessage.id)
const lastThree = parts.slice(-DOOM_LOOP_THRESHOLD)
if (
lastThree.length === DOOM_LOOP_THRESHOLD &&
lastThree.every(
(p) =>
p.type === "tool" &&
p.tool === value.toolName &&
p.state.status !== "pending" &&
JSON.stringify(p.state.input) === JSON.stringify(value.input),
)
) {
const permission = await Agent.get(input.assistantMessage.mode).then((x) => x.permission)
if (permission.doom_loop === "ask") {
await Permission.ask({
type: "doom_loop",
pattern: value.toolName,
sessionID: input.assistantMessage.sessionID,
messageID: input.assistantMessage.id,
callID: value.toolCallId,
title: `Possible doom loop: "${value.toolName}" called ${DOOM_LOOP_THRESHOLD} times with identical arguments`,
metadata: {
tool: value.toolName,
input: value.input,
},
})
}
}
}
break
}
break
}
case "tool-result": {
const match = toolcalls[value.toolCallId]
if (match && match.state.status === "running") {
await Session.updatePart({
...match,
state: {
status: "completed",
input: value.input,
output: value.output.output,
metadata: value.output.metadata,
title: value.output.title,
time: {
start: match.state.time.start,
end: Date.now(),
},
attachments: value.output.attachments,
},
})
delete toolcalls[value.toolCallId]
}
break
}
case "tool-error": {
const match = toolcalls[value.toolCallId]
if (match && match.state.status === "running") {
await Session.updatePart({
...match,
state: {
status: "error",
input: value.input,
error: (value.error as any).toString(),
metadata: value.error instanceof Permission.RejectedError ? value.error.metadata : undefined,
time: {
start: match.state.time.start,
end: Date.now(),
},
},
})
if (value.error instanceof Permission.RejectedError) {
blocked = true
}
delete toolcalls[value.toolCallId]
}
break
}
case "error":
throw value.error
case "start-step":
snapshot = await Snapshot.track()
await Session.updatePart({
id: Identifier.ascending("part"),
messageID: input.assistantMessage.id,
sessionID: input.sessionID,
snapshot,
type: "step-start",
})
break
case "finish-step":
const usage = Session.getUsage({
model: input.model,
usage: value.usage,
metadata: value.providerMetadata,
})
input.assistantMessage.finish = value.finishReason
input.assistantMessage.cost += usage.cost
input.assistantMessage.tokens = usage.tokens
await Session.updatePart({
id: Identifier.ascending("part"),
reason: value.finishReason,
snapshot: await Snapshot.track(),
messageID: input.assistantMessage.id,
sessionID: input.assistantMessage.sessionID,
type: "step-finish",
tokens: usage.tokens,
cost: usage.cost,
})
await Session.updateMessage(input.assistantMessage)
if (snapshot) {
const patch = await Snapshot.patch(snapshot)
if (patch.files.length) {
case "tool-result": {
const match = toolcalls[value.toolCallId]
if (match && match.state.status === "running") {
await Session.updatePart({
id: Identifier.ascending("part"),
messageID: input.assistantMessage.id,
sessionID: input.sessionID,
type: "patch",
hash: patch.hash,
files: patch.files,
...match,
state: {
status: "completed",
input: value.input,
output: value.output.output,
metadata: value.output.metadata,
title: value.output.title,
time: {
start: match.state.time.start,
end: Date.now(),
},
attachments: value.output.attachments,
},
})
delete toolcalls[value.toolCallId]
}
snapshot = undefined
break
}
SessionSummary.summarize({
sessionID: input.sessionID,
messageID: input.assistantMessage.parentID,
})
break
case "text-start":
currentText = {
id: Identifier.ascending("part"),
messageID: input.assistantMessage.id,
sessionID: input.assistantMessage.sessionID,
type: "text",
text: "",
time: {
start: Date.now(),
},
metadata: value.providerMetadata,
}
break
case "text-delta":
if (currentText) {
currentText.text += value.text
if (value.providerMetadata) currentText.metadata = value.providerMetadata
if (currentText.text)
case "tool-error": {
const match = toolcalls[value.toolCallId]
if (match && match.state.status === "running") {
await Session.updatePart({
part: currentText,
delta: value.text,
...match,
state: {
status: "error",
input: value.input,
error: (value.error as any).toString(),
metadata: value.error instanceof Permission.RejectedError ? value.error.metadata : undefined,
time: {
start: match.state.time.start,
end: Date.now(),
},
},
})
}
break
case "text-end":
if (currentText) {
currentText.text = currentText.text.trimEnd()
currentText.time = {
start: Date.now(),
end: Date.now(),
if (value.error instanceof Permission.RejectedError) {
blocked = true
}
delete toolcalls[value.toolCallId]
}
if (value.providerMetadata) currentText.metadata = value.providerMetadata
await Session.updatePart(currentText)
break
}
currentText = undefined
break
case "error":
throw value.error
case "finish":
input.assistantMessage.time.completed = Date.now()
await Session.updateMessage(input.assistantMessage)
break
case "start-step":
snapshot = await Snapshot.track()
await Session.updatePart({
id: Identifier.ascending("part"),
messageID: input.assistantMessage.id,
sessionID: input.sessionID,
snapshot,
type: "step-start",
})
break
default:
log.info("unhandled", {
...value,
})
continue
case "finish-step":
const usage = Session.getUsage({
model: input.model,
usage: value.usage,
metadata: value.providerMetadata,
})
input.assistantMessage.finish = value.finishReason
input.assistantMessage.cost += usage.cost
input.assistantMessage.tokens = usage.tokens
await Session.updatePart({
id: Identifier.ascending("part"),
reason: value.finishReason,
snapshot: await Snapshot.track(),
messageID: input.assistantMessage.id,
sessionID: input.assistantMessage.sessionID,
type: "step-finish",
tokens: usage.tokens,
cost: usage.cost,
})
await Session.updateMessage(input.assistantMessage)
if (snapshot) {
const patch = await Snapshot.patch(snapshot)
if (patch.files.length) {
await Session.updatePart({
id: Identifier.ascending("part"),
messageID: input.assistantMessage.id,
sessionID: input.sessionID,
type: "patch",
hash: patch.hash,
files: patch.files,
})
}
snapshot = undefined
}
SessionSummary.summarize({
sessionID: input.sessionID,
messageID: input.assistantMessage.parentID,
})
break
case "text-start":
currentText = {
id: Identifier.ascending("part"),
messageID: input.assistantMessage.id,
sessionID: input.assistantMessage.sessionID,
type: "text",
text: "",
time: {
start: Date.now(),
},
metadata: value.providerMetadata,
}
break
case "text-delta":
if (currentText) {
currentText.text += value.text
if (value.providerMetadata) currentText.metadata = value.providerMetadata
if (currentText.text)
await Session.updatePart({
part: currentText,
delta: value.text,
})
}
break
case "text-end":
if (currentText) {
currentText.text = currentText.text.trimEnd()
currentText.time = {
start: Date.now(),
end: Date.now(),
}
if (value.providerMetadata) currentText.metadata = value.providerMetadata
await Session.updatePart(currentText)
}
currentText = undefined
break
case "finish":
input.assistantMessage.time.completed = Date.now()
await Session.updateMessage(input.assistantMessage)
break
default:
log.info("unhandled", {
...value,
})
continue
}
}
}
} catch (e) {
log.error("process", {
error: e,
})
const error = MessageV2.fromError(e, { providerID: input.providerID })
input.assistantMessage.error = error
Bus.publish(Session.Event.Error, {
sessionID: input.assistantMessage.sessionID,
error: input.assistantMessage.error,
})
}
const p = await MessageV2.parts(input.assistantMessage.id)
for (const part of p) {
if (part.type === "tool" && part.state.status !== "completed" && part.state.status !== "error") {
await Session.updatePart({
...part,
state: {
...part.state,
status: "error",
error: "Tool execution aborted",
time: {
start: Date.now(),
end: Date.now(),
},
},
} catch (e) {
log.error("process", {
error: e,
})
const error = MessageV2.fromError(e, { providerID: input.providerID })
if (error?.name === "APIError" && error.data.isRetryable) {
attempt++
const delay = SessionRetry.getRetryDelayInMs(error, attempt)
if (delay) {
await SessionRetry.sleep(delay, input.abort)
continue
}
}
input.assistantMessage.error = error
Bus.publish(Session.Event.Error, {
sessionID: input.assistantMessage.sessionID,
error: input.assistantMessage.error,
})
}
const p = await MessageV2.parts(input.assistantMessage.id)
for (const part of p) {
if (part.type === "tool" && part.state.status !== "completed" && part.state.status !== "error") {
await Session.updatePart({
...part,
state: {
...part.state,
status: "error",
error: "Tool execution aborted",
time: {
start: Date.now(),
end: Date.now(),
},
},
})
}
}
input.assistantMessage.time.completed = Date.now()
await Session.updateMessage(input.assistantMessage)
if (blocked) return "stop"
if (input.assistantMessage.error) return "stop"
return "continue"
}
input.assistantMessage.time.completed = Date.now()
await Session.updateMessage(input.assistantMessage)
return { info: input.assistantMessage, parts: p, blocked }
},
}
return result
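The retry handling that previously lived in compaction.ts and prompt.ts now sits in this catch block: on a retryable API error the processor waits out a bounded backoff and re-invokes the stream factory on the next loop iteration. A condensed sketch of that loop — the backoff and sleep helpers are illustrative stand-ins for SessionRetry, the attempt cap of 10 mirrors MAX_RETRIES, and all event handling is elided:

// Condensed sketch of the factory-driven retry loop; helpers stand in for
// SessionRetry and the real event handling is omitted.
type StreamEvent = { type: string; error?: unknown }
type StreamFactory = () => AsyncIterable<StreamEvent>

// Assumption: capped exponential backoff, giving up after 10 attempts.
const retryDelayMs = (attempt: number): number | undefined =>
  attempt <= 10 ? Math.min(30_000, 1_000 * 2 ** attempt) : undefined

const sleep = (ms: number, signal: AbortSignal): Promise<void> =>
  new Promise((resolve, reject) => {
    const timer = setTimeout(resolve, ms)
    signal.addEventListener(
      "abort",
      () => {
        clearTimeout(timer)
        reject(signal.reason)
      },
      { once: true },
    )
  })

async function process(
  fn: StreamFactory,
  abort: AbortSignal,
  isRetryable: (e: unknown) => boolean,
): Promise<"stop" | "continue"> {
  let attempt = 0
  while (true) {
    try {
      for await (const value of fn()) {
        abort.throwIfAborted()
        if (value.type === "error") throw value.error
        // ...text deltas, tool calls, and finish steps handled here...
      }
      return "continue"
    } catch (e) {
      if (isRetryable(e)) {
        attempt++
        const delay = retryDelayMs(attempt)
        if (delay !== undefined) {
          await sleep(delay, abort) // a fresh stream is created on the next iteration
          continue
        }
      }
      return "stop" // non-retryable, out of attempts, or aborted
    }
  }
}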


@@ -283,8 +283,8 @@ export namespace SessionPrompt {
using _ = defer(() => cancel(sessionID))
let step = 0
let retries = 0
while (true) {
if (abort.aborted) break
let msgs = await MessageV2.filterCompacted(MessageV2.stream(sessionID))
let lastUser: MessageV2.User | undefined
@@ -295,10 +295,11 @@ export namespace SessionPrompt {
const msg = msgs[i]
if (!lastUser && msg.info.role === "user") lastUser = msg.info as MessageV2.User
if (!lastAssistant && msg.info.role === "assistant") lastAssistant = msg.info as MessageV2.Assistant
if (msg.info.role === "assistant" && msg.info.finish) lastFinished = msg.info as MessageV2.Assistant
if (!lastFinished && msg.info.role === "assistant" && msg.info.finish)
lastFinished = msg.info as MessageV2.Assistant
if (lastUser && lastFinished) break
const compaction = msg.parts.find((part) => part.type === "compaction")
if (compaction) {
if (compaction && !lastFinished) {
tasks.push(compaction)
}
}
@@ -312,9 +313,25 @@ export namespace SessionPrompt {
const model = await Provider.getModel(lastUser.model.providerID, lastUser.model.modelID)
const task = tasks.pop()
// pending compaction
if (task?.type === "compaction") {
await SessionCompaction.process({
messages: msgs,
parentID: lastUser.id,
abort,
model: {
providerID: model.providerID,
modelID: model.modelID,
},
sessionID,
})
continue
}
// context overflow, needs compaction
if (
task?.type !== "compaction" &&
lastFinished &&
lastFinished.summary !== true &&
SessionCompaction.isOverflow({ tokens: lastFinished.tokens, model: model.info })
) {
const msg = await Session.updateMessage({
@@ -336,97 +353,85 @@ export namespace SessionPrompt {
continue
}
const result = await iife(async () => {
if (task?.type === "compaction") {
return await SessionCompaction.process({
messages: msgs,
parentID: lastUser.id,
abort,
model: {
providerID: model.providerID,
modelID: model.modelID,
},
sessionID,
})
}
const agent = await Agent.get(lastUser.agent)
msgs = insertReminders({
messages: msgs,
agent,
})
const processor = SessionProcessor.create({
assistantMessage: (await Session.updateMessage({
id: Identifier.ascending("message"),
parentID: lastUser.id,
role: "assistant",
mode: agent.name,
path: {
cwd: Instance.directory,
root: Instance.worktree,
},
cost: 0,
tokens: {
input: 0,
output: 0,
reasoning: 0,
cache: { read: 0, write: 0 },
},
modelID: model.modelID,
providerID: model.providerID,
time: {
created: Date.now(),
},
sessionID,
})) as MessageV2.Assistant,
sessionID: sessionID,
model: model.info,
// normal processing
const agent = await Agent.get(lastUser.agent)
msgs = insertReminders({
messages: msgs,
agent,
})
const processor = SessionProcessor.create({
assistantMessage: (await Session.updateMessage({
id: Identifier.ascending("message"),
parentID: lastUser.id,
role: "assistant",
mode: agent.name,
path: {
cwd: Instance.directory,
root: Instance.worktree,
},
cost: 0,
tokens: {
input: 0,
output: 0,
reasoning: 0,
cache: { read: 0, write: 0 },
},
modelID: model.modelID,
providerID: model.providerID,
abort,
})
const system = await resolveSystemPrompt({
providerID: model.providerID,
modelID: model.info.id,
agent,
system: lastUser.system,
})
const tools = await resolveTools({
agent,
time: {
created: Date.now(),
},
sessionID,
model: lastUser.model,
tools: lastUser.tools,
processor,
})) as MessageV2.Assistant,
sessionID: sessionID,
model: model.info,
providerID: model.providerID,
abort,
})
const system = await resolveSystemPrompt({
providerID: model.providerID,
modelID: model.info.id,
agent,
system: lastUser.system,
})
const tools = await resolveTools({
agent,
sessionID,
model: lastUser.model,
tools: lastUser.tools,
processor,
})
const params = await Plugin.trigger(
"chat.params",
{
sessionID: sessionID,
agent: lastUser.agent,
model: model.info,
provider: await Provider.getProvider(model.providerID),
message: lastUser,
},
{
temperature: model.info.temperature
? (agent.temperature ?? ProviderTransform.temperature(model.providerID, model.modelID))
: undefined,
topP: agent.topP ?? ProviderTransform.topP(model.providerID, model.modelID),
options: {
...ProviderTransform.options(model.providerID, model.modelID, model.npm ?? "", sessionID),
...model.info.options,
...agent.options,
},
},
)
if (step === 1) {
SessionSummary.summarize({
sessionID: sessionID,
messageID: lastUser.id,
})
const params = await Plugin.trigger(
"chat.params",
{
sessionID: sessionID,
agent: lastUser.agent,
model: model.info,
provider: await Provider.getProvider(model.providerID),
message: lastUser,
},
{
temperature: model.info.temperature
? (agent.temperature ?? ProviderTransform.temperature(model.providerID, model.modelID))
: undefined,
topP: agent.topP ?? ProviderTransform.topP(model.providerID, model.modelID),
options: {
...ProviderTransform.options(model.providerID, model.modelID, model.npm ?? "", sessionID),
...model.info.options,
...agent.options,
},
},
)
}
if (step === 1) {
SessionSummary.summarize({
sessionID: sessionID,
messageID: lastUser.id,
})
}
const stream = streamText({
const result = await processor.process(() =>
streamText({
onError(error) {
log.error("stream error", {
error,
@@ -514,37 +519,10 @@ export namespace SessionPrompt {
},
],
}),
})
return await processor.process(stream)
})
if (result.blocked) break
if (result.info.error?.name === "APIError" && result.info.error.data.isRetryable) {
retries++
const delay = SessionRetry.getRetryDelayInMs(result.info.error, retries)
if (!delay) break
state()[sessionID].status = {
type: "retry",
attempt: retries,
message: result.info.error.data.message,
}
Bus.publish(Event.Status, {
sessionID,
status: state()[sessionID].status,
})
await SessionRetry.sleep(delay, abort).catch(() => {})
state()[sessionID].status = {
type: "busy",
}
Bus.publish(Event.Status, {
sessionID,
status: state()[sessionID].status,
})
continue
}
retries = 0
if (result.info.error) break
}),
)
if (result === "stop") break
continue
}
SessionCompaction.prune({ sessionID })
for await (const item of MessageV2.stream(sessionID)) {
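Taken together, the prompt loop collapses to roughly the shape below: user-requested compaction, overflow-triggered compaction, and normal turns all funnel through the shared processor, which now owns retries and reports back only "stop" or "continue". A condensed sketch; the helpers are hypothetical parameters, not the real SessionCompaction / SessionProcessor calls:

// Condensed sketch of the simplified driver loop; all dependencies are
// hypothetical stand-ins injected as parameters.
type TurnPlan = { kind: "compaction" | "overflow" | "normal" }

async function driveSession(
  sessionID: string,
  abort: AbortSignal,
  deps: {
    plan: (sessionID: string) => Promise<TurnPlan>
    compact: (sessionID: string) => Promise<void>
    runTurn: (sessionID: string) => Promise<"stop" | "continue">
  },
): Promise<void> {
  while (true) {
    if (abort.aborted) break
    const turn = await deps.plan(sessionID)
    // Pending compaction part or context overflow: run/queue a compaction, then loop.
    if (turn.kind === "compaction" || turn.kind === "overflow") {
      await deps.compact(sessionID)
      continue
    }
    // Normal turn: processor.process(() => streamText(...)) in the real code.
    const result = await deps.runTurn(sessionID)
    if (result === "stop") break // error, permission block, or abort
  }
}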