diff --git a/packages/opencode/src/bus/bus.ts b/packages/opencode/src/bus/bus.ts deleted file mode 100644 index beac809925..0000000000 --- a/packages/opencode/src/bus/bus.ts +++ /dev/null @@ -1,191 +0,0 @@ -import z from "zod" -import { Effect, Exit, Layer, PubSub, Scope, Context, Stream } from "effect" -import { EffectBridge } from "@/effect" -import { Log } from "../util" -import { BusEvent } from "./bus-event" -import { GlobalBus } from "./global" -import { InstanceState } from "@/effect" -import { makeRuntime } from "@/effect/run-service" - -const log = Log.create({ service: "bus" }) - -export const InstanceDisposed = BusEvent.define( - "server.instance.disposed", - z.object({ - directory: z.string(), - }), -) - -type Payload = { - type: D["type"] - properties: z.infer -} - -type State = { - wildcard: PubSub.PubSub - typed: Map> -} - -export interface Interface { - readonly publish: ( - def: D, - properties: z.output, - ) => Effect.Effect - readonly subscribe: (def: D) => Stream.Stream> - readonly subscribeAll: () => Stream.Stream - readonly subscribeCallback: ( - def: D, - callback: (event: Payload) => unknown, - ) => Effect.Effect<() => void> - readonly subscribeAllCallback: (callback: (event: any) => unknown) => Effect.Effect<() => void> -} - -export class Service extends Context.Service()("@opencode/Bus") {} - -export const layer = Layer.effect( - Service, - Effect.gen(function* () { - const state = yield* InstanceState.make( - Effect.fn("Bus.state")(function* (ctx) { - const wildcard = yield* PubSub.unbounded() - const typed = new Map>() - - yield* Effect.addFinalizer(() => - Effect.gen(function* () { - // Publish InstanceDisposed before shutting down so subscribers see it - yield* PubSub.publish(wildcard, { - type: InstanceDisposed.type, - properties: { directory: ctx.directory }, - }) - yield* PubSub.shutdown(wildcard) - for (const ps of typed.values()) { - yield* PubSub.shutdown(ps) - } - }), - ) - - return { wildcard, typed } - }), - ) - - function getOrCreate(state: State, def: D) { - return Effect.gen(function* () { - let ps = state.typed.get(def.type) - if (!ps) { - ps = yield* PubSub.unbounded() - state.typed.set(def.type, ps) - } - return ps as unknown as PubSub.PubSub> - }) - } - - function publish(def: D, properties: z.output) { - return Effect.gen(function* () { - const s = yield* InstanceState.get(state) - const payload: Payload = { type: def.type, properties } - log.info("publishing", { type: def.type }) - - const ps = s.typed.get(def.type) - if (ps) yield* PubSub.publish(ps, payload) - yield* PubSub.publish(s.wildcard, payload) - - const dir = yield* InstanceState.directory - const context = yield* InstanceState.context - const workspace = yield* InstanceState.workspaceID - - GlobalBus.emit("event", { - directory: dir, - project: context.project.id, - workspace, - payload, - }) - }) - } - - function subscribe(def: D): Stream.Stream> { - log.info("subscribing", { type: def.type }) - return Stream.unwrap( - Effect.gen(function* () { - const s = yield* InstanceState.get(state) - const ps = yield* getOrCreate(s, def) - return Stream.fromPubSub(ps) - }), - ).pipe(Stream.ensuring(Effect.sync(() => log.info("unsubscribing", { type: def.type })))) - } - - function subscribeAll(): Stream.Stream { - log.info("subscribing", { type: "*" }) - return Stream.unwrap( - Effect.gen(function* () { - const s = yield* InstanceState.get(state) - return Stream.fromPubSub(s.wildcard) - }), - ).pipe(Stream.ensuring(Effect.sync(() => log.info("unsubscribing", { type: "*" })))) - } - - function on(pubsub: PubSub.PubSub, type: string, callback: (event: T) => unknown) { - return Effect.gen(function* () { - log.info("subscribing", { type }) - const bridge = yield* EffectBridge.make() - const scope = yield* Scope.make() - const subscription = yield* Scope.provide(scope)(PubSub.subscribe(pubsub)) - - yield* Scope.provide(scope)( - Stream.fromSubscription(subscription).pipe( - Stream.runForEach((msg) => - Effect.tryPromise({ - try: () => Promise.resolve().then(() => callback(msg)), - catch: (cause) => { - log.error("subscriber failed", { type, cause }) - }, - }).pipe(Effect.ignore), - ), - Effect.forkScoped, - ), - ) - - return () => { - log.info("unsubscribing", { type }) - bridge.fork(Scope.close(scope, Exit.void)) - } - }) - } - - const subscribeCallback = Effect.fn("Bus.subscribeCallback")(function* ( - def: D, - callback: (event: Payload) => unknown, - ) { - const s = yield* InstanceState.get(state) - const ps = yield* getOrCreate(s, def) - return yield* on(ps, def.type, callback) - }) - - const subscribeAllCallback = Effect.fn("Bus.subscribeAllCallback")(function* (callback: (event: any) => unknown) { - const s = yield* InstanceState.get(state) - return yield* on(s.wildcard, "*", callback) - }) - - return Service.of({ publish, subscribe, subscribeAll, subscribeCallback, subscribeAllCallback }) - }), -) - -export const defaultLayer = layer - -const { runPromise, runSync } = makeRuntime(Service, layer) - -// runSync is safe here because the subscribe chain (InstanceState.get, PubSub.subscribe, -// Scope.make, Effect.forkScoped) is entirely synchronous. If any step becomes async, this will throw. -export async function publish(def: D, properties: z.output) { - return runPromise((svc) => svc.publish(def, properties)) -} - -export function subscribe( - def: D, - callback: (event: { type: D["type"]; properties: z.infer }) => unknown, -) { - return runSync((svc) => svc.subscribeCallback(def, callback)) -} - -export function subscribeAll(callback: (event: any) => unknown) { - return runSync((svc) => svc.subscribeAllCallback(callback)) -} diff --git a/packages/opencode/src/bus/index.ts b/packages/opencode/src/bus/index.ts index 3c21d7c7d1..8a9579b599 100644 --- a/packages/opencode/src/bus/index.ts +++ b/packages/opencode/src/bus/index.ts @@ -1 +1,193 @@ -export * as Bus from "./bus" +import z from "zod" +import { Effect, Exit, Layer, PubSub, Scope, Context, Stream } from "effect" +import { EffectBridge } from "@/effect" +import { Log } from "../util" +import { BusEvent } from "./bus-event" +import { GlobalBus } from "./global" +import { InstanceState } from "@/effect" +import { makeRuntime } from "@/effect/run-service" + +const log = Log.create({ service: "bus" }) + +export const InstanceDisposed = BusEvent.define( + "server.instance.disposed", + z.object({ + directory: z.string(), + }), +) + +type Payload = { + type: D["type"] + properties: z.infer +} + +type State = { + wildcard: PubSub.PubSub + typed: Map> +} + +export interface Interface { + readonly publish: ( + def: D, + properties: z.output, + ) => Effect.Effect + readonly subscribe: (def: D) => Stream.Stream> + readonly subscribeAll: () => Stream.Stream + readonly subscribeCallback: ( + def: D, + callback: (event: Payload) => unknown, + ) => Effect.Effect<() => void> + readonly subscribeAllCallback: (callback: (event: any) => unknown) => Effect.Effect<() => void> +} + +export class Service extends Context.Service()("@opencode/Bus") {} + +export const layer = Layer.effect( + Service, + Effect.gen(function* () { + const state = yield* InstanceState.make( + Effect.fn("Bus.state")(function* (ctx) { + const wildcard = yield* PubSub.unbounded() + const typed = new Map>() + + yield* Effect.addFinalizer(() => + Effect.gen(function* () { + // Publish InstanceDisposed before shutting down so subscribers see it + yield* PubSub.publish(wildcard, { + type: InstanceDisposed.type, + properties: { directory: ctx.directory }, + }) + yield* PubSub.shutdown(wildcard) + for (const ps of typed.values()) { + yield* PubSub.shutdown(ps) + } + }), + ) + + return { wildcard, typed } + }), + ) + + function getOrCreate(state: State, def: D) { + return Effect.gen(function* () { + let ps = state.typed.get(def.type) + if (!ps) { + ps = yield* PubSub.unbounded() + state.typed.set(def.type, ps) + } + return ps as unknown as PubSub.PubSub> + }) + } + + function publish(def: D, properties: z.output) { + return Effect.gen(function* () { + const s = yield* InstanceState.get(state) + const payload: Payload = { type: def.type, properties } + log.info("publishing", { type: def.type }) + + const ps = s.typed.get(def.type) + if (ps) yield* PubSub.publish(ps, payload) + yield* PubSub.publish(s.wildcard, payload) + + const dir = yield* InstanceState.directory + const context = yield* InstanceState.context + const workspace = yield* InstanceState.workspaceID + + GlobalBus.emit("event", { + directory: dir, + project: context.project.id, + workspace, + payload, + }) + }) + } + + function subscribe(def: D): Stream.Stream> { + log.info("subscribing", { type: def.type }) + return Stream.unwrap( + Effect.gen(function* () { + const s = yield* InstanceState.get(state) + const ps = yield* getOrCreate(s, def) + return Stream.fromPubSub(ps) + }), + ).pipe(Stream.ensuring(Effect.sync(() => log.info("unsubscribing", { type: def.type })))) + } + + function subscribeAll(): Stream.Stream { + log.info("subscribing", { type: "*" }) + return Stream.unwrap( + Effect.gen(function* () { + const s = yield* InstanceState.get(state) + return Stream.fromPubSub(s.wildcard) + }), + ).pipe(Stream.ensuring(Effect.sync(() => log.info("unsubscribing", { type: "*" })))) + } + + function on(pubsub: PubSub.PubSub, type: string, callback: (event: T) => unknown) { + return Effect.gen(function* () { + log.info("subscribing", { type }) + const bridge = yield* EffectBridge.make() + const scope = yield* Scope.make() + const subscription = yield* Scope.provide(scope)(PubSub.subscribe(pubsub)) + + yield* Scope.provide(scope)( + Stream.fromSubscription(subscription).pipe( + Stream.runForEach((msg) => + Effect.tryPromise({ + try: () => Promise.resolve().then(() => callback(msg)), + catch: (cause) => { + log.error("subscriber failed", { type, cause }) + }, + }).pipe(Effect.ignore), + ), + Effect.forkScoped, + ), + ) + + return () => { + log.info("unsubscribing", { type }) + bridge.fork(Scope.close(scope, Exit.void)) + } + }) + } + + const subscribeCallback = Effect.fn("Bus.subscribeCallback")(function* ( + def: D, + callback: (event: Payload) => unknown, + ) { + const s = yield* InstanceState.get(state) + const ps = yield* getOrCreate(s, def) + return yield* on(ps, def.type, callback) + }) + + const subscribeAllCallback = Effect.fn("Bus.subscribeAllCallback")(function* (callback: (event: any) => unknown) { + const s = yield* InstanceState.get(state) + return yield* on(s.wildcard, "*", callback) + }) + + return Service.of({ publish, subscribe, subscribeAll, subscribeCallback, subscribeAllCallback }) + }), +) + +export const defaultLayer = layer + +const { runPromise, runSync } = makeRuntime(Service, layer) + +// runSync is safe here because the subscribe chain (InstanceState.get, PubSub.subscribe, +// Scope.make, Effect.forkScoped) is entirely synchronous. If any step becomes async, this will throw. +export async function publish(def: D, properties: z.output) { + return runPromise((svc) => svc.publish(def, properties)) +} + +export function subscribe( + def: D, + callback: (event: { type: D["type"]; properties: z.infer }) => unknown, +) { + return runSync((svc) => svc.subscribeCallback(def, callback)) +} + +export function subscribeAll(callback: (event: any) => unknown) { + return runSync((svc) => svc.subscribeAllCallback(callback)) +} + +export * as Bus from "."