Compare commits

...

2 Commits

Author SHA1 Message Date
Kit Langton
adc5f528af feat(server): wire Server.listen() through native HttpApi listener (kill-switch)
When the effect-httpapi backend is selected, Server.listen() now delegates
to HttpApiListener.listen() — a native Bun.serve listener with inline
WebSocket upgrade handling — instead of routing through the Hono runtime
adapter (Hono.fetch + createBunWebSocket).

The Hono backend path is unchanged, and a kill-switch env var
(OPENCODE_HTTPAPI_LEGACY_LISTENER) forces the effect-httpapi backend back
through the Hono adapter as an escape hatch if the native listener
regresses for a user.

This unblocks the Hono deletion arc by giving the native listener real
production traffic on dev/beta/local channels (where
OPENCODE_EXPERIMENTAL_HTTPAPI defaults on) while leaving prod/latest
channels on the Hono path.
2026-05-03 09:24:18 -04:00
Kit Langton
39288a6953 feat(httpapi-listener): workspace-proxy WS forwarding
Bridge workspace-proxy WebSocket upgrades inside the Bun.serve listener so
the Hono path's WorkspaceRouterMiddleware → ServerProxy.websocket flow has
a native equivalent. The HttpApi handler still owns HTTP; the listener now
also resolves the workspace target inline (?workspace=… or session lookup),
upgrades the client connection, and bridges it to a remote WebSocket with
queueing, subprotocol forwarding, and close-code propagation.

This unblocks flipping Server.listen() over to the new listener but does
not flip it — the Hono path remains canonical. Bun-only; node:http + ws
adapter is a follow-up (TODO inline).
2026-05-03 09:18:28 -04:00
4 changed files with 334 additions and 22 deletions

View File

