refactor(session): remove compaction async facade exports (#22366)

This commit is contained in:
Kit Langton
2026-04-13 21:23:34 -04:00
committed by GitHub
parent 36745caa2a
commit 7a05ba47d1
2 changed files with 303 additions and 178 deletions

View File

@@ -9,14 +9,12 @@ import z from "zod"
import { Token } from "../util/token"
import { Log } from "../util/log"
import { SessionProcessor } from "./processor"
import { fn } from "@/util/fn"
import { Agent } from "@/agent/agent"
import { Plugin } from "@/plugin"
import { Config } from "@/config/config"
import { NotFoundError } from "@/storage/db"
import { ModelID, ProviderID } from "@/provider/schema"
import { Effect, Layer, Context } from "effect"
import { makeRuntime } from "@/effect/run-service"
import { InstanceState } from "@/effect/instance-state"
import { isOverflow as overflow } from "./overflow"
@@ -408,25 +406,4 @@ When constructing the summary, try to stick to this template:
Layer.provide(Config.defaultLayer),
),
)
const { runPromise } = makeRuntime(Service, defaultLayer)
export async function isOverflow(input: { tokens: MessageV2.Assistant["tokens"]; model: Provider.Model }) {
return runPromise((svc) => svc.isOverflow(input))
}
export async function prune(input: { sessionID: SessionID }) {
return runPromise((svc) => svc.prune(input))
}
export const create = fn(
z.object({
sessionID: SessionID.zod,
agent: z.string(),
model: z.object({ providerID: ProviderID.zod, modelID: ModelID.zod }),
auto: z.boolean(),
overflow: z.boolean().optional(),
}),
(input) => runPromise((svc) => svc.create(input)),
)
}

View File

