From 91ae967faedd3cb66ade45676eb515cb91930d60 Mon Sep 17 00:00:00 2001 From: Mariano Belinky Date: Sat, 4 Apr 2026 12:31:12 +0200 Subject: [PATCH] plugins: harden webhook taskflow API responses --- extensions/webhooks/src/http.test.ts | 223 +++++++++++++++++++++++++-- extensions/webhooks/src/http.ts | 212 +++++++++++++++++++++++-- 2 files changed, 405 insertions(+), 30 deletions(-) diff --git a/extensions/webhooks/src/http.test.ts b/extensions/webhooks/src/http.test.ts index bcf08a01384..e2ee560ff00 100644 --- a/extensions/webhooks/src/http.test.ts +++ b/extensions/webhooks/src/http.test.ts @@ -91,6 +91,26 @@ function createHandler(): { }; } +async function dispatchJsonRequest(params: { + handler: ReturnType; + path: string; + secret?: string; + body: unknown; +}) { + const req = createJsonRequest({ + path: params.path, + secret: params.secret, + body: params.body, + }); + const res = createMockServerResponse(); + await params.handler(req, res); + return res; +} + +function parseJsonBody(res: { body?: string | Buffer | null }) { + return JSON.parse(String(res.body ?? "")); +} + afterEach(() => { vi.clearAllMocks(); }); @@ -98,16 +118,14 @@ afterEach(() => { describe("createTaskFlowWebhookRequestHandler", () => { it("rejects requests with the wrong secret", async () => { const { handler, target } = createHandler(); - const req = createJsonRequest({ + const res = await dispatchJsonRequest({ + handler, path: target.path, secret: "wrong-secret", body: { action: "list_flows", }, }); - const res = createMockServerResponse(); - - await handler(req, res); expect(res.statusCode).toBe(401); expect(res.body).toBe("unauthorized"); @@ -116,7 +134,8 @@ describe("createTaskFlowWebhookRequestHandler", () => { it("creates flows through the bound session and scrubs owner metadata from responses", async () => { const { handler, target } = createHandler(); - const req = createJsonRequest({ + const res = await dispatchJsonRequest({ + handler, path: target.path, secret: target.secret, body: { @@ -124,12 +143,9 @@ describe("createTaskFlowWebhookRequestHandler", () => { goal: "Review inbound queue", }, }); - const res = createMockServerResponse(); - - await handler(req, res); expect(res.statusCode).toBe(200); - const parsed = JSON.parse(res.body ?? ""); + const parsed = parseJsonBody(res); expect(parsed.ok).toBe(true); expect(parsed.result.flow).toMatchObject({ syncMode: "managed", @@ -147,7 +163,8 @@ describe("createTaskFlowWebhookRequestHandler", () => { controllerId: "webhooks/zapier", goal: "Triage inbox", }); - const req = createJsonRequest({ + const res = await dispatchJsonRequest({ + handler, path: target.path, secret: target.secret, body: { @@ -161,12 +178,9 @@ describe("createTaskFlowWebhookRequestHandler", () => { lastEventAt: 10, }, }); - const res = createMockServerResponse(); - - await handler(req, res); expect(res.statusCode).toBe(200); - const parsed = JSON.parse(res.body ?? ""); + const parsed = parseJsonBody(res); expect(parsed.ok).toBe(true); expect(parsed.result.created).toBe(true); expect(parsed.result.task).toMatchObject({ @@ -177,4 +191,185 @@ describe("createTaskFlowWebhookRequestHandler", () => { expect(parsed.result.task.ownerKey).toBeUndefined(); expect(parsed.result.task.requesterSessionKey).toBeUndefined(); }); + + it("returns 404 for missing flow mutations", async () => { + const { handler, target } = createHandler(); + const res = await dispatchJsonRequest({ + handler, + path: target.path, + secret: target.secret, + body: { + action: "set_waiting", + flowId: "flow-missing", + expectedRevision: 0, + }, + }); + + expect(res.statusCode).toBe(404); + const parsed = parseJsonBody(res); + expect(parsed).toMatchObject({ + ok: false, + code: "not_found", + error: "TaskFlow not found.", + result: { + applied: false, + code: "not_found", + }, + }); + }); + + it("returns 409 for revision conflicts", async () => { + const { handler, target } = createHandler(); + const flow = target.taskFlow.createManaged({ + controllerId: "webhooks/zapier", + goal: "Review inbox", + }); + const res = await dispatchJsonRequest({ + handler, + path: target.path, + secret: target.secret, + body: { + action: "set_waiting", + flowId: flow.flowId, + expectedRevision: flow.revision + 1, + }, + }); + + expect(res.statusCode).toBe(409); + const parsed = parseJsonBody(res); + expect(parsed).toMatchObject({ + ok: false, + code: "revision_conflict", + result: { + applied: false, + code: "revision_conflict", + current: { + flowId: flow.flowId, + revision: flow.revision, + }, + }, + }); + }); + + it("rejects internal runtimes and running-only metadata from external callers", async () => { + const { handler, target } = createHandler(); + const flow = target.taskFlow.createManaged({ + controllerId: "webhooks/zapier", + goal: "Review inbox", + }); + + const runtimeRes = await dispatchJsonRequest({ + handler, + path: target.path, + secret: target.secret, + body: { + action: "run_task", + flowId: flow.flowId, + runtime: "cli", + task: "Inspect queue", + }, + }); + expect(runtimeRes.statusCode).toBe(400); + expect(parseJsonBody(runtimeRes)).toMatchObject({ + ok: false, + code: "invalid_request", + }); + + const queuedMetadataRes = await dispatchJsonRequest({ + handler, + path: target.path, + secret: target.secret, + body: { + action: "run_task", + flowId: flow.flowId, + runtime: "acp", + task: "Inspect queue", + startedAt: 10, + }, + }); + expect(queuedMetadataRes.statusCode).toBe(400); + expect(parseJsonBody(queuedMetadataRes)).toMatchObject({ + ok: false, + code: "invalid_request", + error: + "status: status must be running when startedAt, lastEventAt, or progressSummary is provided", + }); + }); + + it("reuses the same task record when retried with the same runId", async () => { + const { handler, target } = createHandler(); + const flow = target.taskFlow.createManaged({ + controllerId: "webhooks/zapier", + goal: "Triage inbox", + }); + + const first = await dispatchJsonRequest({ + handler, + path: target.path, + secret: target.secret, + body: { + action: "run_task", + flowId: flow.flowId, + runtime: "acp", + childSessionKey: "agent:main:subagent:child", + runId: "retry-me", + task: "Inspect the next message batch", + }, + }); + const second = await dispatchJsonRequest({ + handler, + path: target.path, + secret: target.secret, + body: { + action: "run_task", + flowId: flow.flowId, + runtime: "acp", + childSessionKey: "agent:main:subagent:child", + runId: "retry-me", + task: "Inspect the next message batch", + }, + }); + + expect(first.statusCode).toBe(200); + expect(second.statusCode).toBe(200); + const firstParsed = parseJsonBody(first); + const secondParsed = parseJsonBody(second); + expect(firstParsed.result.task.taskId).toBe(secondParsed.result.task.taskId); + expect(target.taskFlow.getTaskSummary(flow.flowId)?.total).toBe(1); + }); + + it("returns 409 when cancellation targets a terminal flow", async () => { + const { handler, target } = createHandler(); + const flow = target.taskFlow.createManaged({ + controllerId: "webhooks/zapier", + goal: "Review inbox", + }); + const finished = target.taskFlow.finish({ + flowId: flow.flowId, + expectedRevision: flow.revision, + }); + expect(finished.applied).toBe(true); + + const res = await dispatchJsonRequest({ + handler, + path: target.path, + secret: target.secret, + body: { + action: "cancel_flow", + flowId: flow.flowId, + }, + }); + + expect(res.statusCode).toBe(409); + expect(parseJsonBody(res)).toMatchObject({ + ok: false, + code: "terminal", + error: "Flow is already succeeded.", + result: { + found: true, + cancelled: false, + reason: "Flow is already succeeded.", + }, + }); + }); }); diff --git a/extensions/webhooks/src/http.ts b/extensions/webhooks/src/http.ts index c14d8b17720..c3b511cead9 100644 --- a/extensions/webhooks/src/http.ts +++ b/extensions/webhooks/src/http.ts @@ -120,7 +120,7 @@ const runTaskRequestSchema = z .object({ action: z.literal("run_task"), flowId: z.string().trim().min(1), - runtime: z.enum(["subagent", "acp", "cli", "cron"]), + runtime: z.enum(["subagent", "acp"]), sourceId: z.string().trim().min(1).optional(), childSessionKey: z.string().trim().min(1).optional(), parentTaskId: z.string().trim().min(1).optional(), @@ -130,22 +130,27 @@ const runTaskRequestSchema = z task: z.string().trim().min(1), preferMetadata: z.boolean().optional(), notifyPolicy: z.enum(["done_only", "state_changes", "silent"]).optional(), - deliveryStatus: z - .enum([ - "pending", - "delivered", - "session_queued", - "failed", - "parent_missing", - "not_applicable", - ]) - .optional(), status: z.enum(["queued", "running"]).optional(), startedAt: z.number().int().nonnegative().optional(), lastEventAt: z.number().int().nonnegative().optional(), progressSummary: nullableStringSchema, }) - .strict(); + .strict() + .superRefine((value, ctx) => { + if ( + value.status !== "running" && + (value.startedAt !== undefined || + value.lastEventAt !== undefined || + value.progressSummary !== undefined) + ) { + ctx.addIssue({ + code: z.ZodIssueCode.custom, + message: + "status must be running when startedAt, lastEventAt, or progressSummary is provided", + path: ["status"], + }); + } + }); const webhookActionSchema = z.discriminatedUnion("action", [ createFlowRequestSchema, @@ -366,6 +371,165 @@ function mapMutationResult( return result; } +function mapMutationStatus(result: { + applied: boolean; + code?: "not_found" | "not_managed" | "revision_conflict"; +}): { statusCode: number; code?: string; error?: string } { + if (result.applied) { + return { statusCode: 200 }; + } + switch (result.code) { + case "not_found": + return { + statusCode: 404, + code: "not_found", + error: "TaskFlow not found.", + }; + case "not_managed": + return { + statusCode: 409, + code: "not_managed", + error: "TaskFlow is not managed by this webhook surface.", + }; + case "revision_conflict": + return { + statusCode: 409, + code: "revision_conflict", + error: "TaskFlow changed since the caller's expected revision.", + }; + default: + return { + statusCode: 409, + code: "mutation_rejected", + error: "TaskFlow mutation was rejected.", + }; + } +} + +function mapRunTaskStatus(result: { created: boolean; found: boolean; reason?: string }): { + statusCode: number; + code?: string; + error?: string; +} { + if (result.created) { + return { statusCode: 200 }; + } + if (!result.found) { + return { + statusCode: 404, + code: "not_found", + error: "TaskFlow not found.", + }; + } + if (result.reason === "Flow cancellation has already been requested.") { + return { + statusCode: 409, + code: "cancel_requested", + error: result.reason, + }; + } + if (result.reason === "Flow does not accept managed child tasks.") { + return { + statusCode: 409, + code: "not_managed", + error: result.reason, + }; + } + if (result.reason?.startsWith("Flow is already ")) { + return { + statusCode: 409, + code: "terminal", + error: result.reason, + }; + } + return { + statusCode: 409, + code: "task_not_created", + error: result.reason ?? "TaskFlow task was not created.", + }; +} + +function mapCancelStatus(result: { found: boolean; cancelled: boolean; reason?: string }): { + statusCode: number; + code?: string; + error?: string; +} { + if (result.cancelled) { + return { statusCode: 200 }; + } + if (!result.found) { + return { + statusCode: 404, + code: "not_found", + error: "TaskFlow not found.", + }; + } + if (result.reason === "One or more child tasks are still active.") { + return { + statusCode: 202, + code: "cancel_pending", + error: result.reason, + }; + } + if (result.reason === "Flow changed while cancellation was in progress.") { + return { + statusCode: 409, + code: "revision_conflict", + error: result.reason, + }; + } + if (result.reason?.startsWith("Flow is already ")) { + return { + statusCode: 409, + code: "terminal", + error: result.reason, + }; + } + return { + statusCode: 409, + code: "cancel_rejected", + error: result.reason ?? "TaskFlow cancellation was rejected.", + }; +} + +function describeWebhookOutcome(params: { action: WebhookAction; result: unknown }): { + statusCode: number; + code?: string; + error?: string; +} { + switch (params.action.action) { + case "set_waiting": + case "resume_flow": + case "finish_flow": + case "fail_flow": + case "request_cancel": + return mapMutationStatus( + params.result as { + applied: boolean; + code?: "not_found" | "not_managed" | "revision_conflict"; + }, + ); + case "cancel_flow": + return mapCancelStatus( + params.result as { + found: boolean; + cancelled: boolean; + reason?: string; + }, + ); + case "run_task": + return mapRunTaskStatus( + params.result as { + created: boolean; + found: boolean; + reason?: string; + }, + ); + default: + return { statusCode: 200 }; + } +} + async function executeWebhookAction(params: { action: WebhookAction; target: TaskFlowWebhookTarget; @@ -514,7 +678,6 @@ async function executeWebhookAction(params: { task: action.task, preferMetadata: action.preferMetadata, notifyPolicy: action.notifyPolicy, - deliveryStatus: action.deliveryStatus, status: action.status, startedAt: action.startedAt, lastEventAt: action.lastEventAt, @@ -613,11 +776,28 @@ export function createTaskFlowWebhookRequestHandler(params: { target, cfg: params.cfg, }); - writeJson(res, 200, { - ok: true, - routeId: target.routeId, + const outcome = describeWebhookOutcome({ + action: parsed.data, result, }); + writeJson( + res, + outcome.statusCode, + outcome.statusCode < 400 + ? { + ok: true, + routeId: target.routeId, + ...(outcome.code ? { code: outcome.code } : {}), + result, + } + : { + ok: false, + routeId: target.routeId, + code: outcome.code ?? "request_rejected", + error: outcome.error ?? "request rejected", + result, + }, + ); return true; }, });