Compare commits

..

1 Commits

Author SHA1 Message Date
Kit Langton
bbbe611d5b refactor(cli): convert debug agent command to effectCmd
Drops bootstrap() + 3 AppRuntime.runPromise wrappers. Helpers
(getAvailableTools, createToolContext) now return Effects yielded
directly. Instance.directory/.worktree → ctx.directory/.worktree from
InstanceRef. process.exit(1) → fail("", 1) for the three error paths
(stderr message printed inline, exit code 1).

Bonus fix: --tool execution path was awaiting tool.execute() which
returns an Effect (not a Promise), so result was the Effect object
itself — JSON.stringify produced garbage. Now properly yields the Effect
to get the ExecuteResult.
2026-05-02 17:19:02 -04:00
66 changed files with 2431 additions and 9361 deletions

View File

@@ -83,7 +83,6 @@ export const Flag = {
OPENCODE_WORKSPACE_ID: process.env["OPENCODE_WORKSPACE_ID"],
OPENCODE_EXPERIMENTAL_HTTPAPI: truthy("OPENCODE_EXPERIMENTAL_HTTPAPI"),
OPENCODE_EXPERIMENTAL_WORKSPACES: OPENCODE_EXPERIMENTAL || truthy("OPENCODE_EXPERIMENTAL_WORKSPACES"),
OPENCODE_EXPERIMENTAL_EVENT_SYSTEM: OPENCODE_EXPERIMENTAL || truthy("OPENCODE_EXPERIMENTAL_EVENT_SYSTEM"),
// Evaluated at access time (not module load) because tests, the CLI, and
// external tooling set these env vars at runtime.

View File

@@ -1,5 +1,3 @@
export * as Log from "./log"
import path from "path"
import fs from "fs/promises"
import { createWriteStream } from "fs"

View File

@@ -1,17 +0,0 @@
CREATE TABLE `session_message` (
`id` text PRIMARY KEY,
`session_id` text NOT NULL,
`type` text NOT NULL,
`time_created` integer NOT NULL,
`time_updated` integer NOT NULL,
`data` text NOT NULL,
CONSTRAINT `fk_session_message_session_id_session_id_fk` FOREIGN KEY (`session_id`) REFERENCES `session`(`id`) ON DELETE CASCADE
);
--> statement-breakpoint
DROP INDEX IF EXISTS `session_entry_session_idx`;--> statement-breakpoint
DROP INDEX IF EXISTS `session_entry_session_type_idx`;--> statement-breakpoint
DROP INDEX IF EXISTS `session_entry_time_created_idx`;--> statement-breakpoint
CREATE INDEX `session_message_session_idx` ON `session_message` (`session_id`);--> statement-breakpoint
CREATE INDEX `session_message_session_type_idx` ON `session_message` (`session_id`,`type`);--> statement-breakpoint
CREATE INDEX `session_message_time_created_idx` ON `session_message` (`time_created`);--> statement-breakpoint
DROP TABLE `session_entry`;

View File

@@ -2,9 +2,7 @@
"version": "7",
"dialect": "sqlite",
"id": "aaa2ebeb-caa4-478d-8365-4fc595d16856",
"prevIds": [
"61f807f9-6398-4067-be05-804acc2561bc"
],
"prevIds": ["66cbe0d7-def0-451b-b88a-7608513a9b44"],
"ddl": [
{
"name": "account_state",
@@ -39,7 +37,7 @@
"entityType": "tables"
},
{
"name": "session_message",
"name": "session_entry",
"entityType": "tables"
},
{
@@ -600,7 +598,7 @@
"generated": null,
"name": "id",
"entityType": "columns",
"table": "session_message"
"table": "session_entry"
},
{
"type": "text",
@@ -610,7 +608,7 @@
"generated": null,
"name": "session_id",
"entityType": "columns",
"table": "session_message"
"table": "session_entry"
},
{
"type": "text",
@@ -620,7 +618,7 @@
"generated": null,
"name": "type",
"entityType": "columns",
"table": "session_message"
"table": "session_entry"
},
{
"type": "integer",
@@ -630,7 +628,7 @@
"generated": null,
"name": "time_created",
"entityType": "columns",
"table": "session_message"
"table": "session_entry"
},
{
"type": "integer",
@@ -640,7 +638,7 @@
"generated": null,
"name": "time_updated",
"entityType": "columns",
"table": "session_message"
"table": "session_entry"
},
{
"type": "text",
@@ -650,7 +648,7 @@
"generated": null,
"name": "data",
"entityType": "columns",
"table": "session_message"
"table": "session_entry"
},
{
"type": "text",
@@ -1053,13 +1051,9 @@
"table": "event"
},
{
"columns": [
"active_account_id"
],
"columns": ["active_account_id"],
"tableTo": "account",
"columnsTo": [
"id"
],
"columnsTo": ["id"],
"onUpdate": "NO ACTION",
"onDelete": "SET NULL",
"nameExplicit": false,
@@ -1068,13 +1062,9 @@
"table": "account_state"
},
{
"columns": [
"project_id"
],
"columns": ["project_id"],
"tableTo": "project",
"columnsTo": [
"id"
],
"columnsTo": ["id"],
"onUpdate": "NO ACTION",
"onDelete": "CASCADE",
"nameExplicit": false,
@@ -1083,13 +1073,9 @@
"table": "workspace"
},
{
"columns": [
"session_id"
],
"columns": ["session_id"],
"tableTo": "session",
"columnsTo": [
"id"
],
"columnsTo": ["id"],
"onUpdate": "NO ACTION",
"onDelete": "CASCADE",
"nameExplicit": false,
@@ -1098,13 +1084,9 @@
"table": "message"
},
{
"columns": [
"message_id"
],
"columns": ["message_id"],
"tableTo": "message",
"columnsTo": [
"id"
],
"columnsTo": ["id"],
"onUpdate": "NO ACTION",
"onDelete": "CASCADE",
"nameExplicit": false,
@@ -1113,13 +1095,9 @@
"table": "part"
},
{
"columns": [
"project_id"
],
"columns": ["project_id"],
"tableTo": "project",
"columnsTo": [
"id"
],
"columnsTo": ["id"],
"onUpdate": "NO ACTION",
"onDelete": "CASCADE",
"nameExplicit": false,
@@ -1128,28 +1106,20 @@
"table": "permission"
},
{
"columns": [
"session_id"
],
"columns": ["session_id"],
"tableTo": "session",
"columnsTo": [
"id"
],
"columnsTo": ["id"],
"onUpdate": "NO ACTION",
"onDelete": "CASCADE",
"nameExplicit": false,
"name": "fk_session_message_session_id_session_id_fk",
"name": "fk_session_entry_session_id_session_id_fk",
"entityType": "fks",
"table": "session_message"
"table": "session_entry"
},
{
"columns": [
"project_id"
],
"columns": ["project_id"],
"tableTo": "project",
"columnsTo": [
"id"
],
"columnsTo": ["id"],
"onUpdate": "NO ACTION",
"onDelete": "CASCADE",
"nameExplicit": false,
@@ -1158,13 +1128,9 @@
"table": "session"
},
{
"columns": [
"session_id"
],
"columns": ["session_id"],
"tableTo": "session",
"columnsTo": [
"id"
],
"columnsTo": ["id"],
"onUpdate": "NO ACTION",
"onDelete": "CASCADE",
"nameExplicit": false,
@@ -1173,13 +1139,9 @@
"table": "todo"
},
{
"columns": [
"session_id"
],
"columns": ["session_id"],
"tableTo": "session",
"columnsTo": [
"id"
],
"columnsTo": ["id"],
"onUpdate": "NO ACTION",
"onDelete": "CASCADE",
"nameExplicit": false,
@@ -1188,13 +1150,9 @@
"table": "session_share"
},
{
"columns": [
"aggregate_id"
],
"columns": ["aggregate_id"],
"tableTo": "event_sequence",
"columnsTo": [
"aggregate_id"
],
"columnsTo": ["aggregate_id"],
"onUpdate": "NO ACTION",
"onDelete": "CASCADE",
"nameExplicit": false,
@@ -1203,128 +1161,98 @@
"table": "event"
},
{
"columns": [
"email",
"url"
],
"columns": ["email", "url"],
"nameExplicit": false,
"name": "control_account_pk",
"entityType": "pks",
"table": "control_account"
},
{
"columns": [
"session_id",
"position"
],
"columns": ["session_id", "position"],
"nameExplicit": false,
"name": "todo_pk",
"entityType": "pks",
"table": "todo"
},
{
"columns": [
"id"
],
"columns": ["id"],
"nameExplicit": false,
"name": "account_state_pk",
"table": "account_state",
"entityType": "pks"
},
{
"columns": [
"id"
],
"columns": ["id"],
"nameExplicit": false,
"name": "account_pk",
"table": "account",
"entityType": "pks"
},
{
"columns": [
"id"
],
"columns": ["id"],
"nameExplicit": false,
"name": "workspace_pk",
"table": "workspace",
"entityType": "pks"
},
{
"columns": [
"id"
],
"columns": ["id"],
"nameExplicit": false,
"name": "project_pk",
"table": "project",
"entityType": "pks"
},
{
"columns": [
"id"
],
"columns": ["id"],
"nameExplicit": false,
"name": "message_pk",
"table": "message",
"entityType": "pks"
},
{
"columns": [
"id"
],
"columns": ["id"],
"nameExplicit": false,
"name": "part_pk",
"table": "part",
"entityType": "pks"
},
{
"columns": [
"project_id"
],
"columns": ["project_id"],
"nameExplicit": false,
"name": "permission_pk",
"table": "permission",
"entityType": "pks"
},
{
"columns": [
"id"
],
"columns": ["id"],
"nameExplicit": false,
"name": "session_message_pk",
"table": "session_message",
"name": "session_entry_pk",
"table": "session_entry",
"entityType": "pks"
},
{
"columns": [
"id"
],
"columns": ["id"],
"nameExplicit": false,
"name": "session_pk",
"table": "session",
"entityType": "pks"
},
{
"columns": [
"session_id"
],
"columns": ["session_id"],
"nameExplicit": false,
"name": "session_share_pk",
"table": "session_share",
"entityType": "pks"
},
{
"columns": [
"aggregate_id"
],
"columns": ["aggregate_id"],
"nameExplicit": false,
"name": "event_sequence_pk",
"table": "event_sequence",
"entityType": "pks"
},
{
"columns": [
"id"
],
"columns": ["id"],
"nameExplicit": false,
"name": "event_pk",
"table": "event",
@@ -1394,9 +1322,9 @@
"isUnique": false,
"where": null,
"origin": "manual",
"name": "session_message_session_idx",
"name": "session_entry_session_idx",
"entityType": "indexes",
"table": "session_message"
"table": "session_entry"
},
{
"columns": [
@@ -1412,9 +1340,9 @@
"isUnique": false,
"where": null,
"origin": "manual",
"name": "session_message_session_type_idx",
"name": "session_entry_session_type_idx",
"entityType": "indexes",
"table": "session_message"
"table": "session_entry"
},
{
"columns": [
@@ -1426,9 +1354,9 @@
"isUnique": false,
"where": null,
"origin": "manual",
"name": "session_message_time_created_idx",
"name": "session_entry_time_created_idx",
"entityType": "indexes",
"table": "session_message"
"table": "session_entry"
},
{
"columns": [

View File

@@ -1,2 +0,0 @@
ALTER TABLE `session` ADD `agent` text;--> statement-breakpoint
ALTER TABLE `session` ADD `model` text;

View File

@@ -24,7 +24,6 @@ export function payloads() {
.map(([type, def]) => {
return z
.object({
id: z.string(),
type: z.literal(type),
properties: zodObject(def.properties),
})
@@ -40,7 +39,6 @@ export function effectPayloads() {
.entries()
.map(([type, def]) =>
Schema.Struct({
id: Schema.String,
type: Schema.Literal(type),
properties: def.properties,
}).annotate({ identifier: `Event.${type}` }),

View File

@@ -1,5 +1,4 @@
import { EventEmitter } from "events"
import { Identifier } from "@/id/id"
export type GlobalEvent = {
directory?: string
@@ -8,15 +7,6 @@ export type GlobalEvent = {
payload: any
}
class GlobalBusEmitter extends EventEmitter<{
export const GlobalBus = new EventEmitter<{
event: [GlobalEvent]
}> {
override emit(eventName: "event", event: GlobalEvent): boolean {
if (event.payload && typeof event.payload === "object" && !("id" in event.payload)) {
event.payload.id = event.payload.syncEvent?.id ?? Identifier.create("evt", "ascending")
}
return super.emit(eventName, event)
}
}
export const GlobalBus = new GlobalBusEmitter()
}>()

View File

@@ -5,7 +5,6 @@ import { BusEvent } from "./bus-event"
import { GlobalBus } from "./global"
import { InstanceState } from "@/effect/instance-state"
import { makeRuntime } from "@/effect/run-service"
import { Identifier } from "@/id/id"
const log = Log.create({ service: "bus" })
@@ -19,7 +18,6 @@ export const InstanceDisposed = BusEvent.define(
)
type Payload<D extends BusEvent.Definition = BusEvent.Definition> = {
id: string
type: D["type"]
properties: BusProperties<D>
}
@@ -30,11 +28,7 @@ type State = {
}
export interface Interface {
readonly publish: <D extends BusEvent.Definition>(
def: D,
properties: BusProperties<D>,
options?: { id?: string },
) => Effect.Effect<void>
readonly publish: <D extends BusEvent.Definition>(def: D, properties: BusProperties<D>) => Effect.Effect<void>
readonly subscribe: <D extends BusEvent.Definition>(def: D) => Stream.Stream<Payload<D>>
readonly subscribeAll: () => Stream.Stream<Payload>
readonly subscribeCallback: <D extends BusEvent.Definition>(
@@ -59,7 +53,6 @@ export const layer = Layer.effect(
// Publish InstanceDisposed before shutting down so subscribers see it
yield* PubSub.publish(wildcard, {
type: InstanceDisposed.type,
id: createID(),
properties: { directory: ctx.directory },
})
yield* PubSub.shutdown(wildcard)
@@ -84,10 +77,10 @@ export const layer = Layer.effect(
})
}
function publish<D extends BusEvent.Definition>(def: D, properties: BusProperties<D>, options?: { id?: string }) {
function publish<D extends BusEvent.Definition>(def: D, properties: BusProperties<D>) {
return Effect.gen(function* () {
const s = yield* InstanceState.get(state)
const payload: Payload = { id: options?.id ?? createID(), type: def.type, properties }
const payload: Payload = { type: def.type, properties }
log.info("publishing", { type: def.type })
const ps = s.typed.get(def.type)
@@ -180,16 +173,8 @@ const { runPromise, runSync } = makeRuntime(Service, layer)
// runSync is safe here because the subscribe chain (InstanceState.get, PubSub.subscribe,
// Scope.make, Effect.forkScoped) is entirely synchronous. If any step becomes async, this will throw.
export function createID() {
return Identifier.create("evt", "ascending")
}
export async function publish<D extends BusEvent.Definition>(
def: D,
properties: BusProperties<D>,
options?: { id?: string },
) {
return runPromise((svc) => svc.publish(def, properties, options))
export async function publish<D extends BusEvent.Definition>(def: D, properties: BusProperties<D>) {
return runPromise((svc) => svc.publish(def, properties))
}
export function subscribe<D extends BusEvent.Definition>(def: D, callback: (event: Payload<D>) => unknown) {

View File

@@ -7,14 +7,14 @@ import { Session } from "@/session/session"
import type { MessageV2 } from "../../../session/message-v2"
import { MessageID, PartID } from "../../../session/schema"
import { ToolRegistry } from "@/tool/registry"
import { Instance } from "../../../project/instance"
import { Permission } from "../../../permission"
import { iife } from "../../../util/iife"
import { bootstrap } from "../../bootstrap"
import { cmd } from "../cmd"
import { AppRuntime } from "@/effect/app-runtime"
import { effectCmd, fail } from "../../effect-cmd"
import { InstanceRef } from "@/effect/instance-ref"
import { InstanceStore } from "@/project/instance-store"
import type { InstanceContext } from "@/project/instance"
export const AgentCommand = cmd({
export const AgentCommand = effectCmd({
command: "agent <name>",
describe: "show agent configuration details",
builder: (yargs) =>
@@ -32,60 +32,61 @@ export const AgentCommand = cmd({
type: "string",
description: "Tool params as JSON or a JS object literal",
}),
async handler(args) {
await bootstrap(process.cwd(), async () => {
const agentName = args.name as string
const agent = await AppRuntime.runPromise(Agent.Service.use((svc) => svc.get(agentName)))
if (!agent) {
process.stderr.write(
`Agent ${agentName} not found, run '${basename(process.execPath)} agent list' to get an agent list` + EOL,
)
process.exit(1)
}
const availableTools = await getAvailableTools(agent)
const resolvedTools = await resolveTools(agent, availableTools)
const toolID = args.tool as string | undefined
if (toolID) {
const tool = availableTools.find((item) => item.id === toolID)
if (!tool) {
process.stderr.write(`Tool ${toolID} not found for agent ${agentName}` + EOL)
process.exit(1)
}
if (resolvedTools[toolID] === false) {
process.stderr.write(`Tool ${toolID} is disabled for agent ${agentName}` + EOL)
process.exit(1)
}
const params = parseToolParams(args.params as string | undefined)
const ctx = await createToolContext(agent)
const result = await tool.execute(params, ctx)
process.stdout.write(JSON.stringify({ tool: toolID, input: params, result }, null, 2) + EOL)
return
}
const output = {
...agent,
tools: resolvedTools,
}
process.stdout.write(JSON.stringify(output, null, 2) + EOL)
})
},
handler: Effect.fn("Cli.debug.agent")(function* (args) {
const ctx = yield* InstanceRef
if (!ctx) return
const store = yield* InstanceStore.Service
return yield* run(args, ctx).pipe(Effect.ensuring(store.dispose(ctx)))
}),
})
async function getAvailableTools(agent: Agent.Info) {
return AppRuntime.runPromise(
Effect.gen(function* () {
const provider = yield* Provider.Service
const registry = yield* ToolRegistry.Service
const model = agent.model ?? (yield* provider.defaultModel())
return yield* registry.tools({
...model,
agent,
})
}),
)
}
const run = Effect.fn("Cli.debug.agent.body")(function* (
args: { name: string; tool?: string; params?: string },
ctx: InstanceContext,
) {
const agentName = args.name
const agent = yield* Agent.Service.use((svc) => svc.get(agentName))
if (!agent) {
process.stderr.write(
`Agent ${agentName} not found, run '${basename(process.execPath)} agent list' to get an agent list` + EOL,
)
return yield* fail("", 1)
}
const availableTools = yield* getAvailableTools(agent)
const resolvedTools = resolveTools(agent, availableTools)
const toolID = args.tool
if (toolID) {
const tool = availableTools.find((item) => item.id === toolID)
if (!tool) {
process.stderr.write(`Tool ${toolID} not found for agent ${agentName}` + EOL)
return yield* fail("", 1)
}
if (resolvedTools[toolID] === false) {
process.stderr.write(`Tool ${toolID} is disabled for agent ${agentName}` + EOL)
return yield* fail("", 1)
}
const params = parseToolParams(args.params)
const toolCtx = yield* createToolContext(agent, ctx)
const result = yield* tool.execute(params, toolCtx)
process.stdout.write(JSON.stringify({ tool: toolID, input: params, result }, null, 2) + EOL)
return
}
async function resolveTools(agent: Agent.Info, availableTools: Awaited<ReturnType<typeof getAvailableTools>>) {
const output = {
...agent,
tools: resolvedTools,
}
process.stdout.write(JSON.stringify(output, null, 2) + EOL)
})
const getAvailableTools = Effect.fn("Cli.debug.agent.getAvailableTools")(function* (agent: Agent.Info) {
const provider = yield* Provider.Service
const registry = yield* ToolRegistry.Service
const model = agent.model ?? (yield* provider.defaultModel())
return yield* registry.tools({ ...model, agent })
})
function resolveTools(agent: Agent.Info, availableTools: { id: string }[]) {
const disabled = Permission.disabled(
availableTools.map((tool) => tool.id),
agent.permission,
@@ -123,50 +124,38 @@ function parseToolParams(input?: string) {
return parsed as Record<string, unknown>
}
async function createToolContext(agent: Agent.Info) {
const { session, messageID } = await AppRuntime.runPromise(
Effect.gen(function* () {
const session = yield* Session.Service
const result = yield* session.create({ title: `Debug tool run (${agent.name})` })
const messageID = MessageID.ascending()
const model = agent.model
? agent.model
: yield* Effect.gen(function* () {
const provider = yield* Provider.Service
return yield* provider.defaultModel()
})
const now = Date.now()
const message: MessageV2.Assistant = {
id: messageID,
sessionID: result.id,
role: "assistant",
time: {
created: now,
},
parentID: messageID,
modelID: model.modelID,
providerID: model.providerID,
mode: "debug",
agent: agent.name,
path: {
cwd: Instance.directory,
root: Instance.worktree,
},
cost: 0,
tokens: {
input: 0,
output: 0,
reasoning: 0,
cache: {
read: 0,
write: 0,
},
},
}
yield* session.updateMessage(message)
return { session: result, messageID }
}),
)
const createToolContext = Effect.fn("Cli.debug.agent.createToolContext")(function* (
agent: Agent.Info,
ctx: InstanceContext,
) {
const sessionSvc = yield* Session.Service
const session = yield* sessionSvc.create({ title: `Debug tool run (${agent.name})` })
const messageID = MessageID.ascending()
const model = agent.model
? agent.model
: yield* Effect.gen(function* () {
const provider = yield* Provider.Service
return yield* provider.defaultModel()
})
const now = Date.now()
const message: MessageV2.Assistant = {
id: messageID,
sessionID: session.id,
role: "assistant",
time: { created: now },
parentID: messageID,
modelID: model.modelID,
providerID: model.providerID,
mode: "debug",
agent: agent.name,
path: {
cwd: ctx.directory,
root: ctx.worktree,
},
cost: 0,
tokens: { input: 0, output: 0, reasoning: 0, cache: { read: 0, write: 0 } },
}
yield* sessionSvc.updateMessage(message)
const ruleset = Permission.merge(agent.permission, session.permission ?? [])
@@ -189,4 +178,4 @@ async function createToolContext(agent: Agent.Info) {
})
},
}
}
})

View File

@@ -1,13 +1,13 @@
import type { Argv } from "yargs"
import { Session } from "@/session/session"
import { MessageV2 } from "../../session/message-v2"
import { SessionID } from "../../session/schema"
import { effectCmd, fail } from "../effect-cmd"
import { cmd } from "./cmd"
import { bootstrap } from "../bootstrap"
import { UI } from "../ui"
import * as prompts from "@clack/prompts"
import { EOL } from "os"
import { Effect } from "effect"
import { InstanceRef } from "@/effect/instance-ref"
import { InstanceStore } from "@/project/instance-store"
import { AppRuntime } from "@/effect/app-runtime"
function redact(kind: string, id: string, value: string) {
return value.trim() ? `[redacted:${kind}:${id}]` : value
@@ -220,11 +220,11 @@ function sanitize(data: { info: Session.Info; messages: MessageV2.WithParts[] })
}
}
export const ExportCommand = effectCmd({
export const ExportCommand = cmd({
command: "export [sessionID]",
describe: "export session data as JSON",
builder: (yargs) =>
yargs
builder: (yargs: Argv) => {
return yargs
.positional("sessionID", {
describe: "session id to export",
type: "string",
@@ -232,65 +232,72 @@ export const ExportCommand = effectCmd({
.option("sanitize", {
describe: "redact sensitive transcript and file data",
type: "boolean",
}),
handler: Effect.fn("Cli.export")(function* (args) {
const ctx = yield* InstanceRef
if (!ctx) return
const store = yield* InstanceStore.Service
return yield* run(args).pipe(Effect.ensuring(store.dispose(ctx)))
}),
})
const run = Effect.fn("Cli.export.body")(function* (args: { sessionID?: string; sanitize?: boolean }) {
const svc = yield* Session.Service
let sessionID = args.sessionID ? SessionID.make(args.sessionID) : undefined
process.stderr.write(`Exporting session: ${sessionID ?? "latest"}\n`)
if (!sessionID) {
UI.empty()
prompts.intro("Export session", { output: process.stderr })
const sessions = yield* svc.list()
if (sessions.length === 0) {
prompts.log.error("No sessions found", { output: process.stderr })
prompts.outro("Done", { output: process.stderr })
return
}
sessions.sort((a, b) => b.time.updated - a.time.updated)
const selectedSession = yield* Effect.promise(() =>
prompts.autocomplete({
message: "Select session to export",
maxItems: 10,
options: sessions.map((session) => ({
label: session.title,
value: session.id,
hint: `${new Date(session.time.updated).toLocaleString()}${session.id.slice(-8)}`,
})),
output: process.stderr,
}),
)
if (prompts.isCancel(selectedSession)) {
return yield* Effect.die(new UI.CancelledError())
}
sessionID = selectedSession
prompts.outro("Exporting session...", { output: process.stderr })
}
// Match legacy try/catch — catches both typed failures and defects
// (Session.Service.get throws NotFoundError as a defect, not a typed E).
return yield* Effect.gen(function* () {
const sessionInfo = yield* svc.get(sessionID!)
const messages = yield* svc.messages({ sessionID: sessionInfo.id })
const exportData = { info: sessionInfo, messages }
process.stdout.write(JSON.stringify(args.sanitize ? sanitize(exportData) : exportData, null, 2))
process.stdout.write(EOL)
}).pipe(Effect.catchCause(() => fail(`Session not found: ${sessionID!}`)))
})
},
handler: async (args) => {
await bootstrap(process.cwd(), async () => {
let sessionID = args.sessionID ? SessionID.make(args.sessionID) : undefined
process.stderr.write(`Exporting session: ${sessionID ?? "latest"}\n`)
if (!sessionID) {
UI.empty()
prompts.intro("Export session", {
output: process.stderr,
})
const sessions = await AppRuntime.runPromise(Session.Service.use((svc) => svc.list()))
if (sessions.length === 0) {
prompts.log.error("No sessions found", {
output: process.stderr,
})
prompts.outro("Done", {
output: process.stderr,
})
return
}
sessions.sort((a, b) => b.time.updated - a.time.updated)
const selectedSession = await prompts.autocomplete({
message: "Select session to export",
maxItems: 10,
options: sessions.map((session) => ({
label: session.title,
value: session.id,
hint: `${new Date(session.time.updated).toLocaleString()}${session.id.slice(-8)}`,
})),
output: process.stderr,
})
if (prompts.isCancel(selectedSession)) {
throw new UI.CancelledError()
}
sessionID = selectedSession
prompts.outro("Exporting session...", {
output: process.stderr,
})
}
try {
const sessionInfo = await AppRuntime.runPromise(Session.Service.use((svc) => svc.get(sessionID!)))
const messages = await AppRuntime.runPromise(
Session.Service.use((svc) => svc.messages({ sessionID: sessionInfo.id })),
)
const exportData = {
info: sessionInfo,
messages,
}
process.stdout.write(JSON.stringify(args.sanitize ? sanitize(exportData) : exportData, null, 2))
process.stdout.write(EOL)
} catch {
UI.error(`Session not found: ${sessionID!}`)
process.exit(1)
}
})
},
})

View File

@@ -1,15 +1,17 @@
import type { Argv } from "yargs"
import type { Session as SDKSession, Message, Part } from "@opencode-ai/sdk/v2"
import { Session } from "@/session/session"
import { MessageV2 } from "../../session/message-v2"
import { CliError, effectCmd } from "../effect-cmd"
import { cmd } from "./cmd"
import { bootstrap } from "../bootstrap"
import { Database } from "@/storage/db"
import { SessionTable, MessageTable, PartTable } from "../../session/session.sql"
import { InstanceRef } from "@/effect/instance-ref"
import { InstanceStore } from "@/project/instance-store"
import { Instance } from "../../project/instance"
import { ShareNext } from "@/share/share-next"
import { EOL } from "os"
import { Filesystem } from "@/util/filesystem"
import { Effect, Schema } from "effect"
import { AppRuntime } from "@/effect/app-runtime"
import { Schema } from "effect"
const decodeMessageInfo = Schema.decodeUnknownSync(MessageV2.Info)
const decodePart = Schema.decodeUnknownSync(MessageV2.Part)
@@ -76,147 +78,135 @@ export function transformShareData(shareData: ShareData[]): {
}
}
type ExportData = { info: SDKSession; messages: Array<{ info: Message; parts: Part[] }> }
export const ImportCommand = effectCmd({
export const ImportCommand = cmd({
command: "import <file>",
describe: "import session data from JSON file or URL",
builder: (yargs) =>
yargs.positional("file", {
builder: (yargs: Argv) => {
return yargs.positional("file", {
describe: "path to JSON file or share URL",
type: "string",
demandOption: true,
}),
handler: Effect.fn("Cli.import")(function* (args) {
// effectCmd always provides InstanceRef via InstanceStore.Service.provide; this is an invariant.
const ctx = yield* InstanceRef
if (!ctx) return yield* Effect.die("InstanceRef not provided")
const store = yield* InstanceStore.Service
// Ensure store.dispose runs disposers and emits server.instance.disposed
// on every exit path: success, early return, typed failure, defect, interrupt.
return yield* runImport(args.file, ctx.project.id).pipe(Effect.ensuring(store.dispose(ctx)))
}),
})
const runImport = Effect.fn("Cli.import.body")(function* (file: string, projectID: string) {
const share = yield* ShareNext.Service
let exportData: ExportData | undefined
const isUrl = file.startsWith("http://") || file.startsWith("https://")
if (isUrl) {
const slug = parseShareUrl(file)
if (!slug) {
const baseUrl = yield* Effect.orDie(share.url())
process.stdout.write(`Invalid URL format. Expected: ${baseUrl}/share/<slug>`)
process.stdout.write(EOL)
return
}
const baseUrl = new URL(file).origin
const req = yield* Effect.orDie(share.request())
const headers = shouldAttachShareAuthHeaders(file, req.baseUrl) ? req.headers : {}
const tryFetch = (url: string) =>
Effect.tryPromise({
try: () => fetch(url, { headers }),
catch: (e) =>
new CliError({
message: `Failed to fetch share data: ${e instanceof Error ? e.message : String(e)}`,
}),
})
const dataPath = req.api.data(slug)
let response = yield* tryFetch(`${baseUrl}${dataPath}`)
if (!response.ok && dataPath !== `/api/share/${slug}/data`) {
response = yield* tryFetch(`${baseUrl}/api/share/${slug}/data`)
}
if (!response.ok) {
process.stdout.write(`Failed to fetch share data: ${response.statusText}`)
process.stdout.write(EOL)
return
}
const shareData = yield* Effect.tryPromise({
try: () => response.json() as Promise<ShareData[]>,
catch: () => new CliError({ message: "Share data was not valid JSON" }),
})
const transformed = transformShareData(shareData)
},
handler: async (args) => {
await bootstrap(process.cwd(), async () => {
let exportData:
| {
info: SDKSession
messages: Array<{
info: Message
parts: Part[]
}>
}
| undefined
if (!transformed) {
process.stdout.write(`Share not found or empty: ${slug}`)
process.stdout.write(EOL)
return
}
const isUrl = args.file.startsWith("http://") || args.file.startsWith("https://")
exportData = transformed
} else {
exportData = yield* Effect.promise(() =>
Filesystem.readJson<NonNullable<typeof exportData>>(file).catch(() => undefined),
)
if (!exportData) {
process.stdout.write(`File not found: ${file}`)
process.stdout.write(EOL)
return
}
}
if (isUrl) {
const slug = parseShareUrl(args.file)
if (!slug) {
const baseUrl = await AppRuntime.runPromise(ShareNext.Service.use((svc) => svc.url()))
process.stdout.write(`Invalid URL format. Expected: ${baseUrl}/share/<slug>`)
process.stdout.write(EOL)
return
}
if (!exportData) {
process.stdout.write(`Failed to read session data`)
process.stdout.write(EOL)
return
}
const parsed = new URL(args.file)
const baseUrl = parsed.origin
const req = await AppRuntime.runPromise(ShareNext.Service.use((svc) => svc.request()))
const headers = shouldAttachShareAuthHeaders(args.file, req.baseUrl) ? req.headers : {}
const info = Schema.decodeUnknownSync(Session.Info)({
...exportData.info,
projectID,
}) as Session.Info
const row = Session.toRow(info)
Database.use((db) =>
db
.insert(SessionTable)
.values(row)
.onConflictDoUpdate({ target: SessionTable.id, set: { project_id: row.project_id } })
.run(),
)
for (const msg of exportData.messages) {
const msgInfo = decodeMessageInfo(msg.info) as MessageV2.Info
const { id, sessionID: _, ...msgData } = msgInfo
Database.use((db) =>
db
.insert(MessageTable)
.values({
id,
session_id: row.id,
time_created: msgInfo.time?.created ?? Date.now(),
data: msgData,
const dataPath = req.api.data(slug)
let response = await fetch(`${baseUrl}${dataPath}`, {
headers,
})
.onConflictDoNothing()
.run(),
)
for (const part of msg.parts) {
const partInfo = decodePart(part) as MessageV2.Part
const { id: partId, sessionID: _s, messageID, ...partData } = partInfo
if (!response.ok && dataPath !== `/api/share/${slug}/data`) {
response = await fetch(`${baseUrl}/api/share/${slug}/data`, {
headers,
})
}
if (!response.ok) {
process.stdout.write(`Failed to fetch share data: ${response.statusText}`)
process.stdout.write(EOL)
return
}
const shareData: ShareData[] = await response.json()
const transformed = transformShareData(shareData)
if (!transformed) {
process.stdout.write(`Share not found or empty: ${slug}`)
process.stdout.write(EOL)
return
}
exportData = transformed
} else {
exportData = await Filesystem.readJson<NonNullable<typeof exportData>>(args.file).catch(() => undefined)
if (!exportData) {
process.stdout.write(`File not found: ${args.file}`)
process.stdout.write(EOL)
return
}
}
if (!exportData) {
process.stdout.write(`Failed to read session data`)
process.stdout.write(EOL)
return
}
const info = Schema.decodeUnknownSync(Session.Info)({
...exportData.info,
projectID: Instance.project.id,
}) as Session.Info
const row = Session.toRow(info)
Database.use((db) =>
db
.insert(PartTable)
.values({
id: partId,
message_id: messageID,
session_id: row.id,
data: partData,
})
.onConflictDoNothing()
.insert(SessionTable)
.values(row)
.onConflictDoUpdate({ target: SessionTable.id, set: { project_id: row.project_id } })
.run(),
)
}
}
process.stdout.write(`Imported session: ${exportData.info.id}`)
process.stdout.write(EOL)
for (const msg of exportData.messages) {
const msgInfo = decodeMessageInfo(msg.info) as MessageV2.Info
const { id, sessionID: _, ...msgData } = msgInfo
Database.use((db) =>
db
.insert(MessageTable)
.values({
id,
session_id: row.id,
time_created: msgInfo.time?.created ?? Date.now(),
data: msgData,
})
.onConflictDoNothing()
.run(),
)
for (const part of msg.parts) {
const partInfo = decodePart(part) as MessageV2.Part
const { id: partId, sessionID: _s, messageID, ...partData } = partInfo
Database.use((db) =>
db
.insert(PartTable)
.values({
id: partId,
message_id: messageID,
session_id: row.id,
data: partData,
})
.onConflictDoNothing()
.run(),
)
}
}
process.stdout.write(`Imported session: ${exportData.info.id}`)
process.stdout.write(EOL)
})
},
})

View File

@@ -1,16 +1,16 @@
import { intro, log, outro, spinner } from "@clack/prompts"
import { Effect } from "effect"
import type { Argv } from "yargs"
import { ConfigPaths } from "@/config/paths"
import { Global } from "@opencode-ai/core/global"
import { installPlugin, patchPluginConfig, readPluginManifest } from "../../plugin/install"
import { resolvePluginTarget } from "../../plugin/shared"
import { Instance } from "../../project/instance"
import { errorMessage } from "../../util/error"
import { Filesystem } from "@/util/filesystem"
import { Process } from "@/util/process"
import { UI } from "../ui"
import { effectCmd } from "../effect-cmd"
import { InstanceRef } from "@/effect/instance-ref"
import { cmd } from "./cmd"
type Spin = {
start: (msg: string) => void
@@ -175,12 +175,12 @@ export function createPlugTask(input: PlugInput, dep: PlugDeps = defaultPlugDeps
}
}
export const PluginCommand = effectCmd({
export const PluginCommand = cmd({
command: "plugin <module>",
aliases: ["plug"],
describe: "install plugin and update config",
builder: (yargs) =>
yargs
builder: (yargs: Argv) => {
return yargs
.positional("module", {
type: "string",
describe: "npm module name",
@@ -196,8 +196,9 @@ export const PluginCommand = effectCmd({
type: "boolean",
default: false,
describe: "replace existing plugin version",
}),
handler: Effect.fn("Cli.plug")(function* (args) {
})
},
handler: async (args) => {
const mod = String(args.module ?? "").trim()
if (!mod) {
UI.error("module is required")
@@ -213,18 +214,20 @@ export const PluginCommand = effectCmd({
global: Boolean(args.global),
force: Boolean(args.force),
})
let ok = true
const ctx = yield* InstanceRef
if (!ctx) return
const ok = yield* Effect.promise(() =>
run({
vcs: ctx.project.vcs,
worktree: ctx.worktree,
directory: ctx.directory,
}),
)
await Instance.provide({
directory: process.cwd(),
fn: async () => {
ok = await run({
vcs: Instance.project.vcs,
worktree: Instance.worktree,
directory: Instance.directory,
})
},
})
outro("Done")
if (!ok) process.exitCode = 1
}),
},
})

View File

@@ -1,11 +1,11 @@
import { Effect } from "effect"
import { UI } from "../ui"
import { effectCmd, fail } from "../effect-cmd"
import { cmd } from "./cmd"
import { AppRuntime } from "@/effect/app-runtime"
import { Git } from "@/git"
import { InstanceRef } from "@/effect/instance-ref"
import { Instance } from "@/project/instance"
import { Process } from "@/util/process"
export const PrCommand = effectCmd({
export const PrCommand = cmd({
command: "pr <number>",
describe: "fetch and checkout a GitHub PR branch, then run opencode",
builder: (yargs) =>
@@ -14,102 +14,125 @@ export const PrCommand = effectCmd({
describe: "PR number to checkout",
demandOption: true,
}),
handler: Effect.fn("Cli.pr")(function* (args) {
const ctx = yield* InstanceRef
if (!ctx) return yield* fail("Could not load instance context")
if (ctx.project.vcs !== "git") {
return yield* fail("Could not find git repository. Please run this command from a git repository.")
}
const git = yield* Git.Service
const worktree = ctx.worktree
const prNumber = args.number
const localBranchName = `pr/${prNumber}`
UI.println(`Fetching and checking out PR #${prNumber}...`)
const checkout = yield* Effect.promise(() =>
Process.run(["gh", "pr", "checkout", `${prNumber}`, "--branch", localBranchName, "--force"], { nothrow: true }),
)
if (checkout.code !== 0) {
return yield* fail(`Failed to checkout PR #${prNumber}. Make sure you have gh CLI installed and authenticated.`)
}
const prInfoResult = yield* Effect.promise(() =>
Process.text(
[
"gh",
"pr",
"view",
`${prNumber}`,
"--json",
"headRepository,headRepositoryOwner,isCrossRepository,headRefName,body",
],
{ nothrow: true },
),
)
let sessionId: string | undefined
if (prInfoResult.code === 0 && prInfoResult.text.trim()) {
const prInfo = JSON.parse(prInfoResult.text)
if (prInfo?.isCrossRepository && prInfo.headRepository && prInfo.headRepositoryOwner) {
const forkOwner = prInfo.headRepositoryOwner.login
const forkName = prInfo.headRepository.name
const remoteName = forkOwner
const remotes = (yield* git.run(["remote"], { cwd: worktree })).text().trim()
if (!remotes.split("\n").includes(remoteName)) {
yield* git.run(["remote", "add", remoteName, `https://github.com/${forkOwner}/${forkName}.git`], {
cwd: worktree,
})
UI.println(`Added fork remote: ${remoteName}`)
async handler(args) {
await Instance.provide({
directory: process.cwd(),
async fn() {
const project = Instance.project
if (project.vcs !== "git") {
UI.error("Could not find git repository. Please run this command from a git repository.")
process.exit(1)
}
yield* git.run(["branch", `--set-upstream-to=${remoteName}/${prInfo.headRefName}`, localBranchName], {
cwd: worktree,
})
}
const prNumber = args.number
const localBranchName = `pr/${prNumber}`
UI.println(`Fetching and checking out PR #${prNumber}...`)
if (prInfo?.body) {
const sessionMatch = prInfo.body.match(/https:\/\/opncd\.ai\/s\/([a-zA-Z0-9_-]+)/)
if (sessionMatch) {
const sessionUrl = sessionMatch[0]
UI.println(`Found opencode session: ${sessionUrl}`)
UI.println(`Importing session...`)
// Use gh pr checkout with custom branch name
const result = await Process.run(
["gh", "pr", "checkout", `${prNumber}`, "--branch", localBranchName, "--force"],
{
nothrow: true,
},
)
const importResult = yield* Effect.promise(() =>
Process.text(["opencode", "import", sessionUrl], { nothrow: true }),
)
if (importResult.code === 0) {
const sessionIdMatch = importResult.text.trim().match(/Imported session: ([a-zA-Z0-9_-]+)/)
if (sessionIdMatch) {
sessionId = sessionIdMatch[1]
UI.println(`Session imported: ${sessionId}`)
if (result.code !== 0) {
UI.error(`Failed to checkout PR #${prNumber}. Make sure you have gh CLI installed and authenticated.`)
process.exit(1)
}
// Fetch PR info for fork handling and session link detection
const prInfoResult = await Process.text(
[
"gh",
"pr",
"view",
`${prNumber}`,
"--json",
"headRepository,headRepositoryOwner,isCrossRepository,headRefName,body",
],
{ nothrow: true },
)
let sessionId: string | undefined
if (prInfoResult.code === 0) {
const prInfoText = prInfoResult.text
if (prInfoText.trim()) {
const prInfo = JSON.parse(prInfoText)
// Handle fork PRs
if (prInfo && prInfo.isCrossRepository && prInfo.headRepository && prInfo.headRepositoryOwner) {
const forkOwner = prInfo.headRepositoryOwner.login
const forkName = prInfo.headRepository.name
const remoteName = forkOwner
// Check if remote already exists
const remotes = await AppRuntime.runPromise(
Git.Service.use((git) => git.run(["remote"], { cwd: Instance.worktree })),
).then((x) => x.text().trim())
if (!remotes.split("\n").includes(remoteName)) {
await AppRuntime.runPromise(
Git.Service.use((git) =>
git.run(["remote", "add", remoteName, `https://github.com/${forkOwner}/${forkName}.git`], {
cwd: Instance.worktree,
}),
),
)
UI.println(`Added fork remote: ${remoteName}`)
}
// Set upstream to the fork so pushes go there
const headRefName = prInfo.headRefName
await AppRuntime.runPromise(
Git.Service.use((git) =>
git.run(["branch", `--set-upstream-to=${remoteName}/${headRefName}`, localBranchName], {
cwd: Instance.worktree,
}),
),
)
}
// Check for opencode session link in PR body
if (prInfo && prInfo.body) {
const sessionMatch = prInfo.body.match(/https:\/\/opncd\.ai\/s\/([a-zA-Z0-9_-]+)/)
if (sessionMatch) {
const sessionUrl = sessionMatch[0]
UI.println(`Found opencode session: ${sessionUrl}`)
UI.println(`Importing session...`)
const importResult = await Process.text(["opencode", "import", sessionUrl], {
nothrow: true,
})
if (importResult.code === 0) {
const importOutput = importResult.text.trim()
// Extract session ID from the output (format: "Imported session: <session-id>")
const sessionIdMatch = importOutput.match(/Imported session: ([a-zA-Z0-9_-]+)/)
if (sessionIdMatch) {
sessionId = sessionIdMatch[1]
UI.println(`Session imported: ${sessionId}`)
}
}
}
}
}
}
}
}
UI.println(`Successfully checked out PR #${prNumber} as branch '${localBranchName}'`)
UI.println()
UI.println("Starting opencode...")
UI.println()
UI.println(`Successfully checked out PR #${prNumber} as branch '${localBranchName}'`)
UI.println()
UI.println("Starting opencode...")
UI.println()
const opencodeArgs = sessionId ? ["-s", sessionId] : []
const code = yield* Effect.promise(
() =>
Process.spawn(["opencode", ...opencodeArgs], {
const opencodeArgs = sessionId ? ["-s", sessionId] : []
const opencodeProcess = Process.spawn(["opencode", ...opencodeArgs], {
stdin: "inherit",
stdout: "inherit",
stderr: "inherit",
cwd: process.cwd(),
}).exited,
)
// Match legacy throw semantics — propagate as a defect so the top-level
// index.ts catch handles it identically (exit 1, "Unexpected error" banner).
if (code !== 0) return yield* Effect.die(new Error(`opencode exited with code ${code}`))
}),
})
const code = await opencodeProcess.exited
if (code !== 0) throw new Error(`opencode exited with code ${code}`)
},
})
},
})

View File

@@ -1,9 +1,8 @@
import type { Argv } from "yargs"
import { Effect } from "effect"
import { cmd } from "./cmd"
import { effectCmd, fail } from "../effect-cmd"
import { Session } from "@/session/session"
import { SessionID } from "../../session/schema"
import { bootstrap } from "../bootstrap"
import { UI } from "../ui"
import { Locale } from "@/util/locale"
import { Flag } from "@opencode-ai/core/flag/flag"
@@ -12,8 +11,7 @@ import { Process } from "@/util/process"
import { EOL } from "os"
import path from "path"
import { which } from "../../util/which"
import { InstanceRef } from "@/effect/instance-ref"
import { InstanceStore } from "@/project/instance-store"
import { AppRuntime } from "@/effect/app-runtime"
function pagerCmd(): string[] {
const lessOptions = ["-R", "-S"]
@@ -49,35 +47,36 @@ export const SessionCommand = cmd({
async handler() {},
})
export const SessionDeleteCommand = effectCmd({
export const SessionDeleteCommand = cmd({
command: "delete <sessionID>",
describe: "delete a session",
builder: (yargs) =>
yargs.positional("sessionID", {
builder: (yargs: Argv) => {
return yargs.positional("sessionID", {
describe: "session ID to delete",
type: "string",
demandOption: true,
}),
handler: Effect.fn("Cli.session.delete")(function* (args) {
const ctx = yield* InstanceRef
if (!ctx) return
const store = yield* InstanceStore.Service
return yield* Effect.gen(function* () {
const svc = yield* Session.Service
})
},
handler: async (args) => {
await bootstrap(process.cwd(), async () => {
const sessionID = SessionID.make(args.sessionID)
// Match legacy try/catch — Session.get surfaces NotFoundError as a defect.
yield* svc.get(sessionID).pipe(Effect.catchCause(() => fail(`Session not found: ${args.sessionID}`)))
yield* svc.remove(sessionID)
try {
await AppRuntime.runPromise(Session.Service.use((svc) => svc.get(sessionID)))
} catch {
UI.error(`Session not found: ${args.sessionID}`)
process.exit(1)
}
await AppRuntime.runPromise(Session.Service.use((svc) => svc.remove(sessionID)))
UI.println(UI.Style.TEXT_SUCCESS_BOLD + `Session ${args.sessionID} deleted` + UI.Style.TEXT_NORMAL)
}).pipe(Effect.ensuring(store.dispose(ctx)))
}),
})
},
})
export const SessionListCommand = effectCmd({
export const SessionListCommand = cmd({
command: "list",
describe: "list sessions",
builder: (yargs) =>
yargs
builder: (yargs: Argv) => {
return yargs
.option("max-count", {
alias: "n",
describe: "limit to N most recent sessions",
@@ -88,42 +87,47 @@ export const SessionListCommand = effectCmd({
type: "string",
choices: ["table", "json"],
default: "table",
}),
handler: Effect.fn("Cli.session.list")(function* (args) {
const ctx = yield* InstanceRef
if (!ctx) return
const store = yield* InstanceStore.Service
return yield* Effect.gen(function* () {
const sessions = yield* Session.Service.use((svc) => svc.list({ roots: true, limit: args.maxCount }))
})
},
handler: async (args) => {
await bootstrap(process.cwd(), async () => {
const sessions = await AppRuntime.runPromise(
Session.Service.use((svc) => svc.list({ roots: true, limit: args.maxCount })),
)
if (sessions.length === 0) return
if (sessions.length === 0) {
return
}
const output = args.format === "json" ? formatSessionJSON(sessions) : formatSessionTable(sessions)
let output: string
if (args.format === "json") {
output = formatSessionJSON(sessions)
} else {
output = formatSessionTable(sessions)
}
const shouldPaginate = process.stdout.isTTY && !args.maxCount && args.format === "table"
if (shouldPaginate) {
yield* Effect.promise(async () => {
const proc = Process.spawn(pagerCmd(), {
stdin: "pipe",
stdout: "inherit",
stderr: "inherit",
})
if (!proc.stdin) {
console.log(output)
return
}
proc.stdin.write(output)
proc.stdin.end()
await proc.exited
const proc = Process.spawn(pagerCmd(), {
stdin: "pipe",
stdout: "inherit",
stderr: "inherit",
})
if (!proc.stdin) {
console.log(output)
return
}
proc.stdin.write(output)
proc.stdin.end()
await proc.exited
} else {
console.log(output)
}
}).pipe(Effect.ensuring(store.dispose(ctx)))
}),
})
},
})
function formatSessionTable(sessions: Session.Info[]): string {

View File

@@ -1,11 +1,11 @@
import { Effect } from "effect"
import { effectCmd } from "../effect-cmd"
import type { Argv } from "yargs"
import { cmd } from "./cmd"
import { Session } from "@/session/session"
import { bootstrap } from "../bootstrap"
import { Database } from "@/storage/db"
import { SessionTable } from "../../session/session.sql"
import { Project } from "@/project/project"
import { InstanceRef } from "@/effect/instance-ref"
import { InstanceStore } from "@/project/instance-store"
import { Instance } from "../../project/instance"
import { AppRuntime } from "@/effect/app-runtime"
interface SessionStats {
@@ -47,11 +47,11 @@ interface SessionStats {
medianTokensPerSession: number
}
export const StatsCommand = effectCmd({
export const StatsCommand = cmd({
command: "stats",
describe: "show token usage and cost statistics",
builder: (yargs) =>
yargs
builder: (yargs: Argv) => {
return yargs
.option("days", {
describe: "show stats for the last N days (default: all time)",
type: "number",
@@ -66,42 +66,34 @@ export const StatsCommand = effectCmd({
.option("project", {
describe: "filter by project (default: all projects, empty string: current project)",
type: "string",
}),
handler: Effect.fn("Cli.stats")(function* (args) {
const ctx = yield* InstanceRef
if (!ctx) return
const store = yield* InstanceStore.Service
return yield* run(args, ctx.project).pipe(Effect.ensuring(store.dispose(ctx)))
}),
})
},
handler: async (args) => {
await bootstrap(process.cwd(), async () => {
const stats = await aggregateSessionStats(args.days, args.project)
let modelLimit: number | undefined
if (args.models === true) {
modelLimit = Infinity
} else if (typeof args.models === "number") {
modelLimit = args.models
}
displayStats(stats, args.tools, modelLimit)
})
},
})
const run = (
args: { days?: number; tools?: number; models?: unknown; project?: string },
currentProject: Project.Info,
) =>
Effect.promise(async () => {
const stats = await aggregateSessionStats(args.days, args.project, currentProject)
let modelLimit: number | undefined
if (args.models === true) {
modelLimit = Infinity
} else if (typeof args.models === "number") {
modelLimit = args.models
}
displayStats(stats, args.tools, modelLimit)
})
async function getCurrentProject(): Promise<Project.Info> {
return Instance.project
}
async function getAllSessions(): Promise<Session.Info[]> {
const rows = Database.use((db) => db.select().from(SessionTable).all())
return rows.map((row) => Session.fromRow(row))
}
export async function aggregateSessionStats(
days?: number,
projectFilter?: string,
currentProject?: Project.Info,
): Promise<SessionStats> {
export async function aggregateSessionStats(days?: number, projectFilter?: string): Promise<SessionStats> {
const sessions = await getAllSessions()
const MS_IN_DAY = 24 * 60 * 60 * 1000
@@ -125,7 +117,7 @@ export async function aggregateSessionStats(
if (projectFilter !== undefined) {
if (projectFilter === "") {
if (!currentProject) throw new Error("currentProject required when projectFilter is empty string")
const currentProject = await getCurrentProject()
filteredSessions = filteredSessions.filter((session) => session.projectID === currentProject.id)
} else {
filteredSessions = filteredSessions.filter((session) => session.projectID === projectFilter)

View File

@@ -28,7 +28,6 @@ import { useEvent } from "@tui/context/event"
import { SDKProvider, useSDK } from "@tui/context/sdk"
import { StartupLoading } from "@tui/component/startup-loading"
import { SyncProvider, useSync } from "@tui/context/sync"
import { SyncProviderV2 } from "@tui/context/sync-v2"
import { LocalProvider, useLocal } from "@tui/context/local"
import { DialogModel } from "@tui/component/dialog-model"
import { useConnected } from "@tui/component/use-connected"
@@ -169,29 +168,27 @@ export function tui(input: {
>
<ProjectProvider>
<SyncProvider>
<SyncProviderV2>
<ThemeProvider mode={mode}>
<LocalProvider>
<KeybindProvider>
<PromptStashProvider>
<DialogProvider>
<CommandProvider>
<FrecencyProvider>
<PromptHistoryProvider>
<PromptRefProvider>
<EditorContextProvider>
<App onSnapshot={input.onSnapshot} />
</EditorContextProvider>
</PromptRefProvider>
</PromptHistoryProvider>
</FrecencyProvider>
</CommandProvider>
</DialogProvider>
</PromptStashProvider>
</KeybindProvider>
</LocalProvider>
</ThemeProvider>
</SyncProviderV2>
<ThemeProvider mode={mode}>
<LocalProvider>
<KeybindProvider>
<PromptStashProvider>
<DialogProvider>
<CommandProvider>
<FrecencyProvider>
<PromptHistoryProvider>
<PromptRefProvider>
<EditorContextProvider>
<App onSnapshot={input.onSnapshot} />
</EditorContextProvider>
</PromptRefProvider>
</PromptHistoryProvider>
</FrecencyProvider>
</CommandProvider>
</DialogProvider>
</PromptStashProvider>
</KeybindProvider>
</LocalProvider>
</ThemeProvider>
</SyncProvider>
</ProjectProvider>
</SDKProvider>

View File

@@ -750,18 +750,9 @@ export function Prompt(props: PromptProps) {
return false
}
const variant = local.model.variant.current()
let sessionID = props.sessionID
if (sessionID == null) {
const res = await sdk.client.session.create({
workspace: props.workspaceID,
agent: agent.name,
model: {
providerID: selectedModel.providerID,
id: selectedModel.modelID,
variant,
},
})
const res = await sdk.client.session.create({ workspace: props.workspaceID })
if (res.error) {
console.log("Creating a session failed:", res.error)
@@ -801,6 +792,7 @@ export function Prompt(props: PromptProps) {
// Capture mode before it gets reset
const currentMode = store.mode
const variant = local.model.variant.current()
const editorSelection = editorContext()
const currentEditorSelectionKey = editorSelectionKey(editorSelection)
const editorParts =

View File

@@ -1,298 +0,0 @@
import { useEvent } from "@tui/context/event"
import type {
SessionMessage,
SessionMessageAssistant,
SessionMessageAssistantReasoning,
SessionMessageAssistantText,
SessionMessageAssistantTool,
} from "@opencode-ai/sdk/v2"
import { createStore, produce, reconcile } from "solid-js/store"
import { createSimpleContext } from "./helper"
import { useSDK } from "./sdk"
function activeAssistant(messages: SessionMessage[]) {
const index = messages.findLastIndex((message) => message.type === "assistant" && !message.time.completed)
if (index < 0) return
const assistant = messages[index]
return assistant?.type === "assistant" ? assistant : undefined
}
function activeCompaction(messages: SessionMessage[]) {
const index = messages.findLastIndex((message) => message.type === "compaction")
if (index < 0) return
const compaction = messages[index]
return compaction?.type === "compaction" ? compaction : undefined
}
function activeShell(messages: SessionMessage[], callID: string) {
const index = messages.findLastIndex((message) => message.type === "shell" && message.callID === callID)
if (index < 0) return
const shell = messages[index]
return shell?.type === "shell" ? shell : undefined
}
function latestTool(assistant: SessionMessageAssistant | undefined, callID?: string) {
return assistant?.content.findLast(
(item): item is SessionMessageAssistantTool => item.type === "tool" && (callID === undefined || item.id === callID),
)
}
function latestText(assistant: SessionMessageAssistant | undefined) {
return assistant?.content.findLast((item): item is SessionMessageAssistantText => item.type === "text")
}
function latestReasoning(assistant: SessionMessageAssistant | undefined, reasoningID: string) {
return assistant?.content.findLast(
(item): item is SessionMessageAssistantReasoning => item.type === "reasoning" && item.id === reasoningID,
)
}
export const { use: useSyncV2, provider: SyncProviderV2 } = createSimpleContext({
name: "SyncV2",
init: () => {
const [store, setStore] = createStore<{
messages: {
[sessionID: string]: SessionMessage[]
}
}>({
messages: {},
})
const event = useEvent()
const sdk = useSDK()
function update(sessionID: string, fn: (messages: SessionMessage[]) => void) {
setStore(
"messages",
produce((draft) => {
fn((draft[sessionID] ??= []))
}),
)
}
event.subscribe((event) => {
switch (event.type) {
case "session.next.prompted": {
update(event.properties.sessionID, (draft) => {
draft.push({
id: event.id,
type: "user",
text: event.properties.prompt.text,
files: event.properties.prompt.files,
agents: event.properties.prompt.agents,
time: { created: event.properties.timestamp },
})
})
break
}
case "session.next.synthetic":
update(event.properties.sessionID, (draft) => {
draft.push({
id: event.id,
type: "synthetic",
sessionID: event.properties.sessionID,
text: event.properties.text,
time: { created: event.properties.timestamp },
})
})
break
case "session.next.shell.started":
update(event.properties.sessionID, (draft) => {
draft.push({
id: event.id,
type: "shell",
callID: event.properties.callID,
command: event.properties.command,
output: "",
time: { created: event.properties.timestamp },
})
})
break
case "session.next.shell.ended":
update(event.properties.sessionID, (draft) => {
const match = activeShell(draft, event.properties.callID)
if (!match) return
match.output = event.properties.output
match.time.completed = event.properties.timestamp
})
break
case "session.next.step.started":
update(event.properties.sessionID, (draft) => {
const currentAssistant = activeAssistant(draft)
if (currentAssistant) currentAssistant.time.completed = event.properties.timestamp
draft.push({
id: event.id,
type: "assistant",
agent: event.properties.agent,
model: event.properties.model,
content: [],
snapshot: event.properties.snapshot ? { start: event.properties.snapshot } : undefined,
time: { created: event.properties.timestamp },
})
})
break
case "session.next.step.ended":
update(event.properties.sessionID, (draft) => {
const currentAssistant = activeAssistant(draft)
if (!currentAssistant) return
currentAssistant.time.completed = event.properties.timestamp
currentAssistant.finish = event.properties.finish
currentAssistant.cost = event.properties.cost
currentAssistant.tokens = event.properties.tokens
if (event.properties.snapshot)
currentAssistant.snapshot = { ...currentAssistant.snapshot, end: event.properties.snapshot }
})
break
case "session.next.text.started":
update(event.properties.sessionID, (draft) => {
activeAssistant(draft)?.content.push({ type: "text", text: "" })
})
break
case "session.next.text.delta":
update(event.properties.sessionID, (draft) => {
const match = latestText(activeAssistant(draft))
if (match) match.text += event.properties.delta
})
break
case "session.next.text.ended":
update(event.properties.sessionID, (draft) => {
const match = latestText(activeAssistant(draft))
if (match) match.text = event.properties.text
})
break
case "session.next.tool.input.started":
update(event.properties.sessionID, (draft) => {
activeAssistant(draft)?.content.push({
type: "tool",
id: event.properties.callID,
name: event.properties.name,
time: { created: event.properties.timestamp },
state: { status: "pending", input: "" },
})
})
break
case "session.next.tool.input.delta":
update(event.properties.sessionID, (draft) => {
const match = latestTool(activeAssistant(draft), event.properties.callID)
if (match?.state.status === "pending") match.state.input += event.properties.delta
})
break
case "session.next.tool.input.ended":
break
case "session.next.tool.called":
update(event.properties.sessionID, (draft) => {
const match = latestTool(activeAssistant(draft), event.properties.callID)
if (!match) return
match.time.ran = event.properties.timestamp
match.provider = event.properties.provider
match.state = { status: "running", input: event.properties.input, structured: {}, content: [] }
})
break
case "session.next.tool.progress":
update(event.properties.sessionID, (draft) => {
const match = latestTool(activeAssistant(draft), event.properties.callID)
if (match?.state.status !== "running") return
match.state.structured = event.properties.structured
match.state.content = [...event.properties.content]
})
break
case "session.next.tool.success":
update(event.properties.sessionID, (draft) => {
const match = latestTool(activeAssistant(draft), event.properties.callID)
if (match?.state.status !== "running") return
match.state = {
status: "completed",
input: match.state.input,
structured: event.properties.structured,
content: [...event.properties.content],
}
match.provider = event.properties.provider
match.time.completed = event.properties.timestamp
})
break
case "session.next.tool.error":
update(event.properties.sessionID, (draft) => {
const match = latestTool(activeAssistant(draft), event.properties.callID)
if (match?.state.status !== "running") return
match.state = {
status: "error",
error: event.properties.error,
input: match.state.input,
structured: match.state.structured,
content: match.state.content,
}
match.provider = event.properties.provider
match.time.completed = event.properties.timestamp
})
break
case "session.next.reasoning.started":
update(event.properties.sessionID, (draft) => {
activeAssistant(draft)?.content.push({
type: "reasoning",
id: event.properties.reasoningID,
text: "",
})
})
break
case "session.next.reasoning.delta":
update(event.properties.sessionID, (draft) => {
const match = latestReasoning(activeAssistant(draft), event.properties.reasoningID)
if (match) match.text += event.properties.delta
})
break
case "session.next.reasoning.ended":
update(event.properties.sessionID, (draft) => {
const match = latestReasoning(activeAssistant(draft), event.properties.reasoningID)
if (match) match.text = event.properties.text
})
break
case "session.next.retried":
break
case "session.next.compaction.started":
update(event.properties.sessionID, (draft) => {
draft.push({
id: event.id,
type: "compaction",
reason: event.properties.reason,
summary: "",
time: { created: event.properties.timestamp },
})
})
break
case "session.next.compaction.delta":
update(event.properties.sessionID, (draft) => {
const match = activeCompaction(draft)
if (match) match.summary += event.properties.text
})
break
case "session.next.compaction.ended":
update(event.properties.sessionID, (draft) => {
const match = activeCompaction(draft)
if (!match) return
match.summary = event.properties.text
match.include = event.properties.include
})
break
}
})
const result = {
data: store,
session: {
message: {
async sync(sessionID: string) {
const response = await sdk.client.v2.session.messages({ sessionID })
setStore("messages", sessionID, reconcile(response.data?.items ?? []))
},
fromSession(sessionID: string) {
const messages = store.messages[sessionID]
if (!messages) return []
return messages
},
},
},
}
return result
},
})

View File

@@ -7,7 +7,6 @@ import SidebarTodo from "../feature-plugins/sidebar/todo"
import SidebarFiles from "../feature-plugins/sidebar/files"
import SidebarFooter from "../feature-plugins/sidebar/footer"
import PluginManager from "../feature-plugins/system/plugins"
import SessionV2Debug from "../feature-plugins/system/session-v2"
import type { TuiPlugin, TuiPluginModule } from "@opencode-ai/plugin/tui"
export type InternalTuiPlugin = TuiPluginModule & {
@@ -25,5 +24,4 @@ export const INTERNAL_TUI_PLUGINS: InternalTuiPlugin[] = [
SidebarFiles,
SidebarFooter,
PluginManager,
SessionV2Debug,
]

View File

@@ -31,7 +31,6 @@ export const fail = (message: string, exitCode = 1) => Effect.fail(new CliError(
*/
export const effectCmd = <Args, A>(opts: {
command: string | readonly string[]
aliases?: string | readonly string[]
describe: string | false
builder?: (yargs: Argv) => Argv<Args>
/** Defaults to process.cwd(). Override for commands that take a directory positional. */
@@ -40,7 +39,6 @@ export const effectCmd = <Args, A>(opts: {
}) =>
cmd<{}, Args>({
command: opts.command,
aliases: opts.aliases,
describe: opts.describe,
builder: opts.builder as never,
async handler(rawArgs) {

View File

@@ -6,7 +6,6 @@ import z from "zod"
import { BusEvent } from "@/bus/bus-event"
import { SyncEvent } from "@/sync"
import { GlobalBus } from "@/bus/global"
import { Bus } from "@/bus"
import { AppRuntime } from "@/effect/app-runtime"
import { AsyncQueue } from "@/util/queue"
import { InstanceStore } from "../../project/instance-store"
@@ -29,7 +28,6 @@ async function streamEvents(c: Context, subscribe: (q: AsyncQueue<string | null>
q.push(
JSON.stringify({
payload: {
id: Bus.createID(),
type: "server.connected",
properties: {},
},
@@ -41,7 +39,6 @@ async function streamEvents(c: Context, subscribe: (q: AsyncQueue<string | null>
q.push(
JSON.stringify({
payload: {
id: Bus.createID(),
type: "server.heartbeat",
properties: {},
},

View File

@@ -42,7 +42,6 @@ export const EventRoutes = () =>
q.push(
JSON.stringify({
id: Bus.createID(),
type: "server.connected",
properties: {},
}),
@@ -51,10 +50,9 @@ export const EventRoutes = () =>
// Send heartbeat every 10s to prevent stalled proxy streams.
const heartbeat = setInterval(() => {
q.push(
JSON.stringify({
id: Bus.createID(),
type: "server.heartbeat",
properties: {},
JSON.stringify({
type: "server.heartbeat",
properties: {},
}),
)
}, 10_000)

View File

@@ -19,7 +19,6 @@ import { SessionApi } from "./groups/session"
import { SyncApi } from "./groups/sync"
import { TuiApi } from "./groups/tui"
import { WorkspaceApi } from "./groups/workspace"
import { V2Api } from "./groups/v2"
// SSE event schemas built from the same BusEvent/SyncEvent registries that
// the Hono spec uses, so both specs emit identical Event/SyncEvent components.
@@ -41,7 +40,6 @@ export const InstanceHttpApi = HttpApi.make("opencode-instance")
.addHttpApi(ProviderApi)
.addHttpApi(SessionApi)
.addHttpApi(SyncApi)
.addHttpApi(V2Api)
.addHttpApi(TuiApi)
.addHttpApi(WorkspaceApi)

View File

@@ -41,12 +41,12 @@ function eventResponse(bus: Bus.Interface) {
const events = bus.subscribeAll().pipe(Stream.takeUntil((event) => event.type === Bus.InstanceDisposed.type))
const heartbeat = Stream.tick("10 seconds").pipe(
Stream.drop(1),
Stream.map(() => ({ id: Bus.createID(), type: "server.heartbeat", properties: {} })),
Stream.map(() => ({ type: "server.heartbeat", properties: {} })),
)
log.info("event connected")
return HttpServerResponse.stream(
Stream.make({ id: Bus.createID(), type: "server.connected", properties: {} }).pipe(
Stream.make({ type: "server.connected", properties: {} }).pipe(
Stream.concat(events.pipe(Stream.merge(heartbeat, { haltStrategy: "left" }))),
Stream.map(eventData),
Stream.pipeThroughChannel(Sse.encode()),

View File

@@ -1,14 +0,0 @@
import { HttpApi, OpenApi } from "effect/unstable/httpapi"
import { MessageGroup } from "./v2/message"
import { SessionGroup } from "./v2/session"
export const V2Api = HttpApi.make("v2")
.add(SessionGroup)
.add(MessageGroup)
.annotateMerge(
OpenApi.annotations({
title: "opencode experimental HttpApi",
version: "0.0.1",
description: "Experimental HttpApi surface for selected instance routes.",
}),
)

View File

@@ -1,69 +0,0 @@
import { SessionID } from "@/session/schema"
import { SessionMessage } from "@/v2/session-message"
import { Schema } from "effect"
import { HttpApiEndpoint, HttpApiError, HttpApiGroup, OpenApi } from "effect/unstable/httpapi"
import { Authorization } from "../../middleware/authorization"
export const MessageGroup = HttpApiGroup.make("v2.message")
.add(
HttpApiEndpoint.get("messages", "/api/session/:sessionID/message", {
params: { sessionID: SessionID },
query: Schema.Union([
Schema.Struct({
limit: Schema.optional(
Schema.NumberFromString.check(
Schema.isInt(),
Schema.isGreaterThanOrEqualTo(1),
Schema.isLessThanOrEqualTo(200),
),
).annotate({
description:
"Maximum number of messages to return. When omitted, the endpoint returns its default page size.",
}),
order: Schema.optional(Schema.Union([Schema.Literal("asc"), Schema.Literal("desc")])).annotate({
description: "Message order for the first page. Use desc for newest first or asc for oldest first.",
}),
cursor: Schema.optional(Schema.Never),
}),
Schema.Struct({
limit: Schema.optional(
Schema.NumberFromString.check(
Schema.isInt(),
Schema.isGreaterThanOrEqualTo(1),
Schema.isLessThanOrEqualTo(200),
),
).annotate({
description:
"Maximum number of messages to return. When omitted, the endpoint returns its default page size.",
}),
cursor: Schema.String.annotate({
description:
"Opaque pagination cursor returned as cursor.previous or cursor.next in the previous response. Do not combine with order.",
}),
order: Schema.optional(Schema.Never),
}),
]).annotate({ identifier: "V2SessionMessagesQuery" }),
success: Schema.Struct({
items: Schema.Array(SessionMessage.Message),
cursor: Schema.Struct({
previous: Schema.String.pipe(Schema.optional),
next: Schema.String.pipe(Schema.optional),
}),
}).annotate({ identifier: "V2SessionMessagesResponse" }),
error: HttpApiError.BadRequest,
}).annotateMerge(
OpenApi.annotations({
identifier: "v2.session.messages",
summary: "Get v2 session messages",
description:
"Retrieve projected v2 messages for a session. Items keep the requested order across pages; use cursor.next or cursor.previous to move through the ordered timeline.",
}),
),
)
.annotateMerge(
OpenApi.annotations({
title: "v2 messages",
description: "Experimental v2 message routes.",
}),
)
.middleware(Authorization)

View File

@@ -1,128 +0,0 @@
import { WorkspaceID } from "@/control-plane/schema"
import { SessionID } from "@/session/schema"
import { SessionMessage } from "@/v2/session-message"
import { Prompt } from "@/v2/session-prompt"
import { SessionV2 } from "@/v2/session"
import { Schema, SchemaGetter } from "effect"
import { HttpApiEndpoint, HttpApiError, HttpApiGroup, HttpApiSchema, OpenApi } from "effect/unstable/httpapi"
import { Authorization } from "../../middleware/authorization"
export const SessionGroup = HttpApiGroup.make("v2.session")
.add(
HttpApiEndpoint.get("sessions", "/api/session", {
query: Schema.Union([
Schema.Struct({
limit: Schema.optional(
Schema.NumberFromString.check(
Schema.isInt(),
Schema.isGreaterThanOrEqualTo(1),
Schema.isLessThanOrEqualTo(200),
),
).annotate({
description: "Maximum number of sessions to return. Defaults to the newest 50 sessions.",
}),
order: Schema.optional(Schema.Union([Schema.Literal("asc"), Schema.Literal("desc")])).annotate({
description: "Session order for the first page. Use desc for newest first or asc for oldest first.",
}),
directory: Schema.String.pipe(Schema.optional),
path: Schema.String.pipe(Schema.optional),
workspace: WorkspaceID.pipe(Schema.optional),
roots: Schema.Literals(["true", "false"])
.pipe(
Schema.decodeTo(Schema.Boolean, {
decode: SchemaGetter.transform((value) => value === "true"),
encode: SchemaGetter.transform((value) => (value ? "true" : "false")),
}),
)
.pipe(Schema.optional),
start: Schema.NumberFromString.pipe(Schema.optional),
search: Schema.String.pipe(Schema.optional),
cursor: Schema.optional(Schema.Never),
}),
Schema.Struct({
limit: Schema.optional(
Schema.NumberFromString.check(
Schema.isInt(),
Schema.isGreaterThanOrEqualTo(1),
Schema.isLessThanOrEqualTo(200),
),
).annotate({
description: "Maximum number of sessions to return. Defaults to the newest 50 sessions.",
}),
cursor: Schema.String.annotate({
description:
"Opaque pagination cursor returned as cursor.previous or cursor.next in the previous response. Do not combine with order.",
}),
order: Schema.optional(Schema.Never),
directory: Schema.optional(Schema.Never),
path: Schema.optional(Schema.Never),
workspace: Schema.optional(Schema.Never),
roots: Schema.optional(Schema.Never),
start: Schema.optional(Schema.Never),
search: Schema.optional(Schema.Never),
}),
]).annotate({ identifier: "V2SessionsQuery" }),
success: Schema.Struct({
items: Schema.Array(SessionV2.Info),
cursor: Schema.Struct({
previous: Schema.String.pipe(Schema.optional),
next: Schema.String.pipe(Schema.optional),
}),
}).annotate({ identifier: "V2SessionsResponse" }),
error: HttpApiError.BadRequest,
}).annotateMerge(
OpenApi.annotations({
identifier: "v2.session.list",
summary: "List v2 sessions",
description:
"Retrieve sessions in the requested order. Items keep that order across pages; use cursor.next or cursor.previous to move through the ordered list.",
}),
),
)
.add(
HttpApiEndpoint.post("prompt", "/api/session/:sessionID/prompt", {
params: { sessionID: SessionID },
payload: Schema.Struct({
prompt: Prompt,
delivery: SessionV2.Delivery.pipe(Schema.optional),
}),
success: SessionMessage.Message,
}).annotateMerge(
OpenApi.annotations({
identifier: "v2.session.prompt",
summary: "Send v2 message",
description: "Create a v2 session message and queue it for the agent loop.",
}),
),
)
.add(
HttpApiEndpoint.post("compact", "/api/session/:sessionID/compact", {
params: { sessionID: SessionID },
success: HttpApiSchema.NoContent,
}).annotateMerge(
OpenApi.annotations({
identifier: "v2.session.compact",
summary: "Compact v2 session",
description: "Compact a v2 session conversation.",
}),
),
)
.add(
HttpApiEndpoint.post("wait", "/api/session/:sessionID/wait", {
params: { sessionID: SessionID },
success: HttpApiSchema.NoContent,
}).annotateMerge(
OpenApi.annotations({
identifier: "v2.session.wait",
summary: "Wait for v2 session",
description: "Wait for a v2 session agent loop to become idle.",
}),
),
)
.annotateMerge(
OpenApi.annotations({
title: "v2",
description: "Experimental v2 routes.",
}),
)
.middleware(Authorization)

View File

@@ -1,6 +1,5 @@
import { Config } from "@/config/config"
import { GlobalBus, type GlobalEvent as GlobalBusEvent } from "@/bus/global"
import { Bus } from "@/bus"
import { Installation } from "@/installation"
import { InstanceStore } from "@/project/instance-store"
import { InstallationVersion } from "@opencode-ai/core/installation/version"
@@ -43,11 +42,11 @@ function eventResponse() {
})
const heartbeat = Stream.tick("10 seconds").pipe(
Stream.drop(1),
Stream.map(() => ({ payload: { id: Bus.createID(), type: "server.heartbeat", properties: {} } })),
Stream.map(() => ({ payload: { type: "server.heartbeat", properties: {} } })),
)
return HttpServerResponse.stream(
Stream.make({ payload: { id: Bus.createID(), type: "server.connected", properties: {} } }).pipe(
Stream.make({ payload: { type: "server.connected", properties: {} } }).pipe(
Stream.concat(events.pipe(Stream.merge(heartbeat, { haltStrategy: "left" }))),
Stream.map(eventData),
Stream.pipeThroughChannel(Sse.encode()),

View File

@@ -1,6 +0,0 @@
import { SessionV2 } from "@/v2/session"
import { Layer } from "effect"
import { messageHandlers } from "./v2/message"
import { sessionHandlers } from "./v2/session"
export const v2Handlers = Layer.mergeAll(sessionHandlers, messageHandlers).pipe(Layer.provide(SessionV2.defaultLayer))

View File

@@ -1,60 +0,0 @@
import { SessionMessage } from "@/v2/session-message"
import { SessionV2 } from "@/v2/session"
import { Effect, Schema } from "effect"
import * as DateTime from "effect/DateTime"
import { HttpApiBuilder, HttpApiError } from "effect/unstable/httpapi"
import { InstanceHttpApi } from "../../api"
const DefaultMessagesLimit = 50
const Cursor = Schema.Struct({
id: SessionMessage.ID,
time: Schema.Number,
order: Schema.Union([Schema.Literal("asc"), Schema.Literal("desc")]),
direction: Schema.Union([Schema.Literal("previous"), Schema.Literal("next")]),
})
const decodeCursor = Schema.decodeUnknownSync(Cursor)
const cursor = {
encode(message: SessionMessage.Message, order: "asc" | "desc", direction: "previous" | "next") {
return Buffer.from(
JSON.stringify({ id: message.id, time: DateTime.toEpochMillis(message.time.created), order, direction }),
).toString("base64url")
},
decode(input: string) {
return decodeCursor(JSON.parse(Buffer.from(input, "base64url").toString("utf8")))
},
}
export const messageHandlers = HttpApiBuilder.group(InstanceHttpApi, "v2.message", (handlers) =>
Effect.gen(function* () {
const session = yield* SessionV2.Service
return handlers.handle(
"messages",
Effect.fn(function* (ctx) {
const decoded = yield* Effect.try({
try: () => (ctx.query.cursor ? cursor.decode(ctx.query.cursor) : undefined),
catch: () => new HttpApiError.BadRequest({}),
})
const order = decoded?.order ?? ctx.query.order ?? "desc"
const messages = yield* session.messages({
sessionID: ctx.params.sessionID,
limit: ctx.query.limit ?? DefaultMessagesLimit,
order,
cursor: decoded ? { id: decoded.id, time: decoded.time, direction: decoded.direction } : undefined,
})
const first = messages[0]
const last = messages.at(-1)
return {
items: messages,
cursor: {
previous: first ? cursor.encode(first, order, "previous") : undefined,
next: last ? cursor.encode(last, order, "next") : undefined,
},
}
}),
)
}),
)

View File

@@ -1,109 +0,0 @@
import { WorkspaceID } from "@/control-plane/schema"
import { SessionV2 } from "@/v2/session"
import { Effect, Schema } from "effect"
import { HttpApiBuilder, HttpApiError, HttpApiSchema } from "effect/unstable/httpapi"
import { InstanceHttpApi } from "../../api"
const DefaultSessionsLimit = 50
const SessionCursor = Schema.Struct({
id: SessionV2.Info.fields.id,
time: Schema.Number,
order: Schema.Union([Schema.Literal("asc"), Schema.Literal("desc")]),
direction: Schema.Union([Schema.Literal("previous"), Schema.Literal("next")]),
directory: Schema.String.pipe(Schema.optional),
path: Schema.String.pipe(Schema.optional),
workspaceID: WorkspaceID.pipe(Schema.optional),
roots: Schema.Boolean.pipe(Schema.optional),
start: Schema.Number.pipe(Schema.optional),
search: Schema.String.pipe(Schema.optional),
})
type SessionCursor = typeof SessionCursor.Type
const decodeCursor = Schema.decodeUnknownSync(SessionCursor)
const sessionCursor = {
encode(
session: SessionV2.Info,
order: "asc" | "desc",
direction: "previous" | "next",
filters: Pick<SessionCursor, "directory" | "path" | "workspaceID" | "roots" | "start" | "search">,
) {
return Buffer.from(
JSON.stringify({ id: session.id, time: session.time.created, order, direction, ...filters }),
).toString("base64url")
},
decode(input: string) {
return decodeCursor(JSON.parse(Buffer.from(input, "base64url").toString("utf8")))
},
}
export const sessionHandlers = HttpApiBuilder.group(InstanceHttpApi, "v2.session", (handlers) =>
Effect.gen(function* () {
const session = yield* SessionV2.Service
return handlers
.handle(
"sessions",
Effect.fn(function* (ctx) {
const decoded = yield* Effect.try({
try: () => (ctx.query.cursor ? sessionCursor.decode(ctx.query.cursor) : undefined),
catch: () => new HttpApiError.BadRequest({}),
})
const order = decoded?.order ?? ctx.query.order ?? "desc"
const filters = decoded ?? {
directory: ctx.query.directory,
path: ctx.query.path,
workspaceID: ctx.query.workspace ? WorkspaceID.make(ctx.query.workspace) : undefined,
roots: ctx.query.roots,
start: ctx.query.start,
search: ctx.query.search,
}
const sessions = yield* session.list({
limit: ctx.query.limit ?? DefaultSessionsLimit,
order,
directory: filters.directory,
path: filters.path,
workspaceID: filters.workspaceID,
roots: filters.roots,
start: filters.start,
search: filters.search,
cursor: decoded ? { id: decoded.id, time: decoded.time, direction: decoded.direction } : undefined,
})
const first = sessions[0]
const last = sessions.at(-1)
return {
items: sessions,
cursor: {
previous: first ? sessionCursor.encode(first, order, "previous", filters) : undefined,
next: last ? sessionCursor.encode(last, order, "next", filters) : undefined,
},
}
}),
)
.handle(
"prompt",
Effect.fn(function* (ctx) {
return yield* session.prompt({
sessionID: ctx.params.sessionID,
prompt: ctx.payload.prompt,
delivery: ctx.payload.delivery ?? SessionV2.DefaultDelivery,
})
}),
)
.handle(
"compact",
Effect.fn(function* (ctx) {
yield* session.compact(ctx.params.sessionID)
return HttpApiSchema.NoContent.make()
}),
)
.handle(
"wait",
Effect.fn(function* (ctx) {
yield* session.wait(ctx.params.sessionID)
return HttpApiSchema.NoContent.make()
}),
)
}),
)

View File

@@ -65,7 +65,6 @@ import { questionHandlers } from "./handlers/question"
import { sessionHandlers } from "./handlers/session"
import { syncHandlers } from "./handlers/sync"
import { tuiHandlers } from "./handlers/tui"
import { v2Handlers } from "./handlers/v2"
import { workspaceHandlers } from "./handlers/workspace"
import { instanceContextLayer, instanceRouterMiddleware } from "./middleware/instance-context"
import { workspaceRouterMiddleware, workspaceRoutingLayer } from "./middleware/workspace-routing"
@@ -117,7 +116,6 @@ const instanceApiRoutes = HttpApiBuilder.layer(InstanceHttpApi).pipe(
providerHandlers,
sessionHandlers,
syncHandlers,
v2Handlers,
tuiHandlers,
workspaceHandlers,
]),

View File

@@ -1,8 +1,7 @@
import { describeRoute, resolver, validator } from "hono-openapi"
import { Hono } from "hono"
import type { UpgradeWebSocket } from "hono/ws"
import { Context, Effect } from "effect"
import { Flag } from "@opencode-ai/core/flag/flag"
import { Effect } from "effect"
import z from "zod"
import { Format } from "@/format"
import { TuiRoutes } from "./tui"
@@ -26,136 +25,12 @@ import { ExperimentalRoutes } from "./experimental"
import { ProviderRoutes } from "./provider"
import { EventRoutes } from "./event"
import { SyncRoutes } from "./sync"
import { V2Routes } from "./v2"
import { InstanceMiddleware } from "./middleware"
import { jsonRequest } from "./trace"
import { ExperimentalHttpApiServer } from "./httpapi/server"
import { EventPaths } from "./httpapi/event"
import { ExperimentalPaths } from "./httpapi/groups/experimental"
import { FilePaths } from "./httpapi/groups/file"
import { InstancePaths } from "./httpapi/groups/instance"
import { McpPaths } from "./httpapi/groups/mcp"
import { PtyPaths } from "./httpapi/groups/pty"
import { SessionPaths } from "./httpapi/groups/session"
import { SyncPaths } from "./httpapi/groups/sync"
import { TuiPaths } from "./httpapi/groups/tui"
import { WorkspacePaths } from "./httpapi/groups/workspace"
export const InstanceRoutes = (upgrade: UpgradeWebSocket): Hono => {
const app = new Hono()
if (Flag.OPENCODE_EXPERIMENTAL_HTTPAPI) {
const handler = ExperimentalHttpApiServer.webHandler().handler
const context = Context.empty() as Context.Context<unknown>
app.all("/api/*", (c) => handler(c.req.raw, context))
app.get(EventPaths.event, (c) => handler(c.req.raw, context))
app.get("/question", (c) => handler(c.req.raw, context))
app.post("/question/:requestID/reply", (c) => handler(c.req.raw, context))
app.post("/question/:requestID/reject", (c) => handler(c.req.raw, context))
app.get("/permission", (c) => handler(c.req.raw, context))
app.post("/permission/:requestID/reply", (c) => handler(c.req.raw, context))
app.get("/config", (c) => handler(c.req.raw, context))
app.patch("/config", (c) => handler(c.req.raw, context))
app.get("/config/providers", (c) => handler(c.req.raw, context))
app.get(ExperimentalPaths.console, (c) => handler(c.req.raw, context))
app.get(ExperimentalPaths.consoleOrgs, (c) => handler(c.req.raw, context))
app.post(ExperimentalPaths.consoleSwitch, (c) => handler(c.req.raw, context))
app.get(ExperimentalPaths.tool, (c) => handler(c.req.raw, context))
app.get(ExperimentalPaths.toolIDs, (c) => handler(c.req.raw, context))
app.get(ExperimentalPaths.worktree, (c) => handler(c.req.raw, context))
app.post(ExperimentalPaths.worktree, (c) => handler(c.req.raw, context))
app.delete(ExperimentalPaths.worktree, (c) => handler(c.req.raw, context))
app.post(ExperimentalPaths.worktreeReset, (c) => handler(c.req.raw, context))
app.get(ExperimentalPaths.session, (c) => handler(c.req.raw, context))
app.get(ExperimentalPaths.resource, (c) => handler(c.req.raw, context))
app.get("/provider", (c) => handler(c.req.raw, context))
app.get("/provider/auth", (c) => handler(c.req.raw, context))
app.post("/provider/:providerID/oauth/authorize", (c) => handler(c.req.raw, context))
app.post("/provider/:providerID/oauth/callback", (c) => handler(c.req.raw, context))
app.get("/project", (c) => handler(c.req.raw, context))
app.get("/project/current", (c) => handler(c.req.raw, context))
app.post("/project/git/init", (c) => handler(c.req.raw, context))
app.patch("/project/:projectID", (c) => handler(c.req.raw, context))
app.get(FilePaths.findText, (c) => handler(c.req.raw, context))
app.get(FilePaths.findFile, (c) => handler(c.req.raw, context))
app.get(FilePaths.findSymbol, (c) => handler(c.req.raw, context))
app.get(FilePaths.list, (c) => handler(c.req.raw, context))
app.get(FilePaths.content, (c) => handler(c.req.raw, context))
app.get(FilePaths.status, (c) => handler(c.req.raw, context))
app.get(InstancePaths.path, (c) => handler(c.req.raw, context))
app.post(InstancePaths.dispose, (c) => handler(c.req.raw, context))
app.get(InstancePaths.vcs, (c) => handler(c.req.raw, context))
app.get(InstancePaths.vcsDiff, (c) => handler(c.req.raw, context))
app.get(InstancePaths.command, (c) => handler(c.req.raw, context))
app.get(InstancePaths.agent, (c) => handler(c.req.raw, context))
app.get(InstancePaths.skill, (c) => handler(c.req.raw, context))
app.get(InstancePaths.lsp, (c) => handler(c.req.raw, context))
app.get(InstancePaths.formatter, (c) => handler(c.req.raw, context))
app.get(McpPaths.status, (c) => handler(c.req.raw, context))
app.post(McpPaths.status, (c) => handler(c.req.raw, context))
app.post(McpPaths.auth, (c) => handler(c.req.raw, context))
app.post(McpPaths.authCallback, (c) => handler(c.req.raw, context))
app.post(McpPaths.authAuthenticate, (c) => handler(c.req.raw, context))
app.delete(McpPaths.auth, (c) => handler(c.req.raw, context))
app.post(McpPaths.connect, (c) => handler(c.req.raw, context))
app.post(McpPaths.disconnect, (c) => handler(c.req.raw, context))
app.post(SyncPaths.start, (c) => handler(c.req.raw, context))
app.post(SyncPaths.replay, (c) => handler(c.req.raw, context))
app.post(SyncPaths.history, (c) => handler(c.req.raw, context))
app.get(PtyPaths.list, (c) => handler(c.req.raw, context))
app.post(PtyPaths.create, (c) => handler(c.req.raw, context))
app.get(PtyPaths.get, (c) => handler(c.req.raw, context))
app.put(PtyPaths.update, (c) => handler(c.req.raw, context))
app.delete(PtyPaths.remove, (c) => handler(c.req.raw, context))
app.get(PtyPaths.connect, (c) => handler(c.req.raw, context))
app.get(SessionPaths.list, (c) => handler(c.req.raw, context))
app.get(SessionPaths.status, (c) => handler(c.req.raw, context))
app.get(SessionPaths.get, (c) => handler(c.req.raw, context))
app.get(SessionPaths.children, (c) => handler(c.req.raw, context))
app.get(SessionPaths.todo, (c) => handler(c.req.raw, context))
app.get(SessionPaths.diff, (c) => handler(c.req.raw, context))
app.get(SessionPaths.messages, (c) => handler(c.req.raw, context))
app.get(SessionPaths.message, (c) => handler(c.req.raw, context))
app.post(SessionPaths.create, (c) => handler(c.req.raw, context))
app.delete(SessionPaths.remove, (c) => handler(c.req.raw, context))
app.patch(SessionPaths.update, (c) => handler(c.req.raw, context))
app.post(SessionPaths.init, (c) => handler(c.req.raw, context))
app.post(SessionPaths.fork, (c) => handler(c.req.raw, context))
app.post(SessionPaths.abort, (c) => handler(c.req.raw, context))
app.post(SessionPaths.share, (c) => handler(c.req.raw, context))
app.delete(SessionPaths.share, (c) => handler(c.req.raw, context))
app.post(SessionPaths.summarize, (c) => handler(c.req.raw, context))
app.post(SessionPaths.prompt, (c) => handler(c.req.raw, context))
app.post(SessionPaths.promptAsync, (c) => handler(c.req.raw, context))
app.post(SessionPaths.command, (c) => handler(c.req.raw, context))
app.post(SessionPaths.shell, (c) => handler(c.req.raw, context))
app.post(SessionPaths.revert, (c) => handler(c.req.raw, context))
app.post(SessionPaths.unrevert, (c) => handler(c.req.raw, context))
app.post(SessionPaths.permissions, (c) => handler(c.req.raw, context))
app.delete(SessionPaths.deleteMessage, (c) => handler(c.req.raw, context))
app.delete(SessionPaths.deletePart, (c) => handler(c.req.raw, context))
app.patch(SessionPaths.updatePart, (c) => handler(c.req.raw, context))
app.post(TuiPaths.appendPrompt, (c) => handler(c.req.raw, context))
app.post(TuiPaths.openHelp, (c) => handler(c.req.raw, context))
app.post(TuiPaths.openSessions, (c) => handler(c.req.raw, context))
app.post(TuiPaths.openThemes, (c) => handler(c.req.raw, context))
app.post(TuiPaths.openModels, (c) => handler(c.req.raw, context))
app.post(TuiPaths.submitPrompt, (c) => handler(c.req.raw, context))
app.post(TuiPaths.clearPrompt, (c) => handler(c.req.raw, context))
app.post(TuiPaths.executeCommand, (c) => handler(c.req.raw, context))
app.post(TuiPaths.showToast, (c) => handler(c.req.raw, context))
app.post(TuiPaths.publish, (c) => handler(c.req.raw, context))
app.post(TuiPaths.selectSession, (c) => handler(c.req.raw, context))
app.get(TuiPaths.controlNext, (c) => handler(c.req.raw, context))
app.post(TuiPaths.controlResponse, (c) => handler(c.req.raw, context))
app.get(WorkspacePaths.adapters, (c) => handler(c.req.raw, context))
app.post(WorkspacePaths.list, (c) => handler(c.req.raw, context))
app.get(WorkspacePaths.list, (c) => handler(c.req.raw, context))
app.get(WorkspacePaths.status, (c) => handler(c.req.raw, context))
app.delete(WorkspacePaths.remove, (c) => handler(c.req.raw, context))
app.post(WorkspacePaths.sessionRestore, (c) => handler(c.req.raw, context))
}
return app
.route("/project", ProjectRoutes())
.route("/pty", PtyRoutes(upgrade))
@@ -166,7 +41,6 @@ export const InstanceRoutes = (upgrade: UpgradeWebSocket): Hono => {
.route("/question", QuestionRoutes())
.route("/provider", ProviderRoutes())
.route("/sync", SyncRoutes())
.route("/api", V2Routes())
.route("/", FileRoutes())
.route("/", EventRoutes())
.route("/mcp", McpRoutes())

View File

@@ -1,229 +0,0 @@
import { WorkspaceID } from "@/control-plane/schema"
import { SessionID } from "@/session/schema"
import { SessionMessage } from "@/v2/session-message"
import { SessionV2 } from "@/v2/session"
import { zod } from "@/util/effect-zod"
import { lazy } from "@/util/lazy"
import { Effect, Schema } from "effect"
import * as DateTime from "effect/DateTime"
import { Hono } from "hono"
import { describeRoute, resolver, validator } from "hono-openapi"
import { HTTPException } from "hono/http-exception"
import z from "zod"
import { errors } from "../../error"
import { jsonRequest } from "./trace"
const DefaultMessagesLimit = 50
const DefaultSessionsLimit = 50
const SessionCursor = Schema.Struct({
id: SessionID,
time: Schema.Number,
order: Schema.Union([Schema.Literal("asc"), Schema.Literal("desc")]),
direction: Schema.Union([Schema.Literal("previous"), Schema.Literal("next")]),
directory: Schema.String.pipe(Schema.optional),
path: Schema.String.pipe(Schema.optional),
workspaceID: WorkspaceID.pipe(Schema.optional),
roots: Schema.Boolean.pipe(Schema.optional),
start: Schema.Number.pipe(Schema.optional),
search: Schema.String.pipe(Schema.optional),
})
type SessionCursor = typeof SessionCursor.Type
const SessionsResponse = Schema.Struct({
items: Schema.Array(SessionV2.Info),
cursor: Schema.Struct({
previous: Schema.String.pipe(Schema.optional),
next: Schema.String.pipe(Schema.optional),
}),
}).annotate({ identifier: "V2SessionsResponse" })
const Cursor = Schema.Struct({
id: SessionMessage.ID,
time: Schema.Number,
order: Schema.Union([Schema.Literal("asc"), Schema.Literal("desc")]),
direction: Schema.Union([Schema.Literal("previous"), Schema.Literal("next")]),
})
const MessagesResponse = Schema.Struct({
items: Schema.Array(SessionMessage.Message),
cursor: Schema.Struct({
previous: Schema.String.pipe(Schema.optional),
next: Schema.String.pipe(Schema.optional),
}),
}).annotate({ identifier: "V2SessionMessagesResponse" })
const decodeCursor = Schema.decodeUnknownSync(Cursor)
const decodeSessionCursor = Schema.decodeUnknownSync(SessionCursor)
const sessionCursor = {
encode(
session: SessionV2.Info,
order: "asc" | "desc",
direction: "previous" | "next",
filters: Pick<SessionCursor, "directory" | "path" | "workspaceID" | "roots" | "start" | "search">,
) {
return Buffer.from(
JSON.stringify({ id: session.id, time: session.time.created, order, direction, ...filters }),
).toString("base64url")
},
decode(input: string) {
return decodeSessionCursor(JSON.parse(Buffer.from(input, "base64url").toString("utf8")))
},
}
const cursor = {
encode(message: SessionMessage.Message, order: "asc" | "desc", direction: "previous" | "next") {
return Buffer.from(
JSON.stringify({ id: message.id, time: DateTime.toEpochMillis(message.time.created), order, direction }),
).toString("base64url")
},
decode(input: string) {
return decodeCursor(JSON.parse(Buffer.from(input, "base64url").toString("utf8")))
},
}
export const V2Routes = lazy(() =>
new Hono()
.get(
"/session",
describeRoute({
summary: "List v2 sessions",
description:
"Retrieve sessions in the requested order. Items keep that order across pages; use cursor.next or cursor.previous to move through the ordered list.",
operationId: "v2.session.list",
responses: {
200: {
description: "List of v2 sessions",
content: {
"application/json": {
schema: resolver(zod(SessionsResponse)),
},
},
},
...errors(400),
},
}),
validator(
"query",
z.object({
limit: z.coerce.number().int().min(1).max(200).optional(),
cursor: z.string().optional(),
order: z.enum(["asc", "desc"]).optional(),
directory: z.string().optional(),
path: z.string().optional(),
workspace: WorkspaceID.zod.optional(),
roots: z
.enum(["true", "false"])
.transform((value) => value === "true")
.optional(),
start: z.coerce.number().optional(),
search: z.string().optional(),
}),
),
async (c) => {
const query = c.req.valid("query")
const decoded = (() => {
try {
return query.cursor ? sessionCursor.decode(query.cursor) : undefined
} catch {
throw new HTTPException(400)
}
})()
const order = decoded?.order ?? query.order ?? "desc"
const filters = decoded ?? {
directory: query.directory,
path: query.path,
workspaceID: query.workspace,
roots: query.roots,
start: query.start,
search: query.search,
}
return jsonRequest("V2Routes.sessions", c, function* () {
return yield* Effect.gen(function* () {
const session = yield* SessionV2.Service
const sessions = yield* session.list({
limit: query.limit ?? DefaultSessionsLimit,
order,
directory: filters.directory,
path: filters.path,
workspaceID: filters.workspaceID,
roots: filters.roots,
start: filters.start,
search: filters.search,
cursor: decoded ? { id: decoded.id, time: decoded.time, direction: decoded.direction } : undefined,
})
const first = sessions[0]
const last = sessions.at(-1)
return {
items: sessions,
cursor: {
previous: first ? sessionCursor.encode(first, order, "previous", filters) : undefined,
next: last ? sessionCursor.encode(last, order, "next", filters) : undefined,
},
}
}).pipe(Effect.provide(SessionV2.defaultLayer))
})
},
)
.get(
"/session/:sessionID/message",
describeRoute({
summary: "Get v2 session messages",
description: "Retrieve projected v2 messages for a session directly from the message database.",
operationId: "v2.session.messages",
responses: {
200: {
description: "List of v2 session messages",
content: {
"application/json": {
schema: resolver(zod(MessagesResponse)),
},
},
},
...errors(400, 404),
},
}),
validator("param", z.object({ sessionID: SessionID.zod })),
validator(
"query",
z.object({
limit: z.coerce.number().int().min(1).max(200).optional(),
cursor: z.string().optional(),
order: z.enum(["asc", "desc"]).optional(),
}),
),
async (c) => {
const sessionID = c.req.valid("param").sessionID
const query = c.req.valid("query")
const decoded = (() => {
try {
return query.cursor ? cursor.decode(query.cursor) : undefined
} catch {
throw new HTTPException(400)
}
})()
const order = decoded?.order ?? query.order ?? "desc"
return jsonRequest("V2Routes.messages", c, function* () {
return yield* Effect.gen(function* () {
const session = yield* SessionV2.Service
const messages = yield* session.messages({
sessionID,
limit: query.limit ?? DefaultMessagesLimit,
order,
cursor: decoded ? { id: decoded.id, time: decoded.time, direction: decoded.direction } : undefined,
})
const first = messages[0]
const last = messages.at(-1)
return {
items: messages,
cursor: {
previous: first ? cursor.encode(first, order, "previous") : undefined,
next: last ? cursor.encode(last, order, "next") : undefined,
},
}
}).pipe(Effect.provide(SessionV2.defaultLayer))
})
},
),
)

View File

@@ -14,13 +14,10 @@ import { Config } from "@/config/config"
import { NotFoundError } from "@/storage/storage"
import { ModelID, ProviderID } from "@/provider/schema"
import { Effect, Layer, Context, Schema } from "effect"
import * as DateTime from "effect/DateTime"
import { InstanceState } from "@/effect/instance-state"
import { isOverflow as overflow, usable } from "./overflow"
import { makeRuntime } from "@/effect/run-service"
import { fn } from "@/util/fn"
import { EventV2 } from "@/v2/event"
import { SessionEvent } from "@/v2/session-event"
const log = Log.create({ service: "session.compaction" })
@@ -559,21 +556,7 @@ export const layer: Layer.Layer<
}
if (processor.message.error) return "stop"
if (result === "continue") {
const summary = summaryText(
(yield* session.messages({ sessionID: input.sessionID })).find((item) => item.info.id === msg.id) ?? {
info: msg,
parts: [],
},
)
EventV2.run(SessionEvent.Compaction.Ended.Sync, {
sessionID: input.sessionID,
timestamp: DateTime.makeUnsafe(Date.now()),
text: summary ?? "",
include: selected.tail_start_id,
})
yield* bus.publish(Event.Compacted, { sessionID: input.sessionID })
}
if (result === "continue") yield* bus.publish(Event.Compacted, { sessionID: input.sessionID })
return result
})
@@ -600,11 +583,6 @@ export const layer: Layer.Layer<
auto: input.auto,
overflow: input.overflow,
})
EventV2.run(SessionEvent.Compaction.Started.Sync, {
sessionID: input.sessionID,
timestamp: DateTime.makeUnsafe(Date.now()),
reason: input.auto ? "auto" : "manual",
})
})
return Service.of({

View File

@@ -20,9 +20,6 @@ import { Question } from "@/question"
import { errorMessage } from "@/util/error"
import * as Log from "@opencode-ai/core/util/log"
import { isRecord } from "@/util/record"
import { EventV2 } from "@/v2/event"
import { SessionEvent } from "@/v2/session-event"
import * as DateTime from "effect/DateTime"
const DOOM_LOOP_THRESHOLD = 3
const log = Log.create({ service: "session.processor" })
@@ -224,12 +221,6 @@ export const layer: Layer.Layer<
case "reasoning-start":
if (value.id in ctx.reasoningMap) return
// TODO(v2): Temporary dual-write while migrating session messages to v2 events.
EventV2.run(SessionEvent.Reasoning.Started.Sync, {
sessionID: ctx.sessionID,
reasoningID: value.id,
timestamp: DateTime.makeUnsafe(Date.now()),
})
ctx.reasoningMap[value.id] = {
id: PartID.ascending(),
messageID: ctx.assistantMessage.id,
@@ -244,13 +235,6 @@ export const layer: Layer.Layer<
case "reasoning-delta":
if (!(value.id in ctx.reasoningMap)) return
// TODO(v2): Temporary dual-write while migrating session messages to v2 events.
EventV2.run(SessionEvent.Reasoning.Delta.Sync, {
sessionID: ctx.sessionID,
reasoningID: value.id,
delta: value.text,
timestamp: DateTime.makeUnsafe(Date.now()),
})
ctx.reasoningMap[value.id].text += value.text
if (value.providerMetadata) ctx.reasoningMap[value.id].metadata = value.providerMetadata
yield* session.updatePartDelta({
@@ -264,13 +248,6 @@ export const layer: Layer.Layer<
case "reasoning-end":
if (!(value.id in ctx.reasoningMap)) return
// TODO(v2): Temporary dual-write while migrating session messages to v2 events.
EventV2.run(SessionEvent.Reasoning.Ended.Sync, {
sessionID: ctx.sessionID,
reasoningID: value.id,
text: ctx.reasoningMap[value.id].text,
timestamp: DateTime.makeUnsafe(Date.now()),
})
// oxlint-disable-next-line no-self-assign -- reactivity trigger
ctx.reasoningMap[value.id].text = ctx.reasoningMap[value.id].text
ctx.reasoningMap[value.id].time = { ...ctx.reasoningMap[value.id].time, end: Date.now() }
@@ -283,13 +260,6 @@ export const layer: Layer.Layer<
if (ctx.assistantMessage.summary) {
throw new Error(`Tool call not allowed while generating summary: ${value.toolName}`)
}
// TODO(v2): Temporary dual-write while migrating session messages to v2 events.
EventV2.run(SessionEvent.Tool.Input.Started.Sync, {
sessionID: ctx.sessionID,
callID: value.id,
name: value.toolName,
timestamp: DateTime.makeUnsafe(Date.now()),
})
const part = yield* session.updatePart({
id: ctx.toolcalls[value.id]?.partID ?? PartID.ascending(),
messageID: ctx.assistantMessage.id,
@@ -311,34 +281,13 @@ export const layer: Layer.Layer<
case "tool-input-delta":
return
case "tool-input-end": {
// TODO(v2): Temporary dual-write while migrating session messages to v2 events.
EventV2.run(SessionEvent.Tool.Input.Ended.Sync, {
sessionID: ctx.sessionID,
callID: value.id,
text: "",
timestamp: DateTime.makeUnsafe(Date.now()),
})
case "tool-input-end":
return
}
case "tool-call": {
if (ctx.assistantMessage.summary) {
throw new Error(`Tool call not allowed while generating summary: ${value.toolName}`)
}
const toolCall = yield* readToolCall(value.toolCallId)
// TODO(v2): Temporary dual-write while migrating session messages to v2 events.
EventV2.run(SessionEvent.Tool.Called.Sync, {
sessionID: ctx.sessionID,
callID: value.toolCallId,
tool: value.toolName,
input: value.input,
provider: {
executed: toolCall?.part.metadata?.providerExecuted === true,
...(value.providerMetadata ? { metadata: value.providerMetadata } : {}),
},
timestamp: DateTime.makeUnsafe(Date.now()),
})
yield* updateToolCall(value.toolCallId, (match) => ({
...match,
tool: value.toolName,
@@ -382,48 +331,11 @@ export const layer: Layer.Layer<
}
case "tool-result": {
const toolCall = yield* readToolCall(value.toolCallId)
// TODO(v2): Temporary dual-write while migrating session messages to v2 events.
EventV2.run(SessionEvent.Tool.Success.Sync, {
sessionID: ctx.sessionID,
callID: value.toolCallId,
structured: value.output.metadata,
content: [
{
type: "text",
text: value.output.output,
},
...(value.output.attachments?.map((item: MessageV2.FilePart) => ({
type: "file",
uri: item.url,
mime: item.mime,
name: item.filename,
})) ?? []),
],
provider: {
executed: toolCall?.part.metadata?.providerExecuted === true,
},
timestamp: DateTime.makeUnsafe(Date.now()),
})
yield* completeToolCall(value.toolCallId, value.output)
return
}
case "tool-error": {
const toolCall = yield* readToolCall(value.toolCallId)
// TODO(v2): Temporary dual-write while migrating session messages to v2 events.
EventV2.run(SessionEvent.Tool.Error.Sync, {
sessionID: ctx.sessionID,
callID: value.toolCallId,
error: {
type: "unknown",
message: errorMessage(value.error),
},
provider: {
executed: toolCall?.part.metadata?.providerExecuted === true,
},
timestamp: DateTime.makeUnsafe(Date.now()),
})
yield* failToolCall(value.toolCallId, value.error)
return
}
@@ -433,20 +345,6 @@ export const layer: Layer.Layer<
case "start-step":
if (!ctx.snapshot) ctx.snapshot = yield* snapshot.track()
if (!ctx.assistantMessage.summary) {
// TODO(v2): Temporary dual-write while migrating session messages to v2 events.
EventV2.run(SessionEvent.Step.Started.Sync, {
sessionID: ctx.sessionID,
agent: input.assistantMessage.agent,
model: {
id: ctx.model.id,
providerID: ctx.model.providerID,
variant: input.assistantMessage.variant,
},
snapshot: ctx.snapshot,
timestamp: DateTime.makeUnsafe(Date.now()),
})
}
yield* session.updatePart({
id: PartID.ascending(),
messageID: ctx.assistantMessage.id,
@@ -457,30 +355,18 @@ export const layer: Layer.Layer<
return
case "finish-step": {
const completedSnapshot = yield* snapshot.track()
const usage = Session.getUsage({
model: ctx.model,
usage: value.usage,
metadata: value.providerMetadata,
})
if (!ctx.assistantMessage.summary) {
// TODO(v2): Temporary dual-write while migrating session messages to v2 events.
EventV2.run(SessionEvent.Step.Ended.Sync, {
sessionID: ctx.sessionID,
finish: value.finishReason,
cost: usage.cost,
tokens: usage.tokens,
snapshot: completedSnapshot,
timestamp: DateTime.makeUnsafe(Date.now()),
})
}
ctx.assistantMessage.finish = value.finishReason
ctx.assistantMessage.cost += usage.cost
ctx.assistantMessage.tokens = usage.tokens
yield* session.updatePart({
id: PartID.ascending(),
reason: value.finishReason,
snapshot: completedSnapshot,
snapshot: yield* snapshot.track(),
messageID: ctx.assistantMessage.id,
sessionID: ctx.assistantMessage.sessionID,
type: "step-finish",
@@ -518,13 +404,6 @@ export const layer: Layer.Layer<
}
case "text-start":
if (!ctx.assistantMessage.summary) {
// TODO(v2): Temporary dual-write while migrating session messages to v2 events.
EventV2.run(SessionEvent.Text.Started.Sync, {
sessionID: ctx.sessionID,
timestamp: DateTime.makeUnsafe(Date.now()),
})
}
ctx.currentText = {
id: PartID.ascending(),
messageID: ctx.assistantMessage.id,
@@ -539,13 +418,6 @@ export const layer: Layer.Layer<
case "text-delta":
if (!ctx.currentText) return
if (ctx.assistantMessage.summary) {
EventV2.run(SessionEvent.Compaction.Delta.Sync, {
sessionID: ctx.sessionID,
text: value.text,
timestamp: DateTime.makeUnsafe(Date.now()),
})
}
ctx.currentText.text += value.text
if (value.providerMetadata) ctx.currentText.metadata = value.providerMetadata
yield* session.updatePartDelta({
@@ -570,14 +442,6 @@ export const layer: Layer.Layer<
},
{ text: ctx.currentText.text },
)).text
if (!ctx.assistantMessage.summary) {
// TODO(v2): Temporary dual-write while migrating session messages to v2 events.
EventV2.run(SessionEvent.Text.Ended.Sync, {
sessionID: ctx.sessionID,
text: ctx.currentText.text,
timestamp: DateTime.makeUnsafe(Date.now()),
})
}
{
const end = Date.now()
ctx.currentText.time = { start: ctx.currentText.time?.start ?? end, end }
@@ -704,24 +568,13 @@ export const layer: Layer.Layer<
Effect.retry(
SessionRetry.policy({
parse,
set: (info) => {
// TODO(v2): Temporary dual-write while migrating session messages to v2 events.
EventV2.run(SessionEvent.Retried.Sync, {
sessionID: ctx.sessionID,
attempt: info.attempt,
error: {
message: info.message,
isRetryable: true,
},
timestamp: DateTime.makeUnsafe(Date.now()),
})
return status.set(ctx.sessionID, {
set: (info) =>
status.set(ctx.sessionID, {
type: "retry",
attempt: info.attempt,
message: info.message,
next: info.next,
})
},
}),
}),
),
Effect.catch(halt),

View File

@@ -1,206 +0,0 @@
import { and, desc, eq } from "@/storage/db"
import type { Database } from "@/storage/db"
import { SessionMessage } from "@/v2/session-message"
import { SessionMessageUpdater } from "@/v2/session-message-updater"
import { SessionEvent } from "@/v2/session-event"
import * as DateTime from "effect/DateTime"
import { SyncEvent } from "@/sync"
import { SessionMessageTable, SessionTable } from "./session.sql"
import type { SessionID } from "./schema"
import { Schema } from "effect"
const decodeMessage = Schema.decodeUnknownSync(SessionMessage.Message)
type SessionMessageData = NonNullable<(typeof SessionMessageTable.$inferInsert)["data"]>
function encodeDateTimes(value: unknown): unknown {
if (DateTime.isDateTime(value)) return DateTime.toEpochMillis(value)
if (Array.isArray(value)) return value.map(encodeDateTimes)
if (typeof value === "object" && value !== null) {
return Object.fromEntries(Object.entries(value).map(([key, item]) => [key, encodeDateTimes(item)]))
}
return value
}
function encodeMessageData(value: unknown): SessionMessageData {
return encodeDateTimes(value) as SessionMessageData
}
function sqlite(db: Database.TxOrDb, sessionID: SessionID): SessionMessageUpdater.Adapter<void> {
return {
getCurrentAssistant() {
return db
.select()
.from(SessionMessageTable)
.where(and(eq(SessionMessageTable.session_id, sessionID), eq(SessionMessageTable.type, "assistant")))
.orderBy(desc(SessionMessageTable.id))
.all()
.map((row) => decodeMessage({ ...row.data, id: row.id, type: row.type }))
.find((message): message is SessionMessage.Assistant => message.type === "assistant" && !message.time.completed)
},
getCurrentCompaction() {
return db
.select()
.from(SessionMessageTable)
.where(and(eq(SessionMessageTable.session_id, sessionID), eq(SessionMessageTable.type, "compaction")))
.orderBy(desc(SessionMessageTable.id))
.all()
.map((row) => decodeMessage({ ...row.data, id: row.id, type: row.type }))
.find((message): message is SessionMessage.Compaction => message.type === "compaction")
},
getCurrentShell(callID) {
return db
.select()
.from(SessionMessageTable)
.where(and(eq(SessionMessageTable.session_id, sessionID), eq(SessionMessageTable.type, "shell")))
.orderBy(desc(SessionMessageTable.id))
.all()
.map((row) => decodeMessage({ ...row.data, id: row.id, type: row.type }))
.find((message): message is SessionMessage.Shell => message.type === "shell" && message.callID === callID)
},
updateAssistant(assistant) {
const { id, type, ...data } = assistant
db.update(SessionMessageTable)
.set({ data: encodeMessageData(data) })
.where(
and(
eq(SessionMessageTable.id, id),
eq(SessionMessageTable.session_id, sessionID),
eq(SessionMessageTable.type, type),
),
)
.run()
},
updateCompaction(compaction) {
const { id, type, ...data } = compaction
db.update(SessionMessageTable)
.set({ data: encodeMessageData(data) })
.where(
and(
eq(SessionMessageTable.id, id),
eq(SessionMessageTable.session_id, sessionID),
eq(SessionMessageTable.type, type),
),
)
.run()
},
updateShell(shell) {
const { id, type, ...data } = shell
db.update(SessionMessageTable)
.set({ data: encodeMessageData(data) })
.where(
and(
eq(SessionMessageTable.id, id),
eq(SessionMessageTable.session_id, sessionID),
eq(SessionMessageTable.type, type),
),
)
.run()
},
appendMessage(message) {
const { id, type, ...data } = message
db.insert(SessionMessageTable)
.values([
{
id,
session_id: sessionID,
type,
time_created: DateTime.toEpochMillis(message.time.created),
data: encodeMessageData(data),
},
])
.run()
},
finish() {},
}
}
function update(db: Database.TxOrDb, event: SessionEvent.Event) {
SessionMessageUpdater.update(sqlite(db, event.data.sessionID), event)
}
export default [
SyncEvent.project(SessionEvent.AgentSwitched.Sync, (db, data, event) => {
db.update(SessionTable)
.set({
agent: data.agent,
time_updated: DateTime.toEpochMillis(data.timestamp),
})
.where(eq(SessionTable.id, data.sessionID))
.run()
update(db, { id: SessionMessage.ID.make(event.id), type: "session.next.agent.switched", data })
}),
SyncEvent.project(SessionEvent.ModelSwitched.Sync, (db, data, event) => {
db.update(SessionTable)
.set({
model: {
id: data.id,
providerID: data.providerID,
variant: data.variant,
},
time_updated: DateTime.toEpochMillis(data.timestamp),
})
.where(eq(SessionTable.id, data.sessionID))
.run()
update(db, { id: SessionMessage.ID.make(event.id), type: "session.next.model.switched", data })
}),
SyncEvent.project(SessionEvent.Prompted.Sync, (db, data, event) => {
update(db, { id: SessionMessage.ID.make(event.id), type: "session.next.prompted", data })
}),
SyncEvent.project(SessionEvent.Synthetic.Sync, (db, data, event) => {
update(db, { id: SessionMessage.ID.make(event.id), type: "session.next.synthetic", data })
}),
SyncEvent.project(SessionEvent.Shell.Started.Sync, (db, data, event) => {
update(db, { id: SessionMessage.ID.make(event.id), type: "session.next.shell.started", data })
}),
SyncEvent.project(SessionEvent.Shell.Ended.Sync, (db, data, event) => {
update(db, { id: SessionMessage.ID.make(event.id), type: "session.next.shell.ended", data })
}),
SyncEvent.project(SessionEvent.Step.Started.Sync, (db, data, event) => {
update(db, { id: SessionMessage.ID.make(event.id), type: "session.next.step.started", data })
}),
SyncEvent.project(SessionEvent.Step.Ended.Sync, (db, data, event) => {
update(db, { id: SessionMessage.ID.make(event.id), type: "session.next.step.ended", data })
}),
SyncEvent.project(SessionEvent.Text.Started.Sync, (db, data, event) => {
update(db, { id: SessionMessage.ID.make(event.id), type: "session.next.text.started", data })
}),
SyncEvent.project(SessionEvent.Text.Delta.Sync, () => {}),
SyncEvent.project(SessionEvent.Text.Ended.Sync, (db, data, event) => {
update(db, { id: SessionMessage.ID.make(event.id), type: "session.next.text.ended", data })
}),
SyncEvent.project(SessionEvent.Tool.Input.Started.Sync, (db, data, event) => {
update(db, { id: SessionMessage.ID.make(event.id), type: "session.next.tool.input.started", data })
}),
SyncEvent.project(SessionEvent.Tool.Input.Delta.Sync, () => {}),
SyncEvent.project(SessionEvent.Tool.Input.Ended.Sync, (db, data, event) => {
update(db, { id: SessionMessage.ID.make(event.id), type: "session.next.tool.input.ended", data })
}),
SyncEvent.project(SessionEvent.Tool.Called.Sync, (db, data, event) => {
update(db, { id: SessionMessage.ID.make(event.id), type: "session.next.tool.called", data })
}),
SyncEvent.project(SessionEvent.Tool.Success.Sync, (db, data, event) => {
update(db, { id: SessionMessage.ID.make(event.id), type: "session.next.tool.success", data })
}),
SyncEvent.project(SessionEvent.Tool.Error.Sync, (db, data, event) => {
update(db, { id: SessionMessage.ID.make(event.id), type: "session.next.tool.error", data })
}),
SyncEvent.project(SessionEvent.Reasoning.Started.Sync, (db, data, event) => {
update(db, { id: SessionMessage.ID.make(event.id), type: "session.next.reasoning.started", data })
}),
SyncEvent.project(SessionEvent.Reasoning.Delta.Sync, () => {}),
SyncEvent.project(SessionEvent.Reasoning.Ended.Sync, (db, data, event) => {
update(db, { id: SessionMessage.ID.make(event.id), type: "session.next.reasoning.ended", data })
}),
SyncEvent.project(SessionEvent.Retried.Sync, (db, data, event) => {
update(db, { id: SessionMessage.ID.make(event.id), type: "session.next.retried", data })
}),
SyncEvent.project(SessionEvent.Compaction.Started.Sync, (db, data, event) => {
update(db, { id: SessionMessage.ID.make(event.id), type: "session.next.compaction.started", data })
}),
SyncEvent.project(SessionEvent.Compaction.Delta.Sync, (db, data, event) => {
update(db, { id: SessionMessage.ID.make(event.id), type: "session.next.compaction.delta", data })
}),
SyncEvent.project(SessionEvent.Compaction.Ended.Sync, (db, data, event) => {
update(db, { id: SessionMessage.ID.make(event.id), type: "session.next.compaction.ended", data })
}),
]

View File

@@ -5,8 +5,7 @@ import { SyncEvent } from "@/sync"
import * as Session from "./session"
import { MessageV2 } from "./message-v2"
import { SessionTable, MessageTable, PartTable } from "./session.sql"
import { Log } from "@opencode-ai/core/util/log"
import nextProjectors from "./projectors-next"
import * as Log from "@opencode-ai/core/util/log"
const log = Log.create({ service: "session.projector" })
@@ -137,6 +136,4 @@ export default [
log.warn("ignored late part update", { partID: id, messageID, sessionID })
}
}),
...nextProjectors,
]

View File

@@ -53,13 +53,6 @@ import { InstanceState } from "@/effect/instance-state"
import { TaskTool, type TaskPromptOps } from "@/tool/task"
import { SessionRunState } from "./run-state"
import { EffectBridge } from "@/effect/bridge"
import { EventV2 } from "@/v2/event"
import { SessionEvent } from "@/v2/session-event"
import { AgentAttachment, FileAttachment, Source } from "@/v2/session-prompt"
import * as DateTime from "effect/DateTime"
import { eq } from "@/storage/db"
import * as Database from "@/storage/db"
import { SessionTable } from "./session.sql"
// @ts-ignore
globalThis.AI_SDK_LOG_WARNINGS = false
@@ -791,28 +784,20 @@ NOTE: At any point in time through this workflow you should feel free to ask the
providerID: model.providerID,
}
yield* sessions.updateMessage(msg)
const callID = ulid()
const started = Date.now()
const part: MessageV2.ToolPart = {
type: "tool",
id: PartID.ascending(),
messageID: msg.id,
sessionID: input.sessionID,
tool: "bash",
callID,
callID: ulid(),
state: {
status: "running",
time: { start: started },
time: { start: Date.now() },
input: { command: input.command },
},
}
yield* sessions.updatePart(part)
EventV2.run(SessionEvent.Shell.Started.Sync, {
sessionID: input.sessionID,
timestamp: DateTime.makeUnsafe(started),
callID,
command: input.command,
})
return { msg, part, cwd: ctx.directory }
}).pipe(Effect.ensuring(markReady))
@@ -827,21 +812,14 @@ NOTE: At any point in time through this workflow you should feel free to ask the
if (aborted) {
output += "\n\n" + ["<metadata>", "User aborted the command", "</metadata>"].join("\n")
}
const completed = Date.now()
EventV2.run(SessionEvent.Shell.Ended.Sync, {
sessionID: input.sessionID,
timestamp: DateTime.makeUnsafe(completed),
callID: part.callID,
output,
})
if (!msg.time.completed) {
msg.time.completed = completed
msg.time.completed = Date.now()
yield* sessions.updateMessage(msg)
}
if (part.state.status === "running") {
part.state = {
status: "completed",
time: { ...part.state.time, end: completed },
time: { ...part.state.time, end: Date.now() },
input: part.state.input,
title: "",
metadata: { output, description: "" },
@@ -955,34 +933,6 @@ NOTE: At any point in time through this workflow you should feel free to ask the
format: input.format,
}
const current = Database.use((db) =>
db
.select({ agent: SessionTable.agent, model: SessionTable.model })
.from(SessionTable)
.where(eq(SessionTable.id, input.sessionID))
.get(),
)
if (current?.agent !== info.agent) {
EventV2.run(SessionEvent.AgentSwitched.Sync, {
sessionID: input.sessionID,
timestamp: DateTime.makeUnsafe(info.time.created),
agent: info.agent,
})
}
if (
current?.model?.providerID !== info.model.providerID ||
current.model.id !== info.model.modelID ||
current.model.variant !== info.model.variant
) {
EventV2.run(SessionEvent.ModelSwitched.Sync, {
sessionID: input.sessionID,
timestamp: DateTime.makeUnsafe(info.time.created),
id: info.model.modelID,
providerID: info.model.providerID,
variant: info.model.variant,
})
}
yield* Effect.addFinalizer(() => instruction.clear(info.id))
type Draft<T> = T extends MessageV2.Part ? Omit<T, "id"> & { id?: string } : never
@@ -1299,69 +1249,6 @@ NOTE: At any point in time through this workflow you should feel free to ask the
yield* sessions.updateMessage(info)
for (const part of parts) yield* sessions.updatePart(part)
const nextPrompt = parts.reduce(
(result, part) => {
if (part.type === "text") {
if (part.synthetic) result.synthetic.push(part.text)
else result.text.push(part.text)
}
if (part.type === "file") {
result.files.push(
new FileAttachment({
uri: part.url,
mime: part.mime,
name: part.filename,
source: part.source
? new Source({
start: part.source.text.start,
end: part.source.text.end,
text: part.source.text.value,
})
: undefined,
}),
)
}
if (part.type === "agent") {
result.agents.push(
new AgentAttachment({
name: part.name,
source: part.source
? new Source({
start: part.source.start,
end: part.source.end,
text: part.source.value,
})
: undefined,
}),
)
}
return result
},
{
text: [] as string[],
files: [] as FileAttachment[],
agents: [] as AgentAttachment[],
synthetic: [] as string[],
},
)
// TODO(v2): Temporary dual-write while migrating session messages to v2 events.
EventV2.run(SessionEvent.Prompted.Sync, {
sessionID: input.sessionID,
timestamp: DateTime.makeUnsafe(info.time.created),
prompt: {
text: nextPrompt.text.join("\n"),
files: nextPrompt.files,
agents: nextPrompt.agents,
},
})
for (const text of nextPrompt.synthetic) {
// TODO(v2): Temporary dual-write while migrating session messages to v2 events.
EventV2.run(SessionEvent.Synthetic.Sync, {
sessionID: input.sessionID,
timestamp: DateTime.makeUnsafe(info.time.created),
text,
})
}
return { info, parts }
}, Effect.scoped)

View File

@@ -1,7 +1,7 @@
import { sqliteTable, text, integer, index, primaryKey } from "drizzle-orm/sqlite-core"
import { ProjectTable } from "../project/project.sql"
import type { MessageV2 } from "./message-v2"
import type { SessionMessage } from "../v2/session-message"
import type { SessionEntry } from "../v2/session-entry"
import type { Snapshot } from "../snapshot"
import type { Permission } from "../permission"
import type { ProjectID } from "../project/schema"
@@ -11,7 +11,6 @@ import { Timestamps } from "../storage/schema.sql"
type PartData = Omit<MessageV2.Part, "id" | "sessionID" | "messageID">
type InfoData = Omit<MessageV2.Info, "id" | "sessionID">
type SessionMessageData = Omit<(typeof SessionMessage.Message)["Encoded"], "type" | "id">
export const SessionTable = sqliteTable(
"session",
@@ -35,12 +34,6 @@ export const SessionTable = sqliteTable(
summary_diffs: text({ mode: "json" }).$type<Snapshot.FileDiff[]>(),
revert: text({ mode: "json" }).$type<{ messageID: MessageID; partID?: PartID; snapshot?: string; diff?: string }>(),
permission: text({ mode: "json" }).$type<Permission.Ruleset>(),
agent: text(),
model: text({ mode: "json" }).$type<{
id: string
providerID: string
variant?: string
}>(),
...Timestamps,
time_compacting: integer(),
time_archived: integer(),
@@ -103,22 +96,22 @@ export const TodoTable = sqliteTable(
],
)
export const SessionMessageTable = sqliteTable(
"session_message",
export const SessionEntryTable = sqliteTable(
"session_entry",
{
id: text().$type<SessionMessage.ID>().primaryKey(),
id: text().$type<SessionEntry.ID>().primaryKey(),
session_id: text()
.$type<SessionID>()
.notNull()
.references(() => SessionTable.id, { onDelete: "cascade" }),
type: text().$type<SessionMessage.Type>().notNull(),
type: text().$type<SessionEntry.Type>().notNull(),
...Timestamps,
data: text({ mode: "json" }).notNull().$type<SessionMessageData>(),
data: text({ mode: "json" }).notNull().$type<Omit<SessionEntry.Entry, "type" | "id">>(),
},
(table) => [
index("session_message_session_idx").on(table.session_id),
index("session_message_session_type_idx").on(table.session_id, table.type),
index("session_message_time_created_idx").on(table.time_created),
index("session_entry_session_idx").on(table.session_id),
index("session_entry_session_type_idx").on(table.session_id, table.type),
index("session_entry_time_created_idx").on(table.time_created),
],
)

View File

@@ -32,7 +32,6 @@ import { Snapshot } from "@/snapshot"
import { ProjectID } from "../project/schema"
import { WorkspaceID } from "../control-plane/schema"
import { SessionID, MessageID, PartID } from "./schema"
import { ModelID, ProviderID } from "@/provider/schema"
import type { Provider } from "@/provider/provider"
import { Permission } from "@/permission"
@@ -79,10 +78,6 @@ export function fromRow(row: SessionRow): Info {
path: row.path ?? undefined,
parentID: row.parent_id ?? undefined,
title: row.title,
agent: row.agent ?? undefined,
model: row.model
? { id: ModelID.make(row.model.id), providerID: ProviderID.make(row.model.providerID), variant: row.model.variant }
: undefined,
version: row.version,
summary,
share,
@@ -107,8 +102,6 @@ export function toRow(info: Info) {
directory: info.directory,
path: info.path,
title: info.title,
agent: info.agent,
model: info.model,
version: info.version,
share_url: info.share?.url,
summary_additions: info.summary?.additions,
@@ -167,12 +160,6 @@ const Revert = Schema.Struct({
diff: optionalOmitUndefined(Schema.String),
})
const Model = Schema.Struct({
id: ModelID,
providerID: ProviderID,
variant: optionalOmitUndefined(Schema.String),
})
export const Info = Schema.Struct({
id: SessionID,
slug: Schema.String,
@@ -184,8 +171,6 @@ export const Info = Schema.Struct({
summary: optionalOmitUndefined(Summary),
share: optionalOmitUndefined(Share),
title: Schema.String,
agent: optionalOmitUndefined(Schema.String),
model: optionalOmitUndefined(Model),
version: Schema.String,
time: Time,
permission: optionalOmitUndefined(Permission.Ruleset),
@@ -216,8 +201,6 @@ export const CreateInput = Schema.optional(
Schema.Struct({
parentID: Schema.optional(SessionID),
title: Schema.optional(Schema.String),
agent: Schema.optional(Schema.String),
model: Schema.optional(Model),
permission: Schema.optional(Permission.Ruleset),
workspaceID: Schema.optional(WorkspaceID),
}),
@@ -289,8 +272,6 @@ const UpdatedInfo = Schema.Struct({
summary: Schema.optional(Schema.NullOr(Summary)),
share: Schema.optional(UpdatedShare),
title: Schema.optional(Schema.NullOr(Schema.String)),
agent: Schema.optional(Schema.NullOr(Schema.String)),
model: Schema.optional(Schema.NullOr(Model)),
version: Schema.optional(Schema.NullOr(Schema.String)),
time: Schema.optional(UpdatedTime),
permission: Schema.optional(Schema.NullOr(Permission.Ruleset)),
@@ -423,8 +404,6 @@ export interface Interface {
readonly create: (input?: {
parentID?: SessionID
title?: string
agent?: string
model?: Schema.Schema.Type<typeof Model>
permission?: Permission.Ruleset
workspaceID?: WorkspaceID
}) => Effect.Effect<Info>
@@ -485,8 +464,6 @@ export const layer: Layer.Layer<Service, never, Bus.Service | Storage.Service |
const createNext = Effect.fn("Session.createNext")(function* (input: {
id?: SessionID
title?: string
agent?: string
model?: Schema.Schema.Type<typeof Model>
parentID?: SessionID
workspaceID?: WorkspaceID
directory: string
@@ -504,8 +481,6 @@ export const layer: Layer.Layer<Service, never, Bus.Service | Storage.Service |
workspaceID: input.workspaceID,
parentID: input.parentID,
title: input.title ?? createDefaultTitle(!!input.parentID),
agent: input.agent,
model: input.model,
permission: input.permission,
time: {
created: Date.now(),
@@ -616,8 +591,6 @@ export const layer: Layer.Layer<Service, never, Bus.Service | Storage.Service |
const create = Effect.fn("Session.create")(function* (input?: {
parentID?: SessionID
title?: string
agent?: string
model?: Schema.Schema.Type<typeof Model>
permission?: Permission.Ruleset
workspaceID?: WorkspaceID
}) {
@@ -628,8 +601,6 @@ export const layer: Layer.Layer<Service, never, Bus.Service | Storage.Service |
directory: ctx.directory,
path: sessionPath(ctx.worktree, ctx.directory),
title: input?.title,
agent: input?.agent,
model: input?.model,
permission: input?.permission,
workspaceID: input?.workspaceID ?? workspace,
})

View File

@@ -46,7 +46,7 @@ export type Properties<Def extends Definition = Definition> = EffectSchema.Schem
export type SerializedEvent<Def extends Definition = Definition> = Event<Def> & { type: string }
type ProjectorFunc = (db: Database.TxOrDb, data: unknown, event: Event) => void
type ProjectorFunc = (db: Database.TxOrDb, data: unknown) => void
type ConvertEvent = (type: string, data: Event["data"]) => unknown | Promise<unknown>
type PublishContext = {
instance?: InstanceContext
@@ -255,7 +255,7 @@ export function define<
export function project<Def extends Definition>(
def: Def,
func: (db: Database.TxOrDb, data: Event<Def>["data"], event: Event<Def>) => void,
func: (db: Database.TxOrDb, data: Event<Def>["data"]) => void,
): [Definition, ProjectorFunc] {
return [def, func as ProjectorFunc]
}
@@ -277,7 +277,7 @@ function process<Def extends Definition>(
// idempotent: need to ignore any events already logged
Database.transaction((tx) => {
projector(tx, event.data, event)
projector(tx, event.data)
if (Flag.OPENCODE_EXPERIMENTAL_WORKSPACES) {
tx.insert(EventSequenceTable)
@@ -308,7 +308,7 @@ function process<Def extends Definition>(
}
const result = convertEvent(def.type, event.data)
const publish = (data: unknown) => ProjectBus.publish(def, data as Properties<Def>, { id: event.id })
const publish = (data: unknown) => ProjectBus.publish(def, data as Properties<Def>)
if (result instanceof Promise) {
void result.then(publish)
} else {

View File

@@ -90,7 +90,7 @@ function bodyWithChecks(ast: SchemaAST.AST): z.ZodTypeAny {
// Schema.withDecodingDefault also attaches encoding, but we want `.default(v)`
// on the inner Zod rather than a transform wrapper — so optional ASTs whose
// encoding resolves a default from Option.none() route through body()/opt().
const hasEncoding = ast.encoding?.length && (ast._tag !== "Declaration" || ast.typeParameters.length === 0)
const hasEncoding = ast.encoding?.length && ast._tag !== "Declaration"
const hasTransform = hasEncoding && !(SchemaAST.isOptional(ast) && extractDefault(ast) !== undefined)
const base = hasTransform ? encoded(ast) : body(ast)
return ast.checks?.length ? applyChecks(base, ast.checks, ast) : base

View File

@@ -1,53 +0,0 @@
import { Identifier } from "@/id/id"
import { SyncEvent } from "@/sync"
import { withStatics } from "@/util/schema"
import { Flag } from "@opencode-ai/core/flag/flag"
import * as Schema from "effect/Schema"
export const ID = Schema.String.pipe(
Schema.brand("Event.ID"),
withStatics((s) => ({
create: () => s.make(Identifier.create("evt", "ascending")),
})),
)
export type ID = Schema.Schema.Type<typeof ID>
export function define<const Type extends string, Fields extends Schema.Struct.Fields>(input: {
type: Type
schema: Fields
aggregate: string
version?: number
}) {
const Payload = Schema.Struct({
id: ID,
metadata: Schema.Record(Schema.String, Schema.Unknown).pipe(Schema.optional),
type: Schema.Literal(input.type),
data: Schema.Struct(input.schema),
}).annotate({
identifier: input.type,
})
const Sync = SyncEvent.define({
type: input.type,
version: input.version ?? 1,
aggregate: input.aggregate,
schema: Payload.fields.data,
})
return Object.assign(Payload, {
Sync,
version: input.version,
aggregate: input.aggregate,
})
}
export function run<Def extends SyncEvent.Definition>(
def: Def,
data: SyncEvent.Event<Def>["data"],
options?: { publish?: boolean },
) {
if (!Flag.OPENCODE_EXPERIMENTAL_EVENT_SYSTEM) return
SyncEvent.run(def, data, options)
}
export * as EventV2 from "./event"

View File

@@ -0,0 +1,261 @@
import { produce, type WritableDraft } from "immer"
import { SessionEvent } from "./session-event"
import { SessionEntry } from "./session-entry"
export type MemoryState = {
entries: SessionEntry.Entry[]
pending: SessionEntry.Entry[]
}
export interface Adapter<Result> {
readonly getCurrentAssistant: () => SessionEntry.Assistant | undefined
readonly updateAssistant: (assistant: SessionEntry.Assistant) => void
readonly appendEntry: (entry: SessionEntry.Entry) => void
readonly appendPending: (entry: SessionEntry.Entry) => void
readonly finish: () => Result
}
export function memory(state: MemoryState): Adapter<MemoryState> {
const activeAssistantIndex = () =>
state.entries.findLastIndex((entry) => entry.type === "assistant" && !entry.time.completed)
return {
getCurrentAssistant() {
const index = activeAssistantIndex()
if (index < 0) return
const assistant = state.entries[index]
return assistant?.type === "assistant" ? assistant : undefined
},
updateAssistant(assistant) {
const index = activeAssistantIndex()
if (index < 0) return
const current = state.entries[index]
if (current?.type !== "assistant") return
state.entries[index] = assistant
},
appendEntry(entry) {
state.entries.push(entry)
},
appendPending(entry) {
state.pending.push(entry)
},
finish() {
return state
},
}
}
export function stepWith<Result>(adapter: Adapter<Result>, event: SessionEvent.Event): Result {
const currentAssistant = adapter.getCurrentAssistant()
type DraftAssistant = WritableDraft<SessionEntry.Assistant>
type DraftTool = WritableDraft<SessionEntry.AssistantTool>
type DraftText = WritableDraft<SessionEntry.AssistantText>
type DraftReasoning = WritableDraft<SessionEntry.AssistantReasoning>
const latestTool = (assistant: DraftAssistant | undefined, callID?: string) =>
assistant?.content.findLast(
(item): item is DraftTool => item.type === "tool" && (callID === undefined || item.callID === callID),
)
const latestText = (assistant: DraftAssistant | undefined) =>
assistant?.content.findLast((item): item is DraftText => item.type === "text")
const latestReasoning = (assistant: DraftAssistant | undefined) =>
assistant?.content.findLast((item): item is DraftReasoning => item.type === "reasoning")
SessionEvent.Event.match(event, {
prompt: (event) => {
const entry = SessionEntry.User.fromEvent(event)
if (currentAssistant) {
adapter.appendPending(entry)
return
}
adapter.appendEntry(entry)
},
synthetic: (event) => {
adapter.appendEntry(SessionEntry.Synthetic.fromEvent(event))
},
"step.started": (event) => {
if (currentAssistant) {
adapter.updateAssistant(
produce(currentAssistant, (draft) => {
draft.time.completed = event.timestamp
}),
)
}
adapter.appendEntry(SessionEntry.Assistant.fromEvent(event))
},
"step.ended": (event) => {
if (currentAssistant) {
adapter.updateAssistant(
produce(currentAssistant, (draft) => {
draft.time.completed = event.timestamp
draft.cost = event.cost
draft.tokens = event.tokens
}),
)
}
},
"text.started": () => {
if (currentAssistant) {
adapter.updateAssistant(
produce(currentAssistant, (draft) => {
draft.content.push({
type: "text",
text: "",
})
}),
)
}
},
"text.delta": (event) => {
if (currentAssistant) {
adapter.updateAssistant(
produce(currentAssistant, (draft) => {
const match = latestText(draft)
if (match) match.text += event.delta
}),
)
}
},
"text.ended": () => {},
"tool.input.started": (event) => {
if (currentAssistant) {
adapter.updateAssistant(
produce(currentAssistant, (draft) => {
draft.content.push({
type: "tool",
callID: event.callID,
name: event.name,
time: {
created: event.timestamp,
},
state: {
status: "pending",
input: "",
},
})
}),
)
}
},
"tool.input.delta": (event) => {
if (currentAssistant) {
adapter.updateAssistant(
produce(currentAssistant, (draft) => {
const match = latestTool(draft, event.callID)
// oxlint-disable-next-line no-base-to-string -- event.delta is a Schema.String (runtime string)
if (match && match.state.status === "pending") match.state.input += event.delta
}),
)
}
},
"tool.input.ended": () => {},
"tool.called": (event) => {
if (currentAssistant) {
adapter.updateAssistant(
produce(currentAssistant, (draft) => {
const match = latestTool(draft, event.callID)
if (match) {
match.time.ran = event.timestamp
match.state = {
status: "running",
input: event.input,
}
}
}),
)
}
},
"tool.success": (event) => {
if (currentAssistant) {
adapter.updateAssistant(
produce(currentAssistant, (draft) => {
const match = latestTool(draft, event.callID)
if (match && match.state.status === "running") {
match.state = {
status: "completed",
input: match.state.input,
output: event.output ?? "",
title: event.title,
metadata: event.metadata ?? {},
attachments: [...(event.attachments ?? [])],
}
}
}),
)
}
},
"tool.error": (event) => {
if (currentAssistant) {
adapter.updateAssistant(
produce(currentAssistant, (draft) => {
const match = latestTool(draft, event.callID)
if (match && match.state.status === "running") {
match.state = {
status: "error",
error: event.error,
input: match.state.input,
metadata: event.metadata ?? {},
}
}
}),
)
}
},
"reasoning.started": () => {
if (currentAssistant) {
adapter.updateAssistant(
produce(currentAssistant, (draft) => {
draft.content.push({
type: "reasoning",
text: "",
})
}),
)
}
},
"reasoning.delta": (event) => {
if (currentAssistant) {
adapter.updateAssistant(
produce(currentAssistant, (draft) => {
const match = latestReasoning(draft)
if (match) match.text += event.delta
}),
)
}
},
"reasoning.ended": (event) => {
if (currentAssistant) {
adapter.updateAssistant(
produce(currentAssistant, (draft) => {
const match = latestReasoning(draft)
if (match) match.text = event.text
}),
)
}
},
retried: (event) => {
if (currentAssistant) {
adapter.updateAssistant(
produce(currentAssistant, (draft) => {
draft.retries = [...(draft.retries ?? []), SessionEntry.AssistantRetry.fromEvent(event)]
}),
)
}
},
compacted: (event) => {
adapter.appendEntry(SessionEntry.Compaction.fromEvent(event))
},
})
return adapter.finish()
}
export function step(old: MemoryState, event: SessionEvent.Event): MemoryState {
return produce(old, (draft) => {
stepWith(memory(draft as MemoryState), event)
})
}
export * as SessionEntryStepper from "./session-entry-stepper"

View File

@@ -0,0 +1,220 @@
import { Schema } from "effect"
import { NonNegativeInt } from "@/util/schema"
import { SessionEvent } from "./session-event"
export const ID = SessionEvent.ID
export type ID = Schema.Schema.Type<typeof ID>
const Base = {
id: SessionEvent.ID,
metadata: Schema.Record(Schema.String, Schema.Unknown).pipe(Schema.optional),
time: Schema.Struct({
created: Schema.DateTimeUtc,
}),
}
export class User extends Schema.Class<User>("Session.Entry.User")({
...Base,
text: SessionEvent.Prompt.fields.text,
files: SessionEvent.Prompt.fields.files,
agents: SessionEvent.Prompt.fields.agents,
type: Schema.Literal("user"),
time: Schema.Struct({
created: Schema.DateTimeUtc,
}),
}) {
static fromEvent(event: SessionEvent.Prompt) {
return new User({
id: event.id,
type: "user",
metadata: event.metadata,
text: event.text,
files: event.files,
agents: event.agents,
time: { created: event.timestamp },
})
}
}
export class Synthetic extends Schema.Class<Synthetic>("Session.Entry.Synthetic")({
...SessionEvent.Synthetic.fields,
...Base,
type: Schema.Literal("synthetic"),
}) {
static fromEvent(event: SessionEvent.Synthetic) {
return new Synthetic({
...event,
time: { created: event.timestamp },
})
}
}
export class ToolStatePending extends Schema.Class<ToolStatePending>("Session.Entry.ToolState.Pending")({
status: Schema.Literal("pending"),
input: Schema.String,
}) {}
export class ToolStateRunning extends Schema.Class<ToolStateRunning>("Session.Entry.ToolState.Running")({
status: Schema.Literal("running"),
input: Schema.Record(Schema.String, Schema.Unknown),
title: Schema.String.pipe(Schema.optional),
metadata: Schema.Record(Schema.String, Schema.Unknown).pipe(Schema.optional),
}) {}
export class ToolStateCompleted extends Schema.Class<ToolStateCompleted>("Session.Entry.ToolState.Completed")({
status: Schema.Literal("completed"),
input: Schema.Record(Schema.String, Schema.Unknown),
output: Schema.String,
title: Schema.String,
metadata: Schema.Record(Schema.String, Schema.Unknown),
attachments: SessionEvent.FileAttachment.pipe(Schema.Array, Schema.optional),
}) {}
export class ToolStateError extends Schema.Class<ToolStateError>("Session.Entry.ToolState.Error")({
status: Schema.Literal("error"),
input: Schema.Record(Schema.String, Schema.Unknown),
error: Schema.String,
metadata: Schema.Record(Schema.String, Schema.Unknown).pipe(Schema.optional),
}) {}
export const ToolState = Schema.Union([ToolStatePending, ToolStateRunning, ToolStateCompleted, ToolStateError]).pipe(
Schema.toTaggedUnion("status"),
)
export type ToolState = Schema.Schema.Type<typeof ToolState>
export class AssistantTool extends Schema.Class<AssistantTool>("Session.Entry.Assistant.Tool")({
type: Schema.Literal("tool"),
callID: Schema.String,
name: Schema.String,
state: ToolState,
time: Schema.Struct({
created: Schema.DateTimeUtc,
ran: Schema.DateTimeUtc.pipe(Schema.optional),
completed: Schema.DateTimeUtc.pipe(Schema.optional),
pruned: Schema.DateTimeUtc.pipe(Schema.optional),
}),
}) {}
export class AssistantText extends Schema.Class<AssistantText>("Session.Entry.Assistant.Text")({
type: Schema.Literal("text"),
text: Schema.String,
}) {}
export class AssistantReasoning extends Schema.Class<AssistantReasoning>("Session.Entry.Assistant.Reasoning")({
type: Schema.Literal("reasoning"),
text: Schema.String,
}) {}
export class AssistantRetry extends Schema.Class<AssistantRetry>("Session.Entry.Assistant.Retry")({
attempt: NonNegativeInt,
error: SessionEvent.RetryError,
time: Schema.Struct({
created: Schema.DateTimeUtc,
}),
}) {
static fromEvent(event: SessionEvent.Retried) {
return new AssistantRetry({
attempt: event.attempt,
error: event.error,
time: {
created: event.timestamp,
},
})
}
}
export const AssistantContent = Schema.Union([AssistantText, AssistantReasoning, AssistantTool]).pipe(
Schema.toTaggedUnion("type"),
)
export type AssistantContent = Schema.Schema.Type<typeof AssistantContent>
export class Assistant extends Schema.Class<Assistant>("Session.Entry.Assistant")({
...Base,
type: Schema.Literal("assistant"),
content: AssistantContent.pipe(Schema.Array),
retries: AssistantRetry.pipe(Schema.Array, Schema.optional),
cost: Schema.Finite.pipe(Schema.optional),
tokens: Schema.Struct({
input: NonNegativeInt,
output: NonNegativeInt,
reasoning: NonNegativeInt,
cache: Schema.Struct({
read: NonNegativeInt,
write: NonNegativeInt,
}),
}).pipe(Schema.optional),
error: Schema.String.pipe(Schema.optional),
time: Schema.Struct({
created: Schema.DateTimeUtc,
completed: Schema.DateTimeUtc.pipe(Schema.optional),
}),
}) {
static fromEvent(event: SessionEvent.Step.Started) {
return new Assistant({
id: event.id,
type: "assistant",
time: {
created: event.timestamp,
},
content: [],
retries: [],
})
}
}
export class Compaction extends Schema.Class<Compaction>("Session.Entry.Compaction")({
...SessionEvent.Compacted.fields,
type: Schema.Literal("compaction"),
...Base,
}) {
static fromEvent(event: SessionEvent.Compacted) {
return new Compaction({
...event,
type: "compaction",
time: { created: event.timestamp },
})
}
}
export const Entry = Schema.Union([User, Synthetic, Assistant, Compaction]).pipe(Schema.toTaggedUnion("type"))
export type Entry = Schema.Schema.Type<typeof Entry>
export type Type = Entry["type"]
/*
export interface Interface {
readonly decode: (row: typeof SessionEntryTable.$inferSelect) => Entry
readonly fromSession: (sessionID: SessionID) => Effect.Effect<Entry[], never>
}
export class Service extends Context.Service<Service, Interface>()("@opencode/SessionEntry") {}
export const layer: Layer.Layer<Service, never, never> = Layer.effect(
Service,
Effect.gen(function* () {
const decodeEntry = Schema.decodeUnknownSync(Entry)
const decode: (typeof Service.Service)["decode"] = (row) => decodeEntry({ ...row, id: row.id, type: row.type })
const fromSession = Effect.fn("SessionEntry.fromSession")(function* (sessionID: SessionID) {
return Database.use((db) =>
db
.select()
.from(SessionEntryTable)
.where(eq(SessionEntryTable.session_id, sessionID))
.orderBy(SessionEntryTable.id)
.all()
.map((row) => decode(row)),
)
})
return Service.of({
decode,
fromSession,
})
}),
)
*/
export * as SessionEntry from "./session-entry"

View File

@@ -1,118 +1,128 @@
import { SessionID } from "@/session/schema"
import { NonNegativeInt } from "@/util/schema"
import { EventV2 } from "./event"
import { FileAttachment, Prompt } from "./session-prompt"
import { Identifier } from "@/id/id"
import { NonNegativeInt, withStatics } from "@/util/schema"
import * as DateTime from "effect/DateTime"
import { Schema } from "effect"
export { FileAttachment }
import { ToolOutput } from "./tool-output"
import { ModelID, ProviderID } from "@/provider/schema"
export const Source = Schema.Struct({
start: NonNegativeInt,
end: NonNegativeInt,
text: Schema.String,
}).annotate({
identifier: "session.next.event.source",
})
export type Source = Schema.Schema.Type<typeof Source>
export namespace SessionEvent {
export const ID = Schema.String.pipe(
Schema.brand("Session.Event.ID"),
withStatics((s) => ({
create: () => s.make(Identifier.create("evt", "ascending")),
})),
)
export type ID = Schema.Schema.Type<typeof ID>
type Stamp = Schema.Schema.Type<typeof Schema.DateTimeUtc>
type BaseInput = {
id?: ID
metadata?: Record<string, unknown>
timestamp?: Stamp
}
const Base = {
timestamp: Schema.DateTimeUtcFromMillis,
sessionID: SessionID,
}
const Base = {
id: ID,
metadata: Schema.Record(Schema.String, Schema.Unknown).pipe(Schema.optional),
timestamp: Schema.DateTimeUtc,
}
export const AgentSwitched = EventV2.define({
type: "session.next.agent.switched",
aggregate: "sessionID",
version: 1,
schema: {
...Base,
agent: Schema.String,
},
})
export type AgentSwitched = Schema.Schema.Type<typeof AgentSwitched>
export const ModelSwitched = EventV2.define({
type: "session.next.model.switched",
aggregate: "sessionID",
version: 1,
schema: {
...Base,
id: ModelID,
providerID: ProviderID,
variant: Schema.String.pipe(Schema.optional),
},
})
export type ModelSwitched = Schema.Schema.Type<typeof ModelSwitched>
export const Prompted = EventV2.define({
type: "session.next.prompted",
aggregate: "sessionID",
version: 1,
schema: {
...Base,
prompt: Prompt,
},
})
export type Prompted = Schema.Schema.Type<typeof Prompted>
export const Synthetic = EventV2.define({
type: "session.next.synthetic",
aggregate: "sessionID",
schema: {
...Base,
export class Source extends Schema.Class<Source>("Session.Event.Source")({
start: NonNegativeInt,
end: NonNegativeInt,
text: Schema.String,
},
})
export type Synthetic = Schema.Schema.Type<typeof Synthetic>
}) {}
export namespace Shell {
export const Started = EventV2.define({
type: "session.next.shell.started",
aggregate: "sessionID",
schema: {
...Base,
callID: Schema.String,
command: Schema.String,
},
})
export type Started = Schema.Schema.Type<typeof Started>
export class FileAttachment extends Schema.Class<FileAttachment>("Session.Event.FileAttachment")({
uri: Schema.String,
mime: Schema.String,
name: Schema.String.pipe(Schema.optional),
description: Schema.String.pipe(Schema.optional),
source: Source.pipe(Schema.optional),
}) {
static create(input: FileAttachment) {
return new FileAttachment({
uri: input.uri,
mime: input.mime,
name: input.name,
description: input.description,
source: input.source,
})
}
}
export const Ended = EventV2.define({
type: "session.next.shell.ended",
aggregate: "sessionID",
schema: {
...Base,
callID: Schema.String,
output: Schema.String,
},
})
export type Ended = Schema.Schema.Type<typeof Ended>
}
export class AgentAttachment extends Schema.Class<AgentAttachment>("Session.Event.AgentAttachment")({
name: Schema.String,
source: Source.pipe(Schema.optional),
}) {}
export namespace Step {
export const Started = EventV2.define({
type: "session.next.step.started",
aggregate: "sessionID",
schema: {
export class RetryError extends Schema.Class<RetryError>("Session.Event.Retry.Error")({
message: Schema.String,
statusCode: NonNegativeInt.pipe(Schema.optional),
isRetryable: Schema.Boolean,
responseHeaders: Schema.Record(Schema.String, Schema.String).pipe(Schema.optional),
responseBody: Schema.String.pipe(Schema.optional),
metadata: Schema.Record(Schema.String, Schema.String).pipe(Schema.optional),
}) {}
export class Prompt extends Schema.Class<Prompt>("Session.Event.Prompt")({
...Base,
type: Schema.Literal("prompt"),
text: Schema.String,
files: Schema.Array(FileAttachment).pipe(Schema.optional),
agents: Schema.Array(AgentAttachment).pipe(Schema.optional),
}) {
static create(input: BaseInput & { text: string; files?: FileAttachment[]; agents?: AgentAttachment[] }) {
return new Prompt({
id: input.id ?? ID.create(),
type: "prompt",
timestamp: input.timestamp ?? DateTime.makeUnsafe(Date.now()),
metadata: input.metadata,
text: input.text,
files: input.files,
agents: input.agents,
})
}
}
export class Synthetic extends Schema.Class<Synthetic>("Session.Event.Synthetic")({
...Base,
type: Schema.Literal("synthetic"),
text: Schema.String,
}) {
static create(input: BaseInput & { text: string }) {
return new Synthetic({
id: input.id ?? ID.create(),
type: "synthetic",
timestamp: input.timestamp ?? DateTime.makeUnsafe(Date.now()),
metadata: input.metadata,
text: input.text,
})
}
}
export namespace Step {
export class Started extends Schema.Class<Started>("Session.Event.Step.Started")({
...Base,
agent: Schema.String,
type: Schema.Literal("step.started"),
model: Schema.Struct({
id: Schema.String,
providerID: Schema.String,
variant: Schema.String.pipe(Schema.optional),
}),
snapshot: Schema.String.pipe(Schema.optional),
},
})
export type Started = Schema.Schema.Type<typeof Started>
}) {
static create(input: BaseInput & { model: { id: string; providerID: string; variant?: string } }) {
return new Started({
id: input.id ?? ID.create(),
type: "step.started",
timestamp: input.timestamp ?? DateTime.makeUnsafe(Date.now()),
metadata: input.metadata,
model: input.model,
})
}
}
export const Ended = EventV2.define({
type: "session.next.step.ended",
aggregate: "sessionID",
schema: {
export class Ended extends Schema.Class<Ended>("Session.Event.Step.Ended")({
...Base,
finish: Schema.String,
type: Schema.Literal("step.ended"),
reason: Schema.String,
cost: Schema.Finite,
tokens: Schema.Struct({
input: NonNegativeInt,
@@ -123,118 +133,177 @@ export namespace Step {
write: NonNegativeInt,
}),
}),
snapshot: Schema.String.pipe(Schema.optional),
},
})
export type Ended = Schema.Schema.Type<typeof Ended>
}
}) {
static create(input: BaseInput & { reason: string; cost: number; tokens: Ended["tokens"] }) {
return new Ended({
id: input.id ?? ID.create(),
type: "step.ended",
timestamp: input.timestamp ?? DateTime.makeUnsafe(Date.now()),
metadata: input.metadata,
reason: input.reason,
cost: input.cost,
tokens: input.tokens,
})
}
}
}
export namespace Text {
export const Started = EventV2.define({
type: "session.next.text.started",
aggregate: "sessionID",
schema: {
export namespace Text {
export class Started extends Schema.Class<Started>("Session.Event.Text.Started")({
...Base,
},
})
export type Started = Schema.Schema.Type<typeof Started>
type: Schema.Literal("text.started"),
}) {
static create(input: BaseInput = {}) {
return new Started({
id: input.id ?? ID.create(),
type: "text.started",
timestamp: input.timestamp ?? DateTime.makeUnsafe(Date.now()),
metadata: input.metadata,
})
}
}
export const Delta = EventV2.define({
type: "session.next.text.delta",
aggregate: "sessionID",
schema: {
export class Delta extends Schema.Class<Delta>("Session.Event.Text.Delta")({
...Base,
type: Schema.Literal("text.delta"),
delta: Schema.String,
},
})
export type Delta = Schema.Schema.Type<typeof Delta>
}) {
static create(input: BaseInput & { delta: string }) {
return new Delta({
id: input.id ?? ID.create(),
type: "text.delta",
timestamp: input.timestamp ?? DateTime.makeUnsafe(Date.now()),
metadata: input.metadata,
delta: input.delta,
})
}
}
export const Ended = EventV2.define({
type: "session.next.text.ended",
aggregate: "sessionID",
schema: {
export class Ended extends Schema.Class<Ended>("Session.Event.Text.Ended")({
...Base,
type: Schema.Literal("text.ended"),
text: Schema.String,
},
})
export type Ended = Schema.Schema.Type<typeof Ended>
}
}) {
static create(input: BaseInput & { text: string }) {
return new Ended({
id: input.id ?? ID.create(),
type: "text.ended",
timestamp: input.timestamp ?? DateTime.makeUnsafe(Date.now()),
metadata: input.metadata,
text: input.text,
})
}
}
}
export namespace Reasoning {
export const Started = EventV2.define({
type: "session.next.reasoning.started",
aggregate: "sessionID",
schema: {
export namespace Reasoning {
export class Started extends Schema.Class<Started>("Session.Event.Reasoning.Started")({
...Base,
reasoningID: Schema.String,
},
})
export type Started = Schema.Schema.Type<typeof Started>
type: Schema.Literal("reasoning.started"),
}) {
static create(input: BaseInput = {}) {
return new Started({
id: input.id ?? ID.create(),
type: "reasoning.started",
timestamp: input.timestamp ?? DateTime.makeUnsafe(Date.now()),
metadata: input.metadata,
})
}
}
export const Delta = EventV2.define({
type: "session.next.reasoning.delta",
aggregate: "sessionID",
schema: {
export class Delta extends Schema.Class<Delta>("Session.Event.Reasoning.Delta")({
...Base,
reasoningID: Schema.String,
type: Schema.Literal("reasoning.delta"),
delta: Schema.String,
},
})
export type Delta = Schema.Schema.Type<typeof Delta>
}) {
static create(input: BaseInput & { delta: string }) {
return new Delta({
id: input.id ?? ID.create(),
type: "reasoning.delta",
timestamp: input.timestamp ?? DateTime.makeUnsafe(Date.now()),
metadata: input.metadata,
delta: input.delta,
})
}
}
export const Ended = EventV2.define({
type: "session.next.reasoning.ended",
aggregate: "sessionID",
schema: {
export class Ended extends Schema.Class<Ended>("Session.Event.Reasoning.Ended")({
...Base,
reasoningID: Schema.String,
type: Schema.Literal("reasoning.ended"),
text: Schema.String,
},
})
export type Ended = Schema.Schema.Type<typeof Ended>
}
}) {
static create(input: BaseInput & { text: string }) {
return new Ended({
id: input.id ?? ID.create(),
type: "reasoning.ended",
timestamp: input.timestamp ?? DateTime.makeUnsafe(Date.now()),
metadata: input.metadata,
text: input.text,
})
}
}
}
export namespace Tool {
export namespace Input {
export const Started = EventV2.define({
type: "session.next.tool.input.started",
aggregate: "sessionID",
schema: {
export namespace Tool {
export namespace Input {
export class Started extends Schema.Class<Started>("Session.Event.Tool.Input.Started")({
...Base,
callID: Schema.String,
name: Schema.String,
},
})
export type Started = Schema.Schema.Type<typeof Started>
type: Schema.Literal("tool.input.started"),
}) {
static create(input: BaseInput & { callID: string; name: string }) {
return new Started({
id: input.id ?? ID.create(),
type: "tool.input.started",
timestamp: input.timestamp ?? DateTime.makeUnsafe(Date.now()),
metadata: input.metadata,
callID: input.callID,
name: input.name,
})
}
}
export const Delta = EventV2.define({
type: "session.next.tool.input.delta",
aggregate: "sessionID",
schema: {
export class Delta extends Schema.Class<Delta>("Session.Event.Tool.Input.Delta")({
...Base,
callID: Schema.String,
type: Schema.Literal("tool.input.delta"),
delta: Schema.String,
},
})
export type Delta = Schema.Schema.Type<typeof Delta>
}) {
static create(input: BaseInput & { callID: string; delta: string }) {
return new Delta({
id: input.id ?? ID.create(),
type: "tool.input.delta",
timestamp: input.timestamp ?? DateTime.makeUnsafe(Date.now()),
metadata: input.metadata,
callID: input.callID,
delta: input.delta,
})
}
}
export const Ended = EventV2.define({
type: "session.next.tool.input.ended",
aggregate: "sessionID",
schema: {
export class Ended extends Schema.Class<Ended>("Session.Event.Tool.Input.Ended")({
...Base,
callID: Schema.String,
type: Schema.Literal("tool.input.ended"),
text: Schema.String,
},
})
export type Ended = Schema.Schema.Type<typeof Ended>
}
}) {
static create(input: BaseInput & { callID: string; text: string }) {
return new Ended({
id: input.id ?? ID.create(),
type: "tool.input.ended",
timestamp: input.timestamp ?? DateTime.makeUnsafe(Date.now()),
metadata: input.metadata,
callID: input.callID,
text: input.text,
})
}
}
}
export const Called = EventV2.define({
type: "session.next.tool.called",
aggregate: "sessionID",
schema: {
export class Called extends Schema.Class<Called>("Session.Event.Tool.Called")({
...Base,
type: Schema.Literal("tool.called"),
callID: Schema.String,
tool: Schema.String,
input: Schema.Record(Schema.String, Schema.Unknown),
@@ -242,155 +311,148 @@ export namespace Tool {
executed: Schema.Boolean,
metadata: Schema.Record(Schema.String, Schema.Unknown).pipe(Schema.optional),
}),
},
})
export type Called = Schema.Schema.Type<typeof Called>
}) {
static create(
input: BaseInput & {
callID: string
tool: string
input: Record<string, unknown>
provider: Called["provider"]
},
) {
return new Called({
id: input.id ?? ID.create(),
type: "tool.called",
timestamp: input.timestamp ?? DateTime.makeUnsafe(Date.now()),
metadata: input.metadata,
callID: input.callID,
tool: input.tool,
input: input.input,
provider: input.provider,
})
}
}
export const Progress = EventV2.define({
type: "session.next.tool.progress",
aggregate: "sessionID",
schema: {
export class Success extends Schema.Class<Success>("Session.Event.Tool.Success")({
...Base,
type: Schema.Literal("tool.success"),
callID: Schema.String,
structured: ToolOutput.Structured,
content: Schema.Array(ToolOutput.Content),
},
})
export type Progress = Schema.Schema.Type<typeof Progress>
export const Success = EventV2.define({
type: "session.next.tool.success",
aggregate: "sessionID",
schema: {
...Base,
callID: Schema.String,
structured: ToolOutput.Structured,
content: Schema.Array(ToolOutput.Content),
title: Schema.String,
output: Schema.String.pipe(Schema.optional),
attachments: Schema.Array(FileAttachment).pipe(Schema.optional),
provider: Schema.Struct({
executed: Schema.Boolean,
metadata: Schema.Record(Schema.String, Schema.Unknown).pipe(Schema.optional),
}),
},
})
export type Success = Schema.Schema.Type<typeof Success>
}) {
static create(
input: BaseInput & {
callID: string
title: string
output?: string
attachments?: FileAttachment[]
provider: Success["provider"]
},
) {
return new Success({
id: input.id ?? ID.create(),
type: "tool.success",
timestamp: input.timestamp ?? DateTime.makeUnsafe(Date.now()),
metadata: input.metadata,
callID: input.callID,
title: input.title,
output: input.output,
attachments: input.attachments,
provider: input.provider,
})
}
}
export const Error = EventV2.define({
type: "session.next.tool.error",
aggregate: "sessionID",
schema: {
export class Error extends Schema.Class<Error>("Session.Event.Tool.Error")({
...Base,
type: Schema.Literal("tool.error"),
callID: Schema.String,
error: Schema.Struct({
type: Schema.String,
message: Schema.String,
}),
error: Schema.String,
provider: Schema.Struct({
executed: Schema.Boolean,
metadata: Schema.Record(Schema.String, Schema.Unknown).pipe(Schema.optional),
}),
},
})
export type Error = Schema.Schema.Type<typeof Error>
}
}) {
static create(input: BaseInput & { callID: string; error: string; provider: Error["provider"] }) {
return new Error({
id: input.id ?? ID.create(),
type: "tool.error",
timestamp: input.timestamp ?? DateTime.makeUnsafe(Date.now()),
metadata: input.metadata,
callID: input.callID,
error: input.error,
provider: input.provider,
})
}
}
}
export const RetryError = Schema.Struct({
message: Schema.String,
statusCode: NonNegativeInt.pipe(Schema.optional),
isRetryable: Schema.Boolean,
responseHeaders: Schema.Record(Schema.String, Schema.String).pipe(Schema.optional),
responseBody: Schema.String.pipe(Schema.optional),
metadata: Schema.Record(Schema.String, Schema.String).pipe(Schema.optional),
}).annotate({
identifier: "session.next.retry_error",
})
export type RetryError = Schema.Schema.Type<typeof RetryError>
export const Retried = EventV2.define({
type: "session.next.retried",
aggregate: "sessionID",
schema: {
export class Retried extends Schema.Class<Retried>("Session.Event.Retried")({
...Base,
type: Schema.Literal("retried"),
attempt: NonNegativeInt,
error: RetryError,
},
})
export type Retried = Schema.Schema.Type<typeof Retried>
}) {
static create(input: BaseInput & { attempt: number; error: RetryError }) {
return new Retried({
id: input.id ?? ID.create(),
type: "retried",
timestamp: input.timestamp ?? DateTime.makeUnsafe(Date.now()),
metadata: input.metadata,
attempt: input.attempt,
error: input.error,
})
}
}
export namespace Compaction {
export const Started = EventV2.define({
type: "session.next.compaction.started",
aggregate: "sessionID",
schema: {
...Base,
reason: Schema.Union([Schema.Literal("auto"), Schema.Literal("manual")]),
},
})
export type Started = Schema.Schema.Type<typeof Started>
export class Compacted extends Schema.Class<Compacted>("Session.Event.Compated")({
...Base,
type: Schema.Literal("compacted"),
auto: Schema.Boolean,
overflow: Schema.Boolean.pipe(Schema.optional),
}) {
static create(input: BaseInput & { auto: boolean; overflow?: boolean }) {
return new Compacted({
id: input.id ?? ID.create(),
type: "compacted",
timestamp: input.timestamp ?? DateTime.makeUnsafe(Date.now()),
metadata: input.metadata,
auto: input.auto,
overflow: input.overflow,
})
}
}
export const Delta = EventV2.define({
type: "session.next.compaction.delta",
aggregate: "sessionID",
schema: {
...Base,
text: Schema.String,
export const Event = Schema.Union(
[
Prompt,
Synthetic,
Step.Started,
Step.Ended,
Text.Started,
Text.Delta,
Text.Ended,
Tool.Input.Started,
Tool.Input.Delta,
Tool.Input.Ended,
Tool.Called,
Tool.Success,
Tool.Error,
Reasoning.Started,
Reasoning.Delta,
Reasoning.Ended,
Retried,
Compacted,
],
{
mode: "oneOf",
},
})
export const Ended = EventV2.define({
type: "session.next.compaction.ended",
aggregate: "sessionID",
schema: {
...Base,
text: Schema.String,
include: Schema.String.pipe(Schema.optional),
},
})
export type Ended = Schema.Schema.Type<typeof Ended>
).pipe(Schema.toTaggedUnion("type"))
export type Event = Schema.Schema.Type<typeof Event>
export type Type = Event["type"]
}
export const All = Schema.Union(
[
AgentSwitched,
ModelSwitched,
Prompted,
Synthetic,
Shell.Started,
Shell.Ended,
Step.Started,
Step.Ended,
Text.Started,
Text.Delta,
Text.Ended,
Tool.Input.Started,
Tool.Input.Delta,
Tool.Input.Ended,
Tool.Called,
Tool.Progress,
Tool.Success,
Tool.Error,
Reasoning.Started,
Reasoning.Delta,
Reasoning.Ended,
Retried,
Compaction.Started,
Compaction.Delta,
Compaction.Ended,
],
{
mode: "oneOf",
},
).pipe(Schema.toTaggedUnion("type"))
// user
// assistant
// assistant
// assistant
// user
// compaction marker
// -> text
// assistant
export type Event = Schema.Schema.Type<typeof All>
export type Type = Event["type"]
export * as SessionEvent from "./session-event"

View File

@@ -1,411 +0,0 @@
import { produce, type WritableDraft } from "immer"
import { SessionEvent } from "./session-event"
import { SessionMessage } from "./session-message"
export type MemoryState = {
messages: SessionMessage.Message[]
}
export interface Adapter<Result> {
readonly getCurrentAssistant: () => SessionMessage.Assistant | undefined
readonly getCurrentCompaction: () => SessionMessage.Compaction | undefined
readonly getCurrentShell: (callID: string) => SessionMessage.Shell | undefined
readonly updateAssistant: (assistant: SessionMessage.Assistant) => void
readonly updateCompaction: (compaction: SessionMessage.Compaction) => void
readonly updateShell: (shell: SessionMessage.Shell) => void
readonly appendMessage: (message: SessionMessage.Message) => void
readonly finish: () => Result
}
export function memory(state: MemoryState): Adapter<MemoryState> {
const activeAssistantIndex = () =>
state.messages.findLastIndex((message) => message.type === "assistant" && !message.time.completed)
const activeCompactionIndex = () => state.messages.findLastIndex((message) => message.type === "compaction")
const activeShellIndex = (callID: string) =>
state.messages.findLastIndex((message) => message.type === "shell" && message.callID === callID)
return {
getCurrentAssistant() {
const index = activeAssistantIndex()
if (index < 0) return
const assistant = state.messages[index]
return assistant?.type === "assistant" ? assistant : undefined
},
getCurrentCompaction() {
const index = activeCompactionIndex()
if (index < 0) return
const compaction = state.messages[index]
return compaction?.type === "compaction" ? compaction : undefined
},
getCurrentShell(callID) {
const index = activeShellIndex(callID)
if (index < 0) return
const shell = state.messages[index]
return shell?.type === "shell" ? shell : undefined
},
updateAssistant(assistant) {
const index = activeAssistantIndex()
if (index < 0) return
const current = state.messages[index]
if (current?.type !== "assistant") return
state.messages[index] = assistant
},
updateCompaction(compaction) {
const index = activeCompactionIndex()
if (index < 0) return
const current = state.messages[index]
if (current?.type !== "compaction") return
state.messages[index] = compaction
},
updateShell(shell) {
const index = activeShellIndex(shell.callID)
if (index < 0) return
const current = state.messages[index]
if (current?.type !== "shell") return
state.messages[index] = shell
},
appendMessage(message) {
state.messages.push(message)
},
finish() {
return state
},
}
}
export function update<Result>(adapter: Adapter<Result>, event: SessionEvent.Event): Result {
const currentAssistant = adapter.getCurrentAssistant()
type DraftAssistant = WritableDraft<SessionMessage.Assistant>
type DraftTool = WritableDraft<SessionMessage.AssistantTool>
type DraftText = WritableDraft<SessionMessage.AssistantText>
type DraftReasoning = WritableDraft<SessionMessage.AssistantReasoning>
const latestTool = (assistant: DraftAssistant | undefined, callID?: string) =>
assistant?.content.findLast(
(item): item is DraftTool => item.type === "tool" && (callID === undefined || item.id === callID),
)
const latestText = (assistant: DraftAssistant | undefined) =>
assistant?.content.findLast((item): item is DraftText => item.type === "text")
const latestReasoning = (assistant: DraftAssistant | undefined, reasoningID: string) =>
assistant?.content.findLast(
(item): item is DraftReasoning => item.type === "reasoning" && item.id === reasoningID,
)
SessionEvent.All.match(event, {
"session.next.agent.switched": (event) => {
adapter.appendMessage(
new SessionMessage.AgentSwitched({
id: event.id,
type: "agent-switched",
metadata: event.metadata,
agent: event.data.agent,
time: { created: event.data.timestamp },
}),
)
},
"session.next.model.switched": (event) => {
adapter.appendMessage(
new SessionMessage.ModelSwitched({
id: event.id,
type: "model-switched",
metadata: event.metadata,
model: {
id: event.data.id,
providerID: event.data.providerID,
variant: event.data.variant,
},
time: { created: event.data.timestamp },
}),
)
},
"session.next.prompted": (event) => {
adapter.appendMessage(
new SessionMessage.User({
id: event.id,
type: "user",
metadata: event.metadata,
text: event.data.prompt.text,
files: event.data.prompt.files,
agents: event.data.prompt.agents,
time: { created: event.data.timestamp },
}),
)
},
"session.next.synthetic": (event) => {
adapter.appendMessage(
new SessionMessage.Synthetic({
sessionID: event.data.sessionID,
text: event.data.text,
id: event.id,
type: "synthetic",
time: { created: event.data.timestamp },
}),
)
},
"session.next.shell.started": (event) => {
adapter.appendMessage(
new SessionMessage.Shell({
id: event.id,
type: "shell",
metadata: event.metadata,
callID: event.data.callID,
command: event.data.command,
output: "",
time: { created: event.data.timestamp },
}),
)
},
"session.next.shell.ended": (event) => {
const currentShell = adapter.getCurrentShell(event.data.callID)
if (currentShell) {
adapter.updateShell(
produce(currentShell, (draft) => {
draft.output = event.data.output
draft.time.completed = event.data.timestamp
}),
)
}
},
"session.next.step.started": (event) => {
if (currentAssistant) {
adapter.updateAssistant(
produce(currentAssistant, (draft) => {
draft.time.completed = event.data.timestamp
}),
)
}
adapter.appendMessage(
new SessionMessage.Assistant({
id: event.id,
type: "assistant",
agent: event.data.agent,
model: event.data.model,
time: { created: event.data.timestamp },
content: [],
snapshot: event.data.snapshot ? { start: event.data.snapshot } : undefined,
}),
)
},
"session.next.step.ended": (event) => {
if (currentAssistant) {
adapter.updateAssistant(
produce(currentAssistant, (draft) => {
draft.time.completed = event.data.timestamp
draft.finish = event.data.finish
draft.cost = event.data.cost
draft.tokens = event.data.tokens
if (event.data.snapshot) draft.snapshot = { ...draft.snapshot, end: event.data.snapshot }
}),
)
}
},
"session.next.text.started": () => {
if (currentAssistant) {
adapter.updateAssistant(
produce(currentAssistant, (draft) => {
draft.content.push({
type: "text",
text: "",
})
}),
)
}
},
"session.next.text.delta": (event) => {
if (currentAssistant) {
adapter.updateAssistant(
produce(currentAssistant, (draft) => {
const match = latestText(draft)
if (match) match.text += event.data.delta
}),
)
}
},
"session.next.text.ended": (event) => {
if (currentAssistant) {
adapter.updateAssistant(
produce(currentAssistant, (draft) => {
const match = latestText(draft)
if (match) match.text = event.data.text
}),
)
}
},
"session.next.tool.input.started": (event) => {
if (currentAssistant) {
adapter.updateAssistant(
produce(currentAssistant, (draft) => {
draft.content.push({
type: "tool",
id: event.data.callID,
name: event.data.name,
time: {
created: event.data.timestamp,
},
state: {
status: "pending",
input: "",
},
})
}),
)
}
},
"session.next.tool.input.delta": (event) => {
if (currentAssistant) {
adapter.updateAssistant(
produce(currentAssistant, (draft) => {
const match = latestTool(draft, event.data.callID)
// oxlint-disable-next-line no-base-to-string -- event.delta is a Schema.String (runtime string)
if (match && match.state.status === "pending") match.state.input += event.data.delta
}),
)
}
},
"session.next.tool.input.ended": () => {},
"session.next.tool.called": (event) => {
if (currentAssistant) {
adapter.updateAssistant(
produce(currentAssistant, (draft) => {
const match = latestTool(draft, event.data.callID)
if (match) {
match.provider = event.data.provider
match.time.ran = event.data.timestamp
match.state = {
status: "running",
input: event.data.input,
structured: {},
content: [],
}
}
}),
)
}
},
"session.next.tool.progress": (event) => {
if (currentAssistant) {
adapter.updateAssistant(
produce(currentAssistant, (draft) => {
const match = latestTool(draft, event.data.callID)
if (match && match.state.status === "running") {
match.state.structured = event.data.structured
match.state.content = [...event.data.content]
}
}),
)
}
},
"session.next.tool.success": (event) => {
if (currentAssistant) {
adapter.updateAssistant(
produce(currentAssistant, (draft) => {
const match = latestTool(draft, event.data.callID)
if (match && match.state.status === "running") {
match.provider = event.data.provider
match.time.completed = event.data.timestamp
match.state = {
status: "completed",
input: match.state.input,
structured: event.data.structured,
content: [...event.data.content],
}
}
}),
)
}
},
"session.next.tool.error": (event) => {
if (currentAssistant) {
adapter.updateAssistant(
produce(currentAssistant, (draft) => {
const match = latestTool(draft, event.data.callID)
if (match && match.state.status === "running") {
match.provider = event.data.provider
match.time.completed = event.data.timestamp
match.state = {
status: "error",
error: event.data.error,
input: match.state.input,
structured: match.state.structured,
content: match.state.content,
}
}
}),
)
}
},
"session.next.reasoning.started": (event) => {
if (currentAssistant) {
adapter.updateAssistant(
produce(currentAssistant, (draft) => {
draft.content.push({
type: "reasoning",
id: event.data.reasoningID,
text: "",
})
}),
)
}
},
"session.next.reasoning.delta": (event) => {
if (currentAssistant) {
adapter.updateAssistant(
produce(currentAssistant, (draft) => {
const match = latestReasoning(draft, event.data.reasoningID)
if (match) match.text += event.data.delta
}),
)
}
},
"session.next.reasoning.ended": (event) => {
if (currentAssistant) {
adapter.updateAssistant(
produce(currentAssistant, (draft) => {
const match = latestReasoning(draft, event.data.reasoningID)
if (match) match.text = event.data.text
}),
)
}
},
"session.next.retried": () => {},
"session.next.compaction.started": (event) => {
adapter.appendMessage(
new SessionMessage.Compaction({
id: event.id,
type: "compaction",
metadata: event.metadata,
reason: event.data.reason,
summary: "",
time: { created: event.data.timestamp },
}),
)
},
"session.next.compaction.delta": (event) => {
const currentCompaction = adapter.getCurrentCompaction()
if (currentCompaction) {
adapter.updateCompaction(
produce(currentCompaction, (draft) => {
draft.summary += event.data.text
}),
)
}
},
"session.next.compaction.ended": (event) => {
const currentCompaction = adapter.getCurrentCompaction()
if (currentCompaction) {
adapter.updateCompaction(
produce(currentCompaction, (draft) => {
draft.summary = event.data.text
draft.include = event.data.include
}),
)
}
},
})
return adapter.finish()
}
export * as SessionMessageUpdater from "./session-message-updater"

View File

@@ -1,177 +0,0 @@
import { Schema } from "effect"
import { Prompt } from "./session-prompt"
import { SessionEvent } from "./session-event"
import { EventV2 } from "./event"
import { ToolOutput } from "./tool-output"
export const ID = EventV2.ID
export type ID = Schema.Schema.Type<typeof ID>
const Base = {
id: ID,
metadata: Schema.Record(Schema.String, Schema.Unknown).pipe(Schema.optional),
time: Schema.Struct({
created: Schema.DateTimeUtcFromMillis,
}),
}
export class AgentSwitched extends Schema.Class<AgentSwitched>("Session.Message.AgentSwitched")({
...Base,
type: Schema.Literal("agent-switched"),
agent: SessionEvent.AgentSwitched.fields.data.fields.agent,
}) {}
export class ModelSwitched extends Schema.Class<ModelSwitched>("Session.Message.ModelSwitched")({
...Base,
type: Schema.Literal("model-switched"),
model: Schema.Struct({
id: SessionEvent.ModelSwitched.fields.data.fields.id,
providerID: SessionEvent.ModelSwitched.fields.data.fields.providerID,
variant: SessionEvent.ModelSwitched.fields.data.fields.variant,
}),
}) {}
export class User extends Schema.Class<User>("Session.Message.User")({
...Base,
text: Prompt.fields.text,
files: Prompt.fields.files,
agents: Prompt.fields.agents,
type: Schema.Literal("user"),
time: Schema.Struct({
created: Schema.DateTimeUtcFromMillis,
}),
}) {}
export class Synthetic extends Schema.Class<Synthetic>("Session.Message.Synthetic")({
...Base,
sessionID: SessionEvent.Synthetic.fields.data.fields.sessionID,
text: SessionEvent.Synthetic.fields.data.fields.text,
type: Schema.Literal("synthetic"),
}) {}
export class Shell extends Schema.Class<Shell>("Session.Message.Shell")({
...Base,
type: Schema.Literal("shell"),
callID: SessionEvent.Shell.Started.fields.data.fields.callID,
command: SessionEvent.Shell.Started.fields.data.fields.command,
output: Schema.String,
time: Schema.Struct({
created: Schema.DateTimeUtcFromMillis,
completed: Schema.DateTimeUtcFromMillis.pipe(Schema.optional),
}),
}) {}
export class ToolStatePending extends Schema.Class<ToolStatePending>("Session.Message.ToolState.Pending")({
status: Schema.Literal("pending"),
input: Schema.String,
}) {}
export class ToolStateRunning extends Schema.Class<ToolStateRunning>("Session.Message.ToolState.Running")({
status: Schema.Literal("running"),
input: Schema.Record(Schema.String, Schema.Unknown),
structured: ToolOutput.Structured,
content: ToolOutput.Content.pipe(Schema.Array),
}) {}
export class ToolStateCompleted extends Schema.Class<ToolStateCompleted>("Session.Message.ToolState.Completed")({
status: Schema.Literal("completed"),
input: Schema.Record(Schema.String, Schema.Unknown),
attachments: SessionEvent.FileAttachment.pipe(Schema.Array, Schema.optional),
content: ToolOutput.Content.pipe(Schema.Array),
structured: ToolOutput.Structured,
}) {}
export class ToolStateError extends Schema.Class<ToolStateError>("Session.Message.ToolState.Error")({
status: Schema.Literal("error"),
input: Schema.Record(Schema.String, Schema.Unknown),
content: ToolOutput.Content.pipe(Schema.Array),
structured: ToolOutput.Structured,
error: Schema.Struct({
type: Schema.String,
message: Schema.String,
}),
}) {}
export const ToolState = Schema.Union([ToolStatePending, ToolStateRunning, ToolStateCompleted, ToolStateError]).pipe(
Schema.toTaggedUnion("status"),
)
export type ToolState = Schema.Schema.Type<typeof ToolState>
export class AssistantTool extends Schema.Class<AssistantTool>("Session.Message.Assistant.Tool")({
type: Schema.Literal("tool"),
id: Schema.String,
name: Schema.String,
provider: Schema.Struct({
executed: Schema.Boolean,
metadata: Schema.Record(Schema.String, Schema.Unknown).pipe(Schema.optional),
}).pipe(Schema.optional),
state: ToolState,
time: Schema.Struct({
created: Schema.DateTimeUtcFromMillis,
ran: Schema.DateTimeUtcFromMillis.pipe(Schema.optional),
completed: Schema.DateTimeUtcFromMillis.pipe(Schema.optional),
pruned: Schema.DateTimeUtcFromMillis.pipe(Schema.optional),
}),
}) {}
export class AssistantText extends Schema.Class<AssistantText>("Session.Message.Assistant.Text")({
type: Schema.Literal("text"),
text: Schema.String,
}) {}
export class AssistantReasoning extends Schema.Class<AssistantReasoning>("Session.Message.Assistant.Reasoning")({
type: Schema.Literal("reasoning"),
id: Schema.String,
text: Schema.String,
}) {}
export const AssistantContent = Schema.Union([AssistantText, AssistantReasoning, AssistantTool]).pipe(
Schema.toTaggedUnion("type"),
)
export type AssistantContent = Schema.Schema.Type<typeof AssistantContent>
export class Assistant extends Schema.Class<Assistant>("Session.Message.Assistant")({
...Base,
type: Schema.Literal("assistant"),
agent: Schema.String,
model: SessionEvent.Step.Started.fields.data.fields.model,
content: AssistantContent.pipe(Schema.Array),
snapshot: Schema.Struct({
start: Schema.String.pipe(Schema.optional),
end: Schema.String.pipe(Schema.optional),
}).pipe(Schema.optional),
finish: Schema.String.pipe(Schema.optional),
cost: Schema.Number.pipe(Schema.optional),
tokens: Schema.Struct({
input: Schema.Number,
output: Schema.Number,
reasoning: Schema.Number,
cache: Schema.Struct({
read: Schema.Number,
write: Schema.Number,
}),
}).pipe(Schema.optional),
error: Schema.String.pipe(Schema.optional),
time: Schema.Struct({
created: Schema.DateTimeUtcFromMillis,
completed: Schema.DateTimeUtcFromMillis.pipe(Schema.optional),
}),
}) {}
export class Compaction extends Schema.Class<Compaction>("Session.Message.Compaction")({
type: Schema.Literal("compaction"),
reason: SessionEvent.Compaction.Started.fields.data.fields.reason,
summary: Schema.String,
include: Schema.String.pipe(Schema.optional),
...Base,
}) {}
export const Message = Schema.Union([AgentSwitched, ModelSwitched, User, Synthetic, Shell, Assistant, Compaction])
.pipe(Schema.toTaggedUnion("type"))
.annotate({ identifier: "Session.Message" })
export type Message = Schema.Schema.Type<typeof Message>
export type Type = Message["type"]
export * as SessionMessage from "./session-message"

View File

@@ -1,36 +0,0 @@
import * as Schema from "effect/Schema"
export class Source extends Schema.Class<Source>("Prompt.Source")({
start: Schema.Number,
end: Schema.Number,
text: Schema.String,
}) {}
export class FileAttachment extends Schema.Class<FileAttachment>("Prompt.FileAttachment")({
uri: Schema.String,
mime: Schema.String,
name: Schema.String.pipe(Schema.optional),
description: Schema.String.pipe(Schema.optional),
source: Source.pipe(Schema.optional),
}) {
static create(input: FileAttachment) {
return new FileAttachment({
uri: input.uri,
mime: input.mime,
name: input.name,
description: input.description,
source: input.source,
})
}
}
export class AgentAttachment extends Schema.Class<AgentAttachment>("Prompt.AgentAttachment")({
name: Schema.String,
source: Source.pipe(Schema.optional),
}) {}
export class Prompt extends Schema.Class<Prompt>("Prompt")({
text: Schema.String,
files: Schema.Array(FileAttachment).pipe(Schema.optional),
agents: Schema.Array(AgentAttachment).pipe(Schema.optional),
}) {}

View File

@@ -1,241 +1,69 @@
import { SessionMessageTable, SessionTable } from "@/session/session.sql"
import { Context, Layer, Schema, Effect } from "effect"
import { SessionEntry } from "./session-entry"
import { Struct } from "effect"
import { Session } from "@/session/session"
import { SessionID } from "@/session/schema"
import { WorkspaceID } from "@/control-plane/schema"
import { and, asc, desc, eq, gt, gte, isNull, like, lt, or, type SQL } from "@/storage/db"
import * as Database from "@/storage/db"
import { Context, DateTime, Effect, Layer, Schema } from "effect"
import { SessionMessage } from "./session-message"
import type { Prompt } from "./session-prompt"
import { EventV2 } from "./event"
import { ProjectID } from "@/project/schema"
import { ModelID, ProviderID } from "@/provider/schema"
import { SessionEvent } from "./session-event"
export const Delivery = Schema.Union([Schema.Literal("immediate"), Schema.Literal("deferred")]).annotate({
identifier: "Session.Delivery",
})
export type Delivery = Schema.Schema.Type<typeof Delivery>
export const ID = SessionID
export const DefaultDelivery = "immediate" satisfies Delivery
export type ID = Schema.Schema.Type<typeof ID>
export class PromptInput extends Schema.Class<PromptInput>("Session.PromptInput")({
...Struct.omit(SessionEntry.User.fields, ["time", "type"]),
id: Schema.optionalKey(SessionEntry.ID),
sessionID: ID,
}) {}
export class CreateInput extends Schema.Class<CreateInput>("Session.CreateInput")({
id: Schema.optionalKey(ID),
}) {}
export class Info extends Schema.Class<Info>("Session.Info")({
id: SessionID,
parentID: SessionID.pipe(Schema.optional),
projectID: ProjectID,
workspaceID: WorkspaceID.pipe(Schema.optional),
path: Schema.String.pipe(Schema.optional),
agent: Schema.String.pipe(Schema.optional),
id: ID,
model: Schema.Struct({
id: ModelID,
providerID: ProviderID,
variant: Schema.String.pipe(Schema.optional),
id: Schema.String,
providerID: Schema.String,
modelID: Schema.String,
}).pipe(Schema.optional),
time: Schema.Struct({
created: Schema.DateTimeUtcFromMillis,
updated: Schema.DateTimeUtcFromMillis,
archived: Schema.DateTimeUtcFromMillis.pipe(Schema.optional),
}),
title: Schema.String,
/*
slug: Schema.String,
directory: Schema.String,
path: optionalOmitUndefined(Schema.String),
parentID: optionalOmitUndefined(SessionID),
summary: optionalOmitUndefined(Summary),
share: optionalOmitUndefined(Share),
title: Schema.String,
version: Schema.String,
time: Time,
permission: optionalOmitUndefined(Permission.Ruleset),
revert: optionalOmitUndefined(Revert),
*/
}) {}
export interface Interface {
readonly list: (input: {
limit?: number
order?: "asc" | "desc"
directory?: string
path?: string
workspaceID?: WorkspaceID
roots?: boolean
start?: number
search?: string
cursor?: {
id: SessionID
time: number
direction: "previous" | "next"
}
}) => Effect.Effect<Info[], never>
readonly messages: (input: {
sessionID: SessionID
limit?: number
order?: "asc" | "desc"
cursor?: {
id: SessionMessage.ID
time: number
direction: "previous" | "next"
}
}) => Effect.Effect<SessionMessage.Message[], never>
readonly prompt: (input: {
id?: EventV2.ID
sessionID: SessionID
prompt: Prompt
delivery?: Delivery
}) => Effect.Effect<SessionMessage.User, never>
readonly switchAgent: (input: { sessionID: SessionID; agent: string }) => Effect.Effect<void, never>
readonly switchModel: (input: {
sessionID: SessionID
id: ModelID
providerID: ProviderID
variant?: string
}) => Effect.Effect<void, never>
readonly compact: (sessionID: SessionID) => Effect.Effect<void, never>
readonly wait: (sessionID: SessionID) => Effect.Effect<void, never>
fromID: (id: ID) => Effect.Effect<Info>
create: (input: CreateInput) => Effect.Effect<Info>
prompt: (input: PromptInput) => Effect.Effect<SessionEntry.User>
}
export class Service extends Context.Service<Service, Interface>()("@opencode/v2/Session") {}
export class Service extends Context.Service<Service, Interface>()("Session.Service") {}
export const layer = Layer.effect(
Service,
export const layer = Layer.effect(Service)(
Effect.gen(function* () {
const decodeMessage = Schema.decodeUnknownSync(SessionMessage.Message)
const session = yield* Session.Service
const decode = (row: typeof SessionMessageTable.$inferSelect) =>
decodeMessage({ ...row.data, id: row.id, type: row.type })
const create: Interface["create"] = Effect.fn("Session.create")(function* (_input) {
throw new Error("Not implemented")
})
function fromRow(row: typeof SessionTable.$inferSelect): Info {
return {
id: SessionID.make(row.id),
projectID: ProjectID.make(row.project_id),
workspaceID: row.workspace_id ? WorkspaceID.make(row.workspace_id) : undefined,
title: row.title,
parentID: row.parent_id ? SessionID.make(row.parent_id) : undefined,
path: row.path ?? "",
agent: row.agent ?? undefined,
model: row.model
? {
id: ModelID.make(row.model.id),
providerID: ProviderID.make(row.model.providerID),
variant: row.model.variant,
}
: undefined,
time: {
created: DateTime.makeUnsafe(row.time_created),
updated: DateTime.makeUnsafe(row.time_updated),
archived: row.time_archived ? DateTime.makeUnsafe(row.time_archived) : undefined,
},
}
}
const prompt: Interface["prompt"] = Effect.fn("Session.prompt")(function* (_input) {
throw new Error("Not implemented")
})
const result: Interface = {
list: Effect.fn("V2Session.list")(function* (input) {
const direction = input.cursor?.direction ?? "next"
let order = input.order ?? "desc"
// Query the adjacent rows in reverse, then flip them back into the requested order below.
if (direction === "previous" && order === "asc") order = "desc"
if (direction === "previous" && order === "desc") order = "asc"
const conditions: SQL[] = []
if (input.directory) conditions.push(eq(SessionTable.directory, input.directory))
if (input.path)
conditions.push(or(eq(SessionTable.path, input.path), like(SessionTable.path, `${input.path}/%`))!)
if (input.workspaceID) conditions.push(eq(SessionTable.workspace_id, input.workspaceID))
if (input.roots) conditions.push(isNull(SessionTable.parent_id))
if (input.start) conditions.push(gte(SessionTable.time_created, input.start))
if (input.search) conditions.push(like(SessionTable.title, `%${input.search}%`))
if (input.cursor) {
conditions.push(
order === "asc"
? or(
gt(SessionTable.time_created, input.cursor.time),
and(eq(SessionTable.time_created, input.cursor.time), gt(SessionTable.id, input.cursor.id)),
)!
: or(
lt(SessionTable.time_created, input.cursor.time),
and(eq(SessionTable.time_created, input.cursor.time), lt(SessionTable.id, input.cursor.id)),
)!,
)
}
const query = Database.Client()
.select()
.from(SessionTable)
.where(conditions.length > 0 ? and(...conditions) : undefined)
.orderBy(
order === "asc" ? asc(SessionTable.time_created) : desc(SessionTable.time_created),
order === "asc" ? asc(SessionTable.id) : desc(SessionTable.id),
)
const fromID: Interface["fromID"] = Effect.fn("Session.fromID")(function* (id) {
const match = yield* session.get(id)
return fromV1(match)
})
const rows = input.limit === undefined ? query.all() : query.limit(input.limit).all()
return (direction === "previous" ? rows.toReversed() : rows).map((row) => fromRow(row))
}),
messages: Effect.fn("V2Session.messages")(function* (input) {
const direction = input.cursor?.direction ?? "next"
let order = input.order ?? "desc"
// Query the adjacent rows in reverse, then flip them back into the requested order below.
if (direction === "previous" && order === "asc") order = "desc"
if (direction === "previous" && order === "desc") order = "asc"
const boundary = input.cursor
? order === "asc"
? or(
gt(SessionMessageTable.time_created, input.cursor.time),
and(
eq(SessionMessageTable.time_created, input.cursor.time),
gt(SessionMessageTable.id, input.cursor.id),
),
)
: or(
lt(SessionMessageTable.time_created, input.cursor.time),
and(
eq(SessionMessageTable.time_created, input.cursor.time),
lt(SessionMessageTable.id, input.cursor.id),
),
)
: undefined
const where = boundary
? and(eq(SessionMessageTable.session_id, input.sessionID), boundary)
: eq(SessionMessageTable.session_id, input.sessionID)
const rows = Database.use((db) => {
const query = db
.select()
.from(SessionMessageTable)
.where(where)
.orderBy(
order === "asc" ? asc(SessionMessageTable.time_created) : desc(SessionMessageTable.time_created),
order === "asc" ? asc(SessionMessageTable.id) : desc(SessionMessageTable.id),
)
const rows = input.limit === undefined ? query.all() : query.limit(input.limit).all()
return direction === "previous" ? rows.toReversed() : rows
})
return rows.map((row) => decode(row))
}),
prompt: Effect.fn("V2Session.prompt")(function* (_input) {
return {} as any
}),
switchAgent: Effect.fn("V2Session.switchAgent")(function* (input) {
EventV2.run(SessionEvent.AgentSwitched.Sync, {
sessionID: input.sessionID,
timestamp: DateTime.makeUnsafe(Date.now()),
agent: input.agent,
})
}),
switchModel: Effect.fn("V2Session.switchModel")(function* (input) {
EventV2.run(SessionEvent.ModelSwitched.Sync, {
sessionID: input.sessionID,
timestamp: DateTime.makeUnsafe(Date.now()),
id: input.id,
providerID: input.providerID,
variant: input.variant,
})
}),
compact: Effect.fn("V2Session.compact")(function* (_sessionID) {}),
wait: Effect.fn("V2Session.wait")(function* (_sessionID) {}),
}
return Service.of(result)
return Service.of({
create,
prompt,
fromID,
})
}),
)
export const defaultLayer = layer
function fromV1(input: Session.Info): Info {
return new Info({
id: ID.make(input.id),
})
}
export * as SessionV2 from "./session"

View File

@@ -1,18 +0,0 @@
export * as ToolOutput from "./tool-output"
import { Schema } from "effect"
export class TextContent extends Schema.Class<TextContent>("Tool.TextContent")({
type: Schema.Literal("text"),
text: Schema.String,
}) {}
export class FileContent extends Schema.Class<FileContent>("Tool.FileContent")({
type: Schema.Literal("file"),
uri: Schema.String,
mime: Schema.String,
name: Schema.String.pipe(Schema.optional),
}) {}
export const Content = Schema.Union([TextContent, FileContent]).pipe(Schema.toTaggedUnion("type"))
export const Structured = Schema.Record(Schema.String, Schema.Any)

View File

@@ -58,7 +58,6 @@ function toolEvent(
raw: opts.raw,
}
const payload: EventMessagePartUpdated = {
id: `evt_${opts.callID}`,
type: "message.part.updated",
properties: {
sessionID: sessionId,

View File

@@ -25,7 +25,6 @@ function event(payload: Event, input: { directory: string; workspace?: string })
function vcs(branch: string): Event {
return {
id: `evt_vcs_${branch}`,
type: "vcs.branch.updated",
properties: {
branch,
@@ -35,7 +34,6 @@ function vcs(branch: string): Event {
function update(version: string): Event {
return {
id: `evt_update_${version}`,
type: "installation.update-available",
properties: {
version,

View File

@@ -79,7 +79,7 @@ delete process.env["OPENCODE_SERVER_USERNAME"]
process.env["OPENCODE_DB"] = ":memory:"
// Now safe to import from src/
const { Log } = await import("@opencode-ai/core/util/log")
const Log = await import("@opencode-ai/core/util/log")
const { initProjectors } = await import("../src/server/projectors")
void Log.init({

View File

@@ -16,9 +16,7 @@ import { Session } from "@/session/session"
import { MessageID, PartID, type SessionID } from "../../src/session/schema"
import { MessageV2 } from "../../src/session/message-v2"
import { Database } from "@/storage/db"
import { SessionMessageTable, SessionTable } from "@/session/session.sql"
import { SessionMessage } from "../../src/v2/session-message"
import * as DateTime from "effect/DateTime"
import { SessionTable } from "@/session/session.sql"
import * as Log from "@opencode-ai/core/util/log"
import { eq } from "drizzle-orm"
import { resetDatabase } from "../fixture/db"
@@ -204,45 +202,6 @@ describe("session HttpApi", () => {
{ headers },
),
).toMatchObject({ info: { id: message.info.id } })
yield* Effect.promise(() =>
Instance.provide({
directory: tmp.path,
fn: async () => {
const message = new SessionMessage.Assistant({
id: SessionMessage.ID.create(),
type: "assistant",
agent: "build",
model: { id: "model", providerID: "provider" },
time: { created: DateTime.makeUnsafe(1) },
content: [],
})
Database.use((db) =>
db
.insert(SessionMessageTable)
.values([
{
id: message.id,
session_id: parent.id,
type: message.type,
time_created: 1,
data: {
time: { created: 1 },
agent: message.agent,
model: message.model,
content: message.content,
} as NonNullable<typeof SessionMessageTable.$inferInsert["data"]>,
},
])
.run(),
)
},
}),
)
expect(yield* requestJson<SessionMessage.Message[]>(`/api/session/${parent.id}/message`, { headers })).toMatchObject([
{ type: "assistant" },
])
}),
),
)

View File

@@ -19,7 +19,6 @@ import { MessageV2 } from "../../src/session/message-v2"
import { MessageID, PartID, SessionID } from "../../src/session/schema"
import { SessionStatus } from "../../src/session/status"
import { SessionSummary } from "../../src/session/summary"
import { SessionV2 } from "../../src/v2/session"
import { ModelID, ProviderID } from "../../src/provider/schema"
import type { Provider } from "@/provider/provider"
import * as SessionProcessorModule from "../../src/session/processor"
@@ -596,15 +595,6 @@ describe("session.compaction.create", () => {
auto: true,
overflow: true,
})
const v2 = yield* SessionV2.Service.use((svc) => svc.messages({ sessionID: info.id })).pipe(
Effect.provide(SessionV2.defaultLayer),
)
expect(v2.at(-1)).toMatchObject({
type: "compaction",
reason: "auto",
summary: "",
})
}),
),
)

View File

@@ -19,7 +19,6 @@ import { ModelID, ProviderID } from "../../src/provider/schema"
import { Question } from "../../src/question"
import { Todo } from "../../src/session/todo"
import { Session } from "@/session/session"
import { SessionMessageTable } from "../../src/session/session.sql"
import { LLM } from "../../src/session/llm"
import { MessageV2 } from "../../src/session/message-v2"
import { AppFileSystem } from "@opencode-ai/core/filesystem"
@@ -32,7 +31,6 @@ import { SessionRevert } from "../../src/session/revert"
import { SessionRunState } from "../../src/session/run-state"
import { MessageID, PartID, SessionID } from "../../src/session/schema"
import { SessionStatus } from "../../src/session/status"
import { SessionV2 } from "../../src/v2/session"
import { Skill } from "../../src/skill"
import { SystemPrompt } from "../../src/session/system"
import { Shell } from "../../src/shell/shell"
@@ -41,7 +39,6 @@ import { ToolRegistry } from "@/tool/registry"
import { Truncate } from "@/tool/truncate"
import * as Log from "@opencode-ai/core/util/log"
import { CrossSpawnSpawner } from "@opencode-ai/core/cross-spawn-spawner"
import * as Database from "../../src/storage/db"
import { Ripgrep } from "../../src/file/ripgrep"
import { Format } from "../../src/format"
import { provideTmpdirInstance, provideTmpdirServer } from "../fixture/fixture"
@@ -374,44 +371,6 @@ it.live("loop calls LLM and returns assistant message", () =>
),
)
it.live("prompt emits v2 prompted and synthetic events", () =>
provideTmpdirServer(
Effect.fnUntraced(function* () {
const prompt = yield* SessionPrompt.Service
const sessions = yield* Session.Service
const chat = yield* sessions.create({ title: "Pinned" })
yield* prompt.prompt({
sessionID: chat.id,
agent: "build",
noReply: true,
parts: [
{ type: "text", text: "hello v2" },
{
type: "file",
mime: "text/plain",
filename: "note.txt",
url: "data:text/plain;base64,bm90ZSBjb250ZW50",
},
],
})
const messages = yield* SessionV2.Service.use((session) => session.messages({ sessionID: chat.id })).pipe(
Effect.provide(SessionV2.layer),
)
const row = Database.use((db) =>
db.select().from(SessionMessageTable).where(Database.eq(SessionMessageTable.session_id, chat.id)).get(),
)
expect(messages).toHaveLength(3)
expect(messages[0]).toMatchObject({ type: "user", text: "hello v2" })
expect(typeof row?.data.time.created).toBe("number")
expect(messages[1]).toMatchObject({ type: "synthetic", text: expect.stringContaining("Called the Read tool") })
expect(messages[2]).toMatchObject({ type: "synthetic", text: "note content" })
}),
{ git: true, config: providerCfg },
),
)
it.live("static loop returns assistant text through local provider", () =>
provideTmpdirServer(
Effect.fnUntraced(function* ({ llm }) {

View File

@@ -0,0 +1,916 @@
import { describe, expect, test } from "bun:test"
import * as DateTime from "effect/DateTime"
import * as FastCheck from "effect/testing/FastCheck"
import { SessionEntry } from "../../src/v2/session-entry"
import { SessionEntryStepper } from "../../src/v2/session-entry-stepper"
import { SessionEvent } from "../../src/v2/session-event"
const time = (n: number) => DateTime.makeUnsafe(n)
const word = FastCheck.string({ minLength: 1, maxLength: 8 })
const text = FastCheck.string({ maxLength: 16 })
const texts = FastCheck.array(text, { maxLength: 8 })
const val = FastCheck.oneof(FastCheck.boolean(), FastCheck.integer(), FastCheck.string({ maxLength: 12 }))
const dict = FastCheck.dictionary(word, val, { maxKeys: 4 })
const files = FastCheck.array(
word.map((x) => SessionEvent.FileAttachment.create({ uri: `file://${encodeURIComponent(x)}`, mime: "text/plain" })),
{ maxLength: 2 },
)
function maybe<A>(arb: FastCheck.Arbitrary<A>) {
return FastCheck.oneof(FastCheck.constant(undefined), arb)
}
function assistant() {
return new SessionEntry.Assistant({
id: SessionEvent.ID.create(),
type: "assistant",
time: { created: time(0) },
content: [],
retries: [],
})
}
function retryError(message: string) {
return new SessionEvent.RetryError({
message,
isRetryable: true,
})
}
function retry(attempt: number, message: string, created: number) {
return new SessionEntry.AssistantRetry({
attempt,
error: retryError(message),
time: {
created: time(created),
},
})
}
function memoryState() {
const state: SessionEntryStepper.MemoryState = {
entries: [],
pending: [],
}
return state
}
function active() {
const state: SessionEntryStepper.MemoryState = {
entries: [assistant()],
pending: [],
}
return state
}
function run(events: SessionEvent.Event[], state = memoryState()) {
return events.reduce<SessionEntryStepper.MemoryState>((state, event) => SessionEntryStepper.step(state, event), state)
}
function last(state: SessionEntryStepper.MemoryState) {
const entry = [...state.pending, ...state.entries].reverse().find((x) => x.type === "assistant")
expect(entry?.type).toBe("assistant")
return entry?.type === "assistant" ? entry : undefined
}
function texts_of(state: SessionEntryStepper.MemoryState) {
const entry = last(state)
if (!entry) return []
return entry.content.filter((x): x is SessionEntry.AssistantText => x.type === "text")
}
function reasons(state: SessionEntryStepper.MemoryState) {
const entry = last(state)
if (!entry) return []
return entry.content.filter((x): x is SessionEntry.AssistantReasoning => x.type === "reasoning")
}
function tools(state: SessionEntryStepper.MemoryState) {
const entry = last(state)
if (!entry) return []
return entry.content.filter((x): x is SessionEntry.AssistantTool => x.type === "tool")
}
function tool(state: SessionEntryStepper.MemoryState, callID: string) {
return tools(state).find((x) => x.callID === callID)
}
function retriesOf(state: SessionEntryStepper.MemoryState) {
const entry = last(state)
if (!entry) return []
return entry.retries ?? []
}
function adapterStore() {
return {
committed: [] as SessionEntry.Entry[],
deferred: [] as SessionEntry.Entry[],
}
}
function adapterFor(store: ReturnType<typeof adapterStore>): SessionEntryStepper.Adapter<typeof store> {
const activeAssistantIndex = () =>
store.committed.findLastIndex((entry) => entry.type === "assistant" && !entry.time.completed)
const getCurrentAssistant = () => {
const index = activeAssistantIndex()
if (index < 0) return
const assistant = store.committed[index]
return assistant?.type === "assistant" ? assistant : undefined
}
return {
getCurrentAssistant,
updateAssistant(assistant) {
const index = activeAssistantIndex()
if (index < 0) return
const current = store.committed[index]
if (current?.type !== "assistant") return
store.committed[index] = assistant
},
appendEntry(entry) {
store.committed.push(entry)
},
appendPending(entry) {
store.deferred.push(entry)
},
finish() {
return store
},
}
}
describe("session-entry-stepper", () => {
describe("stepWith", () => {
test("reduces through a custom adapter", () => {
const store = adapterStore()
store.committed.push(assistant())
SessionEntryStepper.stepWith(adapterFor(store), SessionEvent.Prompt.create({ text: "hello", timestamp: time(1) }))
SessionEntryStepper.stepWith(adapterFor(store), SessionEvent.Reasoning.Started.create({ timestamp: time(2) }))
SessionEntryStepper.stepWith(
adapterFor(store),
SessionEvent.Reasoning.Delta.create({ delta: "thinking", timestamp: time(3) }),
)
SessionEntryStepper.stepWith(
adapterFor(store),
SessionEvent.Reasoning.Ended.create({ text: "thought", timestamp: time(4) }),
)
SessionEntryStepper.stepWith(adapterFor(store), SessionEvent.Text.Started.create({ timestamp: time(5) }))
SessionEntryStepper.stepWith(
adapterFor(store),
SessionEvent.Text.Delta.create({ delta: "world", timestamp: time(6) }),
)
SessionEntryStepper.stepWith(
adapterFor(store),
SessionEvent.Step.Ended.create({
reason: "stop",
cost: 1,
tokens: {
input: 1,
output: 2,
reasoning: 3,
cache: {
read: 4,
write: 5,
},
},
timestamp: time(7),
}),
)
expect(store.deferred).toHaveLength(1)
expect(store.deferred[0]?.type).toBe("user")
expect(store.committed).toHaveLength(1)
expect(store.committed[0]?.type).toBe("assistant")
if (store.committed[0]?.type !== "assistant") return
expect(store.committed[0].content).toEqual([
{ type: "reasoning", text: "thought" },
{ type: "text", text: "world" },
])
expect(store.committed[0].time.completed).toEqual(time(7))
})
test("aggregates retry events onto the current assistant", () => {
const store = adapterStore()
store.committed.push(assistant())
SessionEntryStepper.stepWith(
adapterFor(store),
SessionEvent.Retried.create({
attempt: 1,
error: retryError("rate limited"),
timestamp: time(1),
}),
)
SessionEntryStepper.stepWith(
adapterFor(store),
SessionEvent.Retried.create({
attempt: 2,
error: retryError("provider overloaded"),
timestamp: time(2),
}),
)
expect(store.committed[0]?.type).toBe("assistant")
if (store.committed[0]?.type !== "assistant") return
expect(store.committed[0].retries).toEqual([retry(1, "rate limited", 1), retry(2, "provider overloaded", 2)])
})
})
describe("memory", () => {
test("tracks and replaces the current assistant", () => {
const state = active()
const adapter = SessionEntryStepper.memory(state)
const current = adapter.getCurrentAssistant()
expect(current?.type).toBe("assistant")
if (!current) return
adapter.updateAssistant(
new SessionEntry.Assistant({
...current,
content: [new SessionEntry.AssistantText({ type: "text", text: "done" })],
time: {
...current.time,
completed: time(1),
},
}),
)
expect(adapter.getCurrentAssistant()).toBeUndefined()
expect(state.entries[0]?.type).toBe("assistant")
if (state.entries[0]?.type !== "assistant") return
expect(state.entries[0].content).toEqual([{ type: "text", text: "done" }])
expect(state.entries[0].time.completed).toEqual(time(1))
})
test("appends committed and pending entries", () => {
const state = memoryState()
const adapter = SessionEntryStepper.memory(state)
const committed = SessionEntry.User.fromEvent(
SessionEvent.Prompt.create({ text: "committed", timestamp: time(1) }),
)
const pending = SessionEntry.User.fromEvent(SessionEvent.Prompt.create({ text: "pending", timestamp: time(2) }))
adapter.appendEntry(committed)
adapter.appendPending(pending)
expect(state.entries).toEqual([committed])
expect(state.pending).toEqual([pending])
})
test("stepWith through memory records reasoning", () => {
const state = active()
SessionEntryStepper.stepWith(
SessionEntryStepper.memory(state),
SessionEvent.Reasoning.Started.create({ timestamp: time(1) }),
)
SessionEntryStepper.stepWith(
SessionEntryStepper.memory(state),
SessionEvent.Reasoning.Delta.create({ delta: "draft", timestamp: time(2) }),
)
SessionEntryStepper.stepWith(
SessionEntryStepper.memory(state),
SessionEvent.Reasoning.Ended.create({ text: "final", timestamp: time(3) }),
)
expect(reasons(state)).toEqual([{ type: "reasoning", text: "final" }])
})
test("stepWith through memory records retries", () => {
const state = active()
SessionEntryStepper.stepWith(
SessionEntryStepper.memory(state),
SessionEvent.Retried.create({
attempt: 1,
error: retryError("rate limited"),
timestamp: time(1),
}),
)
expect(retriesOf(state)).toEqual([retry(1, "rate limited", 1)])
})
})
describe("step", () => {
describe("seeded pending assistant", () => {
test("stores prompts in entries when no assistant is pending", () => {
FastCheck.assert(
FastCheck.property(word, (body) => {
const next = SessionEntryStepper.step(
memoryState(),
SessionEvent.Prompt.create({ text: body, timestamp: time(1) }),
)
expect(next.entries).toHaveLength(1)
expect(next.entries[0]?.type).toBe("user")
if (next.entries[0]?.type !== "user") return
expect(next.entries[0].text).toBe(body)
}),
{ numRuns: 50 },
)
})
test("stores prompts in pending when an assistant is pending", () => {
FastCheck.assert(
FastCheck.property(word, (body) => {
const next = SessionEntryStepper.step(
active(),
SessionEvent.Prompt.create({ text: body, timestamp: time(1) }),
)
expect(next.pending).toHaveLength(1)
expect(next.pending[0]?.type).toBe("user")
if (next.pending[0]?.type !== "user") return
expect(next.pending[0].text).toBe(body)
}),
{ numRuns: 50 },
)
})
test("accumulates text deltas on the latest text part", () => {
FastCheck.assert(
FastCheck.property(texts, (parts) => {
const next = parts.reduce(
(state, part, i) =>
SessionEntryStepper.step(
state,
SessionEvent.Text.Delta.create({ delta: part, timestamp: time(i + 2) }),
),
SessionEntryStepper.step(active(), SessionEvent.Text.Started.create({ timestamp: time(1) })),
)
expect(texts_of(next)).toEqual([
{
type: "text",
text: parts.join(""),
},
])
}),
{ numRuns: 100 },
)
})
test("routes later text deltas to the latest text segment", () => {
FastCheck.assert(
FastCheck.property(texts, texts, (a, b) => {
const next = run(
[
SessionEvent.Text.Started.create({ timestamp: time(1) }),
...a.map((x, i) => SessionEvent.Text.Delta.create({ delta: x, timestamp: time(i + 2) })),
SessionEvent.Text.Started.create({ timestamp: time(a.length + 2) }),
...b.map((x, i) => SessionEvent.Text.Delta.create({ delta: x, timestamp: time(i + a.length + 3) })),
],
active(),
)
expect(texts_of(next)).toEqual([
{ type: "text", text: a.join("") },
{ type: "text", text: b.join("") },
])
}),
{ numRuns: 50 },
)
})
test("reasoning.ended replaces buffered reasoning text", () => {
FastCheck.assert(
FastCheck.property(texts, text, (parts, end) => {
const next = run(
[
SessionEvent.Reasoning.Started.create({ timestamp: time(1) }),
...parts.map((x, i) => SessionEvent.Reasoning.Delta.create({ delta: x, timestamp: time(i + 2) })),
SessionEvent.Reasoning.Ended.create({ text: end, timestamp: time(parts.length + 2) }),
],
active(),
)
expect(reasons(next)).toEqual([
{
type: "reasoning",
text: end,
},
])
}),
{ numRuns: 100 },
)
})
test("tool.success completes the latest running tool", () => {
FastCheck.assert(
FastCheck.property(
word,
word,
dict,
maybe(text),
maybe(dict),
maybe(files),
texts,
(callID, title, input, output, metadata, attachments, parts) => {
const next = run(
[
SessionEvent.Tool.Input.Started.create({ callID, name: "bash", timestamp: time(1) }),
...parts.map((x, i) =>
SessionEvent.Tool.Input.Delta.create({ callID, delta: x, timestamp: time(i + 2) }),
),
SessionEvent.Tool.Called.create({
callID,
tool: "bash",
input,
provider: { executed: true },
timestamp: time(parts.length + 2),
}),
SessionEvent.Tool.Success.create({
callID,
title,
output,
metadata,
attachments,
provider: { executed: true },
timestamp: time(parts.length + 3),
}),
],
active(),
)
const match = tool(next, callID)
expect(match?.state.status).toBe("completed")
if (match?.state.status !== "completed") return
expect(match.time.ran).toEqual(time(parts.length + 2))
expect(match.state.input).toEqual(input)
expect(match.state.output).toBe(output ?? "")
expect(match.state.title).toBe(title)
expect(match.state.metadata).toEqual(metadata ?? {})
expect(match.state.attachments).toEqual(attachments ?? [])
},
),
{ numRuns: 50 },
)
})
test("tool.error completes the latest running tool with an error", () => {
FastCheck.assert(
FastCheck.property(word, dict, word, maybe(dict), (callID, input, error, metadata) => {
const next = run(
[
SessionEvent.Tool.Input.Started.create({ callID, name: "bash", timestamp: time(1) }),
SessionEvent.Tool.Called.create({
callID,
tool: "bash",
input,
provider: { executed: true },
timestamp: time(2),
}),
SessionEvent.Tool.Error.create({
callID,
error,
metadata,
provider: { executed: true },
timestamp: time(3),
}),
],
active(),
)
const match = tool(next, callID)
expect(match?.state.status).toBe("error")
if (match?.state.status !== "error") return
expect(match.time.ran).toEqual(time(2))
expect(match.state.input).toEqual(input)
expect(match.state.error).toBe(error)
expect(match.state.metadata).toEqual(metadata ?? {})
}),
{ numRuns: 50 },
)
})
test("tool.success is ignored before tool.called promotes the tool to running", () => {
FastCheck.assert(
FastCheck.property(word, word, (callID, title) => {
const next = run(
[
SessionEvent.Tool.Input.Started.create({ callID, name: "bash", timestamp: time(1) }),
SessionEvent.Tool.Success.create({
callID,
title,
provider: { executed: true },
timestamp: time(2),
}),
],
active(),
)
const match = tool(next, callID)
expect(match?.state).toEqual({
status: "pending",
input: "",
})
}),
{ numRuns: 50 },
)
})
test("step.ended copies completion fields onto the pending assistant", () => {
FastCheck.assert(
FastCheck.property(FastCheck.integer({ min: 1, max: 1000 }), (n) => {
const event = SessionEvent.Step.Ended.create({
reason: "stop",
cost: 1,
tokens: {
input: 1,
output: 2,
reasoning: 3,
cache: {
read: 4,
write: 5,
},
},
timestamp: time(n),
})
const next = SessionEntryStepper.step(active(), event)
const entry = last(next)
expect(entry).toBeDefined()
if (!entry) return
expect(entry.time.completed).toEqual(event.timestamp)
expect(entry.cost).toBe(event.cost)
expect(entry.tokens).toEqual(event.tokens)
}),
{ numRuns: 50 },
)
})
})
describe("known reducer gaps", () => {
test("prompt appends immutably when no assistant is pending", () => {
FastCheck.assert(
FastCheck.property(word, (body) => {
const old = memoryState()
const next = SessionEntryStepper.step(old, SessionEvent.Prompt.create({ text: body, timestamp: time(1) }))
expect(old).not.toBe(next)
expect(old.entries).toHaveLength(0)
expect(next.entries).toHaveLength(1)
}),
{ numRuns: 50 },
)
})
test("prompt appends immutably when an assistant is pending", () => {
FastCheck.assert(
FastCheck.property(word, (body) => {
const old = active()
const next = SessionEntryStepper.step(old, SessionEvent.Prompt.create({ text: body, timestamp: time(1) }))
expect(old).not.toBe(next)
expect(old.pending).toHaveLength(0)
expect(next.pending).toHaveLength(1)
}),
{ numRuns: 50 },
)
})
test("step.started creates an assistant consumed by follow-up events", () => {
FastCheck.assert(
FastCheck.property(texts, (parts) => {
const next = run([
SessionEvent.Step.Started.create({
model: {
id: "model",
providerID: "provider",
},
timestamp: time(1),
}),
SessionEvent.Text.Started.create({ timestamp: time(2) }),
...parts.map((x, i) => SessionEvent.Text.Delta.create({ delta: x, timestamp: time(i + 3) })),
SessionEvent.Step.Ended.create({
reason: "stop",
cost: 1,
tokens: {
input: 1,
output: 2,
reasoning: 3,
cache: {
read: 4,
write: 5,
},
},
timestamp: time(parts.length + 3),
}),
])
const entry = last(next)
expect(entry).toBeDefined()
if (!entry) return
expect(entry.content).toEqual([
{
type: "text",
text: parts.join(""),
},
])
expect(entry.time.completed).toEqual(time(parts.length + 3))
}),
{ numRuns: 100 },
)
})
test("replays prompt -> step -> text -> step.ended", () => {
FastCheck.assert(
FastCheck.property(word, texts, (body, parts) => {
const next = run([
SessionEvent.Prompt.create({ text: body, timestamp: time(0) }),
SessionEvent.Step.Started.create({
model: {
id: "model",
providerID: "provider",
},
timestamp: time(1),
}),
SessionEvent.Text.Started.create({ timestamp: time(2) }),
...parts.map((x, i) => SessionEvent.Text.Delta.create({ delta: x, timestamp: time(i + 3) })),
SessionEvent.Step.Ended.create({
reason: "stop",
cost: 1,
tokens: {
input: 1,
output: 2,
reasoning: 3,
cache: {
read: 4,
write: 5,
},
},
timestamp: time(parts.length + 3),
}),
])
expect(next.entries).toHaveLength(2)
expect(next.entries[0]?.type).toBe("user")
expect(next.entries[1]?.type).toBe("assistant")
if (next.entries[1]?.type !== "assistant") return
expect(next.entries[1].content).toEqual([
{
type: "text",
text: parts.join(""),
},
])
expect(next.entries[1].time.completed).toEqual(time(parts.length + 3))
}),
{ numRuns: 50 },
)
})
test("replays prompt -> step -> reasoning -> tool -> success -> step.ended", () => {
FastCheck.assert(
FastCheck.property(
word,
texts,
text,
dict,
word,
maybe(text),
maybe(dict),
maybe(files),
(body, reason, end, input, title, output, metadata, attachments) => {
const callID = "call"
const next = run([
SessionEvent.Prompt.create({ text: body, timestamp: time(0) }),
SessionEvent.Step.Started.create({
model: {
id: "model",
providerID: "provider",
},
timestamp: time(1),
}),
SessionEvent.Reasoning.Started.create({ timestamp: time(2) }),
...reason.map((x, i) => SessionEvent.Reasoning.Delta.create({ delta: x, timestamp: time(i + 3) })),
SessionEvent.Reasoning.Ended.create({ text: end, timestamp: time(reason.length + 3) }),
SessionEvent.Tool.Input.Started.create({ callID, name: "bash", timestamp: time(reason.length + 4) }),
SessionEvent.Tool.Called.create({
callID,
tool: "bash",
input,
provider: { executed: true },
timestamp: time(reason.length + 5),
}),
SessionEvent.Tool.Success.create({
callID,
title,
output,
metadata,
attachments,
provider: { executed: true },
timestamp: time(reason.length + 6),
}),
SessionEvent.Step.Ended.create({
reason: "stop",
cost: 1,
tokens: {
input: 1,
output: 2,
reasoning: 3,
cache: {
read: 4,
write: 5,
},
},
timestamp: time(reason.length + 7),
}),
])
expect(next.entries.at(-1)?.type).toBe("assistant")
const entry = next.entries.at(-1)
if (entry?.type !== "assistant") return
expect(entry.content).toHaveLength(2)
expect(entry.content[0]).toEqual({
type: "reasoning",
text: end,
})
expect(entry.content[1]?.type).toBe("tool")
if (entry.content[1]?.type !== "tool") return
expect(entry.content[1].state.status).toBe("completed")
expect(entry.time.completed).toEqual(time(reason.length + 7))
},
),
{ numRuns: 50 },
)
})
test("starting a new step completes the old assistant and appends a new active assistant", () => {
const next = run(
[
SessionEvent.Step.Started.create({
model: {
id: "model",
providerID: "provider",
},
timestamp: time(1),
}),
],
active(),
)
expect(next.entries).toHaveLength(2)
expect(next.entries[0]?.type).toBe("assistant")
expect(next.entries[1]?.type).toBe("assistant")
if (next.entries[0]?.type !== "assistant" || next.entries[1]?.type !== "assistant") return
expect(next.entries[0].time.completed).toEqual(time(1))
expect(next.entries[1].time.created).toEqual(time(1))
expect(next.entries[1].time.completed).toBeUndefined()
})
test("handles sequential tools independently", () => {
FastCheck.assert(
FastCheck.property(dict, dict, word, word, (a, b, title, error) => {
const next = run(
[
SessionEvent.Tool.Input.Started.create({ callID: "a", name: "bash", timestamp: time(1) }),
SessionEvent.Tool.Called.create({
callID: "a",
tool: "bash",
input: a,
provider: { executed: true },
timestamp: time(2),
}),
SessionEvent.Tool.Success.create({
callID: "a",
title,
output: "done",
provider: { executed: true },
timestamp: time(3),
}),
SessionEvent.Tool.Input.Started.create({ callID: "b", name: "grep", timestamp: time(4) }),
SessionEvent.Tool.Called.create({
callID: "b",
tool: "bash",
input: b,
provider: { executed: true },
timestamp: time(5),
}),
SessionEvent.Tool.Error.create({
callID: "b",
error,
provider: { executed: true },
timestamp: time(6),
}),
],
active(),
)
const first = tool(next, "a")
const second = tool(next, "b")
expect(first?.state.status).toBe("completed")
if (first?.state.status !== "completed") return
expect(first.state.input).toEqual(a)
expect(first.state.output).toBe("done")
expect(first.state.title).toBe(title)
expect(second?.state.status).toBe("error")
if (second?.state.status !== "error") return
expect(second.state.input).toEqual(b)
expect(second.state.error).toBe(error)
}),
{ numRuns: 50 },
)
})
test("routes tool events by callID when tool streams interleave", () => {
FastCheck.assert(
FastCheck.property(dict, dict, word, word, text, text, (a, b, titleA, titleB, deltaA, deltaB) => {
const next = run(
[
SessionEvent.Tool.Input.Started.create({ callID: "a", name: "bash", timestamp: time(1) }),
SessionEvent.Tool.Input.Started.create({ callID: "b", name: "grep", timestamp: time(2) }),
SessionEvent.Tool.Input.Delta.create({ callID: "a", delta: deltaA, timestamp: time(3) }),
SessionEvent.Tool.Input.Delta.create({ callID: "b", delta: deltaB, timestamp: time(4) }),
SessionEvent.Tool.Called.create({
callID: "a",
tool: "bash",
input: a,
provider: { executed: true },
timestamp: time(5),
}),
SessionEvent.Tool.Called.create({
callID: "b",
tool: "grep",
input: b,
provider: { executed: true },
timestamp: time(6),
}),
SessionEvent.Tool.Success.create({
callID: "a",
title: titleA,
output: "done-a",
provider: { executed: true },
timestamp: time(7),
}),
SessionEvent.Tool.Success.create({
callID: "b",
title: titleB,
output: "done-b",
provider: { executed: true },
timestamp: time(8),
}),
],
active(),
)
const first = tool(next, "a")
const second = tool(next, "b")
expect(first?.state.status).toBe("completed")
expect(second?.state.status).toBe("completed")
if (first?.state.status !== "completed" || second?.state.status !== "completed") return
expect(first.state.input).toEqual(a)
expect(second.state.input).toEqual(b)
expect(first.state.title).toBe(titleA)
expect(second.state.title).toBe(titleB)
}),
{ numRuns: 50 },
)
})
test("records synthetic events", () => {
FastCheck.assert(
FastCheck.property(word, (body) => {
const next = SessionEntryStepper.step(
memoryState(),
SessionEvent.Synthetic.create({ text: body, timestamp: time(1) }),
)
expect(next.entries).toHaveLength(1)
expect(next.entries[0]?.type).toBe("synthetic")
if (next.entries[0]?.type !== "synthetic") return
expect(next.entries[0].text).toBe(body)
}),
{ numRuns: 50 },
)
})
test("records compaction events", () => {
FastCheck.assert(
FastCheck.property(FastCheck.boolean(), maybe(FastCheck.boolean()), (auto, overflow) => {
const next = SessionEntryStepper.step(
memoryState(),
SessionEvent.Compacted.create({ auto, overflow, timestamp: time(1) }),
)
expect(next.entries).toHaveLength(1)
expect(next.entries[0]?.type).toBe("compaction")
if (next.entries[0]?.type !== "compaction") return
expect(next.entries[0].auto).toBe(auto)
expect(next.entries[0].overflow).toBe(overflow)
}),
{ numRuns: 50 },
)
})
})
})
})

View File

@@ -1,203 +0,0 @@
import { expect, test } from "bun:test"
import * as DateTime from "effect/DateTime"
import { SessionID } from "../../src/session/schema"
import { EventV2 } from "../../src/v2/event"
import { SessionEvent } from "../../src/v2/session-event"
import { SessionMessageUpdater } from "../../src/v2/session-message-updater"
test("step snapshots carry over to assistant messages", () => {
const state: SessionMessageUpdater.MemoryState = { messages: [] }
const sessionID = SessionID.make("session")
SessionMessageUpdater.update(SessionMessageUpdater.memory(state), {
id: EventV2.ID.create(),
type: "session.next.step.started",
data: {
sessionID,
timestamp: DateTime.makeUnsafe(1),
agent: "build",
model: { id: "model", providerID: "provider" },
snapshot: "before",
},
} satisfies SessionEvent.Event)
SessionMessageUpdater.update(SessionMessageUpdater.memory(state), {
id: EventV2.ID.create(),
type: "session.next.step.ended",
data: {
sessionID,
timestamp: DateTime.makeUnsafe(2),
finish: "stop",
cost: 0,
tokens: {
input: 1,
output: 2,
reasoning: 0,
cache: { read: 0, write: 0 },
},
snapshot: "after",
},
} satisfies SessionEvent.Event)
expect(state.messages[0]?.type).toBe("assistant")
if (state.messages[0]?.type !== "assistant") return
expect(state.messages[0].snapshot).toEqual({ start: "before", end: "after" })
expect(state.messages[0].finish).toBe("stop")
})
test("text ended populates assistant text content", () => {
const state: SessionMessageUpdater.MemoryState = { messages: [] }
const sessionID = SessionID.make("session")
SessionMessageUpdater.update(SessionMessageUpdater.memory(state), {
id: EventV2.ID.create(),
type: "session.next.step.started",
data: {
sessionID,
timestamp: DateTime.makeUnsafe(1),
agent: "build",
model: { id: "model", providerID: "provider" },
},
} satisfies SessionEvent.Event)
SessionMessageUpdater.update(SessionMessageUpdater.memory(state), {
id: EventV2.ID.create(),
type: "session.next.text.started",
data: {
sessionID,
timestamp: DateTime.makeUnsafe(2),
},
} satisfies SessionEvent.Event)
SessionMessageUpdater.update(SessionMessageUpdater.memory(state), {
id: EventV2.ID.create(),
type: "session.next.text.ended",
data: {
sessionID,
timestamp: DateTime.makeUnsafe(3),
text: "hello assistant",
},
} satisfies SessionEvent.Event)
expect(state.messages[0]?.type).toBe("assistant")
if (state.messages[0]?.type !== "assistant") return
expect(state.messages[0].content).toEqual([{ type: "text", text: "hello assistant" }])
})
test("tool completion stores completed timestamp", () => {
const state: SessionMessageUpdater.MemoryState = { messages: [] }
const sessionID = SessionID.make("session")
const callID = "call"
SessionMessageUpdater.update(SessionMessageUpdater.memory(state), {
id: EventV2.ID.create(),
type: "session.next.step.started",
data: {
sessionID,
timestamp: DateTime.makeUnsafe(1),
agent: "build",
model: { id: "model", providerID: "provider" },
},
} satisfies SessionEvent.Event)
SessionMessageUpdater.update(SessionMessageUpdater.memory(state), {
id: EventV2.ID.create(),
type: "session.next.tool.input.started",
data: {
sessionID,
timestamp: DateTime.makeUnsafe(2),
callID,
name: "bash",
},
} satisfies SessionEvent.Event)
SessionMessageUpdater.update(SessionMessageUpdater.memory(state), {
id: EventV2.ID.create(),
type: "session.next.tool.called",
data: {
sessionID,
timestamp: DateTime.makeUnsafe(3),
callID,
tool: "bash",
input: { command: "pwd" },
provider: { executed: true, metadata: { source: "provider" } },
},
} satisfies SessionEvent.Event)
SessionMessageUpdater.update(SessionMessageUpdater.memory(state), {
id: EventV2.ID.create(),
type: "session.next.tool.success",
data: {
sessionID,
timestamp: DateTime.makeUnsafe(4),
callID,
structured: {},
content: [{ type: "text", text: "/tmp" }],
provider: { executed: true, metadata: { status: "done" } },
},
} satisfies SessionEvent.Event)
expect(state.messages[0]?.type).toBe("assistant")
if (state.messages[0]?.type !== "assistant") return
expect(state.messages[0].content[0]?.type).toBe("tool")
if (state.messages[0].content[0]?.type !== "tool") return
expect(state.messages[0].content[0].time.completed).toEqual(DateTime.makeUnsafe(4))
expect(state.messages[0].content[0].provider).toEqual({ executed: true, metadata: { status: "done" } })
})
test("compaction events reduce to compaction message", () => {
const state: SessionMessageUpdater.MemoryState = { messages: [] }
const sessionID = SessionID.make("session")
const id = EventV2.ID.create()
SessionMessageUpdater.update(SessionMessageUpdater.memory(state), {
id,
type: "session.next.compaction.started",
data: {
sessionID,
timestamp: DateTime.makeUnsafe(1),
reason: "auto",
},
} satisfies SessionEvent.Event)
SessionMessageUpdater.update(SessionMessageUpdater.memory(state), {
id: EventV2.ID.create(),
type: "session.next.compaction.delta",
data: {
sessionID,
timestamp: DateTime.makeUnsafe(2),
text: "hello ",
},
} satisfies SessionEvent.Event)
SessionMessageUpdater.update(SessionMessageUpdater.memory(state), {
id: EventV2.ID.create(),
type: "session.next.compaction.delta",
data: {
sessionID,
timestamp: DateTime.makeUnsafe(3),
text: "summary",
},
} satisfies SessionEvent.Event)
SessionMessageUpdater.update(SessionMessageUpdater.memory(state), {
id: EventV2.ID.create(),
type: "session.next.compaction.ended",
data: {
sessionID,
timestamp: DateTime.makeUnsafe(4),
text: "final summary",
include: "recent context",
},
} satisfies SessionEvent.Event)
expect(state.messages).toHaveLength(1)
expect(state.messages[0]).toMatchObject({
id,
type: "compaction",
reason: "auto",
summary: "final summary",
include: "recent context",
time: { created: DateTime.makeUnsafe(1) },
})
})

View File

@@ -187,10 +187,6 @@ import type {
TuiSelectSessionResponses,
TuiShowToastResponses,
TuiSubmitPromptResponses,
V2SessionListErrors,
V2SessionListResponses,
V2SessionMessagesErrors,
V2SessionMessagesResponses,
VcsDiffResponses,
VcsGetResponses,
WorktreeCreateErrors,
@@ -1695,12 +1691,6 @@ export class Session2 extends HeyApiClient {
workspace?: string
parentID?: string
title?: string
agent?: string
model?: {
id: string
providerID: string
variant?: string
}
permission?: PermissionRuleset
workspaceID?: string
},
@@ -1715,8 +1705,6 @@ export class Session2 extends HeyApiClient {
{ in: "query", key: "workspace" },
{ in: "body", key: "parentID" },
{ in: "body", key: "title" },
{ in: "body", key: "agent" },
{ in: "body", key: "model" },
{ in: "body", key: "permission" },
{ in: "body", key: "workspaceID" },
],
@@ -3191,97 +3179,6 @@ export class Sync extends HeyApiClient {
}
}
export class Session3 extends HeyApiClient {
/**
* List v2 sessions
*
* Retrieve sessions in the requested order. Items keep that order across pages; use cursor.next or cursor.previous to move through the ordered list.
*/
public list<ThrowOnError extends boolean = false>(
parameters?: {
directory?: string
workspace?: string
limit?: number
cursor?: string
order?: "asc" | "desc"
path?: string
roots?: "true" | "false"
start?: number
search?: string
},
options?: Options<never, ThrowOnError>,
) {
const params = buildClientParams(
[parameters],
[
{
args: [
{ in: "query", key: "directory" },
{ in: "query", key: "workspace" },
{ in: "query", key: "limit" },
{ in: "query", key: "cursor" },
{ in: "query", key: "order" },
{ in: "query", key: "path" },
{ in: "query", key: "roots" },
{ in: "query", key: "start" },
{ in: "query", key: "search" },
],
},
],
)
return (options?.client ?? this.client).get<V2SessionListResponses, V2SessionListErrors, ThrowOnError>({
url: "/api/session",
...options,
...params,
})
}
/**
* Get v2 session messages
*
* Retrieve projected v2 messages for a session directly from the message database.
*/
public messages<ThrowOnError extends boolean = false>(
parameters: {
sessionID: string
directory?: string
workspace?: string
limit?: number
cursor?: string
order?: "asc" | "desc"
},
options?: Options<never, ThrowOnError>,
) {
const params = buildClientParams(
[parameters],
[
{
args: [
{ in: "path", key: "sessionID" },
{ in: "query", key: "directory" },
{ in: "query", key: "workspace" },
{ in: "query", key: "limit" },
{ in: "query", key: "cursor" },
{ in: "query", key: "order" },
],
},
],
)
return (options?.client ?? this.client).get<V2SessionMessagesResponses, V2SessionMessagesErrors, ThrowOnError>({
url: "/api/session/{sessionID}/message",
...options,
...params,
})
}
}
export class V2 extends HeyApiClient {
private _session?: Session3
get session(): Session3 {
return (this._session ??= new Session3({ client: this.client }))
}
}
export class Find extends HeyApiClient {
/**
* Find text
@@ -4543,11 +4440,6 @@ export class OpencodeClient extends HeyApiClient {
return (this._sync ??= new Sync({ client: this.client }))
}
private _v2?: V2
get v2(): V2 {
return (this._v2 ??= new V2({ client: this.client }))
}
private _find?: Find
get find(): Find {
return (this._find ??= new Find({ client: this.client }))

File diff suppressed because it is too large Load Diff

View File

@@ -1,131 +0,0 @@
# Session V2 Concept Gaps
Compared with `packages/opencode/src/session/message-v2.ts` and `packages/opencode/src/session/processor.ts`, `packages/opencode/src/v2` currently captures the rough event stream for prompts, assistant steps, text, reasoning, tools, retries, and compaction, but it does not yet capture several persisted-message and processor concepts.
## Message Metadata
- User messages are missing selected `agent`, `model`, `system`, enabled `tools`, output `format`, and summary metadata.
- Assistant messages are missing `parentID`, `agent`, `providerID`, `modelID`, `variant`, `path.cwd`, `path.root`, deprecated `mode`, `summary`, `structured`, `finish`, and typed `error`.
## Output Format
- Text output format.
- JSON-schema output format.
- Structured-output retry count.
- Structured assistant result payload.
- Structured-output error classification.
## Errors
- Aborted error.
- Provider auth error.
- API error with status, retryability, headers, body, and metadata.
- Context-overflow error.
- Output-length error.
- Unknown error.
- V2 mostly reduces assistant errors to strings, except retry errors.
## Part Identity
- V1 has stable `MessageID`, `PartID`, `sessionID`, and `messageID` on every part.
- V2 assistant content does not preserve stable per-content IDs.
- Stable content IDs matter for deltas, updates, removals, sync events, and UI reconciliation.
## Part Timing And Metadata
- V1 text, reasoning, and tool states carry timing and provider metadata.
- V2 assistant text and reasoning content only store text.
- V2 events include metadata, but `SessionEntry` currently drops most provider metadata.
## Snapshots And Patches
- Snapshot parts.
- Patch parts.
- Step-start snapshot references.
- Step-finish snapshot references.
- Processor behavior that tracks a snapshot before the stream and emits patches after step finish or cleanup.
## Step Boundaries
- V1 stores `step-start` and `step-finish` as first-class parts.
- V2 has `step.started` and `step.ended` events, but the assistant entry only stores aggregate cost and tokens.
- V2 does not preserve step boundary parts, finish reason, or snapshot details in the entry model.
## Compaction
- V1 compaction parts have `auto`, `overflow`, and `tail_start_id`.
- V2 compacted events have `auto` and optional `overflow`, but no retained-tail marker.
- V1 also has history filtering semantics around completed summary messages and retained tails.
## Files And Sources
- V1 file parts have `mime`, `filename`, `url`, and typed source information.
- V1 source variants include file, symbol, and resource sources.
- Symbol sources include LSP range, name, and kind.
- Resource sources include client name and URI.
- V2 file attachments have `uri`, `mime`, `name`, `description`, and a generic text source, but lose source type, LSP metadata, and resource metadata.
## Agents And Subtasks
- Agent parts.
- Subtask parts.
- Subtask prompt, description, agent, model, and command.
- V2 has agent attachments on prompts, but no assistant/session content equivalent for subtask execution.
## Text Flags
- Synthetic text flag.
- Ignored text flag.
- V2 has a separate synthetic entry, but no ignored text concept.
## Tool Calls
- V1 pending tool state stores parsed input and raw input text separately.
- V2 pending tool state stores a string input but does not preserve a separate raw field.
- V1 completed tool state has `time.start`, `time.end`, and optional `time.compacted`.
- V2 tool time has `created`, `ran`, `completed`, and `pruned`, but the stepper currently does not set `completed` or `pruned`.
- V1 error tool state has `time.start` and `time.end`.
- V1 supports interrupted tool errors with `metadata.interrupted` and preserved partial output.
- V1 tracks provider execution and provider call metadata.
- V2 events include provider info, but `SessionEntryStepper` drops it from entries.
- V1 has tool-output compaction and truncation behavior via `time.compacted`.
## Media Handling
- V1 models tool attachments as file parts and has provider-specific handling for media in tool results.
- V1 can strip media, inject synthetic user messages for unsupported providers, and uses a synthetic attachment prompt.
- V2 has attachments but not these model-message conversion semantics.
## Retries
- V1 stores retries as independently addressable retry parts.
- V2 stores retries as an assistant aggregate.
- V2 captures some retry information, but not the independent part identity/update model.
## Processor Control Flow
- Session status transitions: busy, retry, and idle.
- Retry policy integration.
- Context-overflow-driven compaction.
- Abort and interrupt handling.
- Permission-denied blocking.
- Doom-loop detection.
- Plugin hook for `experimental.text.complete`.
- Background summary generation after steps.
- Cleanup semantics for open text, reasoning, and tool calls.
## Sync And Bus Events
- Message updated.
- Message removed.
- Message part updated.
- Message part delta.
- Message part removed.
- V2 has domain events, but not the sync/bus event model for persisted message and part updates/removals.
## History Retrieval
- Cursor encoding and decoding.
- Paged message retrieval.
- Reverse streaming through history.
- Compaction-aware history filtering.