From a494eea6d40f2156a3070181be26eb3659ee5121 Mon Sep 17 00:00:00 2001 From: Vincent Koc Date: Mon, 27 Apr 2026 01:23:34 -0700 Subject: [PATCH] fix(gateway): defer hook request handler imports --- CHANGELOG.md | 1 + src/gateway/server-http.test-harness.ts | 3 +- src/gateway/server-http.ts | 450 +------------------ src/gateway/server-live-state.ts | 2 +- src/gateway/server-reload-handlers.ts | 4 +- src/gateway/server-runtime-state.ts | 40 +- src/gateway/server.impl.ts | 2 +- src/gateway/server/hook-client-ip-config.ts | 9 + src/gateway/server/hooks-request-handler.ts | 450 +++++++++++++++++++ src/gateway/server/hooks.agent-trust.test.ts | 2 +- src/gateway/server/hooks.ts | 10 +- 11 files changed, 496 insertions(+), 477 deletions(-) create mode 100644 src/gateway/server/hook-client-ip-config.ts create mode 100644 src/gateway/server/hooks-request-handler.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index 7606eaa0734..1b49e0de7f6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -89,6 +89,7 @@ Docs: https://docs.openclaw.ai - Gateway/startup: resolve channel runtime helpers asynchronously only when an enabled/configured channel starts, so no-channel Gateway boot skips auto-reply, media, pairing, and outbound channel helper imports. Thanks @vincentkoc. - Gateway/startup: lazy-load HTTP auth, canvas auth, and plugin route scope helpers from their request paths so Gateway bind no longer pays those utility graphs during boot. Thanks @vincentkoc. - Gateway/startup: defer isolated cron runner imports until `/hooks/agent` dispatch so Gateway boot skips the agent-turn runtime on installs that only need normal HTTP bind. Thanks @vincentkoc. +- Gateway/startup: split hook request parsing into a request-path module and load the Gateway hook dispatcher only when a request matches the hooks base path, keeping hook mapping and throttle helpers off plain HTTP bind. Thanks @vincentkoc. - CLI/Gateway: use a parse-only config snapshot for plain `gateway status` reads and reuse same-path service config context so status no longer spends tens of seconds in full config validation before printing. Thanks @vincentkoc. - Lobster/Gateway: memoize repeated Ajv schema compilation before loading the embedded Lobster runtime so scheduled workflows and `llm.invoke` loops stop growing gateway heap on content-identical schemas. Fixes #71148. Thanks @cmi525, @vsolaz, and @vincentkoc. - Codex harness: normalize cached input tokens before session/context accounting so prompt cache reads are not double-counted in `/status`, `session_status`, or persisted `sessionEntry.totalTokens`. Fixes #69298. Thanks @richardmqq. diff --git a/src/gateway/server-http.test-harness.ts b/src/gateway/server-http.test-harness.ts index e78480bdbdb..7c757a0622d 100644 --- a/src/gateway/server-http.test-harness.ts +++ b/src/gateway/server-http.test-harness.ts @@ -4,7 +4,8 @@ import type { createSubsystemLogger } from "../logging/subsystem.js"; import type { ResolvedGatewayAuth } from "./auth.js"; import { createGatewayRequest, createHooksConfig } from "./hooks-test-helpers.js"; import { canonicalizePathVariant, isProtectedPluginRoutePath } from "./security-path.js"; -import { createGatewayHttpServer, createHooksRequestHandler } from "./server-http.js"; +import { createGatewayHttpServer } from "./server-http.js"; +import { createHooksRequestHandler } from "./server/hooks-request-handler.js"; import { withTempConfig } from "./test-temp-config.js"; export type GatewayHttpServer = ReturnType; diff --git a/src/gateway/server-http.ts b/src/gateway/server-http.ts index 47214d3cedb..970b4b49fbc 100644 --- a/src/gateway/server-http.ts +++ b/src/gateway/server-http.ts @@ -1,4 +1,3 @@ -import { createHash } from "node:crypto"; import { createServer as createHttpServer, type Server as HttpServer, @@ -22,16 +21,8 @@ import { createDiagnosticTraceContext, runWithDiagnosticTraceContext, } from "../infra/diagnostic-trace-context.js"; -import type { createSubsystemLogger } from "../logging/subsystem.js"; -import { resolveHookExternalContentSource as resolveHookExternalContentSourceFromSession } from "../security/external-content.js"; -import { safeEqualSecret } from "../security/secret-equal.js"; import { resolveAssistantIdentity } from "./assistant-identity.js"; -import { - AUTH_RATE_LIMIT_SCOPE_HOOK_AUTH, - createAuthRateLimiter, - normalizeRateLimitClientIp, - type AuthRateLimiter, -} from "./auth-rate-limit.js"; +import type { AuthRateLimiter } from "./auth-rate-limit.js"; import { authorizeHttpGatewayConnect, isLocalDirectRequest, @@ -40,31 +31,10 @@ import { } from "./auth.js"; import { normalizeCanvasScopedUrl } from "./canvas-capability.js"; import type { ControlUiRootState } from "./control-ui.js"; -import { applyHookMappings } from "./hooks-mapping.js"; -import { - extractHookToken, - getHookAgentPolicyError, - getHookChannelError, - getHookSessionKeyPrefixError, - type HookAgentDispatchPayload, - type HooksConfigResolved, - isHookAgentAllowed, - isSessionKeyAllowedByPrefix, - normalizeAgentPayload, - normalizeHookHeaders, - resolveHookIdempotencyKey, - normalizeWakePayload, - readJsonBody, - normalizeHookDispatchSessionKey, - resolveHookSessionKey, - resolveHookTargetAgentId, - resolveHookChannel, - resolveHookDeliver, -} from "./hooks.js"; import type { AuthorizedGatewayHttpRequest } from "./http-auth-utils.js"; import { sendGatewayAuthFailure, setDefaultSecurityHeaders } from "./http-common.js"; import { resolveRequestClientIp } from "./net.js"; -import { DEDUPE_MAX, DEDUPE_TTL_MS } from "./server-constants.js"; +import type { HooksRequestHandler } from "./server/hooks-request-handler.js"; import { isProtectedPluginRoutePathFromContext, resolvePluginRoutePathContext, @@ -75,7 +45,6 @@ import type { ReadinessChecker } from "./server/readiness.js"; import type { GatewayWsClient } from "./server/ws-types.js"; import { VOICECLAW_REALTIME_PATH } from "./voiceclaw-realtime/paths.js"; -type SubsystemLogger = ReturnType; type PluginHttpRequestHandler = ( req: IncomingMessage, res: ServerResponse, @@ -87,9 +56,6 @@ type PluginHttpRequestHandler = ( }, ) => Promise; -const HOOK_AUTH_FAILURE_LIMIT = 20; -const HOOK_AUTH_FAILURE_WINDOW_MS = 60_000; - let identityAvatarModulePromise: Promise | undefined; let controlUiModulePromise: Promise | undefined; let embeddingsHttpModulePromise: Promise | undefined; @@ -183,47 +149,6 @@ function getPluginRouteRuntimeScopesModule() { return pluginRouteRuntimeScopesModulePromise; } -type HookDispatchers = { - dispatchWakeHook: (value: { text: string; mode: "now" | "next-heartbeat" }) => void; - dispatchAgentHook: (value: HookAgentDispatchPayload) => string; -}; - -function resolveMappedHookExternalContentSource(params: { - subPath: string; - payload: Record; - sessionKey: string; -}) { - const payloadSource = - typeof params.payload.source === "string" ? params.payload.source.trim().toLowerCase() : ""; - if (params.subPath === "gmail" || payloadSource === "gmail") { - return "gmail" as const; - } - return resolveHookExternalContentSourceFromSession(params.sessionKey) ?? "webhook"; -} - -export type HookClientIpConfig = Readonly<{ - trustedProxies?: string[]; - allowRealIpFallback?: boolean; -}>; - -type HookReplayEntry = { - ts: number; - runId: string; -}; - -type HookReplayScope = { - pathKey: string; - token: string | undefined; - idempotencyKey?: string; - dispatchScope: Record; -}; - -function sendJson(res: ServerResponse, status: number, body: unknown) { - res.statusCode = status; - res.setHeader("Content-Type", "application/json; charset=utf-8"); - res.end(JSON.stringify(body)); -} - const GATEWAY_PROBE_STATUS_BY_PATH = new Map([ ["/health", "live"], ["/healthz", "live"], @@ -439,8 +364,6 @@ function writeUpgradeServiceUnavailable(socket: { write: (chunk: string) => void ); } -export type HooksRequestHandler = (req: IncomingMessage, res: ServerResponse) => Promise; - type GatewayHttpRequestStage = { name: string; run: () => Promise | boolean; @@ -544,375 +467,6 @@ function buildPluginRequestStages(params: { ]; } -export function createHooksRequestHandler( - opts: { - getHooksConfig: () => HooksConfigResolved | null; - bindHost: string; - port: number; - logHooks: SubsystemLogger; - getClientIpConfig?: () => HookClientIpConfig; - } & HookDispatchers, -): HooksRequestHandler { - const { getHooksConfig, logHooks, dispatchAgentHook, dispatchWakeHook, getClientIpConfig } = opts; - const hookReplayCache = new Map(); - const hookAuthLimiter = createAuthRateLimiter({ - maxAttempts: HOOK_AUTH_FAILURE_LIMIT, - windowMs: HOOK_AUTH_FAILURE_WINDOW_MS, - lockoutMs: HOOK_AUTH_FAILURE_WINDOW_MS, - exemptLoopback: false, - // Handler lifetimes are tied to gateway runtime/tests; skip background timer fanout. - pruneIntervalMs: 0, - }); - - const resolveHookClientKey = (req: IncomingMessage): string => { - const clientIpConfig = getClientIpConfig?.(); - const clientIp = - resolveRequestClientIp( - req, - clientIpConfig?.trustedProxies, - clientIpConfig?.allowRealIpFallback === true, - ) ?? req.socket?.remoteAddress; - return normalizeRateLimitClientIp(clientIp); - }; - - const pruneHookReplayCache = (now: number) => { - const cutoff = now - DEDUPE_TTL_MS; - for (const [key, entry] of hookReplayCache) { - if (entry.ts < cutoff) { - hookReplayCache.delete(key); - } - } - while (hookReplayCache.size > DEDUPE_MAX) { - const oldestKey = hookReplayCache.keys().next().value; - if (!oldestKey) { - break; - } - hookReplayCache.delete(oldestKey); - } - }; - - const buildHookReplayCacheKey = (params: HookReplayScope): string | undefined => { - const idem = params.idempotencyKey?.trim(); - if (!idem) { - return undefined; - } - const tokenFingerprint = createHash("sha256") - .update(params.token ?? "", "utf8") - .digest("hex"); - const idempotencyFingerprint = createHash("sha256").update(idem, "utf8").digest("hex"); - const scopeFingerprint = createHash("sha256") - .update( - JSON.stringify({ - pathKey: params.pathKey, - dispatchScope: params.dispatchScope, - }), - "utf8", - ) - .digest("hex"); - return `${tokenFingerprint}:${scopeFingerprint}:${idempotencyFingerprint}`; - }; - - const resolveCachedHookRunId = (key: string | undefined, now: number): string | undefined => { - if (!key) { - return undefined; - } - pruneHookReplayCache(now); - const cached = hookReplayCache.get(key); - if (!cached) { - return undefined; - } - hookReplayCache.delete(key); - hookReplayCache.set(key, cached); - return cached.runId; - }; - - const rememberHookRunId = (key: string | undefined, runId: string, now: number) => { - if (!key) { - return; - } - hookReplayCache.delete(key); - hookReplayCache.set(key, { ts: now, runId }); - pruneHookReplayCache(now); - }; - - return async (req, res) => { - const hooksConfig = getHooksConfig(); - if (!hooksConfig) { - return false; - } - // Only pathname/search are used here; keep the base host fixed so bind-host - // representation (e.g. IPv6 wildcards) cannot break request parsing. - const url = new URL(req.url ?? "/", "http://localhost"); - const basePath = hooksConfig.basePath; - if (url.pathname !== basePath && !url.pathname.startsWith(`${basePath}/`)) { - return false; - } - - if (url.searchParams.has("token")) { - res.statusCode = 400; - res.setHeader("Content-Type", "text/plain; charset=utf-8"); - res.end( - "Hook token must be provided via Authorization: Bearer or X-OpenClaw-Token header (query parameters are not allowed).", - ); - return true; - } - - if (req.method !== "POST") { - res.statusCode = 405; - res.setHeader("Allow", "POST"); - res.setHeader("Content-Type", "text/plain; charset=utf-8"); - res.end("Method Not Allowed"); - return true; - } - - const token = extractHookToken(req); - const clientKey = resolveHookClientKey(req); - if (!safeEqualSecret(token, hooksConfig.token)) { - const throttle = hookAuthLimiter.check(clientKey, AUTH_RATE_LIMIT_SCOPE_HOOK_AUTH); - if (!throttle.allowed) { - const retryAfter = throttle.retryAfterMs > 0 ? Math.ceil(throttle.retryAfterMs / 1000) : 1; - res.statusCode = 429; - res.setHeader("Retry-After", String(retryAfter)); - res.setHeader("Content-Type", "text/plain; charset=utf-8"); - res.end("Too Many Requests"); - logHooks.warn(`hook auth throttled for ${clientKey}; retry-after=${retryAfter}s`); - return true; - } - hookAuthLimiter.recordFailure(clientKey, AUTH_RATE_LIMIT_SCOPE_HOOK_AUTH); - res.statusCode = 401; - res.setHeader("Content-Type", "text/plain; charset=utf-8"); - res.end("Unauthorized"); - return true; - } - hookAuthLimiter.reset(clientKey, AUTH_RATE_LIMIT_SCOPE_HOOK_AUTH); - - const subPath = url.pathname.slice(basePath.length).replace(/^\/+/, ""); - if (!subPath) { - res.statusCode = 404; - res.setHeader("Content-Type", "text/plain; charset=utf-8"); - res.end("Not Found"); - return true; - } - - const body = await readJsonBody(req, hooksConfig.maxBodyBytes); - if (!body.ok) { - const status = - body.error === "payload too large" - ? 413 - : body.error === "request body timeout" - ? 408 - : 400; - sendJson(res, status, { ok: false, error: body.error }); - return true; - } - - const payload = typeof body.value === "object" && body.value !== null ? body.value : {}; - const headers = normalizeHookHeaders(req); - const idempotencyKey = resolveHookIdempotencyKey({ - payload: payload as Record, - headers, - }); - const now = Date.now(); - - if (subPath === "wake") { - const normalized = normalizeWakePayload(payload as Record); - if (!normalized.ok) { - sendJson(res, 400, { ok: false, error: normalized.error }); - return true; - } - dispatchWakeHook(normalized.value); - sendJson(res, 200, { ok: true, mode: normalized.value.mode }); - return true; - } - - if (subPath === "agent") { - const normalized = normalizeAgentPayload(payload as Record); - if (!normalized.ok) { - sendJson(res, 400, { ok: false, error: normalized.error }); - return true; - } - if (!isHookAgentAllowed(hooksConfig, normalized.value.agentId)) { - sendJson(res, 400, { ok: false, error: getHookAgentPolicyError() }); - return true; - } - const sessionKey = resolveHookSessionKey({ - hooksConfig, - source: "request", - sessionKey: normalized.value.sessionKey, - }); - if (!sessionKey.ok) { - sendJson(res, 400, { ok: false, error: sessionKey.error }); - return true; - } - const targetAgentId = resolveHookTargetAgentId(hooksConfig, normalized.value.agentId); - const replayKey = buildHookReplayCacheKey({ - pathKey: "agent", - token, - idempotencyKey, - dispatchScope: { - agentId: targetAgentId ?? null, - sessionKey: - normalized.value.sessionKey ?? hooksConfig.sessionPolicy.defaultSessionKey ?? null, - message: normalized.value.message, - name: normalized.value.name, - wakeMode: normalized.value.wakeMode, - deliver: normalized.value.deliver, - channel: normalized.value.channel, - to: normalized.value.to ?? null, - model: normalized.value.model ?? null, - thinking: normalized.value.thinking ?? null, - timeoutSeconds: normalized.value.timeoutSeconds ?? null, - }, - }); - const cachedRunId = resolveCachedHookRunId(replayKey, now); - if (cachedRunId) { - sendJson(res, 200, { ok: true, runId: cachedRunId }); - return true; - } - const normalizedDispatchSessionKey = normalizeHookDispatchSessionKey({ - sessionKey: sessionKey.value, - targetAgentId, - }); - const allowedPrefixes = hooksConfig.sessionPolicy.allowedSessionKeyPrefixes; - if ( - allowedPrefixes && - !isSessionKeyAllowedByPrefix(normalizedDispatchSessionKey, allowedPrefixes) - ) { - sendJson(res, 400, { ok: false, error: getHookSessionKeyPrefixError(allowedPrefixes) }); - return true; - } - const runId = dispatchAgentHook({ - ...normalized.value, - idempotencyKey, - sessionKey: normalizedDispatchSessionKey, - agentId: targetAgentId, - externalContentSource: "webhook", - }); - rememberHookRunId(replayKey, runId, now); - sendJson(res, 200, { ok: true, runId }); - return true; - } - - if (hooksConfig.mappings.length > 0) { - try { - const mapped = await applyHookMappings(hooksConfig.mappings, { - payload: payload as Record, - headers, - url, - path: subPath, - }); - if (mapped) { - if (!mapped.ok) { - sendJson(res, 400, { ok: false, error: mapped.error }); - return true; - } - if (mapped.action === null) { - res.statusCode = 204; - res.end(); - return true; - } - if (mapped.action.kind === "wake") { - dispatchWakeHook({ - text: mapped.action.text, - mode: mapped.action.mode, - }); - sendJson(res, 200, { ok: true, mode: mapped.action.mode }); - return true; - } - const channel = resolveHookChannel(mapped.action.channel); - if (!channel) { - sendJson(res, 400, { ok: false, error: getHookChannelError() }); - return true; - } - if (!isHookAgentAllowed(hooksConfig, mapped.action.agentId)) { - sendJson(res, 400, { ok: false, error: getHookAgentPolicyError() }); - return true; - } - const sessionKey = resolveHookSessionKey({ - hooksConfig, - source: - mapped.action.sessionKeySource === "static" ? "mapping-static" : "mapping-templated", - sessionKey: mapped.action.sessionKey, - }); - if (!sessionKey.ok) { - sendJson(res, 400, { ok: false, error: sessionKey.error }); - return true; - } - const targetAgentId = resolveHookTargetAgentId(hooksConfig, mapped.action.agentId); - const normalizedDispatchSessionKey = normalizeHookDispatchSessionKey({ - sessionKey: sessionKey.value, - targetAgentId, - }); - const allowedPrefixes = hooksConfig.sessionPolicy.allowedSessionKeyPrefixes; - if ( - allowedPrefixes && - !isSessionKeyAllowedByPrefix(normalizedDispatchSessionKey, allowedPrefixes) - ) { - sendJson(res, 400, { ok: false, error: getHookSessionKeyPrefixError(allowedPrefixes) }); - return true; - } - const replayKey = buildHookReplayCacheKey({ - pathKey: subPath || "mapping", - token, - idempotencyKey, - dispatchScope: { - agentId: targetAgentId ?? null, - sessionKey: - mapped.action.sessionKey ?? hooksConfig.sessionPolicy.defaultSessionKey ?? null, - message: mapped.action.message, - name: mapped.action.name ?? "Hook", - wakeMode: mapped.action.wakeMode, - deliver: resolveHookDeliver(mapped.action.deliver), - channel, - to: mapped.action.to ?? null, - model: mapped.action.model ?? null, - thinking: mapped.action.thinking ?? null, - timeoutSeconds: mapped.action.timeoutSeconds ?? null, - }, - }); - const cachedRunId = resolveCachedHookRunId(replayKey, now); - if (cachedRunId) { - sendJson(res, 200, { ok: true, runId: cachedRunId }); - return true; - } - const runId = dispatchAgentHook({ - message: mapped.action.message, - name: mapped.action.name ?? "Hook", - idempotencyKey, - agentId: targetAgentId, - wakeMode: mapped.action.wakeMode, - sessionKey: normalizedDispatchSessionKey, - deliver: resolveHookDeliver(mapped.action.deliver), - channel, - to: mapped.action.to, - model: mapped.action.model, - thinking: mapped.action.thinking, - timeoutSeconds: mapped.action.timeoutSeconds, - allowUnsafeExternalContent: mapped.action.allowUnsafeExternalContent, - externalContentSource: resolveMappedHookExternalContentSource({ - subPath, - payload: payload as Record, - sessionKey: sessionKey.value, - }), - }); - rememberHookRunId(replayKey, runId, now); - sendJson(res, 200, { ok: true, runId }); - return true; - } - } catch (err) { - logHooks.warn(`hook mapping failed: ${String(err)}`); - sendJson(res, 500, { ok: false, error: "hook mapping failed" }); - return true; - } - } - - res.statusCode = 404; - res.setHeader("Content-Type", "text/plain; charset=utf-8"); - res.end("Not Found"); - return true; - }; -} - export function createGatewayHttpServer(opts: { canvasHost: CanvasHostHandler | null; clients: Set; diff --git a/src/gateway/server-live-state.ts b/src/gateway/server-live-state.ts index d608acc83ff..3fe4dcde9c3 100644 --- a/src/gateway/server-live-state.ts +++ b/src/gateway/server-live-state.ts @@ -1,11 +1,11 @@ import type { PluginServicesHandle } from "../plugins/services.js"; import type { HooksConfigResolved } from "./hooks.js"; import type { GatewayCronState } from "./server-cron.js"; -import type { HookClientIpConfig } from "./server-http.js"; import { createGatewayServerMutableState, type GatewayServerMutableState, } from "./server-runtime-handles.js"; +import type { HookClientIpConfig } from "./server/hooks-request-handler.js"; export type GatewayServerLiveState = GatewayServerMutableState & { hooksConfig: HooksConfigResolved | null; diff --git a/src/gateway/server-reload-handlers.ts b/src/gateway/server-reload-handlers.ts index e308eca6c7e..59f8304967f 100644 --- a/src/gateway/server-reload-handlers.ts +++ b/src/gateway/server-reload-handlers.ts @@ -30,7 +30,6 @@ import type { ChannelKind } from "./config-reload-plan.js"; import { startGatewayConfigReloader, type GatewayReloadPlan } from "./config-reload.js"; import { resolveHooksConfig } from "./hooks.js"; import { buildGatewayCronService, type GatewayCronState } from "./server-cron.js"; -import type { HookClientIpConfig } from "./server-http.js"; import { type GatewayChannelManager, startGatewayChannelHealthMonitor, @@ -43,7 +42,8 @@ import { type SharedGatewaySessionGenerationState, } from "./server-shared-auth-generation.js"; import type { ActivateRuntimeSecrets } from "./server-startup-config.js"; -import { resolveHookClientIpConfig } from "./server/hooks.js"; +import { resolveHookClientIpConfig } from "./server/hook-client-ip-config.js"; +import type { HookClientIpConfig } from "./server/hooks-request-handler.js"; type GatewayHotReloadState = { hooksConfig: ReturnType; diff --git a/src/gateway/server-runtime-state.ts b/src/gateway/server-runtime-state.ts index 563c19f8672..75b611fa9ff 100644 --- a/src/gateway/server-runtime-state.ts +++ b/src/gateway/server-runtime-state.ts @@ -28,13 +28,9 @@ import { createToolEventRecipientRegistry, } from "./server-chat-state.js"; import { MAX_PREAUTH_PAYLOAD_BYTES } from "./server-constants.js"; -import { - attachGatewayUpgradeHandler, - createGatewayHttpServer, - type HookClientIpConfig, -} from "./server-http.js"; +import { attachGatewayUpgradeHandler, createGatewayHttpServer } from "./server-http.js"; import type { DedupeEntry } from "./server-shared.js"; -import { createGatewayHooksRequestHandler } from "./server/hooks.js"; +import type { HookClientIpConfig, HooksRequestHandler } from "./server/hooks-request-handler.js"; import { listenGatewayHttpServer } from "./server/http-listen.js"; import type { PluginRoutePathContext } from "./server/plugins-http/path-context.js"; import { shouldEnforceGatewayAuthForPluginPath } from "./server/plugins-http/route-auth.js"; @@ -145,14 +141,30 @@ export async function createGatewayRuntimeState(params: { const clients = new Set(); const { broadcast, broadcastToConnIds } = createGatewayBroadcaster({ clients }); - const handleHooksRequest = createGatewayHooksRequestHandler({ - deps: params.deps, - getHooksConfig: params.hooksConfig, - getClientIpConfig: params.getHookClientIpConfig, - bindHost: params.bindHost, - port: params.port, - logHooks: params.logHooks, - }); + let loadedHooksRequestHandler: HooksRequestHandler | null = null; + const handleHooksRequest: HooksRequestHandler = async (req, res) => { + const hooksConfig = params.hooksConfig(); + if (!hooksConfig) { + return false; + } + const url = new URL(req.url ?? "/", "http://localhost"); + const basePath = hooksConfig.basePath; + if (url.pathname !== basePath && !url.pathname.startsWith(`${basePath}/`)) { + return false; + } + if (!loadedHooksRequestHandler) { + const { createGatewayHooksRequestHandler } = await import("./server/hooks.js"); + loadedHooksRequestHandler = createGatewayHooksRequestHandler({ + deps: params.deps, + getHooksConfig: params.hooksConfig, + getClientIpConfig: params.getHookClientIpConfig, + bindHost: params.bindHost, + port: params.port, + logHooks: params.logHooks, + }); + } + return await loadedHooksRequestHandler(req, res); + }; let loadedPluginRequestHandler: GatewayPluginRequestHandler | null = null; const handlePluginRequest: GatewayPluginRequestHandler = async ( diff --git a/src/gateway/server.impl.ts b/src/gateway/server.impl.ts index 5c3d97bb7fa..f78ff27c83b 100644 --- a/src/gateway/server.impl.ts +++ b/src/gateway/server.impl.ts @@ -90,7 +90,7 @@ import { incrementPresenceVersion, refreshGatewayHealthSnapshot, } from "./server/health-state.js"; -import { resolveHookClientIpConfig } from "./server/hooks.js"; +import { resolveHookClientIpConfig } from "./server/hook-client-ip-config.js"; import { createReadinessChecker } from "./server/readiness.js"; import { loadGatewayTlsRuntime } from "./server/tls.js"; import { resolveSharedGatewaySessionGeneration } from "./server/ws-shared-generation.js"; diff --git a/src/gateway/server/hook-client-ip-config.ts b/src/gateway/server/hook-client-ip-config.ts new file mode 100644 index 00000000000..6856caf0da1 --- /dev/null +++ b/src/gateway/server/hook-client-ip-config.ts @@ -0,0 +1,9 @@ +import type { OpenClawConfig } from "../../config/types.openclaw.js"; +import type { HookClientIpConfig } from "./hooks-request-handler.js"; + +export function resolveHookClientIpConfig(cfg: OpenClawConfig): HookClientIpConfig { + return { + trustedProxies: cfg.gateway?.trustedProxies, + allowRealIpFallback: cfg.gateway?.allowRealIpFallback === true, + }; +} diff --git a/src/gateway/server/hooks-request-handler.ts b/src/gateway/server/hooks-request-handler.ts new file mode 100644 index 00000000000..c90d8e2d352 --- /dev/null +++ b/src/gateway/server/hooks-request-handler.ts @@ -0,0 +1,450 @@ +import { createHash } from "node:crypto"; +import type { IncomingMessage, ServerResponse } from "node:http"; +import type { createSubsystemLogger } from "../../logging/subsystem.js"; +import { resolveHookExternalContentSource as resolveHookExternalContentSourceFromSession } from "../../security/external-content.js"; +import { safeEqualSecret } from "../../security/secret-equal.js"; +import { + AUTH_RATE_LIMIT_SCOPE_HOOK_AUTH, + createAuthRateLimiter, + normalizeRateLimitClientIp, +} from "../auth-rate-limit.js"; +import { applyHookMappings } from "../hooks-mapping.js"; +import { + extractHookToken, + getHookAgentPolicyError, + getHookChannelError, + getHookSessionKeyPrefixError, + type HookAgentDispatchPayload, + type HooksConfigResolved, + isHookAgentAllowed, + isSessionKeyAllowedByPrefix, + normalizeAgentPayload, + normalizeHookDispatchSessionKey, + normalizeHookHeaders, + normalizeWakePayload, + readJsonBody, + resolveHookChannel, + resolveHookDeliver, + resolveHookIdempotencyKey, + resolveHookSessionKey, + resolveHookTargetAgentId, +} from "../hooks.js"; +import { resolveRequestClientIp } from "../net.js"; +import { DEDUPE_MAX, DEDUPE_TTL_MS } from "../server-constants.js"; + +type SubsystemLogger = ReturnType; + +const HOOK_AUTH_FAILURE_LIMIT = 20; +const HOOK_AUTH_FAILURE_WINDOW_MS = 60_000; + +export type HookClientIpConfig = Readonly<{ + trustedProxies?: string[]; + allowRealIpFallback?: boolean; +}>; + +export type HooksRequestHandler = (req: IncomingMessage, res: ServerResponse) => Promise; + +type HookDispatchers = { + dispatchWakeHook: (value: { text: string; mode: "now" | "next-heartbeat" }) => void; + dispatchAgentHook: (value: HookAgentDispatchPayload) => string; +}; + +type HookReplayEntry = { + ts: number; + runId: string; +}; + +type HookReplayScope = { + pathKey: string; + token: string | undefined; + idempotencyKey?: string; + dispatchScope: Record; +}; + +function sendJson(res: ServerResponse, status: number, body: unknown) { + res.statusCode = status; + res.setHeader("Content-Type", "application/json; charset=utf-8"); + res.end(JSON.stringify(body)); +} + +function resolveMappedHookExternalContentSource(params: { + subPath: string; + payload: Record; + sessionKey: string; +}) { + const payloadSource = + typeof params.payload.source === "string" ? params.payload.source.trim().toLowerCase() : ""; + if (params.subPath === "gmail" || payloadSource === "gmail") { + return "gmail" as const; + } + return resolveHookExternalContentSourceFromSession(params.sessionKey) ?? "webhook"; +} + +export function createHooksRequestHandler( + opts: { + getHooksConfig: () => HooksConfigResolved | null; + bindHost: string; + port: number; + logHooks: SubsystemLogger; + getClientIpConfig?: () => HookClientIpConfig; + } & HookDispatchers, +): HooksRequestHandler { + const { getHooksConfig, logHooks, dispatchAgentHook, dispatchWakeHook, getClientIpConfig } = opts; + const hookReplayCache = new Map(); + const hookAuthLimiter = createAuthRateLimiter({ + maxAttempts: HOOK_AUTH_FAILURE_LIMIT, + windowMs: HOOK_AUTH_FAILURE_WINDOW_MS, + lockoutMs: HOOK_AUTH_FAILURE_WINDOW_MS, + exemptLoopback: false, + // Handler lifetimes are tied to gateway runtime/tests; skip background timer fanout. + pruneIntervalMs: 0, + }); + + const resolveHookClientKey = (req: IncomingMessage): string => { + const clientIpConfig = getClientIpConfig?.(); + const clientIp = + resolveRequestClientIp( + req, + clientIpConfig?.trustedProxies, + clientIpConfig?.allowRealIpFallback === true, + ) ?? req.socket?.remoteAddress; + return normalizeRateLimitClientIp(clientIp); + }; + + const pruneHookReplayCache = (now: number) => { + const cutoff = now - DEDUPE_TTL_MS; + for (const [key, entry] of hookReplayCache) { + if (entry.ts < cutoff) { + hookReplayCache.delete(key); + } + } + while (hookReplayCache.size > DEDUPE_MAX) { + const oldestKey = hookReplayCache.keys().next().value; + if (!oldestKey) { + break; + } + hookReplayCache.delete(oldestKey); + } + }; + + const buildHookReplayCacheKey = (params: HookReplayScope): string | undefined => { + const idem = params.idempotencyKey?.trim(); + if (!idem) { + return undefined; + } + const tokenFingerprint = createHash("sha256") + .update(params.token ?? "", "utf8") + .digest("hex"); + const idempotencyFingerprint = createHash("sha256").update(idem, "utf8").digest("hex"); + const scopeFingerprint = createHash("sha256") + .update( + JSON.stringify({ + pathKey: params.pathKey, + dispatchScope: params.dispatchScope, + }), + "utf8", + ) + .digest("hex"); + return `${tokenFingerprint}:${scopeFingerprint}:${idempotencyFingerprint}`; + }; + + const resolveCachedHookRunId = (key: string | undefined, now: number): string | undefined => { + if (!key) { + return undefined; + } + pruneHookReplayCache(now); + const cached = hookReplayCache.get(key); + if (!cached) { + return undefined; + } + hookReplayCache.delete(key); + hookReplayCache.set(key, cached); + return cached.runId; + }; + + const rememberHookRunId = (key: string | undefined, runId: string, now: number) => { + if (!key) { + return; + } + hookReplayCache.delete(key); + hookReplayCache.set(key, { ts: now, runId }); + pruneHookReplayCache(now); + }; + + return async (req, res) => { + const hooksConfig = getHooksConfig(); + if (!hooksConfig) { + return false; + } + // Only pathname/search are used here; keep the base host fixed so bind-host + // representation (e.g. IPv6 wildcards) cannot break request parsing. + const url = new URL(req.url ?? "/", "http://localhost"); + const basePath = hooksConfig.basePath; + if (url.pathname !== basePath && !url.pathname.startsWith(`${basePath}/`)) { + return false; + } + + if (url.searchParams.has("token")) { + res.statusCode = 400; + res.setHeader("Content-Type", "text/plain; charset=utf-8"); + res.end( + "Hook token must be provided via Authorization: Bearer or X-OpenClaw-Token header (query parameters are not allowed).", + ); + return true; + } + + if (req.method !== "POST") { + res.statusCode = 405; + res.setHeader("Allow", "POST"); + res.setHeader("Content-Type", "text/plain; charset=utf-8"); + res.end("Method Not Allowed"); + return true; + } + + const token = extractHookToken(req); + const clientKey = resolveHookClientKey(req); + if (!safeEqualSecret(token, hooksConfig.token)) { + const throttle = hookAuthLimiter.check(clientKey, AUTH_RATE_LIMIT_SCOPE_HOOK_AUTH); + if (!throttle.allowed) { + const retryAfter = throttle.retryAfterMs > 0 ? Math.ceil(throttle.retryAfterMs / 1000) : 1; + res.statusCode = 429; + res.setHeader("Retry-After", String(retryAfter)); + res.setHeader("Content-Type", "text/plain; charset=utf-8"); + res.end("Too Many Requests"); + logHooks.warn(`hook auth throttled for ${clientKey}; retry-after=${retryAfter}s`); + return true; + } + hookAuthLimiter.recordFailure(clientKey, AUTH_RATE_LIMIT_SCOPE_HOOK_AUTH); + res.statusCode = 401; + res.setHeader("Content-Type", "text/plain; charset=utf-8"); + res.end("Unauthorized"); + return true; + } + hookAuthLimiter.reset(clientKey, AUTH_RATE_LIMIT_SCOPE_HOOK_AUTH); + + const subPath = url.pathname.slice(basePath.length).replace(/^\/+/, ""); + if (!subPath) { + res.statusCode = 404; + res.setHeader("Content-Type", "text/plain; charset=utf-8"); + res.end("Not Found"); + return true; + } + + const body = await readJsonBody(req, hooksConfig.maxBodyBytes); + if (!body.ok) { + const status = + body.error === "payload too large" + ? 413 + : body.error === "request body timeout" + ? 408 + : 400; + sendJson(res, status, { ok: false, error: body.error }); + return true; + } + + const payload = typeof body.value === "object" && body.value !== null ? body.value : {}; + const headers = normalizeHookHeaders(req); + const idempotencyKey = resolveHookIdempotencyKey({ + payload: payload as Record, + headers, + }); + const now = Date.now(); + + if (subPath === "wake") { + const normalized = normalizeWakePayload(payload as Record); + if (!normalized.ok) { + sendJson(res, 400, { ok: false, error: normalized.error }); + return true; + } + dispatchWakeHook(normalized.value); + sendJson(res, 200, { ok: true, mode: normalized.value.mode }); + return true; + } + + if (subPath === "agent") { + const normalized = normalizeAgentPayload(payload as Record); + if (!normalized.ok) { + sendJson(res, 400, { ok: false, error: normalized.error }); + return true; + } + if (!isHookAgentAllowed(hooksConfig, normalized.value.agentId)) { + sendJson(res, 400, { ok: false, error: getHookAgentPolicyError() }); + return true; + } + const sessionKey = resolveHookSessionKey({ + hooksConfig, + source: "request", + sessionKey: normalized.value.sessionKey, + }); + if (!sessionKey.ok) { + sendJson(res, 400, { ok: false, error: sessionKey.error }); + return true; + } + const targetAgentId = resolveHookTargetAgentId(hooksConfig, normalized.value.agentId); + const replayKey = buildHookReplayCacheKey({ + pathKey: "agent", + token, + idempotencyKey, + dispatchScope: { + agentId: targetAgentId ?? null, + sessionKey: + normalized.value.sessionKey ?? hooksConfig.sessionPolicy.defaultSessionKey ?? null, + message: normalized.value.message, + name: normalized.value.name, + wakeMode: normalized.value.wakeMode, + deliver: normalized.value.deliver, + channel: normalized.value.channel, + to: normalized.value.to ?? null, + model: normalized.value.model ?? null, + thinking: normalized.value.thinking ?? null, + timeoutSeconds: normalized.value.timeoutSeconds ?? null, + }, + }); + const cachedRunId = resolveCachedHookRunId(replayKey, now); + if (cachedRunId) { + sendJson(res, 200, { ok: true, runId: cachedRunId }); + return true; + } + const normalizedDispatchSessionKey = normalizeHookDispatchSessionKey({ + sessionKey: sessionKey.value, + targetAgentId, + }); + const allowedPrefixes = hooksConfig.sessionPolicy.allowedSessionKeyPrefixes; + if ( + allowedPrefixes && + !isSessionKeyAllowedByPrefix(normalizedDispatchSessionKey, allowedPrefixes) + ) { + sendJson(res, 400, { ok: false, error: getHookSessionKeyPrefixError(allowedPrefixes) }); + return true; + } + const runId = dispatchAgentHook({ + ...normalized.value, + idempotencyKey, + sessionKey: normalizedDispatchSessionKey, + agentId: targetAgentId, + externalContentSource: "webhook", + }); + rememberHookRunId(replayKey, runId, now); + sendJson(res, 200, { ok: true, runId }); + return true; + } + + if (hooksConfig.mappings.length > 0) { + try { + const mapped = await applyHookMappings(hooksConfig.mappings, { + payload: payload as Record, + headers, + url, + path: subPath, + }); + if (mapped) { + if (!mapped.ok) { + sendJson(res, 400, { ok: false, error: mapped.error }); + return true; + } + if (mapped.action === null) { + res.statusCode = 204; + res.end(); + return true; + } + if (mapped.action.kind === "wake") { + dispatchWakeHook({ + text: mapped.action.text, + mode: mapped.action.mode, + }); + sendJson(res, 200, { ok: true, mode: mapped.action.mode }); + return true; + } + const channel = resolveHookChannel(mapped.action.channel); + if (!channel) { + sendJson(res, 400, { ok: false, error: getHookChannelError() }); + return true; + } + if (!isHookAgentAllowed(hooksConfig, mapped.action.agentId)) { + sendJson(res, 400, { ok: false, error: getHookAgentPolicyError() }); + return true; + } + const sessionKey = resolveHookSessionKey({ + hooksConfig, + source: + mapped.action.sessionKeySource === "static" ? "mapping-static" : "mapping-templated", + sessionKey: mapped.action.sessionKey, + }); + if (!sessionKey.ok) { + sendJson(res, 400, { ok: false, error: sessionKey.error }); + return true; + } + const targetAgentId = resolveHookTargetAgentId(hooksConfig, mapped.action.agentId); + const normalizedDispatchSessionKey = normalizeHookDispatchSessionKey({ + sessionKey: sessionKey.value, + targetAgentId, + }); + const allowedPrefixes = hooksConfig.sessionPolicy.allowedSessionKeyPrefixes; + if ( + allowedPrefixes && + !isSessionKeyAllowedByPrefix(normalizedDispatchSessionKey, allowedPrefixes) + ) { + sendJson(res, 400, { ok: false, error: getHookSessionKeyPrefixError(allowedPrefixes) }); + return true; + } + const replayKey = buildHookReplayCacheKey({ + pathKey: subPath || "mapping", + token, + idempotencyKey, + dispatchScope: { + agentId: targetAgentId ?? null, + sessionKey: + mapped.action.sessionKey ?? hooksConfig.sessionPolicy.defaultSessionKey ?? null, + message: mapped.action.message, + name: mapped.action.name ?? "Hook", + wakeMode: mapped.action.wakeMode, + deliver: resolveHookDeliver(mapped.action.deliver), + channel, + to: mapped.action.to ?? null, + model: mapped.action.model ?? null, + thinking: mapped.action.thinking ?? null, + timeoutSeconds: mapped.action.timeoutSeconds ?? null, + }, + }); + const cachedRunId = resolveCachedHookRunId(replayKey, now); + if (cachedRunId) { + sendJson(res, 200, { ok: true, runId: cachedRunId }); + return true; + } + const runId = dispatchAgentHook({ + message: mapped.action.message, + name: mapped.action.name ?? "Hook", + idempotencyKey, + agentId: targetAgentId, + wakeMode: mapped.action.wakeMode, + sessionKey: normalizedDispatchSessionKey, + deliver: resolveHookDeliver(mapped.action.deliver), + channel, + to: mapped.action.to, + model: mapped.action.model, + thinking: mapped.action.thinking, + timeoutSeconds: mapped.action.timeoutSeconds, + allowUnsafeExternalContent: mapped.action.allowUnsafeExternalContent, + externalContentSource: resolveMappedHookExternalContentSource({ + subPath, + payload: payload as Record, + sessionKey: sessionKey.value, + }), + }); + rememberHookRunId(replayKey, runId, now); + sendJson(res, 200, { ok: true, runId }); + return true; + } + } catch (err) { + logHooks.warn(`hook mapping failed: ${String(err)}`); + sendJson(res, 500, { ok: false, error: "hook mapping failed" }); + return true; + } + } + + res.statusCode = 404; + res.setHeader("Content-Type", "text/plain; charset=utf-8"); + res.end("Not Found"); + return true; + }; +} diff --git a/src/gateway/server/hooks.agent-trust.test.ts b/src/gateway/server/hooks.agent-trust.test.ts index 75aaf1a0fec..f17209499bc 100644 --- a/src/gateway/server/hooks.agent-trust.test.ts +++ b/src/gateway/server/hooks.agent-trust.test.ts @@ -24,7 +24,7 @@ vi.mock("../../config/config.js", () => ({ let capturedDispatchAgentHook: ((...args: unknown[]) => unknown) | undefined; -vi.mock("../server-http.js", () => ({ +vi.mock("./hooks-request-handler.js", () => ({ createHooksRequestHandler: vi.fn((opts: Record) => { capturedDispatchAgentHook = opts.dispatchAgentHook as typeof capturedDispatchAgentHook; return vi.fn(); diff --git a/src/gateway/server/hooks.ts b/src/gateway/server/hooks.ts index 85e2c454cc5..abfa7e93fe7 100644 --- a/src/gateway/server/hooks.ts +++ b/src/gateway/server/hooks.ts @@ -3,24 +3,16 @@ import { sanitizeInboundSystemTags } from "../../auto-reply/reply/inbound-text.j import type { CliDeps } from "../../cli/deps.types.js"; import { loadConfig } from "../../config/config.js"; import { resolveMainSessionKeyFromConfig } from "../../config/sessions.js"; -import type { OpenClawConfig } from "../../config/types.openclaw.js"; import type { CronJob } from "../../cron/types.js"; import { requestHeartbeatNow } from "../../infra/heartbeat-wake.js"; import { enqueueSystemEvent } from "../../infra/system-events.js"; import type { createSubsystemLogger } from "../../logging/subsystem.js"; import { normalizeOptionalString } from "../../shared/string-coerce.js"; import { type HookAgentDispatchPayload, type HooksConfigResolved } from "../hooks.js"; -import { createHooksRequestHandler, type HookClientIpConfig } from "../server-http.js"; +import { createHooksRequestHandler, type HookClientIpConfig } from "./hooks-request-handler.js"; type SubsystemLogger = ReturnType; -export function resolveHookClientIpConfig(cfg: OpenClawConfig): HookClientIpConfig { - return { - trustedProxies: cfg.gateway?.trustedProxies, - allowRealIpFallback: cfg.gateway?.allowRealIpFallback === true, - }; -} - export function createGatewayHooksRequestHandler(params: { deps: CliDeps; getHooksConfig: () => HooksConfigResolved | null;