From af20191d1cd60a7f4a421ad81eca5053f7deace1 Mon Sep 17 00:00:00 2001 From: James Long Date: Wed, 15 Apr 2026 10:18:48 -0400 Subject: [PATCH] feat(core): sync routes, refactor proxy, session restore, and more syncing (#22518) --- .../opencode/src/control-plane/workspace.ts | 260 ++++++++++++-- .../opencode/src/server/instance/index.ts | 2 + .../src/server/instance/middleware.ts | 66 ++-- packages/opencode/src/server/instance/sync.ts | 118 +++++++ .../opencode/src/server/instance/workspace.ts | 71 +++- packages/opencode/src/server/middleware.ts | 2 +- packages/opencode/src/server/proxy.ts | 55 ++- packages/opencode/src/sync/index.ts | 19 ++ packages/sdk/js/src/v2/gen/sdk.gen.ts | 157 +++++++++ packages/sdk/js/src/v2/gen/types.gen.ts | 127 +++++++ packages/sdk/openapi.json | 320 ++++++++++++++++++ 11 files changed, 1133 insertions(+), 64 deletions(-) create mode 100644 packages/opencode/src/server/instance/sync.ts diff --git a/packages/opencode/src/control-plane/workspace.ts b/packages/opencode/src/control-plane/workspace.ts index f330e07b7a..78f3d770eb 100644 --- a/packages/opencode/src/control-plane/workspace.ts +++ b/packages/opencode/src/control-plane/workspace.ts @@ -1,11 +1,13 @@ import z from "zod" import { setTimeout as sleep } from "node:timers/promises" import { fn } from "@/util/fn" -import { Database, eq } from "@/storage/db" +import { Database, asc, eq } from "@/storage/db" import { Project } from "@/project/project" import { BusEvent } from "@/bus/bus-event" import { GlobalBus } from "@/bus/global" import { SyncEvent } from "@/sync" +import { EventTable } from "@/sync/event.sql" +import { Flag } from "@/flag/flag" import { Log } from "@/util/log" import { Filesystem } from "@/util/filesystem" import { ProjectID } from "@/project/schema" @@ -15,6 +17,11 @@ import { getAdaptor } from "./adaptors" import { WorkspaceInfo } from "./types" import { WorkspaceID } from "./schema" import { parseSSE } from "./sse" +import { Session } from "@/session" +import { SessionTable } from "@/session/session.sql" +import { SessionID } from "@/session/schema" +import { errorData } from "@/util/error" +import { AppRuntime } from "@/effect/app-runtime" export namespace Workspace { export const Info = WorkspaceInfo.meta({ @@ -29,6 +36,13 @@ export namespace Workspace { }) export type ConnectionStatus = z.infer + const Restore = z.object({ + workspaceID: WorkspaceID.zod, + sessionID: SessionID.zod, + total: z.number().int().min(0), + step: z.number().int().min(0), + }) + export const Event = { Ready: BusEvent.define( "workspace.ready", @@ -42,6 +56,7 @@ export namespace Workspace { message: z.string(), }), ), + Restore: BusEvent.define("workspace.restore", Restore), Status: BusEvent.define("workspace.status", ConnectionStatus), } @@ -102,11 +117,170 @@ export namespace Workspace { return info }) + const SessionRestoreInput = z.object({ + workspaceID: WorkspaceID.zod, + sessionID: SessionID.zod, + }) + + export const sessionRestore = fn(SessionRestoreInput, async (input) => { + log.info("session restore requested", { + workspaceID: input.workspaceID, + sessionID: input.sessionID, + }) + try { + const space = await get(input.workspaceID) + if (!space) throw new Error(`Workspace not found: ${input.workspaceID}`) + + const adaptor = await getAdaptor(space.projectID, space.type) + const target = await adaptor.target(space) + + // Need to switch the workspace of the session + SyncEvent.run(Session.Event.Updated, { + sessionID: input.sessionID, + info: { + workspaceID: input.workspaceID, + }, + }) + + const rows = Database.use((db) => + db + .select({ + id: EventTable.id, + aggregateID: EventTable.aggregate_id, + seq: EventTable.seq, + type: EventTable.type, + data: EventTable.data, + }) + .from(EventTable) + .where(eq(EventTable.aggregate_id, input.sessionID)) + .orderBy(asc(EventTable.seq)) + .all(), + ) + if (rows.length === 0) throw new Error(`No events found for session: ${input.sessionID}`) + + const all = rows + + const size = 10 + const sets = Array.from({ length: Math.ceil(all.length / size) }, (_, i) => all.slice(i * size, (i + 1) * size)) + const total = sets.length + log.info("session restore prepared", { + workspaceID: input.workspaceID, + sessionID: input.sessionID, + workspaceType: space.type, + directory: space.directory, + target: target.type === "remote" ? String(route(target.url, "/sync/replay")) : target.directory, + events: all.length, + batches: total, + first: all[0]?.seq, + last: all.at(-1)?.seq, + }) + GlobalBus.emit("event", { + directory: "global", + workspace: input.workspaceID, + payload: { + type: Event.Restore.type, + properties: { + workspaceID: input.workspaceID, + sessionID: input.sessionID, + total, + step: 0, + }, + }, + }) + for (const [i, events] of sets.entries()) { + log.info("session restore batch starting", { + workspaceID: input.workspaceID, + sessionID: input.sessionID, + step: i + 1, + total, + events: events.length, + first: events[0]?.seq, + last: events.at(-1)?.seq, + target: target.type === "remote" ? String(route(target.url, "/sync/replay")) : target.directory, + }) + if (target.type === "local") { + SyncEvent.replayAll(events) + log.info("session restore batch replayed locally", { + workspaceID: input.workspaceID, + sessionID: input.sessionID, + step: i + 1, + total, + events: events.length, + }) + } else { + const url = route(target.url, "/sync/replay") + const headers = new Headers(target.headers) + headers.set("content-type", "application/json") + const res = await fetch(url, { + method: "POST", + headers, + body: JSON.stringify({ + directory: space.directory ?? "", + events, + }), + }) + if (!res.ok) { + const body = await res.text() + log.error("session restore batch failed", { + workspaceID: input.workspaceID, + sessionID: input.sessionID, + step: i + 1, + total, + status: res.status, + body, + }) + throw new Error( + `Failed to replay session ${input.sessionID} into workspace ${input.workspaceID}: HTTP ${res.status} ${body}`, + ) + } + log.info("session restore batch posted", { + workspaceID: input.workspaceID, + sessionID: input.sessionID, + step: i + 1, + total, + status: res.status, + }) + } + GlobalBus.emit("event", { + directory: "global", + workspace: input.workspaceID, + payload: { + type: Event.Restore.type, + properties: { + workspaceID: input.workspaceID, + sessionID: input.sessionID, + total, + step: i + 1, + }, + }, + }) + } + + log.info("session restore complete", { + workspaceID: input.workspaceID, + sessionID: input.sessionID, + batches: total, + }) + + return { + total, + } + } catch (err) { + log.error("session restore failed", { + workspaceID: input.workspaceID, + sessionID: input.sessionID, + error: errorData(err), + }) + throw err + } + }) + export function list(project: Project.Info) { const rows = Database.use((db) => db.select().from(WorkspaceTable).where(eq(WorkspaceTable.project_id, project.id)).all(), ) const spaces = rows.map(fromRow).sort((a, b) => a.id.localeCompare(b.id)) + for (const space of spaces) startSync(space) return spaces } @@ -120,13 +294,25 @@ export namespace Workspace { }) export const remove = fn(WorkspaceID.zod, async (id) => { + const sessions = Database.use((db) => + db.select({ id: SessionTable.id }).from(SessionTable).where(eq(SessionTable.workspace_id, id)).all(), + ) + for (const session of sessions) { + await AppRuntime.runPromise(Session.Service.use((svc) => svc.remove(session.id))) + } + const row = Database.use((db) => db.select().from(WorkspaceTable).where(eq(WorkspaceTable.id, id)).get()) + if (row) { stopSync(id) const info = fromRow(row) - const adaptor = await getAdaptor(info.projectID, row.type) - adaptor.remove(info) + try { + const adaptor = await getAdaptor(info.projectID, row.type) + await adaptor.remove(info) + } catch (err) { + log.error("adaptor not available when removing workspace", { type: row.type }) + } Database.use((db) => db.delete(WorkspaceTable).where(eq(WorkspaceTable.id, id)).run()) return info } @@ -156,51 +342,81 @@ export namespace Workspace { const log = Log.create({ service: "workspace-sync" }) - async function workspaceEventLoop(space: Info, signal: AbortSignal) { - log.info("starting sync: " + space.id) + function route(url: string | URL, path: string) { + const next = new URL(url) + next.pathname = `${next.pathname.replace(/\/$/, "")}${path}` + next.search = "" + next.hash = "" + return next + } + async function syncWorkspace(space: Info, signal: AbortSignal) { while (!signal.aborted) { - log.info("connecting to sync: " + space.id) + log.info("connecting to global sync", { workspace: space.name }) - setStatus(space.id, "connecting") const adaptor = await getAdaptor(space.projectID, space.type) const target = await adaptor.target(space) if (target.type === "local") return - const res = await fetch(target.url + "/sync/event", { method: "GET", signal }).catch((err: unknown) => { - setStatus(space.id, "error", String(err)) + const res = await fetch(route(target.url, "/global/event"), { + method: "GET", + headers: target.headers, + signal, + }).catch((err: unknown) => { + setStatus(space.id, "error") + + log.info("failed to connect to global sync", { + workspace: space.name, + error: err, + }) return undefined }) - if (!res || !res.ok || !res.body) { - log.info("failed to connect to sync: " + res?.status) - setStatus(space.id, "error", res ? `HTTP ${res.status}` : "no response") + if (!res || !res.ok || !res.body) { + log.info("failed to connect to global sync", { workspace: space.name }) + setStatus(space.id, "error") await sleep(1000) continue } - setStatus(space.id, "connected") - await parseSSE(res.body, signal, (evt) => { - const event = evt as SyncEvent.SerializedEvent + log.info("global sync connected", { workspace: space.name }) + setStatus(space.id, "connected") + + await parseSSE(res.body, signal, (evt: any) => { try { - if (!event.type.startsWith("server.")) { - SyncEvent.replay(event) + if (!("payload" in evt)) return + + if (evt.payload.type === "sync") { + // This name -> type is temporary + SyncEvent.replay({ ...evt.payload, type: evt.payload.name } as SyncEvent.SerializedEvent) } + + GlobalBus.emit("event", { + directory: evt.directory, + project: evt.project, + workspace: space.id, + payload: evt.payload, + }) } catch (err) { - log.warn("failed to replay sync event", { + log.info("failed to replay global event", { workspaceID: space.id, error: err, }) } }) + + log.info("disconnected from global sync: " + space.id) setStatus(space.id, "disconnected") - log.info("disconnected to sync: " + space.id) - await sleep(250) + + // TODO: Implement exponential backoff + await sleep(1000) } } function startSync(space: Info) { + if (!Flag.OPENCODE_EXPERIMENTAL_WORKSPACES) return + if (space.type === "worktree") { void Filesystem.exists(space.directory!).then((exists) => { setStatus(space.id, exists ? "connected" : "error", exists ? undefined : "directory does not exist") @@ -213,9 +429,9 @@ export namespace Workspace { aborts.set(space.id, abort) setStatus(space.id, "disconnected") - void workspaceEventLoop(space, abort.signal).catch((error) => { + void syncWorkspace(space, abort.signal).catch((error) => { setStatus(space.id, "error", String(error)) - log.warn("workspace sync listener failed", { + log.warn("workspace listener failed", { workspaceID: space.id, error, }) diff --git a/packages/opencode/src/server/instance/index.ts b/packages/opencode/src/server/instance/index.ts index 86a18dc673..4a03b7b29c 100644 --- a/packages/opencode/src/server/instance/index.ts +++ b/packages/opencode/src/server/instance/index.ts @@ -23,6 +23,7 @@ import { ConfigRoutes } from "./config" import { ExperimentalRoutes } from "./experimental" import { ProviderRoutes } from "./provider" import { EventRoutes } from "./event" +import { SyncRoutes } from "./sync" import { WorkspaceRouterMiddleware } from "./middleware" import { AppRuntime } from "@/effect/app-runtime" @@ -37,6 +38,7 @@ export const InstanceRoutes = (upgrade: UpgradeWebSocket): Hono => .route("/permission", PermissionRoutes()) .route("/question", QuestionRoutes()) .route("/provider", ProviderRoutes()) + .route("/sync", SyncRoutes()) .route("/", FileRoutes()) .route("/", EventRoutes()) .route("/mcp", McpRoutes()) diff --git a/packages/opencode/src/server/instance/middleware.ts b/packages/opencode/src/server/instance/middleware.ts index 9155ad451b..824c265efe 100644 --- a/packages/opencode/src/server/instance/middleware.ts +++ b/packages/opencode/src/server/instance/middleware.ts @@ -11,9 +11,12 @@ import { Session } from "@/session" import { SessionID } from "@/session/schema" import { WorkspaceContext } from "@/control-plane/workspace-context" import { AppRuntime } from "@/effect/app-runtime" +import { Log } from "@/util/log" type Rule = { method?: string; path: string; exact?: boolean; action: "local" | "forward" } +const OPENCODE_WORKSPACE = process.env.OPENCODE_WORKSPACE + const RULES: Array = [ { path: "/session/status", action: "forward" }, { method: "GET", path: "/session", action: "local" }, @@ -46,6 +49,8 @@ async function getSessionWorkspace(url: URL) { } export function WorkspaceRouterMiddleware(upgrade: UpgradeWebSocket): MiddlewareHandler { + const log = Log.create({ service: "workspace-router" }) + return async (c, next) => { const raw = c.req.query("directory") || c.req.header("x-opencode-directory") || process.cwd() const directory = Filesystem.resolve( @@ -63,8 +68,22 @@ export function WorkspaceRouterMiddleware(upgrade: UpgradeWebSocket): Middleware const sessionWorkspaceID = await getSessionWorkspace(url) const workspaceID = sessionWorkspaceID || url.searchParams.get("workspace") - // If no workspace is provided we use the project - if (!workspaceID) { + if (!workspaceID || url.pathname.startsWith("/console") || OPENCODE_WORKSPACE) { + if (OPENCODE_WORKSPACE) { + return WorkspaceContext.provide({ + workspaceID: WorkspaceID.make(OPENCODE_WORKSPACE), + async fn() { + return Instance.provide({ + directory, + init: () => AppRuntime.runPromise(InstanceBootstrap), + async fn() { + return next() + }, + }) + }, + }) + } + return Instance.provide({ directory, init: () => AppRuntime.runPromise(InstanceBootstrap), @@ -77,16 +96,6 @@ export function WorkspaceRouterMiddleware(upgrade: UpgradeWebSocket): Middleware const workspace = await Workspace.get(WorkspaceID.make(workspaceID)) if (!workspace) { - // Special-case deleting a session in case user's data in a - // weird state. Allow them to forcefully delete a synced session - // even if the remote workspace is not in their data. - // - // The lets the `DELETE /session/:id` endpoint through and we've - // made sure that it will run without an instance - if (url.pathname.match(/\/session\/[^/]+$/) && c.req.method === "DELETE") { - return next() - } - return new Response(`Workspace not found: ${workspaceID}`, { status: 500, headers: { @@ -95,6 +104,12 @@ export function WorkspaceRouterMiddleware(upgrade: UpgradeWebSocket): Middleware }) } + if (local(c.req.method, url.pathname)) { + // No instance provided because we are serving cached data; there + // is no instance to work with + return next() + } + const adaptor = await getAdaptor(workspace.projectID, workspace.type) const target = await adaptor.target(workspace) @@ -112,24 +127,27 @@ export function WorkspaceRouterMiddleware(upgrade: UpgradeWebSocket): Middleware }) } - if (local(c.req.method, url.pathname)) { - // No instance provided because we are serving cached data; there - // is no instance to work with - return next() - } + const proxyURL = new URL(target.url) + proxyURL.pathname = `${proxyURL.pathname.replace(/\/$/, "")}${url.pathname}` + proxyURL.search = url.search + proxyURL.hash = url.hash + proxyURL.searchParams.delete("workspace") + + log.info("workspace proxy forwarding", { + workspaceID, + request: url.toString(), + target: String(target.url), + proxy: proxyURL.toString(), + }) if (c.req.header("upgrade")?.toLowerCase() === "websocket") { - return ServerProxy.websocket(upgrade, target, c.req.raw, c.env) + return ServerProxy.websocket(upgrade, proxyURL, target.headers, c.req.raw, c.env) } const headers = new Headers(c.req.raw.headers) headers.delete("x-opencode-workspace") - return ServerProxy.http( - target, - new Request(c.req.raw, { - headers, - }), - ) + const req = new Request(c.req.raw, { headers }) + return ServerProxy.http(proxyURL, target.headers, req) } } diff --git a/packages/opencode/src/server/instance/sync.ts b/packages/opencode/src/server/instance/sync.ts new file mode 100644 index 0000000000..c22969130a --- /dev/null +++ b/packages/opencode/src/server/instance/sync.ts @@ -0,0 +1,118 @@ +import z from "zod" +import { Hono } from "hono" +import { describeRoute, validator, resolver } from "hono-openapi" +import { SyncEvent } from "@/sync" +import { Database, asc, and, not, or, lte, eq } from "@/storage/db" +import { EventTable } from "@/sync/event.sql" +import { lazy } from "@/util/lazy" +import { Log } from "@/util/log" +import { errors } from "../error" + +const ReplayEvent = z.object({ + id: z.string(), + aggregateID: z.string(), + seq: z.number().int().min(0), + type: z.string(), + data: z.record(z.string(), z.unknown()), +}) + +const log = Log.create({ service: "server.sync" }) + +export const SyncRoutes = lazy(() => + new Hono() + .post( + "/replay", + describeRoute({ + summary: "Replay sync events", + description: "Validate and replay a complete sync event history.", + operationId: "sync.replay", + responses: { + 200: { + description: "Replayed sync events", + content: { + "application/json": { + schema: resolver( + z.object({ + sessionID: z.string(), + }), + ), + }, + }, + }, + ...errors(400), + }, + }), + validator( + "json", + z.object({ + directory: z.string(), + events: z.array(ReplayEvent).min(1), + }), + ), + async (c) => { + const body = c.req.valid("json") + const events = body.events + const source = events[0].aggregateID + log.info("sync replay requested", { + sessionID: source, + events: events.length, + first: events[0]?.seq, + last: events.at(-1)?.seq, + directory: body.directory, + }) + SyncEvent.replayAll(events) + + log.info("sync replay complete", { + sessionID: source, + events: events.length, + first: events[0]?.seq, + last: events.at(-1)?.seq, + }) + + return c.json({ + sessionID: source, + }) + }, + ) + .get( + "/history", + describeRoute({ + summary: "List sync events", + description: + "List sync events for all aggregates. Keys are aggregate IDs the client already knows about, values are the last known sequence ID. Events with seq > value are returned for those aggregates. Aggregates not listed in the input get their full history.", + operationId: "sync.history.list", + responses: { + 200: { + description: "Sync events", + content: { + "application/json": { + schema: resolver( + z.array( + z.object({ + id: z.string(), + aggregate_id: z.string(), + seq: z.number(), + type: z.string(), + data: z.record(z.string(), z.unknown()), + }), + ), + ), + }, + }, + }, + ...errors(400), + }, + }), + validator("json", z.record(z.string(), z.number().int().min(0))), + async (c) => { + const body = c.req.valid("json") + const exclude = Object.entries(body) + const where = + exclude.length > 0 + ? not(or(...exclude.map(([id, seq]) => and(eq(EventTable.aggregate_id, id), lte(EventTable.seq, seq))))!) + : undefined + const rows = Database.use((db) => db.select().from(EventTable).where(where).orderBy(asc(EventTable.seq)).all()) + return c.json(rows) + }, + ), +) diff --git a/packages/opencode/src/server/instance/workspace.ts b/packages/opencode/src/server/instance/workspace.ts index 7cee031975..a4ff4eda8d 100644 --- a/packages/opencode/src/server/instance/workspace.ts +++ b/packages/opencode/src/server/instance/workspace.ts @@ -6,12 +6,10 @@ import { Workspace } from "../../control-plane/workspace" import { Instance } from "../../project/instance" import { errors } from "../error" import { lazy } from "../../util/lazy" +import { Log } from "@/util/log" +import { errorData } from "@/util/error" -const WorkspaceAdaptor = z.object({ - type: z.string(), - name: z.string(), - description: z.string(), -}) +const log = Log.create({ service: "server.workspace" }) export const WorkspaceRoutes = lazy(() => new Hono() @@ -26,7 +24,15 @@ export const WorkspaceRoutes = lazy(() => description: "Workspace adaptors", content: { "application/json": { - schema: resolver(z.array(WorkspaceAdaptor)), + schema: resolver( + z.array( + z.object({ + type: z.string(), + name: z.string(), + description: z.string(), + }), + ), + ), }, }, }, @@ -140,5 +146,58 @@ export const WorkspaceRoutes = lazy(() => const { id } = c.req.valid("param") return c.json(await Workspace.remove(id)) }, + ) + .post( + "/:id/session-restore", + describeRoute({ + summary: "Restore session into workspace", + description: "Replay a session's sync events into the target workspace in batches.", + operationId: "experimental.workspace.sessionRestore", + responses: { + 200: { + description: "Session replay started", + content: { + "application/json": { + schema: resolver( + z.object({ + total: z.number().int().min(0), + }), + ), + }, + }, + }, + ...errors(400), + }, + }), + validator("param", z.object({ id: Workspace.Info.shape.id })), + validator("json", Workspace.sessionRestore.schema.omit({ workspaceID: true })), + async (c) => { + const { id } = c.req.valid("param") + const body = c.req.valid("json") + log.info("session restore route requested", { + workspaceID: id, + sessionID: body.sessionID, + directory: Instance.directory, + }) + try { + const result = await Workspace.sessionRestore({ + workspaceID: id, + ...body, + }) + log.info("session restore route complete", { + workspaceID: id, + sessionID: body.sessionID, + total: result.total, + }) + return c.json(result) + } catch (err) { + log.error("session restore route failed", { + workspaceID: id, + sessionID: body.sessionID, + error: errorData(err), + }) + throw err + } + }, ), ) diff --git a/packages/opencode/src/server/middleware.ts b/packages/opencode/src/server/middleware.ts index a51ba602b5..d0539eb247 100644 --- a/packages/opencode/src/server/middleware.ts +++ b/packages/opencode/src/server/middleware.ts @@ -86,7 +86,7 @@ const zipped = compress() export const CompressionMiddleware: MiddlewareHandler = (c, next) => { const path = c.req.path const method = c.req.method - if (path === "/event" || path === "/global/event" || path === "/global/sync-event") return next() + if (path === "/event" || path === "/global/event") return next() if (method === "POST" && /\/session\/[^/]+\/(message|prompt_async)$/.test(path)) return next() return zipped(c, next) } diff --git a/packages/opencode/src/server/proxy.ts b/packages/opencode/src/server/proxy.ts index c90a657dc2..0c0deba20c 100644 --- a/packages/opencode/src/server/proxy.ts +++ b/packages/opencode/src/server/proxy.ts @@ -1,6 +1,6 @@ -import type { Target } from "@/control-plane/types" import { Hono } from "hono" import type { UpgradeWebSocket } from "hono/ws" +import { Log } from "@/util/log" const hop = new Set([ "connection", @@ -20,6 +20,7 @@ type Msg = string | ArrayBuffer | Uint8Array function headers(req: Request, extra?: HeadersInit) { const out = new Headers(req.headers) for (const key of hop) out.delete(key) + out.delete("accept-encoding") out.delete("x-opencode-directory") out.delete("x-opencode-workspace") if (!extra) return out @@ -98,31 +99,63 @@ const app = (upgrade: UpgradeWebSocket) => ) export namespace ServerProxy { - export function http(target: Extract, req: Request) { + const log = Log.Default.clone().tag("service", "server-proxy") + + export function http(url: string | URL, extra: HeadersInit | undefined, req: Request) { + console.log("proxy http request", { + method: req.method, + request: req.url, + url: String(url), + }) return fetch( - new Request(target.url, { + new Request(url, { method: req.method, - headers: headers(req, target.headers), + headers: headers(req, extra), body: req.method === "GET" || req.method === "HEAD" ? undefined : req.body, redirect: "manual", signal: req.signal, }), - ) + ).then((res) => { + const next = new Headers(res.headers) + next.delete("content-encoding") + next.delete("content-length") + + console.log("proxy http response", { + method: req.method, + request: req.url, + url: String(url), + status: res.status, + statusText: res.statusText, + }) + return new Response(res.body, { + status: res.status, + statusText: res.statusText, + headers: next, + }) + }) } export function websocket( upgrade: UpgradeWebSocket, - target: Extract, + target: string | URL, + extra: HeadersInit | undefined, req: Request, env: unknown, ) { - const url = new URL(req.url) - url.pathname = "/__workspace_ws" - url.search = "" + const proxy = new URL(req.url) + proxy.pathname = "/__workspace_ws" + proxy.search = "" const next = new Headers(req.headers) - next.set("x-opencode-proxy-url", socket(target.url)) + next.set("x-opencode-proxy-url", socket(target)) + for (const [key, value] of new Headers(extra).entries()) { + next.set(key, value) + } + log.info("proxy websocket", { + request: req.url, + target: String(target), + }) return app(upgrade).fetch( - new Request(url, { + new Request(proxy, { method: req.method, headers: next, signal: req.signal, diff --git a/packages/opencode/src/sync/index.ts b/packages/opencode/src/sync/index.ts index d7cb7f774f..ce598dae67 100644 --- a/packages/opencode/src/sync/index.ts +++ b/packages/opencode/src/sync/index.ts @@ -199,6 +199,25 @@ export namespace SyncEvent { process(def, event, { publish: !!options?.publish }) } + export function replayAll(events: SerializedEvent[], options?: { publish: boolean }) { + const source = events[0]?.aggregateID + if (!source) return + if (events.some((item) => item.aggregateID !== source)) { + throw new Error("Replay events must belong to the same session") + } + const start = events[0].seq + for (const [i, item] of events.entries()) { + const seq = start + i + if (item.seq !== seq) { + throw new Error(`Replay sequence mismatch at index ${i}: expected ${seq}, got ${item.seq}`) + } + } + for (const item of events) { + replay(item, options) + } + return source + } + export function run(def: Def, data: Event["data"], options?: { publish?: boolean }) { const agg = (data as Record)[def.aggregate] // This should never happen: we've enforced it via typescript in diff --git a/packages/sdk/js/src/v2/gen/sdk.gen.ts b/packages/sdk/js/src/v2/gen/sdk.gen.ts index b5fc976bba..d7bf43f506 100644 --- a/packages/sdk/js/src/v2/gen/sdk.gen.ts +++ b/packages/sdk/js/src/v2/gen/sdk.gen.ts @@ -35,6 +35,8 @@ import type { ExperimentalWorkspaceListResponses, ExperimentalWorkspaceRemoveErrors, ExperimentalWorkspaceRemoveResponses, + ExperimentalWorkspaceSessionRestoreErrors, + ExperimentalWorkspaceSessionRestoreResponses, ExperimentalWorkspaceStatusResponses, FileListResponses, FilePartInput, @@ -157,6 +159,10 @@ import type { SessionUpdateErrors, SessionUpdateResponses, SubtaskPartInput, + SyncHistoryListErrors, + SyncHistoryListResponses, + SyncReplayErrors, + SyncReplayResponses, TextPartInput, ToolIdsErrors, ToolIdsResponses, @@ -1243,6 +1249,49 @@ export class Workspace extends HeyApiClient { }) } + /** + * Restore session into workspace + * + * Replay a session's sync events into the target workspace in batches. + */ + public sessionRestore( + parameters: { + id: string + directory?: string + workspace?: string + sessionID?: string + }, + options?: Options, + ) { + const params = buildClientParams( + [parameters], + [ + { + args: [ + { in: "path", key: "id" }, + { in: "query", key: "directory" }, + { in: "query", key: "workspace" }, + { in: "body", key: "sessionID" }, + ], + }, + ], + ) + return (options?.client ?? this.client).post< + ExperimentalWorkspaceSessionRestoreResponses, + ExperimentalWorkspaceSessionRestoreErrors, + ThrowOnError + >({ + url: "/experimental/workspace/{id}/session-restore", + ...options, + ...params, + headers: { + "Content-Type": "application/json", + ...options?.headers, + ...params.headers, + }, + }) + } + private _adaptor?: Adaptor get adaptor(): Adaptor { return (this._adaptor ??= new Adaptor({ client: this.client })) @@ -2961,6 +3010,109 @@ export class Provider extends HeyApiClient { } } +export class History extends HeyApiClient { + /** + * List sync events + * + * List sync events for all aggregates. Keys are aggregate IDs the client already knows about, values are the last known sequence ID. Events with seq > value are returned for those aggregates. Aggregates not listed in the input get their full history. + */ + public list( + parameters?: { + directory?: string + workspace?: string + body?: { + [key: string]: number + } + }, + options?: Options, + ) { + const params = buildClientParams( + [parameters], + [ + { + args: [ + { in: "query", key: "directory" }, + { in: "query", key: "workspace" }, + { key: "body", map: "body" }, + ], + }, + ], + ) + return (options?.client ?? this.client).get({ + url: "/sync/history", + ...options, + ...params, + headers: { + "Content-Type": "application/json", + ...options?.headers, + ...params.headers, + }, + }) + } +} + +export class Sync extends HeyApiClient { + /** + * Replay sync events + * + * Validate and replay a complete sync event history. + */ + public replay( + parameters?: { + query_directory?: string + workspace?: string + body_directory?: string + events?: Array<{ + id: string + aggregateID: string + seq: number + type: string + data: { + [key: string]: unknown + } + }> + }, + options?: Options, + ) { + const params = buildClientParams( + [parameters], + [ + { + args: [ + { + in: "query", + key: "query_directory", + map: "directory", + }, + { in: "query", key: "workspace" }, + { + in: "body", + key: "body_directory", + map: "directory", + }, + { in: "body", key: "events" }, + ], + }, + ], + ) + return (options?.client ?? this.client).post({ + url: "/sync/replay", + ...options, + ...params, + headers: { + "Content-Type": "application/json", + ...options?.headers, + ...params.headers, + }, + }) + } + + private _history?: History + get history(): History { + return (this._history ??= new History({ client: this.client })) + } +} + export class Find extends HeyApiClient { /** * Find text @@ -4217,6 +4369,11 @@ export class OpencodeClient extends HeyApiClient { return (this._provider ??= new Provider({ client: this.client })) } + private _sync?: Sync + get sync(): Sync { + return (this._sync ??= new Sync({ client: this.client })) + } + private _find?: Find get find(): Find { return (this._find ??= new Find({ client: this.client })) diff --git a/packages/sdk/js/src/v2/gen/types.gen.ts b/packages/sdk/js/src/v2/gen/types.gen.ts index 8f4c16c5bd..24c1d53bf7 100644 --- a/packages/sdk/js/src/v2/gen/types.gen.ts +++ b/packages/sdk/js/src/v2/gen/types.gen.ts @@ -520,6 +520,16 @@ export type EventWorkspaceFailed = { } } +export type EventWorkspaceRestore = { + type: "workspace.restore" + properties: { + workspaceID: string + sessionID: string + total: number + step: number + } +} + export type EventWorkspaceStatus = { type: "workspace.status" properties: { @@ -1137,6 +1147,7 @@ export type GlobalEvent = { | EventPtyDeleted | EventWorkspaceReady | EventWorkspaceFailed + | EventWorkspaceRestore | EventWorkspaceStatus | EventMessageUpdated | EventMessageRemoved @@ -2049,6 +2060,7 @@ export type Event = | EventPtyDeleted | EventWorkspaceReady | EventWorkspaceFailed + | EventWorkspaceRestore | EventWorkspaceStatus | EventMessageUpdated | EventMessageRemoved @@ -3006,6 +3018,42 @@ export type ExperimentalWorkspaceRemoveResponses = { export type ExperimentalWorkspaceRemoveResponse = ExperimentalWorkspaceRemoveResponses[keyof ExperimentalWorkspaceRemoveResponses] +export type ExperimentalWorkspaceSessionRestoreData = { + body?: { + sessionID: string + } + path: { + id: string + } + query?: { + directory?: string + workspace?: string + } + url: "/experimental/workspace/{id}/session-restore" +} + +export type ExperimentalWorkspaceSessionRestoreErrors = { + /** + * Bad request + */ + 400: BadRequestError +} + +export type ExperimentalWorkspaceSessionRestoreError = + ExperimentalWorkspaceSessionRestoreErrors[keyof ExperimentalWorkspaceSessionRestoreErrors] + +export type ExperimentalWorkspaceSessionRestoreResponses = { + /** + * Session replay started + */ + 200: { + total: number + } +} + +export type ExperimentalWorkspaceSessionRestoreResponse = + ExperimentalWorkspaceSessionRestoreResponses[keyof ExperimentalWorkspaceSessionRestoreResponses] + export type WorktreeRemoveData = { body?: WorktreeRemoveInput path?: never @@ -4456,6 +4504,85 @@ export type ProviderOauthCallbackResponses = { export type ProviderOauthCallbackResponse = ProviderOauthCallbackResponses[keyof ProviderOauthCallbackResponses] +export type SyncReplayData = { + body?: { + directory: string + events: Array<{ + id: string + aggregateID: string + seq: number + type: string + data: { + [key: string]: unknown + } + }> + } + path?: never + query?: { + directory?: string + workspace?: string + } + url: "/sync/replay" +} + +export type SyncReplayErrors = { + /** + * Bad request + */ + 400: BadRequestError +} + +export type SyncReplayError = SyncReplayErrors[keyof SyncReplayErrors] + +export type SyncReplayResponses = { + /** + * Replayed sync events + */ + 200: { + sessionID: string + } +} + +export type SyncReplayResponse = SyncReplayResponses[keyof SyncReplayResponses] + +export type SyncHistoryListData = { + body?: { + [key: string]: number + } + path?: never + query?: { + directory?: string + workspace?: string + } + url: "/sync/history" +} + +export type SyncHistoryListErrors = { + /** + * Bad request + */ + 400: BadRequestError +} + +export type SyncHistoryListError = SyncHistoryListErrors[keyof SyncHistoryListErrors] + +export type SyncHistoryListResponses = { + /** + * Sync events + */ + 200: Array<{ + id: string + aggregate_id: string + seq: number + type: string + data: { + [key: string]: unknown + } + }> +} + +export type SyncHistoryListResponse = SyncHistoryListResponses[keyof SyncHistoryListResponses] + export type FindTextData = { body?: never path?: never diff --git a/packages/sdk/openapi.json b/packages/sdk/openapi.json index 6000e66042..ee3538d55f 100644 --- a/packages/sdk/openapi.json +++ b/packages/sdk/openapi.json @@ -1805,6 +1805,90 @@ ] } }, + "/experimental/workspace/{id}/session-restore": { + "post": { + "operationId": "experimental.workspace.sessionRestore", + "parameters": [ + { + "in": "query", + "name": "directory", + "schema": { + "type": "string" + } + }, + { + "in": "query", + "name": "workspace", + "schema": { + "type": "string" + } + }, + { + "in": "path", + "name": "id", + "schema": { + "type": "string", + "pattern": "^wrk.*" + }, + "required": true + } + ], + "summary": "Restore session into workspace", + "description": "Replay a session's sync events into the target workspace in batches.", + "responses": { + "200": { + "description": "Session replay started", + "content": { + "application/json": { + "schema": { + "type": "object", + "properties": { + "total": { + "type": "integer", + "minimum": 0, + "maximum": 9007199254740991 + } + }, + "required": ["total"] + } + } + } + }, + "400": { + "description": "Bad request", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/BadRequestError" + } + } + } + } + }, + "requestBody": { + "content": { + "application/json": { + "schema": { + "type": "object", + "properties": { + "sessionID": { + "type": "string", + "pattern": "^ses.*" + } + }, + "required": ["sessionID"] + } + } + } + }, + "x-codeSamples": [ + { + "lang": "js", + "source": "import { createOpencodeClient } from \"@opencode-ai/sdk\n\nconst client = createOpencodeClient()\nawait client.experimental.workspace.sessionRestore({\n ...\n})" + } + ] + } + }, "/experimental/worktree": { "post": { "operationId": "worktree.create", @@ -5143,6 +5227,202 @@ ] } }, + "/sync/replay": { + "post": { + "operationId": "sync.replay", + "parameters": [ + { + "in": "query", + "name": "directory", + "schema": { + "type": "string" + } + }, + { + "in": "query", + "name": "workspace", + "schema": { + "type": "string" + } + } + ], + "summary": "Replay sync events", + "description": "Validate and replay a complete sync event history.", + "responses": { + "200": { + "description": "Replayed sync events", + "content": { + "application/json": { + "schema": { + "type": "object", + "properties": { + "sessionID": { + "type": "string" + } + }, + "required": ["sessionID"] + } + } + } + }, + "400": { + "description": "Bad request", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/BadRequestError" + } + } + } + } + }, + "requestBody": { + "content": { + "application/json": { + "schema": { + "type": "object", + "properties": { + "directory": { + "type": "string" + }, + "events": { + "minItems": 1, + "type": "array", + "items": { + "type": "object", + "properties": { + "id": { + "type": "string" + }, + "aggregateID": { + "type": "string" + }, + "seq": { + "type": "integer", + "minimum": 0, + "maximum": 9007199254740991 + }, + "type": { + "type": "string" + }, + "data": { + "type": "object", + "propertyNames": { + "type": "string" + }, + "additionalProperties": {} + } + }, + "required": ["id", "aggregateID", "seq", "type", "data"] + } + } + }, + "required": ["directory", "events"] + } + } + } + }, + "x-codeSamples": [ + { + "lang": "js", + "source": "import { createOpencodeClient } from \"@opencode-ai/sdk\n\nconst client = createOpencodeClient()\nawait client.sync.replay({\n ...\n})" + } + ] + } + }, + "/sync/history": { + "get": { + "operationId": "sync.history.list", + "parameters": [ + { + "in": "query", + "name": "directory", + "schema": { + "type": "string" + } + }, + { + "in": "query", + "name": "workspace", + "schema": { + "type": "string" + } + } + ], + "summary": "List sync events", + "description": "List sync events for all aggregates. Keys are aggregate IDs the client already knows about, values are the last known sequence ID. Events with seq > value are returned for those aggregates. Aggregates not listed in the input get their full history.", + "responses": { + "200": { + "description": "Sync events", + "content": { + "application/json": { + "schema": { + "type": "array", + "items": { + "type": "object", + "properties": { + "id": { + "type": "string" + }, + "aggregate_id": { + "type": "string" + }, + "seq": { + "type": "number" + }, + "type": { + "type": "string" + }, + "data": { + "type": "object", + "propertyNames": { + "type": "string" + }, + "additionalProperties": {} + } + }, + "required": ["id", "aggregate_id", "seq", "type", "data"] + } + } + } + } + }, + "400": { + "description": "Bad request", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/BadRequestError" + } + } + } + } + }, + "requestBody": { + "content": { + "application/json": { + "schema": { + "type": "object", + "propertyNames": { + "type": "string" + }, + "additionalProperties": { + "type": "integer", + "minimum": 0, + "maximum": 9007199254740991 + } + } + } + } + }, + "x-codeSamples": [ + { + "lang": "js", + "source": "import { createOpencodeClient } from \"@opencode-ai/sdk\n\nconst client = createOpencodeClient()\nawait client.sync.history.list({\n ...\n})" + } + ] + } + }, "/find": { "get": { "operationId": "find.text", @@ -8514,6 +8794,40 @@ }, "required": ["type", "properties"] }, + "Event.workspace.restore": { + "type": "object", + "properties": { + "type": { + "type": "string", + "const": "workspace.restore" + }, + "properties": { + "type": "object", + "properties": { + "workspaceID": { + "type": "string", + "pattern": "^wrk.*" + }, + "sessionID": { + "type": "string", + "pattern": "^ses.*" + }, + "total": { + "type": "integer", + "minimum": 0, + "maximum": 9007199254740991 + }, + "step": { + "type": "integer", + "minimum": 0, + "maximum": 9007199254740991 + } + }, + "required": ["workspaceID", "sessionID", "total", "step"] + } + }, + "required": ["type", "properties"] + }, "Event.workspace.status": { "type": "object", "properties": { @@ -10523,6 +10837,9 @@ { "$ref": "#/components/schemas/Event.workspace.failed" }, + { + "$ref": "#/components/schemas/Event.workspace.restore" + }, { "$ref": "#/components/schemas/Event.workspace.status" }, @@ -12780,6 +13097,9 @@ { "$ref": "#/components/schemas/Event.workspace.failed" }, + { + "$ref": "#/components/schemas/Event.workspace.restore" + }, { "$ref": "#/components/schemas/Event.workspace.status" },