diff --git a/packages/opencode/src/session/index.ts b/packages/opencode/src/session/index.ts index d7bf99637a..26b2da5b06 100644 --- a/packages/opencode/src/session/index.ts +++ b/packages/opencode/src/session/index.ts @@ -366,11 +366,12 @@ export namespace Session { const db = (fn: (d: Parameters[0] extends (trx: infer D) => any ? D : never) => T) => Effect.sync(() => Database.use(fn)) - export const layer: Layer.Layer = Layer.effect( + export const layer: Layer.Layer = Layer.effect( Service, Effect.gen(function* () { const bus = yield* Bus.Service const storage = yield* Storage.Service + const sync = yield* SyncEvent.Service const createNext = Effect.fn("Session.createNext")(function* (input: { id?: SessionID @@ -398,7 +399,7 @@ export namespace Session { } log.info("created", result) - yield* Effect.sync(() => SyncEvent.run(Event.Created, { sessionID: result.id, info: result })) + yield* sync.run(Event.Created, { sessionID: result.id, info: result }) if (!Flag.OPENCODE_EXPERIMENTAL_WORKSPACES) { // This only exist for backwards compatibility. We should not be @@ -446,10 +447,12 @@ export namespace Session { Effect.catchCause(() => Effect.succeed(false)), ) - yield* Effect.sync(() => { - SyncEvent.run(Event.Deleted, { sessionID, info: session }, { publish: hasInstance }) - SyncEvent.remove(sessionID) - }) + if (hasInstance) { + yield* sync.run(Event.Deleted, { sessionID, info: session }, { publish: true }) + } else { + yield* Effect.sync(() => SyncEvent.run(Event.Deleted, { sessionID, info: session }, { publish: false })) + } + yield* sync.remove(sessionID) } catch (e) { log.error(e) } @@ -457,19 +460,17 @@ export namespace Session { const updateMessage = (msg: T): Effect.Effect => Effect.gen(function* () { - yield* Effect.sync(() => SyncEvent.run(MessageV2.Event.Updated, { sessionID: msg.sessionID, info: msg })) + yield* sync.run(MessageV2.Event.Updated, { sessionID: msg.sessionID, info: msg }) return msg }).pipe(Effect.withSpan("Session.updateMessage")) const updatePart = (part: T): Effect.Effect => Effect.gen(function* () { - yield* Effect.sync(() => - SyncEvent.run(MessageV2.Event.PartUpdated, { - sessionID: part.sessionID, - part: structuredClone(part), - time: Date.now(), - }), - ) + yield* sync.run(MessageV2.Event.PartUpdated, { + sessionID: part.sessionID, + part: structuredClone(part), + time: Date.now(), + }) return part }).pipe(Effect.withSpan("Session.updatePart")) @@ -549,8 +550,7 @@ export namespace Session { return session }) - const patch = (sessionID: SessionID, info: Patch) => - Effect.sync(() => SyncEvent.run(Event.Updated, { sessionID, info })) + const patch = (sessionID: SessionID, info: Patch) => sync.run(Event.Updated, { sessionID, info }) const touch = Effect.fn("Session.touch")(function* (sessionID: SessionID) { yield* patch(sessionID, { time: { updated: Date.now() } }) @@ -607,12 +607,10 @@ export namespace Session { sessionID: SessionID messageID: MessageID }) { - yield* Effect.sync(() => - SyncEvent.run(MessageV2.Event.Removed, { - sessionID: input.sessionID, - messageID: input.messageID, - }), - ) + yield* sync.run(MessageV2.Event.Removed, { + sessionID: input.sessionID, + messageID: input.messageID, + }) return input.messageID }) @@ -621,13 +619,11 @@ export namespace Session { messageID: MessageID partID: PartID }) { - yield* Effect.sync(() => - SyncEvent.run(MessageV2.Event.PartRemoved, { - sessionID: input.sessionID, - messageID: input.messageID, - partID: input.partID, - }), - ) + yield* sync.run(MessageV2.Event.PartRemoved, { + sessionID: input.sessionID, + messageID: input.messageID, + partID: input.partID, + }) return input.partID }) @@ -678,7 +674,11 @@ export namespace Session { }), ) - export const defaultLayer = layer.pipe(Layer.provide(Bus.layer), Layer.provide(Storage.defaultLayer)) + export const defaultLayer = layer.pipe( + Layer.provide(Bus.layer), + Layer.provide(Storage.defaultLayer), + Layer.provide(SyncEvent.defaultLayer), + ) const { runPromise } = makeRuntime(Service, defaultLayer) diff --git a/packages/opencode/src/session/revert.ts b/packages/opencode/src/session/revert.ts index 416b8555de..afff158d1b 100644 --- a/packages/opencode/src/session/revert.ts +++ b/packages/opencode/src/session/revert.ts @@ -40,6 +40,7 @@ export namespace SessionRevert { const bus = yield* Bus.Service const summary = yield* SessionSummary.Service const state = yield* SessionRunState.Service + const sync = yield* SyncEvent.Service const revert = Effect.fn("SessionRevert.revert")(function* (input: RevertInput) { yield* state.assertNotBusy(input.sessionID) @@ -123,7 +124,7 @@ export namespace SessionRevert { remove.push(msg) } for (const msg of remove) { - SyncEvent.run(MessageV2.Event.Removed, { + yield* sync.run(MessageV2.Event.Removed, { sessionID, messageID: msg.info.id, }) @@ -135,7 +136,7 @@ export namespace SessionRevert { const removeParts = target.parts.slice(idx) target.parts = target.parts.slice(0, idx) for (const part of removeParts) { - SyncEvent.run(MessageV2.Event.PartRemoved, { + yield* sync.run(MessageV2.Event.PartRemoved, { sessionID, messageID: target.info.id, partID: part.id, @@ -158,6 +159,7 @@ export namespace SessionRevert { Layer.provide(Storage.defaultLayer), Layer.provide(Bus.layer), Layer.provide(SessionSummary.defaultLayer), + Layer.provide(SyncEvent.defaultLayer), ), ) diff --git a/packages/opencode/src/share/session.ts b/packages/opencode/src/share/session.ts index f98bf14cb4..9c56aefa96 100644 --- a/packages/opencode/src/share/session.ts +++ b/packages/opencode/src/share/session.ts @@ -24,20 +24,19 @@ export namespace SessionShare { const session = yield* Session.Service const shareNext = yield* ShareNext.Service const scope = yield* Scope.Scope + const sync = yield* SyncEvent.Service const share = Effect.fn("SessionShare.share")(function* (sessionID: SessionID) { const conf = yield* cfg.get() if (conf.share === "disabled") throw new Error("Sharing is disabled in configuration") const result = yield* shareNext.create(sessionID) - yield* Effect.sync(() => - SyncEvent.run(Session.Event.Updated, { sessionID, info: { share: { url: result.url } } }), - ) + yield* sync.run(Session.Event.Updated, { sessionID, info: { share: { url: result.url } } }) return result }) const unshare = Effect.fn("SessionShare.unshare")(function* (sessionID: SessionID) { yield* shareNext.remove(sessionID) - yield* Effect.sync(() => SyncEvent.run(Session.Event.Updated, { sessionID, info: { share: { url: null } } })) + yield* sync.run(Session.Event.Updated, { sessionID, info: { share: { url: null } } }) }) const create = Effect.fn("SessionShare.create")(function* (input?: Parameters[0]) { @@ -57,6 +56,7 @@ export namespace SessionShare { Layer.provide(ShareNext.defaultLayer), Layer.provide(Session.defaultLayer), Layer.provide(Config.defaultLayer), + Layer.provide(SyncEvent.defaultLayer), ) const { runPromise } = makeRuntime(Service, defaultLayer) diff --git a/packages/opencode/src/sync/index.ts b/packages/opencode/src/sync/index.ts index 7da809cc30..5737d98ef9 100644 --- a/packages/opencode/src/sync/index.ts +++ b/packages/opencode/src/sync/index.ts @@ -2,8 +2,8 @@ import z from "zod" import type { ZodObject } from "zod" import { EventEmitter } from "events" import { Context, Effect, Layer } from "effect" -import { makeRuntime } from "@/effect/run-service" import { InstanceState } from "@/effect/instance-state" +import { makeRuntime } from "@/effect/run-service" import { Database, eq } from "@/storage/db" import { Bus as ProjectBus } from "@/bus" import { BusEvent } from "@/bus/bus-event" @@ -42,6 +42,9 @@ export namespace SyncEvent { export const registry = new Map() const versions = new Map() let frozen = false + let projectors: Map | undefined + let convert: Convert = (_, data) => data as Record + const bus = new EventEmitter<{ event: [Payload] }>() export interface Interface { readonly reset: () => Effect.Effect @@ -62,10 +65,6 @@ export namespace SyncEvent { export const layer = Layer.effect( Service, Effect.gen(function* () { - let projectors: Map | undefined - let convert: Convert = (_, data) => data as Record - const bus = new EventEmitter<{ event: [Payload] }>() - const reset = Effect.fn("SyncEvent.reset")(() => Effect.sync(() => { frozen = false @@ -94,13 +93,7 @@ export namespace SyncEvent { }), ) - const process = Effect.fn("SyncEvent.process")(function* ( - def: Def, - event: Event, - options: { publish: boolean }, - ) { - const ctx = yield* InstanceState.context - + function process(def: Def, event: Event, options: { publish: boolean }) { if (projectors == null) { throw new Error("No projectors available. Call `SyncEvent.init` to install projectors") } @@ -110,57 +103,55 @@ export namespace SyncEvent { throw new Error(`Projector not found for event: ${def.type}`) } - yield* Effect.sync(() => { - // idempotent: need to ignore any events already logged - Database.transaction((tx) => { - projector(tx, event.data) + // idempotent: need to ignore any events already logged + Database.transaction((tx) => { + projector(tx, event.data) - if (Flag.OPENCODE_EXPERIMENTAL_WORKSPACES) { - tx.insert(EventSequenceTable) - .values({ - aggregate_id: event.aggregateID, - seq: event.seq, - }) - .onConflictDoUpdate({ - target: EventSequenceTable.aggregate_id, - set: { seq: event.seq }, - }) - .run() - tx.insert(EventTable) - .values({ - id: event.id, - seq: event.seq, - aggregate_id: event.aggregateID, - type: versionedType(def.type, def.version), - data: event.data as Record, - }) - .run() - } - - Database.effect(() => { - Instance.restore(ctx, () => { - bus.emit("event", { def, event }) - - if (!options.publish) return - - const result = convert(def.type, event.data) - if (result instanceof Promise) { - void result.then((data) => { - Instance.restore(ctx, () => { - void ProjectBus.publish({ type: def.type, properties: def.schema }, data) - }) - }) - return - } - - void ProjectBus.publish({ type: def.type, properties: def.schema }, result) + if (Flag.OPENCODE_EXPERIMENTAL_WORKSPACES) { + tx.insert(EventSequenceTable) + .values({ + aggregate_id: event.aggregateID, + seq: event.seq, }) - }) - }) - }) - }) + .onConflictDoUpdate({ + target: EventSequenceTable.aggregate_id, + set: { seq: event.seq }, + }) + .run() + tx.insert(EventTable) + .values({ + id: event.id, + seq: event.seq, + aggregate_id: event.aggregateID, + type: versionedType(def.type, def.version), + data: event.data as Record, + }) + .run() + } - const replay = Effect.fn("SyncEvent.replay")(function* (event: SerializedEvent, options?: { publish: boolean }) { + Database.effect( + InstanceState.bind(() => { + bus.emit("event", { def, event }) + + if (!options.publish) return + + const result = convert(def.type, event.data) + if (result instanceof Promise) { + void result.then( + InstanceState.bind((data) => { + void ProjectBus.publish({ type: def.type, properties: def.schema }, data) + }), + ) + return + } + + void ProjectBus.publish({ type: def.type, properties: def.schema }, result) + }), + ) + }) + } + + function replay(event: SerializedEvent, options?: { publish: boolean }) { const def = registry.get(event.type) if (!def) { throw new Error(`Unknown event type: ${event.type}`) @@ -184,14 +175,10 @@ export namespace SyncEvent { ) } - yield* process(def, event, { publish: !!options?.publish }) - }) + process(def, event, { publish: !!options?.publish }) + } - const run = Effect.fn("SyncEvent.run")(function* ( - def: Def, - data: Event["data"], - options?: { publish?: boolean }, - ) { + function run(def: Def, data: Event["data"], options?: { publish?: boolean }) { const agg = (data as Record)[def.aggregate] if (agg == null) { throw new Error(`SyncEvent.run: "${def.aggregate}" required but not found: ${JSON.stringify(data)}`) @@ -203,45 +190,39 @@ export namespace SyncEvent { const publish = options?.publish ?? true - yield* Effect.sync(() => { - // Note that this is an "immediate" transaction which is critical. - // We need to make sure we can safely read and write with nothing - // else changing the data from under us - Database.transaction( - (tx) => { - const id = EventID.ascending() - const row = tx - .select({ seq: EventSequenceTable.seq }) - .from(EventSequenceTable) - .where(eq(EventSequenceTable.aggregate_id, agg)) - .get() - const seq = row?.seq != null ? row.seq + 1 : 0 + // Note that this is an "immediate" transaction which is critical. + // We need to make sure we can safely read and write with nothing + // else changing the data from under us + Database.transaction( + (tx) => { + const id = EventID.ascending() + const row = tx + .select({ seq: EventSequenceTable.seq }) + .from(EventSequenceTable) + .where(eq(EventSequenceTable.aggregate_id, agg)) + .get() + const seq = row?.seq != null ? row.seq + 1 : 0 - const event = { id, seq, aggregateID: agg, data } - Effect.runSync(process(def, event, { publish })) - }, - { - behavior: "immediate", - }, - ) + const event = { id, seq, aggregateID: agg, data } + process(def, event, { publish }) + }, + { + behavior: "immediate", + }, + ) + } + + function remove(aggregateID: string) { + Database.transaction((tx) => { + tx.delete(EventSequenceTable).where(eq(EventSequenceTable.aggregate_id, aggregateID)).run() + tx.delete(EventTable).where(eq(EventTable.aggregate_id, aggregateID)).run() }) - }) + } - const remove = Effect.fn("SyncEvent.remove")((aggregateID: string) => - Effect.sync(() => { - Database.transaction((tx) => { - tx.delete(EventSequenceTable).where(eq(EventSequenceTable.aggregate_id, aggregateID)).run() - tx.delete(EventTable).where(eq(EventTable.aggregate_id, aggregateID)).run() - }) - }), - ) - - const subscribeAll = Effect.fn("SyncEvent.subscribeAll")((handler: (event: Payload) => void) => - Effect.sync(() => { - bus.on("event", handler) - return () => bus.off("event", handler) - }), - ) + function subscribeAll(handler: (event: Payload) => void) { + bus.on("event", handler) + return () => bus.off("event", handler) + } function payloads() { return z @@ -266,7 +247,25 @@ export namespace SyncEvent { }) } - return Service.of({ reset, init, replay, run, remove, subscribeAll, payloads }) + return Service.of({ + reset, + init, + replay: (event, options) => + Effect.gen(function* () { + const ctx = yield* InstanceState.context + return yield* Effect.sync(() => Instance.restore(ctx, () => replay(event, options))) + }), + run: (def, data, options) => + options?.publish === false + ? Effect.sync(() => run(def, data, options)) + : Effect.gen(function* () { + const ctx = yield* InstanceState.context + return yield* Effect.sync(() => Instance.restore(ctx, () => run(def, data, options))) + }), + remove: (aggregateID) => Effect.sync(() => remove(aggregateID)), + subscribeAll: (handler) => Effect.sync(() => subscribeAll(handler)), + payloads, + }) }), )