plugins: harden webhook taskflow API responses

This commit is contained in:
Mariano Belinky
2026-04-04 12:31:12 +02:00
parent 8f095d8f5f
commit 91ae967fae
2 changed files with 405 additions and 30 deletions

View File

@@ -91,6 +91,26 @@ function createHandler(): {
};
}
async function dispatchJsonRequest(params: {
handler: ReturnType<typeof createTaskFlowWebhookRequestHandler>;
path: string;
secret?: string;
body: unknown;
}) {
const req = createJsonRequest({
path: params.path,
secret: params.secret,
body: params.body,
});
const res = createMockServerResponse();
await params.handler(req, res);
return res;
}
function parseJsonBody(res: { body?: string | Buffer | null }) {
return JSON.parse(String(res.body ?? ""));
}
afterEach(() => {
vi.clearAllMocks();
});
@@ -98,16 +118,14 @@ afterEach(() => {
describe("createTaskFlowWebhookRequestHandler", () => {
it("rejects requests with the wrong secret", async () => {
const { handler, target } = createHandler();
const req = createJsonRequest({
const res = await dispatchJsonRequest({
handler,
path: target.path,
secret: "wrong-secret",
body: {
action: "list_flows",
},
});
const res = createMockServerResponse();
await handler(req, res);
expect(res.statusCode).toBe(401);
expect(res.body).toBe("unauthorized");
@@ -116,7 +134,8 @@ describe("createTaskFlowWebhookRequestHandler", () => {
it("creates flows through the bound session and scrubs owner metadata from responses", async () => {
const { handler, target } = createHandler();
const req = createJsonRequest({
const res = await dispatchJsonRequest({
handler,
path: target.path,
secret: target.secret,
body: {
@@ -124,12 +143,9 @@ describe("createTaskFlowWebhookRequestHandler", () => {
goal: "Review inbound queue",
},
});
const res = createMockServerResponse();
await handler(req, res);
expect(res.statusCode).toBe(200);
const parsed = JSON.parse(res.body ?? "");
const parsed = parseJsonBody(res);
expect(parsed.ok).toBe(true);
expect(parsed.result.flow).toMatchObject({
syncMode: "managed",
@@ -147,7 +163,8 @@ describe("createTaskFlowWebhookRequestHandler", () => {
controllerId: "webhooks/zapier",
goal: "Triage inbox",
});
const req = createJsonRequest({
const res = await dispatchJsonRequest({
handler,
path: target.path,
secret: target.secret,
body: {
@@ -161,12 +178,9 @@ describe("createTaskFlowWebhookRequestHandler", () => {
lastEventAt: 10,
},
});
const res = createMockServerResponse();
await handler(req, res);
expect(res.statusCode).toBe(200);
const parsed = JSON.parse(res.body ?? "");
const parsed = parseJsonBody(res);
expect(parsed.ok).toBe(true);
expect(parsed.result.created).toBe(true);
expect(parsed.result.task).toMatchObject({
@@ -177,4 +191,185 @@ describe("createTaskFlowWebhookRequestHandler", () => {
expect(parsed.result.task.ownerKey).toBeUndefined();
expect(parsed.result.task.requesterSessionKey).toBeUndefined();
});
it("returns 404 for missing flow mutations", async () => {
const { handler, target } = createHandler();
const res = await dispatchJsonRequest({
handler,
path: target.path,
secret: target.secret,
body: {
action: "set_waiting",
flowId: "flow-missing",
expectedRevision: 0,
},
});
expect(res.statusCode).toBe(404);
const parsed = parseJsonBody(res);
expect(parsed).toMatchObject({
ok: false,
code: "not_found",
error: "TaskFlow not found.",
result: {
applied: false,
code: "not_found",
},
});
});
it("returns 409 for revision conflicts", async () => {
const { handler, target } = createHandler();
const flow = target.taskFlow.createManaged({
controllerId: "webhooks/zapier",
goal: "Review inbox",
});
const res = await dispatchJsonRequest({
handler,
path: target.path,
secret: target.secret,
body: {
action: "set_waiting",
flowId: flow.flowId,
expectedRevision: flow.revision + 1,
},
});
expect(res.statusCode).toBe(409);
const parsed = parseJsonBody(res);
expect(parsed).toMatchObject({
ok: false,
code: "revision_conflict",
result: {
applied: false,
code: "revision_conflict",
current: {
flowId: flow.flowId,
revision: flow.revision,
},
},
});
});
it("rejects internal runtimes and running-only metadata from external callers", async () => {
const { handler, target } = createHandler();
const flow = target.taskFlow.createManaged({
controllerId: "webhooks/zapier",
goal: "Review inbox",
});
const runtimeRes = await dispatchJsonRequest({
handler,
path: target.path,
secret: target.secret,
body: {
action: "run_task",
flowId: flow.flowId,
runtime: "cli",
task: "Inspect queue",
},
});
expect(runtimeRes.statusCode).toBe(400);
expect(parseJsonBody(runtimeRes)).toMatchObject({
ok: false,
code: "invalid_request",
});
const queuedMetadataRes = await dispatchJsonRequest({
handler,
path: target.path,
secret: target.secret,
body: {
action: "run_task",
flowId: flow.flowId,
runtime: "acp",
task: "Inspect queue",
startedAt: 10,
},
});
expect(queuedMetadataRes.statusCode).toBe(400);
expect(parseJsonBody(queuedMetadataRes)).toMatchObject({
ok: false,
code: "invalid_request",
error:
"status: status must be running when startedAt, lastEventAt, or progressSummary is provided",
});
});
it("reuses the same task record when retried with the same runId", async () => {
const { handler, target } = createHandler();
const flow = target.taskFlow.createManaged({
controllerId: "webhooks/zapier",
goal: "Triage inbox",
});
const first = await dispatchJsonRequest({
handler,
path: target.path,
secret: target.secret,
body: {
action: "run_task",
flowId: flow.flowId,
runtime: "acp",
childSessionKey: "agent:main:subagent:child",
runId: "retry-me",
task: "Inspect the next message batch",
},
});
const second = await dispatchJsonRequest({
handler,
path: target.path,
secret: target.secret,
body: {
action: "run_task",
flowId: flow.flowId,
runtime: "acp",
childSessionKey: "agent:main:subagent:child",
runId: "retry-me",
task: "Inspect the next message batch",
},
});
expect(first.statusCode).toBe(200);
expect(second.statusCode).toBe(200);
const firstParsed = parseJsonBody(first);
const secondParsed = parseJsonBody(second);
expect(firstParsed.result.task.taskId).toBe(secondParsed.result.task.taskId);
expect(target.taskFlow.getTaskSummary(flow.flowId)?.total).toBe(1);
});
it("returns 409 when cancellation targets a terminal flow", async () => {
const { handler, target } = createHandler();
const flow = target.taskFlow.createManaged({
controllerId: "webhooks/zapier",
goal: "Review inbox",
});
const finished = target.taskFlow.finish({
flowId: flow.flowId,
expectedRevision: flow.revision,
});
expect(finished.applied).toBe(true);
const res = await dispatchJsonRequest({
handler,
path: target.path,
secret: target.secret,
body: {
action: "cancel_flow",
flowId: flow.flowId,
},
});
expect(res.statusCode).toBe(409);
expect(parseJsonBody(res)).toMatchObject({
ok: false,
code: "terminal",
error: "Flow is already succeeded.",
result: {
found: true,
cancelled: false,
reason: "Flow is already succeeded.",
},
});
});
});

View File

@@ -120,7 +120,7 @@ const runTaskRequestSchema = z
.object({
action: z.literal("run_task"),
flowId: z.string().trim().min(1),
runtime: z.enum(["subagent", "acp", "cli", "cron"]),
runtime: z.enum(["subagent", "acp"]),
sourceId: z.string().trim().min(1).optional(),
childSessionKey: z.string().trim().min(1).optional(),
parentTaskId: z.string().trim().min(1).optional(),
@@ -130,22 +130,27 @@ const runTaskRequestSchema = z
task: z.string().trim().min(1),
preferMetadata: z.boolean().optional(),
notifyPolicy: z.enum(["done_only", "state_changes", "silent"]).optional(),
deliveryStatus: z
.enum([
"pending",
"delivered",
"session_queued",
"failed",
"parent_missing",
"not_applicable",
])
.optional(),
status: z.enum(["queued", "running"]).optional(),
startedAt: z.number().int().nonnegative().optional(),
lastEventAt: z.number().int().nonnegative().optional(),
progressSummary: nullableStringSchema,
})
.strict();
.strict()
.superRefine((value, ctx) => {
if (
value.status !== "running" &&
(value.startedAt !== undefined ||
value.lastEventAt !== undefined ||
value.progressSummary !== undefined)
) {
ctx.addIssue({
code: z.ZodIssueCode.custom,
message:
"status must be running when startedAt, lastEventAt, or progressSummary is provided",
path: ["status"],
});
}
});
const webhookActionSchema = z.discriminatedUnion("action", [
createFlowRequestSchema,
@@ -366,6 +371,165 @@ function mapMutationResult(
return result;
}
function mapMutationStatus(result: {
applied: boolean;
code?: "not_found" | "not_managed" | "revision_conflict";
}): { statusCode: number; code?: string; error?: string } {
if (result.applied) {
return { statusCode: 200 };
}
switch (result.code) {
case "not_found":
return {
statusCode: 404,
code: "not_found",
error: "TaskFlow not found.",
};
case "not_managed":
return {
statusCode: 409,
code: "not_managed",
error: "TaskFlow is not managed by this webhook surface.",
};
case "revision_conflict":
return {
statusCode: 409,
code: "revision_conflict",
error: "TaskFlow changed since the caller's expected revision.",
};
default:
return {
statusCode: 409,
code: "mutation_rejected",
error: "TaskFlow mutation was rejected.",
};
}
}
function mapRunTaskStatus(result: { created: boolean; found: boolean; reason?: string }): {
statusCode: number;
code?: string;
error?: string;
} {
if (result.created) {
return { statusCode: 200 };
}
if (!result.found) {
return {
statusCode: 404,
code: "not_found",
error: "TaskFlow not found.",
};
}
if (result.reason === "Flow cancellation has already been requested.") {
return {
statusCode: 409,
code: "cancel_requested",
error: result.reason,
};
}
if (result.reason === "Flow does not accept managed child tasks.") {
return {
statusCode: 409,
code: "not_managed",
error: result.reason,
};
}
if (result.reason?.startsWith("Flow is already ")) {
return {
statusCode: 409,
code: "terminal",
error: result.reason,
};
}
return {
statusCode: 409,
code: "task_not_created",
error: result.reason ?? "TaskFlow task was not created.",
};
}
function mapCancelStatus(result: { found: boolean; cancelled: boolean; reason?: string }): {
statusCode: number;
code?: string;
error?: string;
} {
if (result.cancelled) {
return { statusCode: 200 };
}
if (!result.found) {
return {
statusCode: 404,
code: "not_found",
error: "TaskFlow not found.",
};
}
if (result.reason === "One or more child tasks are still active.") {
return {
statusCode: 202,
code: "cancel_pending",
error: result.reason,
};
}
if (result.reason === "Flow changed while cancellation was in progress.") {
return {
statusCode: 409,
code: "revision_conflict",
error: result.reason,
};
}
if (result.reason?.startsWith("Flow is already ")) {
return {
statusCode: 409,
code: "terminal",
error: result.reason,
};
}
return {
statusCode: 409,
code: "cancel_rejected",
error: result.reason ?? "TaskFlow cancellation was rejected.",
};
}
function describeWebhookOutcome(params: { action: WebhookAction; result: unknown }): {
statusCode: number;
code?: string;
error?: string;
} {
switch (params.action.action) {
case "set_waiting":
case "resume_flow":
case "finish_flow":
case "fail_flow":
case "request_cancel":
return mapMutationStatus(
params.result as {
applied: boolean;
code?: "not_found" | "not_managed" | "revision_conflict";
},
);
case "cancel_flow":
return mapCancelStatus(
params.result as {
found: boolean;
cancelled: boolean;
reason?: string;
},
);
case "run_task":
return mapRunTaskStatus(
params.result as {
created: boolean;
found: boolean;
reason?: string;
},
);
default:
return { statusCode: 200 };
}
}
async function executeWebhookAction(params: {
action: WebhookAction;
target: TaskFlowWebhookTarget;
@@ -514,7 +678,6 @@ async function executeWebhookAction(params: {
task: action.task,
preferMetadata: action.preferMetadata,
notifyPolicy: action.notifyPolicy,
deliveryStatus: action.deliveryStatus,
status: action.status,
startedAt: action.startedAt,
lastEventAt: action.lastEventAt,
@@ -613,11 +776,28 @@ export function createTaskFlowWebhookRequestHandler(params: {
target,
cfg: params.cfg,
});
writeJson(res, 200, {
ok: true,
routeId: target.routeId,
const outcome = describeWebhookOutcome({
action: parsed.data,
result,
});
writeJson(
res,
outcome.statusCode,
outcome.statusCode < 400
? {
ok: true,
routeId: target.routeId,
...(outcome.code ? { code: outcome.code } : {}),
result,
}
: {
ok: false,
routeId: target.routeId,
code: outcome.code ?? "request_rejected",
error: outcome.error ?? "request rejected",
result,
},
);
return true;
},
});