From bd3ad3436efde3ed2834c4d20ed80765f4b2cd9e Mon Sep 17 00:00:00 2001 From: Mariano <132747814+mbelinky@users.noreply.github.com> Date: Sun, 19 Apr 2026 13:13:11 +0200 Subject: [PATCH] tasks: add detached runtime plugin registration contract (#68915) * tasks: register detached runtime plugins * tasks: harden detached runtime ownership * tasks: extract detached runtime contract types * changelog: note detached runtime contract * changelog: attribute detached runtime contract --- CHANGELOG.md | 2 + src/commands/tasks.ts | 9 +- src/plugin-sdk/index.ts | 1 + src/plugins/api-builder.ts | 4 + src/plugins/loader.test.ts | 175 +++++++++++++++ src/plugins/loader.ts | 13 ++ src/plugins/registry.ts | 17 ++ .../runtime/runtime-task-test-harness.ts | 2 + src/plugins/runtime/runtime-tasks.test.ts | 53 ++++- src/plugins/runtime/runtime-tasks.ts | 8 +- src/plugins/runtime/runtime-tasks.types.ts | 2 + src/plugins/types.ts | 4 + src/tasks/detached-task-runtime-contract.ts | 120 +++++++++++ src/tasks/detached-task-runtime-state.ts | 54 +++++ src/tasks/detached-task-runtime.test.ts | 34 ++- src/tasks/detached-task-runtime.ts | 91 +++++--- src/tasks/task-executor.test.ts | 203 ++++++++++++++++++ src/tasks/task-executor.ts | 15 +- test/helpers/plugins/plugin-api.ts | 1 + 19 files changed, 759 insertions(+), 49 deletions(-) create mode 100644 src/tasks/detached-task-runtime-contract.ts create mode 100644 src/tasks/detached-task-runtime-state.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index 4c63591cd73..e288dfaeac3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,8 @@ Docs: https://docs.openclaw.ai ### Changes +- Plugins/tasks: add a detached runtime registration contract so plugin executors can own detached task lifecycle and cancellation without reaching into core task internals. (#68915) Thanks @mbelinky. + ### Fixes - Models/Kimi: default bundled Kimi thinking to off and normalize Anthropic-compatible `thinking` payloads so stale session `/think` state no longer silently re-enables reasoning on Kimi runs. (#68907) Thanks @frankekn. diff --git a/src/commands/tasks.ts b/src/commands/tasks.ts index 6ff85a7a5f3..5ed06a30484 100644 --- a/src/commands/tasks.ts +++ b/src/commands/tasks.ts @@ -1,10 +1,7 @@ import type { RuntimeEnv } from "../runtime.js"; import { normalizeOptionalString } from "../shared/string-coerce.js"; -import { - cancelTaskById, - getTaskById, - updateTaskNotifyPolicyById, -} from "../tasks/runtime-internal.js"; +import { getTaskById, updateTaskNotifyPolicyById } from "../tasks/runtime-internal.js"; +import { cancelDetachedTaskRunById } from "../tasks/task-executor.js"; import { listTaskFlowAuditFindings, summarizeTaskFlowAuditFindings, @@ -391,7 +388,7 @@ export async function tasksCancelCommand(opts: { lookup: string }, runtime: Runt runtime.exit(1); return; } - const result = await cancelTaskById({ + const result = await cancelDetachedTaskRunById({ cfg: await loadTaskCancelConfig(), taskId: task.taskId, }); diff --git a/src/plugin-sdk/index.ts b/src/plugin-sdk/index.ts index 7260e8d4798..87d5c3c3fcd 100644 --- a/src/plugin-sdk/index.ts +++ b/src/plugin-sdk/index.ts @@ -64,6 +64,7 @@ export type { export type { BoundTaskFlowsRuntime, BoundTaskRunsRuntime, + DetachedTaskLifecycleRuntime, PluginRuntimeTaskFlows, PluginRuntimeTaskRuns, PluginRuntimeTasks, diff --git a/src/plugins/api-builder.ts b/src/plugins/api-builder.ts index 1c95ea49da2..de9e645574e 100644 --- a/src/plugins/api-builder.ts +++ b/src/plugins/api-builder.ts @@ -48,6 +48,7 @@ export type BuildPluginApiParams = { | "registerContextEngine" | "registerCompactionProvider" | "registerAgentHarness" + | "registerDetachedTaskRuntime" | "registerMemoryCapability" | "registerMemoryPromptSection" | "registerMemoryPromptSupplement" @@ -98,6 +99,7 @@ const noopRegisterCommand: OpenClawPluginApi["registerCommand"] = () => {}; const noopRegisterContextEngine: OpenClawPluginApi["registerContextEngine"] = () => {}; const noopRegisterCompactionProvider: OpenClawPluginApi["registerCompactionProvider"] = () => {}; const noopRegisterAgentHarness: OpenClawPluginApi["registerAgentHarness"] = () => {}; +const noopRegisterDetachedTaskRuntime: OpenClawPluginApi["registerDetachedTaskRuntime"] = () => {}; const noopRegisterMemoryCapability: OpenClawPluginApi["registerMemoryCapability"] = () => {}; const noopRegisterMemoryPromptSection: OpenClawPluginApi["registerMemoryPromptSection"] = () => {}; const noopRegisterMemoryPromptSupplement: OpenClawPluginApi["registerMemoryPromptSupplement"] = @@ -164,6 +166,8 @@ export function buildPluginApi(params: BuildPluginApiParams): OpenClawPluginApi registerCompactionProvider: handlers.registerCompactionProvider ?? noopRegisterCompactionProvider, registerAgentHarness: handlers.registerAgentHarness ?? noopRegisterAgentHarness, + registerDetachedTaskRuntime: + handlers.registerDetachedTaskRuntime ?? noopRegisterDetachedTaskRuntime, registerMemoryCapability: handlers.registerMemoryCapability ?? noopRegisterMemoryCapability, registerMemoryPromptSection: handlers.registerMemoryPromptSection ?? noopRegisterMemoryPromptSection, diff --git a/src/plugins/loader.test.ts b/src/plugins/loader.test.ts index bc3f9ac412d..15c8f966bad 100644 --- a/src/plugins/loader.test.ts +++ b/src/plugins/loader.test.ts @@ -11,6 +11,12 @@ import { triggerInternalHook, } from "../hooks/internal-hooks.js"; import { emitDiagnosticEvent } from "../infra/diagnostic-events.js"; +import { + clearDetachedTaskLifecycleRuntimeRegistration, + getDetachedTaskLifecycleRuntimeRegistration, + registerDetachedTaskLifecycleRuntime, + type DetachedTaskLifecycleRuntime, +} from "../tasks/detached-task-runtime-state.js"; import { withEnv } from "../test-utils/env.js"; import { clearPluginCommands, getPluginCommandSpecs } from "./command-registry-state.js"; import { getGlobalHookRunner, resetGlobalHookRunner } from "./hook-runner-global.js"; @@ -65,6 +71,26 @@ import { import type { PluginSdkResolutionPreference } from "./sdk-alias.js"; let cachedBundledTelegramDir = ""; let cachedBundledMemoryDir = ""; + +function createDetachedTaskRuntimeStub(id: string): DetachedTaskLifecycleRuntime { + const fail = (name: string): never => { + throw new Error(`detached runtime ${id} should not execute ${name} in this test`); + }; + return { + createQueuedTaskRun: () => fail("createQueuedTaskRun"), + createRunningTaskRun: () => fail("createRunningTaskRun"), + startTaskRunByRunId: () => fail("startTaskRunByRunId"), + recordTaskRunProgressByRunId: () => fail("recordTaskRunProgressByRunId"), + completeTaskRunByRunId: () => fail("completeTaskRunByRunId"), + failTaskRunByRunId: () => fail("failTaskRunByRunId"), + setDetachedTaskDeliveryStatusByRunId: () => fail("setDetachedTaskDeliveryStatusByRunId"), + cancelDetachedTaskRunById: async () => ({ + found: true, + cancelled: true, + }), + }; +} + const BUNDLED_TELEGRAM_PLUGIN_BODY = `module.exports = { id: "telegram", register(api) { @@ -2025,6 +2051,155 @@ module.exports = { id: "throws-after-import", register() {} };`, expect(listMemoryEmbeddingProviders()).toEqual([]); }); + it("does not replace the active detached task runtime during non-activating loads", () => { + useNoBundledPlugins(); + const activeRuntime = createDetachedTaskRuntimeStub("active"); + registerDetachedTaskLifecycleRuntime("active-runtime", activeRuntime); + + const plugin = writePlugin({ + id: "snapshot-detached-runtime", + filename: "snapshot-detached-runtime.cjs", + body: `module.exports = { + id: "snapshot-detached-runtime", + register(api) { + api.registerDetachedTaskRuntime({ + createQueuedTaskRun() { throw new Error("snapshot createQueuedTaskRun should not run"); }, + createRunningTaskRun() { throw new Error("snapshot createRunningTaskRun should not run"); }, + startTaskRunByRunId() { throw new Error("snapshot startTaskRunByRunId should not run"); }, + recordTaskRunProgressByRunId() { throw new Error("snapshot recordTaskRunProgressByRunId should not run"); }, + completeTaskRunByRunId() { throw new Error("snapshot completeTaskRunByRunId should not run"); }, + failTaskRunByRunId() { throw new Error("snapshot failTaskRunByRunId should not run"); }, + setDetachedTaskDeliveryStatusByRunId() { throw new Error("snapshot setDetachedTaskDeliveryStatusByRunId should not run"); }, + async cancelDetachedTaskRunById() { return { found: true, cancelled: true }; }, + }); + }, + };`, + }); + + const scoped = loadOpenClawPlugins({ + cache: false, + activate: false, + workspaceDir: plugin.dir, + config: { + plugins: { + load: { paths: [plugin.file] }, + allow: ["snapshot-detached-runtime"], + }, + }, + onlyPluginIds: ["snapshot-detached-runtime"], + }); + + expect(scoped.plugins.find((entry) => entry.id === "snapshot-detached-runtime")?.status).toBe( + "loaded", + ); + expect(getDetachedTaskLifecycleRuntimeRegistration()).toMatchObject({ + pluginId: "active-runtime", + runtime: activeRuntime, + }); + }); + + it("clears newly-registered detached task runtimes when plugin register fails", () => { + useNoBundledPlugins(); + const plugin = writePlugin({ + id: "failing-detached-runtime", + filename: "failing-detached-runtime.cjs", + body: `module.exports = { + id: "failing-detached-runtime", + register(api) { + api.registerDetachedTaskRuntime({ + createQueuedTaskRun() { throw new Error("failing createQueuedTaskRun should not run"); }, + createRunningTaskRun() { throw new Error("failing createRunningTaskRun should not run"); }, + startTaskRunByRunId() { throw new Error("failing startTaskRunByRunId should not run"); }, + recordTaskRunProgressByRunId() { throw new Error("failing recordTaskRunProgressByRunId should not run"); }, + completeTaskRunByRunId() { throw new Error("failing completeTaskRunByRunId should not run"); }, + failTaskRunByRunId() { throw new Error("failing failTaskRunByRunId should not run"); }, + setDetachedTaskDeliveryStatusByRunId() { throw new Error("failing setDetachedTaskDeliveryStatusByRunId should not run"); }, + async cancelDetachedTaskRunById() { return { found: true, cancelled: true }; }, + }); + throw new Error("detached runtime register failed"); + }, + };`, + }); + + const registry = loadOpenClawPlugins({ + cache: false, + workspaceDir: plugin.dir, + config: { + plugins: { + load: { paths: [plugin.file] }, + allow: ["failing-detached-runtime"], + }, + }, + onlyPluginIds: ["failing-detached-runtime"], + }); + + expect(registry.plugins.find((entry) => entry.id === "failing-detached-runtime")?.status).toBe( + "error", + ); + expect(getDetachedTaskLifecycleRuntimeRegistration()).toBeUndefined(); + }); + + it("restores cached detached task runtime registrations on cache hits", () => { + useNoBundledPlugins(); + const plugin = writePlugin({ + id: "cached-detached-runtime", + filename: "cached-detached-runtime.cjs", + body: `module.exports = { + id: "cached-detached-runtime", + register(api) { + api.registerDetachedTaskRuntime({ + createQueuedTaskRun() { throw new Error("cached createQueuedTaskRun should not run"); }, + createRunningTaskRun() { throw new Error("cached createRunningTaskRun should not run"); }, + startTaskRunByRunId() { throw new Error("cached startTaskRunByRunId should not run"); }, + recordTaskRunProgressByRunId() { throw new Error("cached recordTaskRunProgressByRunId should not run"); }, + completeTaskRunByRunId() { throw new Error("cached completeTaskRunByRunId should not run"); }, + failTaskRunByRunId() { throw new Error("cached failTaskRunByRunId should not run"); }, + setDetachedTaskDeliveryStatusByRunId() { throw new Error("cached setDetachedTaskDeliveryStatusByRunId should not run"); }, + async cancelDetachedTaskRunById() { return { found: true, cancelled: true }; }, + }); + }, + };`, + }); + + const loadOptions = { + workspaceDir: plugin.dir, + config: { + plugins: { + load: { paths: [plugin.file] }, + allow: ["cached-detached-runtime"], + }, + }, + onlyPluginIds: ["cached-detached-runtime"], + } satisfies Parameters[0]; + + loadOpenClawPlugins(loadOptions); + expect(getDetachedTaskLifecycleRuntimeRegistration()?.pluginId).toBe("cached-detached-runtime"); + + clearDetachedTaskLifecycleRuntimeRegistration(); + expect(getDetachedTaskLifecycleRuntimeRegistration()).toBeUndefined(); + + loadOpenClawPlugins(loadOptions); + + expect(getDetachedTaskLifecycleRuntimeRegistration()?.pluginId).toBe("cached-detached-runtime"); + }); + + it("clears stale detached task runtime registrations on active reloads when no plugin re-registers one", () => { + useNoBundledPlugins(); + registerDetachedTaskLifecycleRuntime("stale-runtime", createDetachedTaskRuntimeStub("stale")); + + loadOpenClawPlugins({ + cache: false, + config: { + plugins: { + load: { paths: [] }, + allow: [], + }, + }, + }); + + expect(getDetachedTaskLifecycleRuntimeRegistration()).toBeUndefined(); + }); + it("restores cached memory capability public artifacts on cache hits", async () => { useNoBundledPlugins(); const workspaceDir = makeTempDir(); diff --git a/src/plugins/loader.ts b/src/plugins/loader.ts index 91b0e09e08c..42b52a78e72 100644 --- a/src/plugins/loader.ts +++ b/src/plugins/loader.ts @@ -22,6 +22,11 @@ import { normalizeLowercaseStringOrEmpty, normalizeOptionalString, } from "../shared/string-coerce.js"; +import { + clearDetachedTaskLifecycleRuntimeRegistration, + getDetachedTaskLifecycleRuntimeRegistration, + restoreDetachedTaskLifecycleRuntimeRegistration, +} from "../tasks/detached-task-runtime-state.js"; import { resolveUserPath } from "../utils.js"; import { buildPluginApi } from "./api-builder.js"; import { inspectBundleMcpRuntimeSupport } from "./bundle-mcp.js"; @@ -187,6 +192,7 @@ export class PluginLoadReentryError extends Error { type CachedPluginState = { registry: PluginRegistry; + detachedTaskRuntimeRegistration: ReturnType; memoryCapability: ReturnType; memoryCorpusSupplements: ReturnType; agentHarnesses: ReturnType; @@ -225,6 +231,7 @@ export function clearPluginLoaderCache(): void { openAllowlistWarningCache.clear(); clearAgentHarnesses(); clearCompactionProviders(); + clearDetachedTaskLifecycleRuntimeRegistration(); clearMemoryEmbeddingProviders(); clearMemoryPluginState(); } @@ -1441,6 +1448,7 @@ export function loadOpenClawPlugins(options: PluginLoadOptions = {}): PluginRegi if (cached) { restoreRegisteredAgentHarnesses(cached.agentHarnesses); restoreRegisteredCompactionProviders(cached.compactionProviders); + restoreDetachedTaskLifecycleRuntimeRegistration(cached.detachedTaskRuntimeRegistration); restoreRegisteredMemoryEmbeddingProviders(cached.memoryEmbeddingProviders); restoreMemoryPluginState({ capability: cached.memoryCapability, @@ -1472,6 +1480,7 @@ export function loadOpenClawPlugins(options: PluginLoadOptions = {}): PluginRegi clearAgentHarnesses(); clearPluginCommands(); clearPluginInteractiveHandlers(); + clearDetachedTaskLifecycleRuntimeRegistration(); clearMemoryPluginState(); } @@ -2212,6 +2221,7 @@ export function loadOpenClawPlugins(options: PluginLoadOptions = {}): PluginRegi const registrySnapshot = snapshotPluginRegistry(registry); const previousAgentHarnesses = listRegisteredAgentHarnesses(); const previousCompactionProviders = listRegisteredCompactionProviders(); + const previousDetachedTaskRuntimeRegistration = getDetachedTaskLifecycleRuntimeRegistration(); const previousMemoryEmbeddingProviders = listRegisteredMemoryEmbeddingProviders(); const previousMemoryFlushPlanResolver = getMemoryFlushPlanResolver(); const previousMemoryPromptBuilder = getMemoryPromptSectionBuilder(); @@ -2225,6 +2235,7 @@ export function loadOpenClawPlugins(options: PluginLoadOptions = {}): PluginRegi if (!shouldActivate) { restoreRegisteredAgentHarnesses(previousAgentHarnesses); restoreRegisteredCompactionProviders(previousCompactionProviders); + restoreDetachedTaskLifecycleRuntimeRegistration(previousDetachedTaskRuntimeRegistration); restoreRegisteredMemoryEmbeddingProviders(previousMemoryEmbeddingProviders); restoreMemoryPluginState({ corpusSupplements: previousMemoryCorpusSupplements, @@ -2241,6 +2252,7 @@ export function loadOpenClawPlugins(options: PluginLoadOptions = {}): PluginRegi restorePluginRegistry(registry, registrySnapshot); restoreRegisteredAgentHarnesses(previousAgentHarnesses); restoreRegisteredCompactionProviders(previousCompactionProviders); + restoreDetachedTaskLifecycleRuntimeRegistration(previousDetachedTaskRuntimeRegistration); restoreRegisteredMemoryEmbeddingProviders(previousMemoryEmbeddingProviders); restoreMemoryPluginState({ corpusSupplements: previousMemoryCorpusSupplements, @@ -2297,6 +2309,7 @@ export function loadOpenClawPlugins(options: PluginLoadOptions = {}): PluginRegi if (cacheEnabled) { setCachedPluginRegistry(cacheKey, { + detachedTaskRuntimeRegistration: getDetachedTaskLifecycleRuntimeRegistration(), memoryCapability: getMemoryCapabilityRegistration(), memoryCorpusSupplements: listMemoryCorpusSupplements(), registry, diff --git a/src/plugins/registry.ts b/src/plugins/registry.ts index 1a5f30fa352..2e7cc4837ae 100644 --- a/src/plugins/registry.ts +++ b/src/plugins/registry.ts @@ -22,6 +22,10 @@ import { import { normalizePluginGatewayMethodScope } from "../shared/gateway-method-policy.js"; import { resolveGlobalSingleton } from "../shared/global-singleton.js"; import { normalizeOptionalString } from "../shared/string-coerce.js"; +import { + getDetachedTaskLifecycleRuntimeRegistration, + registerDetachedTaskLifecycleRuntime, +} from "../tasks/detached-task-runtime-state.js"; import { resolveUserPath } from "../utils.js"; import { buildPluginApi } from "./api-builder.js"; import { normalizeRegisteredChannelPlugin } from "./channel-validation.js"; @@ -1169,6 +1173,19 @@ export function createPluginRegistry(registryParams: PluginRegistryParams) { registerHttpRoute: (routeParams) => registerHttpRoute(record, routeParams), registerProvider: (provider) => registerProvider(record, provider), registerAgentHarness: (harness) => registerAgentHarness(record, harness), + registerDetachedTaskRuntime: (runtime) => { + const existing = getDetachedTaskLifecycleRuntimeRegistration(); + if (existing && existing.pluginId !== record.id) { + pushDiagnostic({ + level: "error", + pluginId: record.id, + source: record.source, + message: `detached task runtime already registered by ${existing.pluginId}`, + }); + return; + } + registerDetachedTaskLifecycleRuntime(record.id, runtime); + }, registerSpeechProvider: (provider) => registerSpeechProvider(record, provider), registerRealtimeTranscriptionProvider: (provider) => registerRealtimeTranscriptionProvider(record, provider), diff --git a/src/plugins/runtime/runtime-task-test-harness.ts b/src/plugins/runtime/runtime-task-test-harness.ts index 72eb9f62c5b..678ae144b60 100644 --- a/src/plugins/runtime/runtime-task-test-harness.ts +++ b/src/plugins/runtime/runtime-task-test-harness.ts @@ -1,4 +1,5 @@ import { vi } from "vitest"; +import { resetDetachedTaskLifecycleRuntimeForTests } from "../../tasks/detached-task-runtime.js"; import { resetTaskRegistryControlRuntimeForTests, resetTaskRegistryDeliveryRuntimeForTests, @@ -33,6 +34,7 @@ export function installRuntimeTaskDeliveryMock(): void { export function resetRuntimeTaskTestState( taskRegistryOptions?: Parameters[0], ): void { + resetDetachedTaskLifecycleRuntimeForTests(); resetTaskRegistryControlRuntimeForTests(); resetTaskRegistryDeliveryRuntimeForTests(); resetTaskRegistryForTests(taskRegistryOptions); diff --git a/src/plugins/runtime/runtime-tasks.test.ts b/src/plugins/runtime/runtime-tasks.test.ts index 2bc7d0f81a5..edbc435463d 100644 --- a/src/plugins/runtime/runtime-tasks.test.ts +++ b/src/plugins/runtime/runtime-tasks.test.ts @@ -1,4 +1,8 @@ -import { afterEach, beforeEach, describe, expect, it } from "vitest"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import { + getDetachedTaskLifecycleRuntime, + setDetachedTaskLifecycleRuntime, +} from "../../tasks/detached-task-runtime.js"; import { getRuntimeTaskMocks, installRuntimeTaskDeliveryMock, @@ -176,6 +180,53 @@ describe("runtime tasks", () => { }); }); + it("routes runtime task cancellation through the detached task runtime seam", async () => { + const legacyTaskFlow = createRuntimeTaskFlow().bindSession({ + sessionKey: "agent:main:main", + }); + const taskRuns = createRuntimeTaskRuns().bindSession({ + sessionKey: "agent:main:main", + }); + + const created = legacyTaskFlow.createManaged({ + controllerId: "tests/runtime-tasks", + goal: "Cancel through runtime seam", + }); + const child = legacyTaskFlow.runTask({ + flowId: created.flowId, + runtime: "acp", + childSessionKey: "agent:main:subagent:child", + runId: "runtime-task-cancel-seam", + task: "Cancel via seam", + status: "running", + startedAt: 22, + lastEventAt: 23, + }); + if (!child.created) { + throw new Error("expected child task creation to succeed"); + } + + const defaultRuntime = getDetachedTaskLifecycleRuntime(); + const cancelDetachedTaskRunByIdSpy = vi.fn( + (...args: Parameters) => + defaultRuntime.cancelDetachedTaskRunById(...args), + ); + setDetachedTaskLifecycleRuntime({ + ...defaultRuntime, + cancelDetachedTaskRunById: cancelDetachedTaskRunByIdSpy, + }); + + await taskRuns.cancel({ + taskId: child.task.taskId, + cfg: {} as never, + }); + + expect(cancelDetachedTaskRunByIdSpy).toHaveBeenCalledWith({ + cfg: {} as never, + taskId: child.task.taskId, + }); + }); + it("does not allow cross-owner task cancellation or leak task details", async () => { const legacyTaskFlow = createRuntimeTaskFlow().bindSession({ sessionKey: "agent:main:main", diff --git a/src/plugins/runtime/runtime-tasks.ts b/src/plugins/runtime/runtime-tasks.ts index 898a12458dc..9406a218305 100644 --- a/src/plugins/runtime/runtime-tasks.ts +++ b/src/plugins/runtime/runtime-tasks.ts @@ -1,4 +1,4 @@ -import { cancelTaskById, listTasksForFlowId } from "../../tasks/runtime-internal.js"; +import { listTasksForFlowId } from "../../tasks/runtime-internal.js"; import { mapTaskFlowDetail, mapTaskFlowView, @@ -6,7 +6,7 @@ import { mapTaskRunDetail, mapTaskRunView, } from "../../tasks/task-domain-views.js"; -import { getFlowTaskSummary } from "../../tasks/task-executor.js"; +import { cancelDetachedTaskRunById, getFlowTaskSummary } from "../../tasks/task-executor.js"; import { getTaskFlowByIdForOwner, listTaskFlowsForOwner, @@ -47,7 +47,7 @@ function assertSessionKey(sessionKey: string | undefined, errorMessage: string): } function mapCancelledTaskResult( - result: Awaited>, + result: Awaited>, ): TaskRunCancelResult { return { found: result.found, @@ -107,7 +107,7 @@ function createBoundTaskRunsRuntime(params: { }; } return mapCancelledTaskResult( - await cancelTaskById({ + await cancelDetachedTaskRunById({ cfg, taskId: task.taskId, }), diff --git a/src/plugins/runtime/runtime-tasks.types.ts b/src/plugins/runtime/runtime-tasks.types.ts index fcd032062cf..28f7db24fe3 100644 --- a/src/plugins/runtime/runtime-tasks.types.ts +++ b/src/plugins/runtime/runtime-tasks.types.ts @@ -1,4 +1,5 @@ import type { OpenClawConfig } from "../../config/types.openclaw.js"; +import type { DetachedTaskLifecycleRuntime } from "../../tasks/detached-task-runtime-contract.js"; import type { TaskDeliveryState } from "../../tasks/task-registry.types.js"; import type { OpenClawPluginToolContext } from "../tool-types.js"; import type { PluginRuntimeTaskFlow } from "./runtime-taskflow.types.js"; @@ -18,6 +19,7 @@ export type { TaskRunDetail, TaskRunView, } from "./task-domain-types.js"; +export type { DetachedTaskLifecycleRuntime } from "../../tasks/detached-task-runtime-contract.js"; export type BoundTaskRunsRuntime = { readonly sessionKey: string; diff --git a/src/plugins/types.ts b/src/plugins/types.ts index eb8385441f9..34dd566d5ab 100644 --- a/src/plugins/types.ts +++ b/src/plugins/types.ts @@ -1980,6 +1980,10 @@ export type OpenClawPluginApi = { ) => void; /** Register an agent harness implementation. */ registerAgentHarness: (harness: AgentHarness) => void; + /** Register the active detached task runtime for this plugin (exclusive slot). */ + registerDetachedTaskRuntime: ( + runtime: import("./runtime/runtime-tasks.types.js").DetachedTaskLifecycleRuntime, + ) => void; /** Register the active memory capability for this memory plugin (exclusive slot). */ registerMemoryCapability: ( capability: import("./memory-state.js").MemoryPluginCapability, diff --git a/src/tasks/detached-task-runtime-contract.ts b/src/tasks/detached-task-runtime-contract.ts new file mode 100644 index 00000000000..e1e82d9f09f --- /dev/null +++ b/src/tasks/detached-task-runtime-contract.ts @@ -0,0 +1,120 @@ +import type { OpenClawConfig } from "../config/types.openclaw.js"; +import type { + TaskDeliveryState, + TaskDeliveryStatus, + TaskNotifyPolicy, + TaskRecord, + TaskRuntime, + TaskScopeKind, + TaskStatus, + TaskTerminalOutcome, +} from "./task-registry.types.js"; + +export type DetachedTaskCreateParams = { + runtime: TaskRuntime; + taskKind?: string; + sourceId?: string; + requesterSessionKey?: string; + ownerKey?: string; + scopeKind?: TaskScopeKind; + requesterOrigin?: TaskDeliveryState["requesterOrigin"]; + parentFlowId?: string; + childSessionKey?: string; + parentTaskId?: string; + agentId?: string; + runId?: string; + label?: string; + task: string; + preferMetadata?: boolean; + notifyPolicy?: TaskNotifyPolicy; + deliveryStatus?: TaskDeliveryStatus; +}; + +export type DetachedRunningTaskCreateParams = DetachedTaskCreateParams & { + startedAt?: number; + lastEventAt?: number; + progressSummary?: string | null; +}; + +export type DetachedTaskStartParams = { + runId: string; + runtime?: TaskRuntime; + sessionKey?: string; + startedAt?: number; + lastEventAt?: number; + progressSummary?: string | null; + eventSummary?: string | null; +}; + +export type DetachedTaskProgressParams = { + runId: string; + runtime?: TaskRuntime; + sessionKey?: string; + lastEventAt?: number; + progressSummary?: string | null; + eventSummary?: string | null; +}; + +export type DetachedTaskCompleteParams = { + runId: string; + runtime?: TaskRuntime; + sessionKey?: string; + endedAt: number; + lastEventAt?: number; + progressSummary?: string | null; + terminalSummary?: string | null; + terminalOutcome?: TaskTerminalOutcome | null; +}; + +export type DetachedTaskFailParams = { + runId: string; + runtime?: TaskRuntime; + sessionKey?: string; + status?: Extract; + endedAt: number; + lastEventAt?: number; + error?: string; + progressSummary?: string | null; + terminalSummary?: string | null; +}; + +export type DetachedTaskDeliveryStatusParams = { + runId: string; + runtime?: TaskRuntime; + sessionKey?: string; + deliveryStatus: TaskDeliveryStatus; +}; + +export type DetachedTaskCancelParams = { + cfg: OpenClawConfig; + taskId: string; +}; + +export type DetachedTaskCancelResult = { + found: boolean; + cancelled: boolean; + reason?: string; + task?: TaskRecord; +}; + +export type DetachedTaskLifecycleRuntime = { + createQueuedTaskRun: (params: DetachedTaskCreateParams) => TaskRecord; + createRunningTaskRun: (params: DetachedRunningTaskCreateParams) => TaskRecord; + startTaskRunByRunId: (params: DetachedTaskStartParams) => TaskRecord[]; + recordTaskRunProgressByRunId: (params: DetachedTaskProgressParams) => TaskRecord[]; + completeTaskRunByRunId: (params: DetachedTaskCompleteParams) => TaskRecord[]; + failTaskRunByRunId: (params: DetachedTaskFailParams) => TaskRecord[]; + setDetachedTaskDeliveryStatusByRunId: (params: DetachedTaskDeliveryStatusParams) => TaskRecord[]; + /** + * Return `found: false` when this runtime does not own the task so core can + * fall back to the legacy detached-task cancel path. + */ + cancelDetachedTaskRunById: ( + params: DetachedTaskCancelParams, + ) => Promise; +}; + +export type DetachedTaskLifecycleRuntimeRegistration = { + pluginId: string; + runtime: DetachedTaskLifecycleRuntime; +}; diff --git a/src/tasks/detached-task-runtime-state.ts b/src/tasks/detached-task-runtime-state.ts new file mode 100644 index 00000000000..bab33b916ae --- /dev/null +++ b/src/tasks/detached-task-runtime-state.ts @@ -0,0 +1,54 @@ +import type { + DetachedTaskLifecycleRuntime, + DetachedTaskLifecycleRuntimeRegistration, +} from "./detached-task-runtime-contract.js"; + +export type { DetachedTaskLifecycleRuntime, DetachedTaskLifecycleRuntimeRegistration }; + +let detachedTaskLifecycleRuntimeRegistration: DetachedTaskLifecycleRuntimeRegistration | undefined; + +export function registerDetachedTaskLifecycleRuntime( + pluginId: string, + runtime: DetachedTaskLifecycleRuntime, +): void { + detachedTaskLifecycleRuntimeRegistration = { + pluginId, + runtime, + }; +} + +export function getDetachedTaskLifecycleRuntimeRegistration(): + | DetachedTaskLifecycleRuntimeRegistration + | undefined { + if (!detachedTaskLifecycleRuntimeRegistration) { + return undefined; + } + return { + pluginId: detachedTaskLifecycleRuntimeRegistration.pluginId, + runtime: detachedTaskLifecycleRuntimeRegistration.runtime, + }; +} + +export function getRegisteredDetachedTaskLifecycleRuntime(): + | DetachedTaskLifecycleRuntime + | undefined { + return detachedTaskLifecycleRuntimeRegistration?.runtime; +} + +export function restoreDetachedTaskLifecycleRuntimeRegistration( + registration: DetachedTaskLifecycleRuntimeRegistration | undefined, +): void { + detachedTaskLifecycleRuntimeRegistration = registration + ? { + pluginId: registration.pluginId, + runtime: registration.runtime, + } + : undefined; +} + +export function clearDetachedTaskLifecycleRuntimeRegistration(): void { + detachedTaskLifecycleRuntimeRegistration = undefined; +} + +export const _resetDetachedTaskLifecycleRuntimeRegistration = + clearDetachedTaskLifecycleRuntimeRegistration; diff --git a/src/tasks/detached-task-runtime.test.ts b/src/tasks/detached-task-runtime.test.ts index 79e0526d7f8..26eaea3280f 100644 --- a/src/tasks/detached-task-runtime.test.ts +++ b/src/tasks/detached-task-runtime.test.ts @@ -1,10 +1,13 @@ import { afterEach, describe, expect, it, vi } from "vitest"; import { + cancelDetachedTaskRunById, completeTaskRunByRunId, createQueuedTaskRun, createRunningTaskRun, failTaskRunByRunId, getDetachedTaskLifecycleRuntime, + getDetachedTaskLifecycleRuntimeRegistration, + registerDetachedTaskRuntime, recordTaskRunProgressByRunId, resetDetachedTaskLifecycleRuntimeForTests, setDetachedTaskLifecycleRuntime, @@ -35,7 +38,7 @@ describe("detached-task-runtime", () => { resetDetachedTaskLifecycleRuntimeForTests(); }); - it("dispatches lifecycle operations through the installed runtime", () => { + it("dispatches lifecycle operations through the installed runtime", async () => { const defaultRuntime = getDetachedTaskLifecycleRuntime(); const queuedTask = createFakeTaskRecord({ taskId: "task-queued", @@ -48,7 +51,7 @@ describe("detached-task-runtime", () => { }); const updatedTasks = [runningTask]; - const fakeRuntime = { + const fakeRuntime: typeof defaultRuntime = { createQueuedTaskRun: vi.fn(() => queuedTask), createRunningTaskRun: vi.fn(() => runningTask), startTaskRunByRunId: vi.fn(() => updatedTasks), @@ -56,6 +59,11 @@ describe("detached-task-runtime", () => { completeTaskRunByRunId: vi.fn(() => updatedTasks), failTaskRunByRunId: vi.fn(() => updatedTasks), setDetachedTaskDeliveryStatusByRunId: vi.fn(() => updatedTasks), + cancelDetachedTaskRunById: vi.fn(async () => ({ + found: true, + cancelled: true, + task: runningTask, + })), }; setDetachedTaskLifecycleRuntime(fakeRuntime); @@ -89,6 +97,10 @@ describe("detached-task-runtime", () => { runId: "run-running", deliveryStatus: "delivered", }); + await cancelDetachedTaskRunById({ + cfg: {} as never, + taskId: runningTask.taskId, + }); expect(fakeRuntime.createQueuedTaskRun).toHaveBeenCalledWith( expect.objectContaining({ runId: "run-queued", task: "Queue task" }), @@ -111,8 +123,26 @@ describe("detached-task-runtime", () => { expect(fakeRuntime.setDetachedTaskDeliveryStatusByRunId).toHaveBeenCalledWith( expect.objectContaining({ runId: "run-running", deliveryStatus: "delivered" }), ); + expect(fakeRuntime.cancelDetachedTaskRunById).toHaveBeenCalledWith({ + cfg: {} as never, + taskId: runningTask.taskId, + }); resetDetachedTaskLifecycleRuntimeForTests(); expect(getDetachedTaskLifecycleRuntime()).toBe(defaultRuntime); }); + + it("tracks registered detached runtimes by plugin id", () => { + const runtime = { + ...getDetachedTaskLifecycleRuntime(), + }; + + registerDetachedTaskRuntime("tests/detached-runtime", runtime); + + expect(getDetachedTaskLifecycleRuntimeRegistration()).toMatchObject({ + pluginId: "tests/detached-runtime", + runtime, + }); + expect(getDetachedTaskLifecycleRuntime()).toBe(runtime); + }); }); diff --git a/src/tasks/detached-task-runtime.ts b/src/tasks/detached-task-runtime.ts index ca133e7f110..643f15af13f 100644 --- a/src/tasks/detached-task-runtime.ts +++ b/src/tasks/detached-task-runtime.ts @@ -1,85 +1,106 @@ +import type { + DetachedTaskLifecycleRuntime, + DetachedTaskLifecycleRuntimeRegistration, +} from "./detached-task-runtime-contract.js"; import { - completeTaskRunByRunId as completeTaskRunByRunIdInCore, - createQueuedTaskRun as createQueuedTaskRunInCore, - createRunningTaskRun as createRunningTaskRunInCore, - failTaskRunByRunId as failTaskRunByRunIdInCore, - recordTaskRunProgressByRunId as recordTaskRunProgressByRunIdInCore, - setDetachedTaskDeliveryStatusByRunId as setDetachedTaskDeliveryStatusByRunIdInCore, - startTaskRunByRunId as startTaskRunByRunIdInCore, + clearDetachedTaskLifecycleRuntimeRegistration, + getDetachedTaskLifecycleRuntimeRegistration as getDetachedTaskLifecycleRuntimeRegistrationState, + getRegisteredDetachedTaskLifecycleRuntime, + registerDetachedTaskLifecycleRuntime, +} from "./detached-task-runtime-state.js"; +import { cancelTaskById as cancelDetachedTaskRunByIdInCore } from "./runtime-internal.js"; +import { + completeTaskRunByRunId as completeTaskRunByRunIdFromExecutor, + createQueuedTaskRun as createQueuedTaskRunFromExecutor, + createRunningTaskRun as createRunningTaskRunFromExecutor, + failTaskRunByRunId as failTaskRunByRunIdFromExecutor, + recordTaskRunProgressByRunId as recordTaskRunProgressByRunIdFromExecutor, + setDetachedTaskDeliveryStatusByRunId as setDetachedTaskDeliveryStatusByRunIdFromExecutor, + startTaskRunByRunId as startTaskRunByRunIdFromExecutor, } from "./task-executor.js"; -export type DetachedTaskLifecycleRuntime = { - createQueuedTaskRun: typeof createQueuedTaskRunInCore; - createRunningTaskRun: typeof createRunningTaskRunInCore; - startTaskRunByRunId: typeof startTaskRunByRunIdInCore; - recordTaskRunProgressByRunId: typeof recordTaskRunProgressByRunIdInCore; - completeTaskRunByRunId: typeof completeTaskRunByRunIdInCore; - failTaskRunByRunId: typeof failTaskRunByRunIdInCore; - setDetachedTaskDeliveryStatusByRunId: typeof setDetachedTaskDeliveryStatusByRunIdInCore; -}; +export type { DetachedTaskLifecycleRuntime, DetachedTaskLifecycleRuntimeRegistration }; const DEFAULT_DETACHED_TASK_LIFECYCLE_RUNTIME: DetachedTaskLifecycleRuntime = { - createQueuedTaskRun: createQueuedTaskRunInCore, - createRunningTaskRun: createRunningTaskRunInCore, - startTaskRunByRunId: startTaskRunByRunIdInCore, - recordTaskRunProgressByRunId: recordTaskRunProgressByRunIdInCore, - completeTaskRunByRunId: completeTaskRunByRunIdInCore, - failTaskRunByRunId: failTaskRunByRunIdInCore, - setDetachedTaskDeliveryStatusByRunId: setDetachedTaskDeliveryStatusByRunIdInCore, + createQueuedTaskRun: createQueuedTaskRunFromExecutor, + createRunningTaskRun: createRunningTaskRunFromExecutor, + startTaskRunByRunId: startTaskRunByRunIdFromExecutor, + recordTaskRunProgressByRunId: recordTaskRunProgressByRunIdFromExecutor, + completeTaskRunByRunId: completeTaskRunByRunIdFromExecutor, + failTaskRunByRunId: failTaskRunByRunIdFromExecutor, + setDetachedTaskDeliveryStatusByRunId: setDetachedTaskDeliveryStatusByRunIdFromExecutor, + cancelDetachedTaskRunById: cancelDetachedTaskRunByIdInCore, }; -let detachedTaskLifecycleRuntime = DEFAULT_DETACHED_TASK_LIFECYCLE_RUNTIME; - export function getDetachedTaskLifecycleRuntime(): DetachedTaskLifecycleRuntime { - return detachedTaskLifecycleRuntime; + return getRegisteredDetachedTaskLifecycleRuntime() ?? DEFAULT_DETACHED_TASK_LIFECYCLE_RUNTIME; +} + +export function getDetachedTaskLifecycleRuntimeRegistration(): + | DetachedTaskLifecycleRuntimeRegistration + | undefined { + return getDetachedTaskLifecycleRuntimeRegistrationState(); +} + +export function registerDetachedTaskRuntime( + pluginId: string, + runtime: DetachedTaskLifecycleRuntime, +): void { + registerDetachedTaskLifecycleRuntime(pluginId, runtime); } export function setDetachedTaskLifecycleRuntime(runtime: DetachedTaskLifecycleRuntime): void { - detachedTaskLifecycleRuntime = runtime; + registerDetachedTaskRuntime("__test__", runtime); } export function resetDetachedTaskLifecycleRuntimeForTests(): void { - detachedTaskLifecycleRuntime = DEFAULT_DETACHED_TASK_LIFECYCLE_RUNTIME; + clearDetachedTaskLifecycleRuntimeRegistration(); } export function createQueuedTaskRun( ...args: Parameters ): ReturnType { - return detachedTaskLifecycleRuntime.createQueuedTaskRun(...args); + return getDetachedTaskLifecycleRuntime().createQueuedTaskRun(...args); } export function createRunningTaskRun( ...args: Parameters ): ReturnType { - return detachedTaskLifecycleRuntime.createRunningTaskRun(...args); + return getDetachedTaskLifecycleRuntime().createRunningTaskRun(...args); } export function startTaskRunByRunId( ...args: Parameters ): ReturnType { - return detachedTaskLifecycleRuntime.startTaskRunByRunId(...args); + return getDetachedTaskLifecycleRuntime().startTaskRunByRunId(...args); } export function recordTaskRunProgressByRunId( ...args: Parameters ): ReturnType { - return detachedTaskLifecycleRuntime.recordTaskRunProgressByRunId(...args); + return getDetachedTaskLifecycleRuntime().recordTaskRunProgressByRunId(...args); } export function completeTaskRunByRunId( ...args: Parameters ): ReturnType { - return detachedTaskLifecycleRuntime.completeTaskRunByRunId(...args); + return getDetachedTaskLifecycleRuntime().completeTaskRunByRunId(...args); } export function failTaskRunByRunId( ...args: Parameters ): ReturnType { - return detachedTaskLifecycleRuntime.failTaskRunByRunId(...args); + return getDetachedTaskLifecycleRuntime().failTaskRunByRunId(...args); } export function setDetachedTaskDeliveryStatusByRunId( ...args: Parameters ): ReturnType { - return detachedTaskLifecycleRuntime.setDetachedTaskDeliveryStatusByRunId(...args); + return getDetachedTaskLifecycleRuntime().setDetachedTaskDeliveryStatusByRunId(...args); +} + +export function cancelDetachedTaskRunById( + ...args: Parameters +): ReturnType { + return getDetachedTaskLifecycleRuntime().cancelDetachedTaskRunById(...args); } diff --git a/src/tasks/task-executor.test.ts b/src/tasks/task-executor.test.ts index c2481686efd..2be30873a66 100644 --- a/src/tasks/task-executor.test.ts +++ b/src/tasks/task-executor.test.ts @@ -3,6 +3,11 @@ import { resetAgentEventsForTest, resetAgentRunContextForTest } from "../infra/a import { resetHeartbeatWakeStateForTests } from "../infra/heartbeat-wake.js"; import { resetSystemEventsForTest } from "../infra/system-events.js"; import { withStateDirEnv } from "../test-helpers/state-dir-env.js"; +import { + getDetachedTaskLifecycleRuntime, + resetDetachedTaskLifecycleRuntimeForTests, + setDetachedTaskLifecycleRuntime, +} from "./detached-task-runtime.js"; import { cancelFlowById, cancelFlowByIdForOwner, @@ -59,6 +64,7 @@ vi.mock("../agents/subagent-control.js", () => ({ async function withTaskExecutorStateDir(run: (stateDir: string) => Promise): Promise { await withStateDirEnv("openclaw-task-executor-", async ({ stateDir }) => { + resetDetachedTaskLifecycleRuntimeForTests(); resetSystemEventsForTest(); resetHeartbeatWakeStateForTests(); resetAgentEventsForTest(); @@ -613,6 +619,147 @@ describe("task-executor", () => { }); }); + it("dispatches detached task cancellation through the registered runtime", async () => { + await withTaskExecutorStateDir(async () => { + hoisted.cancelSessionMock.mockResolvedValue(undefined); + + const child = createRunningTaskRun({ + runtime: "acp", + ownerKey: "agent:main:main", + scopeKind: "session", + childSessionKey: "agent:codex:acp:child", + runId: "run-external-cancel", + task: "Inspect a PR", + startedAt: 10, + deliveryStatus: "pending", + }); + + const defaultRuntime = getDetachedTaskLifecycleRuntime(); + const cancelDetachedTaskRunByIdSpy = vi.fn( + (...args: Parameters) => + defaultRuntime.cancelDetachedTaskRunById(...args), + ); + + setDetachedTaskLifecycleRuntime({ + ...defaultRuntime, + cancelDetachedTaskRunById: cancelDetachedTaskRunByIdSpy, + }); + + const cancelled = await cancelDetachedTaskRunById({ + cfg: {} as never, + taskId: child.taskId, + }); + + expect(cancelDetachedTaskRunByIdSpy).toHaveBeenCalledWith({ + cfg: {} as never, + taskId: child.taskId, + }); + expect(cancelled).toMatchObject({ + found: true, + cancelled: true, + }); + }); + }); + + it("falls back to the legacy canceller when the registered runtime declines task ownership", async () => { + await withTaskExecutorStateDir(async () => { + hoisted.cancelSessionMock.mockResolvedValue(undefined); + + const child = createRunningTaskRun({ + runtime: "acp", + ownerKey: "agent:main:main", + scopeKind: "session", + childSessionKey: "agent:codex:acp:child", + runId: "run-runtime-decline-cancel", + task: "Inspect a PR", + startedAt: 10, + deliveryStatus: "pending", + }); + + const cancelDetachedTaskRunByIdSpy = vi.fn(async () => ({ + found: false, + cancelled: false, + reason: "not owned by runtime", + })); + + setDetachedTaskLifecycleRuntime({ + ...getDetachedTaskLifecycleRuntime(), + cancelDetachedTaskRunById: cancelDetachedTaskRunByIdSpy, + }); + + const cancelled = await cancelDetachedTaskRunById({ + cfg: {} as never, + taskId: child.taskId, + }); + + expect(cancelDetachedTaskRunByIdSpy).toHaveBeenCalledWith({ + cfg: {} as never, + taskId: child.taskId, + }); + expect(cancelled).toMatchObject({ + found: true, + cancelled: true, + }); + expect(getTaskById(child.taskId)).toMatchObject({ + taskId: child.taskId, + status: "cancelled", + }); + expect(hoisted.cancelSessionMock).toHaveBeenCalledWith({ + cfg: {} as never, + sessionKey: "agent:codex:acp:child", + reason: "task-cancel", + }); + }); + }); + + it("does not fall back when the registered runtime claims task ownership", async () => { + await withTaskExecutorStateDir(async () => { + hoisted.cancelSessionMock.mockResolvedValue(undefined); + + const child = createRunningTaskRun({ + runtime: "acp", + ownerKey: "agent:main:main", + scopeKind: "session", + childSessionKey: "agent:codex:acp:child", + runId: "run-runtime-owned-cancel", + task: "Inspect a PR", + startedAt: 10, + deliveryStatus: "pending", + }); + + const cancelDetachedTaskRunByIdSpy = vi.fn(async () => ({ + found: true, + cancelled: false, + reason: "runtime refused cancel", + })); + + setDetachedTaskLifecycleRuntime({ + ...getDetachedTaskLifecycleRuntime(), + cancelDetachedTaskRunById: cancelDetachedTaskRunByIdSpy, + }); + + const cancelled = await cancelDetachedTaskRunById({ + cfg: {} as never, + taskId: child.taskId, + }); + + expect(cancelled).toMatchObject({ + found: true, + cancelled: false, + reason: "runtime refused cancel", + }); + expect(cancelDetachedTaskRunByIdSpy).toHaveBeenCalledWith({ + cfg: {} as never, + taskId: child.taskId, + }); + expect(getTaskById(child.taskId)).toMatchObject({ + taskId: child.taskId, + status: "running", + }); + expect(hoisted.cancelSessionMock).not.toHaveBeenCalled(); + }); + }); + it("cancels active subagent child tasks", async () => { await withTaskExecutorStateDir(async () => { hoisted.killSubagentRunAdminMock.mockResolvedValue({ @@ -651,6 +798,62 @@ describe("task-executor", () => { }); }); + it("routes TaskFlow cancellation through the registered detached runtime", async () => { + await withTaskExecutorStateDir(async () => { + hoisted.cancelSessionMock.mockResolvedValue(undefined); + + const flow = createManagedTaskFlow({ + ownerKey: "agent:main:main", + controllerId: "tests/cancel-flow", + goal: "Cancel linked tasks", + }); + const child = runTaskInFlow({ + flowId: flow.flowId, + runtime: "acp", + childSessionKey: "agent:codex:acp:child", + runId: "run-flow-cancel-via-runtime", + task: "Cancel flow child", + status: "running", + startedAt: 10, + }); + if (!child.created) { + throw new Error("expected child task creation to succeed"); + } + const childTask = child.task; + if (!childTask) { + throw new Error("expected child task payload"); + } + + const defaultRuntime = getDetachedTaskLifecycleRuntime(); + const cancelDetachedTaskRunByIdSpy = vi.fn( + (...args: Parameters) => + defaultRuntime.cancelDetachedTaskRunById(...args), + ); + setDetachedTaskLifecycleRuntime({ + ...defaultRuntime, + cancelDetachedTaskRunById: cancelDetachedTaskRunByIdSpy, + }); + + const cancelled = await cancelFlowById({ + cfg: {} as never, + flowId: flow.flowId, + }); + + expect(cancelDetachedTaskRunByIdSpy).toHaveBeenCalledWith({ + cfg: {} as never, + taskId: childTask.taskId, + }); + expect(cancelled).toMatchObject({ + found: true, + cancelled: true, + flow: { + flowId: flow.flowId, + status: "cancelled", + }, + }); + }); + }); + it("scopes run-id updates to the matching runtime and session", async () => { await withTaskExecutorStateDir(async () => { const victim = createRunningTaskRun({ diff --git a/src/tasks/task-executor.ts b/src/tasks/task-executor.ts index 3ca4df12b33..57219e4218f 100644 --- a/src/tasks/task-executor.ts +++ b/src/tasks/task-executor.ts @@ -1,9 +1,11 @@ import type { OpenClawConfig } from "../config/types.openclaw.js"; import { createSubsystemLogger } from "../logging/subsystem.js"; +import { getRegisteredDetachedTaskLifecycleRuntime } from "./detached-task-runtime-state.js"; import { cancelTaskById, createTaskRecord, findLatestTaskForFlowId, + getTaskById, isParentFlowLinkError, linkTaskToFlowById, listTasksForFlowId, @@ -636,7 +638,7 @@ export async function cancelFlowById(params: { const linkedTasks = listTasksForFlowId(flow.flowId); const activeTasks = linkedTasks.filter((task) => isActiveTaskStatus(task.status)); for (const task of activeTasks) { - await cancelTaskById({ + await cancelDetachedTaskRunById({ cfg: params.cfg, taskId: task.taskId, }); @@ -707,5 +709,16 @@ export async function cancelFlowByIdForOwner(params: { } export async function cancelDetachedTaskRunById(params: { cfg: OpenClawConfig; taskId: string }) { + const task = getTaskById(params.taskId); + if (!task) { + return cancelTaskById(params); + } + const registeredRuntime = getRegisteredDetachedTaskLifecycleRuntime(); + if (registeredRuntime) { + const cancelled = await registeredRuntime.cancelDetachedTaskRunById(params); + if (cancelled.found) { + return cancelled; + } + } return cancelTaskById(params); } diff --git a/test/helpers/plugins/plugin-api.ts b/test/helpers/plugins/plugin-api.ts index df12b26f983..33e922e79ed 100644 --- a/test/helpers/plugins/plugin-api.ts +++ b/test/helpers/plugins/plugin-api.ts @@ -41,6 +41,7 @@ export function createTestPluginApi(api: TestPluginApiInput = {}): OpenClawPlugi registerContextEngine() {}, registerCompactionProvider() {}, registerAgentHarness() {}, + registerDetachedTaskRuntime() {}, registerMemoryCapability() {}, registerMemoryPromptSection() {}, registerMemoryPromptSupplement() {},