Files
opencode/sync-routes.ts
James Long ba499fb405 wip
2026-04-14 11:24:34 -04:00

190 lines
5.5 KiB
TypeScript

import z from "zod"
import { Hono } from "hono"
import { describeRoute, validator, resolver } from "hono-openapi"
import { SyncEvent } from "@/sync"
import { Database, asc, and, not, or, lte, eq } from "@/storage/db"
import { EventTable } from "@/sync/event.sql"
import { Log } from "@/util/log"
import { lazy } from "@/util/lazy"
import { Instance } from "@/project/instance"
import { InstanceBootstrap } from "../../project/bootstrap"
import { errors } from "../error"
import { streamQueue } from "../stream-queue"
const log = Log.create({ service: "server" })
const ReplayEvent = z.object({
id: z.string(),
aggregateID: z.string(),
seq: z.number().int().min(0),
type: z.string(),
data: z.record(z.string(), z.unknown()),
})
export const SyncRoutes = lazy(() =>
new Hono()
.get(
"/event",
describeRoute({
summary: "Subscribe to sync events",
description: "Get sync events",
operationId: "sync.event",
responses: {
200: {
description: "Event stream",
content: {
"text/event-stream": {
schema: resolver(
z
.object({
payload: SyncEvent.payloads(),
})
.meta({
ref: "SyncEvent",
}),
),
},
},
},
},
}),
async (c) => {
log.info("sync event connected")
c.header("X-Accel-Buffering", "no")
c.header("X-Content-Type-Options", "nosniff")
return streamQueue(c, {
connect: (q) => {
log.info("sync event connected")
q.push(
JSON.stringify({
type: "server.connected",
properties: {},
}),
)
},
heartbeat: (q) => {
q.push(
JSON.stringify({
type: "server.heartbeat",
properties: {},
}),
)
},
subscribe: (q) => {
const unsub = SyncEvent.subscribeAll(({ def, event }) => {
q.push(JSON.stringify({ ...event, type: SyncEvent.versionedType(def.type, def.version) }))
})
return () => {
unsub()
log.info("sync event disconnected")
}
},
})
},
)
.post(
"/replay",
describeRoute({
summary: "Replay sync events",
description: "Validate and replay a complete sync event history.",
operationId: "global.sync-replay",
responses: {
200: {
description: "Replayed sync events",
content: {
"application/json": {
schema: resolver(
z.object({
sessionID: z.string(),
}),
),
},
},
},
...errors(400),
},
}),
validator(
"json",
z.object({
directory: z.string(),
events: z.array(ReplayEvent).min(1),
}),
),
async (c) => {
const body = c.req.valid("json")
const events = body.events
const source = events[0].aggregateID
if (events.some((item) => item.aggregateID !== source)) {
throw new Error("Replay events must belong to the same session")
}
for (const [i, item] of events.entries()) {
if (item.seq !== i) throw new Error(`Replay sequence mismatch at index ${i}: expected ${i}, got ${item.seq}`)
}
return Instance.provide({
directory: body.directory,
init: InstanceBootstrap,
async fn() {
for (const item of events) {
SyncEvent.replay(item)
}
return c.json({ sessionID: source })
},
})
},
)
.get(
"/history",
describeRoute({
summary: "List sync events",
description: "List sync events for all aggregates. Keys are aggregate IDs the client already knows about, values are the last known sequence ID. Events with seq > value are returned for those aggregates. Aggregates not listed in the input get their full history.",
operationId: "global.sync-history.list",
responses: {
200: {
description: "Sync events",
content: {
"application/json": {
schema: resolver(
z.array(
z.object({
id: z.string(),
aggregate_id: z.string(),
seq: z.number(),
type: z.string(),
data: z.record(z.string(), z.unknown()),
}),
),
),
},
},
},
...errors(400),
},
}),
validator(
"json",
z.record(z.string(), z.number().int().min(0)),
),
async (c) => {
const body = c.req.valid("json")
const exclude = Object.entries(body)
const where = exclude.length > 0
? not(or(...exclude.map(([id, seq]) => and(eq(EventTable.aggregate_id, id), lte(EventTable.seq, seq))))!)
: undefined
const rows = Database.use((db) =>
db
.select()
.from(EventTable)
.where(where)
.orderBy(asc(EventTable.seq))
.all(),
)
return c.json(rows)
},
),
)