From af5d7b67b07c71132ebdd4bf466cb268be20d42c Mon Sep 17 00:00:00 2001 From: Kit Langton Date: Tue, 14 Apr 2026 17:44:01 -0400 Subject: [PATCH] fix(effect): preserve context across callback bridges --- packages/opencode/src/bus/index.ts | 4 ++-- packages/opencode/src/command/index.ts | 5 ++--- packages/opencode/src/mcp/index.ts | 12 ++++++----- packages/opencode/src/plugin/index.ts | 24 ++++++++++------------ packages/opencode/src/provider/provider.ts | 5 ++--- packages/opencode/src/pty/index.ts | 6 +++--- 6 files changed, 27 insertions(+), 29 deletions(-) diff --git a/packages/opencode/src/bus/index.ts b/packages/opencode/src/bus/index.ts index 0638777bd4..91035084ab 100644 --- a/packages/opencode/src/bus/index.ts +++ b/packages/opencode/src/bus/index.ts @@ -1,6 +1,5 @@ import z from "zod" import { Effect, Exit, Layer, PubSub, Scope, Context, Stream } from "effect" -import { EffectLogger } from "@/effect/logger" import { Log } from "../util/log" import { BusEvent } from "./bus-event" import { GlobalBus } from "./global" @@ -128,6 +127,7 @@ export namespace Bus { function on(pubsub: PubSub.PubSub, type: string, callback: (event: T) => unknown) { return Effect.gen(function* () { log.info("subscribing", { type }) + const ctx = yield* Effect.context() const scope = yield* Scope.make() const subscription = yield* Scope.provide(scope)(PubSub.subscribe(pubsub)) @@ -147,7 +147,7 @@ export namespace Bus { return () => { log.info("unsubscribing", { type }) - Effect.runFork(Scope.close(scope, Exit.void).pipe(Effect.provide(EffectLogger.layer))) + Effect.runForkWith(ctx)(Scope.close(scope, Exit.void)) } }) } diff --git a/packages/opencode/src/command/index.ts b/packages/opencode/src/command/index.ts index 42f53301b2..2d8e9d4928 100644 --- a/packages/opencode/src/command/index.ts +++ b/packages/opencode/src/command/index.ts @@ -3,7 +3,6 @@ import { InstanceState } from "@/effect/instance-state" import type { InstanceContext } from "@/project/instance" import { SessionID, MessageID } from "@/session/schema" import { Effect, Layer, Context } from "effect" -import { EffectLogger } from "@/effect/logger" import z from "zod" import { Config } from "../config/config" import { MCP } from "../mcp" @@ -79,6 +78,7 @@ export namespace Command { const config = yield* Config.Service const mcp = yield* MCP.Service const skill = yield* Skill.Service + const fx = yield* Effect.context() const init = Effect.fn("Command.state")(function* (ctx: InstanceContext) { const cfg = yield* config.get() @@ -125,7 +125,7 @@ export namespace Command { source: "mcp", description: prompt.description, get template() { - return Effect.runPromise( + return Effect.runPromiseWith(fx)( mcp .getPrompt( prompt.client, @@ -141,7 +141,6 @@ export namespace Command { .map((message) => (message.content.type === "text" ? message.content.text : "")) .join("\n") || "", ), - Effect.provide(EffectLogger.layer), ), ) }, diff --git a/packages/opencode/src/mcp/index.ts b/packages/opencode/src/mcp/index.ts index 3de427d7e4..83a2af297a 100644 --- a/packages/opencode/src/mcp/index.ts +++ b/packages/opencode/src/mcp/index.ts @@ -25,7 +25,6 @@ import { Bus } from "@/bus" import { TuiEvent } from "@/cli/cmd/tui/event" import open from "open" import { Effect, Exit, Layer, Option, Context, Stream } from "effect" -import { EffectLogger } from "@/effect/logger" import { InstanceState } from "@/effect/instance-state" import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process" import * as CrossSpawnSpawner from "@/effect/cross-spawn-spawner" @@ -444,6 +443,11 @@ export namespace MCP { return { mcpClient, status, defs: listed } satisfies CreateResult }) const cfgSvc = yield* Config.Service + const ctx = yield* Effect.context() + + const run = { + promise: (effect: Effect.Effect) => Effect.runPromiseWith(ctx)(effect), + } const descendants = Effect.fnUntraced( function* (pid: number) { @@ -476,14 +480,12 @@ export namespace MCP { log.info("tools list changed notification received", { server: name }) if (s.clients[name] !== client || s.status[name]?.status !== "connected") return - const listed = await Effect.runPromise(defs(name, client, timeout).pipe(Effect.provide(EffectLogger.layer))) + const listed = await run.promise(defs(name, client, timeout)) if (!listed) return if (s.clients[name] !== client || s.status[name]?.status !== "connected") return s.defs[name] = listed - await Effect.runPromise( - bus.publish(ToolsChanged, { server: name }).pipe(Effect.ignore, Effect.provide(EffectLogger.layer)), - ) + await run.promise(bus.publish(ToolsChanged, { server: name }).pipe(Effect.ignore)) }) } diff --git a/packages/opencode/src/plugin/index.ts b/packages/opencode/src/plugin/index.ts index 0dc53d997d..d4c29a0e7c 100644 --- a/packages/opencode/src/plugin/index.ts +++ b/packages/opencode/src/plugin/index.ts @@ -18,7 +18,6 @@ import { gitlabAuthPlugin as GitlabAuthPlugin } from "opencode-gitlab-auth" import { PoeAuthPlugin } from "opencode-poe-auth" import { CloudflareAIGatewayAuthPlugin, CloudflareWorkersAuthPlugin } from "./cloudflare" import { Effect, Layer, Context, Stream } from "effect" -import { EffectLogger } from "@/effect/logger" import { InstanceState } from "@/effect/instance-state" import { errorMessage } from "@/util/error" import { PluginLoader } from "./loader" @@ -90,14 +89,6 @@ export namespace Plugin { return result } - function publishPluginError(bus: Bus.Interface, message: string) { - Effect.runFork( - bus - .publish(Session.Event.Error, { error: new NamedError.Unknown({ message }).toObject() }) - .pipe(Effect.provide(EffectLogger.layer)), - ) - } - async function applyPlugin(load: PluginLoader.Loaded, input: PluginInput, hooks: Hooks[]) { const plugin = readV1Plugin(load.mod, load.spec, "server", "detect") if (plugin) { @@ -116,6 +107,13 @@ export namespace Plugin { Effect.gen(function* () { const bus = yield* Bus.Service const config = yield* Config.Service + const ctx = yield* Effect.context() + + function publishPluginError(message: string) { + Effect.runForkWith(ctx)( + bus.publish(Session.Event.Error, { error: new NamedError.Unknown({ message }).toObject() }), + ) + } const state = yield* InstanceState.make( Effect.fn("Plugin.state")(function* (ctx) { @@ -187,24 +185,24 @@ export namespace Plugin { if (stage === "install") { const parsed = parsePluginSpecifier(spec) log.error("failed to install plugin", { pkg: parsed.pkg, version: parsed.version, error: message }) - publishPluginError(bus, `Failed to install plugin ${parsed.pkg}@${parsed.version}: ${message}`) + publishPluginError(`Failed to install plugin ${parsed.pkg}@${parsed.version}: ${message}`) return } if (stage === "compatibility") { log.warn("plugin incompatible", { path: spec, error: message }) - publishPluginError(bus, `Plugin ${spec} skipped: ${message}`) + publishPluginError(`Plugin ${spec} skipped: ${message}`) return } if (stage === "entry") { log.error("failed to resolve plugin server entry", { path: spec, error: message }) - publishPluginError(bus, `Failed to load plugin ${spec}: ${message}`) + publishPluginError(`Failed to load plugin ${spec}: ${message}`) return } log.error("failed to load plugin", { path: spec, target: resolved?.entry, error: message }) - publishPluginError(bus, `Failed to load plugin ${spec}: ${message}`) + publishPluginError(`Failed to load plugin ${spec}: ${message}`) }, }, }), diff --git a/packages/opencode/src/provider/provider.ts b/packages/opencode/src/provider/provider.ts index 31d88f1f9a..291aa52c7a 100644 --- a/packages/opencode/src/provider/provider.ts +++ b/packages/opencode/src/provider/provider.ts @@ -19,7 +19,6 @@ import { iife } from "@/util/iife" import { Global } from "../global" import path from "path" import { Effect, Layer, Context } from "effect" -import { EffectLogger } from "@/effect/logger" import { InstanceState } from "@/effect/instance-state" import { AppFileSystem } from "@/filesystem" import { isRecord } from "@/util/record" @@ -1039,6 +1038,7 @@ export namespace Provider { const auth = yield* Auth.Service const env = yield* Env.Service const plugin = yield* Plugin.Service + const ctx = yield* Effect.context() const state = yield* InstanceState.make(() => Effect.gen(function* () { @@ -1223,8 +1223,7 @@ export namespace Provider { const options = yield* Effect.promise(() => plugin.auth!.loader!( - () => - Effect.runPromise(auth.get(providerID).pipe(Effect.orDie, Effect.provide(EffectLogger.layer))) as any, + () => Effect.runPromiseWith(ctx)(auth.get(providerID).pipe(Effect.orDie)) as any, database[plugin.auth!.provider], ), ) diff --git a/packages/opencode/src/pty/index.ts b/packages/opencode/src/pty/index.ts index 1891721851..e2fd31f6c0 100644 --- a/packages/opencode/src/pty/index.ts +++ b/packages/opencode/src/pty/index.ts @@ -10,7 +10,6 @@ import { Shell } from "@/shell/shell" import { Plugin } from "@/plugin" import { PtyID } from "./schema" import { Effect, Layer, Context } from "effect" -import { EffectLogger } from "@/effect/logger" export namespace Pty { const log = Log.create({ service: "pty" }) @@ -119,6 +118,7 @@ export namespace Pty { Effect.gen(function* () { const bus = yield* Bus.Service const plugin = yield* Plugin.Service + const ctx = yield* Effect.context() function teardown(session: Active) { try { session.process.kill() @@ -256,8 +256,8 @@ export namespace Pty { if (session.info.status === "exited") return log.info("session exited", { id, exitCode }) session.info.status = "exited" - Effect.runFork(bus.publish(Event.Exited, { id, exitCode }).pipe(Effect.provide(EffectLogger.layer))) - Effect.runFork(remove(id).pipe(Effect.provide(EffectLogger.layer))) + Effect.runForkWith(ctx)(bus.publish(Event.Exited, { id, exitCode })) + Effect.runForkWith(ctx)(remove(id)) }), ) yield* bus.publish(Event.Created, { info })