refactor(effect): drop shell abort signals from runner (#21599)

This commit is contained in:
Kit Langton
2026-04-09 10:54:26 -04:00
committed by GitHub
parent 58a99916bb
commit 46da801f30
3 changed files with 22 additions and 58 deletions

View File

@@ -1,10 +1,10 @@
import { Cause, Deferred, Effect, Exit, Fiber, Option, Schema, Scope, SynchronizedRef } from "effect"
import { Cause, Deferred, Effect, Exit, Fiber, Schema, Scope, SynchronizedRef } from "effect"
export interface Runner<A, E = never> {
readonly state: Runner.State<A, E>
readonly busy: boolean
readonly ensureRunning: (work: Effect.Effect<A, E>) => Effect.Effect<A, E>
readonly startShell: (work: (signal: AbortSignal) => Effect.Effect<A, E>) => Effect.Effect<A, E>
readonly startShell: (work: Effect.Effect<A, E>) => Effect.Effect<A, E>
readonly cancel: Effect.Effect<void>
}
@@ -20,7 +20,6 @@ export namespace Runner {
interface ShellHandle<A, E> {
id: number
fiber: Fiber.Fiber<A, E>
abort: AbortController
}
interface PendingHandle<A, E> {
@@ -100,13 +99,7 @@ export namespace Runner {
}),
).pipe(Effect.flatten)
const stopShell = (shell: ShellHandle<A, E>) =>
Effect.gen(function* () {
shell.abort.abort()
const exit = yield* Fiber.await(shell.fiber).pipe(Effect.timeoutOption("100 millis"))
if (Option.isNone(exit)) yield* Fiber.interrupt(shell.fiber)
yield* Fiber.await(shell.fiber).pipe(Effect.exit, Effect.asVoid)
})
const stopShell = (shell: ShellHandle<A, E>) => Fiber.interrupt(shell.fiber)
const ensureRunning = (work: Effect.Effect<A, E>) =>
SynchronizedRef.modifyEffect(
@@ -138,7 +131,7 @@ export namespace Runner {
),
)
const startShell = (work: (signal: AbortSignal) => Effect.Effect<A, E>) =>
const startShell = (work: Effect.Effect<A, E>) =>
SynchronizedRef.modifyEffect(
ref,
Effect.fnUntraced(function* (st) {
@@ -153,9 +146,8 @@ export namespace Runner {
}
yield* busy
const id = next()
const abort = new AbortController()
const fiber = yield* work(abort.signal).pipe(Effect.ensuring(finishShell(id)), Effect.forkChild)
const shell = { id, fiber, abort } satisfies ShellHandle<A, E>
const fiber = yield* work.pipe(Effect.ensuring(finishShell(id)), Effect.forkChild)
const shell = { id, fiber } satisfies ShellHandle<A, E>
return [
Effect.gen(function* () {
const exit = yield* Fiber.await(fiber)

View File

@@ -743,7 +743,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the
} satisfies MessageV2.TextPart)
})
const shellImpl = Effect.fn("SessionPrompt.shellImpl")(function* (input: ShellInput, signal: AbortSignal) {
const shellImpl = Effect.fn("SessionPrompt.shellImpl")(function* (input: ShellInput) {
const ctx = yield* InstanceState.context
const session = yield* sessions.get(input.sessionID)
if (session.revert) {
@@ -1577,7 +1577,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the
function* (input: ShellInput) {
const s = yield* InstanceState.get(state)
const runner = getRunner(s.runners, input.sessionID)
return yield* runner.startShell((signal) => shellImpl(input, signal))
return yield* runner.startShell(shellImpl(input))
},
)

View File

@@ -250,7 +250,7 @@ describe("Runner", () => {
Effect.gen(function* () {
const s = yield* Scope.Scope
const runner = Runner.make<string>(s)
const result = yield* runner.startShell((_signal) => Effect.succeed("shell-done"))
const result = yield* runner.startShell(Effect.succeed("shell-done"))
expect(result).toBe("shell-done")
expect(runner.busy).toBe(false)
}),
@@ -264,7 +264,7 @@ describe("Runner", () => {
const fiber = yield* runner.ensureRunning(Effect.never.pipe(Effect.as("x"))).pipe(Effect.forkChild)
yield* Effect.sleep("10 millis")
const exit = yield* runner.startShell((_s) => Effect.succeed("nope")).pipe(Effect.exit)
const exit = yield* runner.startShell(Effect.succeed("nope")).pipe(Effect.exit)
expect(Exit.isFailure(exit)).toBe(true)
yield* runner.cancel
@@ -279,12 +279,10 @@ describe("Runner", () => {
const runner = Runner.make<string>(s)
const gate = yield* Deferred.make<void>()
const sh = yield* runner
.startShell((_signal) => Deferred.await(gate).pipe(Effect.as("first")))
.pipe(Effect.forkChild)
const sh = yield* runner.startShell(Deferred.await(gate).pipe(Effect.as("first"))).pipe(Effect.forkChild)
yield* Effect.sleep("10 millis")
const exit = yield* runner.startShell((_s) => Effect.succeed("second")).pipe(Effect.exit)
const exit = yield* runner.startShell(Effect.succeed("second")).pipe(Effect.exit)
expect(Exit.isFailure(exit)).toBe(true)
yield* Deferred.succeed(gate, undefined)
@@ -302,37 +300,26 @@ describe("Runner", () => {
},
})
const sh = yield* runner
.startShell((signal) =>
Effect.promise(
() =>
new Promise<string>((resolve) => {
signal.addEventListener("abort", () => resolve("aborted"), { once: true })
}),
),
)
.pipe(Effect.forkChild)
const sh = yield* runner.startShell(Effect.never.pipe(Effect.as("aborted"))).pipe(Effect.forkChild)
yield* Effect.sleep("10 millis")
const exit = yield* runner.startShell((_s) => Effect.succeed("second")).pipe(Effect.exit)
const exit = yield* runner.startShell(Effect.succeed("second")).pipe(Effect.exit)
expect(Exit.isFailure(exit)).toBe(true)
yield* runner.cancel
const done = yield* Fiber.await(sh)
expect(Exit.isSuccess(done)).toBe(true)
expect(Exit.isFailure(done)).toBe(true)
}),
)
it.live(
"cancel interrupts shell that ignores abort signal",
"cancel interrupts shell",
Effect.gen(function* () {
const s = yield* Scope.Scope
const runner = Runner.make<string>(s)
const gate = yield* Deferred.make<void>()
const sh = yield* runner
.startShell((_signal) => Deferred.await(gate).pipe(Effect.as("ignored")))
.pipe(Effect.forkChild)
const sh = yield* runner.startShell(Deferred.await(gate).pipe(Effect.as("ignored"))).pipe(Effect.forkChild)
yield* Effect.sleep("10 millis")
const stop = yield* runner.cancel.pipe(Effect.forkChild)
@@ -356,9 +343,7 @@ describe("Runner", () => {
const runner = Runner.make<string>(s)
const gate = yield* Deferred.make<void>()
const sh = yield* runner
.startShell((_signal) => Deferred.await(gate).pipe(Effect.as("shell-result")))
.pipe(Effect.forkChild)
const sh = yield* runner.startShell(Deferred.await(gate).pipe(Effect.as("shell-result"))).pipe(Effect.forkChild)
yield* Effect.sleep("10 millis")
expect(runner.state._tag).toBe("Shell")
@@ -384,9 +369,7 @@ describe("Runner", () => {
const calls = yield* Ref.make(0)
const gate = yield* Deferred.make<void>()
const sh = yield* runner
.startShell((_signal) => Deferred.await(gate).pipe(Effect.as("shell")))
.pipe(Effect.forkChild)
const sh = yield* runner.startShell(Deferred.await(gate).pipe(Effect.as("shell"))).pipe(Effect.forkChild)
yield* Effect.sleep("10 millis")
const work = Effect.gen(function* () {
@@ -414,16 +397,7 @@ describe("Runner", () => {
const runner = Runner.make<string>(s)
const gate = yield* Deferred.make<void>()
const sh = yield* runner
.startShell((signal) =>
Effect.promise(
() =>
new Promise<string>((resolve) => {
signal.addEventListener("abort", () => resolve("aborted"), { once: true })
}),
),
)
.pipe(Effect.forkChild)
const sh = yield* runner.startShell(Effect.never.pipe(Effect.as("aborted"))).pipe(Effect.forkChild)
yield* Effect.sleep("10 millis")
const run = yield* runner.ensureRunning(Effect.succeed("y")).pipe(Effect.forkChild)
@@ -478,7 +452,7 @@ describe("Runner", () => {
const runner = Runner.make<string>(s, {
onBusy: Ref.update(count, (n) => n + 1),
})
yield* runner.startShell((_signal) => Effect.succeed("done"))
yield* runner.startShell(Effect.succeed("done"))
expect(yield* Ref.get(count)).toBe(1)
}),
)
@@ -509,9 +483,7 @@ describe("Runner", () => {
const runner = Runner.make<string>(s)
const gate = yield* Deferred.make<void>()
const fiber = yield* runner
.startShell((_signal) => Deferred.await(gate).pipe(Effect.as("ok")))
.pipe(Effect.forkChild)
const fiber = yield* runner.startShell(Deferred.await(gate).pipe(Effect.as("ok"))).pipe(Effect.forkChild)
yield* Effect.sleep("10 millis")
expect(runner.busy).toBe(true)