From 66257663509bc12bb208c4c65f73c45206106aae Mon Sep 17 00:00:00 2001 From: Kit Langton Date: Wed, 15 Apr 2026 21:56:02 -0400 Subject: [PATCH] feat: unwrap MCP namespace to flat exports + barrel (#22693) --- packages/opencode/src/mcp/index.ts | 931 +---------------------------- packages/opencode/src/mcp/mcp.ts | 928 ++++++++++++++++++++++++++++ 2 files changed, 929 insertions(+), 930 deletions(-) create mode 100644 packages/opencode/src/mcp/mcp.ts diff --git a/packages/opencode/src/mcp/index.ts b/packages/opencode/src/mcp/index.ts index cbaa2c24b3..c42b9eb5c1 100644 --- a/packages/opencode/src/mcp/index.ts +++ b/packages/opencode/src/mcp/index.ts @@ -1,930 +1 @@ -import { dynamicTool, type Tool, jsonSchema, type JSONSchema7 } from "ai" -import { Client } from "@modelcontextprotocol/sdk/client/index.js" -import { StreamableHTTPClientTransport } from "@modelcontextprotocol/sdk/client/streamableHttp.js" -import { SSEClientTransport } from "@modelcontextprotocol/sdk/client/sse.js" -import { StdioClientTransport } from "@modelcontextprotocol/sdk/client/stdio.js" -import { UnauthorizedError } from "@modelcontextprotocol/sdk/client/auth.js" -import { - CallToolResultSchema, - type Tool as MCPToolDef, - ToolListChangedNotificationSchema, -} from "@modelcontextprotocol/sdk/types.js" -import { Config } from "../config" -import { Log } from "../util/log" -import { NamedError } from "@opencode-ai/shared/util/error" -import z from "zod/v4" -import { Instance } from "../project/instance" -import { Installation } from "../installation" -import { withTimeout } from "@/util/timeout" -import { AppFileSystem } from "@opencode-ai/shared/filesystem" -import { McpOAuthProvider } from "./oauth-provider" -import { McpOAuthCallback } from "./oauth-callback" -import { McpAuth } from "./auth" -import { BusEvent } from "../bus/bus-event" -import { Bus } from "@/bus" -import { TuiEvent } from "@/cli/cmd/tui/event" -import open from "open" -import { Effect, Exit, Layer, Option, Context, Stream } from "effect" -import { EffectBridge } from "@/effect/bridge" -import { InstanceState } from "@/effect/instance-state" -import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process" -import * as CrossSpawnSpawner from "@/effect/cross-spawn-spawner" - -export namespace MCP { - const log = Log.create({ service: "mcp" }) - const DEFAULT_TIMEOUT = 30_000 - - export const Resource = z - .object({ - name: z.string(), - uri: z.string(), - description: z.string().optional(), - mimeType: z.string().optional(), - client: z.string(), - }) - .meta({ ref: "McpResource" }) - export type Resource = z.infer - - export const ToolsChanged = BusEvent.define( - "mcp.tools.changed", - z.object({ - server: z.string(), - }), - ) - - export const BrowserOpenFailed = BusEvent.define( - "mcp.browser.open.failed", - z.object({ - mcpName: z.string(), - url: z.string(), - }), - ) - - export const Failed = NamedError.create( - "MCPFailed", - z.object({ - name: z.string(), - }), - ) - - type MCPClient = Client - - export const Status = z - .discriminatedUnion("status", [ - z - .object({ - status: z.literal("connected"), - }) - .meta({ - ref: "MCPStatusConnected", - }), - z - .object({ - status: z.literal("disabled"), - }) - .meta({ - ref: "MCPStatusDisabled", - }), - z - .object({ - status: z.literal("failed"), - error: z.string(), - }) - .meta({ - ref: "MCPStatusFailed", - }), - z - .object({ - status: z.literal("needs_auth"), - }) - .meta({ - ref: "MCPStatusNeedsAuth", - }), - z - .object({ - status: z.literal("needs_client_registration"), - error: z.string(), - }) - .meta({ - ref: "MCPStatusNeedsClientRegistration", - }), - ]) - .meta({ - ref: "MCPStatus", - }) - export type Status = z.infer - - // Store transports for OAuth servers to allow finishing auth - type TransportWithAuth = StreamableHTTPClientTransport | SSEClientTransport - const pendingOAuthTransports = new Map() - - // Prompt cache types - type PromptInfo = Awaited>["prompts"][number] - type ResourceInfo = Awaited>["resources"][number] - type McpEntry = NonNullable[string] - - function isMcpConfigured(entry: McpEntry): entry is Config.Mcp { - return typeof entry === "object" && entry !== null && "type" in entry - } - - const sanitize = (s: string) => s.replace(/[^a-zA-Z0-9_-]/g, "_") - - // Convert MCP tool definition to AI SDK Tool type - function convertMcpTool(mcpTool: MCPToolDef, client: MCPClient, timeout?: number): Tool { - const inputSchema = mcpTool.inputSchema - - // Spread first, then override type to ensure it's always "object" - const schema: JSONSchema7 = { - ...(inputSchema as JSONSchema7), - type: "object", - properties: (inputSchema.properties ?? {}) as JSONSchema7["properties"], - additionalProperties: false, - } - - return dynamicTool({ - description: mcpTool.description ?? "", - inputSchema: jsonSchema(schema), - execute: async (args: unknown) => { - return client.callTool( - { - name: mcpTool.name, - arguments: (args || {}) as Record, - }, - CallToolResultSchema, - { - resetTimeoutOnProgress: true, - timeout, - }, - ) - }, - }) - } - - function defs(key: string, client: MCPClient, timeout?: number) { - return Effect.tryPromise({ - try: () => withTimeout(client.listTools(), timeout ?? DEFAULT_TIMEOUT), - catch: (err) => (err instanceof Error ? err : new Error(String(err))), - }).pipe( - Effect.map((result) => result.tools), - Effect.catch((err) => { - log.error("failed to get tools from client", { key, error: err }) - return Effect.succeed(undefined) - }), - ) - } - - function fetchFromClient( - clientName: string, - client: Client, - listFn: (c: Client) => Promise, - label: string, - ) { - return Effect.tryPromise({ - try: () => listFn(client), - catch: (e: any) => { - log.error(`failed to get ${label}`, { clientName, error: e.message }) - return e - }, - }).pipe( - Effect.map((items) => { - const out: Record = {} - const sanitizedClient = sanitize(clientName) - for (const item of items) { - out[sanitizedClient + ":" + sanitize(item.name)] = { ...item, client: clientName } - } - return out - }), - Effect.orElseSucceed(() => undefined), - ) - } - - interface CreateResult { - mcpClient?: MCPClient - status: Status - defs?: MCPToolDef[] - } - - interface AuthResult { - authorizationUrl: string - oauthState: string - client?: MCPClient - } - - // --- Effect Service --- - - interface State { - status: Record - clients: Record - defs: Record - } - - export interface Interface { - readonly status: () => Effect.Effect> - readonly clients: () => Effect.Effect> - readonly tools: () => Effect.Effect> - readonly prompts: () => Effect.Effect> - readonly resources: () => Effect.Effect> - readonly add: (name: string, mcp: Config.Mcp) => Effect.Effect<{ status: Record | Status }> - readonly connect: (name: string) => Effect.Effect - readonly disconnect: (name: string) => Effect.Effect - readonly getPrompt: ( - clientName: string, - name: string, - args?: Record, - ) => Effect.Effect> | undefined> - readonly readResource: ( - clientName: string, - resourceUri: string, - ) => Effect.Effect> | undefined> - readonly startAuth: (mcpName: string) => Effect.Effect<{ authorizationUrl: string; oauthState: string }> - readonly authenticate: (mcpName: string) => Effect.Effect - readonly finishAuth: (mcpName: string, authorizationCode: string) => Effect.Effect - readonly removeAuth: (mcpName: string) => Effect.Effect - readonly supportsOAuth: (mcpName: string) => Effect.Effect - readonly hasStoredTokens: (mcpName: string) => Effect.Effect - readonly getAuthStatus: (mcpName: string) => Effect.Effect - } - - export class Service extends Context.Service()("@opencode/MCP") {} - - export const layer = Layer.effect( - Service, - Effect.gen(function* () { - const spawner = yield* ChildProcessSpawner.ChildProcessSpawner - const auth = yield* McpAuth.Service - const bus = yield* Bus.Service - - type Transport = StdioClientTransport | StreamableHTTPClientTransport | SSEClientTransport - - /** - * Connect a client via the given transport with resource safety: - * on failure the transport is closed; on success the caller owns it. - */ - const connectTransport = (transport: Transport, timeout: number) => - Effect.acquireUseRelease( - Effect.succeed(transport), - (t) => - Effect.tryPromise({ - try: () => { - const client = new Client({ name: "opencode", version: Installation.VERSION }) - return withTimeout(client.connect(t), timeout).then(() => client) - }, - catch: (e) => (e instanceof Error ? e : new Error(String(e))), - }), - (t, exit) => (Exit.isFailure(exit) ? Effect.tryPromise(() => t.close()).pipe(Effect.ignore) : Effect.void), - ) - - const DISABLED_RESULT: CreateResult = { status: { status: "disabled" } } - - const connectRemote = Effect.fn("MCP.connectRemote")(function* ( - key: string, - mcp: Config.Mcp & { type: "remote" }, - ) { - const oauthDisabled = mcp.oauth === false - const oauthConfig = typeof mcp.oauth === "object" ? mcp.oauth : undefined - let authProvider: McpOAuthProvider | undefined - - if (!oauthDisabled) { - authProvider = new McpOAuthProvider( - key, - mcp.url, - { - clientId: oauthConfig?.clientId, - clientSecret: oauthConfig?.clientSecret, - scope: oauthConfig?.scope, - redirectUri: oauthConfig?.redirectUri, - }, - { - onRedirect: async (url) => { - log.info("oauth redirect requested", { key, url: url.toString() }) - }, - }, - auth, - ) - } - - const transports: Array<{ name: string; transport: TransportWithAuth }> = [ - { - name: "StreamableHTTP", - transport: new StreamableHTTPClientTransport(new URL(mcp.url), { - authProvider, - requestInit: mcp.headers ? { headers: mcp.headers } : undefined, - }), - }, - { - name: "SSE", - transport: new SSEClientTransport(new URL(mcp.url), { - authProvider, - requestInit: mcp.headers ? { headers: mcp.headers } : undefined, - }), - }, - ] - - const connectTimeout = mcp.timeout ?? DEFAULT_TIMEOUT - let lastStatus: Status | undefined - - for (const { name, transport } of transports) { - const result = yield* connectTransport(transport, connectTimeout).pipe( - Effect.map((client) => ({ client, transportName: name })), - Effect.catch((error) => { - const lastError = error instanceof Error ? error : new Error(String(error)) - const isAuthError = - error instanceof UnauthorizedError || (authProvider && lastError.message.includes("OAuth")) - - if (isAuthError) { - log.info("mcp server requires authentication", { key, transport: name }) - - if (lastError.message.includes("registration") || lastError.message.includes("client_id")) { - lastStatus = { - status: "needs_client_registration" as const, - error: "Server does not support dynamic client registration. Please provide clientId in config.", - } - return bus - .publish(TuiEvent.ToastShow, { - title: "MCP Authentication Required", - message: `Server "${key}" requires a pre-registered client ID. Add clientId to your config.`, - variant: "warning", - duration: 8000, - }) - .pipe(Effect.ignore, Effect.as(undefined)) - } else { - pendingOAuthTransports.set(key, transport) - lastStatus = { status: "needs_auth" as const } - return bus - .publish(TuiEvent.ToastShow, { - title: "MCP Authentication Required", - message: `Server "${key}" requires authentication. Run: opencode mcp auth ${key}`, - variant: "warning", - duration: 8000, - }) - .pipe(Effect.ignore, Effect.as(undefined)) - } - } - - log.debug("transport connection failed", { - key, - transport: name, - url: mcp.url, - error: lastError.message, - }) - lastStatus = { status: "failed" as const, error: lastError.message } - return Effect.succeed(undefined) - }), - ) - if (result) { - log.info("connected", { key, transport: result.transportName }) - return { client: result.client as MCPClient | undefined, status: { status: "connected" } as Status } - } - // If this was an auth error, stop trying other transports - if (lastStatus?.status === "needs_auth" || lastStatus?.status === "needs_client_registration") break - } - - return { - client: undefined as MCPClient | undefined, - status: (lastStatus ?? { status: "failed", error: "Unknown error" }) as Status, - } - }) - - const connectLocal = Effect.fn("MCP.connectLocal")(function* (key: string, mcp: Config.Mcp & { type: "local" }) { - const [cmd, ...args] = mcp.command - const cwd = Instance.directory - const transport = new StdioClientTransport({ - stderr: "pipe", - command: cmd, - args, - cwd, - env: { - ...process.env, - ...(cmd === "opencode" ? { BUN_BE_BUN: "1" } : {}), - ...mcp.environment, - }, - }) - transport.stderr?.on("data", (chunk: Buffer) => { - log.info(`mcp stderr: ${chunk.toString()}`, { key }) - }) - - const connectTimeout = mcp.timeout ?? DEFAULT_TIMEOUT - return yield* connectTransport(transport, connectTimeout).pipe( - Effect.map((client): { client: MCPClient | undefined; status: Status } => ({ - client, - status: { status: "connected" }, - })), - Effect.catch((error): Effect.Effect<{ client: MCPClient | undefined; status: Status }> => { - const msg = error instanceof Error ? error.message : String(error) - log.error("local mcp startup failed", { key, command: mcp.command, cwd, error: msg }) - return Effect.succeed({ client: undefined, status: { status: "failed", error: msg } }) - }), - ) - }) - - const create = Effect.fn("MCP.create")(function* (key: string, mcp: Config.Mcp) { - if (mcp.enabled === false) { - log.info("mcp server disabled", { key }) - return DISABLED_RESULT - } - - log.info("found", { key, type: mcp.type }) - - const { client: mcpClient, status } = - mcp.type === "remote" - ? yield* connectRemote(key, mcp as Config.Mcp & { type: "remote" }) - : yield* connectLocal(key, mcp as Config.Mcp & { type: "local" }) - - if (!mcpClient) { - return { status } satisfies CreateResult - } - - const listed = yield* defs(key, mcpClient, mcp.timeout) - if (!listed) { - yield* Effect.tryPromise(() => mcpClient.close()).pipe(Effect.ignore) - return { status: { status: "failed", error: "Failed to get tools" } } satisfies CreateResult - } - - log.info("create() successfully created client", { key, toolCount: listed.length }) - return { mcpClient, status, defs: listed } satisfies CreateResult - }) - const cfgSvc = yield* Config.Service - - const descendants = Effect.fnUntraced( - function* (pid: number) { - if (process.platform === "win32") return [] as number[] - const pids: number[] = [] - const queue = [pid] - while (queue.length > 0) { - const current = queue.shift()! - const handle = yield* spawner.spawn( - ChildProcess.make("pgrep", ["-P", String(current)], { stdin: "ignore" }), - ) - const text = yield* Stream.mkString(Stream.decodeText(handle.stdout)) - yield* handle.exitCode - for (const tok of text.split("\n")) { - const cpid = parseInt(tok, 10) - if (!isNaN(cpid) && !pids.includes(cpid)) { - pids.push(cpid) - queue.push(cpid) - } - } - } - return pids - }, - Effect.scoped, - Effect.catch(() => Effect.succeed([] as number[])), - ) - - function watch(s: State, name: string, client: MCPClient, bridge: EffectBridge.Shape, timeout?: number) { - client.setNotificationHandler(ToolListChangedNotificationSchema, async () => { - log.info("tools list changed notification received", { server: name }) - if (s.clients[name] !== client || s.status[name]?.status !== "connected") return - - const listed = await bridge.promise(defs(name, client, timeout)) - if (!listed) return - if (s.clients[name] !== client || s.status[name]?.status !== "connected") return - - s.defs[name] = listed - await bridge.promise(bus.publish(ToolsChanged, { server: name }).pipe(Effect.ignore)) - }) - } - - const state = yield* InstanceState.make( - Effect.fn("MCP.state")(function* () { - const cfg = yield* cfgSvc.get() - const bridge = yield* EffectBridge.make() - const config = cfg.mcp ?? {} - const s: State = { - status: {}, - clients: {}, - defs: {}, - } - - yield* Effect.forEach( - Object.entries(config), - ([key, mcp]) => - Effect.gen(function* () { - if (!isMcpConfigured(mcp)) { - log.error("Ignoring MCP config entry without type", { key }) - return - } - - if (mcp.enabled === false) { - s.status[key] = { status: "disabled" } - return - } - - const result = yield* create(key, mcp).pipe(Effect.catch(() => Effect.void)) - if (!result) return - - s.status[key] = result.status - if (result.mcpClient) { - s.clients[key] = result.mcpClient - s.defs[key] = result.defs! - watch(s, key, result.mcpClient, bridge, mcp.timeout) - } - }), - { concurrency: "unbounded" }, - ) - - yield* Effect.addFinalizer(() => - Effect.gen(function* () { - yield* Effect.forEach( - Object.values(s.clients), - (client) => - Effect.gen(function* () { - const pid = (client.transport as any)?.pid - if (typeof pid === "number") { - const pids = yield* descendants(pid) - for (const dpid of pids) { - try { - process.kill(dpid, "SIGTERM") - } catch {} - } - } - yield* Effect.tryPromise(() => client.close()).pipe(Effect.ignore) - }), - { concurrency: "unbounded" }, - ) - pendingOAuthTransports.clear() - }), - ) - - return s - }), - ) - - function closeClient(s: State, name: string) { - const client = s.clients[name] - delete s.defs[name] - if (!client) return Effect.void - return Effect.tryPromise(() => client.close()).pipe(Effect.ignore) - } - - const storeClient = Effect.fnUntraced(function* ( - s: State, - name: string, - client: MCPClient, - listed: MCPToolDef[], - timeout?: number, - ) { - const bridge = yield* EffectBridge.make() - yield* closeClient(s, name) - s.status[name] = { status: "connected" } - s.clients[name] = client - s.defs[name] = listed - watch(s, name, client, bridge, timeout) - return s.status[name] - }) - - const status = Effect.fn("MCP.status")(function* () { - const s = yield* InstanceState.get(state) - - const cfg = yield* cfgSvc.get() - const config = cfg.mcp ?? {} - const result: Record = {} - - for (const [key, mcp] of Object.entries(config)) { - if (!isMcpConfigured(mcp)) continue - result[key] = s.status[key] ?? { status: "disabled" } - } - - return result - }) - - const clients = Effect.fn("MCP.clients")(function* () { - const s = yield* InstanceState.get(state) - return s.clients - }) - - const createAndStore = Effect.fn("MCP.createAndStore")(function* (name: string, mcp: Config.Mcp) { - const s = yield* InstanceState.get(state) - const result = yield* create(name, mcp) - - s.status[name] = result.status - if (!result.mcpClient) { - yield* closeClient(s, name) - delete s.clients[name] - return result.status - } - - return yield* storeClient(s, name, result.mcpClient, result.defs!, mcp.timeout) - }) - - const add = Effect.fn("MCP.add")(function* (name: string, mcp: Config.Mcp) { - yield* createAndStore(name, mcp) - const s = yield* InstanceState.get(state) - return { status: s.status } - }) - - const connect = Effect.fn("MCP.connect")(function* (name: string) { - const mcp = yield* getMcpConfig(name) - if (!mcp) { - log.error("MCP config not found or invalid", { name }) - return - } - yield* createAndStore(name, { ...mcp, enabled: true }) - }) - - const disconnect = Effect.fn("MCP.disconnect")(function* (name: string) { - const s = yield* InstanceState.get(state) - yield* closeClient(s, name) - delete s.clients[name] - s.status[name] = { status: "disabled" } - }) - - const tools = Effect.fn("MCP.tools")(function* () { - const result: Record = {} - const s = yield* InstanceState.get(state) - - const cfg = yield* cfgSvc.get() - const config = cfg.mcp ?? {} - const defaultTimeout = cfg.experimental?.mcp_timeout - - const connectedClients = Object.entries(s.clients).filter( - ([clientName]) => s.status[clientName]?.status === "connected", - ) - - yield* Effect.forEach( - connectedClients, - ([clientName, client]) => - Effect.gen(function* () { - const mcpConfig = config[clientName] - const entry = mcpConfig && isMcpConfigured(mcpConfig) ? mcpConfig : undefined - - const listed = s.defs[clientName] - if (!listed) { - log.warn("missing cached tools for connected server", { clientName }) - return - } - - const timeout = entry?.timeout ?? defaultTimeout - for (const mcpTool of listed) { - result[sanitize(clientName) + "_" + sanitize(mcpTool.name)] = convertMcpTool(mcpTool, client, timeout) - } - }), - { concurrency: "unbounded" }, - ) - return result - }) - - function collectFromConnected( - s: State, - listFn: (c: Client) => Promise, - label: string, - ) { - return Effect.forEach( - Object.entries(s.clients).filter(([name]) => s.status[name]?.status === "connected"), - ([clientName, client]) => - fetchFromClient(clientName, client, listFn, label).pipe(Effect.map((items) => Object.entries(items ?? {}))), - { concurrency: "unbounded" }, - ).pipe(Effect.map((results) => Object.fromEntries(results.flat()))) - } - - const prompts = Effect.fn("MCP.prompts")(function* () { - const s = yield* InstanceState.get(state) - return yield* collectFromConnected(s, (c) => c.listPrompts().then((r) => r.prompts), "prompts") - }) - - const resources = Effect.fn("MCP.resources")(function* () { - const s = yield* InstanceState.get(state) - return yield* collectFromConnected(s, (c) => c.listResources().then((r) => r.resources), "resources") - }) - - const withClient = Effect.fnUntraced(function* ( - clientName: string, - fn: (client: MCPClient) => Promise, - label: string, - meta?: Record, - ) { - const s = yield* InstanceState.get(state) - const client = s.clients[clientName] - if (!client) { - log.warn(`client not found for ${label}`, { clientName }) - return undefined - } - return yield* Effect.tryPromise({ - try: () => fn(client), - catch: (e: any) => { - log.error(`failed to ${label}`, { clientName, ...meta, error: e?.message }) - return e - }, - }).pipe(Effect.orElseSucceed(() => undefined)) - }) - - const getPrompt = Effect.fn("MCP.getPrompt")(function* ( - clientName: string, - name: string, - args?: Record, - ) { - return yield* withClient(clientName, (client) => client.getPrompt({ name, arguments: args }), "getPrompt", { - promptName: name, - }) - }) - - const readResource = Effect.fn("MCP.readResource")(function* (clientName: string, resourceUri: string) { - return yield* withClient(clientName, (client) => client.readResource({ uri: resourceUri }), "readResource", { - resourceUri, - }) - }) - - const getMcpConfig = Effect.fnUntraced(function* (mcpName: string) { - const cfg = yield* cfgSvc.get() - const mcpConfig = cfg.mcp?.[mcpName] - if (!mcpConfig || !isMcpConfigured(mcpConfig)) return undefined - return mcpConfig - }) - - const startAuth = Effect.fn("MCP.startAuth")(function* (mcpName: string) { - const mcpConfig = yield* getMcpConfig(mcpName) - if (!mcpConfig) throw new Error(`MCP server ${mcpName} not found or disabled`) - if (mcpConfig.type !== "remote") throw new Error(`MCP server ${mcpName} is not a remote server`) - if (mcpConfig.oauth === false) throw new Error(`MCP server ${mcpName} has OAuth explicitly disabled`) - - // OAuth config is optional - if not provided, we'll use auto-discovery - const oauthConfig = typeof mcpConfig.oauth === "object" ? mcpConfig.oauth : undefined - - // Start the callback server with custom redirectUri if configured - yield* Effect.promise(() => McpOAuthCallback.ensureRunning(oauthConfig?.redirectUri)) - - const oauthState = Array.from(crypto.getRandomValues(new Uint8Array(32))) - .map((b) => b.toString(16).padStart(2, "0")) - .join("") - yield* auth.updateOAuthState(mcpName, oauthState) - let capturedUrl: URL | undefined - const authProvider = new McpOAuthProvider( - mcpName, - mcpConfig.url, - { - clientId: oauthConfig?.clientId, - clientSecret: oauthConfig?.clientSecret, - scope: oauthConfig?.scope, - redirectUri: oauthConfig?.redirectUri, - }, - { - onRedirect: async (url) => { - capturedUrl = url - }, - }, - auth, - ) - - const transport = new StreamableHTTPClientTransport(new URL(mcpConfig.url), { authProvider }) - - return yield* Effect.tryPromise({ - try: () => { - const client = new Client({ name: "opencode", version: Installation.VERSION }) - return client - .connect(transport) - .then(() => ({ authorizationUrl: "", oauthState, client }) satisfies AuthResult) - }, - catch: (error) => error, - }).pipe( - Effect.catch((error) => { - if (error instanceof UnauthorizedError && capturedUrl) { - pendingOAuthTransports.set(mcpName, transport) - return Effect.succeed({ authorizationUrl: capturedUrl.toString(), oauthState } satisfies AuthResult) - } - return Effect.die(error) - }), - ) - }) - - const authenticate = Effect.fn("MCP.authenticate")(function* (mcpName: string) { - const result = yield* startAuth(mcpName) - if (!result.authorizationUrl) { - const client = "client" in result ? result.client : undefined - const mcpConfig = yield* getMcpConfig(mcpName) - if (!mcpConfig) { - yield* Effect.tryPromise(() => client?.close() ?? Promise.resolve()).pipe(Effect.ignore) - return { status: "failed", error: "MCP config not found after auth" } as Status - } - - const listed = client ? yield* defs(mcpName, client, mcpConfig.timeout) : undefined - if (!client || !listed) { - yield* Effect.tryPromise(() => client?.close() ?? Promise.resolve()).pipe(Effect.ignore) - return { status: "failed", error: "Failed to get tools" } as Status - } - - const s = yield* InstanceState.get(state) - yield* auth.clearOAuthState(mcpName) - return yield* storeClient(s, mcpName, client, listed, mcpConfig.timeout) - } - - log.info("opening browser for oauth", { mcpName, url: result.authorizationUrl, state: result.oauthState }) - - const callbackPromise = McpOAuthCallback.waitForCallback(result.oauthState, mcpName) - - yield* Effect.tryPromise(() => open(result.authorizationUrl)).pipe( - Effect.flatMap((subprocess) => - Effect.callback((resume) => { - const timer = setTimeout(() => resume(Effect.void), 500) - subprocess.on("error", (err) => { - clearTimeout(timer) - resume(Effect.fail(err)) - }) - subprocess.on("exit", (code) => { - if (code !== null && code !== 0) { - clearTimeout(timer) - resume(Effect.fail(new Error(`Browser open failed with exit code ${code}`))) - } - }) - }), - ), - Effect.catch(() => { - log.warn("failed to open browser, user must open URL manually", { mcpName }) - return bus.publish(BrowserOpenFailed, { mcpName, url: result.authorizationUrl }).pipe(Effect.ignore) - }), - ) - - const code = yield* Effect.promise(() => callbackPromise) - - const storedState = yield* auth.getOAuthState(mcpName) - if (storedState !== result.oauthState) { - yield* auth.clearOAuthState(mcpName) - throw new Error("OAuth state mismatch - potential CSRF attack") - } - yield* auth.clearOAuthState(mcpName) - return yield* finishAuth(mcpName, code) - }) - - const finishAuth = Effect.fn("MCP.finishAuth")(function* (mcpName: string, authorizationCode: string) { - const transport = pendingOAuthTransports.get(mcpName) - if (!transport) throw new Error(`No pending OAuth flow for MCP server: ${mcpName}`) - - const result = yield* Effect.tryPromise({ - try: () => transport.finishAuth(authorizationCode).then(() => true as const), - catch: (error) => { - log.error("failed to finish oauth", { mcpName, error }) - return error - }, - }).pipe(Effect.option) - - if (Option.isNone(result)) { - return { status: "failed", error: "OAuth completion failed" } as Status - } - - yield* auth.clearCodeVerifier(mcpName) - pendingOAuthTransports.delete(mcpName) - - const mcpConfig = yield* getMcpConfig(mcpName) - if (!mcpConfig) return { status: "failed", error: "MCP config not found after auth" } as Status - - return yield* createAndStore(mcpName, mcpConfig) - }) - - const removeAuth = Effect.fn("MCP.removeAuth")(function* (mcpName: string) { - yield* auth.remove(mcpName) - McpOAuthCallback.cancelPending(mcpName) - pendingOAuthTransports.delete(mcpName) - log.info("removed oauth credentials", { mcpName }) - }) - - const supportsOAuth = Effect.fn("MCP.supportsOAuth")(function* (mcpName: string) { - const mcpConfig = yield* getMcpConfig(mcpName) - if (!mcpConfig) return false - return mcpConfig.type === "remote" && mcpConfig.oauth !== false - }) - - const hasStoredTokens = Effect.fn("MCP.hasStoredTokens")(function* (mcpName: string) { - const entry = yield* auth.get(mcpName) - return !!entry?.tokens - }) - - const getAuthStatus = Effect.fn("MCP.getAuthStatus")(function* (mcpName: string) { - const entry = yield* auth.get(mcpName) - if (!entry?.tokens) return "not_authenticated" as AuthStatus - const expired = yield* auth.isTokenExpired(mcpName) - return (expired ? "expired" : "authenticated") as AuthStatus - }) - - return Service.of({ - status, - clients, - tools, - prompts, - resources, - add, - connect, - disconnect, - getPrompt, - readResource, - startAuth, - authenticate, - finishAuth, - removeAuth, - supportsOAuth, - hasStoredTokens, - getAuthStatus, - }) - }), - ) - - export type AuthStatus = "authenticated" | "expired" | "not_authenticated" - - // --- Per-service runtime --- - - export const defaultLayer = layer.pipe( - Layer.provide(McpAuth.layer), - Layer.provide(Bus.layer), - Layer.provide(Config.defaultLayer), - Layer.provide(CrossSpawnSpawner.defaultLayer), - Layer.provide(AppFileSystem.defaultLayer), - ) -} +export * as MCP from "./mcp" diff --git a/packages/opencode/src/mcp/mcp.ts b/packages/opencode/src/mcp/mcp.ts new file mode 100644 index 0000000000..1e3288682e --- /dev/null +++ b/packages/opencode/src/mcp/mcp.ts @@ -0,0 +1,928 @@ +import { dynamicTool, type Tool, jsonSchema, type JSONSchema7 } from "ai" +import { Client } from "@modelcontextprotocol/sdk/client/index.js" +import { StreamableHTTPClientTransport } from "@modelcontextprotocol/sdk/client/streamableHttp.js" +import { SSEClientTransport } from "@modelcontextprotocol/sdk/client/sse.js" +import { StdioClientTransport } from "@modelcontextprotocol/sdk/client/stdio.js" +import { UnauthorizedError } from "@modelcontextprotocol/sdk/client/auth.js" +import { + CallToolResultSchema, + type Tool as MCPToolDef, + ToolListChangedNotificationSchema, +} from "@modelcontextprotocol/sdk/types.js" +import { Config } from "../config" +import { Log } from "../util/log" +import { NamedError } from "@opencode-ai/shared/util/error" +import z from "zod/v4" +import { Instance } from "../project/instance" +import { Installation } from "../installation" +import { withTimeout } from "@/util/timeout" +import { AppFileSystem } from "@opencode-ai/shared/filesystem" +import { McpOAuthProvider } from "./oauth-provider" +import { McpOAuthCallback } from "./oauth-callback" +import { McpAuth } from "./auth" +import { BusEvent } from "../bus/bus-event" +import { Bus } from "@/bus" +import { TuiEvent } from "@/cli/cmd/tui/event" +import open from "open" +import { Effect, Exit, Layer, Option, Context, Stream } from "effect" +import { EffectBridge } from "@/effect/bridge" +import { InstanceState } from "@/effect/instance-state" +import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process" +import * as CrossSpawnSpawner from "@/effect/cross-spawn-spawner" + +const log = Log.create({ service: "mcp" }) +const DEFAULT_TIMEOUT = 30_000 + +export const Resource = z + .object({ + name: z.string(), + uri: z.string(), + description: z.string().optional(), + mimeType: z.string().optional(), + client: z.string(), + }) + .meta({ ref: "McpResource" }) +export type Resource = z.infer + +export const ToolsChanged = BusEvent.define( + "mcp.tools.changed", + z.object({ + server: z.string(), + }), +) + +export const BrowserOpenFailed = BusEvent.define( + "mcp.browser.open.failed", + z.object({ + mcpName: z.string(), + url: z.string(), + }), +) + +export const Failed = NamedError.create( + "MCPFailed", + z.object({ + name: z.string(), + }), +) + +type MCPClient = Client + +export const Status = z + .discriminatedUnion("status", [ + z + .object({ + status: z.literal("connected"), + }) + .meta({ + ref: "MCPStatusConnected", + }), + z + .object({ + status: z.literal("disabled"), + }) + .meta({ + ref: "MCPStatusDisabled", + }), + z + .object({ + status: z.literal("failed"), + error: z.string(), + }) + .meta({ + ref: "MCPStatusFailed", + }), + z + .object({ + status: z.literal("needs_auth"), + }) + .meta({ + ref: "MCPStatusNeedsAuth", + }), + z + .object({ + status: z.literal("needs_client_registration"), + error: z.string(), + }) + .meta({ + ref: "MCPStatusNeedsClientRegistration", + }), + ]) + .meta({ + ref: "MCPStatus", + }) +export type Status = z.infer + +// Store transports for OAuth servers to allow finishing auth +type TransportWithAuth = StreamableHTTPClientTransport | SSEClientTransport +const pendingOAuthTransports = new Map() + +// Prompt cache types +type PromptInfo = Awaited>["prompts"][number] +type ResourceInfo = Awaited>["resources"][number] +type McpEntry = NonNullable[string] + +function isMcpConfigured(entry: McpEntry): entry is Config.Mcp { + return typeof entry === "object" && entry !== null && "type" in entry +} + +const sanitize = (s: string) => s.replace(/[^a-zA-Z0-9_-]/g, "_") + +// Convert MCP tool definition to AI SDK Tool type +function convertMcpTool(mcpTool: MCPToolDef, client: MCPClient, timeout?: number): Tool { + const inputSchema = mcpTool.inputSchema + + // Spread first, then override type to ensure it's always "object" + const schema: JSONSchema7 = { + ...(inputSchema as JSONSchema7), + type: "object", + properties: (inputSchema.properties ?? {}) as JSONSchema7["properties"], + additionalProperties: false, + } + + return dynamicTool({ + description: mcpTool.description ?? "", + inputSchema: jsonSchema(schema), + execute: async (args: unknown) => { + return client.callTool( + { + name: mcpTool.name, + arguments: (args || {}) as Record, + }, + CallToolResultSchema, + { + resetTimeoutOnProgress: true, + timeout, + }, + ) + }, + }) +} + +function defs(key: string, client: MCPClient, timeout?: number) { + return Effect.tryPromise({ + try: () => withTimeout(client.listTools(), timeout ?? DEFAULT_TIMEOUT), + catch: (err) => (err instanceof Error ? err : new Error(String(err))), + }).pipe( + Effect.map((result) => result.tools), + Effect.catch((err) => { + log.error("failed to get tools from client", { key, error: err }) + return Effect.succeed(undefined) + }), + ) +} + +function fetchFromClient( + clientName: string, + client: Client, + listFn: (c: Client) => Promise, + label: string, +) { + return Effect.tryPromise({ + try: () => listFn(client), + catch: (e: any) => { + log.error(`failed to get ${label}`, { clientName, error: e.message }) + return e + }, + }).pipe( + Effect.map((items) => { + const out: Record = {} + const sanitizedClient = sanitize(clientName) + for (const item of items) { + out[sanitizedClient + ":" + sanitize(item.name)] = { ...item, client: clientName } + } + return out + }), + Effect.orElseSucceed(() => undefined), + ) +} + +interface CreateResult { + mcpClient?: MCPClient + status: Status + defs?: MCPToolDef[] +} + +interface AuthResult { + authorizationUrl: string + oauthState: string + client?: MCPClient +} + +// --- Effect Service --- + +interface State { + status: Record + clients: Record + defs: Record +} + +export interface Interface { + readonly status: () => Effect.Effect> + readonly clients: () => Effect.Effect> + readonly tools: () => Effect.Effect> + readonly prompts: () => Effect.Effect> + readonly resources: () => Effect.Effect> + readonly add: (name: string, mcp: Config.Mcp) => Effect.Effect<{ status: Record | Status }> + readonly connect: (name: string) => Effect.Effect + readonly disconnect: (name: string) => Effect.Effect + readonly getPrompt: ( + clientName: string, + name: string, + args?: Record, + ) => Effect.Effect> | undefined> + readonly readResource: ( + clientName: string, + resourceUri: string, + ) => Effect.Effect> | undefined> + readonly startAuth: (mcpName: string) => Effect.Effect<{ authorizationUrl: string; oauthState: string }> + readonly authenticate: (mcpName: string) => Effect.Effect + readonly finishAuth: (mcpName: string, authorizationCode: string) => Effect.Effect + readonly removeAuth: (mcpName: string) => Effect.Effect + readonly supportsOAuth: (mcpName: string) => Effect.Effect + readonly hasStoredTokens: (mcpName: string) => Effect.Effect + readonly getAuthStatus: (mcpName: string) => Effect.Effect +} + +export class Service extends Context.Service()("@opencode/MCP") {} + +export const layer = Layer.effect( + Service, + Effect.gen(function* () { + const spawner = yield* ChildProcessSpawner.ChildProcessSpawner + const auth = yield* McpAuth.Service + const bus = yield* Bus.Service + + type Transport = StdioClientTransport | StreamableHTTPClientTransport | SSEClientTransport + + /** + * Connect a client via the given transport with resource safety: + * on failure the transport is closed; on success the caller owns it. + */ + const connectTransport = (transport: Transport, timeout: number) => + Effect.acquireUseRelease( + Effect.succeed(transport), + (t) => + Effect.tryPromise({ + try: () => { + const client = new Client({ name: "opencode", version: Installation.VERSION }) + return withTimeout(client.connect(t), timeout).then(() => client) + }, + catch: (e) => (e instanceof Error ? e : new Error(String(e))), + }), + (t, exit) => (Exit.isFailure(exit) ? Effect.tryPromise(() => t.close()).pipe(Effect.ignore) : Effect.void), + ) + + const DISABLED_RESULT: CreateResult = { status: { status: "disabled" } } + + const connectRemote = Effect.fn("MCP.connectRemote")(function* ( + key: string, + mcp: Config.Mcp & { type: "remote" }, + ) { + const oauthDisabled = mcp.oauth === false + const oauthConfig = typeof mcp.oauth === "object" ? mcp.oauth : undefined + let authProvider: McpOAuthProvider | undefined + + if (!oauthDisabled) { + authProvider = new McpOAuthProvider( + key, + mcp.url, + { + clientId: oauthConfig?.clientId, + clientSecret: oauthConfig?.clientSecret, + scope: oauthConfig?.scope, + redirectUri: oauthConfig?.redirectUri, + }, + { + onRedirect: async (url) => { + log.info("oauth redirect requested", { key, url: url.toString() }) + }, + }, + auth, + ) + } + + const transports: Array<{ name: string; transport: TransportWithAuth }> = [ + { + name: "StreamableHTTP", + transport: new StreamableHTTPClientTransport(new URL(mcp.url), { + authProvider, + requestInit: mcp.headers ? { headers: mcp.headers } : undefined, + }), + }, + { + name: "SSE", + transport: new SSEClientTransport(new URL(mcp.url), { + authProvider, + requestInit: mcp.headers ? { headers: mcp.headers } : undefined, + }), + }, + ] + + const connectTimeout = mcp.timeout ?? DEFAULT_TIMEOUT + let lastStatus: Status | undefined + + for (const { name, transport } of transports) { + const result = yield* connectTransport(transport, connectTimeout).pipe( + Effect.map((client) => ({ client, transportName: name })), + Effect.catch((error) => { + const lastError = error instanceof Error ? error : new Error(String(error)) + const isAuthError = + error instanceof UnauthorizedError || (authProvider && lastError.message.includes("OAuth")) + + if (isAuthError) { + log.info("mcp server requires authentication", { key, transport: name }) + + if (lastError.message.includes("registration") || lastError.message.includes("client_id")) { + lastStatus = { + status: "needs_client_registration" as const, + error: "Server does not support dynamic client registration. Please provide clientId in config.", + } + return bus + .publish(TuiEvent.ToastShow, { + title: "MCP Authentication Required", + message: `Server "${key}" requires a pre-registered client ID. Add clientId to your config.`, + variant: "warning", + duration: 8000, + }) + .pipe(Effect.ignore, Effect.as(undefined)) + } else { + pendingOAuthTransports.set(key, transport) + lastStatus = { status: "needs_auth" as const } + return bus + .publish(TuiEvent.ToastShow, { + title: "MCP Authentication Required", + message: `Server "${key}" requires authentication. Run: opencode mcp auth ${key}`, + variant: "warning", + duration: 8000, + }) + .pipe(Effect.ignore, Effect.as(undefined)) + } + } + + log.debug("transport connection failed", { + key, + transport: name, + url: mcp.url, + error: lastError.message, + }) + lastStatus = { status: "failed" as const, error: lastError.message } + return Effect.succeed(undefined) + }), + ) + if (result) { + log.info("connected", { key, transport: result.transportName }) + return { client: result.client as MCPClient | undefined, status: { status: "connected" } as Status } + } + // If this was an auth error, stop trying other transports + if (lastStatus?.status === "needs_auth" || lastStatus?.status === "needs_client_registration") break + } + + return { + client: undefined as MCPClient | undefined, + status: (lastStatus ?? { status: "failed", error: "Unknown error" }) as Status, + } + }) + + const connectLocal = Effect.fn("MCP.connectLocal")(function* (key: string, mcp: Config.Mcp & { type: "local" }) { + const [cmd, ...args] = mcp.command + const cwd = Instance.directory + const transport = new StdioClientTransport({ + stderr: "pipe", + command: cmd, + args, + cwd, + env: { + ...process.env, + ...(cmd === "opencode" ? { BUN_BE_BUN: "1" } : {}), + ...mcp.environment, + }, + }) + transport.stderr?.on("data", (chunk: Buffer) => { + log.info(`mcp stderr: ${chunk.toString()}`, { key }) + }) + + const connectTimeout = mcp.timeout ?? DEFAULT_TIMEOUT + return yield* connectTransport(transport, connectTimeout).pipe( + Effect.map((client): { client: MCPClient | undefined; status: Status } => ({ + client, + status: { status: "connected" }, + })), + Effect.catch((error): Effect.Effect<{ client: MCPClient | undefined; status: Status }> => { + const msg = error instanceof Error ? error.message : String(error) + log.error("local mcp startup failed", { key, command: mcp.command, cwd, error: msg }) + return Effect.succeed({ client: undefined, status: { status: "failed", error: msg } }) + }), + ) + }) + + const create = Effect.fn("MCP.create")(function* (key: string, mcp: Config.Mcp) { + if (mcp.enabled === false) { + log.info("mcp server disabled", { key }) + return DISABLED_RESULT + } + + log.info("found", { key, type: mcp.type }) + + const { client: mcpClient, status } = + mcp.type === "remote" + ? yield* connectRemote(key, mcp as Config.Mcp & { type: "remote" }) + : yield* connectLocal(key, mcp as Config.Mcp & { type: "local" }) + + if (!mcpClient) { + return { status } satisfies CreateResult + } + + const listed = yield* defs(key, mcpClient, mcp.timeout) + if (!listed) { + yield* Effect.tryPromise(() => mcpClient.close()).pipe(Effect.ignore) + return { status: { status: "failed", error: "Failed to get tools" } } satisfies CreateResult + } + + log.info("create() successfully created client", { key, toolCount: listed.length }) + return { mcpClient, status, defs: listed } satisfies CreateResult + }) + const cfgSvc = yield* Config.Service + + const descendants = Effect.fnUntraced( + function* (pid: number) { + if (process.platform === "win32") return [] as number[] + const pids: number[] = [] + const queue = [pid] + while (queue.length > 0) { + const current = queue.shift()! + const handle = yield* spawner.spawn( + ChildProcess.make("pgrep", ["-P", String(current)], { stdin: "ignore" }), + ) + const text = yield* Stream.mkString(Stream.decodeText(handle.stdout)) + yield* handle.exitCode + for (const tok of text.split("\n")) { + const cpid = parseInt(tok, 10) + if (!isNaN(cpid) && !pids.includes(cpid)) { + pids.push(cpid) + queue.push(cpid) + } + } + } + return pids + }, + Effect.scoped, + Effect.catch(() => Effect.succeed([] as number[])), + ) + + function watch(s: State, name: string, client: MCPClient, bridge: EffectBridge.Shape, timeout?: number) { + client.setNotificationHandler(ToolListChangedNotificationSchema, async () => { + log.info("tools list changed notification received", { server: name }) + if (s.clients[name] !== client || s.status[name]?.status !== "connected") return + + const listed = await bridge.promise(defs(name, client, timeout)) + if (!listed) return + if (s.clients[name] !== client || s.status[name]?.status !== "connected") return + + s.defs[name] = listed + await bridge.promise(bus.publish(ToolsChanged, { server: name }).pipe(Effect.ignore)) + }) + } + + const state = yield* InstanceState.make( + Effect.fn("MCP.state")(function* () { + const cfg = yield* cfgSvc.get() + const bridge = yield* EffectBridge.make() + const config = cfg.mcp ?? {} + const s: State = { + status: {}, + clients: {}, + defs: {}, + } + + yield* Effect.forEach( + Object.entries(config), + ([key, mcp]) => + Effect.gen(function* () { + if (!isMcpConfigured(mcp)) { + log.error("Ignoring MCP config entry without type", { key }) + return + } + + if (mcp.enabled === false) { + s.status[key] = { status: "disabled" } + return + } + + const result = yield* create(key, mcp).pipe(Effect.catch(() => Effect.void)) + if (!result) return + + s.status[key] = result.status + if (result.mcpClient) { + s.clients[key] = result.mcpClient + s.defs[key] = result.defs! + watch(s, key, result.mcpClient, bridge, mcp.timeout) + } + }), + { concurrency: "unbounded" }, + ) + + yield* Effect.addFinalizer(() => + Effect.gen(function* () { + yield* Effect.forEach( + Object.values(s.clients), + (client) => + Effect.gen(function* () { + const pid = (client.transport as any)?.pid + if (typeof pid === "number") { + const pids = yield* descendants(pid) + for (const dpid of pids) { + try { + process.kill(dpid, "SIGTERM") + } catch {} + } + } + yield* Effect.tryPromise(() => client.close()).pipe(Effect.ignore) + }), + { concurrency: "unbounded" }, + ) + pendingOAuthTransports.clear() + }), + ) + + return s + }), + ) + + function closeClient(s: State, name: string) { + const client = s.clients[name] + delete s.defs[name] + if (!client) return Effect.void + return Effect.tryPromise(() => client.close()).pipe(Effect.ignore) + } + + const storeClient = Effect.fnUntraced(function* ( + s: State, + name: string, + client: MCPClient, + listed: MCPToolDef[], + timeout?: number, + ) { + const bridge = yield* EffectBridge.make() + yield* closeClient(s, name) + s.status[name] = { status: "connected" } + s.clients[name] = client + s.defs[name] = listed + watch(s, name, client, bridge, timeout) + return s.status[name] + }) + + const status = Effect.fn("MCP.status")(function* () { + const s = yield* InstanceState.get(state) + + const cfg = yield* cfgSvc.get() + const config = cfg.mcp ?? {} + const result: Record = {} + + for (const [key, mcp] of Object.entries(config)) { + if (!isMcpConfigured(mcp)) continue + result[key] = s.status[key] ?? { status: "disabled" } + } + + return result + }) + + const clients = Effect.fn("MCP.clients")(function* () { + const s = yield* InstanceState.get(state) + return s.clients + }) + + const createAndStore = Effect.fn("MCP.createAndStore")(function* (name: string, mcp: Config.Mcp) { + const s = yield* InstanceState.get(state) + const result = yield* create(name, mcp) + + s.status[name] = result.status + if (!result.mcpClient) { + yield* closeClient(s, name) + delete s.clients[name] + return result.status + } + + return yield* storeClient(s, name, result.mcpClient, result.defs!, mcp.timeout) + }) + + const add = Effect.fn("MCP.add")(function* (name: string, mcp: Config.Mcp) { + yield* createAndStore(name, mcp) + const s = yield* InstanceState.get(state) + return { status: s.status } + }) + + const connect = Effect.fn("MCP.connect")(function* (name: string) { + const mcp = yield* getMcpConfig(name) + if (!mcp) { + log.error("MCP config not found or invalid", { name }) + return + } + yield* createAndStore(name, { ...mcp, enabled: true }) + }) + + const disconnect = Effect.fn("MCP.disconnect")(function* (name: string) { + const s = yield* InstanceState.get(state) + yield* closeClient(s, name) + delete s.clients[name] + s.status[name] = { status: "disabled" } + }) + + const tools = Effect.fn("MCP.tools")(function* () { + const result: Record = {} + const s = yield* InstanceState.get(state) + + const cfg = yield* cfgSvc.get() + const config = cfg.mcp ?? {} + const defaultTimeout = cfg.experimental?.mcp_timeout + + const connectedClients = Object.entries(s.clients).filter( + ([clientName]) => s.status[clientName]?.status === "connected", + ) + + yield* Effect.forEach( + connectedClients, + ([clientName, client]) => + Effect.gen(function* () { + const mcpConfig = config[clientName] + const entry = mcpConfig && isMcpConfigured(mcpConfig) ? mcpConfig : undefined + + const listed = s.defs[clientName] + if (!listed) { + log.warn("missing cached tools for connected server", { clientName }) + return + } + + const timeout = entry?.timeout ?? defaultTimeout + for (const mcpTool of listed) { + result[sanitize(clientName) + "_" + sanitize(mcpTool.name)] = convertMcpTool(mcpTool, client, timeout) + } + }), + { concurrency: "unbounded" }, + ) + return result + }) + + function collectFromConnected( + s: State, + listFn: (c: Client) => Promise, + label: string, + ) { + return Effect.forEach( + Object.entries(s.clients).filter(([name]) => s.status[name]?.status === "connected"), + ([clientName, client]) => + fetchFromClient(clientName, client, listFn, label).pipe(Effect.map((items) => Object.entries(items ?? {}))), + { concurrency: "unbounded" }, + ).pipe(Effect.map((results) => Object.fromEntries(results.flat()))) + } + + const prompts = Effect.fn("MCP.prompts")(function* () { + const s = yield* InstanceState.get(state) + return yield* collectFromConnected(s, (c) => c.listPrompts().then((r) => r.prompts), "prompts") + }) + + const resources = Effect.fn("MCP.resources")(function* () { + const s = yield* InstanceState.get(state) + return yield* collectFromConnected(s, (c) => c.listResources().then((r) => r.resources), "resources") + }) + + const withClient = Effect.fnUntraced(function* ( + clientName: string, + fn: (client: MCPClient) => Promise, + label: string, + meta?: Record, + ) { + const s = yield* InstanceState.get(state) + const client = s.clients[clientName] + if (!client) { + log.warn(`client not found for ${label}`, { clientName }) + return undefined + } + return yield* Effect.tryPromise({ + try: () => fn(client), + catch: (e: any) => { + log.error(`failed to ${label}`, { clientName, ...meta, error: e?.message }) + return e + }, + }).pipe(Effect.orElseSucceed(() => undefined)) + }) + + const getPrompt = Effect.fn("MCP.getPrompt")(function* ( + clientName: string, + name: string, + args?: Record, + ) { + return yield* withClient(clientName, (client) => client.getPrompt({ name, arguments: args }), "getPrompt", { + promptName: name, + }) + }) + + const readResource = Effect.fn("MCP.readResource")(function* (clientName: string, resourceUri: string) { + return yield* withClient(clientName, (client) => client.readResource({ uri: resourceUri }), "readResource", { + resourceUri, + }) + }) + + const getMcpConfig = Effect.fnUntraced(function* (mcpName: string) { + const cfg = yield* cfgSvc.get() + const mcpConfig = cfg.mcp?.[mcpName] + if (!mcpConfig || !isMcpConfigured(mcpConfig)) return undefined + return mcpConfig + }) + + const startAuth = Effect.fn("MCP.startAuth")(function* (mcpName: string) { + const mcpConfig = yield* getMcpConfig(mcpName) + if (!mcpConfig) throw new Error(`MCP server ${mcpName} not found or disabled`) + if (mcpConfig.type !== "remote") throw new Error(`MCP server ${mcpName} is not a remote server`) + if (mcpConfig.oauth === false) throw new Error(`MCP server ${mcpName} has OAuth explicitly disabled`) + + // OAuth config is optional - if not provided, we'll use auto-discovery + const oauthConfig = typeof mcpConfig.oauth === "object" ? mcpConfig.oauth : undefined + + // Start the callback server with custom redirectUri if configured + yield* Effect.promise(() => McpOAuthCallback.ensureRunning(oauthConfig?.redirectUri)) + + const oauthState = Array.from(crypto.getRandomValues(new Uint8Array(32))) + .map((b) => b.toString(16).padStart(2, "0")) + .join("") + yield* auth.updateOAuthState(mcpName, oauthState) + let capturedUrl: URL | undefined + const authProvider = new McpOAuthProvider( + mcpName, + mcpConfig.url, + { + clientId: oauthConfig?.clientId, + clientSecret: oauthConfig?.clientSecret, + scope: oauthConfig?.scope, + redirectUri: oauthConfig?.redirectUri, + }, + { + onRedirect: async (url) => { + capturedUrl = url + }, + }, + auth, + ) + + const transport = new StreamableHTTPClientTransport(new URL(mcpConfig.url), { authProvider }) + + return yield* Effect.tryPromise({ + try: () => { + const client = new Client({ name: "opencode", version: Installation.VERSION }) + return client + .connect(transport) + .then(() => ({ authorizationUrl: "", oauthState, client }) satisfies AuthResult) + }, + catch: (error) => error, + }).pipe( + Effect.catch((error) => { + if (error instanceof UnauthorizedError && capturedUrl) { + pendingOAuthTransports.set(mcpName, transport) + return Effect.succeed({ authorizationUrl: capturedUrl.toString(), oauthState } satisfies AuthResult) + } + return Effect.die(error) + }), + ) + }) + + const authenticate = Effect.fn("MCP.authenticate")(function* (mcpName: string) { + const result = yield* startAuth(mcpName) + if (!result.authorizationUrl) { + const client = "client" in result ? result.client : undefined + const mcpConfig = yield* getMcpConfig(mcpName) + if (!mcpConfig) { + yield* Effect.tryPromise(() => client?.close() ?? Promise.resolve()).pipe(Effect.ignore) + return { status: "failed", error: "MCP config not found after auth" } as Status + } + + const listed = client ? yield* defs(mcpName, client, mcpConfig.timeout) : undefined + if (!client || !listed) { + yield* Effect.tryPromise(() => client?.close() ?? Promise.resolve()).pipe(Effect.ignore) + return { status: "failed", error: "Failed to get tools" } as Status + } + + const s = yield* InstanceState.get(state) + yield* auth.clearOAuthState(mcpName) + return yield* storeClient(s, mcpName, client, listed, mcpConfig.timeout) + } + + log.info("opening browser for oauth", { mcpName, url: result.authorizationUrl, state: result.oauthState }) + + const callbackPromise = McpOAuthCallback.waitForCallback(result.oauthState, mcpName) + + yield* Effect.tryPromise(() => open(result.authorizationUrl)).pipe( + Effect.flatMap((subprocess) => + Effect.callback((resume) => { + const timer = setTimeout(() => resume(Effect.void), 500) + subprocess.on("error", (err) => { + clearTimeout(timer) + resume(Effect.fail(err)) + }) + subprocess.on("exit", (code) => { + if (code !== null && code !== 0) { + clearTimeout(timer) + resume(Effect.fail(new Error(`Browser open failed with exit code ${code}`))) + } + }) + }), + ), + Effect.catch(() => { + log.warn("failed to open browser, user must open URL manually", { mcpName }) + return bus.publish(BrowserOpenFailed, { mcpName, url: result.authorizationUrl }).pipe(Effect.ignore) + }), + ) + + const code = yield* Effect.promise(() => callbackPromise) + + const storedState = yield* auth.getOAuthState(mcpName) + if (storedState !== result.oauthState) { + yield* auth.clearOAuthState(mcpName) + throw new Error("OAuth state mismatch - potential CSRF attack") + } + yield* auth.clearOAuthState(mcpName) + return yield* finishAuth(mcpName, code) + }) + + const finishAuth = Effect.fn("MCP.finishAuth")(function* (mcpName: string, authorizationCode: string) { + const transport = pendingOAuthTransports.get(mcpName) + if (!transport) throw new Error(`No pending OAuth flow for MCP server: ${mcpName}`) + + const result = yield* Effect.tryPromise({ + try: () => transport.finishAuth(authorizationCode).then(() => true as const), + catch: (error) => { + log.error("failed to finish oauth", { mcpName, error }) + return error + }, + }).pipe(Effect.option) + + if (Option.isNone(result)) { + return { status: "failed", error: "OAuth completion failed" } as Status + } + + yield* auth.clearCodeVerifier(mcpName) + pendingOAuthTransports.delete(mcpName) + + const mcpConfig = yield* getMcpConfig(mcpName) + if (!mcpConfig) return { status: "failed", error: "MCP config not found after auth" } as Status + + return yield* createAndStore(mcpName, mcpConfig) + }) + + const removeAuth = Effect.fn("MCP.removeAuth")(function* (mcpName: string) { + yield* auth.remove(mcpName) + McpOAuthCallback.cancelPending(mcpName) + pendingOAuthTransports.delete(mcpName) + log.info("removed oauth credentials", { mcpName }) + }) + + const supportsOAuth = Effect.fn("MCP.supportsOAuth")(function* (mcpName: string) { + const mcpConfig = yield* getMcpConfig(mcpName) + if (!mcpConfig) return false + return mcpConfig.type === "remote" && mcpConfig.oauth !== false + }) + + const hasStoredTokens = Effect.fn("MCP.hasStoredTokens")(function* (mcpName: string) { + const entry = yield* auth.get(mcpName) + return !!entry?.tokens + }) + + const getAuthStatus = Effect.fn("MCP.getAuthStatus")(function* (mcpName: string) { + const entry = yield* auth.get(mcpName) + if (!entry?.tokens) return "not_authenticated" as AuthStatus + const expired = yield* auth.isTokenExpired(mcpName) + return (expired ? "expired" : "authenticated") as AuthStatus + }) + + return Service.of({ + status, + clients, + tools, + prompts, + resources, + add, + connect, + disconnect, + getPrompt, + readResource, + startAuth, + authenticate, + finishAuth, + removeAuth, + supportsOAuth, + hasStoredTokens, + getAuthStatus, + }) + }), +) + +export type AuthStatus = "authenticated" | "expired" | "not_authenticated" + +// --- Per-service runtime --- + +export const defaultLayer = layer.pipe( + Layer.provide(McpAuth.layer), + Layer.provide(Bus.layer), + Layer.provide(Config.defaultLayer), + Layer.provide(CrossSpawnSpawner.defaultLayer), + Layer.provide(AppFileSystem.defaultLayer), +)