fix(sync): restore instance context for deferred publish

This commit is contained in:
Kit Langton
2026-04-10 23:56:13 -04:00
parent dbc136d3b3
commit 397adb4190

View File

@@ -3,12 +3,14 @@ import type { ZodObject } from "zod"
import { EventEmitter } from "events"
import { Context, Effect, Layer } from "effect"
import { makeRuntime } from "@/effect/run-service"
import { InstanceState } from "@/effect/instance-state"
import { Database, eq } from "@/storage/db"
import { Bus as ProjectBus } from "@/bus"
import { BusEvent } from "@/bus/bus-event"
import { EventSequenceTable, EventTable } from "./event.sql"
import { EventID } from "./schema"
import { Flag } from "@/flag/flag"
import { Instance } from "@/project/instance"
export namespace SyncEvent {
type Convert = (type: string, event: Event["data"]) => Promise<Record<string, unknown>> | Record<string, unknown>
@@ -97,6 +99,8 @@ export namespace SyncEvent {
event: Event<Def>,
options: { publish: boolean },
) {
const ctx = yield* InstanceState.context
if (projectors == null) {
throw new Error("No projectors available. Call `SyncEvent.init` to install projectors")
}
@@ -134,19 +138,23 @@ export namespace SyncEvent {
}
Database.effect(() => {
bus.emit("event", { def, event })
Instance.restore(ctx, () => {
bus.emit("event", { def, event })
if (!options.publish) return
if (!options.publish) return
const result = convert(def.type, event.data)
if (result instanceof Promise) {
void result.then((data) => {
void ProjectBus.publish({ type: def.type, properties: def.schema }, data)
})
return
}
const result = convert(def.type, event.data)
if (result instanceof Promise) {
void result.then((data) => {
Instance.restore(ctx, () => {
void ProjectBus.publish({ type: def.type, properties: def.schema }, data)
})
})
return
}
void ProjectBus.publish({ type: def.type, properties: def.schema }, result)
void ProjectBus.publish({ type: def.type, properties: def.schema }, result)
})
})
})
})