mirror of
https://fastgit.cc/github.com/openclaw/openclaw
synced 2026-05-01 06:36:23 +08:00
fix(gateway): defer hook request handler imports
This commit is contained in:
@@ -89,6 +89,7 @@ Docs: https://docs.openclaw.ai
|
||||
- Gateway/startup: resolve channel runtime helpers asynchronously only when an enabled/configured channel starts, so no-channel Gateway boot skips auto-reply, media, pairing, and outbound channel helper imports. Thanks @vincentkoc.
|
||||
- Gateway/startup: lazy-load HTTP auth, canvas auth, and plugin route scope helpers from their request paths so Gateway bind no longer pays those utility graphs during boot. Thanks @vincentkoc.
|
||||
- Gateway/startup: defer isolated cron runner imports until `/hooks/agent` dispatch so Gateway boot skips the agent-turn runtime on installs that only need normal HTTP bind. Thanks @vincentkoc.
|
||||
- Gateway/startup: split hook request parsing into a request-path module and load the Gateway hook dispatcher only when a request matches the hooks base path, keeping hook mapping and throttle helpers off plain HTTP bind. Thanks @vincentkoc.
|
||||
- CLI/Gateway: use a parse-only config snapshot for plain `gateway status` reads and reuse same-path service config context so status no longer spends tens of seconds in full config validation before printing. Thanks @vincentkoc.
|
||||
- Lobster/Gateway: memoize repeated Ajv schema compilation before loading the embedded Lobster runtime so scheduled workflows and `llm.invoke` loops stop growing gateway heap on content-identical schemas. Fixes #71148. Thanks @cmi525, @vsolaz, and @vincentkoc.
|
||||
- Codex harness: normalize cached input tokens before session/context accounting so prompt cache reads are not double-counted in `/status`, `session_status`, or persisted `sessionEntry.totalTokens`. Fixes #69298. Thanks @richardmqq.
|
||||
|
||||
@@ -4,7 +4,8 @@ import type { createSubsystemLogger } from "../logging/subsystem.js";
|
||||
import type { ResolvedGatewayAuth } from "./auth.js";
|
||||
import { createGatewayRequest, createHooksConfig } from "./hooks-test-helpers.js";
|
||||
import { canonicalizePathVariant, isProtectedPluginRoutePath } from "./security-path.js";
|
||||
import { createGatewayHttpServer, createHooksRequestHandler } from "./server-http.js";
|
||||
import { createGatewayHttpServer } from "./server-http.js";
|
||||
import { createHooksRequestHandler } from "./server/hooks-request-handler.js";
|
||||
import { withTempConfig } from "./test-temp-config.js";
|
||||
|
||||
export type GatewayHttpServer = ReturnType<typeof createGatewayHttpServer>;
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
import { createHash } from "node:crypto";
|
||||
import {
|
||||
createServer as createHttpServer,
|
||||
type Server as HttpServer,
|
||||
@@ -22,16 +21,8 @@ import {
|
||||
createDiagnosticTraceContext,
|
||||
runWithDiagnosticTraceContext,
|
||||
} from "../infra/diagnostic-trace-context.js";
|
||||
import type { createSubsystemLogger } from "../logging/subsystem.js";
|
||||
import { resolveHookExternalContentSource as resolveHookExternalContentSourceFromSession } from "../security/external-content.js";
|
||||
import { safeEqualSecret } from "../security/secret-equal.js";
|
||||
import { resolveAssistantIdentity } from "./assistant-identity.js";
|
||||
import {
|
||||
AUTH_RATE_LIMIT_SCOPE_HOOK_AUTH,
|
||||
createAuthRateLimiter,
|
||||
normalizeRateLimitClientIp,
|
||||
type AuthRateLimiter,
|
||||
} from "./auth-rate-limit.js";
|
||||
import type { AuthRateLimiter } from "./auth-rate-limit.js";
|
||||
import {
|
||||
authorizeHttpGatewayConnect,
|
||||
isLocalDirectRequest,
|
||||
@@ -40,31 +31,10 @@ import {
|
||||
} from "./auth.js";
|
||||
import { normalizeCanvasScopedUrl } from "./canvas-capability.js";
|
||||
import type { ControlUiRootState } from "./control-ui.js";
|
||||
import { applyHookMappings } from "./hooks-mapping.js";
|
||||
import {
|
||||
extractHookToken,
|
||||
getHookAgentPolicyError,
|
||||
getHookChannelError,
|
||||
getHookSessionKeyPrefixError,
|
||||
type HookAgentDispatchPayload,
|
||||
type HooksConfigResolved,
|
||||
isHookAgentAllowed,
|
||||
isSessionKeyAllowedByPrefix,
|
||||
normalizeAgentPayload,
|
||||
normalizeHookHeaders,
|
||||
resolveHookIdempotencyKey,
|
||||
normalizeWakePayload,
|
||||
readJsonBody,
|
||||
normalizeHookDispatchSessionKey,
|
||||
resolveHookSessionKey,
|
||||
resolveHookTargetAgentId,
|
||||
resolveHookChannel,
|
||||
resolveHookDeliver,
|
||||
} from "./hooks.js";
|
||||
import type { AuthorizedGatewayHttpRequest } from "./http-auth-utils.js";
|
||||
import { sendGatewayAuthFailure, setDefaultSecurityHeaders } from "./http-common.js";
|
||||
import { resolveRequestClientIp } from "./net.js";
|
||||
import { DEDUPE_MAX, DEDUPE_TTL_MS } from "./server-constants.js";
|
||||
import type { HooksRequestHandler } from "./server/hooks-request-handler.js";
|
||||
import {
|
||||
isProtectedPluginRoutePathFromContext,
|
||||
resolvePluginRoutePathContext,
|
||||
@@ -75,7 +45,6 @@ import type { ReadinessChecker } from "./server/readiness.js";
|
||||
import type { GatewayWsClient } from "./server/ws-types.js";
|
||||
import { VOICECLAW_REALTIME_PATH } from "./voiceclaw-realtime/paths.js";
|
||||
|
||||
type SubsystemLogger = ReturnType<typeof createSubsystemLogger>;
|
||||
type PluginHttpRequestHandler = (
|
||||
req: IncomingMessage,
|
||||
res: ServerResponse,
|
||||
@@ -87,9 +56,6 @@ type PluginHttpRequestHandler = (
|
||||
},
|
||||
) => Promise<boolean>;
|
||||
|
||||
const HOOK_AUTH_FAILURE_LIMIT = 20;
|
||||
const HOOK_AUTH_FAILURE_WINDOW_MS = 60_000;
|
||||
|
||||
let identityAvatarModulePromise: Promise<typeof import("../agents/identity-avatar.js")> | undefined;
|
||||
let controlUiModulePromise: Promise<typeof import("./control-ui.js")> | undefined;
|
||||
let embeddingsHttpModulePromise: Promise<typeof import("./embeddings-http.js")> | undefined;
|
||||
@@ -183,47 +149,6 @@ function getPluginRouteRuntimeScopesModule() {
|
||||
return pluginRouteRuntimeScopesModulePromise;
|
||||
}
|
||||
|
||||
type HookDispatchers = {
|
||||
dispatchWakeHook: (value: { text: string; mode: "now" | "next-heartbeat" }) => void;
|
||||
dispatchAgentHook: (value: HookAgentDispatchPayload) => string;
|
||||
};
|
||||
|
||||
function resolveMappedHookExternalContentSource(params: {
|
||||
subPath: string;
|
||||
payload: Record<string, unknown>;
|
||||
sessionKey: string;
|
||||
}) {
|
||||
const payloadSource =
|
||||
typeof params.payload.source === "string" ? params.payload.source.trim().toLowerCase() : "";
|
||||
if (params.subPath === "gmail" || payloadSource === "gmail") {
|
||||
return "gmail" as const;
|
||||
}
|
||||
return resolveHookExternalContentSourceFromSession(params.sessionKey) ?? "webhook";
|
||||
}
|
||||
|
||||
export type HookClientIpConfig = Readonly<{
|
||||
trustedProxies?: string[];
|
||||
allowRealIpFallback?: boolean;
|
||||
}>;
|
||||
|
||||
type HookReplayEntry = {
|
||||
ts: number;
|
||||
runId: string;
|
||||
};
|
||||
|
||||
type HookReplayScope = {
|
||||
pathKey: string;
|
||||
token: string | undefined;
|
||||
idempotencyKey?: string;
|
||||
dispatchScope: Record<string, unknown>;
|
||||
};
|
||||
|
||||
function sendJson(res: ServerResponse, status: number, body: unknown) {
|
||||
res.statusCode = status;
|
||||
res.setHeader("Content-Type", "application/json; charset=utf-8");
|
||||
res.end(JSON.stringify(body));
|
||||
}
|
||||
|
||||
const GATEWAY_PROBE_STATUS_BY_PATH = new Map<string, "live" | "ready">([
|
||||
["/health", "live"],
|
||||
["/healthz", "live"],
|
||||
@@ -439,8 +364,6 @@ function writeUpgradeServiceUnavailable(socket: { write: (chunk: string) => void
|
||||
);
|
||||
}
|
||||
|
||||
export type HooksRequestHandler = (req: IncomingMessage, res: ServerResponse) => Promise<boolean>;
|
||||
|
||||
type GatewayHttpRequestStage = {
|
||||
name: string;
|
||||
run: () => Promise<boolean> | boolean;
|
||||
@@ -544,375 +467,6 @@ function buildPluginRequestStages(params: {
|
||||
];
|
||||
}
|
||||
|
||||
export function createHooksRequestHandler(
|
||||
opts: {
|
||||
getHooksConfig: () => HooksConfigResolved | null;
|
||||
bindHost: string;
|
||||
port: number;
|
||||
logHooks: SubsystemLogger;
|
||||
getClientIpConfig?: () => HookClientIpConfig;
|
||||
} & HookDispatchers,
|
||||
): HooksRequestHandler {
|
||||
const { getHooksConfig, logHooks, dispatchAgentHook, dispatchWakeHook, getClientIpConfig } = opts;
|
||||
const hookReplayCache = new Map<string, HookReplayEntry>();
|
||||
const hookAuthLimiter = createAuthRateLimiter({
|
||||
maxAttempts: HOOK_AUTH_FAILURE_LIMIT,
|
||||
windowMs: HOOK_AUTH_FAILURE_WINDOW_MS,
|
||||
lockoutMs: HOOK_AUTH_FAILURE_WINDOW_MS,
|
||||
exemptLoopback: false,
|
||||
// Handler lifetimes are tied to gateway runtime/tests; skip background timer fanout.
|
||||
pruneIntervalMs: 0,
|
||||
});
|
||||
|
||||
const resolveHookClientKey = (req: IncomingMessage): string => {
|
||||
const clientIpConfig = getClientIpConfig?.();
|
||||
const clientIp =
|
||||
resolveRequestClientIp(
|
||||
req,
|
||||
clientIpConfig?.trustedProxies,
|
||||
clientIpConfig?.allowRealIpFallback === true,
|
||||
) ?? req.socket?.remoteAddress;
|
||||
return normalizeRateLimitClientIp(clientIp);
|
||||
};
|
||||
|
||||
const pruneHookReplayCache = (now: number) => {
|
||||
const cutoff = now - DEDUPE_TTL_MS;
|
||||
for (const [key, entry] of hookReplayCache) {
|
||||
if (entry.ts < cutoff) {
|
||||
hookReplayCache.delete(key);
|
||||
}
|
||||
}
|
||||
while (hookReplayCache.size > DEDUPE_MAX) {
|
||||
const oldestKey = hookReplayCache.keys().next().value;
|
||||
if (!oldestKey) {
|
||||
break;
|
||||
}
|
||||
hookReplayCache.delete(oldestKey);
|
||||
}
|
||||
};
|
||||
|
||||
const buildHookReplayCacheKey = (params: HookReplayScope): string | undefined => {
|
||||
const idem = params.idempotencyKey?.trim();
|
||||
if (!idem) {
|
||||
return undefined;
|
||||
}
|
||||
const tokenFingerprint = createHash("sha256")
|
||||
.update(params.token ?? "", "utf8")
|
||||
.digest("hex");
|
||||
const idempotencyFingerprint = createHash("sha256").update(idem, "utf8").digest("hex");
|
||||
const scopeFingerprint = createHash("sha256")
|
||||
.update(
|
||||
JSON.stringify({
|
||||
pathKey: params.pathKey,
|
||||
dispatchScope: params.dispatchScope,
|
||||
}),
|
||||
"utf8",
|
||||
)
|
||||
.digest("hex");
|
||||
return `${tokenFingerprint}:${scopeFingerprint}:${idempotencyFingerprint}`;
|
||||
};
|
||||
|
||||
const resolveCachedHookRunId = (key: string | undefined, now: number): string | undefined => {
|
||||
if (!key) {
|
||||
return undefined;
|
||||
}
|
||||
pruneHookReplayCache(now);
|
||||
const cached = hookReplayCache.get(key);
|
||||
if (!cached) {
|
||||
return undefined;
|
||||
}
|
||||
hookReplayCache.delete(key);
|
||||
hookReplayCache.set(key, cached);
|
||||
return cached.runId;
|
||||
};
|
||||
|
||||
const rememberHookRunId = (key: string | undefined, runId: string, now: number) => {
|
||||
if (!key) {
|
||||
return;
|
||||
}
|
||||
hookReplayCache.delete(key);
|
||||
hookReplayCache.set(key, { ts: now, runId });
|
||||
pruneHookReplayCache(now);
|
||||
};
|
||||
|
||||
return async (req, res) => {
|
||||
const hooksConfig = getHooksConfig();
|
||||
if (!hooksConfig) {
|
||||
return false;
|
||||
}
|
||||
// Only pathname/search are used here; keep the base host fixed so bind-host
|
||||
// representation (e.g. IPv6 wildcards) cannot break request parsing.
|
||||
const url = new URL(req.url ?? "/", "http://localhost");
|
||||
const basePath = hooksConfig.basePath;
|
||||
if (url.pathname !== basePath && !url.pathname.startsWith(`${basePath}/`)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (url.searchParams.has("token")) {
|
||||
res.statusCode = 400;
|
||||
res.setHeader("Content-Type", "text/plain; charset=utf-8");
|
||||
res.end(
|
||||
"Hook token must be provided via Authorization: Bearer <token> or X-OpenClaw-Token header (query parameters are not allowed).",
|
||||
);
|
||||
return true;
|
||||
}
|
||||
|
||||
if (req.method !== "POST") {
|
||||
res.statusCode = 405;
|
||||
res.setHeader("Allow", "POST");
|
||||
res.setHeader("Content-Type", "text/plain; charset=utf-8");
|
||||
res.end("Method Not Allowed");
|
||||
return true;
|
||||
}
|
||||
|
||||
const token = extractHookToken(req);
|
||||
const clientKey = resolveHookClientKey(req);
|
||||
if (!safeEqualSecret(token, hooksConfig.token)) {
|
||||
const throttle = hookAuthLimiter.check(clientKey, AUTH_RATE_LIMIT_SCOPE_HOOK_AUTH);
|
||||
if (!throttle.allowed) {
|
||||
const retryAfter = throttle.retryAfterMs > 0 ? Math.ceil(throttle.retryAfterMs / 1000) : 1;
|
||||
res.statusCode = 429;
|
||||
res.setHeader("Retry-After", String(retryAfter));
|
||||
res.setHeader("Content-Type", "text/plain; charset=utf-8");
|
||||
res.end("Too Many Requests");
|
||||
logHooks.warn(`hook auth throttled for ${clientKey}; retry-after=${retryAfter}s`);
|
||||
return true;
|
||||
}
|
||||
hookAuthLimiter.recordFailure(clientKey, AUTH_RATE_LIMIT_SCOPE_HOOK_AUTH);
|
||||
res.statusCode = 401;
|
||||
res.setHeader("Content-Type", "text/plain; charset=utf-8");
|
||||
res.end("Unauthorized");
|
||||
return true;
|
||||
}
|
||||
hookAuthLimiter.reset(clientKey, AUTH_RATE_LIMIT_SCOPE_HOOK_AUTH);
|
||||
|
||||
const subPath = url.pathname.slice(basePath.length).replace(/^\/+/, "");
|
||||
if (!subPath) {
|
||||
res.statusCode = 404;
|
||||
res.setHeader("Content-Type", "text/plain; charset=utf-8");
|
||||
res.end("Not Found");
|
||||
return true;
|
||||
}
|
||||
|
||||
const body = await readJsonBody(req, hooksConfig.maxBodyBytes);
|
||||
if (!body.ok) {
|
||||
const status =
|
||||
body.error === "payload too large"
|
||||
? 413
|
||||
: body.error === "request body timeout"
|
||||
? 408
|
||||
: 400;
|
||||
sendJson(res, status, { ok: false, error: body.error });
|
||||
return true;
|
||||
}
|
||||
|
||||
const payload = typeof body.value === "object" && body.value !== null ? body.value : {};
|
||||
const headers = normalizeHookHeaders(req);
|
||||
const idempotencyKey = resolveHookIdempotencyKey({
|
||||
payload: payload as Record<string, unknown>,
|
||||
headers,
|
||||
});
|
||||
const now = Date.now();
|
||||
|
||||
if (subPath === "wake") {
|
||||
const normalized = normalizeWakePayload(payload as Record<string, unknown>);
|
||||
if (!normalized.ok) {
|
||||
sendJson(res, 400, { ok: false, error: normalized.error });
|
||||
return true;
|
||||
}
|
||||
dispatchWakeHook(normalized.value);
|
||||
sendJson(res, 200, { ok: true, mode: normalized.value.mode });
|
||||
return true;
|
||||
}
|
||||
|
||||
if (subPath === "agent") {
|
||||
const normalized = normalizeAgentPayload(payload as Record<string, unknown>);
|
||||
if (!normalized.ok) {
|
||||
sendJson(res, 400, { ok: false, error: normalized.error });
|
||||
return true;
|
||||
}
|
||||
if (!isHookAgentAllowed(hooksConfig, normalized.value.agentId)) {
|
||||
sendJson(res, 400, { ok: false, error: getHookAgentPolicyError() });
|
||||
return true;
|
||||
}
|
||||
const sessionKey = resolveHookSessionKey({
|
||||
hooksConfig,
|
||||
source: "request",
|
||||
sessionKey: normalized.value.sessionKey,
|
||||
});
|
||||
if (!sessionKey.ok) {
|
||||
sendJson(res, 400, { ok: false, error: sessionKey.error });
|
||||
return true;
|
||||
}
|
||||
const targetAgentId = resolveHookTargetAgentId(hooksConfig, normalized.value.agentId);
|
||||
const replayKey = buildHookReplayCacheKey({
|
||||
pathKey: "agent",
|
||||
token,
|
||||
idempotencyKey,
|
||||
dispatchScope: {
|
||||
agentId: targetAgentId ?? null,
|
||||
sessionKey:
|
||||
normalized.value.sessionKey ?? hooksConfig.sessionPolicy.defaultSessionKey ?? null,
|
||||
message: normalized.value.message,
|
||||
name: normalized.value.name,
|
||||
wakeMode: normalized.value.wakeMode,
|
||||
deliver: normalized.value.deliver,
|
||||
channel: normalized.value.channel,
|
||||
to: normalized.value.to ?? null,
|
||||
model: normalized.value.model ?? null,
|
||||
thinking: normalized.value.thinking ?? null,
|
||||
timeoutSeconds: normalized.value.timeoutSeconds ?? null,
|
||||
},
|
||||
});
|
||||
const cachedRunId = resolveCachedHookRunId(replayKey, now);
|
||||
if (cachedRunId) {
|
||||
sendJson(res, 200, { ok: true, runId: cachedRunId });
|
||||
return true;
|
||||
}
|
||||
const normalizedDispatchSessionKey = normalizeHookDispatchSessionKey({
|
||||
sessionKey: sessionKey.value,
|
||||
targetAgentId,
|
||||
});
|
||||
const allowedPrefixes = hooksConfig.sessionPolicy.allowedSessionKeyPrefixes;
|
||||
if (
|
||||
allowedPrefixes &&
|
||||
!isSessionKeyAllowedByPrefix(normalizedDispatchSessionKey, allowedPrefixes)
|
||||
) {
|
||||
sendJson(res, 400, { ok: false, error: getHookSessionKeyPrefixError(allowedPrefixes) });
|
||||
return true;
|
||||
}
|
||||
const runId = dispatchAgentHook({
|
||||
...normalized.value,
|
||||
idempotencyKey,
|
||||
sessionKey: normalizedDispatchSessionKey,
|
||||
agentId: targetAgentId,
|
||||
externalContentSource: "webhook",
|
||||
});
|
||||
rememberHookRunId(replayKey, runId, now);
|
||||
sendJson(res, 200, { ok: true, runId });
|
||||
return true;
|
||||
}
|
||||
|
||||
if (hooksConfig.mappings.length > 0) {
|
||||
try {
|
||||
const mapped = await applyHookMappings(hooksConfig.mappings, {
|
||||
payload: payload as Record<string, unknown>,
|
||||
headers,
|
||||
url,
|
||||
path: subPath,
|
||||
});
|
||||
if (mapped) {
|
||||
if (!mapped.ok) {
|
||||
sendJson(res, 400, { ok: false, error: mapped.error });
|
||||
return true;
|
||||
}
|
||||
if (mapped.action === null) {
|
||||
res.statusCode = 204;
|
||||
res.end();
|
||||
return true;
|
||||
}
|
||||
if (mapped.action.kind === "wake") {
|
||||
dispatchWakeHook({
|
||||
text: mapped.action.text,
|
||||
mode: mapped.action.mode,
|
||||
});
|
||||
sendJson(res, 200, { ok: true, mode: mapped.action.mode });
|
||||
return true;
|
||||
}
|
||||
const channel = resolveHookChannel(mapped.action.channel);
|
||||
if (!channel) {
|
||||
sendJson(res, 400, { ok: false, error: getHookChannelError() });
|
||||
return true;
|
||||
}
|
||||
if (!isHookAgentAllowed(hooksConfig, mapped.action.agentId)) {
|
||||
sendJson(res, 400, { ok: false, error: getHookAgentPolicyError() });
|
||||
return true;
|
||||
}
|
||||
const sessionKey = resolveHookSessionKey({
|
||||
hooksConfig,
|
||||
source:
|
||||
mapped.action.sessionKeySource === "static" ? "mapping-static" : "mapping-templated",
|
||||
sessionKey: mapped.action.sessionKey,
|
||||
});
|
||||
if (!sessionKey.ok) {
|
||||
sendJson(res, 400, { ok: false, error: sessionKey.error });
|
||||
return true;
|
||||
}
|
||||
const targetAgentId = resolveHookTargetAgentId(hooksConfig, mapped.action.agentId);
|
||||
const normalizedDispatchSessionKey = normalizeHookDispatchSessionKey({
|
||||
sessionKey: sessionKey.value,
|
||||
targetAgentId,
|
||||
});
|
||||
const allowedPrefixes = hooksConfig.sessionPolicy.allowedSessionKeyPrefixes;
|
||||
if (
|
||||
allowedPrefixes &&
|
||||
!isSessionKeyAllowedByPrefix(normalizedDispatchSessionKey, allowedPrefixes)
|
||||
) {
|
||||
sendJson(res, 400, { ok: false, error: getHookSessionKeyPrefixError(allowedPrefixes) });
|
||||
return true;
|
||||
}
|
||||
const replayKey = buildHookReplayCacheKey({
|
||||
pathKey: subPath || "mapping",
|
||||
token,
|
||||
idempotencyKey,
|
||||
dispatchScope: {
|
||||
agentId: targetAgentId ?? null,
|
||||
sessionKey:
|
||||
mapped.action.sessionKey ?? hooksConfig.sessionPolicy.defaultSessionKey ?? null,
|
||||
message: mapped.action.message,
|
||||
name: mapped.action.name ?? "Hook",
|
||||
wakeMode: mapped.action.wakeMode,
|
||||
deliver: resolveHookDeliver(mapped.action.deliver),
|
||||
channel,
|
||||
to: mapped.action.to ?? null,
|
||||
model: mapped.action.model ?? null,
|
||||
thinking: mapped.action.thinking ?? null,
|
||||
timeoutSeconds: mapped.action.timeoutSeconds ?? null,
|
||||
},
|
||||
});
|
||||
const cachedRunId = resolveCachedHookRunId(replayKey, now);
|
||||
if (cachedRunId) {
|
||||
sendJson(res, 200, { ok: true, runId: cachedRunId });
|
||||
return true;
|
||||
}
|
||||
const runId = dispatchAgentHook({
|
||||
message: mapped.action.message,
|
||||
name: mapped.action.name ?? "Hook",
|
||||
idempotencyKey,
|
||||
agentId: targetAgentId,
|
||||
wakeMode: mapped.action.wakeMode,
|
||||
sessionKey: normalizedDispatchSessionKey,
|
||||
deliver: resolveHookDeliver(mapped.action.deliver),
|
||||
channel,
|
||||
to: mapped.action.to,
|
||||
model: mapped.action.model,
|
||||
thinking: mapped.action.thinking,
|
||||
timeoutSeconds: mapped.action.timeoutSeconds,
|
||||
allowUnsafeExternalContent: mapped.action.allowUnsafeExternalContent,
|
||||
externalContentSource: resolveMappedHookExternalContentSource({
|
||||
subPath,
|
||||
payload: payload as Record<string, unknown>,
|
||||
sessionKey: sessionKey.value,
|
||||
}),
|
||||
});
|
||||
rememberHookRunId(replayKey, runId, now);
|
||||
sendJson(res, 200, { ok: true, runId });
|
||||
return true;
|
||||
}
|
||||
} catch (err) {
|
||||
logHooks.warn(`hook mapping failed: ${String(err)}`);
|
||||
sendJson(res, 500, { ok: false, error: "hook mapping failed" });
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
res.statusCode = 404;
|
||||
res.setHeader("Content-Type", "text/plain; charset=utf-8");
|
||||
res.end("Not Found");
|
||||
return true;
|
||||
};
|
||||
}
|
||||
|
||||
export function createGatewayHttpServer(opts: {
|
||||
canvasHost: CanvasHostHandler | null;
|
||||
clients: Set<GatewayWsClient>;
|
||||
|
||||
@@ -1,11 +1,11 @@
|
||||
import type { PluginServicesHandle } from "../plugins/services.js";
|
||||
import type { HooksConfigResolved } from "./hooks.js";
|
||||
import type { GatewayCronState } from "./server-cron.js";
|
||||
import type { HookClientIpConfig } from "./server-http.js";
|
||||
import {
|
||||
createGatewayServerMutableState,
|
||||
type GatewayServerMutableState,
|
||||
} from "./server-runtime-handles.js";
|
||||
import type { HookClientIpConfig } from "./server/hooks-request-handler.js";
|
||||
|
||||
export type GatewayServerLiveState = GatewayServerMutableState & {
|
||||
hooksConfig: HooksConfigResolved | null;
|
||||
|
||||
@@ -30,7 +30,6 @@ import type { ChannelKind } from "./config-reload-plan.js";
|
||||
import { startGatewayConfigReloader, type GatewayReloadPlan } from "./config-reload.js";
|
||||
import { resolveHooksConfig } from "./hooks.js";
|
||||
import { buildGatewayCronService, type GatewayCronState } from "./server-cron.js";
|
||||
import type { HookClientIpConfig } from "./server-http.js";
|
||||
import {
|
||||
type GatewayChannelManager,
|
||||
startGatewayChannelHealthMonitor,
|
||||
@@ -43,7 +42,8 @@ import {
|
||||
type SharedGatewaySessionGenerationState,
|
||||
} from "./server-shared-auth-generation.js";
|
||||
import type { ActivateRuntimeSecrets } from "./server-startup-config.js";
|
||||
import { resolveHookClientIpConfig } from "./server/hooks.js";
|
||||
import { resolveHookClientIpConfig } from "./server/hook-client-ip-config.js";
|
||||
import type { HookClientIpConfig } from "./server/hooks-request-handler.js";
|
||||
|
||||
type GatewayHotReloadState = {
|
||||
hooksConfig: ReturnType<typeof resolveHooksConfig>;
|
||||
|
||||
@@ -28,13 +28,9 @@ import {
|
||||
createToolEventRecipientRegistry,
|
||||
} from "./server-chat-state.js";
|
||||
import { MAX_PREAUTH_PAYLOAD_BYTES } from "./server-constants.js";
|
||||
import {
|
||||
attachGatewayUpgradeHandler,
|
||||
createGatewayHttpServer,
|
||||
type HookClientIpConfig,
|
||||
} from "./server-http.js";
|
||||
import { attachGatewayUpgradeHandler, createGatewayHttpServer } from "./server-http.js";
|
||||
import type { DedupeEntry } from "./server-shared.js";
|
||||
import { createGatewayHooksRequestHandler } from "./server/hooks.js";
|
||||
import type { HookClientIpConfig, HooksRequestHandler } from "./server/hooks-request-handler.js";
|
||||
import { listenGatewayHttpServer } from "./server/http-listen.js";
|
||||
import type { PluginRoutePathContext } from "./server/plugins-http/path-context.js";
|
||||
import { shouldEnforceGatewayAuthForPluginPath } from "./server/plugins-http/route-auth.js";
|
||||
@@ -145,14 +141,30 @@ export async function createGatewayRuntimeState(params: {
|
||||
const clients = new Set<GatewayWsClient>();
|
||||
const { broadcast, broadcastToConnIds } = createGatewayBroadcaster({ clients });
|
||||
|
||||
const handleHooksRequest = createGatewayHooksRequestHandler({
|
||||
deps: params.deps,
|
||||
getHooksConfig: params.hooksConfig,
|
||||
getClientIpConfig: params.getHookClientIpConfig,
|
||||
bindHost: params.bindHost,
|
||||
port: params.port,
|
||||
logHooks: params.logHooks,
|
||||
});
|
||||
let loadedHooksRequestHandler: HooksRequestHandler | null = null;
|
||||
const handleHooksRequest: HooksRequestHandler = async (req, res) => {
|
||||
const hooksConfig = params.hooksConfig();
|
||||
if (!hooksConfig) {
|
||||
return false;
|
||||
}
|
||||
const url = new URL(req.url ?? "/", "http://localhost");
|
||||
const basePath = hooksConfig.basePath;
|
||||
if (url.pathname !== basePath && !url.pathname.startsWith(`${basePath}/`)) {
|
||||
return false;
|
||||
}
|
||||
if (!loadedHooksRequestHandler) {
|
||||
const { createGatewayHooksRequestHandler } = await import("./server/hooks.js");
|
||||
loadedHooksRequestHandler = createGatewayHooksRequestHandler({
|
||||
deps: params.deps,
|
||||
getHooksConfig: params.hooksConfig,
|
||||
getClientIpConfig: params.getHookClientIpConfig,
|
||||
bindHost: params.bindHost,
|
||||
port: params.port,
|
||||
logHooks: params.logHooks,
|
||||
});
|
||||
}
|
||||
return await loadedHooksRequestHandler(req, res);
|
||||
};
|
||||
|
||||
let loadedPluginRequestHandler: GatewayPluginRequestHandler | null = null;
|
||||
const handlePluginRequest: GatewayPluginRequestHandler = async (
|
||||
|
||||
@@ -90,7 +90,7 @@ import {
|
||||
incrementPresenceVersion,
|
||||
refreshGatewayHealthSnapshot,
|
||||
} from "./server/health-state.js";
|
||||
import { resolveHookClientIpConfig } from "./server/hooks.js";
|
||||
import { resolveHookClientIpConfig } from "./server/hook-client-ip-config.js";
|
||||
import { createReadinessChecker } from "./server/readiness.js";
|
||||
import { loadGatewayTlsRuntime } from "./server/tls.js";
|
||||
import { resolveSharedGatewaySessionGeneration } from "./server/ws-shared-generation.js";
|
||||
|
||||
9
src/gateway/server/hook-client-ip-config.ts
Normal file
9
src/gateway/server/hook-client-ip-config.ts
Normal file
@@ -0,0 +1,9 @@
|
||||
import type { OpenClawConfig } from "../../config/types.openclaw.js";
|
||||
import type { HookClientIpConfig } from "./hooks-request-handler.js";
|
||||
|
||||
export function resolveHookClientIpConfig(cfg: OpenClawConfig): HookClientIpConfig {
|
||||
return {
|
||||
trustedProxies: cfg.gateway?.trustedProxies,
|
||||
allowRealIpFallback: cfg.gateway?.allowRealIpFallback === true,
|
||||
};
|
||||
}
|
||||
450
src/gateway/server/hooks-request-handler.ts
Normal file
450
src/gateway/server/hooks-request-handler.ts
Normal file
@@ -0,0 +1,450 @@
|
||||
import { createHash } from "node:crypto";
|
||||
import type { IncomingMessage, ServerResponse } from "node:http";
|
||||
import type { createSubsystemLogger } from "../../logging/subsystem.js";
|
||||
import { resolveHookExternalContentSource as resolveHookExternalContentSourceFromSession } from "../../security/external-content.js";
|
||||
import { safeEqualSecret } from "../../security/secret-equal.js";
|
||||
import {
|
||||
AUTH_RATE_LIMIT_SCOPE_HOOK_AUTH,
|
||||
createAuthRateLimiter,
|
||||
normalizeRateLimitClientIp,
|
||||
} from "../auth-rate-limit.js";
|
||||
import { applyHookMappings } from "../hooks-mapping.js";
|
||||
import {
|
||||
extractHookToken,
|
||||
getHookAgentPolicyError,
|
||||
getHookChannelError,
|
||||
getHookSessionKeyPrefixError,
|
||||
type HookAgentDispatchPayload,
|
||||
type HooksConfigResolved,
|
||||
isHookAgentAllowed,
|
||||
isSessionKeyAllowedByPrefix,
|
||||
normalizeAgentPayload,
|
||||
normalizeHookDispatchSessionKey,
|
||||
normalizeHookHeaders,
|
||||
normalizeWakePayload,
|
||||
readJsonBody,
|
||||
resolveHookChannel,
|
||||
resolveHookDeliver,
|
||||
resolveHookIdempotencyKey,
|
||||
resolveHookSessionKey,
|
||||
resolveHookTargetAgentId,
|
||||
} from "../hooks.js";
|
||||
import { resolveRequestClientIp } from "../net.js";
|
||||
import { DEDUPE_MAX, DEDUPE_TTL_MS } from "../server-constants.js";
|
||||
|
||||
type SubsystemLogger = ReturnType<typeof createSubsystemLogger>;
|
||||
|
||||
const HOOK_AUTH_FAILURE_LIMIT = 20;
|
||||
const HOOK_AUTH_FAILURE_WINDOW_MS = 60_000;
|
||||
|
||||
export type HookClientIpConfig = Readonly<{
|
||||
trustedProxies?: string[];
|
||||
allowRealIpFallback?: boolean;
|
||||
}>;
|
||||
|
||||
export type HooksRequestHandler = (req: IncomingMessage, res: ServerResponse) => Promise<boolean>;
|
||||
|
||||
type HookDispatchers = {
|
||||
dispatchWakeHook: (value: { text: string; mode: "now" | "next-heartbeat" }) => void;
|
||||
dispatchAgentHook: (value: HookAgentDispatchPayload) => string;
|
||||
};
|
||||
|
||||
type HookReplayEntry = {
|
||||
ts: number;
|
||||
runId: string;
|
||||
};
|
||||
|
||||
type HookReplayScope = {
|
||||
pathKey: string;
|
||||
token: string | undefined;
|
||||
idempotencyKey?: string;
|
||||
dispatchScope: Record<string, unknown>;
|
||||
};
|
||||
|
||||
function sendJson(res: ServerResponse, status: number, body: unknown) {
|
||||
res.statusCode = status;
|
||||
res.setHeader("Content-Type", "application/json; charset=utf-8");
|
||||
res.end(JSON.stringify(body));
|
||||
}
|
||||
|
||||
function resolveMappedHookExternalContentSource(params: {
|
||||
subPath: string;
|
||||
payload: Record<string, unknown>;
|
||||
sessionKey: string;
|
||||
}) {
|
||||
const payloadSource =
|
||||
typeof params.payload.source === "string" ? params.payload.source.trim().toLowerCase() : "";
|
||||
if (params.subPath === "gmail" || payloadSource === "gmail") {
|
||||
return "gmail" as const;
|
||||
}
|
||||
return resolveHookExternalContentSourceFromSession(params.sessionKey) ?? "webhook";
|
||||
}
|
||||
|
||||
export function createHooksRequestHandler(
|
||||
opts: {
|
||||
getHooksConfig: () => HooksConfigResolved | null;
|
||||
bindHost: string;
|
||||
port: number;
|
||||
logHooks: SubsystemLogger;
|
||||
getClientIpConfig?: () => HookClientIpConfig;
|
||||
} & HookDispatchers,
|
||||
): HooksRequestHandler {
|
||||
const { getHooksConfig, logHooks, dispatchAgentHook, dispatchWakeHook, getClientIpConfig } = opts;
|
||||
const hookReplayCache = new Map<string, HookReplayEntry>();
|
||||
const hookAuthLimiter = createAuthRateLimiter({
|
||||
maxAttempts: HOOK_AUTH_FAILURE_LIMIT,
|
||||
windowMs: HOOK_AUTH_FAILURE_WINDOW_MS,
|
||||
lockoutMs: HOOK_AUTH_FAILURE_WINDOW_MS,
|
||||
exemptLoopback: false,
|
||||
// Handler lifetimes are tied to gateway runtime/tests; skip background timer fanout.
|
||||
pruneIntervalMs: 0,
|
||||
});
|
||||
|
||||
const resolveHookClientKey = (req: IncomingMessage): string => {
|
||||
const clientIpConfig = getClientIpConfig?.();
|
||||
const clientIp =
|
||||
resolveRequestClientIp(
|
||||
req,
|
||||
clientIpConfig?.trustedProxies,
|
||||
clientIpConfig?.allowRealIpFallback === true,
|
||||
) ?? req.socket?.remoteAddress;
|
||||
return normalizeRateLimitClientIp(clientIp);
|
||||
};
|
||||
|
||||
const pruneHookReplayCache = (now: number) => {
|
||||
const cutoff = now - DEDUPE_TTL_MS;
|
||||
for (const [key, entry] of hookReplayCache) {
|
||||
if (entry.ts < cutoff) {
|
||||
hookReplayCache.delete(key);
|
||||
}
|
||||
}
|
||||
while (hookReplayCache.size > DEDUPE_MAX) {
|
||||
const oldestKey = hookReplayCache.keys().next().value;
|
||||
if (!oldestKey) {
|
||||
break;
|
||||
}
|
||||
hookReplayCache.delete(oldestKey);
|
||||
}
|
||||
};
|
||||
|
||||
const buildHookReplayCacheKey = (params: HookReplayScope): string | undefined => {
|
||||
const idem = params.idempotencyKey?.trim();
|
||||
if (!idem) {
|
||||
return undefined;
|
||||
}
|
||||
const tokenFingerprint = createHash("sha256")
|
||||
.update(params.token ?? "", "utf8")
|
||||
.digest("hex");
|
||||
const idempotencyFingerprint = createHash("sha256").update(idem, "utf8").digest("hex");
|
||||
const scopeFingerprint = createHash("sha256")
|
||||
.update(
|
||||
JSON.stringify({
|
||||
pathKey: params.pathKey,
|
||||
dispatchScope: params.dispatchScope,
|
||||
}),
|
||||
"utf8",
|
||||
)
|
||||
.digest("hex");
|
||||
return `${tokenFingerprint}:${scopeFingerprint}:${idempotencyFingerprint}`;
|
||||
};
|
||||
|
||||
const resolveCachedHookRunId = (key: string | undefined, now: number): string | undefined => {
|
||||
if (!key) {
|
||||
return undefined;
|
||||
}
|
||||
pruneHookReplayCache(now);
|
||||
const cached = hookReplayCache.get(key);
|
||||
if (!cached) {
|
||||
return undefined;
|
||||
}
|
||||
hookReplayCache.delete(key);
|
||||
hookReplayCache.set(key, cached);
|
||||
return cached.runId;
|
||||
};
|
||||
|
||||
const rememberHookRunId = (key: string | undefined, runId: string, now: number) => {
|
||||
if (!key) {
|
||||
return;
|
||||
}
|
||||
hookReplayCache.delete(key);
|
||||
hookReplayCache.set(key, { ts: now, runId });
|
||||
pruneHookReplayCache(now);
|
||||
};
|
||||
|
||||
return async (req, res) => {
|
||||
const hooksConfig = getHooksConfig();
|
||||
if (!hooksConfig) {
|
||||
return false;
|
||||
}
|
||||
// Only pathname/search are used here; keep the base host fixed so bind-host
|
||||
// representation (e.g. IPv6 wildcards) cannot break request parsing.
|
||||
const url = new URL(req.url ?? "/", "http://localhost");
|
||||
const basePath = hooksConfig.basePath;
|
||||
if (url.pathname !== basePath && !url.pathname.startsWith(`${basePath}/`)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (url.searchParams.has("token")) {
|
||||
res.statusCode = 400;
|
||||
res.setHeader("Content-Type", "text/plain; charset=utf-8");
|
||||
res.end(
|
||||
"Hook token must be provided via Authorization: Bearer <token> or X-OpenClaw-Token header (query parameters are not allowed).",
|
||||
);
|
||||
return true;
|
||||
}
|
||||
|
||||
if (req.method !== "POST") {
|
||||
res.statusCode = 405;
|
||||
res.setHeader("Allow", "POST");
|
||||
res.setHeader("Content-Type", "text/plain; charset=utf-8");
|
||||
res.end("Method Not Allowed");
|
||||
return true;
|
||||
}
|
||||
|
||||
const token = extractHookToken(req);
|
||||
const clientKey = resolveHookClientKey(req);
|
||||
if (!safeEqualSecret(token, hooksConfig.token)) {
|
||||
const throttle = hookAuthLimiter.check(clientKey, AUTH_RATE_LIMIT_SCOPE_HOOK_AUTH);
|
||||
if (!throttle.allowed) {
|
||||
const retryAfter = throttle.retryAfterMs > 0 ? Math.ceil(throttle.retryAfterMs / 1000) : 1;
|
||||
res.statusCode = 429;
|
||||
res.setHeader("Retry-After", String(retryAfter));
|
||||
res.setHeader("Content-Type", "text/plain; charset=utf-8");
|
||||
res.end("Too Many Requests");
|
||||
logHooks.warn(`hook auth throttled for ${clientKey}; retry-after=${retryAfter}s`);
|
||||
return true;
|
||||
}
|
||||
hookAuthLimiter.recordFailure(clientKey, AUTH_RATE_LIMIT_SCOPE_HOOK_AUTH);
|
||||
res.statusCode = 401;
|
||||
res.setHeader("Content-Type", "text/plain; charset=utf-8");
|
||||
res.end("Unauthorized");
|
||||
return true;
|
||||
}
|
||||
hookAuthLimiter.reset(clientKey, AUTH_RATE_LIMIT_SCOPE_HOOK_AUTH);
|
||||
|
||||
const subPath = url.pathname.slice(basePath.length).replace(/^\/+/, "");
|
||||
if (!subPath) {
|
||||
res.statusCode = 404;
|
||||
res.setHeader("Content-Type", "text/plain; charset=utf-8");
|
||||
res.end("Not Found");
|
||||
return true;
|
||||
}
|
||||
|
||||
const body = await readJsonBody(req, hooksConfig.maxBodyBytes);
|
||||
if (!body.ok) {
|
||||
const status =
|
||||
body.error === "payload too large"
|
||||
? 413
|
||||
: body.error === "request body timeout"
|
||||
? 408
|
||||
: 400;
|
||||
sendJson(res, status, { ok: false, error: body.error });
|
||||
return true;
|
||||
}
|
||||
|
||||
const payload = typeof body.value === "object" && body.value !== null ? body.value : {};
|
||||
const headers = normalizeHookHeaders(req);
|
||||
const idempotencyKey = resolveHookIdempotencyKey({
|
||||
payload: payload as Record<string, unknown>,
|
||||
headers,
|
||||
});
|
||||
const now = Date.now();
|
||||
|
||||
if (subPath === "wake") {
|
||||
const normalized = normalizeWakePayload(payload as Record<string, unknown>);
|
||||
if (!normalized.ok) {
|
||||
sendJson(res, 400, { ok: false, error: normalized.error });
|
||||
return true;
|
||||
}
|
||||
dispatchWakeHook(normalized.value);
|
||||
sendJson(res, 200, { ok: true, mode: normalized.value.mode });
|
||||
return true;
|
||||
}
|
||||
|
||||
if (subPath === "agent") {
|
||||
const normalized = normalizeAgentPayload(payload as Record<string, unknown>);
|
||||
if (!normalized.ok) {
|
||||
sendJson(res, 400, { ok: false, error: normalized.error });
|
||||
return true;
|
||||
}
|
||||
if (!isHookAgentAllowed(hooksConfig, normalized.value.agentId)) {
|
||||
sendJson(res, 400, { ok: false, error: getHookAgentPolicyError() });
|
||||
return true;
|
||||
}
|
||||
const sessionKey = resolveHookSessionKey({
|
||||
hooksConfig,
|
||||
source: "request",
|
||||
sessionKey: normalized.value.sessionKey,
|
||||
});
|
||||
if (!sessionKey.ok) {
|
||||
sendJson(res, 400, { ok: false, error: sessionKey.error });
|
||||
return true;
|
||||
}
|
||||
const targetAgentId = resolveHookTargetAgentId(hooksConfig, normalized.value.agentId);
|
||||
const replayKey = buildHookReplayCacheKey({
|
||||
pathKey: "agent",
|
||||
token,
|
||||
idempotencyKey,
|
||||
dispatchScope: {
|
||||
agentId: targetAgentId ?? null,
|
||||
sessionKey:
|
||||
normalized.value.sessionKey ?? hooksConfig.sessionPolicy.defaultSessionKey ?? null,
|
||||
message: normalized.value.message,
|
||||
name: normalized.value.name,
|
||||
wakeMode: normalized.value.wakeMode,
|
||||
deliver: normalized.value.deliver,
|
||||
channel: normalized.value.channel,
|
||||
to: normalized.value.to ?? null,
|
||||
model: normalized.value.model ?? null,
|
||||
thinking: normalized.value.thinking ?? null,
|
||||
timeoutSeconds: normalized.value.timeoutSeconds ?? null,
|
||||
},
|
||||
});
|
||||
const cachedRunId = resolveCachedHookRunId(replayKey, now);
|
||||
if (cachedRunId) {
|
||||
sendJson(res, 200, { ok: true, runId: cachedRunId });
|
||||
return true;
|
||||
}
|
||||
const normalizedDispatchSessionKey = normalizeHookDispatchSessionKey({
|
||||
sessionKey: sessionKey.value,
|
||||
targetAgentId,
|
||||
});
|
||||
const allowedPrefixes = hooksConfig.sessionPolicy.allowedSessionKeyPrefixes;
|
||||
if (
|
||||
allowedPrefixes &&
|
||||
!isSessionKeyAllowedByPrefix(normalizedDispatchSessionKey, allowedPrefixes)
|
||||
) {
|
||||
sendJson(res, 400, { ok: false, error: getHookSessionKeyPrefixError(allowedPrefixes) });
|
||||
return true;
|
||||
}
|
||||
const runId = dispatchAgentHook({
|
||||
...normalized.value,
|
||||
idempotencyKey,
|
||||
sessionKey: normalizedDispatchSessionKey,
|
||||
agentId: targetAgentId,
|
||||
externalContentSource: "webhook",
|
||||
});
|
||||
rememberHookRunId(replayKey, runId, now);
|
||||
sendJson(res, 200, { ok: true, runId });
|
||||
return true;
|
||||
}
|
||||
|
||||
if (hooksConfig.mappings.length > 0) {
|
||||
try {
|
||||
const mapped = await applyHookMappings(hooksConfig.mappings, {
|
||||
payload: payload as Record<string, unknown>,
|
||||
headers,
|
||||
url,
|
||||
path: subPath,
|
||||
});
|
||||
if (mapped) {
|
||||
if (!mapped.ok) {
|
||||
sendJson(res, 400, { ok: false, error: mapped.error });
|
||||
return true;
|
||||
}
|
||||
if (mapped.action === null) {
|
||||
res.statusCode = 204;
|
||||
res.end();
|
||||
return true;
|
||||
}
|
||||
if (mapped.action.kind === "wake") {
|
||||
dispatchWakeHook({
|
||||
text: mapped.action.text,
|
||||
mode: mapped.action.mode,
|
||||
});
|
||||
sendJson(res, 200, { ok: true, mode: mapped.action.mode });
|
||||
return true;
|
||||
}
|
||||
const channel = resolveHookChannel(mapped.action.channel);
|
||||
if (!channel) {
|
||||
sendJson(res, 400, { ok: false, error: getHookChannelError() });
|
||||
return true;
|
||||
}
|
||||
if (!isHookAgentAllowed(hooksConfig, mapped.action.agentId)) {
|
||||
sendJson(res, 400, { ok: false, error: getHookAgentPolicyError() });
|
||||
return true;
|
||||
}
|
||||
const sessionKey = resolveHookSessionKey({
|
||||
hooksConfig,
|
||||
source:
|
||||
mapped.action.sessionKeySource === "static" ? "mapping-static" : "mapping-templated",
|
||||
sessionKey: mapped.action.sessionKey,
|
||||
});
|
||||
if (!sessionKey.ok) {
|
||||
sendJson(res, 400, { ok: false, error: sessionKey.error });
|
||||
return true;
|
||||
}
|
||||
const targetAgentId = resolveHookTargetAgentId(hooksConfig, mapped.action.agentId);
|
||||
const normalizedDispatchSessionKey = normalizeHookDispatchSessionKey({
|
||||
sessionKey: sessionKey.value,
|
||||
targetAgentId,
|
||||
});
|
||||
const allowedPrefixes = hooksConfig.sessionPolicy.allowedSessionKeyPrefixes;
|
||||
if (
|
||||
allowedPrefixes &&
|
||||
!isSessionKeyAllowedByPrefix(normalizedDispatchSessionKey, allowedPrefixes)
|
||||
) {
|
||||
sendJson(res, 400, { ok: false, error: getHookSessionKeyPrefixError(allowedPrefixes) });
|
||||
return true;
|
||||
}
|
||||
const replayKey = buildHookReplayCacheKey({
|
||||
pathKey: subPath || "mapping",
|
||||
token,
|
||||
idempotencyKey,
|
||||
dispatchScope: {
|
||||
agentId: targetAgentId ?? null,
|
||||
sessionKey:
|
||||
mapped.action.sessionKey ?? hooksConfig.sessionPolicy.defaultSessionKey ?? null,
|
||||
message: mapped.action.message,
|
||||
name: mapped.action.name ?? "Hook",
|
||||
wakeMode: mapped.action.wakeMode,
|
||||
deliver: resolveHookDeliver(mapped.action.deliver),
|
||||
channel,
|
||||
to: mapped.action.to ?? null,
|
||||
model: mapped.action.model ?? null,
|
||||
thinking: mapped.action.thinking ?? null,
|
||||
timeoutSeconds: mapped.action.timeoutSeconds ?? null,
|
||||
},
|
||||
});
|
||||
const cachedRunId = resolveCachedHookRunId(replayKey, now);
|
||||
if (cachedRunId) {
|
||||
sendJson(res, 200, { ok: true, runId: cachedRunId });
|
||||
return true;
|
||||
}
|
||||
const runId = dispatchAgentHook({
|
||||
message: mapped.action.message,
|
||||
name: mapped.action.name ?? "Hook",
|
||||
idempotencyKey,
|
||||
agentId: targetAgentId,
|
||||
wakeMode: mapped.action.wakeMode,
|
||||
sessionKey: normalizedDispatchSessionKey,
|
||||
deliver: resolveHookDeliver(mapped.action.deliver),
|
||||
channel,
|
||||
to: mapped.action.to,
|
||||
model: mapped.action.model,
|
||||
thinking: mapped.action.thinking,
|
||||
timeoutSeconds: mapped.action.timeoutSeconds,
|
||||
allowUnsafeExternalContent: mapped.action.allowUnsafeExternalContent,
|
||||
externalContentSource: resolveMappedHookExternalContentSource({
|
||||
subPath,
|
||||
payload: payload as Record<string, unknown>,
|
||||
sessionKey: sessionKey.value,
|
||||
}),
|
||||
});
|
||||
rememberHookRunId(replayKey, runId, now);
|
||||
sendJson(res, 200, { ok: true, runId });
|
||||
return true;
|
||||
}
|
||||
} catch (err) {
|
||||
logHooks.warn(`hook mapping failed: ${String(err)}`);
|
||||
sendJson(res, 500, { ok: false, error: "hook mapping failed" });
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
res.statusCode = 404;
|
||||
res.setHeader("Content-Type", "text/plain; charset=utf-8");
|
||||
res.end("Not Found");
|
||||
return true;
|
||||
};
|
||||
}
|
||||
@@ -24,7 +24,7 @@ vi.mock("../../config/config.js", () => ({
|
||||
|
||||
let capturedDispatchAgentHook: ((...args: unknown[]) => unknown) | undefined;
|
||||
|
||||
vi.mock("../server-http.js", () => ({
|
||||
vi.mock("./hooks-request-handler.js", () => ({
|
||||
createHooksRequestHandler: vi.fn((opts: Record<string, unknown>) => {
|
||||
capturedDispatchAgentHook = opts.dispatchAgentHook as typeof capturedDispatchAgentHook;
|
||||
return vi.fn();
|
||||
|
||||
@@ -3,24 +3,16 @@ import { sanitizeInboundSystemTags } from "../../auto-reply/reply/inbound-text.j
|
||||
import type { CliDeps } from "../../cli/deps.types.js";
|
||||
import { loadConfig } from "../../config/config.js";
|
||||
import { resolveMainSessionKeyFromConfig } from "../../config/sessions.js";
|
||||
import type { OpenClawConfig } from "../../config/types.openclaw.js";
|
||||
import type { CronJob } from "../../cron/types.js";
|
||||
import { requestHeartbeatNow } from "../../infra/heartbeat-wake.js";
|
||||
import { enqueueSystemEvent } from "../../infra/system-events.js";
|
||||
import type { createSubsystemLogger } from "../../logging/subsystem.js";
|
||||
import { normalizeOptionalString } from "../../shared/string-coerce.js";
|
||||
import { type HookAgentDispatchPayload, type HooksConfigResolved } from "../hooks.js";
|
||||
import { createHooksRequestHandler, type HookClientIpConfig } from "../server-http.js";
|
||||
import { createHooksRequestHandler, type HookClientIpConfig } from "./hooks-request-handler.js";
|
||||
|
||||
type SubsystemLogger = ReturnType<typeof createSubsystemLogger>;
|
||||
|
||||
export function resolveHookClientIpConfig(cfg: OpenClawConfig): HookClientIpConfig {
|
||||
return {
|
||||
trustedProxies: cfg.gateway?.trustedProxies,
|
||||
allowRealIpFallback: cfg.gateway?.allowRealIpFallback === true,
|
||||
};
|
||||
}
|
||||
|
||||
export function createGatewayHooksRequestHandler(params: {
|
||||
deps: CliDeps;
|
||||
getHooksConfig: () => HooksConfigResolved | null;
|
||||
|
||||
Reference in New Issue
Block a user