refactor(effect): extract session run state service (#21744)

This commit is contained in:
Kit Langton
2026-04-09 16:03:40 -04:00
committed by GitHub
parent 3199383eef
commit 10441efad1
9 changed files with 184 additions and 111 deletions

View File

@@ -161,39 +161,37 @@ export namespace Vcs {
const bus = yield* Bus.Service
const state = yield* InstanceState.make<State>(
Effect.fn("Vcs.state")((ctx) =>
Effect.gen(function* () {
if (ctx.project.vcs !== "git") {
return { current: undefined, root: undefined }
}
Effect.fn("Vcs.state")(function* (ctx) {
if (ctx.project.vcs !== "git") {
return { current: undefined, root: undefined }
}
const get = Effect.fnUntraced(function* () {
return yield* git.branch(ctx.directory)
})
const [current, root] = yield* Effect.all([git.branch(ctx.directory), git.defaultBranch(ctx.directory)], {
concurrency: 2,
})
const value = { current, root }
log.info("initialized", { branch: value.current, default_branch: value.root?.name })
const get = Effect.fnUntraced(function* () {
return yield* git.branch(ctx.directory)
})
const [current, root] = yield* Effect.all([git.branch(ctx.directory), git.defaultBranch(ctx.directory)], {
concurrency: 2,
})
const value = { current, root }
log.info("initialized", { branch: value.current, default_branch: value.root?.name })
yield* bus.subscribe(FileWatcher.Event.Updated).pipe(
Stream.filter((evt) => evt.properties.file.endsWith("HEAD")),
Stream.runForEach((_evt) =>
Effect.gen(function* () {
const next = yield* get()
if (next !== value.current) {
log.info("branch changed", { from: value.current, to: next })
value.current = next
yield* bus.publish(Event.BranchUpdated, { branch: next })
}
}),
),
Effect.forkScoped,
)
yield* bus.subscribe(FileWatcher.Event.Updated).pipe(
Stream.filter((evt) => evt.properties.file.endsWith("HEAD")),
Stream.runForEach((_evt) =>
Effect.gen(function* () {
const next = yield* get()
if (next !== value.current) {
log.info("branch changed", { from: value.current, to: next })
value.current = next
yield* bus.publish(Event.BranchUpdated, { branch: next })
}
}),
),
Effect.forkScoped,
)
return value
}),
),
return value
}),
)
return Service.of({

View File

@@ -6,6 +6,7 @@ import z from "zod"
import { Session } from "../../session"
import { MessageV2 } from "../../session/message-v2"
import { SessionPrompt } from "../../session/prompt"
import { SessionRunState } from "@/session/run-state"
import { SessionCompaction } from "../../session/compaction"
import { SessionRevert } from "../../session/revert"
import { SessionStatus } from "@/session/status"
@@ -698,7 +699,7 @@ export const SessionRoutes = lazy(() =>
),
async (c) => {
const params = c.req.valid("param")
await SessionPrompt.assertNotBusy(params.sessionID)
await SessionRunState.assertNotBusy(params.sessionID)
await Session.removeMessage({
sessionID: params.sessionID,
messageID: params.messageID,

View File

@@ -20,7 +20,6 @@ import PROMPT_PLAN from "../session/prompt/plan.txt"
import BUILD_SWITCH from "../session/prompt/build-switch.txt"
import MAX_STEPS from "../session/prompt/max-steps.txt"
import { ToolRegistry } from "../tool/registry"
import { Runner } from "@/effect/runner"
import { MCP } from "../mcp"
import { LSP } from "../lsp"
import { FileTime } from "../file/time"
@@ -48,6 +47,7 @@ import { Cause, Effect, Exit, Layer, Option, Scope, ServiceMap } from "effect"
import { InstanceState } from "@/effect/instance-state"
import { makeRuntime } from "@/effect/run-service"
import { TaskTool } from "@/tool/task"
import { SessionRunState } from "./run-state"
// @ts-ignore
globalThis.AI_SDK_LOG_WARNINGS = false
@@ -66,7 +66,6 @@ export namespace SessionPrompt {
const log = Log.create({ service: "session.prompt" })
export interface Interface {
readonly assertNotBusy: (sessionID: SessionID) => Effect.Effect<void, Session.BusyError>
readonly cancel: (sessionID: SessionID) => Effect.Effect<void>
readonly prompt: (input: PromptInput) => Effect.Effect<MessageV2.WithParts>
readonly loop: (input: z.infer<typeof LoopInput>) => Effect.Effect<MessageV2.WithParts>
@@ -99,55 +98,11 @@ export namespace SessionPrompt {
const spawner = yield* ChildProcessSpawner.ChildProcessSpawner
const scope = yield* Scope.Scope
const instruction = yield* Instruction.Service
const state = yield* InstanceState.make(
Effect.fn("SessionPrompt.state")(function* () {
const runners = new Map<string, Runner<MessageV2.WithParts>>()
yield* Effect.addFinalizer(
Effect.fnUntraced(function* () {
yield* Effect.forEach(runners.values(), (r) => r.cancel, { concurrency: "unbounded", discard: true })
runners.clear()
}),
)
return { runners }
}),
)
const getRunner = (runners: Map<string, Runner<MessageV2.WithParts>>, sessionID: SessionID) => {
const existing = runners.get(sessionID)
if (existing) return existing
const runner = Runner.make<MessageV2.WithParts>(scope, {
onIdle: Effect.gen(function* () {
runners.delete(sessionID)
yield* status.set(sessionID, { type: "idle" })
}),
onBusy: status.set(sessionID, { type: "busy" }),
onInterrupt: lastAssistant(sessionID),
busy: () => {
throw new Session.BusyError(sessionID)
},
})
runners.set(sessionID, runner)
return runner
}
const assertNotBusy: (sessionID: SessionID) => Effect.Effect<void, Session.BusyError> = Effect.fn(
"SessionPrompt.assertNotBusy",
)(function* (sessionID: SessionID) {
const s = yield* InstanceState.get(state)
const runner = s.runners.get(sessionID)
if (runner?.busy) throw new Session.BusyError(sessionID)
})
const state = yield* SessionRunState.Service
const cancel = Effect.fn("SessionPrompt.cancel")(function* (sessionID: SessionID) {
log.info("cancel", { sessionID })
const s = yield* InstanceState.get(state)
const runner = s.runners.get(sessionID)
if (!runner || !runner.busy) {
yield* status.set(sessionID, { type: "idle" })
return
}
yield* runner.cancel
yield* state.cancel(sessionID)
})
const resolvePromptParts = Effect.fn("SessionPrompt.resolvePromptParts")(function* (template: string) {
@@ -1574,16 +1529,12 @@ NOTE: At any point in time through this workflow you should feel free to ask the
const loop: (input: z.infer<typeof LoopInput>) => Effect.Effect<MessageV2.WithParts> = Effect.fn(
"SessionPrompt.loop",
)(function* (input: z.infer<typeof LoopInput>) {
const s = yield* InstanceState.get(state)
const runner = getRunner(s.runners, input.sessionID)
return yield* runner.ensureRunning(runLoop(input.sessionID))
return yield* state.ensureRunning(input.sessionID, lastAssistant(input.sessionID), runLoop(input.sessionID))
})
const shell: (input: ShellInput) => Effect.Effect<MessageV2.WithParts> = Effect.fn("SessionPrompt.shell")(
function* (input: ShellInput) {
const s = yield* InstanceState.get(state)
const runner = getRunner(s.runners, input.sessionID)
return yield* runner.startShell(shellImpl(input))
return yield* state.startShell(input.sessionID, lastAssistant(input.sessionID), shellImpl(input))
},
)
@@ -1704,7 +1655,6 @@ NOTE: At any point in time through this workflow you should feel free to ask the
})
return Service.of({
assertNotBusy,
cancel,
prompt,
loop,
@@ -1718,6 +1668,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the
const defaultLayer = Layer.unwrap(
Effect.sync(() =>
layer.pipe(
Layer.provide(SessionRunState.layer),
Layer.provide(SessionStatus.layer),
Layer.provide(SessionCompaction.defaultLayer),
Layer.provide(SessionProcessor.defaultLayer),
@@ -1741,10 +1692,6 @@ NOTE: At any point in time through this workflow you should feel free to ask the
)
const { runPromise } = makeRuntime(Service, defaultLayer)
export async function assertNotBusy(sessionID: SessionID) {
return runPromise((svc) => svc.assertNotBusy(SessionID.zod.parse(sessionID)))
}
export const PromptInput = z.object({
sessionID: SessionID.zod,
messageID: MessageID.zod.optional(),

View File

@@ -9,8 +9,9 @@ import { Log } from "../util/log"
import { Session } from "."
import { MessageV2 } from "./message-v2"
import { SessionID, MessageID, PartID } from "./schema"
import { SessionPrompt } from "./prompt"
import { SessionRunState } from "./run-state"
import { SessionSummary } from "./summary"
import { SessionStatus } from "./status"
export namespace SessionRevert {
const log = Log.create({ service: "session.revert" })
@@ -38,9 +39,10 @@ export namespace SessionRevert {
const storage = yield* Storage.Service
const bus = yield* Bus.Service
const summary = yield* SessionSummary.Service
const state = yield* SessionRunState.Service
const revert = Effect.fn("SessionRevert.revert")(function* (input: RevertInput) {
yield* Effect.promise(() => SessionPrompt.assertNotBusy(input.sessionID))
yield* state.assertNotBusy(input.sessionID)
const all = yield* sessions.messages({ sessionID: input.sessionID })
let lastUser: MessageV2.User | undefined
const session = yield* sessions.get(input.sessionID)
@@ -93,7 +95,7 @@ export namespace SessionRevert {
const unrevert = Effect.fn("SessionRevert.unrevert")(function* (input: { sessionID: SessionID }) {
log.info("unreverting", input)
yield* Effect.promise(() => SessionPrompt.assertNotBusy(input.sessionID))
yield* state.assertNotBusy(input.sessionID)
const session = yield* sessions.get(input.sessionID)
if (!session.revert) return session
if (session.revert.snapshot) yield* snap.restore(session.revert!.snapshot!)
@@ -151,6 +153,8 @@ export namespace SessionRevert {
export const defaultLayer = Layer.unwrap(
Effect.sync(() =>
layer.pipe(
Layer.provide(SessionRunState.layer),
Layer.provide(SessionStatus.layer),
Layer.provide(Session.defaultLayer),
Layer.provide(Snapshot.defaultLayer),
Layer.provide(Storage.defaultLayer),

View File

@@ -0,0 +1,114 @@
import { InstanceState } from "@/effect/instance-state"
import { Runner } from "@/effect/runner"
import { makeRuntime } from "@/effect/run-service"
import { Effect, Layer, Scope, ServiceMap } from "effect"
import { Session } from "."
import { MessageV2 } from "./message-v2"
import { SessionID } from "./schema"
import { SessionStatus } from "./status"
export namespace SessionRunState {
export interface Interface {
readonly assertNotBusy: (sessionID: SessionID) => Effect.Effect<void>
readonly cancel: (sessionID: SessionID) => Effect.Effect<void>
readonly ensureRunning: (
sessionID: SessionID,
onInterrupt: Effect.Effect<MessageV2.WithParts>,
work: Effect.Effect<MessageV2.WithParts>,
) => Effect.Effect<MessageV2.WithParts>
readonly startShell: (
sessionID: SessionID,
onInterrupt: Effect.Effect<MessageV2.WithParts>,
work: Effect.Effect<MessageV2.WithParts>,
) => Effect.Effect<MessageV2.WithParts>
}
export class Service extends ServiceMap.Service<Service, Interface>()("@opencode/SessionRunState") {}
export const layer = Layer.effect(
Service,
Effect.gen(function* () {
const status = yield* SessionStatus.Service
const state = yield* InstanceState.make(
Effect.fn("SessionRunState.state")(function* () {
const scope = yield* Scope.Scope
const runners = new Map<SessionID, Runner<MessageV2.WithParts>>()
yield* Effect.addFinalizer(
Effect.fnUntraced(function* () {
yield* Effect.forEach(runners.values(), (runner) => runner.cancel, {
concurrency: "unbounded",
discard: true,
})
runners.clear()
}),
)
return { runners, scope }
}),
)
const runner = Effect.fn("SessionRunState.runner")(function* (
sessionID: SessionID,
onInterrupt: Effect.Effect<MessageV2.WithParts>,
) {
const data = yield* InstanceState.get(state)
const existing = data.runners.get(sessionID)
if (existing) return existing
const next = Runner.make<MessageV2.WithParts>(data.scope, {
onIdle: Effect.gen(function* () {
data.runners.delete(sessionID)
yield* status.set(sessionID, { type: "idle" })
}),
onBusy: status.set(sessionID, { type: "busy" }),
onInterrupt,
busy: () => {
throw new Session.BusyError(sessionID)
},
})
data.runners.set(sessionID, next)
return next
})
const assertNotBusy = Effect.fn("SessionRunState.assertNotBusy")(function* (sessionID: SessionID) {
const data = yield* InstanceState.get(state)
const existing = data.runners.get(sessionID)
if (existing?.busy) throw new Session.BusyError(sessionID)
})
const cancel = Effect.fn("SessionRunState.cancel")(function* (sessionID: SessionID) {
const data = yield* InstanceState.get(state)
const existing = data.runners.get(sessionID)
if (!existing || !existing.busy) {
yield* status.set(sessionID, { type: "idle" })
return
}
yield* existing.cancel
})
const ensureRunning = Effect.fn("SessionRunState.ensureRunning")(function* (
sessionID: SessionID,
onInterrupt: Effect.Effect<MessageV2.WithParts>,
work: Effect.Effect<MessageV2.WithParts>,
) {
return yield* (yield* runner(sessionID, onInterrupt)).ensureRunning(work)
})
const startShell = Effect.fn("SessionRunState.startShell")(function* (
sessionID: SessionID,
onInterrupt: Effect.Effect<MessageV2.WithParts>,
work: Effect.Effect<MessageV2.WithParts>,
) {
return yield* (yield* runner(sessionID, onInterrupt)).startShell(work)
})
return Service.of({ assertNotBusy, cancel, ensureRunning, startShell })
}),
)
export const defaultLayer = layer.pipe(Layer.provide(SessionStatus.defaultLayer))
const { runPromise } = makeRuntime(Service, defaultLayer)
export async function assertNotBusy(sessionID: SessionID) {
return runPromise((svc) => svc.assertNotBusy(sessionID))
}
}

View File

@@ -85,7 +85,7 @@ export namespace SessionStatus {
}),
)
const defaultLayer = layer.pipe(Layer.provide(Bus.layer))
export const defaultLayer = layer.pipe(Layer.provide(Bus.layer))
const { runPromise } = makeRuntime(Service, defaultLayer)
export async function get(sessionID: SessionID) {

View File

@@ -5,6 +5,7 @@ import { Session } from "../../src/session"
import { ModelID, ProviderID } from "../../src/provider/schema"
import { MessageID, PartID, type SessionID } from "../../src/session/schema"
import { SessionPrompt } from "../../src/session/prompt"
import { SessionRunState } from "../../src/session/run-state"
import { Log } from "../../src/util/log"
import { tmpdir } from "../fixture/fixture"
@@ -64,7 +65,7 @@ describe("session action routes", () => {
fn: async () => {
const session = await Session.create({})
const msg = await user(session.id, "hello")
const busy = spyOn(SessionPrompt, "assertNotBusy").mockRejectedValue(new Session.BusyError(session.id))
const busy = spyOn(SessionRunState, "assertNotBusy").mockRejectedValue(new Session.BusyError(session.id))
const remove = spyOn(Session, "removeMessage").mockResolvedValue(msg.id)
const app = Server.Default().app

View File

@@ -25,6 +25,7 @@ import { SessionCompaction } from "../../src/session/compaction"
import { Instruction } from "../../src/session/instruction"
import { SessionProcessor } from "../../src/session/processor"
import { SessionPrompt } from "../../src/session/prompt"
import { SessionRunState } from "../../src/session/run-state"
import { MessageID, PartID, SessionID } from "../../src/session/schema"
import { SessionStatus } from "../../src/session/status"
import { Shell } from "../../src/shell/shell"
@@ -143,6 +144,7 @@ const filetime = Layer.succeed(
)
const status = SessionStatus.layer.pipe(Layer.provideMerge(Bus.layer))
const run = SessionRunState.layer.pipe(Layer.provide(status))
const infra = Layer.mergeAll(NodeFileSystem.layer, CrossSpawnSpawner.defaultLayer)
function makeHttp() {
const deps = Layer.mergeAll(
@@ -174,6 +176,7 @@ function makeHttp() {
return Layer.mergeAll(
TestLLMServer.layer,
SessionPrompt.layer.pipe(
Layer.provideMerge(run),
Layer.provideMerge(compact),
Layer.provideMerge(proc),
Layer.provideMerge(registry),
@@ -300,9 +303,10 @@ const addSubtask = (sessionID: SessionID, messageID: MessageID, model = ref) =>
const boot = Effect.fn("test.boot")(function* (input?: { title?: string }) {
const prompt = yield* SessionPrompt.Service
const run = yield* SessionRunState.Service
const sessions = yield* Session.Service
const chat = yield* sessions.create(input ?? { title: "Pinned" })
return { prompt, sessions, chat }
return { prompt, run, sessions, chat }
})
// Loop semantics
@@ -800,7 +804,7 @@ it.live("concurrent loop callers get same result", () =>
provideTmpdirInstance(
(dir) =>
Effect.gen(function* () {
const { prompt, chat } = yield* boot()
const { prompt, run, chat } = yield* boot()
yield* seed(chat.id, { finish: "stop" })
const [a, b] = yield* Effect.all([prompt.loop({ sessionID: chat.id }), prompt.loop({ sessionID: chat.id })], {
@@ -809,7 +813,7 @@ it.live("concurrent loop callers get same result", () =>
expect(a.info.id).toBe(b.info.id)
expect(a.info.role).toBe("assistant")
yield* prompt.assertNotBusy(chat.id)
yield* run.assertNotBusy(chat.id)
}),
{ git: true },
),
@@ -913,6 +917,7 @@ it.live(
provideTmpdirServer(
Effect.fnUntraced(function* ({ llm }) {
const prompt = yield* SessionPrompt.Service
const run = yield* SessionRunState.Service
const sessions = yield* Session.Service
yield* llm.hang
@@ -922,7 +927,7 @@ it.live(
const fiber = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild)
yield* llm.wait(1)
const exit = yield* prompt.assertNotBusy(chat.id).pipe(Effect.exit)
const exit = yield* run.assertNotBusy(chat.id).pipe(Effect.exit)
expect(Exit.isFailure(exit)).toBe(true)
if (Exit.isFailure(exit)) {
expect(Cause.squash(exit.cause)).toBeInstanceOf(Session.BusyError)
@@ -940,11 +945,11 @@ it.live("assertNotBusy succeeds when idle", () =>
provideTmpdirInstance(
(dir) =>
Effect.gen(function* () {
const prompt = yield* SessionPrompt.Service
const run = yield* SessionRunState.Service
const sessions = yield* Session.Service
const chat = yield* sessions.create({})
const exit = yield* prompt.assertNotBusy(chat.id).pipe(Effect.exit)
const exit = yield* run.assertNotBusy(chat.id).pipe(Effect.exit)
expect(Exit.isSuccess(exit)).toBe(true)
}),
{ git: true },
@@ -985,7 +990,7 @@ unix("shell captures stdout and stderr in completed tool output", () =>
provideTmpdirInstance(
(dir) =>
Effect.gen(function* () {
const { prompt, chat } = yield* boot()
const { prompt, run, chat } = yield* boot()
const result = yield* prompt.shell({
sessionID: chat.id,
agent: "build",
@@ -1000,7 +1005,7 @@ unix("shell captures stdout and stderr in completed tool output", () =>
expect(tool.state.output).toContain("err")
expect(tool.state.metadata.output).toContain("out")
expect(tool.state.metadata.output).toContain("err")
yield* prompt.assertNotBusy(chat.id)
yield* run.assertNotBusy(chat.id)
}),
{ git: true, config: cfg },
),
@@ -1010,7 +1015,7 @@ unix("shell completes a fast command on the preferred shell", () =>
provideTmpdirInstance(
(dir) =>
Effect.gen(function* () {
const { prompt, chat } = yield* boot()
const { prompt, run, chat } = yield* boot()
const result = yield* prompt.shell({
sessionID: chat.id,
agent: "build",
@@ -1024,7 +1029,7 @@ unix("shell completes a fast command on the preferred shell", () =>
expect(tool.state.input.command).toBe("pwd")
expect(tool.state.output).toContain(dir)
expect(tool.state.metadata.output).toContain(dir)
yield* prompt.assertNotBusy(chat.id)
yield* run.assertNotBusy(chat.id)
}),
{ git: true, config: cfg },
),
@@ -1034,7 +1039,7 @@ unix("shell lists files from the project directory", () =>
provideTmpdirInstance(
(dir) =>
Effect.gen(function* () {
const { prompt, chat } = yield* boot()
const { prompt, run, chat } = yield* boot()
yield* Effect.promise(() => Bun.write(path.join(dir, "README.md"), "# e2e\n"))
const result = yield* prompt.shell({
@@ -1050,7 +1055,7 @@ unix("shell lists files from the project directory", () =>
expect(tool.state.input.command).toBe("command ls")
expect(tool.state.output).toContain("README.md")
expect(tool.state.metadata.output).toContain("README.md")
yield* prompt.assertNotBusy(chat.id)
yield* run.assertNotBusy(chat.id)
}),
{ git: true, config: cfg },
),
@@ -1060,7 +1065,7 @@ unix("shell captures stderr from a failing command", () =>
provideTmpdirInstance(
(dir) =>
Effect.gen(function* () {
const { prompt, chat } = yield* boot()
const { prompt, run, chat } = yield* boot()
const result = yield* prompt.shell({
sessionID: chat.id,
agent: "build",
@@ -1073,7 +1078,7 @@ unix("shell captures stderr from a failing command", () =>
expect(tool.state.output).toContain("not found")
expect(tool.state.metadata.output).toContain("not found")
yield* prompt.assertNotBusy(chat.id)
yield* run.assertNotBusy(chat.id)
}),
{ git: true, config: cfg },
),
@@ -1198,7 +1203,7 @@ unix(
provideTmpdirInstance(
(dir) =>
Effect.gen(function* () {
const { prompt, chat } = yield* boot()
const { prompt, run, chat } = yield* boot()
const sh = yield* prompt
.shell({ sessionID: chat.id, agent: "build", command: "sleep 30" })
@@ -1209,7 +1214,7 @@ unix(
const status = yield* SessionStatus.Service
expect((yield* status.get(chat.id)).type).toBe("idle")
const busy = yield* prompt.assertNotBusy(chat.id).pipe(Effect.exit)
const busy = yield* run.assertNotBusy(chat.id).pipe(Effect.exit)
expect(Exit.isSuccess(busy)).toBe(true)
const exit = yield* Fiber.await(sh)

View File

@@ -43,6 +43,7 @@ import { Todo } from "../../src/session/todo"
import { SessionCompaction } from "../../src/session/compaction"
import { Instruction } from "../../src/session/instruction"
import { SessionProcessor } from "../../src/session/processor"
import { SessionRunState } from "../../src/session/run-state"
import { SessionStatus } from "../../src/session/status"
import { Shell } from "../../src/shell/shell"
import { Snapshot } from "../../src/snapshot"
@@ -107,6 +108,7 @@ const filetime = Layer.succeed(
)
const status = SessionStatus.layer.pipe(Layer.provideMerge(Bus.layer))
const run = SessionRunState.layer.pipe(Layer.provide(status))
const infra = Layer.mergeAll(NodeFileSystem.layer, CrossSpawnSpawner.defaultLayer)
function makeHttp() {
@@ -139,6 +141,7 @@ function makeHttp() {
return Layer.mergeAll(
TestLLMServer.layer,
SessionPrompt.layer.pipe(
Layer.provideMerge(run),
Layer.provideMerge(compact),
Layer.provideMerge(proc),
Layer.provideMerge(registry),