mirror of
https://fastgit.cc/https://github.com/anomalyco/opencode
synced 2026-04-21 13:21:17 +08:00
Merge branch 'nxl/improve-compaction-strategy' into dev
This commit is contained in:
@@ -1,6 +1,7 @@
|
||||
You are a helpful AI assistant tasked with summarizing conversations.
|
||||
|
||||
When asked to summarize, provide a detailed but concise summary of the conversation.
|
||||
When asked to summarize, provide a detailed but concise summary of the older conversation history.
|
||||
The most recent turns may be preserved verbatim outside your summary, so focus on information that would still be needed to continue the work with that recent context available.
|
||||
Focus on information that would be helpful for continuing the conversation, including:
|
||||
- What was done
|
||||
- What is currently being worked on
|
||||
|
||||
@@ -204,6 +204,13 @@ const InfoSchema = Schema.Struct({
|
||||
prune: Schema.optional(Schema.Boolean).annotate({
|
||||
description: "Enable pruning of old tool outputs (default: true)",
|
||||
}),
|
||||
tail_turns: Schema.optional(NonNegativeInt).annotate({
|
||||
description:
|
||||
"Number of recent user turns, including their following assistant/tool responses, to keep verbatim during compaction (default: 2)",
|
||||
}),
|
||||
tail_tokens: Schema.optional(NonNegativeInt).annotate({
|
||||
description: "Token budget for retained recent turn spans during compaction",
|
||||
}),
|
||||
reserved: Schema.optional(NonNegativeInt).annotate({
|
||||
description: "Token buffer for compaction. Leaves enough window to avoid overflow during compaction.",
|
||||
}),
|
||||
|
||||
@@ -15,7 +15,9 @@ import { NotFoundError } from "@/storage"
|
||||
import { ModelID, ProviderID } from "@/provider/schema"
|
||||
import { Effect, Layer, Context } from "effect"
|
||||
import { InstanceState } from "@/effect"
|
||||
import { isOverflow as overflow } from "./overflow"
|
||||
import { isOverflow as overflow, usable } from "./overflow"
|
||||
import { makeRuntime } from "@/effect/run-service"
|
||||
import { fn } from "@/util/fn"
|
||||
|
||||
const log = Log.create({ service: "session.compaction" })
|
||||
|
||||
@@ -31,6 +33,39 @@ export const Event = {
|
||||
export const PRUNE_MINIMUM = 20_000
|
||||
export const PRUNE_PROTECT = 40_000
|
||||
const PRUNE_PROTECTED_TOOLS = ["skill"]
|
||||
// Number of most recent user turns considered for verbatim retention when
// `compaction.tail_turns` is not configured.
const DEFAULT_TAIL_TURNS = 2
// Lower clamp applied to the derived tail token budget (see tailBudget).
const MIN_TAIL_TOKENS = 2_000
// Upper clamp applied to the derived tail token budget (see tailBudget).
const MAX_TAIL_TOKENS = 8_000

// A user turn located inside a message list, expressed as a half-open index range.
type Turn = {
  // Index of the user message that opens the turn.
  start: number
  // Exclusive end index: the start of the next turn, or the list length for the last turn.
  end: number
  // ID of the user message found at `start`.
  id: MessageID
}
|
||||
|
||||
/**
 * Resolves the token budget for the recent turns that are kept verbatim during compaction.
 *
 * An explicit `compaction.tail_tokens` setting is returned as-is (0 included). Otherwise the
 * budget is a quarter of the usable window reported by `usable`, rounded down and clamped to
 * the range [MIN_TAIL_TOKENS, MAX_TAIL_TOKENS].
 */
function tailBudget(input: { cfg: Config.Info; model: Provider.Model }) {
  const configured = input.cfg.compaction?.tail_tokens
  // `== null` matches both undefined and null, mirroring a `??` fallback.
  if (configured != null) return configured

  const quarter = Math.floor(usable(input) * 0.25)
  return Math.min(MAX_TAIL_TOKENS, Math.max(MIN_TAIL_TOKENS, quarter))
}
|
||||
|
||||
/**
 * Splits a message list into user turns.
 *
 * A turn starts at every message whose role is "user" and that carries no "compaction"
 * part, and runs up to (not including) the next such message; the last turn runs to the
 * end of the list. Messages before the first turn belong to no turn. User messages that
 * carry a compaction part never open a turn, so they fall inside the preceding turn's range.
 */
function turns(messages: MessageV2.WithParts[]) {
  const result: Turn[] = []
  for (let index = 0; index < messages.length; index++) {
    const msg = messages[index]
    if (msg.info.role !== "user") continue
    if (msg.parts.some((part) => part.type === "compaction")) continue

    // Close the previous turn where this one begins.
    const previous = result[result.length - 1]
    if (previous) previous.end = index

    // Provisionally open-ended; narrowed when (and if) a later turn starts.
    result.push({ start: index, end: messages.length, id: msg.info.id })
  }
  return result
}
|
||||
|
||||
export interface Interface {
|
||||
readonly isOverflow: (input: {
|
||||
@@ -84,6 +119,55 @@ export const layer: Layer.Layer<
|
||||
return overflow({ cfg: yield* config.get(), tokens: input.tokens, model: input.model })
|
||||
})
|
||||
|
||||
// Estimates the token footprint of `messages` for `model`: the messages are converted to
// model messages and the token estimator is run over their JSON serialization.
// No `stripMedia` option is passed to the conversion here.
const estimate = Effect.fn("SessionCompaction.estimate")(function* (input: {
  messages: MessageV2.WithParts[]
  model: Provider.Model
}) {
  const converted = yield* MessageV2.toModelMessagesEffect(input.messages, input.model)
  return Token.estimate(JSON.stringify(converted))
})
|
||||
|
||||
// Splits `messages` into a `head` to be summarized and a verbatim tail identified by
// `tail_start_id` (the ID of the user message that opens the first retained turn).
//
// Rules, in order:
//  - `compaction.tail_turns` (default DEFAULT_TAIL_TURNS) <= 0, or no user turns at all:
//    everything is summarized and there is no tail.
//  - Only the last `tail_turns` turns are candidates; each one is sized with `estimate`.
//  - If the newest turn alone exceeds the budget from `tailBudget`, everything is
//    summarized (logged as "tail fallback").
//  - Otherwise turns are accepted newest-first while their running total stays within
//    budget; the oldest accepted turn becomes the tail start.
//  - If the tail would start at index 0 (nothing left to summarize), everything is
//    summarized instead.
const select = Effect.fn("SessionCompaction.select")(function* (input: {
  messages: MessageV2.WithParts[]
  cfg: Config.Info
  model: Provider.Model
}) {
  // Result used by every "no tail" path.
  const everything = { head: input.messages, tail_start_id: undefined }

  const limit = input.cfg.compaction?.tail_turns ?? DEFAULT_TAIL_TURNS
  if (limit <= 0) return everything

  const budget = tailBudget({ cfg: input.cfg, model: input.model })
  const all = turns(input.messages)
  if (!all.length) return everything

  const recent = all.slice(-limit)
  const sizes = yield* Effect.forEach(
    recent,
    (turn) =>
      estimate({
        messages: input.messages.slice(turn.start, turn.end),
        model: input.model,
      }),
    { concurrency: 1 },
  )

  // `recent` is non-empty here, so `newest` is defined; the guard replaces a non-null assertion.
  const newest = sizes[sizes.length - 1]
  if (newest === undefined || newest > budget) {
    log.info("tail fallback", { budget, size: newest })
    return everything
  }

  // Walk from the newest turn backwards, keeping turns while they fit in the budget.
  let total = 0
  let keep: Turn | undefined
  for (let i = recent.length - 1; i >= 0; i--) {
    const size = sizes[i]
    if (total + size > budget) break
    total += size
    keep = recent[i]
  }

  if (!keep || keep.start === 0) return everything
  return {
    head: input.messages.slice(0, keep.start),
    tail_start_id: keep.id,
  }
})
|
||||
|
||||
// goes backwards through parts until there are PRUNE_PROTECT tokens worth of tool
|
||||
// calls, then erases output of older tool calls to free context space
|
||||
const prune = Effect.fn("SessionCompaction.prune")(function* (input: { sessionID: SessionID }) {
|
||||
@@ -146,6 +230,7 @@ export const layer: Layer.Layer<
|
||||
throw new Error(`Compaction parent must be a user message: ${input.parentID}`)
|
||||
}
|
||||
const userMessage = parent.info
|
||||
const compactionPart = parent.parts.find((part): part is MessageV2.CompactionPart => part.type === "compaction")
|
||||
|
||||
let messages = input.messages
|
||||
let replay:
|
||||
@@ -176,19 +261,20 @@ export const layer: Layer.Layer<
|
||||
const model = agent.model
|
||||
? yield* provider.getModel(agent.model.providerID, agent.model.modelID)
|
||||
: yield* provider.getModel(userMessage.model.providerID, userMessage.model.modelID)
|
||||
const cfg = yield* config.get()
|
||||
const history = compactionPart && messages.at(-1)?.info.id === input.parentID ? messages.slice(0, -1) : messages
|
||||
const selected = yield* select({
|
||||
messages: history,
|
||||
cfg,
|
||||
model,
|
||||
})
|
||||
// Allow plugins to inject context or replace compaction prompt.
|
||||
const compacting = yield* plugin.trigger(
|
||||
"experimental.session.compacting",
|
||||
{ sessionID: input.sessionID },
|
||||
{ context: [], prompt: undefined },
|
||||
)
|
||||
const defaultPrompt = `Provide a detailed prompt for continuing our conversation above.
|
||||
Focus on information that would be helpful for continuing the conversation, including what we did, what we're doing, which files we're working on, and what we're going to do next.
|
||||
The summary that you construct will be used so that another agent can read it and continue the work.
|
||||
Do not call any tools. Respond only with the summary text.
|
||||
Respond in the same language as the user's messages in the conversation.
|
||||
|
||||
When constructing the summary, try to stick to this template:
|
||||
const defaultPrompt = `When constructing the summary, try to stick to this template:
|
||||
---
|
||||
## Goal
|
||||
|
||||
@@ -213,7 +299,7 @@ When constructing the summary, try to stick to this template:
|
||||
---`
|
||||
|
||||
const prompt = compacting.prompt ?? [defaultPrompt, ...compacting.context].join("\n\n")
|
||||
const msgs = structuredClone(messages)
|
||||
const msgs = structuredClone(selected.head)
|
||||
yield* plugin.trigger("experimental.chat.messages.transform", {}, { messages: msgs })
|
||||
const modelMessages = yield* MessageV2.toModelMessagesEffect(msgs, model, { stripMedia: true })
|
||||
const ctx = yield* InstanceState.context
|
||||
@@ -276,6 +362,13 @@ When constructing the summary, try to stick to this template:
|
||||
return "stop"
|
||||
}
|
||||
|
||||
if (compactionPart && selected.tail_start_id && compactionPart.tail_start_id !== selected.tail_start_id) {
|
||||
yield* session.updatePart({
|
||||
...compactionPart,
|
||||
tail_start_id: selected.tail_start_id,
|
||||
})
|
||||
}
|
||||
|
||||
if (result === "continue" && input.auto) {
|
||||
if (replay) {
|
||||
const original = replay.info
|
||||
@@ -409,4 +502,25 @@ export const defaultLayer = Layer.suspend(() =>
|
||||
),
|
||||
)
|
||||
|
||||
const { runPromise } = makeRuntime(Service, defaultLayer)
|
||||
|
||||
export async function isOverflow(input: { tokens: MessageV2.Assistant["tokens"]; model: Provider.Model }) {
|
||||
return runPromise((svc) => svc.isOverflow(input))
|
||||
}
|
||||
|
||||
export async function prune(input: { sessionID: SessionID }) {
|
||||
return runPromise((svc) => svc.prune(input))
|
||||
}
|
||||
|
||||
export const create = fn(
|
||||
z.object({
|
||||
sessionID: SessionID.zod,
|
||||
agent: z.string(),
|
||||
model: z.object({ providerID: ProviderID.zod, modelID: ModelID.zod }),
|
||||
auto: z.boolean(),
|
||||
overflow: z.boolean().optional(),
|
||||
}),
|
||||
(input) => runPromise((svc) => svc.create(input)),
|
||||
)
|
||||
|
||||
export * as SessionCompaction from "./compaction"
|
||||
|
||||
@@ -208,6 +208,7 @@ export const CompactionPart = PartBase.extend({
|
||||
type: z.literal("compaction"),
|
||||
auto: z.boolean(),
|
||||
overflow: z.boolean().optional(),
|
||||
tail_start_id: MessageID.zod.optional(),
|
||||
}).meta({
|
||||
ref: "CompactionPart",
|
||||
})
|
||||
@@ -923,8 +924,21 @@ export function get(input: { sessionID: SessionID; messageID: MessageID }): With
|
||||
export function filterCompacted(msgs: Iterable<WithParts>) {
|
||||
const result = [] as WithParts[]
|
||||
const completed = new Set<string>()
|
||||
let retain: MessageID | undefined
|
||||
for (const msg of msgs) {
|
||||
result.push(msg)
|
||||
if (retain) {
|
||||
if (msg.info.id === retain) break
|
||||
continue
|
||||
}
|
||||
if (msg.info.role === "user" && completed.has(msg.info.id)) {
|
||||
const part = msg.parts.find((item): item is CompactionPart => item.type === "compaction")
|
||||
if (!part) continue
|
||||
if (!part.tail_start_id) break
|
||||
retain = part.tail_start_id
|
||||
if (msg.info.id === retain) break
|
||||
continue
|
||||
}
|
||||
if (msg.info.role === "user" && completed.has(msg.info.id) && msg.parts.some((part) => part.type === "compaction"))
|
||||
break
|
||||
if (msg.info.role === "assistant" && msg.info.summary && msg.info.finish && !msg.info.error)
|
||||
|
||||
@@ -5,18 +5,22 @@ import type { MessageV2 } from "./message-v2"
|
||||
|
||||
const COMPACTION_BUFFER = 20_000
|
||||
|
||||
export function isOverflow(input: { cfg: Config.Info; tokens: MessageV2.Assistant["tokens"]; model: Provider.Model }) {
|
||||
if (input.cfg.compaction?.auto === false) return false
|
||||
/**
 * Returns how many tokens of the model's window are usable before compaction is needed.
 *
 * - 0 when the model reports a context limit of 0.
 * - When the model declares an input limit: that limit minus the reserved buffer
 *   (`compaction.reserved`, or the smaller of COMPACTION_BUFFER and the model's max output
 *   tokens when unset).
 * - Otherwise: the context limit minus the model's max output tokens.
 *
 * The result is never negative.
 */
export function usable(input: { cfg: Config.Info; model: Provider.Model }) {
  const context = input.model.limit.context
  if (context === 0) return 0

  const reserved =
    input.cfg.compaction?.reserved ?? Math.min(COMPACTION_BUFFER, ProviderTransform.maxOutputTokens(input.model))

  // NOTE(review): `reserved` only applies when the model declares an input limit; the
  // context-based fallback subtracts max output tokens instead — confirm this is intended.
  return input.model.limit.input
    ? Math.max(0, input.model.limit.input - reserved)
    : Math.max(0, context - ProviderTransform.maxOutputTokens(input.model))
}
|
||||
|
||||
/**
 * Reports whether the given token usage has reached the model's usable window.
 *
 * Always false when `compaction.auto` is explicitly `false` or when the model reports a
 * context limit of 0. Otherwise compares the token count against `usable(input)`.
 */
export function isOverflow(input: { cfg: Config.Info; tokens: MessageV2.Assistant["tokens"]; model: Provider.Model }) {
  if (input.cfg.compaction?.auto === false) return false
  if (input.model.limit.context === 0) return false

  const { tokens } = input
  // `||` (not `??`) on purpose: a total of 0 falls back to the sum of the individual counters.
  const count = tokens.total || tokens.input + tokens.output + tokens.cache.read + tokens.cache.write
  return count >= usable(input)
}
|
||||
|
||||
@@ -167,7 +167,19 @@ function layer(result: "continue" | "compact") {
|
||||
)
|
||||
}
|
||||
|
||||
function runtime(result: "continue" | "compact", plugin = Plugin.defaultLayer, provider = ProviderTest.fake()) {
|
||||
// Test helper: builds a mocked Config.Service layer whose `get` succeeds with the parsed
// default config (`Config.Info.parse({})`) overlaid with the given `compaction` settings.
function cfg(compaction?: Config.Info["compaction"]) {
  const base = Config.Info.parse({})
  return Layer.mock(Config.Service)({
    get: () => Effect.succeed({ ...base, compaction }),
  })
}
|
||||
|
||||
function runtime(
|
||||
result: "continue" | "compact",
|
||||
plugin = Plugin.defaultLayer,
|
||||
provider = ProviderTest.fake(),
|
||||
config = Config.defaultLayer,
|
||||
) {
|
||||
const bus = Bus.layer
|
||||
return ManagedRuntime.make(
|
||||
Layer.mergeAll(SessionCompaction.layer, bus).pipe(
|
||||
@@ -177,7 +189,7 @@ function runtime(result: "continue" | "compact", plugin = Plugin.defaultLayer, p
|
||||
Layer.provide(Agent.defaultLayer),
|
||||
Layer.provide(plugin),
|
||||
Layer.provide(bus),
|
||||
Layer.provide(Config.defaultLayer),
|
||||
Layer.provide(config),
|
||||
),
|
||||
)
|
||||
}
|
||||
@@ -221,7 +233,7 @@ function llm() {
|
||||
}
|
||||
}
|
||||
|
||||
function liveRuntime(layer: Layer.Layer<LLM.Service>, provider = ProviderTest.fake()) {
|
||||
function liveRuntime(layer: Layer.Layer<LLM.Service>, provider = ProviderTest.fake(), config = Config.defaultLayer) {
|
||||
const bus = Bus.layer
|
||||
const status = SessionStatus.layer.pipe(Layer.provide(bus))
|
||||
const processor = SessionProcessorModule.SessionProcessor.layer.pipe(Layer.provide(summary))
|
||||
@@ -236,11 +248,66 @@ function liveRuntime(layer: Layer.Layer<LLM.Service>, provider = ProviderTest.fa
|
||||
Layer.provide(Plugin.defaultLayer),
|
||||
Layer.provide(status),
|
||||
Layer.provide(bus),
|
||||
Layer.provide(Config.defaultLayer),
|
||||
Layer.provide(config),
|
||||
),
|
||||
)
|
||||
}
|
||||
|
||||
// Test helper: returns a stream handler that emits a single canned text response.
//
// The handler first passes the incoming stream input to `capture` (when given), then
// returns a stream of: start, text-start, one text-delta carrying `text`, text-end,
// finish-step and finish — both finishing with reason "stop" and the same fixed usage.
function reply(
  text: string,
  capture?: (input: LLM.StreamInput) => void,
): (input: LLM.StreamInput) => Stream.Stream<LLM.Event, unknown> {
  // Fixed usage payload shared by the finish-step and finish events; a fresh object is
  // built per use so the two events never alias each other.
  const usage = () => ({
    inputTokens: 1,
    outputTokens: 1,
    totalTokens: 2,
    inputTokenDetails: {
      noCacheTokens: undefined,
      cacheReadTokens: undefined,
      cacheWriteTokens: undefined,
    },
    outputTokenDetails: {
      textTokens: undefined,
      reasoningTokens: undefined,
    },
  })

  return (input) => {
    capture?.(input)
    return Stream.make(
      { type: "start" } satisfies LLM.Event,
      { type: "text-start", id: "txt-0" } satisfies LLM.Event,
      { type: "text-delta", id: "txt-0", delta: text, text } as LLM.Event,
      { type: "text-end", id: "txt-0" } satisfies LLM.Event,
      {
        type: "finish-step",
        finishReason: "stop",
        rawFinishReason: "stop",
        response: { id: "res", modelId: "test-model", timestamp: new Date() },
        providerMetadata: undefined,
        usage: usage(),
      } satisfies LLM.Event,
      {
        type: "finish",
        finishReason: "stop",
        rawFinishReason: "stop",
        totalUsage: usage(),
      } satisfies LLM.Event,
    )
  }
}
|
||||
|
||||
// Test helper: resolves (with no value) after `ms` milliseconds; defaults to 50 ms.
function wait(ms = 50): Promise<void> {
  return new Promise<void>((resolve) => setTimeout(resolve, ms))
}
|
||||
@@ -835,6 +902,210 @@ describe("session.compaction.process", () => {
|
||||
})
|
||||
})
|
||||
|
||||
test("persists tail_start_id for retained recent turns", async () => {
|
||||
await using tmp = await tmpdir()
|
||||
await Instance.provide({
|
||||
directory: tmp.path,
|
||||
fn: async () => {
|
||||
const session = await svc.create({})
|
||||
await user(session.id, "first")
|
||||
const keep = await user(session.id, "second")
|
||||
await user(session.id, "third")
|
||||
await SessionCompaction.create({
|
||||
sessionID: session.id,
|
||||
agent: "build",
|
||||
model: ref,
|
||||
auto: false,
|
||||
})
|
||||
|
||||
const rt = runtime("continue", Plugin.defaultLayer, wide(), cfg({ tail_turns: 2, tail_tokens: 10_000 }))
|
||||
try {
|
||||
const msgs = await svc.messages({ sessionID: session.id })
|
||||
const parent = msgs.at(-1)?.info.id
|
||||
expect(parent).toBeTruthy()
|
||||
await rt.runPromise(
|
||||
SessionCompaction.Service.use((svc) =>
|
||||
svc.process({
|
||||
parentID: parent!,
|
||||
messages: msgs,
|
||||
sessionID: session.id,
|
||||
auto: false,
|
||||
}),
|
||||
),
|
||||
)
|
||||
|
||||
const part = (await svc.messages({ sessionID: session.id }))
|
||||
.at(-2)
|
||||
?.parts.find((item) => item.type === "compaction")
|
||||
|
||||
expect(part?.type).toBe("compaction")
|
||||
if (part?.type === "compaction") expect(part.tail_start_id).toBe(keep.id)
|
||||
} finally {
|
||||
await rt.dispose()
|
||||
}
|
||||
},
|
||||
})
|
||||
})
|
||||
|
||||
test("shrinks retained tail to fit tail token budget", async () => {
|
||||
await using tmp = await tmpdir()
|
||||
await Instance.provide({
|
||||
directory: tmp.path,
|
||||
fn: async () => {
|
||||
const session = await svc.create({})
|
||||
await user(session.id, "first")
|
||||
await user(session.id, "x".repeat(2_000))
|
||||
const keep = await user(session.id, "tiny")
|
||||
await SessionCompaction.create({
|
||||
sessionID: session.id,
|
||||
agent: "build",
|
||||
model: ref,
|
||||
auto: false,
|
||||
})
|
||||
|
||||
const rt = runtime("continue", Plugin.defaultLayer, wide(), cfg({ tail_turns: 2, tail_tokens: 100 }))
|
||||
try {
|
||||
const msgs = await svc.messages({ sessionID: session.id })
|
||||
const parent = msgs.at(-1)?.info.id
|
||||
expect(parent).toBeTruthy()
|
||||
await rt.runPromise(
|
||||
SessionCompaction.Service.use((svc) =>
|
||||
svc.process({
|
||||
parentID: parent!,
|
||||
messages: msgs,
|
||||
sessionID: session.id,
|
||||
auto: false,
|
||||
}),
|
||||
),
|
||||
)
|
||||
|
||||
const part = (await svc.messages({ sessionID: session.id }))
|
||||
.at(-2)
|
||||
?.parts.find((item) => item.type === "compaction")
|
||||
|
||||
expect(part?.type).toBe("compaction")
|
||||
if (part?.type === "compaction") expect(part.tail_start_id).toBe(keep.id)
|
||||
} finally {
|
||||
await rt.dispose()
|
||||
}
|
||||
},
|
||||
})
|
||||
})
|
||||
|
||||
test("falls back to full summary when even one recent turn exceeds tail budget", async () => {
|
||||
await using tmp = await tmpdir({ git: true })
|
||||
const stub = llm()
|
||||
let captured = ""
|
||||
stub.push(
|
||||
reply("summary", (input) => {
|
||||
captured = JSON.stringify(input.messages)
|
||||
}),
|
||||
)
|
||||
await Instance.provide({
|
||||
directory: tmp.path,
|
||||
fn: async () => {
|
||||
const session = await svc.create({})
|
||||
await user(session.id, "first")
|
||||
await user(session.id, "y".repeat(2_000))
|
||||
await SessionCompaction.create({
|
||||
sessionID: session.id,
|
||||
agent: "build",
|
||||
model: ref,
|
||||
auto: false,
|
||||
})
|
||||
|
||||
const rt = liveRuntime(stub.layer, wide(), cfg({ tail_turns: 1, tail_tokens: 20 }))
|
||||
try {
|
||||
const msgs = await svc.messages({ sessionID: session.id })
|
||||
const parent = msgs.at(-1)?.info.id
|
||||
expect(parent).toBeTruthy()
|
||||
await rt.runPromise(
|
||||
SessionCompaction.Service.use((svc) =>
|
||||
svc.process({
|
||||
parentID: parent!,
|
||||
messages: msgs,
|
||||
sessionID: session.id,
|
||||
auto: false,
|
||||
}),
|
||||
),
|
||||
)
|
||||
|
||||
const part = (await svc.messages({ sessionID: session.id }))
|
||||
.at(-2)
|
||||
?.parts.find((item) => item.type === "compaction")
|
||||
|
||||
expect(part?.type).toBe("compaction")
|
||||
if (part?.type === "compaction") expect(part.tail_start_id).toBeUndefined()
|
||||
expect(captured).toContain("yyyy")
|
||||
} finally {
|
||||
await rt.dispose()
|
||||
}
|
||||
},
|
||||
})
|
||||
})
|
||||
|
||||
test("falls back to full summary when retained tail media exceeds tail budget", async () => {
|
||||
await using tmp = await tmpdir({ git: true })
|
||||
const stub = llm()
|
||||
let captured = ""
|
||||
stub.push(
|
||||
reply("summary", (input) => {
|
||||
captured = JSON.stringify(input.messages)
|
||||
}),
|
||||
)
|
||||
await Instance.provide({
|
||||
directory: tmp.path,
|
||||
fn: async () => {
|
||||
const session = await svc.create({})
|
||||
await user(session.id, "older")
|
||||
const recent = await user(session.id, "recent image turn")
|
||||
await svc.updatePart({
|
||||
id: PartID.ascending(),
|
||||
messageID: recent.id,
|
||||
sessionID: session.id,
|
||||
type: "file",
|
||||
mime: "image/png",
|
||||
filename: "big.png",
|
||||
url: `data:image/png;base64,${"a".repeat(4_000)}`,
|
||||
})
|
||||
await SessionCompaction.create({
|
||||
sessionID: session.id,
|
||||
agent: "build",
|
||||
model: ref,
|
||||
auto: false,
|
||||
})
|
||||
|
||||
const rt = liveRuntime(stub.layer, wide(), cfg({ tail_turns: 1, tail_tokens: 100 }))
|
||||
try {
|
||||
const msgs = await svc.messages({ sessionID: session.id })
|
||||
const parent = msgs.at(-1)?.info.id
|
||||
expect(parent).toBeTruthy()
|
||||
await rt.runPromise(
|
||||
SessionCompaction.Service.use((svc) =>
|
||||
svc.process({
|
||||
parentID: parent!,
|
||||
messages: msgs,
|
||||
sessionID: session.id,
|
||||
auto: false,
|
||||
}),
|
||||
),
|
||||
)
|
||||
|
||||
const part = (await svc.messages({ sessionID: session.id }))
|
||||
.at(-2)
|
||||
?.parts.find((item) => item.type === "compaction")
|
||||
|
||||
expect(part?.type).toBe("compaction")
|
||||
if (part?.type === "compaction") expect(part.tail_start_id).toBeUndefined()
|
||||
expect(captured).toContain("recent image turn")
|
||||
expect(captured).toContain("Attached image/png: big.png")
|
||||
} finally {
|
||||
await rt.dispose()
|
||||
}
|
||||
},
|
||||
})
|
||||
})
|
||||
|
||||
test("allows plugins to disable synthetic continue prompt", async () => {
|
||||
await using tmp = await tmpdir()
|
||||
await Instance.provide({
|
||||
@@ -1195,6 +1466,132 @@ describe("session.compaction.process", () => {
|
||||
},
|
||||
})
|
||||
})
|
||||
|
||||
test("summarizes only the head while keeping recent tail out of summary input", async () => {
|
||||
const stub = llm()
|
||||
let captured = ""
|
||||
stub.push(
|
||||
reply("summary", (input) => {
|
||||
captured = JSON.stringify(input.messages)
|
||||
}),
|
||||
)
|
||||
|
||||
await using tmp = await tmpdir({ git: true })
|
||||
await Instance.provide({
|
||||
directory: tmp.path,
|
||||
fn: async () => {
|
||||
const session = await svc.create({})
|
||||
await user(session.id, "older context")
|
||||
await user(session.id, "keep this turn")
|
||||
await user(session.id, "and this one too")
|
||||
await SessionCompaction.create({
|
||||
sessionID: session.id,
|
||||
agent: "build",
|
||||
model: ref,
|
||||
auto: false,
|
||||
})
|
||||
|
||||
const rt = liveRuntime(stub.layer, wide())
|
||||
try {
|
||||
const msgs = await svc.messages({ sessionID: session.id })
|
||||
const parent = msgs.at(-1)?.info.id
|
||||
expect(parent).toBeTruthy()
|
||||
await rt.runPromise(
|
||||
SessionCompaction.Service.use((svc) =>
|
||||
svc.process({
|
||||
parentID: parent!,
|
||||
messages: msgs,
|
||||
sessionID: session.id,
|
||||
auto: false,
|
||||
}),
|
||||
),
|
||||
)
|
||||
|
||||
expect(captured).toContain("older context")
|
||||
expect(captured).not.toContain("keep this turn")
|
||||
expect(captured).not.toContain("and this one too")
|
||||
expect(captured).not.toContain("What did we do so far?")
|
||||
} finally {
|
||||
await rt.dispose()
|
||||
}
|
||||
},
|
||||
})
|
||||
})
|
||||
|
||||
test("keeps recent pre-compaction turns across repeated compactions", async () => {
|
||||
const stub = llm()
|
||||
stub.push(reply("summary one"))
|
||||
stub.push(reply("summary two"))
|
||||
await using tmp = await tmpdir()
|
||||
await Instance.provide({
|
||||
directory: tmp.path,
|
||||
fn: async () => {
|
||||
const session = await svc.create({})
|
||||
const u1 = await user(session.id, "one")
|
||||
const u2 = await user(session.id, "two")
|
||||
const u3 = await user(session.id, "three")
|
||||
await SessionCompaction.create({
|
||||
sessionID: session.id,
|
||||
agent: "build",
|
||||
model: ref,
|
||||
auto: false,
|
||||
})
|
||||
|
||||
const rt = liveRuntime(stub.layer, wide(), cfg({ tail_turns: 2, tail_tokens: 10_000 }))
|
||||
try {
|
||||
let msgs = await svc.messages({ sessionID: session.id })
|
||||
let parent = msgs.at(-1)?.info.id
|
||||
expect(parent).toBeTruthy()
|
||||
await rt.runPromise(
|
||||
SessionCompaction.Service.use((svc) =>
|
||||
svc.process({
|
||||
parentID: parent!,
|
||||
messages: msgs,
|
||||
sessionID: session.id,
|
||||
auto: false,
|
||||
}),
|
||||
),
|
||||
)
|
||||
|
||||
const u4 = await user(session.id, "four")
|
||||
await SessionCompaction.create({
|
||||
sessionID: session.id,
|
||||
agent: "build",
|
||||
model: ref,
|
||||
auto: false,
|
||||
})
|
||||
|
||||
msgs = MessageV2.filterCompacted(MessageV2.stream(session.id))
|
||||
parent = msgs.at(-1)?.info.id
|
||||
expect(parent).toBeTruthy()
|
||||
await rt.runPromise(
|
||||
SessionCompaction.Service.use((svc) =>
|
||||
svc.process({
|
||||
parentID: parent!,
|
||||
messages: msgs,
|
||||
sessionID: session.id,
|
||||
auto: false,
|
||||
}),
|
||||
),
|
||||
)
|
||||
|
||||
const filtered = MessageV2.filterCompacted(MessageV2.stream(session.id))
|
||||
const ids = filtered.map((msg) => msg.info.id)
|
||||
|
||||
expect(ids).not.toContain(u1.id)
|
||||
expect(ids).not.toContain(u2.id)
|
||||
expect(ids).toContain(u3.id)
|
||||
expect(ids).toContain(u4.id)
|
||||
expect(filtered.some((msg) => msg.info.role === "assistant" && msg.info.summary)).toBe(true)
|
||||
expect(
|
||||
filtered.some((msg) => msg.info.role === "user" && msg.parts.some((part) => part.type === "compaction")),
|
||||
).toBe(true)
|
||||
} finally {
|
||||
await rt.dispose()
|
||||
}
|
||||
},
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
describe("util.token.estimate", () => {
|
||||
|
||||
@@ -107,13 +107,14 @@ async function addAssistant(
|
||||
return id
|
||||
}
|
||||
|
||||
async function addCompactionPart(sessionID: SessionID, messageID: MessageID) {
|
||||
// Test helper: attaches an auto "compaction" part to message `messageID` in session
// `sessionID`. `tailStartID`, when given, is stored as the part's `tail_start_id`;
// when omitted the field is written as undefined.
async function addCompactionPart(sessionID: SessionID, messageID: MessageID, tailStartID?: MessageID) {
  await svc.updatePart({
    id: PartID.ascending(),
    sessionID,
    messageID,
    type: "compaction",
    auto: true,
    tail_start_id: tailStartID,
    // NOTE(review): the `as any` cast bypasses type checking of the part shape — confirm
    // whether it is still required now that the part declares `tail_start_id`.
  } as any)
}
|
||||
|
||||
@@ -780,6 +781,139 @@ describe("MessageV2.filterCompacted", () => {
|
||||
})
|
||||
})
|
||||
|
||||
test("retains original tail when compaction stores tail_start_id", async () => {
|
||||
await Instance.provide({
|
||||
directory: root,
|
||||
fn: async () => {
|
||||
const session = await svc.create({})
|
||||
|
||||
const u1 = await addUser(session.id, "first")
|
||||
const a1 = await addAssistant(session.id, u1, { finish: "end_turn" })
|
||||
await svc.updatePart({
|
||||
id: PartID.ascending(),
|
||||
sessionID: session.id,
|
||||
messageID: a1,
|
||||
type: "text",
|
||||
text: "first reply",
|
||||
})
|
||||
|
||||
const u2 = await addUser(session.id, "second")
|
||||
const a2 = await addAssistant(session.id, u2, { finish: "end_turn" })
|
||||
await svc.updatePart({
|
||||
id: PartID.ascending(),
|
||||
sessionID: session.id,
|
||||
messageID: a2,
|
||||
type: "text",
|
||||
text: "second reply",
|
||||
})
|
||||
|
||||
const c1 = await addUser(session.id)
|
||||
await addCompactionPart(session.id, c1, u2)
|
||||
const s1 = await addAssistant(session.id, c1, { summary: true, finish: "end_turn" })
|
||||
await svc.updatePart({
|
||||
id: PartID.ascending(),
|
||||
sessionID: session.id,
|
||||
messageID: s1,
|
||||
type: "text",
|
||||
text: "summary",
|
||||
})
|
||||
|
||||
const u3 = await addUser(session.id, "third")
|
||||
const a3 = await addAssistant(session.id, u3, { finish: "end_turn" })
|
||||
await svc.updatePart({
|
||||
id: PartID.ascending(),
|
||||
sessionID: session.id,
|
||||
messageID: a3,
|
||||
type: "text",
|
||||
text: "third reply",
|
||||
})
|
||||
|
||||
const result = MessageV2.filterCompacted(MessageV2.stream(session.id))
|
||||
|
||||
expect(result.map((item) => item.info.id)).toEqual([u2, a2, c1, s1, u3, a3])
|
||||
|
||||
await svc.remove(session.id)
|
||||
},
|
||||
})
|
||||
})
|
||||
|
||||
test("prefers latest compaction boundary when repeated compactions exist", async () => {
|
||||
await Instance.provide({
|
||||
directory: root,
|
||||
fn: async () => {
|
||||
const session = await svc.create({})
|
||||
|
||||
const u1 = await addUser(session.id, "first")
|
||||
const a1 = await addAssistant(session.id, u1, { finish: "end_turn" })
|
||||
await svc.updatePart({
|
||||
id: PartID.ascending(),
|
||||
sessionID: session.id,
|
||||
messageID: a1,
|
||||
type: "text",
|
||||
text: "first reply",
|
||||
})
|
||||
|
||||
const u2 = await addUser(session.id, "second")
|
||||
const a2 = await addAssistant(session.id, u2, { finish: "end_turn" })
|
||||
await svc.updatePart({
|
||||
id: PartID.ascending(),
|
||||
sessionID: session.id,
|
||||
messageID: a2,
|
||||
type: "text",
|
||||
text: "second reply",
|
||||
})
|
||||
|
||||
const c1 = await addUser(session.id)
|
||||
await addCompactionPart(session.id, c1, u2)
|
||||
const s1 = await addAssistant(session.id, c1, { summary: true, finish: "end_turn" })
|
||||
await svc.updatePart({
|
||||
id: PartID.ascending(),
|
||||
sessionID: session.id,
|
||||
messageID: s1,
|
||||
type: "text",
|
||||
text: "summary one",
|
||||
})
|
||||
|
||||
const u3 = await addUser(session.id, "third")
|
||||
const a3 = await addAssistant(session.id, u3, { finish: "end_turn" })
|
||||
await svc.updatePart({
|
||||
id: PartID.ascending(),
|
||||
sessionID: session.id,
|
||||
messageID: a3,
|
||||
type: "text",
|
||||
text: "third reply",
|
||||
})
|
||||
|
||||
const c2 = await addUser(session.id)
|
||||
await addCompactionPart(session.id, c2, u3)
|
||||
const s2 = await addAssistant(session.id, c2, { summary: true, finish: "end_turn" })
|
||||
await svc.updatePart({
|
||||
id: PartID.ascending(),
|
||||
sessionID: session.id,
|
||||
messageID: s2,
|
||||
type: "text",
|
||||
text: "summary two",
|
||||
})
|
||||
|
||||
const u4 = await addUser(session.id, "fourth")
|
||||
const a4 = await addAssistant(session.id, u4, { finish: "end_turn" })
|
||||
await svc.updatePart({
|
||||
id: PartID.ascending(),
|
||||
sessionID: session.id,
|
||||
messageID: a4,
|
||||
type: "text",
|
||||
text: "fourth reply",
|
||||
})
|
||||
|
||||
const result = MessageV2.filterCompacted(MessageV2.stream(session.id))
|
||||
|
||||
expect(result.map((item) => item.info.id)).toEqual([u3, a3, c2, s2, u4, a4])
|
||||
|
||||
await svc.remove(session.id)
|
||||
},
|
||||
})
|
||||
})
|
||||
|
||||
test("works with array input", () => {
|
||||
// filterCompacted accepts any Iterable, not just generators
|
||||
const id = MessageID.ascending()
|
||||
|
||||
Reference in New Issue
Block a user