Compare commits

...

4 Commits

Author SHA1 Message Date
Kit Langton
53e1d7b8bc refactor(core): replace sync event cast with Schema guard
Use Schema.is to narrow converted sync events instead of asserting the payload type manually. Move the Zod-to-Effect bridge into effect-zod so remaining Zod-backed sync payloads have one explicit interop helper.
2026-04-23 11:44:23 -04:00
Kit Langton
25c30e06d8 refactor(core): make SyncEvent schema-first
Store sync event definitions as Effect Schema and derive Zod only at the bus and OpenAPI edges. Bridge the remaining Zod-native session payloads through Schema annotations so the sync layer no longer needs mixed-schema definition helpers.
2026-04-23 11:41:59 -04:00
Kit Langton
96039bdb83 test(core): cover Effect Schema event definitions
Exercise the new SyncEvent and BusEvent Schema overloads end to end and use Schema.isSchema for the conversion check so the mixed-schema path stays explicit and covered.
2026-04-23 11:38:02 -04:00
Kit Langton
3085279724 refactor(core): allow SyncEvent.define and BusEvent.define to accept Effect Schema
Overloads BusEvent.define and SyncEvent.define so payload schemas can be passed as Effect Schema values directly. Effect Schemas are converted to Zod via the effect-zod walker since the sync/bus pipelines still use Zod internally. Migrates MessageV2.Event.* to use Schema.Struct directly instead of z.object with .zod references.
2026-04-23 11:38:01 -04:00
8 changed files with 191 additions and 69 deletions

View File

