mirror of
https://fastgit.cc/https://github.com/anomalyco/opencode
synced 2026-04-30 13:51:48 +08:00
190 lines
5.5 KiB
TypeScript
190 lines
5.5 KiB
TypeScript
import z from "zod"
|
|
import { Hono } from "hono"
|
|
import { describeRoute, validator, resolver } from "hono-openapi"
|
|
import { SyncEvent } from "@/sync"
|
|
import { Database, asc, and, not, or, lte, eq } from "@/storage/db"
|
|
import { EventTable } from "@/sync/event.sql"
|
|
import { Log } from "@/util/log"
|
|
import { lazy } from "@/util/lazy"
|
|
import { Instance } from "@/project/instance"
|
|
import { InstanceBootstrap } from "../../project/bootstrap"
|
|
import { errors } from "../error"
|
|
import { streamQueue } from "../stream-queue"
|
|
|
|
const log = Log.create({ service: "server" })
|
|
|
|
const ReplayEvent = z.object({
|
|
id: z.string(),
|
|
aggregateID: z.string(),
|
|
seq: z.number().int().min(0),
|
|
type: z.string(),
|
|
data: z.record(z.string(), z.unknown()),
|
|
})
|
|
|
|
export const SyncRoutes = lazy(() =>
|
|
new Hono()
|
|
.get(
|
|
"/event",
|
|
describeRoute({
|
|
summary: "Subscribe to sync events",
|
|
description: "Get sync events",
|
|
operationId: "sync.event",
|
|
responses: {
|
|
200: {
|
|
description: "Event stream",
|
|
content: {
|
|
"text/event-stream": {
|
|
schema: resolver(
|
|
z
|
|
.object({
|
|
payload: SyncEvent.payloads(),
|
|
})
|
|
.meta({
|
|
ref: "SyncEvent",
|
|
}),
|
|
),
|
|
},
|
|
},
|
|
},
|
|
},
|
|
}),
|
|
async (c) => {
|
|
log.info("sync event connected")
|
|
c.header("X-Accel-Buffering", "no")
|
|
c.header("X-Content-Type-Options", "nosniff")
|
|
return streamQueue(c, {
|
|
connect: (q) => {
|
|
log.info("sync event connected")
|
|
|
|
q.push(
|
|
JSON.stringify({
|
|
type: "server.connected",
|
|
properties: {},
|
|
}),
|
|
)
|
|
},
|
|
heartbeat: (q) => {
|
|
q.push(
|
|
JSON.stringify({
|
|
type: "server.heartbeat",
|
|
properties: {},
|
|
}),
|
|
)
|
|
},
|
|
|
|
subscribe: (q) => {
|
|
const unsub = SyncEvent.subscribeAll(({ def, event }) => {
|
|
q.push(JSON.stringify({ ...event, type: SyncEvent.versionedType(def.type, def.version) }))
|
|
})
|
|
|
|
return () => {
|
|
unsub()
|
|
log.info("sync event disconnected")
|
|
}
|
|
},
|
|
})
|
|
},
|
|
)
|
|
.post(
|
|
"/replay",
|
|
describeRoute({
|
|
summary: "Replay sync events",
|
|
description: "Validate and replay a complete sync event history.",
|
|
operationId: "global.sync-replay",
|
|
responses: {
|
|
200: {
|
|
description: "Replayed sync events",
|
|
content: {
|
|
"application/json": {
|
|
schema: resolver(
|
|
z.object({
|
|
sessionID: z.string(),
|
|
}),
|
|
),
|
|
},
|
|
},
|
|
},
|
|
...errors(400),
|
|
},
|
|
}),
|
|
validator(
|
|
"json",
|
|
z.object({
|
|
directory: z.string(),
|
|
events: z.array(ReplayEvent).min(1),
|
|
}),
|
|
),
|
|
async (c) => {
|
|
const body = c.req.valid("json")
|
|
const events = body.events
|
|
const source = events[0].aggregateID
|
|
if (events.some((item) => item.aggregateID !== source)) {
|
|
throw new Error("Replay events must belong to the same session")
|
|
}
|
|
for (const [i, item] of events.entries()) {
|
|
if (item.seq !== i) throw new Error(`Replay sequence mismatch at index ${i}: expected ${i}, got ${item.seq}`)
|
|
}
|
|
|
|
return Instance.provide({
|
|
directory: body.directory,
|
|
init: InstanceBootstrap,
|
|
async fn() {
|
|
for (const item of events) {
|
|
SyncEvent.replay(item)
|
|
}
|
|
return c.json({ sessionID: source })
|
|
},
|
|
})
|
|
},
|
|
)
|
|
.get(
|
|
"/history",
|
|
describeRoute({
|
|
summary: "List sync events",
|
|
description: "List sync events for all aggregates. Keys are aggregate IDs the client already knows about, values are the last known sequence ID. Events with seq > value are returned for those aggregates. Aggregates not listed in the input get their full history.",
|
|
operationId: "global.sync-history.list",
|
|
responses: {
|
|
200: {
|
|
description: "Sync events",
|
|
content: {
|
|
"application/json": {
|
|
schema: resolver(
|
|
z.array(
|
|
z.object({
|
|
id: z.string(),
|
|
aggregate_id: z.string(),
|
|
seq: z.number(),
|
|
type: z.string(),
|
|
data: z.record(z.string(), z.unknown()),
|
|
}),
|
|
),
|
|
),
|
|
},
|
|
},
|
|
},
|
|
...errors(400),
|
|
},
|
|
}),
|
|
validator(
|
|
"json",
|
|
z.record(z.string(), z.number().int().min(0)),
|
|
),
|
|
async (c) => {
|
|
const body = c.req.valid("json")
|
|
const exclude = Object.entries(body)
|
|
const where = exclude.length > 0
|
|
? not(or(...exclude.map(([id, seq]) => and(eq(EventTable.aggregate_id, id), lte(EventTable.seq, seq))))!)
|
|
: undefined
|
|
const rows = Database.use((db) =>
|
|
db
|
|
.select()
|
|
.from(EventTable)
|
|
.where(where)
|
|
.orderBy(asc(EventTable.seq))
|
|
.all(),
|
|
)
|
|
return c.json(rows)
|
|
},
|
|
),
|
|
)
|