diff --git a/packages/opencode/src/effect/app-runtime.ts b/packages/opencode/src/effect/app-runtime.ts index 495cf9eea8..bd27df3435 100644 --- a/packages/opencode/src/effect/app-runtime.ts +++ b/packages/opencode/src/effect/app-runtime.ts @@ -1,6 +1,6 @@ import { Layer, ManagedRuntime } from "effect" import { attach, memoMap } from "./run-service" -import { Observability } from "./observability" +import { Observability } from "." import { AppFileSystem } from "@opencode-ai/shared/filesystem" import { Bus } from "@/bus" diff --git a/packages/opencode/src/effect/bootstrap-runtime.ts b/packages/opencode/src/effect/bootstrap-runtime.ts index 9be456b095..208a83bf85 100644 --- a/packages/opencode/src/effect/bootstrap-runtime.ts +++ b/packages/opencode/src/effect/bootstrap-runtime.ts @@ -10,7 +10,7 @@ import { File } from "@/file" import { Vcs } from "@/project" import { Snapshot } from "@/snapshot" import { Bus } from "@/bus" -import { Observability } from "./observability" +import { Observability } from "." export const BootstrapLayer = Layer.mergeAll( Plugin.defaultLayer, diff --git a/packages/opencode/src/effect/index.ts b/packages/opencode/src/effect/index.ts index d10afdff2b..410ce00c22 100644 --- a/packages/opencode/src/effect/index.ts +++ b/packages/opencode/src/effect/index.ts @@ -1,2 +1,5 @@ export * as InstanceState from "./instance-state" export * as EffectBridge from "./bridge" +export * as Runner from "./runner" +export * as Observability from "./observability" +export * as EffectLogger from "./logger" diff --git a/packages/opencode/src/effect/instance-state.ts b/packages/opencode/src/effect/instance-state.ts index b6249db049..d71f82df97 100644 --- a/packages/opencode/src/effect/instance-state.ts +++ b/packages/opencode/src/effect/instance-state.ts @@ -1,5 +1,5 @@ import { Effect, Fiber, ScopedCache, Scope, Context } from "effect" -import { EffectLogger } from "@/effect/logger" +import { EffectLogger } from "@/effect" import { Instance, type InstanceContext } from "@/project/instance" import { LocalContext } from "@/util" import { InstanceRef, WorkspaceRef } from "./instance-ref" diff --git a/packages/opencode/src/effect/logger.ts b/packages/opencode/src/effect/logger.ts index 7f21084ddc..97b614fc0a 100644 --- a/packages/opencode/src/effect/logger.ts +++ b/packages/opencode/src/effect/logger.ts @@ -1,67 +1,65 @@ import { Cause, Effect, Logger, References } from "effect" import { Log } from "@/util" -export namespace EffectLogger { - type Fields = Record +type Fields = Record - export interface Handle { - readonly debug: (msg?: unknown, extra?: Fields) => Effect.Effect - readonly info: (msg?: unknown, extra?: Fields) => Effect.Effect - readonly warn: (msg?: unknown, extra?: Fields) => Effect.Effect - readonly error: (msg?: unknown, extra?: Fields) => Effect.Effect - readonly with: (extra: Fields) => Handle - } - - const clean = (input?: Fields): Fields => - Object.fromEntries(Object.entries(input ?? {}).filter((entry) => entry[1] !== undefined && entry[1] !== null)) - - const text = (input: unknown): string => { - if (Array.isArray(input)) return input.map((item) => String(item)).join(" ") - return input === undefined ? "" : String(input) - } - - const call = (run: (msg?: unknown) => Effect.Effect, base: Fields, msg?: unknown, extra?: Fields) => { - const ann = clean({ ...base, ...extra }) - const fx = run(msg) - return Object.keys(ann).length ? Effect.annotateLogs(fx, ann) : fx - } - - export const logger = Logger.make((opts) => { - const extra = clean(opts.fiber.getRef(References.CurrentLogAnnotations)) - const now = opts.date.getTime() - for (const [key, start] of opts.fiber.getRef(References.CurrentLogSpans)) { - extra[`logSpan.${key}`] = `${now - start}ms` - } - if (opts.cause.reasons.length > 0) { - extra.cause = Cause.pretty(opts.cause) - } - - const svc = typeof extra.service === "string" ? extra.service : undefined - if (svc) delete extra.service - const log = svc ? Log.create({ service: svc }) : Log.Default - const msg = text(opts.message) - - switch (opts.logLevel) { - case "Trace": - case "Debug": - return log.debug(msg, extra) - case "Warn": - return log.warn(msg, extra) - case "Error": - case "Fatal": - return log.error(msg, extra) - default: - return log.info(msg, extra) - } - }) - - export const layer = Logger.layer([logger], { mergeWithExisting: false }) - - export const create = (base: Fields = {}): Handle => ({ - debug: (msg, extra) => call((item) => Effect.logDebug(item), base, msg, extra), - info: (msg, extra) => call((item) => Effect.logInfo(item), base, msg, extra), - warn: (msg, extra) => call((item) => Effect.logWarning(item), base, msg, extra), - error: (msg, extra) => call((item) => Effect.logError(item), base, msg, extra), - with: (extra) => create({ ...base, ...extra }), - }) +export interface Handle { + readonly debug: (msg?: unknown, extra?: Fields) => Effect.Effect + readonly info: (msg?: unknown, extra?: Fields) => Effect.Effect + readonly warn: (msg?: unknown, extra?: Fields) => Effect.Effect + readonly error: (msg?: unknown, extra?: Fields) => Effect.Effect + readonly with: (extra: Fields) => Handle } + +const clean = (input?: Fields): Fields => + Object.fromEntries(Object.entries(input ?? {}).filter((entry) => entry[1] !== undefined && entry[1] !== null)) + +const text = (input: unknown): string => { + if (Array.isArray(input)) return input.map((item) => String(item)).join(" ") + return input === undefined ? "" : String(input) +} + +const call = (run: (msg?: unknown) => Effect.Effect, base: Fields, msg?: unknown, extra?: Fields) => { + const ann = clean({ ...base, ...extra }) + const fx = run(msg) + return Object.keys(ann).length ? Effect.annotateLogs(fx, ann) : fx +} + +export const logger = Logger.make((opts) => { + const extra = clean(opts.fiber.getRef(References.CurrentLogAnnotations)) + const now = opts.date.getTime() + for (const [key, start] of opts.fiber.getRef(References.CurrentLogSpans)) { + extra[`logSpan.${key}`] = `${now - start}ms` + } + if (opts.cause.reasons.length > 0) { + extra.cause = Cause.pretty(opts.cause) + } + + const svc = typeof extra.service === "string" ? extra.service : undefined + if (svc) delete extra.service + const log = svc ? Log.create({ service: svc }) : Log.Default + const msg = text(opts.message) + + switch (opts.logLevel) { + case "Trace": + case "Debug": + return log.debug(msg, extra) + case "Warn": + return log.warn(msg, extra) + case "Error": + case "Fatal": + return log.error(msg, extra) + default: + return log.info(msg, extra) + } +}) + +export const layer = Logger.layer([logger], { mergeWithExisting: false }) + +export const create = (base: Fields = {}): Handle => ({ + debug: (msg, extra) => call((item) => Effect.logDebug(item), base, msg, extra), + info: (msg, extra) => call((item) => Effect.logInfo(item), base, msg, extra), + warn: (msg, extra) => call((item) => Effect.logWarning(item), base, msg, extra), + error: (msg, extra) => call((item) => Effect.logError(item), base, msg, extra), + with: (extra) => create({ ...base, ...extra }), +}) diff --git a/packages/opencode/src/effect/observability.ts b/packages/opencode/src/effect/observability.ts index f79306bf1e..4e8ae22217 100644 --- a/packages/opencode/src/effect/observability.ts +++ b/packages/opencode/src/effect/observability.ts @@ -1,80 +1,78 @@ import { Effect, Layer, Logger } from "effect" import { FetchHttpClient } from "effect/unstable/http" import { OtlpLogger, OtlpSerialization } from "effect/unstable/observability" -import { EffectLogger } from "@/effect/logger" +import { EffectLogger } from "@/effect" import { Flag } from "@/flag/flag" import { CHANNEL, VERSION } from "@/installation/meta" -export namespace Observability { - const base = Flag.OTEL_EXPORTER_OTLP_ENDPOINT - export const enabled = !!base +const base = Flag.OTEL_EXPORTER_OTLP_ENDPOINT +export const enabled = !!base - const headers = Flag.OTEL_EXPORTER_OTLP_HEADERS - ? Flag.OTEL_EXPORTER_OTLP_HEADERS.split(",").reduce( - (acc, x) => { - const [key, ...value] = x.split("=") - acc[key] = value.join("=") - return acc - }, - {} as Record, - ) - : undefined +const headers = Flag.OTEL_EXPORTER_OTLP_HEADERS + ? Flag.OTEL_EXPORTER_OTLP_HEADERS.split(",").reduce( + (acc, x) => { + const [key, ...value] = x.split("=") + acc[key] = value.join("=") + return acc + }, + {} as Record, + ) + : undefined - const resource = { - serviceName: "opencode", - serviceVersion: VERSION, - attributes: { - "deployment.environment.name": CHANNEL === "local" ? "local" : CHANNEL, - "opencode.client": Flag.OPENCODE_CLIENT, - }, - } +const resource = { + serviceName: "opencode", + serviceVersion: VERSION, + attributes: { + "deployment.environment.name": CHANNEL === "local" ? "local" : CHANNEL, + "opencode.client": Flag.OPENCODE_CLIENT, + }, +} - const logs = Logger.layer( - [ - EffectLogger.logger, - OtlpLogger.make({ - url: `${base}/v1/logs`, - resource, +const logs = Logger.layer( + [ + EffectLogger.logger, + OtlpLogger.make({ + url: `${base}/v1/logs`, + resource, + headers, + }), + ], + { mergeWithExisting: false }, +).pipe(Layer.provide(OtlpSerialization.layerJson), Layer.provide(FetchHttpClient.layer)) + +const traces = async () => { + const NodeSdk = await import("@effect/opentelemetry/NodeSdk") + const OTLP = await import("@opentelemetry/exporter-trace-otlp-http") + const SdkBase = await import("@opentelemetry/sdk-trace-base") + + // @effect/opentelemetry creates a NodeTracerProvider but never calls + // register(), so the global @opentelemetry/api context manager stays + // as the no-op default. Non-Effect code (like the AI SDK) that calls + // tracer.startActiveSpan() relies on context.active() to find the + // parent span — without a real context manager every span starts a + // new trace. Registering AsyncLocalStorageContextManager fixes this. + const { AsyncLocalStorageContextManager } = await import("@opentelemetry/context-async-hooks") + const { context } = await import("@opentelemetry/api") + const mgr = new AsyncLocalStorageContextManager() + mgr.enable() + context.setGlobalContextManager(mgr) + + return NodeSdk.layer(() => ({ + resource, + spanProcessor: new SdkBase.BatchSpanProcessor( + new OTLP.OTLPTraceExporter({ + url: `${base}/v1/traces`, headers, }), - ], - { mergeWithExisting: false }, - ).pipe(Layer.provide(OtlpSerialization.layerJson), Layer.provide(FetchHttpClient.layer)) - - const traces = async () => { - const NodeSdk = await import("@effect/opentelemetry/NodeSdk") - const OTLP = await import("@opentelemetry/exporter-trace-otlp-http") - const SdkBase = await import("@opentelemetry/sdk-trace-base") - - // @effect/opentelemetry creates a NodeTracerProvider but never calls - // register(), so the global @opentelemetry/api context manager stays - // as the no-op default. Non-Effect code (like the AI SDK) that calls - // tracer.startActiveSpan() relies on context.active() to find the - // parent span — without a real context manager every span starts a - // new trace. Registering AsyncLocalStorageContextManager fixes this. - const { AsyncLocalStorageContextManager } = await import("@opentelemetry/context-async-hooks") - const { context } = await import("@opentelemetry/api") - const mgr = new AsyncLocalStorageContextManager() - mgr.enable() - context.setGlobalContextManager(mgr) - - return NodeSdk.layer(() => ({ - resource, - spanProcessor: new SdkBase.BatchSpanProcessor( - new OTLP.OTLPTraceExporter({ - url: `${base}/v1/traces`, - headers, - }), - ), - })) - } - - export const layer = !base - ? EffectLogger.layer - : Layer.unwrap( - Effect.gen(function* () { - const trace = yield* Effect.promise(traces) - return Layer.mergeAll(trace, logs) - }), - ) + ), + })) } + +export const layer = !base + ? EffectLogger.layer + : Layer.unwrap( + Effect.gen(function* () { + const trace = yield* Effect.promise(traces) + return Layer.mergeAll(trace, logs) + }), + ) diff --git a/packages/opencode/src/effect/run-service.ts b/packages/opencode/src/effect/run-service.ts index 9553e7a3aa..a9d653b108 100644 --- a/packages/opencode/src/effect/run-service.ts +++ b/packages/opencode/src/effect/run-service.ts @@ -3,7 +3,7 @@ import * as Context from "effect/Context" import { Instance } from "@/project/instance" import { LocalContext } from "@/util" import { InstanceRef, WorkspaceRef } from "./instance-ref" -import { Observability } from "./observability" +import { Observability } from "." import { WorkspaceContext } from "@/control-plane/workspace-context" import type { InstanceContext } from "@/project/instance" diff --git a/packages/opencode/src/effect/runner.ts b/packages/opencode/src/effect/runner.ts index 38c45a6342..925c268f8e 100644 --- a/packages/opencode/src/effect/runner.ts +++ b/packages/opencode/src/effect/runner.ts @@ -1,208 +1,206 @@ import { Cause, Deferred, Effect, Exit, Fiber, Schema, Scope, SynchronizedRef } from "effect" export interface Runner { - readonly state: Runner.State + readonly state: State readonly busy: boolean readonly ensureRunning: (work: Effect.Effect) => Effect.Effect readonly startShell: (work: Effect.Effect) => Effect.Effect readonly cancel: Effect.Effect } -export namespace Runner { - export class Cancelled extends Schema.TaggedErrorClass()("RunnerCancelled", {}) {} +export class Cancelled extends Schema.TaggedErrorClass()("RunnerCancelled", {}) {} - interface RunHandle { - id: number - done: Deferred.Deferred - fiber: Fiber.Fiber +interface RunHandle { + id: number + done: Deferred.Deferred + fiber: Fiber.Fiber +} + +interface ShellHandle { + id: number + fiber: Fiber.Fiber +} + +interface PendingHandle { + id: number + done: Deferred.Deferred + work: Effect.Effect +} + +export type State = + | { readonly _tag: "Idle" } + | { readonly _tag: "Running"; readonly run: RunHandle } + | { readonly _tag: "Shell"; readonly shell: ShellHandle } + | { readonly _tag: "ShellThenRun"; readonly shell: ShellHandle; readonly run: PendingHandle } + +export const make = ( + scope: Scope.Scope, + opts?: { + onIdle?: Effect.Effect + onBusy?: Effect.Effect + onInterrupt?: Effect.Effect + busy?: () => never + }, +): Runner => { + const ref = SynchronizedRef.makeUnsafe>({ _tag: "Idle" }) + const idle = opts?.onIdle ?? Effect.void + const busy = opts?.onBusy ?? Effect.void + const onInterrupt = opts?.onInterrupt + let ids = 0 + + const state = () => SynchronizedRef.getUnsafe(ref) + const next = () => { + ids += 1 + return ids } - interface ShellHandle { - id: number - fiber: Fiber.Fiber - } + const complete = (done: Deferred.Deferred, exit: Exit.Exit) => + Exit.isFailure(exit) && Cause.hasInterruptsOnly(exit.cause) + ? Deferred.fail(done, new Cancelled()).pipe(Effect.asVoid) + : Deferred.done(done, exit).pipe(Effect.asVoid) - interface PendingHandle { - id: number - done: Deferred.Deferred - work: Effect.Effect - } + const idleIfCurrent = () => + SynchronizedRef.modify(ref, (st) => [st._tag === "Idle" ? idle : Effect.void, st] as const).pipe(Effect.flatten) - export type State = - | { readonly _tag: "Idle" } - | { readonly _tag: "Running"; readonly run: RunHandle } - | { readonly _tag: "Shell"; readonly shell: ShellHandle } - | { readonly _tag: "ShellThenRun"; readonly shell: ShellHandle; readonly run: PendingHandle } + const finishRun = (id: number, done: Deferred.Deferred, exit: Exit.Exit) => + SynchronizedRef.modify( + ref, + (st) => + [ + Effect.gen(function* () { + if (st._tag === "Running" && st.run.id === id) yield* idle + yield* complete(done, exit) + }), + st._tag === "Running" && st.run.id === id ? ({ _tag: "Idle" } as const) : st, + ] as const, + ).pipe(Effect.flatten) - export const make = ( - scope: Scope.Scope, - opts?: { - onIdle?: Effect.Effect - onBusy?: Effect.Effect - onInterrupt?: Effect.Effect - busy?: () => never - }, - ): Runner => { - const ref = SynchronizedRef.makeUnsafe>({ _tag: "Idle" }) - const idle = opts?.onIdle ?? Effect.void - const busy = opts?.onBusy ?? Effect.void - const onInterrupt = opts?.onInterrupt - let ids = 0 - - const state = () => SynchronizedRef.getUnsafe(ref) - const next = () => { - ids += 1 - return ids - } - - const complete = (done: Deferred.Deferred, exit: Exit.Exit) => - Exit.isFailure(exit) && Cause.hasInterruptsOnly(exit.cause) - ? Deferred.fail(done, new Cancelled()).pipe(Effect.asVoid) - : Deferred.done(done, exit).pipe(Effect.asVoid) - - const idleIfCurrent = () => - SynchronizedRef.modify(ref, (st) => [st._tag === "Idle" ? idle : Effect.void, st] as const).pipe(Effect.flatten) - - const finishRun = (id: number, done: Deferred.Deferred, exit: Exit.Exit) => - SynchronizedRef.modify( - ref, - (st) => - [ - Effect.gen(function* () { - if (st._tag === "Running" && st.run.id === id) yield* idle - yield* complete(done, exit) - }), - st._tag === "Running" && st.run.id === id ? ({ _tag: "Idle" } as const) : st, - ] as const, - ).pipe(Effect.flatten) - - const startRun = (work: Effect.Effect, done: Deferred.Deferred) => - Effect.gen(function* () { - const id = next() - const fiber = yield* work.pipe( - Effect.onExit((exit) => finishRun(id, done, exit)), - Effect.forkIn(scope), - ) - return { id, done, fiber } satisfies RunHandle - }) - - const finishShell = (id: number) => - SynchronizedRef.modifyEffect( - ref, - Effect.fnUntraced(function* (st) { - if (st._tag === "Shell" && st.shell.id === id) return [idle, { _tag: "Idle" }] as const - if (st._tag === "ShellThenRun" && st.shell.id === id) { - const run = yield* startRun(st.run.work, st.run.done) - return [Effect.void, { _tag: "Running", run }] as const - } - return [Effect.void, st] as const - }), - ).pipe(Effect.flatten) - - const stopShell = (shell: ShellHandle) => Fiber.interrupt(shell.fiber) - - const ensureRunning = (work: Effect.Effect) => - SynchronizedRef.modifyEffect( - ref, - Effect.fnUntraced(function* (st) { - switch (st._tag) { - case "Running": - case "ShellThenRun": - return [Deferred.await(st.run.done), st] as const - case "Shell": { - const run = { - id: next(), - done: yield* Deferred.make(), - work, - } satisfies PendingHandle - return [Deferred.await(run.done), { _tag: "ShellThenRun", shell: st.shell, run }] as const - } - case "Idle": { - const done = yield* Deferred.make() - const run = yield* startRun(work, done) - return [Deferred.await(done), { _tag: "Running", run }] as const - } - } - }), - ).pipe( - Effect.flatten, - Effect.catch( - (e): Effect.Effect => (e instanceof Cancelled ? (onInterrupt ?? Effect.die(e)) : Effect.fail(e as E)), - ), + const startRun = (work: Effect.Effect, done: Deferred.Deferred) => + Effect.gen(function* () { + const id = next() + const fiber = yield* work.pipe( + Effect.onExit((exit) => finishRun(id, done, exit)), + Effect.forkIn(scope), ) + return { id, done, fiber } satisfies RunHandle + }) - const startShell = (work: Effect.Effect) => - SynchronizedRef.modifyEffect( - ref, - Effect.fnUntraced(function* (st) { - if (st._tag !== "Idle") { - return [ - Effect.sync(() => { - if (opts?.busy) opts.busy() - throw new Error("Runner is busy") - }), - st, - ] as const + const finishShell = (id: number) => + SynchronizedRef.modifyEffect( + ref, + Effect.fnUntraced(function* (st) { + if (st._tag === "Shell" && st.shell.id === id) return [idle, { _tag: "Idle" }] as const + if (st._tag === "ShellThenRun" && st.shell.id === id) { + const run = yield* startRun(st.run.work, st.run.done) + return [Effect.void, { _tag: "Running", run }] as const + } + return [Effect.void, st] as const + }), + ).pipe(Effect.flatten) + + const stopShell = (shell: ShellHandle) => Fiber.interrupt(shell.fiber) + + const ensureRunning = (work: Effect.Effect) => + SynchronizedRef.modifyEffect( + ref, + Effect.fnUntraced(function* (st) { + switch (st._tag) { + case "Running": + case "ShellThenRun": + return [Deferred.await(st.run.done), st] as const + case "Shell": { + const run = { + id: next(), + done: yield* Deferred.make(), + work, + } satisfies PendingHandle + return [Deferred.await(run.done), { _tag: "ShellThenRun", shell: st.shell, run }] as const } - yield* busy - const id = next() - const fiber = yield* work.pipe(Effect.ensuring(finishShell(id)), Effect.forkChild) - const shell = { id, fiber } satisfies ShellHandle - return [ - Effect.gen(function* () { - const exit = yield* Fiber.await(fiber) - if (Exit.isSuccess(exit)) return exit.value - if (Cause.hasInterruptsOnly(exit.cause) && onInterrupt) return yield* onInterrupt - return yield* Effect.failCause(exit.cause) - }), - { _tag: "Shell", shell }, - ] as const - }), - ).pipe(Effect.flatten) + case "Idle": { + const done = yield* Deferred.make() + const run = yield* startRun(work, done) + return [Deferred.await(done), { _tag: "Running", run }] as const + } + } + }), + ).pipe( + Effect.flatten, + Effect.catch( + (e): Effect.Effect => (e instanceof Cancelled ? (onInterrupt ?? Effect.die(e)) : Effect.fail(e as E)), + ), + ) - const cancel = SynchronizedRef.modify(ref, (st) => { - switch (st._tag) { - case "Idle": - return [Effect.void, st] as const - case "Running": + const startShell = (work: Effect.Effect) => + SynchronizedRef.modifyEffect( + ref, + Effect.fnUntraced(function* (st) { + if (st._tag !== "Idle") { return [ - Effect.gen(function* () { - yield* Fiber.interrupt(st.run.fiber) - yield* Deferred.await(st.run.done).pipe(Effect.exit, Effect.asVoid) - yield* idleIfCurrent() + Effect.sync(() => { + if (opts?.busy) opts.busy() + throw new Error("Runner is busy") }), - { _tag: "Idle" } as const, + st, ] as const - case "Shell": - return [ - Effect.gen(function* () { - yield* stopShell(st.shell) - yield* idleIfCurrent() - }), - { _tag: "Idle" } as const, - ] as const - case "ShellThenRun": - return [ - Effect.gen(function* () { - yield* Deferred.fail(st.run.done, new Cancelled()).pipe(Effect.asVoid) - yield* stopShell(st.shell) - yield* idleIfCurrent() - }), - { _tag: "Idle" } as const, - ] as const - } - }).pipe(Effect.flatten) + } + yield* busy + const id = next() + const fiber = yield* work.pipe(Effect.ensuring(finishShell(id)), Effect.forkChild) + const shell = { id, fiber } satisfies ShellHandle + return [ + Effect.gen(function* () { + const exit = yield* Fiber.await(fiber) + if (Exit.isSuccess(exit)) return exit.value + if (Cause.hasInterruptsOnly(exit.cause) && onInterrupt) return yield* onInterrupt + return yield* Effect.failCause(exit.cause) + }), + { _tag: "Shell", shell }, + ] as const + }), + ).pipe(Effect.flatten) - return { - get state() { - return state() - }, - get busy() { - return state()._tag !== "Idle" - }, - ensureRunning, - startShell, - cancel, + const cancel = SynchronizedRef.modify(ref, (st) => { + switch (st._tag) { + case "Idle": + return [Effect.void, st] as const + case "Running": + return [ + Effect.gen(function* () { + yield* Fiber.interrupt(st.run.fiber) + yield* Deferred.await(st.run.done).pipe(Effect.exit, Effect.asVoid) + yield* idleIfCurrent() + }), + { _tag: "Idle" } as const, + ] as const + case "Shell": + return [ + Effect.gen(function* () { + yield* stopShell(st.shell) + yield* idleIfCurrent() + }), + { _tag: "Idle" } as const, + ] as const + case "ShellThenRun": + return [ + Effect.gen(function* () { + yield* Deferred.fail(st.run.done, new Cancelled()).pipe(Effect.asVoid) + yield* stopShell(st.shell) + yield* idleIfCurrent() + }), + { _tag: "Idle" } as const, + ] as const } + }).pipe(Effect.flatten) + + return { + get state() { + return state() + }, + get busy() { + return state()._tag !== "Idle" + }, + ensureRunning, + startShell, + cancel, } } diff --git a/packages/opencode/src/server/instance/httpapi/server.ts b/packages/opencode/src/server/instance/httpapi/server.ts index 62ffb5940d..299a177f50 100644 --- a/packages/opencode/src/server/instance/httpapi/server.ts +++ b/packages/opencode/src/server/instance/httpapi/server.ts @@ -3,7 +3,7 @@ import { HttpApiBuilder, HttpApiMiddleware, HttpApiSecurity } from "effect/unsta import { HttpRouter, HttpServer, HttpServerRequest } from "effect/unstable/http" import { AppRuntime } from "@/effect/app-runtime" import { InstanceRef, WorkspaceRef } from "@/effect/instance-ref" -import { Observability } from "@/effect/observability" +import { Observability } from "@/effect" import { memoMap } from "@/effect/run-service" import { Flag } from "@/flag/flag" import { InstanceBootstrap } from "@/project/bootstrap" diff --git a/packages/opencode/src/session/message-v2.ts b/packages/opencode/src/session/message-v2.ts index 2a501167a5..f4a7235e15 100644 --- a/packages/opencode/src/session/message-v2.ts +++ b/packages/opencode/src/session/message-v2.ts @@ -15,7 +15,7 @@ import type { SystemError } from "bun" import type { Provider } from "@/provider" import { ModelID, ProviderID } from "@/provider/schema" import { Effect } from "effect" -import { EffectLogger } from "@/effect/logger" +import { EffectLogger } from "@/effect" /** Error shape thrown by Bun's fetch() when gzip/br decompression fails mid-stream */ interface FetchDecompressionError extends Error { diff --git a/packages/opencode/src/session/prompt.ts b/packages/opencode/src/session/prompt.ts index a072633aa7..157533af0a 100644 --- a/packages/opencode/src/session/prompt.ts +++ b/packages/opencode/src/session/prompt.ts @@ -44,7 +44,7 @@ import { Truncate } from "@/tool/truncate" import { decodeDataUrl } from "@/util/data-url" import { Process } from "@/util" import { Cause, Effect, Exit, Layer, Option, Scope, Context } from "effect" -import { EffectLogger } from "@/effect/logger" +import { EffectLogger } from "@/effect" import { InstanceState } from "@/effect" import { TaskTool, type TaskPromptOps } from "@/tool/task" import { SessionRunState } from "./run-state" diff --git a/packages/opencode/src/session/run-state.ts b/packages/opencode/src/session/run-state.ts index 922daf1178..179f287fa8 100644 --- a/packages/opencode/src/session/run-state.ts +++ b/packages/opencode/src/session/run-state.ts @@ -1,5 +1,5 @@ import { InstanceState } from "@/effect" -import { Runner } from "@/effect/runner" +import { Runner } from "@/effect" import { Effect, Layer, Scope, Context } from "effect" import { Session } from "." import { MessageV2 } from "./message-v2" @@ -32,7 +32,7 @@ export namespace SessionRunState { const state = yield* InstanceState.make( Effect.fn("SessionRunState.state")(function* () { const scope = yield* Scope.Scope - const runners = new Map>() + const runners = new Map>() yield* Effect.addFinalizer( Effect.fnUntraced(function* () { yield* Effect.forEach(runners.values(), (runner) => runner.cancel, { diff --git a/packages/opencode/src/tool/external-directory.ts b/packages/opencode/src/tool/external-directory.ts index c91b698038..810206f817 100644 --- a/packages/opencode/src/tool/external-directory.ts +++ b/packages/opencode/src/tool/external-directory.ts @@ -1,6 +1,6 @@ import path from "path" import { Effect } from "effect" -import { EffectLogger } from "@/effect/logger" +import { EffectLogger } from "@/effect" import { InstanceState } from "@/effect" import type { Tool } from "./tool" import { Instance } from "../project/instance" diff --git a/packages/opencode/src/tool/skill.ts b/packages/opencode/src/tool/skill.ts index d5f3787ed6..eaec667e58 100644 --- a/packages/opencode/src/tool/skill.ts +++ b/packages/opencode/src/tool/skill.ts @@ -3,7 +3,7 @@ import { pathToFileURL } from "url" import z from "zod" import { Effect } from "effect" import * as Stream from "effect/Stream" -import { EffectLogger } from "@/effect/logger" +import { EffectLogger } from "@/effect" import { Ripgrep } from "../file/ripgrep" import { Skill } from "../skill" import { Tool } from "./tool" diff --git a/packages/opencode/test/effect/app-runtime-logger.test.ts b/packages/opencode/test/effect/app-runtime-logger.test.ts index 91f367ff3e..8d5649a20c 100644 --- a/packages/opencode/test/effect/app-runtime-logger.test.ts +++ b/packages/opencode/test/effect/app-runtime-logger.test.ts @@ -3,7 +3,7 @@ import { Context, Effect, Layer, Logger } from "effect" import { AppRuntime } from "../../src/effect/app-runtime" import { EffectBridge } from "../../src/effect" import { InstanceRef } from "../../src/effect/instance-ref" -import { EffectLogger } from "../../src/effect/logger" +import { EffectLogger } from "../../src/effect" import { makeRuntime } from "../../src/effect/run-service" import { Instance } from "../../src/project/instance" import { tmpdir } from "../fixture/fixture" diff --git a/packages/opencode/test/effect/runner.test.ts b/packages/opencode/test/effect/runner.test.ts index a91df76ebf..241e7c2a88 100644 --- a/packages/opencode/test/effect/runner.test.ts +++ b/packages/opencode/test/effect/runner.test.ts @@ -1,6 +1,6 @@ import { describe, expect, test } from "bun:test" import { Deferred, Effect, Exit, Fiber, Ref, Scope } from "effect" -import { Runner } from "../../src/effect/runner" +import { Runner } from "../../src/effect" import { it } from "../lib/effect" describe("Runner", () => {