mirror of
https://fastgit.cc/https://github.com/anomalyco/opencode
synced 2026-05-03 07:11:31 +08:00
Compare commits
4 Commits
chore-rm-l
...
kit/sessio
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
53e1d7b8bc | ||
|
|
25c30e06d8 | ||
|
|
96039bdb83 | ||
|
|
3085279724 |
@@ -1,16 +1,31 @@
|
||||
import z from "zod"
|
||||
import type { ZodType } from "zod"
|
||||
import { Schema, Types } from "effect"
|
||||
import { zod } from "@/util/effect-zod"
|
||||
|
||||
export type Definition = ReturnType<typeof define>
|
||||
|
||||
const registry = new Map<string, Definition>()
|
||||
|
||||
export function define<Type extends string, Properties extends ZodType>(type: Type, properties: Properties) {
|
||||
const result = {
|
||||
type,
|
||||
properties,
|
||||
}
|
||||
registry.set(type, result)
|
||||
/**
|
||||
* Define a bus event type with a payload schema.
|
||||
*
|
||||
* Accepts either a Zod schema or an Effect Schema. Effect Schemas are
|
||||
* converted to Zod internally via the effect-zod walker so that the bus
|
||||
* continues to use Zod as the lingua franca for serialization/validation.
|
||||
*/
|
||||
export function define<Type extends string, P extends Schema.Top>(
|
||||
type: Type,
|
||||
properties: P,
|
||||
): { type: Type; properties: z.ZodType<Types.DeepMutable<Schema.Schema.Type<P>>> }
|
||||
export function define<Type extends string, P extends ZodType>(
|
||||
type: Type,
|
||||
properties: P,
|
||||
): { type: Type; properties: P }
|
||||
export function define(type: string, properties: unknown) {
|
||||
const zodProperties = Schema.isSchema(properties) ? zod(properties) : (properties as ZodType)
|
||||
const result = { type, properties: zodProperties }
|
||||
registry.set(type, result as Definition)
|
||||
return result
|
||||
}
|
||||
|
||||
|
||||
@@ -1,16 +1,18 @@
|
||||
import z from "zod"
|
||||
import { Schema } from "effect"
|
||||
import sessionProjectors from "../session/projectors"
|
||||
import { SyncEvent } from "@/sync"
|
||||
import { Session } from "@/session"
|
||||
import { SessionTable } from "@/session/session.sql"
|
||||
import { Database, eq } from "@/storage"
|
||||
|
||||
const isSessionUpdated = Schema.is(Session.Event.Updated.schema)
|
||||
|
||||
export function initProjectors() {
|
||||
SyncEvent.init({
|
||||
projectors: sessionProjectors,
|
||||
convertEvent: (type, data) => {
|
||||
if (type === "session.updated") {
|
||||
const id = (data as z.infer<typeof Session.Event.Updated.schema>).sessionID
|
||||
if (type === Session.Event.Updated.type && isSessionUpdated(data)) {
|
||||
const id = data.sessionID
|
||||
const row = Database.use((db) => db.select().from(SessionTable).where(eq(SessionTable.id, id)).get())
|
||||
|
||||
if (!row) return data
|
||||
|
||||
@@ -581,48 +581,48 @@ export const Event = {
|
||||
type: "message.updated",
|
||||
version: 1,
|
||||
aggregate: "sessionID",
|
||||
schema: z.object({
|
||||
sessionID: SessionID.zod,
|
||||
info: Info.zod,
|
||||
schema: Schema.Struct({
|
||||
sessionID: SessionID,
|
||||
info: _Info,
|
||||
}),
|
||||
}),
|
||||
Removed: SyncEvent.define({
|
||||
type: "message.removed",
|
||||
version: 1,
|
||||
aggregate: "sessionID",
|
||||
schema: z.object({
|
||||
sessionID: SessionID.zod,
|
||||
messageID: MessageID.zod,
|
||||
schema: Schema.Struct({
|
||||
sessionID: SessionID,
|
||||
messageID: MessageID,
|
||||
}),
|
||||
}),
|
||||
PartUpdated: SyncEvent.define({
|
||||
type: "message.part.updated",
|
||||
version: 1,
|
||||
aggregate: "sessionID",
|
||||
schema: z.object({
|
||||
sessionID: SessionID.zod,
|
||||
part: Part.zod,
|
||||
time: z.number(),
|
||||
schema: Schema.Struct({
|
||||
sessionID: SessionID,
|
||||
part: _Part,
|
||||
time: Schema.Number,
|
||||
}),
|
||||
}),
|
||||
PartDelta: BusEvent.define(
|
||||
"message.part.delta",
|
||||
z.object({
|
||||
sessionID: SessionID.zod,
|
||||
messageID: MessageID.zod,
|
||||
partID: PartID.zod,
|
||||
field: z.string(),
|
||||
delta: z.string(),
|
||||
Schema.Struct({
|
||||
sessionID: SessionID,
|
||||
messageID: MessageID,
|
||||
partID: PartID,
|
||||
field: Schema.String,
|
||||
delta: Schema.String,
|
||||
}),
|
||||
),
|
||||
PartRemoved: SyncEvent.define({
|
||||
type: "message.part.removed",
|
||||
version: 1,
|
||||
aggregate: "sessionID",
|
||||
schema: z.object({
|
||||
sessionID: SessionID.zod,
|
||||
messageID: MessageID.zod,
|
||||
partID: PartID.zod,
|
||||
schema: Schema.Struct({
|
||||
sessionID: SessionID,
|
||||
messageID: MessageID,
|
||||
partID: PartID,
|
||||
}),
|
||||
}),
|
||||
}
|
||||
|
||||
@@ -28,7 +28,7 @@ import type { Provider } from "@/provider"
|
||||
import { Permission } from "@/permission"
|
||||
import { Global } from "@/global"
|
||||
import { Effect, Layer, Option, Context, Schema, Types } from "effect"
|
||||
import { zod, zodObject } from "@/util/effect-zod"
|
||||
import { fromZod, zod, zodObject } from "@/util/effect-zod"
|
||||
import { withStatics } from "@/util/schema"
|
||||
|
||||
const log = Log.create({ service: "session" })
|
||||
@@ -215,40 +215,44 @@ export const MessagesInput = Schema.Struct({
|
||||
limit: Schema.optional(Schema.Number),
|
||||
}).pipe(withStatics((s) => ({ zod: zod(s) })))
|
||||
|
||||
const SessionUpdateInfoSchema = fromZod(
|
||||
updateSchema(zodObject(Info)).extend({
|
||||
share: updateSchema(zodObject(Share)).optional(),
|
||||
time: updateSchema(zodObject(Time)).optional(),
|
||||
}),
|
||||
)
|
||||
|
||||
export const Event = {
|
||||
Created: SyncEvent.define({
|
||||
type: "session.created",
|
||||
version: 1,
|
||||
aggregate: "sessionID",
|
||||
schema: z.object({
|
||||
sessionID: SessionID.zod,
|
||||
info: Info.zod,
|
||||
}),
|
||||
schema: {
|
||||
sessionID: SessionID,
|
||||
info: Info,
|
||||
},
|
||||
}),
|
||||
Updated: SyncEvent.define({
|
||||
type: "session.updated",
|
||||
version: 1,
|
||||
aggregate: "sessionID",
|
||||
schema: z.object({
|
||||
sessionID: SessionID.zod,
|
||||
info: updateSchema(zodObject(Info)).extend({
|
||||
share: updateSchema(zodObject(Share)).optional(),
|
||||
time: updateSchema(zodObject(Time)).optional(),
|
||||
}),
|
||||
}),
|
||||
busSchema: z.object({
|
||||
sessionID: SessionID.zod,
|
||||
info: Info.zod,
|
||||
}),
|
||||
schema: {
|
||||
sessionID: SessionID,
|
||||
info: SessionUpdateInfoSchema,
|
||||
},
|
||||
busSchema: {
|
||||
sessionID: SessionID,
|
||||
info: Info,
|
||||
},
|
||||
}),
|
||||
Deleted: SyncEvent.define({
|
||||
type: "session.deleted",
|
||||
version: 1,
|
||||
aggregate: "sessionID",
|
||||
schema: z.object({
|
||||
sessionID: SessionID.zod,
|
||||
info: Info.zod,
|
||||
}),
|
||||
schema: {
|
||||
sessionID: SessionID,
|
||||
info: Info,
|
||||
},
|
||||
}),
|
||||
Diff: BusEvent.define(
|
||||
"session.diff",
|
||||
@@ -394,7 +398,7 @@ export interface Interface {
|
||||
|
||||
export class Service extends Context.Service<Service, Interface>()("@opencode/Session") {}
|
||||
|
||||
type Patch = z.infer<typeof Event.Updated.schema>["info"]
|
||||
type Patch = Schema.Schema.Type<typeof Event.Updated.schema>["info"]
|
||||
|
||||
const db = <T>(fn: (d: Parameters<typeof Database.use>[0] extends (trx: infer D) => any ? D : never) => T) =>
|
||||
Effect.sync(() => Database.use(fn))
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
import z from "zod"
|
||||
import type { ZodObject } from "zod"
|
||||
import { Schema, Types } from "effect"
|
||||
import { Database, eq } from "@/storage"
|
||||
import { GlobalBus } from "@/bus/global"
|
||||
import { Bus as ProjectBus } from "@/bus"
|
||||
@@ -9,23 +9,36 @@ import { EventSequenceTable, EventTable } from "./event.sql"
|
||||
import { WorkspaceContext } from "@/control-plane/workspace-context"
|
||||
import { EventID } from "./schema"
|
||||
import { Flag } from "@/flag/flag"
|
||||
import { zod } from "@/util/effect-zod"
|
||||
|
||||
type StructLike<Fields extends Schema.Struct.Fields> = Fields | Schema.Struct<Fields>
|
||||
|
||||
export type Definition = {
|
||||
type: string
|
||||
version: number
|
||||
aggregate: string
|
||||
schema: z.ZodObject
|
||||
|
||||
// This is temporary and only exists for compatibility with bus
|
||||
// event definitions
|
||||
properties: z.ZodObject
|
||||
schema: Schema.Top
|
||||
busSchema: Schema.Top
|
||||
properties: z.ZodTypeAny
|
||||
}
|
||||
|
||||
type MutableType<S extends Schema.Top> = Types.DeepMutable<Schema.Schema.Type<S>>
|
||||
|
||||
type DefinedEvent<Type extends string, Agg extends string, SchemaDef extends Schema.Top, BusDef extends Schema.Top> = Definition & {
|
||||
type: Type
|
||||
aggregate: Agg
|
||||
schema: SchemaDef
|
||||
busSchema: BusDef
|
||||
properties: z.ZodType<MutableType<BusDef>>
|
||||
}
|
||||
|
||||
type Data<Def extends Definition> = MutableType<Def["schema"]>
|
||||
|
||||
export type Event<Def extends Definition = Definition> = {
|
||||
id: string
|
||||
seq: number
|
||||
aggregateID: string
|
||||
data: z.infer<Def["schema"]>
|
||||
data: Data<Def>
|
||||
}
|
||||
|
||||
export type SerializedEvent<Def extends Definition = Definition> = Event<Def> & { type: string }
|
||||
@@ -54,7 +67,7 @@ export function init(input: { projectors: Array<[Definition, ProjectorFunc]>; co
|
||||
for (let [type, version] of versions.entries()) {
|
||||
let def = registry.get(versionedType(type, version))!
|
||||
|
||||
BusEvent.define(def.type, def.properties || def.schema)
|
||||
BusEvent.define(def.type, def.properties)
|
||||
}
|
||||
|
||||
// Freeze the system so it clearly errors if events are defined
|
||||
@@ -69,22 +82,37 @@ export function versionedType(type: string, version?: number) {
|
||||
return version ? `${type}.${version}` : type
|
||||
}
|
||||
|
||||
function struct<Fields extends Schema.Struct.Fields>(value: StructLike<Fields>) {
|
||||
return (Schema.isSchema(value) ? value : Schema.Struct(value as Fields)) as Schema.Struct<Fields>
|
||||
}
|
||||
|
||||
export function define<
|
||||
Type extends string,
|
||||
Agg extends string,
|
||||
Schema extends ZodObject<Record<Agg, z.ZodType<string>>>,
|
||||
BusSchema extends ZodObject = Schema,
|
||||
>(input: { type: Type; version: number; aggregate: Agg; schema: Schema; busSchema?: BusSchema }) {
|
||||
Fields extends Schema.Struct.Fields & Record<Agg, Schema.Top>,
|
||||
BusFields extends Schema.Struct.Fields = Fields,
|
||||
>(input: {
|
||||
type: Type
|
||||
version: number
|
||||
aggregate: Agg
|
||||
schema: StructLike<Fields>
|
||||
busSchema?: StructLike<BusFields>
|
||||
}): DefinedEvent<Type, Agg, Schema.Struct<Fields>, Schema.Struct<BusFields>> {
|
||||
if (frozen) {
|
||||
throw new Error("Error defining sync event: sync system has been frozen")
|
||||
}
|
||||
|
||||
const def = {
|
||||
const schema = struct(input.schema)
|
||||
const busSchema = (input.busSchema ? struct(input.busSchema) : schema) as Schema.Struct<BusFields>
|
||||
const properties = zod(busSchema) as unknown as z.ZodType<MutableType<typeof busSchema>>
|
||||
|
||||
const def: DefinedEvent<Type, Agg, typeof schema, typeof busSchema> = {
|
||||
type: input.type,
|
||||
version: input.version,
|
||||
aggregate: input.aggregate,
|
||||
schema: input.schema,
|
||||
properties: input.busSchema ? input.busSchema : input.schema,
|
||||
schema,
|
||||
busSchema,
|
||||
properties,
|
||||
}
|
||||
|
||||
versions.set(def.type, Math.max(def.version, versions.get(def.type) || 0))
|
||||
@@ -143,10 +171,10 @@ function process<Def extends Definition>(def: Def, event: Event<Def>, options: {
|
||||
const result = convertEvent(def.type, event.data)
|
||||
if (result instanceof Promise) {
|
||||
void result.then((data) => {
|
||||
void ProjectBus.publish({ type: def.type, properties: def.schema }, data)
|
||||
void ProjectBus.publish({ type: def.type, properties: def.properties }, data)
|
||||
})
|
||||
} else {
|
||||
void ProjectBus.publish({ type: def.type, properties: def.schema }, result)
|
||||
void ProjectBus.publish({ type: def.type, properties: def.properties }, result)
|
||||
}
|
||||
|
||||
GlobalBus.emit("event", {
|
||||
@@ -266,7 +294,7 @@ export function payloads() {
|
||||
id: z.string(),
|
||||
seq: z.number(),
|
||||
aggregateID: z.literal(def.aggregate),
|
||||
data: def.schema,
|
||||
data: zod(def.schema),
|
||||
})
|
||||
.meta({
|
||||
ref: `SyncEvent.${def.type}`,
|
||||
|
||||
@@ -49,6 +49,14 @@ function isZodType(value: unknown): value is z.ZodTypeAny {
|
||||
return typeof value === "object" && value !== null && "_zod" in value
|
||||
}
|
||||
|
||||
// Bridge a Zod-first schema into Effect Schema while preserving the original
|
||||
// Zod for downstream JSON Schema/OpenAPI generation.
|
||||
export function fromZod<T extends z.ZodTypeAny>(value: T) {
|
||||
return Schema.declare((input): input is z.output<T> => value.safeParse(input).success).annotate({
|
||||
[ZodOverride]: value,
|
||||
})
|
||||
}
|
||||
|
||||
function walk(ast: SchemaAST.AST): z.ZodTypeAny {
|
||||
const cached = walkCache.get(ast)
|
||||
if (cached) return cached
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import { afterEach, describe, expect, test } from "bun:test"
|
||||
import { Schema } from "effect"
|
||||
import z from "zod"
|
||||
import { Bus } from "../../src/bus"
|
||||
import { BusEvent } from "../../src/bus/bus-event"
|
||||
@@ -10,6 +11,8 @@ const TestEvent = {
|
||||
Pong: BusEvent.define("test.pong", z.object({ message: z.string() })),
|
||||
}
|
||||
|
||||
const EffectTestEvent = BusEvent.define("test.effect-schema.ping", Schema.Struct({ value: Schema.Number }))
|
||||
|
||||
function withInstance(directory: string, fn: () => Promise<void>) {
|
||||
return Instance.provide({ directory, fn })
|
||||
}
|
||||
@@ -76,6 +79,22 @@ describe("Bus", () => {
|
||||
await Bus.publish(TestEvent.Ping, { value: 1 })
|
||||
})
|
||||
})
|
||||
|
||||
test("accepts Effect Schema event definitions", async () => {
|
||||
await using tmp = await tmpdir()
|
||||
const received: number[] = []
|
||||
|
||||
await withInstance(tmp.path, async () => {
|
||||
Bus.subscribe(EffectTestEvent, (evt) => {
|
||||
received.push(evt.properties.value)
|
||||
})
|
||||
await Bun.sleep(10)
|
||||
await Bus.publish(EffectTestEvent, { value: 42 })
|
||||
await Bun.sleep(10)
|
||||
})
|
||||
|
||||
expect(received).toEqual([42])
|
||||
})
|
||||
})
|
||||
|
||||
describe("unsubscribe", () => {
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import { describe, test, expect, beforeEach, afterEach, afterAll } from "bun:test"
|
||||
import { Schema } from "effect"
|
||||
import { tmpdir } from "../fixture/fixture"
|
||||
import z from "zod"
|
||||
import { Bus } from "../../src/bus"
|
||||
@@ -43,13 +44,13 @@ describe("SyncEvent", () => {
|
||||
type: "item.created",
|
||||
version: 1,
|
||||
aggregate: "id",
|
||||
schema: z.object({ id: z.string(), name: z.string() }),
|
||||
schema: { id: Schema.String, name: Schema.String },
|
||||
})
|
||||
const Sent = SyncEvent.define({
|
||||
type: "item.sent",
|
||||
version: 1,
|
||||
aggregate: "item_id",
|
||||
schema: z.object({ item_id: z.string(), to: z.string() }),
|
||||
schema: { item_id: Schema.String, to: Schema.String },
|
||||
})
|
||||
|
||||
SyncEvent.init({
|
||||
@@ -128,6 +129,51 @@ describe("SyncEvent", () => {
|
||||
})
|
||||
}),
|
||||
)
|
||||
|
||||
test(
|
||||
"accepts Effect Schema event definitions",
|
||||
withInstance(async () => {
|
||||
SyncEvent.reset()
|
||||
try {
|
||||
const Created = SyncEvent.define({
|
||||
type: "item.effect.created",
|
||||
version: 1,
|
||||
aggregate: "id",
|
||||
schema: { id: Schema.String, name: Schema.String },
|
||||
})
|
||||
|
||||
SyncEvent.init({
|
||||
projectors: [SyncEvent.project(Created, () => {})],
|
||||
})
|
||||
|
||||
const events: Array<{
|
||||
type: string
|
||||
properties: { id: string; name: string }
|
||||
}> = []
|
||||
const received = new Promise<void>((resolve) => {
|
||||
Bus.subscribeAll((event) => {
|
||||
events.push(event)
|
||||
resolve()
|
||||
})
|
||||
})
|
||||
|
||||
SyncEvent.run(Created, { id: "evt_1", name: "schema" })
|
||||
|
||||
await received
|
||||
expect(events).toEqual([
|
||||
{
|
||||
type: "item.effect.created",
|
||||
properties: {
|
||||
id: "evt_1",
|
||||
name: "schema",
|
||||
},
|
||||
},
|
||||
])
|
||||
} finally {
|
||||
setup()
|
||||
}
|
||||
}),
|
||||
)
|
||||
})
|
||||
|
||||
describe("replay", () => {
|
||||
|
||||
Reference in New Issue
Block a user