mirror of
https://fastgit.cc/https://github.com/anomalyco/opencode
synced 2026-04-21 05:10:58 +08:00
fix(effect): preserve context across callback bridges
This commit is contained in:
@@ -1,6 +1,5 @@
|
||||
import z from "zod"
|
||||
import { Effect, Exit, Layer, PubSub, Scope, Context, Stream } from "effect"
|
||||
import { EffectLogger } from "@/effect/logger"
|
||||
import { Log } from "../util/log"
|
||||
import { BusEvent } from "./bus-event"
|
||||
import { GlobalBus } from "./global"
|
||||
@@ -128,6 +127,7 @@ export namespace Bus {
|
||||
function on<T>(pubsub: PubSub.PubSub<T>, type: string, callback: (event: T) => unknown) {
|
||||
return Effect.gen(function* () {
|
||||
log.info("subscribing", { type })
|
||||
const ctx = yield* Effect.context()
|
||||
const scope = yield* Scope.make()
|
||||
const subscription = yield* Scope.provide(scope)(PubSub.subscribe(pubsub))
|
||||
|
||||
@@ -147,7 +147,7 @@ export namespace Bus {
|
||||
|
||||
return () => {
|
||||
log.info("unsubscribing", { type })
|
||||
Effect.runFork(Scope.close(scope, Exit.void).pipe(Effect.provide(EffectLogger.layer)))
|
||||
Effect.runForkWith(ctx)(Scope.close(scope, Exit.void))
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
@@ -3,7 +3,6 @@ import { InstanceState } from "@/effect/instance-state"
|
||||
import type { InstanceContext } from "@/project/instance"
|
||||
import { SessionID, MessageID } from "@/session/schema"
|
||||
import { Effect, Layer, Context } from "effect"
|
||||
import { EffectLogger } from "@/effect/logger"
|
||||
import z from "zod"
|
||||
import { Config } from "../config/config"
|
||||
import { MCP } from "../mcp"
|
||||
@@ -79,6 +78,7 @@ export namespace Command {
|
||||
const config = yield* Config.Service
|
||||
const mcp = yield* MCP.Service
|
||||
const skill = yield* Skill.Service
|
||||
const fx = yield* Effect.context()
|
||||
|
||||
const init = Effect.fn("Command.state")(function* (ctx: InstanceContext) {
|
||||
const cfg = yield* config.get()
|
||||
@@ -125,7 +125,7 @@ export namespace Command {
|
||||
source: "mcp",
|
||||
description: prompt.description,
|
||||
get template() {
|
||||
return Effect.runPromise(
|
||||
return Effect.runPromiseWith(fx)(
|
||||
mcp
|
||||
.getPrompt(
|
||||
prompt.client,
|
||||
@@ -141,7 +141,6 @@ export namespace Command {
|
||||
.map((message) => (message.content.type === "text" ? message.content.text : ""))
|
||||
.join("\n") || "",
|
||||
),
|
||||
Effect.provide(EffectLogger.layer),
|
||||
),
|
||||
)
|
||||
},
|
||||
|
||||
@@ -25,7 +25,6 @@ import { Bus } from "@/bus"
|
||||
import { TuiEvent } from "@/cli/cmd/tui/event"
|
||||
import open from "open"
|
||||
import { Effect, Exit, Layer, Option, Context, Stream } from "effect"
|
||||
import { EffectLogger } from "@/effect/logger"
|
||||
import { InstanceState } from "@/effect/instance-state"
|
||||
import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process"
|
||||
import * as CrossSpawnSpawner from "@/effect/cross-spawn-spawner"
|
||||
@@ -444,6 +443,11 @@ export namespace MCP {
|
||||
return { mcpClient, status, defs: listed } satisfies CreateResult
|
||||
})
|
||||
const cfgSvc = yield* Config.Service
|
||||
const ctx = yield* Effect.context()
|
||||
|
||||
const run = {
|
||||
promise: <A, E>(effect: Effect.Effect<A, E>) => Effect.runPromiseWith(ctx)(effect),
|
||||
}
|
||||
|
||||
const descendants = Effect.fnUntraced(
|
||||
function* (pid: number) {
|
||||
@@ -476,14 +480,12 @@ export namespace MCP {
|
||||
log.info("tools list changed notification received", { server: name })
|
||||
if (s.clients[name] !== client || s.status[name]?.status !== "connected") return
|
||||
|
||||
const listed = await Effect.runPromise(defs(name, client, timeout).pipe(Effect.provide(EffectLogger.layer)))
|
||||
const listed = await run.promise(defs(name, client, timeout))
|
||||
if (!listed) return
|
||||
if (s.clients[name] !== client || s.status[name]?.status !== "connected") return
|
||||
|
||||
s.defs[name] = listed
|
||||
await Effect.runPromise(
|
||||
bus.publish(ToolsChanged, { server: name }).pipe(Effect.ignore, Effect.provide(EffectLogger.layer)),
|
||||
)
|
||||
await run.promise(bus.publish(ToolsChanged, { server: name }).pipe(Effect.ignore))
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@@ -18,7 +18,6 @@ import { gitlabAuthPlugin as GitlabAuthPlugin } from "opencode-gitlab-auth"
|
||||
import { PoeAuthPlugin } from "opencode-poe-auth"
|
||||
import { CloudflareAIGatewayAuthPlugin, CloudflareWorkersAuthPlugin } from "./cloudflare"
|
||||
import { Effect, Layer, Context, Stream } from "effect"
|
||||
import { EffectLogger } from "@/effect/logger"
|
||||
import { InstanceState } from "@/effect/instance-state"
|
||||
import { errorMessage } from "@/util/error"
|
||||
import { PluginLoader } from "./loader"
|
||||
@@ -90,14 +89,6 @@ export namespace Plugin {
|
||||
return result
|
||||
}
|
||||
|
||||
function publishPluginError(bus: Bus.Interface, message: string) {
|
||||
Effect.runFork(
|
||||
bus
|
||||
.publish(Session.Event.Error, { error: new NamedError.Unknown({ message }).toObject() })
|
||||
.pipe(Effect.provide(EffectLogger.layer)),
|
||||
)
|
||||
}
|
||||
|
||||
async function applyPlugin(load: PluginLoader.Loaded, input: PluginInput, hooks: Hooks[]) {
|
||||
const plugin = readV1Plugin(load.mod, load.spec, "server", "detect")
|
||||
if (plugin) {
|
||||
@@ -116,6 +107,13 @@ export namespace Plugin {
|
||||
Effect.gen(function* () {
|
||||
const bus = yield* Bus.Service
|
||||
const config = yield* Config.Service
|
||||
const ctx = yield* Effect.context()
|
||||
|
||||
function publishPluginError(message: string) {
|
||||
Effect.runForkWith(ctx)(
|
||||
bus.publish(Session.Event.Error, { error: new NamedError.Unknown({ message }).toObject() }),
|
||||
)
|
||||
}
|
||||
|
||||
const state = yield* InstanceState.make<State>(
|
||||
Effect.fn("Plugin.state")(function* (ctx) {
|
||||
@@ -187,24 +185,24 @@ export namespace Plugin {
|
||||
if (stage === "install") {
|
||||
const parsed = parsePluginSpecifier(spec)
|
||||
log.error("failed to install plugin", { pkg: parsed.pkg, version: parsed.version, error: message })
|
||||
publishPluginError(bus, `Failed to install plugin ${parsed.pkg}@${parsed.version}: ${message}`)
|
||||
publishPluginError(`Failed to install plugin ${parsed.pkg}@${parsed.version}: ${message}`)
|
||||
return
|
||||
}
|
||||
|
||||
if (stage === "compatibility") {
|
||||
log.warn("plugin incompatible", { path: spec, error: message })
|
||||
publishPluginError(bus, `Plugin ${spec} skipped: ${message}`)
|
||||
publishPluginError(`Plugin ${spec} skipped: ${message}`)
|
||||
return
|
||||
}
|
||||
|
||||
if (stage === "entry") {
|
||||
log.error("failed to resolve plugin server entry", { path: spec, error: message })
|
||||
publishPluginError(bus, `Failed to load plugin ${spec}: ${message}`)
|
||||
publishPluginError(`Failed to load plugin ${spec}: ${message}`)
|
||||
return
|
||||
}
|
||||
|
||||
log.error("failed to load plugin", { path: spec, target: resolved?.entry, error: message })
|
||||
publishPluginError(bus, `Failed to load plugin ${spec}: ${message}`)
|
||||
publishPluginError(`Failed to load plugin ${spec}: ${message}`)
|
||||
},
|
||||
},
|
||||
}),
|
||||
|
||||
@@ -19,7 +19,6 @@ import { iife } from "@/util/iife"
|
||||
import { Global } from "../global"
|
||||
import path from "path"
|
||||
import { Effect, Layer, Context } from "effect"
|
||||
import { EffectLogger } from "@/effect/logger"
|
||||
import { InstanceState } from "@/effect/instance-state"
|
||||
import { AppFileSystem } from "@/filesystem"
|
||||
import { isRecord } from "@/util/record"
|
||||
@@ -1039,6 +1038,7 @@ export namespace Provider {
|
||||
const auth = yield* Auth.Service
|
||||
const env = yield* Env.Service
|
||||
const plugin = yield* Plugin.Service
|
||||
const ctx = yield* Effect.context()
|
||||
|
||||
const state = yield* InstanceState.make<State>(() =>
|
||||
Effect.gen(function* () {
|
||||
@@ -1223,8 +1223,7 @@ export namespace Provider {
|
||||
|
||||
const options = yield* Effect.promise(() =>
|
||||
plugin.auth!.loader!(
|
||||
() =>
|
||||
Effect.runPromise(auth.get(providerID).pipe(Effect.orDie, Effect.provide(EffectLogger.layer))) as any,
|
||||
() => Effect.runPromiseWith(ctx)(auth.get(providerID).pipe(Effect.orDie)) as any,
|
||||
database[plugin.auth!.provider],
|
||||
),
|
||||
)
|
||||
|
||||
@@ -10,7 +10,6 @@ import { Shell } from "@/shell/shell"
|
||||
import { Plugin } from "@/plugin"
|
||||
import { PtyID } from "./schema"
|
||||
import { Effect, Layer, Context } from "effect"
|
||||
import { EffectLogger } from "@/effect/logger"
|
||||
|
||||
export namespace Pty {
|
||||
const log = Log.create({ service: "pty" })
|
||||
@@ -119,6 +118,7 @@ export namespace Pty {
|
||||
Effect.gen(function* () {
|
||||
const bus = yield* Bus.Service
|
||||
const plugin = yield* Plugin.Service
|
||||
const ctx = yield* Effect.context()
|
||||
function teardown(session: Active) {
|
||||
try {
|
||||
session.process.kill()
|
||||
@@ -256,8 +256,8 @@ export namespace Pty {
|
||||
if (session.info.status === "exited") return
|
||||
log.info("session exited", { id, exitCode })
|
||||
session.info.status = "exited"
|
||||
Effect.runFork(bus.publish(Event.Exited, { id, exitCode }).pipe(Effect.provide(EffectLogger.layer)))
|
||||
Effect.runFork(remove(id).pipe(Effect.provide(EffectLogger.layer)))
|
||||
Effect.runForkWith(ctx)(bus.publish(Event.Exited, { id, exitCode }))
|
||||
Effect.runForkWith(ctx)(remove(id))
|
||||
}),
|
||||
)
|
||||
yield* bus.publish(Event.Created, { info })
|
||||
|
||||
Reference in New Issue
Block a user