feat(core): add fence to make all methods strongly consistent when syncing (#22679)

This commit is contained in:
James Long
2026-04-15 21:04:37 -04:00
committed by GitHub
parent 4ca809ef4e
commit 074ef032ee
9 changed files with 289 additions and 39 deletions

View File

@@ -1,12 +1,12 @@
import { EventEmitter } from "events"
export type GlobalEvent = {
directory?: string
project?: string
workspace?: string
payload: any
}
export const GlobalBus = new EventEmitter<{
event: [
{
directory?: string
project?: string
workspace?: string
payload: any
},
]
event: [GlobalEvent]
}>()

View File

@@ -0,0 +1,37 @@
import { GlobalBus, type GlobalEvent } from "@/bus/global"
export function waitEvent(input: { timeout: number; signal?: AbortSignal; fn: (event: GlobalEvent) => boolean }) {
if (input.signal?.aborted) return Promise.reject(input.signal.reason ?? new Error("Request aborted"))
return new Promise<void>((resolve, reject) => {
const abort = () => {
cleanup()
reject(input.signal?.reason ?? new Error("Request aborted"))
}
const handler = (event: GlobalEvent) => {
try {
if (!input.fn(event)) return
cleanup()
resolve()
} catch (error) {
cleanup()
reject(error)
}
}
const cleanup = () => {
clearTimeout(timeout)
GlobalBus.off("event", handler)
input.signal?.removeEventListener("abort", abort)
}
const timeout = setTimeout(() => {
cleanup()
reject(new Error("Timed out waiting for global event"))
}, input.timeout)
GlobalBus.on("event", handler)
input.signal?.addEventListener("abort", abort, { once: true })
})
}

View File

@@ -1,7 +1,7 @@
import z from "zod"
import { setTimeout as sleep } from "node:timers/promises"
import { fn } from "@/util/fn"
import { Database, asc, eq } from "@/storage/db"
import { Database, asc, eq, inArray } from "@/storage/db"
import { Project } from "@/project/project"
import { BusEvent } from "@/bus/bus-event"
import { GlobalBus } from "@/bus/global"
@@ -22,6 +22,8 @@ import { SessionTable } from "@/session/session.sql"
import { SessionID } from "@/session/schema"
import { errorData } from "@/util/error"
import { AppRuntime } from "@/effect/app-runtime"
import { EventSequenceTable } from "@/sync/event.sql"
import { waitEvent } from "./util"
export namespace Workspace {
export const Info = WorkspaceInfo.meta({
@@ -114,6 +116,17 @@ export namespace Workspace {
startSync(info)
await waitEvent({
timeout: TIMEOUT,
fn(event) {
if (event.workspace === info.id && event.payload.type === Event.Status.type) {
const { status } = event.payload.properties
return status === "error" || status === "connected"
}
return false
},
})
return info
})
@@ -285,10 +298,15 @@ export namespace Workspace {
return spaces
}
export const get = fn(WorkspaceID.zod, async (id) => {
function lookup(id: WorkspaceID) {
const row = Database.use((db) => db.select().from(WorkspaceTable).where(eq(WorkspaceTable.id, id)).get())
if (!row) return
const space = fromRow(row)
return fromRow(row)
}
export const get = fn(WorkspaceID.zod, async (id) => {
const space = lookup(id)
if (!space) return
startSync(space)
return space
})
@@ -320,12 +338,18 @@ export namespace Workspace {
const connections = new Map<WorkspaceID, ConnectionStatus>()
const aborts = new Map<WorkspaceID, AbortController>()
const TIMEOUT = 5000
function setStatus(id: WorkspaceID, status: ConnectionStatus["status"], error?: string) {
const prev = connections.get(id)
if (prev?.status === status && prev?.error === error) return
const next = { workspaceID: id, status, error }
connections.set(id, next)
if (status === "error") {
aborts.delete(id)
}
GlobalBus.emit("event", {
directory: "global",
workspace: id,
@@ -340,6 +364,52 @@ export namespace Workspace {
return [...connections.values()]
}
function synced(state: Record<string, number>) {
const ids = Object.keys(state)
if (ids.length === 0) return true
const done = Object.fromEntries(
Database.use((db) =>
db
.select({
id: EventSequenceTable.aggregate_id,
seq: EventSequenceTable.seq,
})
.from(EventSequenceTable)
.where(inArray(EventSequenceTable.aggregate_id, ids))
.all(),
).map((row) => [row.id, row.seq]),
) as Record<string, number>
return ids.every((id) => {
return (done[id] ?? -1) >= state[id]
})
}
export async function isSyncing(workspaceID: WorkspaceID) {
return aborts.has(workspaceID)
}
export async function waitForSync(workspaceID: WorkspaceID, state: Record<string, number>, signal?: AbortSignal) {
if (synced(state)) return
try {
await waitEvent({
timeout: TIMEOUT,
signal,
fn(event) {
if (event.workspace !== workspaceID && event.payload.type !== "sync") {
return false
}
return synced(state)
},
})
} catch (error) {
if (signal?.aborted) throw signal.reason ?? new Error("Request aborted")
throw new Error(`Timed out waiting for sync fence: ${JSON.stringify(state)}`)
}
}
const log = Log.create({ service: "workspace-sync" })
function route(url: string | URL, path: string) {
@@ -353,6 +423,7 @@ export namespace Workspace {
async function syncWorkspace(space: Info, signal: AbortSignal) {
while (!signal.aborted) {
log.info("connecting to global sync", { workspace: space.name })
setStatus(space.id, "connecting")
const adaptor = await getAdaptor(space.projectID, space.type)
const target = await adaptor.target(space)
@@ -364,7 +435,7 @@ export namespace Workspace {
headers: target.headers,
signal,
}).catch((err: unknown) => {
setStatus(space.id, "error")
setStatus(space.id, "error", err instanceof Error ? err.message : String(err))
log.info("failed to connect to global sync", {
workspace: space.name,
@@ -374,8 +445,9 @@ export namespace Workspace {
})
if (!res || !res.ok || !res.body) {
log.info("failed to connect to global sync", { workspace: space.name })
setStatus(space.id, "error")
const error = !res ? "No response from global sync" : `Global sync HTTP ${res.status}`
log.info("failed to connect to global sync", { workspace: space.name, error })
setStatus(space.id, "error", error)
await sleep(1000)
continue
}
@@ -414,22 +486,29 @@ export namespace Workspace {
}
}
function startSync(space: Info) {
async function startSync(space: Info) {
if (!Flag.OPENCODE_EXPERIMENTAL_WORKSPACES) return
if (space.type === "worktree") {
void Filesystem.exists(space.directory!).then((exists) => {
const adaptor = await getAdaptor(space.projectID, space.type)
const target = await adaptor.target(space)
if (target.type === "local") {
void Filesystem.exists(target.directory).then((exists) => {
setStatus(space.id, exists ? "connected" : "error", exists ? undefined : "directory does not exist")
})
return
}
if (aborts.has(space.id)) return
const abort = new AbortController()
aborts.set(space.id, abort)
if (aborts.has(space.id)) return true
setStatus(space.id, "disconnected")
const abort = new AbortController()
aborts.set(space.id, abort)
void syncWorkspace(space, abort.signal).catch((error) => {
aborts.delete(space.id)
setStatus(space.id, "error", String(error))
log.warn("workspace listener failed", {
workspaceID: space.id,

View File

@@ -74,7 +74,6 @@ export namespace Flag {
Config.withDefault(false),
)
export const OPENCODE_EXPERIMENTAL_PLAN_MODE = OPENCODE_EXPERIMENTAL || truthy("OPENCODE_EXPERIMENTAL_PLAN_MODE")
export const OPENCODE_EXPERIMENTAL_WORKSPACES = OPENCODE_EXPERIMENTAL || truthy("OPENCODE_EXPERIMENTAL_WORKSPACES")
export const OPENCODE_EXPERIMENTAL_MARKDOWN = !falsy("OPENCODE_EXPERIMENTAL_MARKDOWN")
export const OPENCODE_MODELS_URL = process.env["OPENCODE_MODELS_URL"]
export const OPENCODE_MODELS_PATH = process.env["OPENCODE_MODELS_PATH"]
@@ -84,6 +83,9 @@ export namespace Flag {
export const OPENCODE_SKIP_MIGRATIONS = truthy("OPENCODE_SKIP_MIGRATIONS")
export const OPENCODE_STRICT_CONFIG_DEPS = truthy("OPENCODE_STRICT_CONFIG_DEPS")
export const OPENCODE_WORKSPACE_ID = process.env["OPENCODE_WORKSPACE_ID"]
export const OPENCODE_EXPERIMENTAL_WORKSPACES = OPENCODE_EXPERIMENTAL || truthy("OPENCODE_EXPERIMENTAL_WORKSPACES")
function number(key: string) {
const value = process.env[key]
if (!value) return undefined

View File

@@ -0,0 +1,81 @@
import type { MiddlewareHandler } from "hono"
import { Database, inArray } from "@/storage/db"
import { EventSequenceTable } from "@/sync/event.sql"
import { Workspace } from "@/control-plane/workspace"
import type { WorkspaceID } from "@/control-plane/schema"
import { Log } from "@/util/log"
const HEADER = "x-opencode-sync"
type State = Record<string, number>
const log = Log.create({ service: "fence" })
export function load(ids?: string[]) {
const rows = Database.use((db) => {
if (!ids?.length) {
return db.select().from(EventSequenceTable).all()
}
return db.select().from(EventSequenceTable).where(inArray(EventSequenceTable.aggregate_id, ids)).all()
})
return Object.fromEntries(rows.map((row) => [row.aggregate_id, row.seq])) as State
}
export function diff(prev: State, next: State) {
const ids = new Set([...Object.keys(prev), ...Object.keys(next)])
return Object.fromEntries(
[...ids]
.map((id) => [id, next[id] ?? -1] as const)
.filter(([id, seq]) => {
return (prev[id] ?? -1) !== seq
}),
) as State
}
export function parse(headers: Headers) {
const raw = headers.get(HEADER)
if (!raw) return
let data
try {
data = JSON.parse(raw)
} catch (err) {
return
}
if (!data || typeof data !== "object") return
return Object.fromEntries(
Object.entries(data).filter(([id, seq]) => {
return typeof id === "string" && Number.isInteger(seq)
}),
) as State
}
export async function wait(workspaceID: WorkspaceID, state: State, signal?: AbortSignal) {
log.info("waiting for state", {
workspaceID,
state,
})
await Workspace.waitForSync(workspaceID, state, signal)
log.info("state fully synced", {
workspaceID,
state,
})
}
export const FenceMiddleware: MiddlewareHandler = async (c, next) => {
if (c.req.method === "GET" || c.req.method === "HEAD" || c.req.method === "OPTIONS") return next()
const prev = load()
await next()
const current = diff(prev, load())
if (Object.keys(current).length > 0) {
log.info("header", {
diff: current,
})
c.res.headers.set(HEADER, JSON.stringify(current))
}
}

View File

@@ -6,6 +6,7 @@ import { Workspace } from "@/control-plane/workspace"
import { ServerProxy } from "../proxy"
import { Instance } from "@/project/instance"
import { InstanceBootstrap } from "@/project/bootstrap"
import { Flag } from "@/flag/flag"
import { Session } from "@/session"
import { SessionID } from "@/session/schema"
import { WorkspaceContext } from "@/control-plane/workspace-context"
@@ -68,10 +69,10 @@ export function WorkspaceRouterMiddleware(upgrade: UpgradeWebSocket): Middleware
const sessionWorkspaceID = await getSessionWorkspace(url)
const workspaceID = sessionWorkspaceID || url.searchParams.get("workspace")
if (!workspaceID || url.pathname.startsWith("/console") || OPENCODE_WORKSPACE) {
if (OPENCODE_WORKSPACE) {
if (!workspaceID || url.pathname.startsWith("/console") || Flag.OPENCODE_WORKSPACE_ID) {
if (Flag.OPENCODE_WORKSPACE_ID) {
return WorkspaceContext.provide({
workspaceID: WorkspaceID.make(OPENCODE_WORKSPACE),
workspaceID: WorkspaceID.make(Flag.OPENCODE_WORKSPACE_ID),
async fn() {
return Instance.provide({
directory,
@@ -148,6 +149,6 @@ export function WorkspaceRouterMiddleware(upgrade: UpgradeWebSocket): Middleware
headers.delete("x-opencode-workspace")
const req = new Request(c.req.raw, { headers })
return ServerProxy.http(proxyURL, target.headers, req)
return ServerProxy.http(proxyURL, target.headers, req, workspace.id)
}
}

View File

@@ -1,6 +1,9 @@
import { Hono } from "hono"
import type { UpgradeWebSocket } from "hono/ws"
import { Log } from "@/util/log"
import * as Fence from "./fence"
import type { WorkspaceID } from "@/control-plane/schema"
import { Workspace } from "@/control-plane/workspace"
const hop = new Set([
"connection",
@@ -101,12 +104,27 @@ const app = (upgrade: UpgradeWebSocket) =>
export namespace ServerProxy {
const log = Log.Default.clone().tag("service", "server-proxy")
export function http(url: string | URL, extra: HeadersInit | undefined, req: Request) {
export async function http(
url: string | URL,
extra: HeadersInit | undefined,
req: Request,
workspaceID: WorkspaceID,
) {
console.log("proxy http request", {
method: req.method,
request: req.url,
url: String(url),
})
if (!Workspace.isSyncing(workspaceID)) {
return new Response(`broken sync connection for workspace: ${workspaceID}`, {
status: 503,
headers: {
"content-type": "text/plain; charset=utf-8",
},
})
}
return fetch(
new Request(url, {
method: req.method,
@@ -116,21 +134,26 @@ export namespace ServerProxy {
signal: req.signal,
}),
).then((res) => {
const sync = Fence.parse(res.headers)
const next = new Headers(res.headers)
next.delete("content-encoding")
next.delete("content-length")
console.log("proxy http response", {
method: req.method,
request: req.url,
url: String(url),
status: res.status,
statusText: res.statusText,
})
return new Response(res.body, {
status: res.status,
statusText: res.statusText,
headers: next,
const done = sync ? Fence.wait(workspaceID, sync, req.signal) : Promise.resolve()
return done.then(async () => {
console.log("proxy http response", {
method: req.method,
request: req.url,
url: String(url),
status: res.status,
statusText: res.statusText,
})
return new Response(res.body, {
status: res.status,
statusText: res.statusText,
headers: next,
})
})
})
}

View File

@@ -4,9 +4,11 @@ import { adapter } from "#hono"
import { MDNS } from "./mdns"
import { lazy } from "@/util/lazy"
import { AuthMiddleware, CompressionMiddleware, CorsMiddleware, ErrorMiddleware, LoggerMiddleware } from "./middleware"
import { FenceMiddleware } from "./fence"
import { InstanceRoutes } from "./instance"
import { initProjectors } from "./projectors"
import { Log } from "@/util/log"
import { Flag } from "@/flag/flag"
import { ControlPlaneRoutes } from "./control"
import { UIRoutes } from "./ui"
@@ -30,6 +32,22 @@ export namespace Server {
function create(opts: { cors?: string[] }) {
const app = new Hono()
const runtime = adapter.create(app)
if (Flag.OPENCODE_WORKSPACE_ID) {
return {
app: app
.onError(ErrorMiddleware)
.use(AuthMiddleware)
.use(LoggerMiddleware)
.use(CompressionMiddleware)
.use(CorsMiddleware(opts))
.use(FenceMiddleware)
.route("/", ControlPlaneRoutes())
.route("/", InstanceRoutes(runtime.upgradeWebSocket)),
runtime,
}
}
return {
app: app
.onError(ErrorMiddleware)

View File

@@ -7,10 +7,16 @@ import { tmpdir } from "../fixture/fixture"
const disableDefault = process.env.OPENCODE_DISABLE_DEFAULT_PLUGINS
process.env.OPENCODE_DISABLE_DEFAULT_PLUGINS = "1"
const { Flag } = await import("../../src/flag/flag")
const { Plugin } = await import("../../src/plugin/index")
const { Workspace } = await import("../../src/control-plane/workspace")
const { Instance } = await import("../../src/project/instance")
const experimental = Flag.OPENCODE_EXPERIMENTAL_WORKSPACES
// @ts-expect-error tests override the flag directly
Flag.OPENCODE_EXPERIMENTAL_WORKSPACES = true
afterEach(async () => {
await Instance.disposeAll()
})
@@ -18,9 +24,12 @@ afterEach(async () => {
afterAll(() => {
if (disableDefault === undefined) {
delete process.env.OPENCODE_DISABLE_DEFAULT_PLUGINS
return
} else {
process.env.OPENCODE_DISABLE_DEFAULT_PLUGINS = disableDefault
}
process.env.OPENCODE_DISABLE_DEFAULT_PLUGINS = disableDefault
// @ts-expect-error restore original test flag value
Flag.OPENCODE_EXPERIMENTAL_WORKSPACES = experimental
})
describe("plugin.workspace", () => {