mirror of
https://fastgit.cc/https://github.com/anomalyco/opencode
synced 2026-05-03 23:31:41 +08:00
Compare commits
2 Commits
beta
...
kit/server
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
adc5f528af | ||
|
|
39288a6953 |
@@ -94,6 +94,12 @@ export const Flag = {
|
||||
OPENCODE_EXPERIMENTAL_HTTPAPI:
|
||||
truthy("OPENCODE_EXPERIMENTAL_HTTPAPI") ||
|
||||
(!falsy("OPENCODE_EXPERIMENTAL_HTTPAPI") && HTTPAPI_DEFAULT_ON_CHANNELS.has(InstallationChannel)),
|
||||
// Kill-switch that forces the effect-httpapi backend back through the legacy
|
||||
// hono runtime adapter (Hono.fetch + createBunWebSocket) instead of the
|
||||
// native Bun.serve listener. Defaults to false; set to "true"/"1" to revert
|
||||
// if the native listener regresses for a user. Has no effect when the hono
|
||||
// backend is selected.
|
||||
OPENCODE_HTTPAPI_LEGACY_LISTENER: truthy("OPENCODE_HTTPAPI_LEGACY_LISTENER"),
|
||||
OPENCODE_EXPERIMENTAL_WORKSPACES: OPENCODE_EXPERIMENTAL || truthy("OPENCODE_EXPERIMENTAL_WORKSPACES"),
|
||||
OPENCODE_EXPERIMENTAL_EVENT_SYSTEM: OPENCODE_EXPERIMENTAL || truthy("OPENCODE_EXPERIMENTAL_EVENT_SYSTEM"),
|
||||
|
||||
|
||||
@@ -8,6 +8,7 @@
|
||||
|
||||
import type { ServerWebSocket } from "bun"
|
||||
import { Effect, Schema } from "effect"
|
||||
import { Flag } from "@opencode-ai/core/flag/flag"
|
||||
import { AppRuntime } from "@/effect/app-runtime"
|
||||
import { WithInstance } from "@/project/with-instance"
|
||||
import { Pty } from "@/pty"
|
||||
@@ -15,8 +16,14 @@ import { handlePtyInput } from "@/pty/input"
|
||||
import { PtyID } from "@/pty/schema"
|
||||
import { PtyPaths } from "@/server/routes/instance/httpapi/groups/pty"
|
||||
import { ExperimentalHttpApiServer } from "@/server/routes/instance/httpapi/server"
|
||||
import { getAdapter } from "@/control-plane/adapters"
|
||||
import { WorkspaceID } from "@/control-plane/schema"
|
||||
import { Workspace } from "@/control-plane/workspace"
|
||||
import { Session } from "@/session/session"
|
||||
import * as Log from "@opencode-ai/core/util/log"
|
||||
import type { CorsOptions } from "./cors"
|
||||
import { ProxyUtil } from "./proxy-util"
|
||||
import { getWorkspaceRouteSessionID, isLocalWorkspaceRoute, workspaceProxyURL } from "./shared/workspace-routing"
|
||||
|
||||
const log = Log.create({ service: "httpapi-listener" })
|
||||
const decodePtyID = Schema.decodeUnknownSync(PtyID)
|
||||
@@ -33,7 +40,9 @@ export type ListenOptions = CorsOptions & {
|
||||
hostname: string
|
||||
}
|
||||
|
||||
type WsKind = { kind: "pty"; ptyID: string; cursor: number | undefined; directory: string }
|
||||
type WsKind =
|
||||
| { kind: "pty"; ptyID: string; cursor: number | undefined; directory: string }
|
||||
| { kind: "proxy"; remoteURL: string; subprotocols: string[] }
|
||||
|
||||
type PtyHandler = {
|
||||
onMessage: (message: string | ArrayBuffer) => void
|
||||
@@ -41,10 +50,14 @@ type PtyHandler = {
|
||||
}
|
||||
|
||||
type WsState = WsKind & {
|
||||
// pty fields
|
||||
handler?: PtyHandler
|
||||
pending: Array<string | Uint8Array>
|
||||
ready: boolean
|
||||
closed: boolean
|
||||
// proxy fields
|
||||
remote?: WebSocket
|
||||
proxyQueue?: Array<string | Uint8Array | ArrayBuffer>
|
||||
}
|
||||
|
||||
// Derive from the OpenAPI path so this stays in sync if the route literal moves.
|
||||
@@ -57,6 +70,71 @@ function parseCursor(value: string | null): number | undefined {
|
||||
return parsed
|
||||
}
|
||||
|
||||
function openProxy(ws: ServerWebSocket<WsState>) {
|
||||
const data = ws.data
|
||||
if (data.kind !== "proxy") return
|
||||
let remote: WebSocket
|
||||
try {
|
||||
remote = new WebSocket(data.remoteURL, data.subprotocols.length ? data.subprotocols : undefined)
|
||||
} catch (err) {
|
||||
log.error("proxy remote WebSocket construct failed", { error: err })
|
||||
ws.close(1011, "proxy connect failed")
|
||||
return
|
||||
}
|
||||
remote.binaryType = "arraybuffer"
|
||||
data.remote = remote
|
||||
|
||||
remote.onopen = () => {
|
||||
const queue = data.proxyQueue
|
||||
if (queue) {
|
||||
for (const item of queue) {
|
||||
try {
|
||||
remote.send(item as never)
|
||||
} catch {
|
||||
// ignore — close handlers will clean up
|
||||
}
|
||||
}
|
||||
queue.length = 0
|
||||
}
|
||||
}
|
||||
remote.onmessage = (event: MessageEvent) => {
|
||||
try {
|
||||
const payload = event.data
|
||||
if (typeof payload === "string") {
|
||||
ws.send(payload)
|
||||
} else if (payload instanceof ArrayBuffer) {
|
||||
ws.send(new Uint8Array(payload))
|
||||
} else if (payload instanceof Uint8Array) {
|
||||
ws.send(payload)
|
||||
} else if (payload instanceof Blob) {
|
||||
void payload.arrayBuffer().then((buf) => {
|
||||
try {
|
||||
ws.send(new Uint8Array(buf))
|
||||
} catch {
|
||||
// ignore
|
||||
}
|
||||
})
|
||||
}
|
||||
} catch {
|
||||
// ignore — socket likely closed
|
||||
}
|
||||
}
|
||||
remote.onerror = () => {
|
||||
try {
|
||||
ws.close(1011, "proxy error")
|
||||
} catch {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
remote.onclose = (event: CloseEvent) => {
|
||||
try {
|
||||
ws.close(event.code, event.reason)
|
||||
} catch {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
function asAdapter(ws: ServerWebSocket<WsState>) {
|
||||
return {
|
||||
get readyState() {
|
||||
@@ -80,6 +158,55 @@ function asAdapter(ws: ServerWebSocket<WsState>) {
|
||||
}
|
||||
}
|
||||
|
||||
async function resolveWorkspaceProxy(
|
||||
request: Request,
|
||||
url: URL,
|
||||
): Promise<{ remoteURL: URL; subprotocols: string[] } | undefined> {
|
||||
// Skip proxy resolution entirely when this process is pinned to a single
|
||||
// workspace (the Hono path's WorkspaceRouterMiddleware uses the same guard).
|
||||
if (Flag.OPENCODE_WORKSPACE_ID) return undefined
|
||||
|
||||
// Local-only routes (e.g. /experimental/workspace, GET /session) never
|
||||
// forward — match the Hono behavior even though those routes don't currently
|
||||
// upgrade to WS.
|
||||
if (isLocalWorkspaceRoute(request.method, url.pathname)) return undefined
|
||||
|
||||
// /console paths are served locally and never proxied.
|
||||
if (url.pathname.startsWith("/console")) return undefined
|
||||
|
||||
let workspaceID: string | null = null
|
||||
|
||||
// Prefer session-derived workspace lookup when a session ID is present in
|
||||
// the path; fall back to the explicit ?workspace=... query parameter.
|
||||
const sessionID = getWorkspaceRouteSessionID(url)
|
||||
if (sessionID) {
|
||||
const session = await AppRuntime.runPromise(
|
||||
Session.Service.use((svc) => svc.get(sessionID)).pipe(Effect.withSpan("HttpApiListener.proxy.session")),
|
||||
).catch(() => undefined)
|
||||
if (session?.workspaceID) workspaceID = session.workspaceID
|
||||
}
|
||||
if (!workspaceID) workspaceID = url.searchParams.get("workspace")
|
||||
if (!workspaceID) return undefined
|
||||
|
||||
const workspace = await AppRuntime.runPromise(
|
||||
Workspace.Service.use((svc) => svc.get(WorkspaceID.make(workspaceID))).pipe(
|
||||
Effect.withSpan("HttpApiListener.proxy.workspace"),
|
||||
),
|
||||
).catch(() => undefined)
|
||||
if (!workspace) return undefined
|
||||
|
||||
const adapter = getAdapter(workspace.projectID, workspace.type)
|
||||
const target = await adapter.target(workspace)
|
||||
if (target.type !== "remote") return undefined
|
||||
|
||||
const proxyURL = workspaceProxyURL(target.url, url)
|
||||
const remoteURL = new URL(ProxyUtil.websocketTargetURL(proxyURL))
|
||||
return {
|
||||
remoteURL,
|
||||
subprotocols: ProxyUtil.websocketProtocols(request),
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Spin up a native Bun.serve that:
|
||||
* 1. Routes all HTTP traffic through the HttpApi web handler.
|
||||
@@ -98,10 +225,11 @@ export async function listen(opts: ListenOptions): Promise<Listener> {
|
||||
hostname: opts.hostname,
|
||||
port,
|
||||
idleTimeout: 0,
|
||||
fetch(request, server) {
|
||||
async fetch(request, server) {
|
||||
const url = new URL(request.url)
|
||||
const isUpgrade = request.headers.get("upgrade")?.toLowerCase() === "websocket"
|
||||
const ptyMatch = url.pathname.match(ptyConnectPattern)
|
||||
if (ptyMatch && request.headers.get("upgrade")?.toLowerCase() === "websocket") {
|
||||
if (ptyMatch && isUpgrade) {
|
||||
const ptyID = ptyMatch[1]!
|
||||
const cursor = parseCursor(url.searchParams.get("cursor"))
|
||||
// Resolve the instance directory the same way the HttpApi
|
||||
@@ -124,20 +252,50 @@ export async function listen(opts: ListenOptions): Promise<Listener> {
|
||||
return new Response("upgrade failed", { status: 400 })
|
||||
}
|
||||
|
||||
// TODO: workspace-proxy WS upgrade detection. The Hono path forwards via a
|
||||
// remote `new WebSocket(url, ...)` (see ServerProxy.websocket). To support
|
||||
// that here we'd need to (a) resolve the workspace target the same way
|
||||
// `WorkspaceRouterMiddleware` does today, then (b) `server.upgrade(request,
|
||||
// { data: { kind: "proxy", target, headers, protocols } })` and bridge the
|
||||
// ServerWebSocket to a remote WebSocket inside the `websocket` handlers.
|
||||
// Deferred to a follow-up — the proxy story needs more design (auth header
|
||||
// forwarding, fence sync, reconnection semantics) than fits this PR.
|
||||
// Workspace-proxy WS forwarding. Mirrors the Hono path's
|
||||
// `WorkspaceRouterMiddleware` → `ServerProxy.websocket` flow but inline.
|
||||
// Bridging to the remote `new WebSocket(...)` happens inside the
|
||||
// `websocket.open` handler below.
|
||||
//
|
||||
// TODO: Node adapter (no Bun.serve) needs an equivalent path using
|
||||
// `node:http` + `ws`.
|
||||
if (isUpgrade) {
|
||||
try {
|
||||
const proxy = await resolveWorkspaceProxy(request, url)
|
||||
if (proxy) {
|
||||
log.info("workspace-proxy websocket", {
|
||||
request: url.toString(),
|
||||
remote: proxy.remoteURL.toString(),
|
||||
})
|
||||
const upgraded = server.upgrade(request, {
|
||||
data: {
|
||||
kind: "proxy",
|
||||
remoteURL: proxy.remoteURL.toString(),
|
||||
subprotocols: proxy.subprotocols,
|
||||
pending: [],
|
||||
ready: false,
|
||||
closed: false,
|
||||
proxyQueue: [],
|
||||
} satisfies WsState,
|
||||
})
|
||||
if (upgraded) return undefined
|
||||
return new Response("upgrade failed", { status: 400 })
|
||||
}
|
||||
} catch (err) {
|
||||
log.error("workspace-proxy ws resolve failed", { error: err })
|
||||
return new Response("workspace lookup failed", { status: 500 })
|
||||
}
|
||||
}
|
||||
|
||||
return handler(request as Request, context as never)
|
||||
},
|
||||
websocket: {
|
||||
open(ws) {
|
||||
const data = ws.data
|
||||
if (data.kind === "proxy") {
|
||||
openProxy(ws)
|
||||
return
|
||||
}
|
||||
if (data.kind !== "pty") {
|
||||
ws.close(1011, "unknown ws kind")
|
||||
return
|
||||
@@ -187,6 +345,25 @@ export async function listen(opts: ListenOptions): Promise<Listener> {
|
||||
},
|
||||
message(ws, message) {
|
||||
const data = ws.data
|
||||
if (data.kind === "proxy") {
|
||||
const payload =
|
||||
typeof message === "string"
|
||||
? message
|
||||
: message instanceof Buffer
|
||||
? new Uint8Array(message.buffer, message.byteOffset, message.byteLength)
|
||||
: (message as Uint8Array)
|
||||
const remote = data.remote
|
||||
if (remote && remote.readyState === WebSocket.OPEN) {
|
||||
try {
|
||||
remote.send(payload)
|
||||
} catch {
|
||||
// ignore send errors; lifecycle handlers will tear things down
|
||||
}
|
||||
return
|
||||
}
|
||||
data.proxyQueue?.push(payload)
|
||||
return
|
||||
}
|
||||
if (data.kind !== "pty") return
|
||||
const payload =
|
||||
typeof message === "string"
|
||||
@@ -200,9 +377,17 @@ export async function listen(opts: ListenOptions): Promise<Listener> {
|
||||
}
|
||||
AppRuntime.runPromise(handlePtyInput(data.handler, payload)).catch(() => undefined)
|
||||
},
|
||||
close(ws) {
|
||||
close(ws, code, reason) {
|
||||
const data = ws.data
|
||||
data.closed = true
|
||||
if (data.kind === "proxy") {
|
||||
try {
|
||||
data.remote?.close(code, reason)
|
||||
} catch {
|
||||
// ignore
|
||||
}
|
||||
return
|
||||
}
|
||||
data.handler?.onClose()
|
||||
},
|
||||
},
|
||||
|
||||
@@ -19,6 +19,7 @@ import { InstanceMiddleware } from "./routes/instance/middleware"
|
||||
import { WorkspaceRoutes } from "./routes/control/workspace"
|
||||
import { ExperimentalHttpApiServer } from "./routes/instance/httpapi/server"
|
||||
import { PublicApi } from "./routes/instance/httpapi/public"
|
||||
import { HttpApiListener } from "./httpapi-listener"
|
||||
import * as ServerBackend from "./backend"
|
||||
import type { CorsOptions } from "./cors"
|
||||
|
||||
@@ -182,35 +183,53 @@ export async function openapiHono() {
|
||||
export let url: URL
|
||||
|
||||
export async function listen(opts: ListenOptions): Promise<Listener> {
|
||||
const built = create(opts)
|
||||
const server = await built.runtime.listen(opts)
|
||||
const selected = select()
|
||||
const native = selected.backend === "effect-httpapi" && !Flag.OPENCODE_HTTPAPI_LEGACY_LISTENER
|
||||
let inner: Listener
|
||||
if (native) {
|
||||
log.info("server backend selected", {
|
||||
...ServerBackend.attributes(selected),
|
||||
"opencode.server.listener": "bun-native",
|
||||
})
|
||||
inner = await HttpApiListener.listen(opts)
|
||||
} else {
|
||||
const built = create(opts)
|
||||
const server = await built.runtime.listen(opts)
|
||||
const innerUrl = new URL("http://localhost")
|
||||
innerUrl.hostname = opts.hostname
|
||||
innerUrl.port = String(server.port)
|
||||
inner = {
|
||||
hostname: opts.hostname,
|
||||
port: server.port,
|
||||
url: innerUrl,
|
||||
stop: (close?: boolean) => server.stop(close),
|
||||
}
|
||||
}
|
||||
|
||||
const next = new URL("http://localhost")
|
||||
next.hostname = opts.hostname
|
||||
next.port = String(server.port)
|
||||
const next = new URL(inner.url)
|
||||
url = next
|
||||
|
||||
const mdns =
|
||||
opts.mdns &&
|
||||
server.port &&
|
||||
inner.port &&
|
||||
opts.hostname !== "127.0.0.1" &&
|
||||
opts.hostname !== "localhost" &&
|
||||
opts.hostname !== "::1"
|
||||
if (mdns) {
|
||||
MDNS.publish(server.port, opts.mdnsDomain)
|
||||
MDNS.publish(inner.port, opts.mdnsDomain)
|
||||
} else if (opts.mdns) {
|
||||
log.warn("mDNS enabled but hostname is loopback; skipping mDNS publish")
|
||||
}
|
||||
|
||||
let closing: Promise<void> | undefined
|
||||
return {
|
||||
hostname: opts.hostname,
|
||||
port: server.port,
|
||||
hostname: inner.hostname,
|
||||
port: inner.port,
|
||||
url: next,
|
||||
stop(close?: boolean) {
|
||||
closing ??= (async () => {
|
||||
if (mdns) MDNS.unpublish()
|
||||
await server.stop(close)
|
||||
await inner.stop(close)
|
||||
})()
|
||||
return closing
|
||||
},
|
||||
|
||||
@@ -1,10 +1,19 @@
|
||||
import { afterEach, describe, expect, test } from "bun:test"
|
||||
import type { ServerWebSocket } from "bun"
|
||||
import { mkdir } from "node:fs/promises"
|
||||
import path from "node:path"
|
||||
import { Flag } from "@opencode-ai/core/flag/flag"
|
||||
import * as Log from "@opencode-ai/core/util/log"
|
||||
import { resetDatabase } from "../fixture/db"
|
||||
import { disposeAllInstances, tmpdir } from "../fixture/fixture"
|
||||
import { registerAdapter } from "../../src/control-plane/adapters"
|
||||
import type { WorkspaceAdapter } from "../../src/control-plane/types"
|
||||
import { Workspace } from "../../src/control-plane/workspace"
|
||||
import { AppRuntime } from "../../src/effect/app-runtime"
|
||||
import { Project } from "../../src/project/project"
|
||||
import { HttpApiListener } from "../../src/server/httpapi-listener"
|
||||
import { PtyPaths } from "../../src/server/routes/instance/httpapi/groups/pty"
|
||||
import { Effect } from "effect"
|
||||
|
||||
void Log.init({ print: false })
|
||||
|
||||
@@ -43,6 +52,99 @@ describe("native HttpApi listener", () => {
|
||||
}
|
||||
})
|
||||
|
||||
test("workspace-proxy WS forwarding round-trips through a fake remote", async () => {
|
||||
await using tmp = await tmpdir({ git: true, config: { formatter: false, lsp: false } })
|
||||
|
||||
// Tiny Bun.serve fake remote that echoes every WS frame it receives.
|
||||
type EchoState = { closed: boolean }
|
||||
const remote = Bun.serve<EchoState>({
|
||||
hostname: "127.0.0.1",
|
||||
port: 0,
|
||||
fetch(request, server) {
|
||||
if (request.headers.get("upgrade")?.toLowerCase() === "websocket") {
|
||||
if (server.upgrade(request, { data: { closed: false } })) return undefined
|
||||
return new Response("upgrade failed", { status: 400 })
|
||||
}
|
||||
return new Response("ok")
|
||||
},
|
||||
websocket: {
|
||||
open(_ws: ServerWebSocket<EchoState>) {},
|
||||
message(ws: ServerWebSocket<EchoState>, msg: string | Buffer) {
|
||||
ws.send(typeof msg === "string" ? `echo:${msg}` : msg)
|
||||
},
|
||||
close(_ws: ServerWebSocket<EchoState>) {},
|
||||
},
|
||||
})
|
||||
|
||||
// The path "/probe" is not a known local-only or PTY route, so the listener
|
||||
// should treat it as a candidate for workspace-proxy WS forwarding.
|
||||
const remoteBase = `http://${remote.hostname}:${remote.port}`
|
||||
|
||||
// Register a remote workspace whose target points at the echo server.
|
||||
const adapter: WorkspaceAdapter = {
|
||||
name: "Remote Listener Test",
|
||||
description: "Remote workspace target for HttpApiListener proxy WS test",
|
||||
configure: (info) => ({ ...info, name: "remote-listener-test", directory: path.join(tmp.path, ".remote") }),
|
||||
create: async () => {
|
||||
await mkdir(path.join(tmp.path, ".remote"), { recursive: true })
|
||||
},
|
||||
async remove() {},
|
||||
target: () => ({ type: "remote" as const, url: remoteBase }),
|
||||
}
|
||||
|
||||
const workspaceID = await AppRuntime.runPromise(
|
||||
Effect.gen(function* () {
|
||||
const project = yield* Project.Service.use((svc) => svc.fromDirectory(tmp.path))
|
||||
registerAdapter(project.project.id, "httpapi-listener-proxy-ws", adapter)
|
||||
const created = yield* Workspace.Service.use((svc) =>
|
||||
svc.create({
|
||||
type: "httpapi-listener-proxy-ws",
|
||||
branch: null,
|
||||
extra: null,
|
||||
projectID: project.project.id,
|
||||
}),
|
||||
)
|
||||
return created.id
|
||||
}),
|
||||
)
|
||||
|
||||
const listener = await startListener()
|
||||
try {
|
||||
const wsURL = new URL("/probe", listener.url)
|
||||
wsURL.protocol = "ws:"
|
||||
wsURL.searchParams.set("workspace", workspaceID)
|
||||
|
||||
const messages: string[] = []
|
||||
const ws = new WebSocket(wsURL)
|
||||
ws.binaryType = "arraybuffer"
|
||||
|
||||
const opened = new Promise<void>((resolve, reject) => {
|
||||
ws.addEventListener("open", () => resolve(), { once: true })
|
||||
ws.addEventListener("error", () => reject(new Error("ws error before open")), { once: true })
|
||||
})
|
||||
|
||||
ws.addEventListener("message", (event) => {
|
||||
const data = event.data
|
||||
messages.push(typeof data === "string" ? data : new TextDecoder().decode(data as ArrayBuffer))
|
||||
})
|
||||
|
||||
await opened
|
||||
ws.send("hello-proxy")
|
||||
|
||||
const start = Date.now()
|
||||
while (!messages.some((m) => m === "echo:hello-proxy") && Date.now() - start < 5_000) {
|
||||
await new Promise((r) => setTimeout(r, 25))
|
||||
}
|
||||
|
||||
expect(messages).toContain("echo:hello-proxy")
|
||||
|
||||
ws.close(1000, "done")
|
||||
} finally {
|
||||
await listener.stop(true)
|
||||
remote.stop(true)
|
||||
}
|
||||
})
|
||||
|
||||
testPty("PTY websocket connect echoes input back to the client", async () => {
|
||||
await using tmp = await tmpdir({ git: true, config: { formatter: false, lsp: false } })
|
||||
const listener = await startListener()
|
||||
|
||||
Reference in New Issue
Block a user