From 11cd4fb63904768da73fad323dc82a83e052f87c Mon Sep 17 00:00:00 2001 From: Dax Raad Date: Sat, 18 Apr 2026 01:46:26 -0400 Subject: [PATCH] core: extract session entry stepping logic into dedicated module Move the step function from session-entry.ts to session-entry-stepper.ts and remove immer dependency. Add static fromEvent factory methods to Synthetic, Assistant, and Compaction classes for cleaner event-to-entry conversion. --- .../opencode/src/v2/session-entry-stepper.ts | 253 ++++++++++++++++++ packages/opencode/src/v2/session-entry.ts | 178 ++---------- ....test.ts => session-entry-stepper.test.ts} | 188 +++++++++++-- 3 files changed, 446 insertions(+), 173 deletions(-) create mode 100644 packages/opencode/src/v2/session-entry-stepper.ts rename packages/opencode/test/session/{session-entry.test.ts => session-entry-stepper.test.ts} (76%) diff --git a/packages/opencode/src/v2/session-entry-stepper.ts b/packages/opencode/src/v2/session-entry-stepper.ts new file mode 100644 index 0000000000..3d642579d0 --- /dev/null +++ b/packages/opencode/src/v2/session-entry-stepper.ts @@ -0,0 +1,253 @@ +import { castDraft, produce, type WritableDraft } from "immer" +import { SessionEvent } from "./session-event" +import { SessionEntry } from "./session-entry" + +export type MemoryState = { + entries: SessionEntry.Entry[] + pending: SessionEntry.Entry[] +} + +export interface Adapter { + readonly getCurrentAssistant: () => SessionEntry.Assistant | undefined + readonly updateAssistant: (assistant: SessionEntry.Assistant) => void + readonly appendEntry: (entry: SessionEntry.Entry) => void + readonly appendPending: (entry: SessionEntry.Entry) => void + readonly finish: () => Result +} + +export function memory(state: MemoryState): Adapter { + const activeAssistantIndex = () => + state.entries.findLastIndex((entry) => entry.type === "assistant" && !entry.time.completed) + + return { + getCurrentAssistant() { + const index = activeAssistantIndex() + if (index < 0) return + const assistant = state.entries[index] + return assistant?.type === "assistant" ? assistant : undefined + }, + updateAssistant(assistant) { + const index = activeAssistantIndex() + if (index < 0) return + const current = state.entries[index] + if (current?.type !== "assistant") return + state.entries[index] = assistant + }, + appendEntry(entry) { + state.entries.push(entry) + }, + appendPending(entry) { + state.pending.push(entry) + }, + finish() { + return state + }, + } +} + +export function stepWith(adapter: Adapter, event: SessionEvent.Event): Result { + const currentAssistant = adapter.getCurrentAssistant() + type DraftAssistant = WritableDraft + type DraftTool = WritableDraft + type DraftText = WritableDraft + type DraftReasoning = WritableDraft + + const latestTool = (assistant: DraftAssistant | undefined, callID?: string) => + assistant?.content.findLast( + (item): item is DraftTool => item.type === "tool" && (callID === undefined || item.callID === callID), + ) + + const latestText = (assistant: DraftAssistant | undefined) => + assistant?.content.findLast((item): item is DraftText => item.type === "text") + + const latestReasoning = (assistant: DraftAssistant | undefined) => + assistant?.content.findLast((item): item is DraftReasoning => item.type === "reasoning") + + SessionEvent.Event.match(event, { + prompt: (event) => { + const entry = SessionEntry.User.fromEvent(event) + if (currentAssistant) { + adapter.appendPending(entry) + return + } + adapter.appendEntry(entry) + }, + synthetic: (event) => { + adapter.appendEntry(SessionEntry.Synthetic.fromEvent(event)) + }, + "step.started": (event) => { + if (currentAssistant) { + adapter.updateAssistant( + produce(currentAssistant, (draft) => { + draft.time.completed = event.timestamp + }), + ) + } + adapter.appendEntry(SessionEntry.Assistant.fromEvent(event)) + }, + "step.ended": (event) => { + if (currentAssistant) { + adapter.updateAssistant( + produce(currentAssistant, (draft) => { + draft.time.completed = event.timestamp + draft.cost = event.cost + draft.tokens = event.tokens + }), + ) + } + }, + "text.started": () => { + if (currentAssistant) { + adapter.updateAssistant( + produce(currentAssistant, (draft) => { + draft.content.push({ + type: "text", + text: "", + }) + }), + ) + } + }, + "text.delta": (event) => { + if (currentAssistant) { + adapter.updateAssistant( + produce(currentAssistant, (draft) => { + const match = latestText(draft) + if (match) match.text += event.delta + }), + ) + } + }, + "text.ended": () => {}, + "tool.input.started": (event) => { + if (currentAssistant) { + adapter.updateAssistant( + produce(currentAssistant, (draft) => { + draft.content.push({ + type: "tool", + callID: event.callID, + name: event.name, + time: { + created: event.timestamp, + }, + state: { + status: "pending", + input: "", + }, + }) + }), + ) + } + }, + "tool.input.delta": (event) => { + if (currentAssistant) { + adapter.updateAssistant( + produce(currentAssistant, (draft) => { + const match = latestTool(draft, event.callID) + // oxlint-disable-next-line no-base-to-string -- event.delta is a Schema.String (runtime string) + if (match && match.state.status === "pending") match.state.input += event.delta + }), + ) + } + }, + "tool.input.ended": () => {}, + "tool.called": (event) => { + if (currentAssistant) { + adapter.updateAssistant( + produce(currentAssistant, (draft) => { + const match = latestTool(draft, event.callID) + if (match) { + match.time.ran = event.timestamp + match.state = { + status: "running", + input: event.input, + } + } + }), + ) + } + }, + "tool.success": (event) => { + if (currentAssistant) { + adapter.updateAssistant( + produce(currentAssistant, (draft) => { + const match = latestTool(draft, event.callID) + if (match && match.state.status === "running") { + match.state = { + status: "completed", + input: match.state.input, + output: event.output ?? "", + title: event.title, + metadata: event.metadata ?? {}, + attachments: [...(event.attachments ?? [])], + } + } + }), + ) + } + }, + "tool.error": (event) => { + if (currentAssistant) { + adapter.updateAssistant( + produce(currentAssistant, (draft) => { + const match = latestTool(draft, event.callID) + if (match && match.state.status === "running") { + match.state = { + status: "error", + error: event.error, + input: match.state.input, + metadata: event.metadata ?? {}, + } + } + }), + ) + } + }, + "reasoning.started": () => { + if (currentAssistant) { + adapter.updateAssistant( + produce(currentAssistant, (draft) => { + draft.content.push({ + type: "reasoning", + text: "", + }) + }), + ) + } + }, + "reasoning.delta": (event) => { + if (currentAssistant) { + adapter.updateAssistant( + produce(currentAssistant, (draft) => { + const match = latestReasoning(draft) + if (match) match.text += event.delta + }), + ) + } + }, + "reasoning.ended": (event) => { + if (currentAssistant) { + adapter.updateAssistant( + produce(currentAssistant, (draft) => { + const match = latestReasoning(draft) + if (match) match.text = event.text + }), + ) + } + }, + retried: () => {}, + compacted: (event) => { + adapter.appendEntry(SessionEntry.Compaction.fromEvent(event)) + }, + }) + + return adapter.finish() +} + +export function step(old: MemoryState, event: SessionEvent.Event): MemoryState { + return produce(old, (draft) => { + stepWith(memory(draft as MemoryState), event) + }) +} + +export * as SessionEntryStepper from "./session-entry-stepper" diff --git a/packages/opencode/src/v2/session-entry.ts b/packages/opencode/src/v2/session-entry.ts index 08122428ae..97c5fc7ce9 100644 --- a/packages/opencode/src/v2/session-entry.ts +++ b/packages/opencode/src/v2/session-entry.ts @@ -1,6 +1,5 @@ import { Schema } from "effect" import { SessionEvent } from "./session-event" -import { castDraft, produce } from "immer" export const ID = SessionEvent.ID export type ID = Schema.Schema.Type @@ -40,7 +39,14 @@ export class Synthetic extends Schema.Class("Session.Entry.Synthetic" ...SessionEvent.Synthetic.fields, ...Base, type: Schema.Literal("synthetic"), -}) {} +}) { + static fromEvent(event: SessionEvent.Synthetic) { + return new Synthetic({ + ...event, + time: { created: event.timestamp }, + }) + } +} export class ToolStatePending extends Schema.Class("Session.Entry.ToolState.Pending")({ status: Schema.Literal("pending"), @@ -122,13 +128,32 @@ export class Assistant extends Schema.Class("Session.Entry.Assistant" created: Schema.DateTimeUtc, completed: Schema.DateTimeUtc.pipe(Schema.optional), }), -}) {} +}) { + static fromEvent(event: SessionEvent.Step.Started) { + return new Assistant({ + id: event.id, + type: "assistant", + time: { + created: event.timestamp, + }, + content: [], + }) + } +} export class Compaction extends Schema.Class("Session.Entry.Compaction")({ ...SessionEvent.Compacted.fields, type: Schema.Literal("compaction"), ...Base, -}) {} +}) { + static fromEvent(event: SessionEvent.Compacted) { + return new Compaction({ + ...event, + type: "compaction", + time: { created: event.timestamp }, + }) + } +} export const Entry = Schema.Union([User, Synthetic, Assistant, Compaction]).pipe(Schema.toTaggedUnion("type")) @@ -136,151 +161,6 @@ export type Entry = Schema.Schema.Type export type Type = Entry["type"] -export type History = { - entries: Entry[] - pending: Entry[] -} - -export function step(old: History, event: SessionEvent.Event): History { - return produce(old, (draft) => { - const lastAssistant = draft.entries.findLast((x) => x.type === "assistant") - const pendingAssistant = lastAssistant && !lastAssistant.time.completed ? lastAssistant : undefined - type DraftContent = NonNullable["content"][number] - type DraftTool = Extract - - const latestTool = (callID?: string) => - pendingAssistant?.content.findLast( - (item): item is DraftTool => item.type === "tool" && (callID === undefined || item.callID === callID), - ) - const latestText = () => pendingAssistant?.content.findLast((item) => item.type === "text") - const latestReasoning = () => pendingAssistant?.content.findLast((item) => item.type === "reasoning") - - SessionEvent.Event.match(event, { - prompt: (event) => { - const entry = User.fromEvent(event) - if (pendingAssistant) { - draft.pending.push(castDraft(entry)) - return - } - draft.entries.push(castDraft(entry)) - }, - synthetic: (event) => { - draft.entries.push(new Synthetic({ ...event, time: { created: event.timestamp } })) - }, - "step.started": (event) => { - if (pendingAssistant) pendingAssistant.time.completed = event.timestamp - draft.entries.push({ - id: event.id, - type: "assistant", - time: { - created: event.timestamp, - }, - content: [], - }) - }, - "step.ended": (event) => { - if (!pendingAssistant) return - pendingAssistant.time.completed = event.timestamp - pendingAssistant.cost = event.cost - pendingAssistant.tokens = event.tokens - }, - "text.started": () => { - if (!pendingAssistant) return - pendingAssistant.content.push({ - type: "text", - text: "", - }) - }, - "text.delta": (event) => { - if (!pendingAssistant) return - const match = latestText() - if (match) match.text += event.delta - }, - "text.ended": () => {}, - "tool.input.started": (event) => { - if (!pendingAssistant) return - pendingAssistant.content.push({ - type: "tool", - callID: event.callID, - name: event.name, - time: { - created: event.timestamp, - }, - state: { - status: "pending", - input: "", - }, - }) - }, - "tool.input.delta": (event) => { - if (!pendingAssistant) return - const match = latestTool(event.callID) - // oxlint-disable-next-line no-base-to-string -- event.delta is a Schema.String (runtime string) - if (match) match.state.input += event.delta - }, - "tool.input.ended": () => {}, - "tool.called": (event) => { - if (!pendingAssistant) return - const match = latestTool(event.callID) - if (match) { - match.time.ran = event.timestamp - match.state = { - status: "running", - input: event.input, - } - } - }, - "tool.success": (event) => { - if (!pendingAssistant) return - const match = latestTool(event.callID) - if (match && match.state.status === "running") { - match.state = { - status: "completed", - input: match.state.input, - output: event.output ?? "", - title: event.title, - metadata: event.metadata ?? {}, - attachments: [...(event.attachments ?? [])], - } - } - }, - "tool.error": (event) => { - if (!pendingAssistant) return - const match = latestTool(event.callID) - if (match && match.state.status === "running") { - match.state = { - status: "error", - error: event.error, - input: match.state.input, - metadata: event.metadata ?? {}, - } - } - }, - "reasoning.started": () => { - if (!pendingAssistant) return - pendingAssistant.content.push({ - type: "reasoning", - text: "", - }) - }, - "reasoning.delta": (event) => { - if (!pendingAssistant) return - const match = latestReasoning() - if (match) match.text += event.delta - }, - "reasoning.ended": (event) => { - if (!pendingAssistant) return - const match = latestReasoning() - if (match) match.text = event.text - }, - retried: () => {}, - compacted: (event) => { - draft.entries.push(new Compaction({ ...event, type: "compaction", time: { created: event.timestamp } })) - }, - }) - }) -} - /* export interface Interface { readonly decode: (row: typeof SessionEntryTable.$inferSelect) => Entry diff --git a/packages/opencode/test/session/session-entry.test.ts b/packages/opencode/test/session/session-entry-stepper.test.ts similarity index 76% rename from packages/opencode/test/session/session-entry.test.ts rename to packages/opencode/test/session/session-entry-stepper.test.ts index dea8da20a0..a81b4c2be4 100644 --- a/packages/opencode/test/session/session-entry.test.ts +++ b/packages/opencode/test/session/session-entry-stepper.test.ts @@ -2,6 +2,7 @@ import { describe, expect, test } from "bun:test" import * as DateTime from "effect/DateTime" import * as FastCheck from "effect/testing/FastCheck" import { SessionEntry } from "../../src/v2/session-entry" +import { SessionEntryStepper } from "../../src/v2/session-entry-stepper" import { SessionEvent } from "../../src/v2/session-event" const time = (n: number) => DateTime.makeUnsafe(n) @@ -29,8 +30,8 @@ function assistant() { }) } -function history() { - const state: SessionEntry.History = { +function memoryState() { + const state: SessionEntryStepper.MemoryState = { entries: [], pending: [], } @@ -38,51 +39,189 @@ function history() { } function active() { - const state: SessionEntry.History = { + const state: SessionEntryStepper.MemoryState = { entries: [assistant()], pending: [], } return state } -function run(events: SessionEvent.Event[], state = history()) { - return events.reduce((state, event) => SessionEntry.step(state, event), state) +function run(events: SessionEvent.Event[], state = memoryState()) { + return events.reduce((state, event) => SessionEntryStepper.step(state, event), state) } -function last(state: SessionEntry.History) { +function last(state: SessionEntryStepper.MemoryState) { const entry = [...state.pending, ...state.entries].reverse().find((x) => x.type === "assistant") expect(entry?.type).toBe("assistant") return entry?.type === "assistant" ? entry : undefined } -function texts_of(state: SessionEntry.History) { +function texts_of(state: SessionEntryStepper.MemoryState) { const entry = last(state) if (!entry) return [] return entry.content.filter((x): x is SessionEntry.AssistantText => x.type === "text") } -function reasons(state: SessionEntry.History) { +function reasons(state: SessionEntryStepper.MemoryState) { const entry = last(state) if (!entry) return [] return entry.content.filter((x): x is SessionEntry.AssistantReasoning => x.type === "reasoning") } -function tools(state: SessionEntry.History) { +function tools(state: SessionEntryStepper.MemoryState) { const entry = last(state) if (!entry) return [] return entry.content.filter((x): x is SessionEntry.AssistantTool => x.type === "tool") } -function tool(state: SessionEntry.History, callID: string) { +function tool(state: SessionEntryStepper.MemoryState, callID: string) { return tools(state).find((x) => x.callID === callID) } -describe("session-entry step", () => { - describe("seeded pending assistant", () => { +function adapterStore() { + return { + committed: [] as SessionEntry.Entry[], + deferred: [] as SessionEntry.Entry[], + } +} + +function adapterFor(store: ReturnType): SessionEntryStepper.Adapter { + const activeAssistantIndex = () => + store.committed.findLastIndex((entry) => entry.type === "assistant" && !entry.time.completed) + + const getCurrentAssistant = () => { + const index = activeAssistantIndex() + if (index < 0) return + const assistant = store.committed[index] + return assistant?.type === "assistant" ? assistant : undefined + } + + return { + getCurrentAssistant, + updateAssistant(assistant) { + const index = activeAssistantIndex() + if (index < 0) return + const current = store.committed[index] + if (current?.type !== "assistant") return + store.committed[index] = assistant + }, + appendEntry(entry) { + store.committed.push(entry) + }, + appendPending(entry) { + store.deferred.push(entry) + }, + finish() { + return store + }, + } +} + +describe("session-entry-stepper", () => { + describe("stepWith", () => { + test("reduces through a custom adapter", () => { + const store = adapterStore() + store.committed.push(assistant()) + + SessionEntryStepper.stepWith(adapterFor(store), SessionEvent.Prompt.create({ text: "hello", timestamp: time(1) })) + SessionEntryStepper.stepWith(adapterFor(store), SessionEvent.Reasoning.Started.create({ timestamp: time(2) })) + SessionEntryStepper.stepWith(adapterFor(store), SessionEvent.Reasoning.Delta.create({ delta: "thinking", timestamp: time(3) })) + SessionEntryStepper.stepWith(adapterFor(store), SessionEvent.Reasoning.Ended.create({ text: "thought", timestamp: time(4) })) + SessionEntryStepper.stepWith(adapterFor(store), SessionEvent.Text.Started.create({ timestamp: time(5) })) + SessionEntryStepper.stepWith(adapterFor(store), SessionEvent.Text.Delta.create({ delta: "world", timestamp: time(6) })) + SessionEntryStepper.stepWith( + adapterFor(store), + SessionEvent.Step.Ended.create({ + reason: "stop", + cost: 1, + tokens: { + input: 1, + output: 2, + reasoning: 3, + cache: { + read: 4, + write: 5, + }, + }, + timestamp: time(7), + }), + ) + + expect(store.deferred).toHaveLength(1) + expect(store.deferred[0]?.type).toBe("user") + expect(store.committed).toHaveLength(1) + expect(store.committed[0]?.type).toBe("assistant") + if (store.committed[0]?.type !== "assistant") return + + expect(store.committed[0].content).toEqual([ + { type: "reasoning", text: "thought" }, + { type: "text", text: "world" }, + ]) + expect(store.committed[0].time.completed).toEqual(time(7)) + }) + }) + + describe("memory", () => { + test("tracks and replaces the current assistant", () => { + const state = active() + const adapter = SessionEntryStepper.memory(state) + const current = adapter.getCurrentAssistant() + + expect(current?.type).toBe("assistant") + if (!current) return + + adapter.updateAssistant( + new SessionEntry.Assistant({ + ...current, + content: [new SessionEntry.AssistantText({ type: "text", text: "done" })], + time: { + ...current.time, + completed: time(1), + }, + }), + ) + + expect(adapter.getCurrentAssistant()).toBeUndefined() + expect(state.entries[0]?.type).toBe("assistant") + if (state.entries[0]?.type !== "assistant") return + + expect(state.entries[0].content).toEqual([{ type: "text", text: "done" }]) + expect(state.entries[0].time.completed).toEqual(time(1)) + }) + + test("appends committed and pending entries", () => { + const state = memoryState() + const adapter = SessionEntryStepper.memory(state) + const committed = SessionEntry.User.fromEvent(SessionEvent.Prompt.create({ text: "committed", timestamp: time(1) })) + const pending = SessionEntry.User.fromEvent(SessionEvent.Prompt.create({ text: "pending", timestamp: time(2) })) + + adapter.appendEntry(committed) + adapter.appendPending(pending) + + expect(state.entries).toEqual([committed]) + expect(state.pending).toEqual([pending]) + }) + + test("stepWith through memory records reasoning", () => { + const state = active() + + SessionEntryStepper.stepWith(SessionEntryStepper.memory(state), SessionEvent.Reasoning.Started.create({ timestamp: time(1) })) + SessionEntryStepper.stepWith(SessionEntryStepper.memory(state), SessionEvent.Reasoning.Delta.create({ delta: "draft", timestamp: time(2) })) + SessionEntryStepper.stepWith( + SessionEntryStepper.memory(state), + SessionEvent.Reasoning.Ended.create({ text: "final", timestamp: time(3) }), + ) + + expect(reasons(state)).toEqual([{ type: "reasoning", text: "final" }]) + }) + }) + + describe("step", () => { + describe("seeded pending assistant", () => { test("stores prompts in entries when no assistant is pending", () => { FastCheck.assert( FastCheck.property(word, (body) => { - const next = SessionEntry.step(history(), SessionEvent.Prompt.create({ text: body, timestamp: time(1) })) + const next = SessionEntryStepper.step(memoryState(), SessionEvent.Prompt.create({ text: body, timestamp: time(1) })) expect(next.entries).toHaveLength(1) expect(next.entries[0]?.type).toBe("user") if (next.entries[0]?.type !== "user") return @@ -95,7 +234,7 @@ describe("session-entry step", () => { test("stores prompts in pending when an assistant is pending", () => { FastCheck.assert( FastCheck.property(word, (body) => { - const next = SessionEntry.step(active(), SessionEvent.Prompt.create({ text: body, timestamp: time(1) })) + const next = SessionEntryStepper.step(active(), SessionEvent.Prompt.create({ text: body, timestamp: time(1) })) expect(next.pending).toHaveLength(1) expect(next.pending[0]?.type).toBe("user") if (next.pending[0]?.type !== "user") return @@ -110,8 +249,8 @@ describe("session-entry step", () => { FastCheck.property(texts, (parts) => { const next = parts.reduce( (state, part, i) => - SessionEntry.step(state, SessionEvent.Text.Delta.create({ delta: part, timestamp: time(i + 2) })), - SessionEntry.step(active(), SessionEvent.Text.Started.create({ timestamp: time(1) })), + SessionEntryStepper.step(state, SessionEvent.Text.Delta.create({ delta: part, timestamp: time(i + 2) })), + SessionEntryStepper.step(active(), SessionEvent.Text.Started.create({ timestamp: time(1) })), ) expect(texts_of(next)).toEqual([ @@ -302,7 +441,7 @@ describe("session-entry step", () => { }, timestamp: time(n), }) - const next = SessionEntry.step(active(), event) + const next = SessionEntryStepper.step(active(), event) const entry = last(next) expect(entry).toBeDefined() if (!entry) return @@ -316,12 +455,12 @@ describe("session-entry step", () => { }) }) - describe("known reducer gaps", () => { + describe("known reducer gaps", () => { test("prompt appends immutably when no assistant is pending", () => { FastCheck.assert( FastCheck.property(word, (body) => { - const old = history() - const next = SessionEntry.step(old, SessionEvent.Prompt.create({ text: body, timestamp: time(1) })) + const old = memoryState() + const next = SessionEntryStepper.step(old, SessionEvent.Prompt.create({ text: body, timestamp: time(1) })) expect(old).not.toBe(next) expect(old.entries).toHaveLength(0) expect(next.entries).toHaveLength(1) @@ -334,7 +473,7 @@ describe("session-entry step", () => { FastCheck.assert( FastCheck.property(word, (body) => { const old = active() - const next = SessionEntry.step(old, SessionEvent.Prompt.create({ text: body, timestamp: time(1) })) + const next = SessionEntryStepper.step(old, SessionEvent.Prompt.create({ text: body, timestamp: time(1) })) expect(old).not.toBe(next) expect(old.pending).toHaveLength(0) expect(next.pending).toHaveLength(1) @@ -651,7 +790,7 @@ describe("session-entry step", () => { test("records synthetic events", () => { FastCheck.assert( FastCheck.property(word, (body) => { - const next = SessionEntry.step(history(), SessionEvent.Synthetic.create({ text: body, timestamp: time(1) })) + const next = SessionEntryStepper.step(memoryState(), SessionEvent.Synthetic.create({ text: body, timestamp: time(1) })) expect(next.entries).toHaveLength(1) expect(next.entries[0]?.type).toBe("synthetic") if (next.entries[0]?.type !== "synthetic") return @@ -664,8 +803,8 @@ describe("session-entry step", () => { test("records compaction events", () => { FastCheck.assert( FastCheck.property(FastCheck.boolean(), maybe(FastCheck.boolean()), (auto, overflow) => { - const next = SessionEntry.step( - history(), + const next = SessionEntryStepper.step( + memoryState(), SessionEvent.Compacted.create({ auto, overflow, timestamp: time(1) }), ) expect(next.entries).toHaveLength(1) @@ -677,5 +816,6 @@ describe("session-entry step", () => { { numRuns: 50 }, ) }) + }) }) })