@@ -1,16 +1,31 @@
import z from "zod"
import type { ZodType } from "zod"
import { Schema, Types } from "effect"
import { zod } from "@/util/effect-zod"
export type Definition = ReturnType<typeof define>
const registry = new Map<string, Definition>()
export function define<Type extends string, Properties extends ZodType>(type: Type, properties: Properties) {
const result = {
type,
properties,
}
registry.set(type, result)
/**
* Define a bus event type with a payload schema.
*
* Accepts either a Zod schema or an Effect Schema. Effect Schemas are
* converted to Zod internally via the effect-zod walker so that the bus
* continues to use Zod as the lingua franca for serialization/validation.
*/
export function define<Type extends string, P extends Schema.Top>(
type: Type,
properties: P,
): { type: Type; properties: z.ZodType<Types.DeepMutable<Schema.Schema.Type<P>>> }
export function define<Type extends string, P extends ZodType>(
type: Type,
properties: P,
): { type: Type; properties: P }
export function define(type: string, properties: unknown) {
const zodProperties = Schema.isSchema(properties) ? zod(properties) : (properties as ZodType)
const result = { type, properties: zodProperties }
registry.set(type, result as Definition)
return result
}

View File

@@ -1,16 +1,18 @@
import z from "zod"
import { Schema } from "effect"
import sessionProjectors from "../session/projectors"
import { SyncEvent } from "@/sync"
import { Session } from "@/session"
import { SessionTable } from "@/session/session.sql"
import { Database, eq } from "@/storage"
const isSessionUpdated = Schema.is(Session.Event.Updated.schema)
export function initProjectors() {
SyncEvent.init({
projectors: sessionProjectors,
convertEvent: (type, data) => {
if (type === "session.updated") {
const id = (data as z.infer<typeof Session.Event.Updated.schema>).sessionID
if (type === Session.Event.Updated.type && isSessionUpdated(data)) {
const id = data.sessionID
const row = Database.use((db) => db.select().from(SessionTable).where(eq(SessionTable.id, id)).get())
if (!row) return data

View File

@@ -581,48 +581,48 @@ export const Event = {
type: "message.updated",
version: 1,
aggregate: "sessionID",
schema: z.object({
sessionID: SessionID.zod,
info: Info.zod,
schema: Schema.Struct({
sessionID: SessionID,
info: _Info,
}),
}),
Removed: SyncEvent.define({
type: "message.removed",
version: 1,
aggregate: "sessionID",
schema: z.object({
sessionID: SessionID.zod,
messageID: MessageID.zod,
schema: Schema.Struct({
sessionID: SessionID,
messageID: MessageID,
}),
}),
PartUpdated: SyncEvent.define({
type: "message.part.updated",
version: 1,
aggregate: "sessionID",
schema: z.object({
sessionID: SessionID.zod,
part: Part.zod,
time: z.number(),
schema: Schema.Struct({
sessionID: SessionID,
part: _Part,
time: Schema.Number,
}),
}),
PartDelta: BusEvent.define(
"message.part.delta",
z.object({
sessionID: SessionID.zod,
messageID: MessageID.zod,
partID: PartID.zod,
field: z.string(),
delta: z.string(),
Schema.Struct({
sessionID: SessionID,
messageID: MessageID,
partID: PartID,
field: Schema.String,
delta: Schema.String,
}),
),
PartRemoved: SyncEvent.define({
type: "message.part.removed",
version: 1,
aggregate: "sessionID",
schema: z.object({
sessionID: SessionID.zod,
messageID: MessageID.zod,
partID: PartID.zod,
schema: Schema.Struct({
sessionID: SessionID,
messageID: MessageID,
partID: PartID,
}),
}),
}

View File

@@ -28,7 +28,7 @@ import type { Provider } from "@/provider"
import { Permission } from "@/permission"
import { Global } from "@/global"
import { Effect, Layer, Option, Context, Schema, Types } from "effect"
import { zod, zodObject } from "@/util/effect-zod"
import { fromZod, zod, zodObject } from "@/util/effect-zod"
import { withStatics } from "@/util/schema"
const log = Log.create({ service: "session" })
@@ -215,40 +215,44 @@ export const MessagesInput = Schema.Struct({
limit: Schema.optional(Schema.Number),
}).pipe(withStatics((s) => ({ zod: zod(s) })))
const SessionUpdateInfoSchema = fromZod(
updateSchema(zodObject(Info)).extend({
share: updateSchema(zodObject(Share)).optional(),
time: updateSchema(zodObject(Time)).optional(),
}),
)
export const Event = {
Created: SyncEvent.define({
type: "session.created",
version: 1,
aggregate: "sessionID",
schema: z.object({
sessionID: SessionID.zod,
info: Info.zod,
}),
schema: {
sessionID: SessionID,
info: Info,
},
}),
Updated: SyncEvent.define({
type: "session.updated",
version: 1,
aggregate: "sessionID",
schema: z.object({
sessionID: SessionID.zod,
info: updateSchema(zodObject(Info)).extend({
share: updateSchema(zodObject(Share)).optional(),
time: updateSchema(zodObject(Time)).optional(),
}),
}),
busSchema: z.object({
sessionID: SessionID.zod,
info: Info.zod,
}),
schema: {
sessionID: SessionID,
info: SessionUpdateInfoSchema,
},
busSchema: {
sessionID: SessionID,
info: Info,
},
}),
Deleted: SyncEvent.define({
type: "session.deleted",
version: 1,
aggregate: "sessionID",
schema: z.object({
sessionID: SessionID.zod,
info: Info.zod,
}),
schema: {
sessionID: SessionID,
info: Info,
},
}),
Diff: BusEvent.define(
"session.diff",
@@ -394,7 +398,7 @@ export interface Interface {
export class Service extends Context.Service<Service, Interface>()("@opencode/Session") {}
type Patch = z.infer<typeof Event.Updated.schema>["info"]
type Patch = Schema.Schema.Type<typeof Event.Updated.schema>["info"]
const db = <T>(fn: (d: Parameters<typeof Database.use>[0] extends (trx: infer D) => any ? D : never) => T) =>
Effect.sync(() => Database.use(fn))

View File

@@ -1,5 +1,5 @@
import z from "zod"
import type { ZodObject } from "zod"
import { Schema, Types } from "effect"
import { Database, eq } from "@/storage"
import { GlobalBus } from "@/bus/global"
import { Bus as ProjectBus } from "@/bus"
@@ -9,23 +9,36 @@ import { EventSequenceTable, EventTable } from "./event.sql"
import { WorkspaceContext } from "@/control-plane/workspace-context"
import { EventID } from "./schema"
import { Flag } from "@/flag/flag"
import { zod } from "@/util/effect-zod"
type StructLike<Fields extends Schema.Struct.Fields> = Fields | Schema.Struct<Fields>
export type Definition = {
type: string
version: number
aggregate: string
schema: z.ZodObject
// This is temporary and only exists for compatibility with bus
// event definitions
properties: z.ZodObject
schema: Schema.Top
busSchema: Schema.Top
properties: z.ZodTypeAny
}
type MutableType<S extends Schema.Top> = Types.DeepMutable<Schema.Schema.Type<S>>
type DefinedEvent<Type extends string, Agg extends string, SchemaDef extends Schema.Top, BusDef extends Schema.Top> = Definition & {
type: Type
aggregate: Agg
schema: SchemaDef
busSchema: BusDef
properties: z.ZodType<MutableType<BusDef>>
}
type Data<Def extends Definition> = MutableType<Def["schema"]>
export type Event<Def extends Definition = Definition> = {
id: string
seq: number
aggregateID: string
data: z.infer<Def["schema"]>
data: Data<Def>
}
export type SerializedEvent<Def extends Definition = Definition> = Event<Def> & { type: string }
@@ -54,7 +67,7 @@ export function init(input: { projectors: Array<[Definition, ProjectorFunc]>; co
for (let [type, version] of versions.entries()) {
let def = registry.get(versionedType(type, version))!
BusEvent.define(def.type, def.properties || def.schema)
BusEvent.define(def.type, def.properties)
}
// Freeze the system so it clearly errors if events are defined
@@ -69,22 +82,37 @@ export function versionedType(type: string, version?: number) {
return version ? `${type}.${version}` : type
}
function struct<Fields extends Schema.Struct.Fields>(value: StructLike<Fields>) {
return (Schema.isSchema(value) ? value : Schema.Struct(value as Fields)) as Schema.Struct<Fields>
}
export function define<
Type extends string,
Agg extends string,
Schema extends ZodObject<Record<Agg, z.ZodType<string>>>,
BusSchema extends ZodObject = Schema,
>(input: { type: Type; version: number; aggregate: Agg; schema: Schema; busSchema?: BusSchema }) {
Fields extends Schema.Struct.Fields & Record<Agg, Schema.Top>,
BusFields extends Schema.Struct.Fields = Fields,
>(input: {
type: Type
version: number
aggregate: Agg
schema: StructLike<Fields>
busSchema?: StructLike<BusFields>
}): DefinedEvent<Type, Agg, Schema.Struct<Fields>, Schema.Struct<BusFields>> {
if (frozen) {
throw new Error("Error defining sync event: sync system has been frozen")
}
const def = {
const schema = struct(input.schema)
const busSchema = (input.busSchema ? struct(input.busSchema) : schema) as Schema.Struct<BusFields>
const properties = zod(busSchema) as unknown as z.ZodType<MutableType<typeof busSchema>>
const def: DefinedEvent<Type, Agg, typeof schema, typeof busSchema> = {
type: input.type,
version: input.version,
aggregate: input.aggregate,
schema: input.schema,
properties: input.busSchema ? input.busSchema : input.schema,
schema,
busSchema,
properties,
}
versions.set(def.type, Math.max(def.version, versions.get(def.type) || 0))
@@ -143,10 +171,10 @@ function process<Def extends Definition>(def: Def, event: Event<Def>, options: {
const result = convertEvent(def.type, event.data)
if (result instanceof Promise) {
void result.then((data) => {
void ProjectBus.publish({ type: def.type, properties: def.schema }, data)
void ProjectBus.publish({ type: def.type, properties: def.properties }, data)
})
} else {
void ProjectBus.publish({ type: def.type, properties: def.schema }, result)
void ProjectBus.publish({ type: def.type, properties: def.properties }, result)
}
GlobalBus.emit("event", {
@@ -266,7 +294,7 @@ export function payloads() {
id: z.string(),
seq: z.number(),
aggregateID: z.literal(def.aggregate),
data: def.schema,
data: zod(def.schema),
})
.meta({
ref: `SyncEvent.${def.type}`,

View File

@@ -49,6 +49,14 @@ function isZodType(value: unknown): value is z.ZodTypeAny {
return typeof value === "object" && value !== null && "_zod" in value
}
// Bridge a Zod-first schema into Effect Schema while preserving the original
// Zod for downstream JSON Schema/OpenAPI generation.
export function fromZod<T extends z.ZodTypeAny>(value: T) {
return Schema.declare((input): input is z.output<T> => value.safeParse(input).success).annotate({
[ZodOverride]: value,
})
}
function walk(ast: SchemaAST.AST): z.ZodTypeAny {
const cached = walkCache.get(ast)
if (cached) return cached

View File

@@ -1,4 +1,5 @@
import { afterEach, describe, expect, test } from "bun:test"
import { Schema } from "effect"
import z from "zod"
import { Bus } from "../../src/bus"
import { BusEvent } from "../../src/bus/bus-event"
@@ -10,6 +11,8 @@ const TestEvent = {
Pong: BusEvent.define("test.pong", z.object({ message: z.string() })),
}
const EffectTestEvent = BusEvent.define("test.effect-schema.ping", Schema.Struct({ value: Schema.Number }))
function withInstance(directory: string, fn: () => Promise<void>) {
return Instance.provide({ directory, fn })
}
@@ -76,6 +79,22 @@ describe("Bus", () => {
await Bus.publish(TestEvent.Ping, { value: 1 })
})
})
test("accepts Effect Schema event definitions", async () => {
await using tmp = await tmpdir()
const received: number[] = []
await withInstance(tmp.path, async () => {
Bus.subscribe(EffectTestEvent, (evt) => {
received.push(evt.properties.value)
})
await Bun.sleep(10)
await Bus.publish(EffectTestEvent, { value: 42 })
await Bun.sleep(10)
})
expect(received).toEqual([42])
})
})
describe("unsubscribe", () => {

View File

@@ -1,4 +1,5 @@
import { describe, test, expect, beforeEach, afterEach, afterAll } from "bun:test"
import { Schema } from "effect"
import { tmpdir } from "../fixture/fixture"
import z from "zod"
import { Bus } from "../../src/bus"
@@ -43,13 +44,13 @@ describe("SyncEvent", () => {
type: "item.created",
version: 1,
aggregate: "id",
schema: z.object({ id: z.string(), name: z.string() }),
schema: { id: Schema.String, name: Schema.String },
})
const Sent = SyncEvent.define({
type: "item.sent",
version: 1,
aggregate: "item_id",
schema: z.object({ item_id: z.string(), to: z.string() }),
schema: { item_id: Schema.String, to: Schema.String },
})
SyncEvent.init({
@@ -128,6 +129,51 @@ describe("SyncEvent", () => {
})
}),
)
test(
"accepts Effect Schema event definitions",
withInstance(async () => {
SyncEvent.reset()
try {
const Created = SyncEvent.define({
type: "item.effect.created",
version: 1,
aggregate: "id",
schema: { id: Schema.String, name: Schema.String },
})
SyncEvent.init({
projectors: [SyncEvent.project(Created, () => {})],
})
const events: Array<{
type: string
properties: { id: string; name: string }
}> = []
const received = new Promise<void>((resolve) => {
Bus.subscribeAll((event) => {
events.push(event)
resolve()
})
})
SyncEvent.run(Created, { id: "evt_1", name: "schema" })
await received
expect(events).toEqual([
{
type: "item.effect.created",
properties: {
id: "evt_1",
name: "schema",
},
},
])
} finally {
setup()
}
}),
)
})
describe("replay", () => {