diff --git a/packages/opencode/src/effect/runner.ts b/packages/opencode/src/effect/runner.ts index 0e923b1194..1e7d4c2966 100644 --- a/packages/opencode/src/effect/runner.ts +++ b/packages/opencode/src/effect/runner.ts @@ -1,10 +1,10 @@ -import { Cause, Deferred, Effect, Exit, Fiber, Schema, Scope, SynchronizedRef } from "effect" +import { Cause, Deferred, Effect, Exit, Fiber, Latch, Schema, Scope, SynchronizedRef } from "effect" export interface Runner { readonly state: State readonly busy: boolean readonly ensureRunning: (work: Effect.Effect) => Effect.Effect - readonly startShell: (work: Effect.Effect) => Effect.Effect + readonly startShell: (work: Effect.Effect, ready?: Latch.Latch) => Effect.Effect readonly cancel: Effect.Effect } @@ -18,6 +18,8 @@ interface RunHandle { interface ShellHandle { id: number + cancelled: Deferred.Deferred + ready?: Latch.Latch fiber: Fiber.Fiber } @@ -59,6 +61,9 @@ export const make = ( ? Deferred.fail(done, new Cancelled()).pipe(Effect.asVoid) : Deferred.done(done, exit).pipe(Effect.asVoid) + const awaitDone = (done: Deferred.Deferred) => + Deferred.await(done).pipe(Effect.catchTag("RunnerCancelled", (e) => onInterrupt ?? Effect.die(e))) + const idleIfCurrent = () => SynchronizedRef.modify(ref, (st) => [st._tag === "Idle" ? idle : Effect.void, st] as const).pipe(Effect.flatten) @@ -89,7 +94,9 @@ export const make = ( SynchronizedRef.modifyEffect( ref, Effect.fnUntraced(function* (st) { - if (st._tag === "Shell" && st.shell.id === id) return [idle, { _tag: "Idle" }] as const + if (st._tag === "Shell" && st.shell.id === id) { + return [idle, { _tag: "Idle" }] as const + } if (st._tag === "ShellThenRun" && st.shell.id === id) { const run = yield* startRun(st.run.work, st.run.done) return [Effect.void, { _tag: "Running", run }] as const @@ -98,7 +105,12 @@ export const make = ( }), ).pipe(Effect.flatten) - const stopShell = (shell: ShellHandle) => Fiber.interrupt(shell.fiber) + const stopShell = (shell: ShellHandle) => + Effect.gen(function* () { + if (shell.ready) yield* shell.ready.await.pipe(Effect.exit, Effect.asVoid) + yield* Deferred.succeed(shell.cancelled, undefined).pipe(Effect.asVoid) + yield* Fiber.interrupt(shell.fiber) + }) const ensureRunning = (work: Effect.Effect) => SynchronizedRef.modifyEffect( @@ -107,30 +119,25 @@ export const make = ( switch (st._tag) { case "Running": case "ShellThenRun": - return [Deferred.await(st.run.done), st] as const + return [awaitDone(st.run.done), st] as const case "Shell": { const run = { id: next(), done: yield* Deferred.make(), work, } satisfies PendingHandle - return [Deferred.await(run.done), { _tag: "ShellThenRun", shell: st.shell, run }] as const + return [awaitDone(run.done), { _tag: "ShellThenRun", shell: st.shell, run }] as const } case "Idle": { const done = yield* Deferred.make() const run = yield* startRun(work, done) - return [Deferred.await(done), { _tag: "Running", run }] as const + return [awaitDone(done), { _tag: "Running", run }] as const } } }), - ).pipe( - Effect.flatten, - Effect.catch( - (e): Effect.Effect => (e instanceof Cancelled ? (onInterrupt ?? Effect.die(e)) : Effect.fail(e as E)), - ), - ) + ).pipe(Effect.flatten) - const startShell = (work: Effect.Effect) => + const startShell = (work: Effect.Effect, ready?: Latch.Latch) => SynchronizedRef.modifyEffect( ref, Effect.fnUntraced(function* (st) { @@ -145,13 +152,20 @@ export const make = ( } yield* busy const id = next() + const cancelled = yield* Deferred.make() const fiber = yield* work.pipe(Effect.ensuring(finishShell(id)), Effect.forkChild) - const shell = { id, fiber } satisfies ShellHandle + const shell = { id, cancelled, ready, fiber } satisfies ShellHandle return [ Effect.gen(function* () { const exit = yield* Fiber.await(fiber) if (Exit.isSuccess(exit)) return exit.value - if (Cause.hasInterruptsOnly(exit.cause) && onInterrupt) return yield* onInterrupt + if ( + Cause.hasInterruptsOnly(exit.cause) || + ((yield* Deferred.isDone(cancelled)) && Cause.hasInterrupts(exit.cause) && !Cause.hasDies(exit.cause)) + ) { + if (onInterrupt) return yield* onInterrupt + return yield* Effect.die(new Cancelled()) + } return yield* Effect.failCause(exit.cause) }), { _tag: "Shell", shell }, @@ -183,8 +197,8 @@ export const make = ( case "ShellThenRun": return [ Effect.gen(function* () { - yield* Deferred.fail(st.run.done, new Cancelled()).pipe(Effect.asVoid) yield* stopShell(st.shell) + yield* Deferred.fail(st.run.done, new Cancelled()).pipe(Effect.asVoid) yield* idleIfCurrent() }), { _tag: "Idle" } as const, diff --git a/packages/opencode/src/session/prompt.ts b/packages/opencode/src/session/prompt.ts index f7306280fe..4c259e4aef 100644 --- a/packages/opencode/src/session/prompt.ts +++ b/packages/opencode/src/session/prompt.ts @@ -45,7 +45,7 @@ import { AppFileSystem } from "@opencode-ai/core/filesystem" import { Truncate } from "@/tool/truncate" import { decodeDataUrl } from "@/util/data-url" import { Process } from "@/util/process" -import { Cause, Effect, Exit, Layer, Option, Scope, Context, Schema } from "effect" +import { Cause, Effect, Exit, Latch, Layer, Option, Scope, Context, Schema } from "effect" import { zod } from "@/util/effect-zod" import { withStatics } from "@/util/schema" import * as EffectLogger from "@opencode-ai/core/effect/logger" @@ -720,143 +720,145 @@ NOTE: At any point in time through this workflow you should feel free to ask the } satisfies MessageV2.TextPart) }) - const shellImpl = Effect.fn("SessionPrompt.shellImpl")(function* (input: ShellInput) { - const ctx = yield* InstanceState.context - const run = yield* runner() - const session = yield* sessions.get(input.sessionID) - if (session.revert) { - yield* revert.cleanup(session) - } - const agent = yield* agents.get(input.agent) - if (!agent) { - const available = (yield* agents.list()).filter((a) => !a.hidden).map((a) => a.name) - const hint = available.length ? ` Available agents: ${available.join(", ")}` : "" - const error = new NamedError.Unknown({ message: `Agent not found: "${input.agent}".${hint}` }) - yield* bus.publish(Session.Event.Error, { sessionID: input.sessionID, error: error.toObject() }) - throw error - } - const model = input.model ?? agent.model ?? (yield* lastModel(input.sessionID)) - const userMsg: MessageV2.User = { - id: input.messageID ?? MessageID.ascending(), - sessionID: input.sessionID, - time: { created: Date.now() }, - role: "user", - agent: input.agent, - model: { providerID: model.providerID, modelID: model.modelID }, - } - yield* sessions.updateMessage(userMsg) - const userPart: MessageV2.Part = { - type: "text", - id: PartID.ascending(), - messageID: userMsg.id, - sessionID: input.sessionID, - text: "The following tool was executed by the user", - synthetic: true, - } - yield* sessions.updatePart(userPart) - - const msg: MessageV2.Assistant = { - id: MessageID.ascending(), - sessionID: input.sessionID, - parentID: userMsg.id, - mode: input.agent, - agent: input.agent, - cost: 0, - path: { cwd: ctx.directory, root: ctx.worktree }, - time: { created: Date.now() }, - role: "assistant", - tokens: { input: 0, output: 0, reasoning: 0, cache: { read: 0, write: 0 } }, - modelID: model.modelID, - providerID: model.providerID, - } - yield* sessions.updateMessage(msg) - const part: MessageV2.ToolPart = { - type: "tool", - id: PartID.ascending(), - messageID: msg.id, - sessionID: input.sessionID, - tool: "bash", - callID: ulid(), - state: { - status: "running", - time: { start: Date.now() }, - input: { command: input.command }, - }, - } - yield* sessions.updatePart(part) - - const cfg = yield* config.get() - const sh = Shell.preferred(cfg.shell) - const cwd = ctx.directory - const args = Shell.args(sh, input.command, cwd) - const shellEnv = yield* plugin.trigger( - "shell.env", - { cwd, sessionID: input.sessionID, callID: part.callID }, - { env: {} }, - ) - - const cmd = ChildProcess.make(sh, args, { - cwd, - extendEnv: true, - env: { ...shellEnv.env, TERM: "dumb" }, - stdin: "ignore", - forceKillAfter: "3 seconds", - }) - - let output = "" - let aborted = false - const finish = Effect.uninterruptible( + const shellImpl = Effect.fn("SessionPrompt.shellImpl")(function* (input: ShellInput, ready?: Latch.Latch) { + return yield* Effect.uninterruptibleMask((restore) => Effect.gen(function* () { - if (aborted) { - output += "\n\n" + ["", "User aborted the command", ""].join("\n") - } - if (!msg.time.completed) { - msg.time.completed = Date.now() + const markReady = ready ? ready.open.pipe(Effect.asVoid) : Effect.void + const { msg, part, cwd } = yield* Effect.gen(function* () { + const ctx = yield* InstanceState.context + const session = yield* sessions.get(input.sessionID) + if (session.revert) { + yield* revert.cleanup(session) + } + const agent = yield* agents.get(input.agent) + if (!agent) { + const available = (yield* agents.list()).filter((a) => !a.hidden).map((a) => a.name) + const hint = available.length ? ` Available agents: ${available.join(", ")}` : "" + const error = new NamedError.Unknown({ message: `Agent not found: "${input.agent}".${hint}` }) + yield* bus.publish(Session.Event.Error, { sessionID: input.sessionID, error: error.toObject() }) + throw error + } + const model = input.model ?? agent.model ?? (yield* lastModel(input.sessionID)) + const userMsg: MessageV2.User = { + id: input.messageID ?? MessageID.ascending(), + sessionID: input.sessionID, + time: { created: Date.now() }, + role: "user", + agent: input.agent, + model: { providerID: model.providerID, modelID: model.modelID }, + } + yield* sessions.updateMessage(userMsg) + const userPart: MessageV2.Part = { + type: "text", + id: PartID.ascending(), + messageID: userMsg.id, + sessionID: input.sessionID, + text: "The following tool was executed by the user", + synthetic: true, + } + yield* sessions.updatePart(userPart) + + const msg: MessageV2.Assistant = { + id: MessageID.ascending(), + sessionID: input.sessionID, + parentID: userMsg.id, + mode: input.agent, + agent: input.agent, + cost: 0, + path: { cwd: ctx.directory, root: ctx.worktree }, + time: { created: Date.now() }, + role: "assistant", + tokens: { input: 0, output: 0, reasoning: 0, cache: { read: 0, write: 0 } }, + modelID: model.modelID, + providerID: model.providerID, + } yield* sessions.updateMessage(msg) - } - if (part.state.status === "running") { - part.state = { - status: "completed", - time: { ...part.state.time, end: Date.now() }, - input: part.state.input, - title: "", - metadata: { output, description: "" }, - output, + const part: MessageV2.ToolPart = { + type: "tool", + id: PartID.ascending(), + messageID: msg.id, + sessionID: input.sessionID, + tool: "bash", + callID: ulid(), + state: { + status: "running", + time: { start: Date.now() }, + input: { command: input.command }, + }, } yield* sessions.updatePart(part) + return { msg, part, cwd: ctx.directory } + }).pipe(Effect.ensuring(markReady)) + + const cfg = yield* config.get() + const sh = Shell.preferred(cfg.shell) + const args = Shell.args(sh, input.command, cwd) + let output = "" + let aborted = false + + const finish = Effect.uninterruptible( + Effect.gen(function* () { + if (aborted) { + output += "\n\n" + ["", "User aborted the command", ""].join("\n") + } + if (!msg.time.completed) { + msg.time.completed = Date.now() + yield* sessions.updateMessage(msg) + } + if (part.state.status === "running") { + part.state = { + status: "completed", + time: { ...part.state.time, end: Date.now() }, + input: part.state.input, + title: "", + metadata: { output, description: "" }, + output, + } + yield* sessions.updatePart(part) + } + }), + ) + + const exit = yield* restore( + Effect.gen(function* () { + const shellEnv = yield* plugin.trigger( + "shell.env", + { cwd, sessionID: input.sessionID, callID: part.callID }, + { env: {} }, + ) + const cmd = ChildProcess.make(sh, args, { + cwd, + extendEnv: true, + env: { ...shellEnv.env, TERM: "dumb" }, + stdin: "ignore", + forceKillAfter: "3 seconds", + }) + const handle = yield* spawner.spawn(cmd) + yield* Stream.runForEach(Stream.decodeText(handle.all), (chunk) => + Effect.gen(function* () { + output += chunk + if (part.state.status === "running") { + part.state.metadata = { output, description: "" } + yield* sessions.updatePart(part) + } + }), + ) + yield* handle.exitCode + }).pipe(Effect.scoped, Effect.orDie), + ).pipe(Effect.exit) + + if (Exit.isFailure(exit) && Cause.hasInterrupts(exit.cause) && !Cause.hasDies(exit.cause)) { + aborted = true } + yield* finish + + if (Exit.isFailure(exit) && !aborted && !Cause.hasInterruptsOnly(exit.cause)) { + return yield* Effect.failCause(exit.cause) + } + + return { info: msg, parts: [part] } }), ) - - const exit = yield* Effect.gen(function* () { - const handle = yield* spawner.spawn(cmd) - yield* Stream.runForEach(Stream.decodeText(handle.all), (chunk) => - Effect.sync(() => { - output += chunk - if (part.state.status === "running") { - part.state.metadata = { output, description: "" } - void run.fork(sessions.updatePart(part)) - } - }), - ) - yield* handle.exitCode - }).pipe( - Effect.scoped, - Effect.onInterrupt(() => - Effect.sync(() => { - aborted = true - }), - ), - Effect.orDie, - Effect.ensuring(finish), - Effect.exit, - ) - - if (Exit.isFailure(exit) && !Cause.hasInterruptsOnly(exit.cause)) { - return yield* Effect.failCause(exit.cause) - } - - return { info: msg, parts: [part] } }) const getModel = Effect.fn("SessionPrompt.getModel")(function* ( @@ -1507,7 +1509,8 @@ NOTE: At any point in time through this workflow you should feel free to ask the const shell: (input: ShellInput) => Effect.Effect = Effect.fn("SessionPrompt.shell")( function* (input: ShellInput) { - return yield* state.startShell(input.sessionID, lastAssistant(input.sessionID), shellImpl(input)) + const ready = yield* Latch.make() + return yield* state.startShell(input.sessionID, lastAssistant(input.sessionID), shellImpl(input, ready), ready) }, ) diff --git a/packages/opencode/src/session/run-state.ts b/packages/opencode/src/session/run-state.ts index 4b210d63d7..9d4986f174 100644 --- a/packages/opencode/src/session/run-state.ts +++ b/packages/opencode/src/session/run-state.ts @@ -1,6 +1,6 @@ import { InstanceState } from "@/effect/instance-state" import { Runner } from "@/effect/runner" -import { Effect, Layer, Scope, Context } from "effect" +import { Effect, Latch, Layer, Scope, Context } from "effect" import * as Session from "./session" import { MessageV2 } from "./message-v2" import { SessionID } from "./schema" @@ -18,6 +18,7 @@ export interface Interface { sessionID: SessionID, onInterrupt: Effect.Effect, work: Effect.Effect, + ready?: Latch.Latch, ) => Effect.Effect } @@ -95,8 +96,9 @@ export const layer = Layer.effect( sessionID: SessionID, onInterrupt: Effect.Effect, work: Effect.Effect, + ready?: Latch.Latch, ) { - return yield* (yield* runner(sessionID, onInterrupt)).startShell(work) + return yield* (yield* runner(sessionID, onInterrupt)).startShell(work, ready) }) return Service.of({ assertNotBusy, cancel, ensureRunning, startShell }) diff --git a/packages/opencode/test/effect/runner.test.ts b/packages/opencode/test/effect/runner.test.ts index 4b0fbc1b51..ee99050a8c 100644 --- a/packages/opencode/test/effect/runner.test.ts +++ b/packages/opencode/test/effect/runner.test.ts @@ -334,6 +334,22 @@ describe("Runner", () => { }), ) + it.live( + "cancel does not mask shell defects", + Effect.gen(function* () { + const s = yield* Scope.Scope + const runner = Runner.make(s, { onInterrupt: Effect.succeed("interrupted") }) + + const sh = yield* runner + .startShell(Effect.never.pipe(Effect.ensuring(Effect.die("boom")), Effect.as("ignored"))) + .pipe(Effect.forkChild) + yield* Effect.sleep("10 millis") + + yield* runner.cancel + expect(Exit.isFailure(yield* Fiber.await(sh))).toBe(true) + }), + ) + // --- shell→run handoff --- it.live( diff --git a/packages/opencode/test/session/prompt.test.ts b/packages/opencode/test/session/prompt.test.ts index 422c1400c9..5330569401 100644 --- a/packages/opencode/test/session/prompt.test.ts +++ b/packages/opencode/test/session/prompt.test.ts @@ -1470,6 +1470,10 @@ unix( const exit = yield* Fiber.await(loop) expect(Exit.isSuccess(exit)).toBe(true) + if (Exit.isSuccess(exit)) { + const tool = completedTool(exit.value.parts) + expect(tool?.state.output).toContain("User aborted the command") + } yield* Fiber.await(sh) }),