From 074ef032eef2cb6a9a9b8dde5626ad5c0080d808 Mon Sep 17 00:00:00 2001 From: James Long Date: Wed, 15 Apr 2026 21:04:37 -0400 Subject: [PATCH] feat(core): add fence to make all methods strongly consistent when syncing (#22679) --- packages/opencode/src/bus/global.ts | 16 +-- packages/opencode/src/control-plane/util.ts | 37 +++++++ .../opencode/src/control-plane/workspace.ts | 103 ++++++++++++++++-- packages/opencode/src/flag/flag.ts | 4 +- packages/opencode/src/server/fence.ts | 81 ++++++++++++++ .../src/server/instance/middleware.ts | 9 +- packages/opencode/src/server/proxy.ts | 47 ++++++-- packages/opencode/src/server/server.ts | 18 +++ .../test/plugin/workspace-adaptor.test.ts | 13 ++- 9 files changed, 289 insertions(+), 39 deletions(-) create mode 100644 packages/opencode/src/control-plane/util.ts create mode 100644 packages/opencode/src/server/fence.ts diff --git a/packages/opencode/src/bus/global.ts b/packages/opencode/src/bus/global.ts index e751b59faf..b5392a81b9 100644 --- a/packages/opencode/src/bus/global.ts +++ b/packages/opencode/src/bus/global.ts @@ -1,12 +1,12 @@ import { EventEmitter } from "events" +export type GlobalEvent = { + directory?: string + project?: string + workspace?: string + payload: any +} + export const GlobalBus = new EventEmitter<{ - event: [ - { - directory?: string - project?: string - workspace?: string - payload: any - }, - ] + event: [GlobalEvent] }>() diff --git a/packages/opencode/src/control-plane/util.ts b/packages/opencode/src/control-plane/util.ts new file mode 100644 index 0000000000..023c2ae150 --- /dev/null +++ b/packages/opencode/src/control-plane/util.ts @@ -0,0 +1,37 @@ +import { GlobalBus, type GlobalEvent } from "@/bus/global" + +export function waitEvent(input: { timeout: number; signal?: AbortSignal; fn: (event: GlobalEvent) => boolean }) { + if (input.signal?.aborted) return Promise.reject(input.signal.reason ?? new Error("Request aborted")) + + return new Promise((resolve, reject) => { + const abort = () => { + cleanup() + reject(input.signal?.reason ?? new Error("Request aborted")) + } + + const handler = (event: GlobalEvent) => { + try { + if (!input.fn(event)) return + cleanup() + resolve() + } catch (error) { + cleanup() + reject(error) + } + } + + const cleanup = () => { + clearTimeout(timeout) + GlobalBus.off("event", handler) + input.signal?.removeEventListener("abort", abort) + } + + const timeout = setTimeout(() => { + cleanup() + reject(new Error("Timed out waiting for global event")) + }, input.timeout) + + GlobalBus.on("event", handler) + input.signal?.addEventListener("abort", abort, { once: true }) + }) +} diff --git a/packages/opencode/src/control-plane/workspace.ts b/packages/opencode/src/control-plane/workspace.ts index b9ac0a6b43..67583107fc 100644 --- a/packages/opencode/src/control-plane/workspace.ts +++ b/packages/opencode/src/control-plane/workspace.ts @@ -1,7 +1,7 @@ import z from "zod" import { setTimeout as sleep } from "node:timers/promises" import { fn } from "@/util/fn" -import { Database, asc, eq } from "@/storage/db" +import { Database, asc, eq, inArray } from "@/storage/db" import { Project } from "@/project/project" import { BusEvent } from "@/bus/bus-event" import { GlobalBus } from "@/bus/global" @@ -22,6 +22,8 @@ import { SessionTable } from "@/session/session.sql" import { SessionID } from "@/session/schema" import { errorData } from "@/util/error" import { AppRuntime } from "@/effect/app-runtime" +import { EventSequenceTable } from "@/sync/event.sql" +import { waitEvent } from "./util" export namespace Workspace { export const Info = WorkspaceInfo.meta({ @@ -114,6 +116,17 @@ export namespace Workspace { startSync(info) + await waitEvent({ + timeout: TIMEOUT, + fn(event) { + if (event.workspace === info.id && event.payload.type === Event.Status.type) { + const { status } = event.payload.properties + return status === "error" || status === "connected" + } + return false + }, + }) + return info }) @@ -285,10 +298,15 @@ export namespace Workspace { return spaces } - export const get = fn(WorkspaceID.zod, async (id) => { + function lookup(id: WorkspaceID) { const row = Database.use((db) => db.select().from(WorkspaceTable).where(eq(WorkspaceTable.id, id)).get()) if (!row) return - const space = fromRow(row) + return fromRow(row) + } + + export const get = fn(WorkspaceID.zod, async (id) => { + const space = lookup(id) + if (!space) return startSync(space) return space }) @@ -320,12 +338,18 @@ export namespace Workspace { const connections = new Map() const aborts = new Map() + const TIMEOUT = 5000 function setStatus(id: WorkspaceID, status: ConnectionStatus["status"], error?: string) { const prev = connections.get(id) if (prev?.status === status && prev?.error === error) return const next = { workspaceID: id, status, error } connections.set(id, next) + + if (status === "error") { + aborts.delete(id) + } + GlobalBus.emit("event", { directory: "global", workspace: id, @@ -340,6 +364,52 @@ export namespace Workspace { return [...connections.values()] } + function synced(state: Record) { + const ids = Object.keys(state) + if (ids.length === 0) return true + + const done = Object.fromEntries( + Database.use((db) => + db + .select({ + id: EventSequenceTable.aggregate_id, + seq: EventSequenceTable.seq, + }) + .from(EventSequenceTable) + .where(inArray(EventSequenceTable.aggregate_id, ids)) + .all(), + ).map((row) => [row.id, row.seq]), + ) as Record + + return ids.every((id) => { + return (done[id] ?? -1) >= state[id] + }) + } + + export async function isSyncing(workspaceID: WorkspaceID) { + return aborts.has(workspaceID) + } + + export async function waitForSync(workspaceID: WorkspaceID, state: Record, signal?: AbortSignal) { + if (synced(state)) return + + try { + await waitEvent({ + timeout: TIMEOUT, + signal, + fn(event) { + if (event.workspace !== workspaceID && event.payload.type !== "sync") { + return false + } + return synced(state) + }, + }) + } catch (error) { + if (signal?.aborted) throw signal.reason ?? new Error("Request aborted") + throw new Error(`Timed out waiting for sync fence: ${JSON.stringify(state)}`) + } + } + const log = Log.create({ service: "workspace-sync" }) function route(url: string | URL, path: string) { @@ -353,6 +423,7 @@ export namespace Workspace { async function syncWorkspace(space: Info, signal: AbortSignal) { while (!signal.aborted) { log.info("connecting to global sync", { workspace: space.name }) + setStatus(space.id, "connecting") const adaptor = await getAdaptor(space.projectID, space.type) const target = await adaptor.target(space) @@ -364,7 +435,7 @@ export namespace Workspace { headers: target.headers, signal, }).catch((err: unknown) => { - setStatus(space.id, "error") + setStatus(space.id, "error", err instanceof Error ? err.message : String(err)) log.info("failed to connect to global sync", { workspace: space.name, @@ -374,8 +445,9 @@ export namespace Workspace { }) if (!res || !res.ok || !res.body) { - log.info("failed to connect to global sync", { workspace: space.name }) - setStatus(space.id, "error") + const error = !res ? "No response from global sync" : `Global sync HTTP ${res.status}` + log.info("failed to connect to global sync", { workspace: space.name, error }) + setStatus(space.id, "error", error) await sleep(1000) continue } @@ -414,22 +486,29 @@ export namespace Workspace { } } - function startSync(space: Info) { + async function startSync(space: Info) { if (!Flag.OPENCODE_EXPERIMENTAL_WORKSPACES) return - if (space.type === "worktree") { - void Filesystem.exists(space.directory!).then((exists) => { + const adaptor = await getAdaptor(space.projectID, space.type) + const target = await adaptor.target(space) + + if (target.type === "local") { + void Filesystem.exists(target.directory).then((exists) => { setStatus(space.id, exists ? "connected" : "error", exists ? undefined : "directory does not exist") }) return } - if (aborts.has(space.id)) return - const abort = new AbortController() - aborts.set(space.id, abort) + if (aborts.has(space.id)) return true + setStatus(space.id, "disconnected") + const abort = new AbortController() + aborts.set(space.id, abort) + void syncWorkspace(space, abort.signal).catch((error) => { + aborts.delete(space.id) + setStatus(space.id, "error", String(error)) log.warn("workspace listener failed", { workspaceID: space.id, diff --git a/packages/opencode/src/flag/flag.ts b/packages/opencode/src/flag/flag.ts index f091fa02a9..a63f8d1c66 100644 --- a/packages/opencode/src/flag/flag.ts +++ b/packages/opencode/src/flag/flag.ts @@ -74,7 +74,6 @@ export namespace Flag { Config.withDefault(false), ) export const OPENCODE_EXPERIMENTAL_PLAN_MODE = OPENCODE_EXPERIMENTAL || truthy("OPENCODE_EXPERIMENTAL_PLAN_MODE") - export const OPENCODE_EXPERIMENTAL_WORKSPACES = OPENCODE_EXPERIMENTAL || truthy("OPENCODE_EXPERIMENTAL_WORKSPACES") export const OPENCODE_EXPERIMENTAL_MARKDOWN = !falsy("OPENCODE_EXPERIMENTAL_MARKDOWN") export const OPENCODE_MODELS_URL = process.env["OPENCODE_MODELS_URL"] export const OPENCODE_MODELS_PATH = process.env["OPENCODE_MODELS_PATH"] @@ -84,6 +83,9 @@ export namespace Flag { export const OPENCODE_SKIP_MIGRATIONS = truthy("OPENCODE_SKIP_MIGRATIONS") export const OPENCODE_STRICT_CONFIG_DEPS = truthy("OPENCODE_STRICT_CONFIG_DEPS") + export const OPENCODE_WORKSPACE_ID = process.env["OPENCODE_WORKSPACE_ID"] + export const OPENCODE_EXPERIMENTAL_WORKSPACES = OPENCODE_EXPERIMENTAL || truthy("OPENCODE_EXPERIMENTAL_WORKSPACES") + function number(key: string) { const value = process.env[key] if (!value) return undefined diff --git a/packages/opencode/src/server/fence.ts b/packages/opencode/src/server/fence.ts new file mode 100644 index 0000000000..bb41bd7a43 --- /dev/null +++ b/packages/opencode/src/server/fence.ts @@ -0,0 +1,81 @@ +import type { MiddlewareHandler } from "hono" +import { Database, inArray } from "@/storage/db" +import { EventSequenceTable } from "@/sync/event.sql" +import { Workspace } from "@/control-plane/workspace" +import type { WorkspaceID } from "@/control-plane/schema" +import { Log } from "@/util/log" + +const HEADER = "x-opencode-sync" +type State = Record +const log = Log.create({ service: "fence" }) + +export function load(ids?: string[]) { + const rows = Database.use((db) => { + if (!ids?.length) { + return db.select().from(EventSequenceTable).all() + } + + return db.select().from(EventSequenceTable).where(inArray(EventSequenceTable.aggregate_id, ids)).all() + }) + + return Object.fromEntries(rows.map((row) => [row.aggregate_id, row.seq])) as State +} + +export function diff(prev: State, next: State) { + const ids = new Set([...Object.keys(prev), ...Object.keys(next)]) + return Object.fromEntries( + [...ids] + .map((id) => [id, next[id] ?? -1] as const) + .filter(([id, seq]) => { + return (prev[id] ?? -1) !== seq + }), + ) as State +} + +export function parse(headers: Headers) { + const raw = headers.get(HEADER) + if (!raw) return + + let data + + try { + data = JSON.parse(raw) + } catch (err) { + return + } + + if (!data || typeof data !== "object") return + + return Object.fromEntries( + Object.entries(data).filter(([id, seq]) => { + return typeof id === "string" && Number.isInteger(seq) + }), + ) as State +} + +export async function wait(workspaceID: WorkspaceID, state: State, signal?: AbortSignal) { + log.info("waiting for state", { + workspaceID, + state, + }) + await Workspace.waitForSync(workspaceID, state, signal) + log.info("state fully synced", { + workspaceID, + state, + }) +} + +export const FenceMiddleware: MiddlewareHandler = async (c, next) => { + if (c.req.method === "GET" || c.req.method === "HEAD" || c.req.method === "OPTIONS") return next() + + const prev = load() + await next() + const current = diff(prev, load()) + + if (Object.keys(current).length > 0) { + log.info("header", { + diff: current, + }) + c.res.headers.set(HEADER, JSON.stringify(current)) + } +} diff --git a/packages/opencode/src/server/instance/middleware.ts b/packages/opencode/src/server/instance/middleware.ts index 549fb38d5d..0e29daa9ee 100644 --- a/packages/opencode/src/server/instance/middleware.ts +++ b/packages/opencode/src/server/instance/middleware.ts @@ -6,6 +6,7 @@ import { Workspace } from "@/control-plane/workspace" import { ServerProxy } from "../proxy" import { Instance } from "@/project/instance" import { InstanceBootstrap } from "@/project/bootstrap" +import { Flag } from "@/flag/flag" import { Session } from "@/session" import { SessionID } from "@/session/schema" import { WorkspaceContext } from "@/control-plane/workspace-context" @@ -68,10 +69,10 @@ export function WorkspaceRouterMiddleware(upgrade: UpgradeWebSocket): Middleware const sessionWorkspaceID = await getSessionWorkspace(url) const workspaceID = sessionWorkspaceID || url.searchParams.get("workspace") - if (!workspaceID || url.pathname.startsWith("/console") || OPENCODE_WORKSPACE) { - if (OPENCODE_WORKSPACE) { + if (!workspaceID || url.pathname.startsWith("/console") || Flag.OPENCODE_WORKSPACE_ID) { + if (Flag.OPENCODE_WORKSPACE_ID) { return WorkspaceContext.provide({ - workspaceID: WorkspaceID.make(OPENCODE_WORKSPACE), + workspaceID: WorkspaceID.make(Flag.OPENCODE_WORKSPACE_ID), async fn() { return Instance.provide({ directory, @@ -148,6 +149,6 @@ export function WorkspaceRouterMiddleware(upgrade: UpgradeWebSocket): Middleware headers.delete("x-opencode-workspace") const req = new Request(c.req.raw, { headers }) - return ServerProxy.http(proxyURL, target.headers, req) + return ServerProxy.http(proxyURL, target.headers, req, workspace.id) } } diff --git a/packages/opencode/src/server/proxy.ts b/packages/opencode/src/server/proxy.ts index 0c0deba20c..5effa5d05f 100644 --- a/packages/opencode/src/server/proxy.ts +++ b/packages/opencode/src/server/proxy.ts @@ -1,6 +1,9 @@ import { Hono } from "hono" import type { UpgradeWebSocket } from "hono/ws" import { Log } from "@/util/log" +import * as Fence from "./fence" +import type { WorkspaceID } from "@/control-plane/schema" +import { Workspace } from "@/control-plane/workspace" const hop = new Set([ "connection", @@ -101,12 +104,27 @@ const app = (upgrade: UpgradeWebSocket) => export namespace ServerProxy { const log = Log.Default.clone().tag("service", "server-proxy") - export function http(url: string | URL, extra: HeadersInit | undefined, req: Request) { + export async function http( + url: string | URL, + extra: HeadersInit | undefined, + req: Request, + workspaceID: WorkspaceID, + ) { console.log("proxy http request", { method: req.method, request: req.url, url: String(url), }) + + if (!Workspace.isSyncing(workspaceID)) { + return new Response(`broken sync connection for workspace: ${workspaceID}`, { + status: 503, + headers: { + "content-type": "text/plain; charset=utf-8", + }, + }) + } + return fetch( new Request(url, { method: req.method, @@ -116,21 +134,26 @@ export namespace ServerProxy { signal: req.signal, }), ).then((res) => { + const sync = Fence.parse(res.headers) const next = new Headers(res.headers) next.delete("content-encoding") next.delete("content-length") - console.log("proxy http response", { - method: req.method, - request: req.url, - url: String(url), - status: res.status, - statusText: res.statusText, - }) - return new Response(res.body, { - status: res.status, - statusText: res.statusText, - headers: next, + const done = sync ? Fence.wait(workspaceID, sync, req.signal) : Promise.resolve() + + return done.then(async () => { + console.log("proxy http response", { + method: req.method, + request: req.url, + url: String(url), + status: res.status, + statusText: res.statusText, + }) + return new Response(res.body, { + status: res.status, + statusText: res.statusText, + headers: next, + }) }) }) } diff --git a/packages/opencode/src/server/server.ts b/packages/opencode/src/server/server.ts index 02ec7356ec..c6c37ee438 100644 --- a/packages/opencode/src/server/server.ts +++ b/packages/opencode/src/server/server.ts @@ -4,9 +4,11 @@ import { adapter } from "#hono" import { MDNS } from "./mdns" import { lazy } from "@/util/lazy" import { AuthMiddleware, CompressionMiddleware, CorsMiddleware, ErrorMiddleware, LoggerMiddleware } from "./middleware" +import { FenceMiddleware } from "./fence" import { InstanceRoutes } from "./instance" import { initProjectors } from "./projectors" import { Log } from "@/util/log" +import { Flag } from "@/flag/flag" import { ControlPlaneRoutes } from "./control" import { UIRoutes } from "./ui" @@ -30,6 +32,22 @@ export namespace Server { function create(opts: { cors?: string[] }) { const app = new Hono() const runtime = adapter.create(app) + + if (Flag.OPENCODE_WORKSPACE_ID) { + return { + app: app + .onError(ErrorMiddleware) + .use(AuthMiddleware) + .use(LoggerMiddleware) + .use(CompressionMiddleware) + .use(CorsMiddleware(opts)) + .use(FenceMiddleware) + .route("/", ControlPlaneRoutes()) + .route("/", InstanceRoutes(runtime.upgradeWebSocket)), + runtime, + } + } + return { app: app .onError(ErrorMiddleware) diff --git a/packages/opencode/test/plugin/workspace-adaptor.test.ts b/packages/opencode/test/plugin/workspace-adaptor.test.ts index 669a822a2f..ff8df7490d 100644 --- a/packages/opencode/test/plugin/workspace-adaptor.test.ts +++ b/packages/opencode/test/plugin/workspace-adaptor.test.ts @@ -7,10 +7,16 @@ import { tmpdir } from "../fixture/fixture" const disableDefault = process.env.OPENCODE_DISABLE_DEFAULT_PLUGINS process.env.OPENCODE_DISABLE_DEFAULT_PLUGINS = "1" +const { Flag } = await import("../../src/flag/flag") const { Plugin } = await import("../../src/plugin/index") const { Workspace } = await import("../../src/control-plane/workspace") const { Instance } = await import("../../src/project/instance") +const experimental = Flag.OPENCODE_EXPERIMENTAL_WORKSPACES + +// @ts-expect-error tests override the flag directly +Flag.OPENCODE_EXPERIMENTAL_WORKSPACES = true + afterEach(async () => { await Instance.disposeAll() }) @@ -18,9 +24,12 @@ afterEach(async () => { afterAll(() => { if (disableDefault === undefined) { delete process.env.OPENCODE_DISABLE_DEFAULT_PLUGINS - return + } else { + process.env.OPENCODE_DISABLE_DEFAULT_PLUGINS = disableDefault } - process.env.OPENCODE_DISABLE_DEFAULT_PLUGINS = disableDefault + + // @ts-expect-error restore original test flag value + Flag.OPENCODE_EXPERIMENTAL_WORKSPACES = experimental }) describe("plugin.workspace", () => {