@@ -94,6 +94,12 @@ export const Flag = {
OPENCODE_EXPERIMENTAL_HTTPAPI:
truthy("OPENCODE_EXPERIMENTAL_HTTPAPI") ||
(!falsy("OPENCODE_EXPERIMENTAL_HTTPAPI") && HTTPAPI_DEFAULT_ON_CHANNELS.has(InstallationChannel)),
// Kill-switch that forces the effect-httpapi backend back through the legacy
// hono runtime adapter (Hono.fetch + createBunWebSocket) instead of the
// native Bun.serve listener. Defaults to false; set to "true"/"1" to revert
// if the native listener regresses for a user. Has no effect when the hono
// backend is selected.
OPENCODE_HTTPAPI_LEGACY_LISTENER: truthy("OPENCODE_HTTPAPI_LEGACY_LISTENER"),
OPENCODE_EXPERIMENTAL_WORKSPACES: OPENCODE_EXPERIMENTAL || truthy("OPENCODE_EXPERIMENTAL_WORKSPACES"),
OPENCODE_EXPERIMENTAL_EVENT_SYSTEM: OPENCODE_EXPERIMENTAL || truthy("OPENCODE_EXPERIMENTAL_EVENT_SYSTEM"),

View File

@@ -8,6 +8,7 @@
import type { ServerWebSocket } from "bun"
import { Effect, Schema } from "effect"
import { Flag } from "@opencode-ai/core/flag/flag"
import { AppRuntime } from "@/effect/app-runtime"
import { WithInstance } from "@/project/with-instance"
import { Pty } from "@/pty"
@@ -15,8 +16,14 @@ import { handlePtyInput } from "@/pty/input"
import { PtyID } from "@/pty/schema"
import { PtyPaths } from "@/server/routes/instance/httpapi/groups/pty"
import { ExperimentalHttpApiServer } from "@/server/routes/instance/httpapi/server"
import { getAdapter } from "@/control-plane/adapters"
import { WorkspaceID } from "@/control-plane/schema"
import { Workspace } from "@/control-plane/workspace"
import { Session } from "@/session/session"
import * as Log from "@opencode-ai/core/util/log"
import type { CorsOptions } from "./cors"
import { ProxyUtil } from "./proxy-util"
import { getWorkspaceRouteSessionID, isLocalWorkspaceRoute, workspaceProxyURL } from "./shared/workspace-routing"
const log = Log.create({ service: "httpapi-listener" })
const decodePtyID = Schema.decodeUnknownSync(PtyID)
@@ -33,7 +40,9 @@ export type ListenOptions = CorsOptions & {
hostname: string
}
type WsKind = { kind: "pty"; ptyID: string; cursor: number | undefined; directory: string }
type WsKind =
| { kind: "pty"; ptyID: string; cursor: number | undefined; directory: string }
| { kind: "proxy"; remoteURL: string; subprotocols: string[] }
type PtyHandler = {
onMessage: (message: string | ArrayBuffer) => void
@@ -41,10 +50,14 @@ type PtyHandler = {
}
type WsState = WsKind & {
// pty fields
handler?: PtyHandler
pending: Array<string | Uint8Array>
ready: boolean
closed: boolean
// proxy fields
remote?: WebSocket
proxyQueue?: Array<string | Uint8Array | ArrayBuffer>
}
// Derive from the OpenAPI path so this stays in sync if the route literal moves.
@@ -57,6 +70,71 @@ function parseCursor(value: string | null): number | undefined {
return parsed
}
function openProxy(ws: ServerWebSocket<WsState>) {
const data = ws.data
if (data.kind !== "proxy") return
let remote: WebSocket
try {
remote = new WebSocket(data.remoteURL, data.subprotocols.length ? data.subprotocols : undefined)
} catch (err) {
log.error("proxy remote WebSocket construct failed", { error: err })
ws.close(1011, "proxy connect failed")
return
}
remote.binaryType = "arraybuffer"
data.remote = remote
remote.onopen = () => {
const queue = data.proxyQueue
if (queue) {
for (const item of queue) {
try {
remote.send(item as never)
} catch {
// ignore — close handlers will clean up
}
}
queue.length = 0
}
}
remote.onmessage = (event: MessageEvent) => {
try {
const payload = event.data
if (typeof payload === "string") {
ws.send(payload)
} else if (payload instanceof ArrayBuffer) {
ws.send(new Uint8Array(payload))
} else if (payload instanceof Uint8Array) {
ws.send(payload)
} else if (payload instanceof Blob) {
void payload.arrayBuffer().then((buf) => {
try {
ws.send(new Uint8Array(buf))
} catch {
// ignore
}
})
}
} catch {
// ignore — socket likely closed
}
}
remote.onerror = () => {
try {
ws.close(1011, "proxy error")
} catch {
// ignore
}
}
remote.onclose = (event: CloseEvent) => {
try {
ws.close(event.code, event.reason)
} catch {
// ignore
}
}
}
function asAdapter(ws: ServerWebSocket<WsState>) {
return {
get readyState() {
@@ -80,6 +158,55 @@ function asAdapter(ws: ServerWebSocket<WsState>) {
}
}
async function resolveWorkspaceProxy(
request: Request,
url: URL,
): Promise<{ remoteURL: URL; subprotocols: string[] } | undefined> {
// Skip proxy resolution entirely when this process is pinned to a single
// workspace (the Hono path's WorkspaceRouterMiddleware uses the same guard).
if (Flag.OPENCODE_WORKSPACE_ID) return undefined
// Local-only routes (e.g. /experimental/workspace, GET /session) never
// forward — match the Hono behavior even though those routes don't currently
// upgrade to WS.
if (isLocalWorkspaceRoute(request.method, url.pathname)) return undefined
// /console paths are served locally and never proxied.
if (url.pathname.startsWith("/console")) return undefined
let workspaceID: string | null = null
// Prefer session-derived workspace lookup when a session ID is present in
// the path; fall back to the explicit ?workspace=... query parameter.
const sessionID = getWorkspaceRouteSessionID(url)
if (sessionID) {
const session = await AppRuntime.runPromise(
Session.Service.use((svc) => svc.get(sessionID)).pipe(Effect.withSpan("HttpApiListener.proxy.session")),
).catch(() => undefined)
if (session?.workspaceID) workspaceID = session.workspaceID
}
if (!workspaceID) workspaceID = url.searchParams.get("workspace")
if (!workspaceID) return undefined
const workspace = await AppRuntime.runPromise(
Workspace.Service.use((svc) => svc.get(WorkspaceID.make(workspaceID))).pipe(
Effect.withSpan("HttpApiListener.proxy.workspace"),
),
).catch(() => undefined)
if (!workspace) return undefined
const adapter = getAdapter(workspace.projectID, workspace.type)
const target = await adapter.target(workspace)
if (target.type !== "remote") return undefined
const proxyURL = workspaceProxyURL(target.url, url)
const remoteURL = new URL(ProxyUtil.websocketTargetURL(proxyURL))
return {
remoteURL,
subprotocols: ProxyUtil.websocketProtocols(request),
}
}
/**
* Spin up a native Bun.serve that:
* 1. Routes all HTTP traffic through the HttpApi web handler.
@@ -98,10 +225,11 @@ export async function listen(opts: ListenOptions): Promise<Listener> {
hostname: opts.hostname,
port,
idleTimeout: 0,
fetch(request, server) {
async fetch(request, server) {
const url = new URL(request.url)
const isUpgrade = request.headers.get("upgrade")?.toLowerCase() === "websocket"
const ptyMatch = url.pathname.match(ptyConnectPattern)
if (ptyMatch && request.headers.get("upgrade")?.toLowerCase() === "websocket") {
if (ptyMatch && isUpgrade) {
const ptyID = ptyMatch[1]!
const cursor = parseCursor(url.searchParams.get("cursor"))
// Resolve the instance directory the same way the HttpApi
@@ -124,20 +252,50 @@ export async function listen(opts: ListenOptions): Promise<Listener> {
return new Response("upgrade failed", { status: 400 })
}
// TODO: workspace-proxy WS upgrade detection. The Hono path forwards via a
// remote `new WebSocket(url, ...)` (see ServerProxy.websocket). To support
// that here we'd need to (a) resolve the workspace target the same way
// `WorkspaceRouterMiddleware` does today, then (b) `server.upgrade(request,
// { data: { kind: "proxy", target, headers, protocols } })` and bridge the
// ServerWebSocket to a remote WebSocket inside the `websocket` handlers.
// Deferred to a follow-up — the proxy story needs more design (auth header
// forwarding, fence sync, reconnection semantics) than fits this PR.
// Workspace-proxy WS forwarding. Mirrors the Hono path's
// `WorkspaceRouterMiddleware` → `ServerProxy.websocket` flow but inline.
// Bridging to the remote `new WebSocket(...)` happens inside the
// `websocket.open` handler below.
//
// TODO: Node adapter (no Bun.serve) needs an equivalent path using
// `node:http` + `ws`.
if (isUpgrade) {
try {
const proxy = await resolveWorkspaceProxy(request, url)
if (proxy) {
log.info("workspace-proxy websocket", {
request: url.toString(),
remote: proxy.remoteURL.toString(),
})
const upgraded = server.upgrade(request, {
data: {
kind: "proxy",
remoteURL: proxy.remoteURL.toString(),
subprotocols: proxy.subprotocols,
pending: [],
ready: false,
closed: false,
proxyQueue: [],
} satisfies WsState,
})
if (upgraded) return undefined
return new Response("upgrade failed", { status: 400 })
}
} catch (err) {
log.error("workspace-proxy ws resolve failed", { error: err })
return new Response("workspace lookup failed", { status: 500 })
}
}
return handler(request as Request, context as never)
},
websocket: {
open(ws) {
const data = ws.data
if (data.kind === "proxy") {
openProxy(ws)
return
}
if (data.kind !== "pty") {
ws.close(1011, "unknown ws kind")
return
@@ -187,6 +345,25 @@ export async function listen(opts: ListenOptions): Promise<Listener> {
},
message(ws, message) {
const data = ws.data
if (data.kind === "proxy") {
const payload =
typeof message === "string"
? message
: message instanceof Buffer
? new Uint8Array(message.buffer, message.byteOffset, message.byteLength)
: (message as Uint8Array)
const remote = data.remote
if (remote && remote.readyState === WebSocket.OPEN) {
try {
remote.send(payload)
} catch {
// ignore send errors; lifecycle handlers will tear things down
}
return
}
data.proxyQueue?.push(payload)
return
}
if (data.kind !== "pty") return
const payload =
typeof message === "string"
@@ -200,9 +377,17 @@ export async function listen(opts: ListenOptions): Promise<Listener> {
}
AppRuntime.runPromise(handlePtyInput(data.handler, payload)).catch(() => undefined)
},
close(ws) {
close(ws, code, reason) {
const data = ws.data
data.closed = true
if (data.kind === "proxy") {
try {
data.remote?.close(code, reason)
} catch {
// ignore
}
return
}
data.handler?.onClose()
},
},

View File

@@ -19,6 +19,7 @@ import { InstanceMiddleware } from "./routes/instance/middleware"
import { WorkspaceRoutes } from "./routes/control/workspace"
import { ExperimentalHttpApiServer } from "./routes/instance/httpapi/server"
import { PublicApi } from "./routes/instance/httpapi/public"
import { HttpApiListener } from "./httpapi-listener"
import * as ServerBackend from "./backend"
import type { CorsOptions } from "./cors"
@@ -182,35 +183,53 @@ export async function openapiHono() {
export let url: URL
export async function listen(opts: ListenOptions): Promise<Listener> {
const built = create(opts)
const server = await built.runtime.listen(opts)
const selected = select()
const native = selected.backend === "effect-httpapi" && !Flag.OPENCODE_HTTPAPI_LEGACY_LISTENER
let inner: Listener
if (native) {
log.info("server backend selected", {
...ServerBackend.attributes(selected),
"opencode.server.listener": "bun-native",
})
inner = await HttpApiListener.listen(opts)
} else {
const built = create(opts)
const server = await built.runtime.listen(opts)
const innerUrl = new URL("http://localhost")
innerUrl.hostname = opts.hostname
innerUrl.port = String(server.port)
inner = {
hostname: opts.hostname,
port: server.port,
url: innerUrl,
stop: (close?: boolean) => server.stop(close),
}
}
const next = new URL("http://localhost")
next.hostname = opts.hostname
next.port = String(server.port)
const next = new URL(inner.url)
url = next
const mdns =
opts.mdns &&
server.port &&
inner.port &&
opts.hostname !== "127.0.0.1" &&
opts.hostname !== "localhost" &&
opts.hostname !== "::1"
if (mdns) {
MDNS.publish(server.port, opts.mdnsDomain)
MDNS.publish(inner.port, opts.mdnsDomain)
} else if (opts.mdns) {
log.warn("mDNS enabled but hostname is loopback; skipping mDNS publish")
}
let closing: Promise<void> | undefined
return {
hostname: opts.hostname,
port: server.port,
hostname: inner.hostname,
port: inner.port,
url: next,
stop(close?: boolean) {
closing ??= (async () => {
if (mdns) MDNS.unpublish()
await server.stop(close)
await inner.stop(close)
})()
return closing
},

View File

@@ -1,10 +1,19 @@
import { afterEach, describe, expect, test } from "bun:test"
import type { ServerWebSocket } from "bun"
import { mkdir } from "node:fs/promises"
import path from "node:path"
import { Flag } from "@opencode-ai/core/flag/flag"
import * as Log from "@opencode-ai/core/util/log"
import { resetDatabase } from "../fixture/db"
import { disposeAllInstances, tmpdir } from "../fixture/fixture"
import { registerAdapter } from "../../src/control-plane/adapters"
import type { WorkspaceAdapter } from "../../src/control-plane/types"
import { Workspace } from "../../src/control-plane/workspace"
import { AppRuntime } from "../../src/effect/app-runtime"
import { Project } from "../../src/project/project"
import { HttpApiListener } from "../../src/server/httpapi-listener"
import { PtyPaths } from "../../src/server/routes/instance/httpapi/groups/pty"
import { Effect } from "effect"
void Log.init({ print: false })
@@ -43,6 +52,99 @@ describe("native HttpApi listener", () => {
}
})
test("workspace-proxy WS forwarding round-trips through a fake remote", async () => {
await using tmp = await tmpdir({ git: true, config: { formatter: false, lsp: false } })
// Tiny Bun.serve fake remote that echoes every WS frame it receives.
type EchoState = { closed: boolean }
const remote = Bun.serve<EchoState>({
hostname: "127.0.0.1",
port: 0,
fetch(request, server) {
if (request.headers.get("upgrade")?.toLowerCase() === "websocket") {
if (server.upgrade(request, { data: { closed: false } })) return undefined
return new Response("upgrade failed", { status: 400 })
}
return new Response("ok")
},
websocket: {
open(_ws: ServerWebSocket<EchoState>) {},
message(ws: ServerWebSocket<EchoState>, msg: string | Buffer) {
ws.send(typeof msg === "string" ? `echo:${msg}` : msg)
},
close(_ws: ServerWebSocket<EchoState>) {},
},
})
// The path "/probe" is not a known local-only or PTY route, so the listener
// should treat it as a candidate for workspace-proxy WS forwarding.
const remoteBase = `http://${remote.hostname}:${remote.port}`
// Register a remote workspace whose target points at the echo server.
const adapter: WorkspaceAdapter = {
name: "Remote Listener Test",
description: "Remote workspace target for HttpApiListener proxy WS test",
configure: (info) => ({ ...info, name: "remote-listener-test", directory: path.join(tmp.path, ".remote") }),
create: async () => {
await mkdir(path.join(tmp.path, ".remote"), { recursive: true })
},
async remove() {},
target: () => ({ type: "remote" as const, url: remoteBase }),
}
const workspaceID = await AppRuntime.runPromise(
Effect.gen(function* () {
const project = yield* Project.Service.use((svc) => svc.fromDirectory(tmp.path))
registerAdapter(project.project.id, "httpapi-listener-proxy-ws", adapter)
const created = yield* Workspace.Service.use((svc) =>
svc.create({
type: "httpapi-listener-proxy-ws",
branch: null,
extra: null,
projectID: project.project.id,
}),
)
return created.id
}),
)
const listener = await startListener()
try {
const wsURL = new URL("/probe", listener.url)
wsURL.protocol = "ws:"
wsURL.searchParams.set("workspace", workspaceID)
const messages: string[] = []
const ws = new WebSocket(wsURL)
ws.binaryType = "arraybuffer"
const opened = new Promise<void>((resolve, reject) => {
ws.addEventListener("open", () => resolve(), { once: true })
ws.addEventListener("error", () => reject(new Error("ws error before open")), { once: true })
})
ws.addEventListener("message", (event) => {
const data = event.data
messages.push(typeof data === "string" ? data : new TextDecoder().decode(data as ArrayBuffer))
})
await opened
ws.send("hello-proxy")
const start = Date.now()
while (!messages.some((m) => m === "echo:hello-proxy") && Date.now() - start < 5_000) {
await new Promise((r) => setTimeout(r, 25))
}
expect(messages).toContain("echo:hello-proxy")
ws.close(1000, "done")
} finally {
await listener.stop(true)
remote.stop(true)
}
})
testPty("PTY websocket connect echoes input back to the client", async () => {
await using tmp = await tmpdir({ git: true, config: { formatter: false, lsp: false } })
const listener = await startListener()