@@ -13,7 +13,7 @@ import { Instance } from "../../src/project/instance"
import { Log } from "../../src/util/log"
import { Permission } from "../../src/permission"
import { Plugin } from "../../src/plugin"
import { tmpdir } from "../fixture/fixture"
import { provideTmpdirInstance, tmpdir } from "../fixture/fixture"
import { Session } from "../../src/session"
import { MessageV2 } from "../../src/session/message-v2"
import { MessageID, PartID, SessionID } from "../../src/session/schema"
@@ -24,6 +24,8 @@ import type { Provider } from "../../src/provider/provider"
import * as SessionProcessorModule from "../../src/session/processor"
import { Snapshot } from "../../src/snapshot"
import { ProviderTest } from "../fake/provider"
import { testEffect } from "../lib/effect"
import * as CrossSpawnSpawner from "../../src/effect/cross-spawn-spawner"
Log.init({ print: false })
@@ -179,6 +181,23 @@ function runtime(result: "continue" | "compact", plugin = Plugin.defaultLayer, p
)
}
const deps = Layer.mergeAll(
ProviderTest.fake().layer,
layer("continue"),
Agent.defaultLayer,
Plugin.defaultLayer,
Bus.layer,
Config.defaultLayer,
)
const env = Layer.mergeAll(
Session.defaultLayer,
CrossSpawnSpawner.defaultLayer,
SessionCompaction.layer.pipe(Layer.provide(Session.defaultLayer), Layer.provideMerge(deps)),
)
const it = testEffect(env)
function llm() {
const queue: Array<
Stream.Stream<LLM.Event, unknown> | ((input: LLM.StreamInput) => Stream.Stream<LLM.Event, unknown>)
@@ -259,77 +278,77 @@ function autocontinue(enabled: boolean) {
}
describe("session.compaction.isOverflow", () => {
test("returns true when token count exceeds usable context", async () => {
await using tmp = await tmpdir()
await Instance.provide({
directory: tmp.path,
fn: async () => {
it.live(
"returns true when token count exceeds usable context",
provideTmpdirInstance(() =>
Effect.gen(function* () {
const compact = yield* SessionCompaction.Service
const model = createModel({ context: 100_000, output: 32_000 })
const tokens = { input: 75_000, output: 5_000, reasoning: 0, cache: { read: 0, write: 0 } }
expect(await SessionCompaction.isOverflow({ tokens, model })).toBe(true)
},
})
})
expect(yield* compact.isOverflow({ tokens, model })).toBe(true)
}),
),
)
test("returns false when token count within usable context", async () => {
await using tmp = await tmpdir()
await Instance.provide({
directory: tmp.path,
fn: async () => {
it.live(
"returns false when token count within usable context",
provideTmpdirInstance(() =>
Effect.gen(function* () {
const compact = yield* SessionCompaction.Service
const model = createModel({ context: 200_000, output: 32_000 })
const tokens = { input: 100_000, output: 10_000, reasoning: 0, cache: { read: 0, write: 0 } }
expect(await SessionCompaction.isOverflow({ tokens, model })).toBe(false)
},
})
})
expect(yield* compact.isOverflow({ tokens, model })).toBe(false)
}),
),
)
test("includes cache.read in token count", async () => {
await using tmp = await tmpdir()
await Instance.provide({
directory: tmp.path,
fn: async () => {
it.live(
"includes cache.read in token count",
provideTmpdirInstance(() =>
Effect.gen(function* () {
const compact = yield* SessionCompaction.Service
const model = createModel({ context: 100_000, output: 32_000 })
const tokens = { input: 60_000, output: 10_000, reasoning: 0, cache: { read: 10_000, write: 0 } }
expect(await SessionCompaction.isOverflow({ tokens, model })).toBe(true)
},
})
})
expect(yield* compact.isOverflow({ tokens, model })).toBe(true)
}),
),
)
test("respects input limit for input caps", async () => {
await using tmp = await tmpdir()
await Instance.provide({
directory: tmp.path,
fn: async () => {
it.live(
"respects input limit for input caps",
provideTmpdirInstance(() =>
Effect.gen(function* () {
const compact = yield* SessionCompaction.Service
const model = createModel({ context: 400_000, input: 272_000, output: 128_000 })
const tokens = { input: 271_000, output: 1_000, reasoning: 0, cache: { read: 2_000, write: 0 } }
expect(await SessionCompaction.isOverflow({ tokens, model })).toBe(true)
},
})
})
expect(yield* compact.isOverflow({ tokens, model })).toBe(true)
}),
),
)
test("returns false when input/output are within input caps", async () => {
await using tmp = await tmpdir()
await Instance.provide({
directory: tmp.path,
fn: async () => {
it.live(
"returns false when input/output are within input caps",
provideTmpdirInstance(() =>
Effect.gen(function* () {
const compact = yield* SessionCompaction.Service
const model = createModel({ context: 400_000, input: 272_000, output: 128_000 })
const tokens = { input: 200_000, output: 20_000, reasoning: 0, cache: { read: 10_000, write: 0 } }
expect(await SessionCompaction.isOverflow({ tokens, model })).toBe(false)
},
})
})
expect(yield* compact.isOverflow({ tokens, model })).toBe(false)
}),
),
)
test("returns false when output within limit with input caps", async () => {
await using tmp = await tmpdir()
await Instance.provide({
directory: tmp.path,
fn: async () => {
it.live(
"returns false when output within limit with input caps",
provideTmpdirInstance(() =>
Effect.gen(function* () {
const compact = yield* SessionCompaction.Service
const model = createModel({ context: 200_000, input: 120_000, output: 10_000 })
const tokens = { input: 50_000, output: 9_999, reasoning: 0, cache: { read: 0, write: 0 } }
expect(await SessionCompaction.isOverflow({ tokens, model })).toBe(false)
},
})
})
expect(yield* compact.isOverflow({ tokens, model })).toBe(false)
}),
),
)
// ─── Bug reproduction tests ───────────────────────────────────────────
// These tests demonstrate that when limit.input is set, isOverflow()
@@ -343,11 +362,11 @@ describe("session.compaction.isOverflow", () => {
// Related issues: #10634, #8089, #11086, #12621
// Open PRs: #6875, #12924
test("BUG: no headroom when limit.input is set — compaction should trigger near boundary but does not", async () => {
await using tmp = await tmpdir()
await Instance.provide({
directory: tmp.path,
fn: async () => {
it.live(
"BUG: no headroom when limit.input is set — compaction should trigger near boundary but does not",
provideTmpdirInstance(() =>
Effect.gen(function* () {
const compact = yield* SessionCompaction.Service
// Simulate Claude with prompt caching: input limit = 200K, output limit = 32K
const model = createModel({ context: 200_000, input: 200_000, output: 32_000 })
@@ -364,16 +383,16 @@ describe("session.compaction.isOverflow", () => {
// With 198K used and only 2K headroom, the next turn will overflow.
// Compaction MUST trigger here.
expect(await SessionCompaction.isOverflow({ tokens, model })).toBe(true)
},
})
})
expect(yield* compact.isOverflow({ tokens, model })).toBe(true)
}),
),
)
test("BUG: without limit.input, same token count correctly triggers compaction", async () => {
await using tmp = await tmpdir()
await Instance.provide({
directory: tmp.path,
fn: async () => {
it.live(
"BUG: without limit.input, same token count correctly triggers compaction",
provideTmpdirInstance(() =>
Effect.gen(function* () {
const compact = yield* SessionCompaction.Service
// Same model but without limit.input — uses context - output instead
const model = createModel({ context: 200_000, output: 32_000 })
@@ -383,17 +402,17 @@ describe("session.compaction.isOverflow", () => {
// usable = context - output = 200K - 32K = 168K
// 198K > 168K = true → compaction correctly triggered
const result = await SessionCompaction.isOverflow({ tokens, model })
const result = yield* compact.isOverflow({ tokens, model })
expect(result).toBe(true) // ← Correct: headroom is reserved
},
})
})
}),
),
)
test("BUG: asymmetry — limit.input model allows 30K more usage before compaction than equivalent model without it", async () => {
await using tmp = await tmpdir()
await Instance.provide({
directory: tmp.path,
fn: async () => {
it.live(
"BUG: asymmetry — limit.input model allows 30K more usage before compaction than equivalent model without it",
provideTmpdirInstance(() =>
Effect.gen(function* () {
const compact = yield* SessionCompaction.Service
// Two models with identical context/output limits, differing only in limit.input
const withInputLimit = createModel({ context: 200_000, input: 200_000, output: 32_000 })
const withoutInputLimit = createModel({ context: 200_000, output: 32_000 })
@@ -401,67 +420,66 @@ describe("session.compaction.isOverflow", () => {
// 170K total tokens — well above context-output (168K) but below input limit (200K)
const tokens = { input: 166_000, output: 10_000, reasoning: 0, cache: { read: 5_000, write: 0 } }
const withLimit = await SessionCompaction.isOverflow({ tokens, model: withInputLimit })
const withoutLimit = await SessionCompaction.isOverflow({ tokens, model: withoutInputLimit })
const withLimit = yield* compact.isOverflow({ tokens, model: withInputLimit })
const withoutLimit = yield* compact.isOverflow({ tokens, model: withoutInputLimit })
// Both models have identical real capacity — they should agree:
expect(withLimit).toBe(true) // should compact (170K leaves no room for 32K output)
expect(withoutLimit).toBe(true) // correctly compacts (170K > 168K)
},
})
})
}),
),
)
test("returns false when model context limit is 0", async () => {
await using tmp = await tmpdir()
await Instance.provide({
directory: tmp.path,
fn: async () => {
it.live(
"returns false when model context limit is 0",
provideTmpdirInstance(() =>
Effect.gen(function* () {
const compact = yield* SessionCompaction.Service
const model = createModel({ context: 0, output: 32_000 })
const tokens = { input: 100_000, output: 10_000, reasoning: 0, cache: { read: 0, write: 0 } }
expect(await SessionCompaction.isOverflow({ tokens, model })).toBe(false)
},
})
})
expect(yield* compact.isOverflow({ tokens, model })).toBe(false)
}),
),
)
test("returns false when compaction.auto is disabled", async () => {
await using tmp = await tmpdir({
init: async (dir) => {
await Bun.write(
path.join(dir, "opencode.json"),
JSON.stringify({
compaction: { auto: false },
}),
)
it.live(
"returns false when compaction.auto is disabled",
provideTmpdirInstance(
() =>
Effect.gen(function* () {
const compact = yield* SessionCompaction.Service
const model = createModel({ context: 100_000, output: 32_000 })
const tokens = { input: 75_000, output: 5_000, reasoning: 0, cache: { read: 0, write: 0 } }
expect(yield* compact.isOverflow({ tokens, model })).toBe(false)
}),
{
config: {
compaction: { auto: false },
},
},
})
await Instance.provide({
directory: tmp.path,
fn: async () => {
const model = createModel({ context: 100_000, output: 32_000 })
const tokens = { input: 75_000, output: 5_000, reasoning: 0, cache: { read: 0, write: 0 } }
expect(await SessionCompaction.isOverflow({ tokens, model })).toBe(false)
},
})
})
),
)
})
describe("session.compaction.create", () => {
test("creates a compaction user message and part", async () => {
await using tmp = await tmpdir()
await Instance.provide({
directory: tmp.path,
fn: async () => {
const session = await Session.create({})
it.live(
"creates a compaction user message and part",
provideTmpdirInstance(() =>
Effect.gen(function* () {
const compact = yield* SessionCompaction.Service
const session = yield* Session.Service
await SessionCompaction.create({
sessionID: session.id,
const info = yield* session.create({})
yield* compact.create({
sessionID: info.id,
agent: "build",
model: ref,
auto: true,
overflow: true,
})
const msgs = await Session.messages({ sessionID: session.id })
const msgs = yield* session.messages({ sessionID: info.id })
expect(msgs).toHaveLength(1)
expect(msgs[0].info.role).toBe("user")
expect(msgs[0].parts).toHaveLength(1)
@@ -470,60 +488,190 @@ describe("session.compaction.create", () => {
auto: true,
overflow: true,
})
},
})
})
}),
),
)
})
describe("session.compaction.prune", () => {
test("compacts old completed tool output", async () => {
await using tmp = await tmpdir()
await Instance.provide({
directory: tmp.path,
fn: async () => {
const session = await Session.create({})
const a = await user(session.id, "first")
const b = await assistant(session.id, a.id, tmp.path)
await tool(session.id, b.id, "bash", "x".repeat(200_000))
await user(session.id, "second")
await user(session.id, "third")
it.live(
"compacts old completed tool output",
provideTmpdirInstance((dir) =>
Effect.gen(function* () {
const compact = yield* SessionCompaction.Service
const session = yield* Session.Service
const info = yield* session.create({})
const a = yield* session.updateMessage({
id: MessageID.ascending(),
role: "user",
sessionID: info.id,
agent: "build",
model: ref,
time: { created: Date.now() },
})
yield* session.updatePart({
id: PartID.ascending(),
messageID: a.id,
sessionID: info.id,
type: "text",
text: "first",
})
const b: MessageV2.Assistant = {
id: MessageID.ascending(),
role: "assistant",
sessionID: info.id,
mode: "build",
agent: "build",
path: { cwd: dir, root: dir },
cost: 0,
tokens: {
output: 0,
input: 0,
reasoning: 0,
cache: { read: 0, write: 0 },
},
modelID: ref.modelID,
providerID: ref.providerID,
parentID: a.id,
time: { created: Date.now() },
finish: "end_turn",
}
yield* session.updateMessage(b)
yield* session.updatePart({
id: PartID.ascending(),
messageID: b.id,
sessionID: info.id,
type: "tool",
callID: crypto.randomUUID(),
tool: "bash",
state: {
status: "completed",
input: {},
output: "x".repeat(200_000),
title: "done",
metadata: {},
time: { start: Date.now(), end: Date.now() },
},
})
for (const text of ["second", "third"]) {
const msg = yield* session.updateMessage({
id: MessageID.ascending(),
role: "user",
sessionID: info.id,
agent: "build",
model: ref,
time: { created: Date.now() },
})
yield* session.updatePart({
id: PartID.ascending(),
messageID: msg.id,
sessionID: info.id,
type: "text",
text,
})
}
await SessionCompaction.prune({ sessionID: session.id })
yield* compact.prune({ sessionID: info.id })
const msgs = await Session.messages({ sessionID: session.id })
const msgs = yield* session.messages({ sessionID: info.id })
const part = msgs.flatMap((msg) => msg.parts).find((part) => part.type === "tool")
expect(part?.type).toBe("tool")
expect(part?.state.status).toBe("completed")
if (part?.type === "tool" && part.state.status === "completed") {
expect(part.state.time.compacted).toBeNumber()
}
},
})
})
}),
),
)
test("skips protected skill tool output", async () => {
await using tmp = await tmpdir()
await Instance.provide({
directory: tmp.path,
fn: async () => {
const session = await Session.create({})
const a = await user(session.id, "first")
const b = await assistant(session.id, a.id, tmp.path)
await tool(session.id, b.id, "skill", "x".repeat(200_000))
await user(session.id, "second")
await user(session.id, "third")
it.live(
"skips protected skill tool output",
provideTmpdirInstance((dir) =>
Effect.gen(function* () {
const compact = yield* SessionCompaction.Service
const session = yield* Session.Service
const info = yield* session.create({})
const a = yield* session.updateMessage({
id: MessageID.ascending(),
role: "user",
sessionID: info.id,
agent: "build",
model: ref,
time: { created: Date.now() },
})
yield* session.updatePart({
id: PartID.ascending(),
messageID: a.id,
sessionID: info.id,
type: "text",
text: "first",
})
const b: MessageV2.Assistant = {
id: MessageID.ascending(),
role: "assistant",
sessionID: info.id,
mode: "build",
agent: "build",
path: { cwd: dir, root: dir },
cost: 0,
tokens: {
output: 0,
input: 0,
reasoning: 0,
cache: { read: 0, write: 0 },
},
modelID: ref.modelID,
providerID: ref.providerID,
parentID: a.id,
time: { created: Date.now() },
finish: "end_turn",
}
yield* session.updateMessage(b)
yield* session.updatePart({
id: PartID.ascending(),
messageID: b.id,
sessionID: info.id,
type: "tool",
callID: crypto.randomUUID(),
tool: "skill",
state: {
status: "completed",
input: {},
output: "x".repeat(200_000),
title: "done",
metadata: {},
time: { start: Date.now(), end: Date.now() },
},
})
for (const text of ["second", "third"]) {
const msg = yield* session.updateMessage({
id: MessageID.ascending(),
role: "user",
sessionID: info.id,
agent: "build",
model: ref,
time: { created: Date.now() },
})
yield* session.updatePart({
id: PartID.ascending(),
messageID: msg.id,
sessionID: info.id,
type: "text",
text,
})
}
await SessionCompaction.prune({ sessionID: session.id })
yield* compact.prune({ sessionID: info.id })
const msgs = await Session.messages({ sessionID: session.id })
const msgs = yield* session.messages({ sessionID: info.id })
const part = msgs.flatMap((msg) => msg.parts).find((part) => part.type === "tool")
expect(part?.type).toBe("tool")
if (part?.type === "tool" && part.state.status === "completed") {
expect(part.state.time.compacted).toBeUndefined()
}
},
})
})
}),
),
)
})
describe("session.compaction.process", () => {