tasks: add detached runtime plugin registration contract (#68915)

* tasks: register detached runtime plugins

* tasks: harden detached runtime ownership

* tasks: extract detached runtime contract types

* changelog: note detached runtime contract

* changelog: attribute detached runtime contract
This commit is contained in:
Mariano
2026-04-19 13:13:11 +02:00
committed by GitHub
parent c67a9c5259
commit bd3ad3436e
19 changed files with 759 additions and 49 deletions

View File

@@ -6,6 +6,8 @@ Docs: https://docs.openclaw.ai
### Changes
- Plugins/tasks: add a detached runtime registration contract so plugin executors can own detached task lifecycle and cancellation without reaching into core task internals. (#68915) Thanks @mbelinky.
### Fixes
- Models/Kimi: default bundled Kimi thinking to off and normalize Anthropic-compatible `thinking` payloads so stale session `/think` state no longer silently re-enables reasoning on Kimi runs. (#68907) Thanks @frankekn.

View File

@@ -1,10 +1,7 @@
import type { RuntimeEnv } from "../runtime.js";
import { normalizeOptionalString } from "../shared/string-coerce.js";
import {
cancelTaskById,
getTaskById,
updateTaskNotifyPolicyById,
} from "../tasks/runtime-internal.js";
import { getTaskById, updateTaskNotifyPolicyById } from "../tasks/runtime-internal.js";
import { cancelDetachedTaskRunById } from "../tasks/task-executor.js";
import {
listTaskFlowAuditFindings,
summarizeTaskFlowAuditFindings,
@@ -391,7 +388,7 @@ export async function tasksCancelCommand(opts: { lookup: string }, runtime: Runt
runtime.exit(1);
return;
}
const result = await cancelTaskById({
const result = await cancelDetachedTaskRunById({
cfg: await loadTaskCancelConfig(),
taskId: task.taskId,
});

View File

@@ -64,6 +64,7 @@ export type {
export type {
BoundTaskFlowsRuntime,
BoundTaskRunsRuntime,
DetachedTaskLifecycleRuntime,
PluginRuntimeTaskFlows,
PluginRuntimeTaskRuns,
PluginRuntimeTasks,

View File

@@ -48,6 +48,7 @@ export type BuildPluginApiParams = {
| "registerContextEngine"
| "registerCompactionProvider"
| "registerAgentHarness"
| "registerDetachedTaskRuntime"
| "registerMemoryCapability"
| "registerMemoryPromptSection"
| "registerMemoryPromptSupplement"
@@ -98,6 +99,7 @@ const noopRegisterCommand: OpenClawPluginApi["registerCommand"] = () => {};
const noopRegisterContextEngine: OpenClawPluginApi["registerContextEngine"] = () => {};
const noopRegisterCompactionProvider: OpenClawPluginApi["registerCompactionProvider"] = () => {};
const noopRegisterAgentHarness: OpenClawPluginApi["registerAgentHarness"] = () => {};
const noopRegisterDetachedTaskRuntime: OpenClawPluginApi["registerDetachedTaskRuntime"] = () => {};
const noopRegisterMemoryCapability: OpenClawPluginApi["registerMemoryCapability"] = () => {};
const noopRegisterMemoryPromptSection: OpenClawPluginApi["registerMemoryPromptSection"] = () => {};
const noopRegisterMemoryPromptSupplement: OpenClawPluginApi["registerMemoryPromptSupplement"] =
@@ -164,6 +166,8 @@ export function buildPluginApi(params: BuildPluginApiParams): OpenClawPluginApi
registerCompactionProvider:
handlers.registerCompactionProvider ?? noopRegisterCompactionProvider,
registerAgentHarness: handlers.registerAgentHarness ?? noopRegisterAgentHarness,
registerDetachedTaskRuntime:
handlers.registerDetachedTaskRuntime ?? noopRegisterDetachedTaskRuntime,
registerMemoryCapability: handlers.registerMemoryCapability ?? noopRegisterMemoryCapability,
registerMemoryPromptSection:
handlers.registerMemoryPromptSection ?? noopRegisterMemoryPromptSection,

View File

@@ -11,6 +11,12 @@ import {
triggerInternalHook,
} from "../hooks/internal-hooks.js";
import { emitDiagnosticEvent } from "../infra/diagnostic-events.js";
import {
clearDetachedTaskLifecycleRuntimeRegistration,
getDetachedTaskLifecycleRuntimeRegistration,
registerDetachedTaskLifecycleRuntime,
type DetachedTaskLifecycleRuntime,
} from "../tasks/detached-task-runtime-state.js";
import { withEnv } from "../test-utils/env.js";
import { clearPluginCommands, getPluginCommandSpecs } from "./command-registry-state.js";
import { getGlobalHookRunner, resetGlobalHookRunner } from "./hook-runner-global.js";
@@ -65,6 +71,26 @@ import {
import type { PluginSdkResolutionPreference } from "./sdk-alias.js";
let cachedBundledTelegramDir = "";
let cachedBundledMemoryDir = "";
function createDetachedTaskRuntimeStub(id: string): DetachedTaskLifecycleRuntime {
const fail = (name: string): never => {
throw new Error(`detached runtime ${id} should not execute ${name} in this test`);
};
return {
createQueuedTaskRun: () => fail("createQueuedTaskRun"),
createRunningTaskRun: () => fail("createRunningTaskRun"),
startTaskRunByRunId: () => fail("startTaskRunByRunId"),
recordTaskRunProgressByRunId: () => fail("recordTaskRunProgressByRunId"),
completeTaskRunByRunId: () => fail("completeTaskRunByRunId"),
failTaskRunByRunId: () => fail("failTaskRunByRunId"),
setDetachedTaskDeliveryStatusByRunId: () => fail("setDetachedTaskDeliveryStatusByRunId"),
cancelDetachedTaskRunById: async () => ({
found: true,
cancelled: true,
}),
};
}
const BUNDLED_TELEGRAM_PLUGIN_BODY = `module.exports = {
id: "telegram",
register(api) {
@@ -2025,6 +2051,155 @@ module.exports = { id: "throws-after-import", register() {} };`,
expect(listMemoryEmbeddingProviders()).toEqual([]);
});
it("does not replace the active detached task runtime during non-activating loads", () => {
useNoBundledPlugins();
const activeRuntime = createDetachedTaskRuntimeStub("active");
registerDetachedTaskLifecycleRuntime("active-runtime", activeRuntime);
const plugin = writePlugin({
id: "snapshot-detached-runtime",
filename: "snapshot-detached-runtime.cjs",
body: `module.exports = {
id: "snapshot-detached-runtime",
register(api) {
api.registerDetachedTaskRuntime({
createQueuedTaskRun() { throw new Error("snapshot createQueuedTaskRun should not run"); },
createRunningTaskRun() { throw new Error("snapshot createRunningTaskRun should not run"); },
startTaskRunByRunId() { throw new Error("snapshot startTaskRunByRunId should not run"); },
recordTaskRunProgressByRunId() { throw new Error("snapshot recordTaskRunProgressByRunId should not run"); },
completeTaskRunByRunId() { throw new Error("snapshot completeTaskRunByRunId should not run"); },
failTaskRunByRunId() { throw new Error("snapshot failTaskRunByRunId should not run"); },
setDetachedTaskDeliveryStatusByRunId() { throw new Error("snapshot setDetachedTaskDeliveryStatusByRunId should not run"); },
async cancelDetachedTaskRunById() { return { found: true, cancelled: true }; },
});
},
};`,
});
const scoped = loadOpenClawPlugins({
cache: false,
activate: false,
workspaceDir: plugin.dir,
config: {
plugins: {
load: { paths: [plugin.file] },
allow: ["snapshot-detached-runtime"],
},
},
onlyPluginIds: ["snapshot-detached-runtime"],
});
expect(scoped.plugins.find((entry) => entry.id === "snapshot-detached-runtime")?.status).toBe(
"loaded",
);
expect(getDetachedTaskLifecycleRuntimeRegistration()).toMatchObject({
pluginId: "active-runtime",
runtime: activeRuntime,
});
});
it("clears newly-registered detached task runtimes when plugin register fails", () => {
useNoBundledPlugins();
const plugin = writePlugin({
id: "failing-detached-runtime",
filename: "failing-detached-runtime.cjs",
body: `module.exports = {
id: "failing-detached-runtime",
register(api) {
api.registerDetachedTaskRuntime({
createQueuedTaskRun() { throw new Error("failing createQueuedTaskRun should not run"); },
createRunningTaskRun() { throw new Error("failing createRunningTaskRun should not run"); },
startTaskRunByRunId() { throw new Error("failing startTaskRunByRunId should not run"); },
recordTaskRunProgressByRunId() { throw new Error("failing recordTaskRunProgressByRunId should not run"); },
completeTaskRunByRunId() { throw new Error("failing completeTaskRunByRunId should not run"); },
failTaskRunByRunId() { throw new Error("failing failTaskRunByRunId should not run"); },
setDetachedTaskDeliveryStatusByRunId() { throw new Error("failing setDetachedTaskDeliveryStatusByRunId should not run"); },
async cancelDetachedTaskRunById() { return { found: true, cancelled: true }; },
});
throw new Error("detached runtime register failed");
},
};`,
});
const registry = loadOpenClawPlugins({
cache: false,
workspaceDir: plugin.dir,
config: {
plugins: {
load: { paths: [plugin.file] },
allow: ["failing-detached-runtime"],
},
},
onlyPluginIds: ["failing-detached-runtime"],
});
expect(registry.plugins.find((entry) => entry.id === "failing-detached-runtime")?.status).toBe(
"error",
);
expect(getDetachedTaskLifecycleRuntimeRegistration()).toBeUndefined();
});
it("restores cached detached task runtime registrations on cache hits", () => {
useNoBundledPlugins();
const plugin = writePlugin({
id: "cached-detached-runtime",
filename: "cached-detached-runtime.cjs",
body: `module.exports = {
id: "cached-detached-runtime",
register(api) {
api.registerDetachedTaskRuntime({
createQueuedTaskRun() { throw new Error("cached createQueuedTaskRun should not run"); },
createRunningTaskRun() { throw new Error("cached createRunningTaskRun should not run"); },
startTaskRunByRunId() { throw new Error("cached startTaskRunByRunId should not run"); },
recordTaskRunProgressByRunId() { throw new Error("cached recordTaskRunProgressByRunId should not run"); },
completeTaskRunByRunId() { throw new Error("cached completeTaskRunByRunId should not run"); },
failTaskRunByRunId() { throw new Error("cached failTaskRunByRunId should not run"); },
setDetachedTaskDeliveryStatusByRunId() { throw new Error("cached setDetachedTaskDeliveryStatusByRunId should not run"); },
async cancelDetachedTaskRunById() { return { found: true, cancelled: true }; },
});
},
};`,
});
const loadOptions = {
workspaceDir: plugin.dir,
config: {
plugins: {
load: { paths: [plugin.file] },
allow: ["cached-detached-runtime"],
},
},
onlyPluginIds: ["cached-detached-runtime"],
} satisfies Parameters<typeof loadOpenClawPlugins>[0];
loadOpenClawPlugins(loadOptions);
expect(getDetachedTaskLifecycleRuntimeRegistration()?.pluginId).toBe("cached-detached-runtime");
clearDetachedTaskLifecycleRuntimeRegistration();
expect(getDetachedTaskLifecycleRuntimeRegistration()).toBeUndefined();
loadOpenClawPlugins(loadOptions);
expect(getDetachedTaskLifecycleRuntimeRegistration()?.pluginId).toBe("cached-detached-runtime");
});
it("clears stale detached task runtime registrations on active reloads when no plugin re-registers one", () => {
useNoBundledPlugins();
registerDetachedTaskLifecycleRuntime("stale-runtime", createDetachedTaskRuntimeStub("stale"));
loadOpenClawPlugins({
cache: false,
config: {
plugins: {
load: { paths: [] },
allow: [],
},
},
});
expect(getDetachedTaskLifecycleRuntimeRegistration()).toBeUndefined();
});
it("restores cached memory capability public artifacts on cache hits", async () => {
useNoBundledPlugins();
const workspaceDir = makeTempDir();

View File

@@ -22,6 +22,11 @@ import {
normalizeLowercaseStringOrEmpty,
normalizeOptionalString,
} from "../shared/string-coerce.js";
import {
clearDetachedTaskLifecycleRuntimeRegistration,
getDetachedTaskLifecycleRuntimeRegistration,
restoreDetachedTaskLifecycleRuntimeRegistration,
} from "../tasks/detached-task-runtime-state.js";
import { resolveUserPath } from "../utils.js";
import { buildPluginApi } from "./api-builder.js";
import { inspectBundleMcpRuntimeSupport } from "./bundle-mcp.js";
@@ -187,6 +192,7 @@ export class PluginLoadReentryError extends Error {
type CachedPluginState = {
registry: PluginRegistry;
detachedTaskRuntimeRegistration: ReturnType<typeof getDetachedTaskLifecycleRuntimeRegistration>;
memoryCapability: ReturnType<typeof getMemoryCapabilityRegistration>;
memoryCorpusSupplements: ReturnType<typeof listMemoryCorpusSupplements>;
agentHarnesses: ReturnType<typeof listRegisteredAgentHarnesses>;
@@ -225,6 +231,7 @@ export function clearPluginLoaderCache(): void {
openAllowlistWarningCache.clear();
clearAgentHarnesses();
clearCompactionProviders();
clearDetachedTaskLifecycleRuntimeRegistration();
clearMemoryEmbeddingProviders();
clearMemoryPluginState();
}
@@ -1441,6 +1448,7 @@ export function loadOpenClawPlugins(options: PluginLoadOptions = {}): PluginRegi
if (cached) {
restoreRegisteredAgentHarnesses(cached.agentHarnesses);
restoreRegisteredCompactionProviders(cached.compactionProviders);
restoreDetachedTaskLifecycleRuntimeRegistration(cached.detachedTaskRuntimeRegistration);
restoreRegisteredMemoryEmbeddingProviders(cached.memoryEmbeddingProviders);
restoreMemoryPluginState({
capability: cached.memoryCapability,
@@ -1472,6 +1480,7 @@ export function loadOpenClawPlugins(options: PluginLoadOptions = {}): PluginRegi
clearAgentHarnesses();
clearPluginCommands();
clearPluginInteractiveHandlers();
clearDetachedTaskLifecycleRuntimeRegistration();
clearMemoryPluginState();
}
@@ -2212,6 +2221,7 @@ export function loadOpenClawPlugins(options: PluginLoadOptions = {}): PluginRegi
const registrySnapshot = snapshotPluginRegistry(registry);
const previousAgentHarnesses = listRegisteredAgentHarnesses();
const previousCompactionProviders = listRegisteredCompactionProviders();
const previousDetachedTaskRuntimeRegistration = getDetachedTaskLifecycleRuntimeRegistration();
const previousMemoryEmbeddingProviders = listRegisteredMemoryEmbeddingProviders();
const previousMemoryFlushPlanResolver = getMemoryFlushPlanResolver();
const previousMemoryPromptBuilder = getMemoryPromptSectionBuilder();
@@ -2225,6 +2235,7 @@ export function loadOpenClawPlugins(options: PluginLoadOptions = {}): PluginRegi
if (!shouldActivate) {
restoreRegisteredAgentHarnesses(previousAgentHarnesses);
restoreRegisteredCompactionProviders(previousCompactionProviders);
restoreDetachedTaskLifecycleRuntimeRegistration(previousDetachedTaskRuntimeRegistration);
restoreRegisteredMemoryEmbeddingProviders(previousMemoryEmbeddingProviders);
restoreMemoryPluginState({
corpusSupplements: previousMemoryCorpusSupplements,
@@ -2241,6 +2252,7 @@ export function loadOpenClawPlugins(options: PluginLoadOptions = {}): PluginRegi
restorePluginRegistry(registry, registrySnapshot);
restoreRegisteredAgentHarnesses(previousAgentHarnesses);
restoreRegisteredCompactionProviders(previousCompactionProviders);
restoreDetachedTaskLifecycleRuntimeRegistration(previousDetachedTaskRuntimeRegistration);
restoreRegisteredMemoryEmbeddingProviders(previousMemoryEmbeddingProviders);
restoreMemoryPluginState({
corpusSupplements: previousMemoryCorpusSupplements,
@@ -2297,6 +2309,7 @@ export function loadOpenClawPlugins(options: PluginLoadOptions = {}): PluginRegi
if (cacheEnabled) {
setCachedPluginRegistry(cacheKey, {
detachedTaskRuntimeRegistration: getDetachedTaskLifecycleRuntimeRegistration(),
memoryCapability: getMemoryCapabilityRegistration(),
memoryCorpusSupplements: listMemoryCorpusSupplements(),
registry,

View File

@@ -22,6 +22,10 @@ import {
import { normalizePluginGatewayMethodScope } from "../shared/gateway-method-policy.js";
import { resolveGlobalSingleton } from "../shared/global-singleton.js";
import { normalizeOptionalString } from "../shared/string-coerce.js";
import {
getDetachedTaskLifecycleRuntimeRegistration,
registerDetachedTaskLifecycleRuntime,
} from "../tasks/detached-task-runtime-state.js";
import { resolveUserPath } from "../utils.js";
import { buildPluginApi } from "./api-builder.js";
import { normalizeRegisteredChannelPlugin } from "./channel-validation.js";
@@ -1169,6 +1173,19 @@ export function createPluginRegistry(registryParams: PluginRegistryParams) {
registerHttpRoute: (routeParams) => registerHttpRoute(record, routeParams),
registerProvider: (provider) => registerProvider(record, provider),
registerAgentHarness: (harness) => registerAgentHarness(record, harness),
registerDetachedTaskRuntime: (runtime) => {
const existing = getDetachedTaskLifecycleRuntimeRegistration();
if (existing && existing.pluginId !== record.id) {
pushDiagnostic({
level: "error",
pluginId: record.id,
source: record.source,
message: `detached task runtime already registered by ${existing.pluginId}`,
});
return;
}
registerDetachedTaskLifecycleRuntime(record.id, runtime);
},
registerSpeechProvider: (provider) => registerSpeechProvider(record, provider),
registerRealtimeTranscriptionProvider: (provider) =>
registerRealtimeTranscriptionProvider(record, provider),

View File

@@ -1,4 +1,5 @@
import { vi } from "vitest";
import { resetDetachedTaskLifecycleRuntimeForTests } from "../../tasks/detached-task-runtime.js";
import {
resetTaskRegistryControlRuntimeForTests,
resetTaskRegistryDeliveryRuntimeForTests,
@@ -33,6 +34,7 @@ export function installRuntimeTaskDeliveryMock(): void {
export function resetRuntimeTaskTestState(
taskRegistryOptions?: Parameters<typeof resetTaskRegistryForTests>[0],
): void {
resetDetachedTaskLifecycleRuntimeForTests();
resetTaskRegistryControlRuntimeForTests();
resetTaskRegistryDeliveryRuntimeForTests();
resetTaskRegistryForTests(taskRegistryOptions);

View File

@@ -1,4 +1,8 @@
import { afterEach, beforeEach, describe, expect, it } from "vitest";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import {
getDetachedTaskLifecycleRuntime,
setDetachedTaskLifecycleRuntime,
} from "../../tasks/detached-task-runtime.js";
import {
getRuntimeTaskMocks,
installRuntimeTaskDeliveryMock,
@@ -176,6 +180,53 @@ describe("runtime tasks", () => {
});
});
it("routes runtime task cancellation through the detached task runtime seam", async () => {
const legacyTaskFlow = createRuntimeTaskFlow().bindSession({
sessionKey: "agent:main:main",
});
const taskRuns = createRuntimeTaskRuns().bindSession({
sessionKey: "agent:main:main",
});
const created = legacyTaskFlow.createManaged({
controllerId: "tests/runtime-tasks",
goal: "Cancel through runtime seam",
});
const child = legacyTaskFlow.runTask({
flowId: created.flowId,
runtime: "acp",
childSessionKey: "agent:main:subagent:child",
runId: "runtime-task-cancel-seam",
task: "Cancel via seam",
status: "running",
startedAt: 22,
lastEventAt: 23,
});
if (!child.created) {
throw new Error("expected child task creation to succeed");
}
const defaultRuntime = getDetachedTaskLifecycleRuntime();
const cancelDetachedTaskRunByIdSpy = vi.fn(
(...args: Parameters<typeof defaultRuntime.cancelDetachedTaskRunById>) =>
defaultRuntime.cancelDetachedTaskRunById(...args),
);
setDetachedTaskLifecycleRuntime({
...defaultRuntime,
cancelDetachedTaskRunById: cancelDetachedTaskRunByIdSpy,
});
await taskRuns.cancel({
taskId: child.task.taskId,
cfg: {} as never,
});
expect(cancelDetachedTaskRunByIdSpy).toHaveBeenCalledWith({
cfg: {} as never,
taskId: child.task.taskId,
});
});
it("does not allow cross-owner task cancellation or leak task details", async () => {
const legacyTaskFlow = createRuntimeTaskFlow().bindSession({
sessionKey: "agent:main:main",

View File

@@ -1,4 +1,4 @@
import { cancelTaskById, listTasksForFlowId } from "../../tasks/runtime-internal.js";
import { listTasksForFlowId } from "../../tasks/runtime-internal.js";
import {
mapTaskFlowDetail,
mapTaskFlowView,
@@ -6,7 +6,7 @@ import {
mapTaskRunDetail,
mapTaskRunView,
} from "../../tasks/task-domain-views.js";
import { getFlowTaskSummary } from "../../tasks/task-executor.js";
import { cancelDetachedTaskRunById, getFlowTaskSummary } from "../../tasks/task-executor.js";
import {
getTaskFlowByIdForOwner,
listTaskFlowsForOwner,
@@ -47,7 +47,7 @@ function assertSessionKey(sessionKey: string | undefined, errorMessage: string):
}
function mapCancelledTaskResult(
result: Awaited<ReturnType<typeof cancelTaskById>>,
result: Awaited<ReturnType<typeof cancelDetachedTaskRunById>>,
): TaskRunCancelResult {
return {
found: result.found,
@@ -107,7 +107,7 @@ function createBoundTaskRunsRuntime(params: {
};
}
return mapCancelledTaskResult(
await cancelTaskById({
await cancelDetachedTaskRunById({
cfg,
taskId: task.taskId,
}),

View File

@@ -1,4 +1,5 @@
import type { OpenClawConfig } from "../../config/types.openclaw.js";
import type { DetachedTaskLifecycleRuntime } from "../../tasks/detached-task-runtime-contract.js";
import type { TaskDeliveryState } from "../../tasks/task-registry.types.js";
import type { OpenClawPluginToolContext } from "../tool-types.js";
import type { PluginRuntimeTaskFlow } from "./runtime-taskflow.types.js";
@@ -18,6 +19,7 @@ export type {
TaskRunDetail,
TaskRunView,
} from "./task-domain-types.js";
export type { DetachedTaskLifecycleRuntime } from "../../tasks/detached-task-runtime-contract.js";
export type BoundTaskRunsRuntime = {
readonly sessionKey: string;

View File

@@ -1980,6 +1980,10 @@ export type OpenClawPluginApi = {
) => void;
/** Register an agent harness implementation. */
registerAgentHarness: (harness: AgentHarness) => void;
/** Register the active detached task runtime for this plugin (exclusive slot). */
registerDetachedTaskRuntime: (
runtime: import("./runtime/runtime-tasks.types.js").DetachedTaskLifecycleRuntime,
) => void;
/** Register the active memory capability for this memory plugin (exclusive slot). */
registerMemoryCapability: (
capability: import("./memory-state.js").MemoryPluginCapability,

View File

@@ -0,0 +1,120 @@
import type { OpenClawConfig } from "../config/types.openclaw.js";
import type {
TaskDeliveryState,
TaskDeliveryStatus,
TaskNotifyPolicy,
TaskRecord,
TaskRuntime,
TaskScopeKind,
TaskStatus,
TaskTerminalOutcome,
} from "./task-registry.types.js";
export type DetachedTaskCreateParams = {
runtime: TaskRuntime;
taskKind?: string;
sourceId?: string;
requesterSessionKey?: string;
ownerKey?: string;
scopeKind?: TaskScopeKind;
requesterOrigin?: TaskDeliveryState["requesterOrigin"];
parentFlowId?: string;
childSessionKey?: string;
parentTaskId?: string;
agentId?: string;
runId?: string;
label?: string;
task: string;
preferMetadata?: boolean;
notifyPolicy?: TaskNotifyPolicy;
deliveryStatus?: TaskDeliveryStatus;
};
export type DetachedRunningTaskCreateParams = DetachedTaskCreateParams & {
startedAt?: number;
lastEventAt?: number;
progressSummary?: string | null;
};
export type DetachedTaskStartParams = {
runId: string;
runtime?: TaskRuntime;
sessionKey?: string;
startedAt?: number;
lastEventAt?: number;
progressSummary?: string | null;
eventSummary?: string | null;
};
export type DetachedTaskProgressParams = {
runId: string;
runtime?: TaskRuntime;
sessionKey?: string;
lastEventAt?: number;
progressSummary?: string | null;
eventSummary?: string | null;
};
export type DetachedTaskCompleteParams = {
runId: string;
runtime?: TaskRuntime;
sessionKey?: string;
endedAt: number;
lastEventAt?: number;
progressSummary?: string | null;
terminalSummary?: string | null;
terminalOutcome?: TaskTerminalOutcome | null;
};
export type DetachedTaskFailParams = {
runId: string;
runtime?: TaskRuntime;
sessionKey?: string;
status?: Extract<TaskStatus, "failed" | "timed_out" | "cancelled">;
endedAt: number;
lastEventAt?: number;
error?: string;
progressSummary?: string | null;
terminalSummary?: string | null;
};
export type DetachedTaskDeliveryStatusParams = {
runId: string;
runtime?: TaskRuntime;
sessionKey?: string;
deliveryStatus: TaskDeliveryStatus;
};
export type DetachedTaskCancelParams = {
cfg: OpenClawConfig;
taskId: string;
};
export type DetachedTaskCancelResult = {
found: boolean;
cancelled: boolean;
reason?: string;
task?: TaskRecord;
};
export type DetachedTaskLifecycleRuntime = {
createQueuedTaskRun: (params: DetachedTaskCreateParams) => TaskRecord;
createRunningTaskRun: (params: DetachedRunningTaskCreateParams) => TaskRecord;
startTaskRunByRunId: (params: DetachedTaskStartParams) => TaskRecord[];
recordTaskRunProgressByRunId: (params: DetachedTaskProgressParams) => TaskRecord[];
completeTaskRunByRunId: (params: DetachedTaskCompleteParams) => TaskRecord[];
failTaskRunByRunId: (params: DetachedTaskFailParams) => TaskRecord[];
setDetachedTaskDeliveryStatusByRunId: (params: DetachedTaskDeliveryStatusParams) => TaskRecord[];
/**
* Return `found: false` when this runtime does not own the task so core can
* fall back to the legacy detached-task cancel path.
*/
cancelDetachedTaskRunById: (
params: DetachedTaskCancelParams,
) => Promise<DetachedTaskCancelResult>;
};
export type DetachedTaskLifecycleRuntimeRegistration = {
pluginId: string;
runtime: DetachedTaskLifecycleRuntime;
};

View File

@@ -0,0 +1,54 @@
import type {
DetachedTaskLifecycleRuntime,
DetachedTaskLifecycleRuntimeRegistration,
} from "./detached-task-runtime-contract.js";
export type { DetachedTaskLifecycleRuntime, DetachedTaskLifecycleRuntimeRegistration };
let detachedTaskLifecycleRuntimeRegistration: DetachedTaskLifecycleRuntimeRegistration | undefined;
export function registerDetachedTaskLifecycleRuntime(
pluginId: string,
runtime: DetachedTaskLifecycleRuntime,
): void {
detachedTaskLifecycleRuntimeRegistration = {
pluginId,
runtime,
};
}
export function getDetachedTaskLifecycleRuntimeRegistration():
| DetachedTaskLifecycleRuntimeRegistration
| undefined {
if (!detachedTaskLifecycleRuntimeRegistration) {
return undefined;
}
return {
pluginId: detachedTaskLifecycleRuntimeRegistration.pluginId,
runtime: detachedTaskLifecycleRuntimeRegistration.runtime,
};
}
export function getRegisteredDetachedTaskLifecycleRuntime():
| DetachedTaskLifecycleRuntime
| undefined {
return detachedTaskLifecycleRuntimeRegistration?.runtime;
}
export function restoreDetachedTaskLifecycleRuntimeRegistration(
registration: DetachedTaskLifecycleRuntimeRegistration | undefined,
): void {
detachedTaskLifecycleRuntimeRegistration = registration
? {
pluginId: registration.pluginId,
runtime: registration.runtime,
}
: undefined;
}
export function clearDetachedTaskLifecycleRuntimeRegistration(): void {
detachedTaskLifecycleRuntimeRegistration = undefined;
}
export const _resetDetachedTaskLifecycleRuntimeRegistration =
clearDetachedTaskLifecycleRuntimeRegistration;

View File

@@ -1,10 +1,13 @@
import { afterEach, describe, expect, it, vi } from "vitest";
import {
cancelDetachedTaskRunById,
completeTaskRunByRunId,
createQueuedTaskRun,
createRunningTaskRun,
failTaskRunByRunId,
getDetachedTaskLifecycleRuntime,
getDetachedTaskLifecycleRuntimeRegistration,
registerDetachedTaskRuntime,
recordTaskRunProgressByRunId,
resetDetachedTaskLifecycleRuntimeForTests,
setDetachedTaskLifecycleRuntime,
@@ -35,7 +38,7 @@ describe("detached-task-runtime", () => {
resetDetachedTaskLifecycleRuntimeForTests();
});
it("dispatches lifecycle operations through the installed runtime", () => {
it("dispatches lifecycle operations through the installed runtime", async () => {
const defaultRuntime = getDetachedTaskLifecycleRuntime();
const queuedTask = createFakeTaskRecord({
taskId: "task-queued",
@@ -48,7 +51,7 @@ describe("detached-task-runtime", () => {
});
const updatedTasks = [runningTask];
const fakeRuntime = {
const fakeRuntime: typeof defaultRuntime = {
createQueuedTaskRun: vi.fn(() => queuedTask),
createRunningTaskRun: vi.fn(() => runningTask),
startTaskRunByRunId: vi.fn(() => updatedTasks),
@@ -56,6 +59,11 @@ describe("detached-task-runtime", () => {
completeTaskRunByRunId: vi.fn(() => updatedTasks),
failTaskRunByRunId: vi.fn(() => updatedTasks),
setDetachedTaskDeliveryStatusByRunId: vi.fn(() => updatedTasks),
cancelDetachedTaskRunById: vi.fn(async () => ({
found: true,
cancelled: true,
task: runningTask,
})),
};
setDetachedTaskLifecycleRuntime(fakeRuntime);
@@ -89,6 +97,10 @@ describe("detached-task-runtime", () => {
runId: "run-running",
deliveryStatus: "delivered",
});
await cancelDetachedTaskRunById({
cfg: {} as never,
taskId: runningTask.taskId,
});
expect(fakeRuntime.createQueuedTaskRun).toHaveBeenCalledWith(
expect.objectContaining({ runId: "run-queued", task: "Queue task" }),
@@ -111,8 +123,26 @@ describe("detached-task-runtime", () => {
expect(fakeRuntime.setDetachedTaskDeliveryStatusByRunId).toHaveBeenCalledWith(
expect.objectContaining({ runId: "run-running", deliveryStatus: "delivered" }),
);
expect(fakeRuntime.cancelDetachedTaskRunById).toHaveBeenCalledWith({
cfg: {} as never,
taskId: runningTask.taskId,
});
resetDetachedTaskLifecycleRuntimeForTests();
expect(getDetachedTaskLifecycleRuntime()).toBe(defaultRuntime);
});
it("tracks registered detached runtimes by plugin id", () => {
const runtime = {
...getDetachedTaskLifecycleRuntime(),
};
registerDetachedTaskRuntime("tests/detached-runtime", runtime);
expect(getDetachedTaskLifecycleRuntimeRegistration()).toMatchObject({
pluginId: "tests/detached-runtime",
runtime,
});
expect(getDetachedTaskLifecycleRuntime()).toBe(runtime);
});
});

View File

@@ -1,85 +1,106 @@
import type {
DetachedTaskLifecycleRuntime,
DetachedTaskLifecycleRuntimeRegistration,
} from "./detached-task-runtime-contract.js";
import {
completeTaskRunByRunId as completeTaskRunByRunIdInCore,
createQueuedTaskRun as createQueuedTaskRunInCore,
createRunningTaskRun as createRunningTaskRunInCore,
failTaskRunByRunId as failTaskRunByRunIdInCore,
recordTaskRunProgressByRunId as recordTaskRunProgressByRunIdInCore,
setDetachedTaskDeliveryStatusByRunId as setDetachedTaskDeliveryStatusByRunIdInCore,
startTaskRunByRunId as startTaskRunByRunIdInCore,
clearDetachedTaskLifecycleRuntimeRegistration,
getDetachedTaskLifecycleRuntimeRegistration as getDetachedTaskLifecycleRuntimeRegistrationState,
getRegisteredDetachedTaskLifecycleRuntime,
registerDetachedTaskLifecycleRuntime,
} from "./detached-task-runtime-state.js";
import { cancelTaskById as cancelDetachedTaskRunByIdInCore } from "./runtime-internal.js";
import {
completeTaskRunByRunId as completeTaskRunByRunIdFromExecutor,
createQueuedTaskRun as createQueuedTaskRunFromExecutor,
createRunningTaskRun as createRunningTaskRunFromExecutor,
failTaskRunByRunId as failTaskRunByRunIdFromExecutor,
recordTaskRunProgressByRunId as recordTaskRunProgressByRunIdFromExecutor,
setDetachedTaskDeliveryStatusByRunId as setDetachedTaskDeliveryStatusByRunIdFromExecutor,
startTaskRunByRunId as startTaskRunByRunIdFromExecutor,
} from "./task-executor.js";
export type DetachedTaskLifecycleRuntime = {
createQueuedTaskRun: typeof createQueuedTaskRunInCore;
createRunningTaskRun: typeof createRunningTaskRunInCore;
startTaskRunByRunId: typeof startTaskRunByRunIdInCore;
recordTaskRunProgressByRunId: typeof recordTaskRunProgressByRunIdInCore;
completeTaskRunByRunId: typeof completeTaskRunByRunIdInCore;
failTaskRunByRunId: typeof failTaskRunByRunIdInCore;
setDetachedTaskDeliveryStatusByRunId: typeof setDetachedTaskDeliveryStatusByRunIdInCore;
};
export type { DetachedTaskLifecycleRuntime, DetachedTaskLifecycleRuntimeRegistration };
const DEFAULT_DETACHED_TASK_LIFECYCLE_RUNTIME: DetachedTaskLifecycleRuntime = {
createQueuedTaskRun: createQueuedTaskRunInCore,
createRunningTaskRun: createRunningTaskRunInCore,
startTaskRunByRunId: startTaskRunByRunIdInCore,
recordTaskRunProgressByRunId: recordTaskRunProgressByRunIdInCore,
completeTaskRunByRunId: completeTaskRunByRunIdInCore,
failTaskRunByRunId: failTaskRunByRunIdInCore,
setDetachedTaskDeliveryStatusByRunId: setDetachedTaskDeliveryStatusByRunIdInCore,
createQueuedTaskRun: createQueuedTaskRunFromExecutor,
createRunningTaskRun: createRunningTaskRunFromExecutor,
startTaskRunByRunId: startTaskRunByRunIdFromExecutor,
recordTaskRunProgressByRunId: recordTaskRunProgressByRunIdFromExecutor,
completeTaskRunByRunId: completeTaskRunByRunIdFromExecutor,
failTaskRunByRunId: failTaskRunByRunIdFromExecutor,
setDetachedTaskDeliveryStatusByRunId: setDetachedTaskDeliveryStatusByRunIdFromExecutor,
cancelDetachedTaskRunById: cancelDetachedTaskRunByIdInCore,
};
let detachedTaskLifecycleRuntime = DEFAULT_DETACHED_TASK_LIFECYCLE_RUNTIME;
export function getDetachedTaskLifecycleRuntime(): DetachedTaskLifecycleRuntime {
return detachedTaskLifecycleRuntime;
return getRegisteredDetachedTaskLifecycleRuntime() ?? DEFAULT_DETACHED_TASK_LIFECYCLE_RUNTIME;
}
export function getDetachedTaskLifecycleRuntimeRegistration():
| DetachedTaskLifecycleRuntimeRegistration
| undefined {
return getDetachedTaskLifecycleRuntimeRegistrationState();
}
export function registerDetachedTaskRuntime(
pluginId: string,
runtime: DetachedTaskLifecycleRuntime,
): void {
registerDetachedTaskLifecycleRuntime(pluginId, runtime);
}
export function setDetachedTaskLifecycleRuntime(runtime: DetachedTaskLifecycleRuntime): void {
detachedTaskLifecycleRuntime = runtime;
registerDetachedTaskRuntime("__test__", runtime);
}
export function resetDetachedTaskLifecycleRuntimeForTests(): void {
detachedTaskLifecycleRuntime = DEFAULT_DETACHED_TASK_LIFECYCLE_RUNTIME;
clearDetachedTaskLifecycleRuntimeRegistration();
}
export function createQueuedTaskRun(
...args: Parameters<DetachedTaskLifecycleRuntime["createQueuedTaskRun"]>
): ReturnType<DetachedTaskLifecycleRuntime["createQueuedTaskRun"]> {
return detachedTaskLifecycleRuntime.createQueuedTaskRun(...args);
return getDetachedTaskLifecycleRuntime().createQueuedTaskRun(...args);
}
export function createRunningTaskRun(
...args: Parameters<DetachedTaskLifecycleRuntime["createRunningTaskRun"]>
): ReturnType<DetachedTaskLifecycleRuntime["createRunningTaskRun"]> {
return detachedTaskLifecycleRuntime.createRunningTaskRun(...args);
return getDetachedTaskLifecycleRuntime().createRunningTaskRun(...args);
}
export function startTaskRunByRunId(
...args: Parameters<DetachedTaskLifecycleRuntime["startTaskRunByRunId"]>
): ReturnType<DetachedTaskLifecycleRuntime["startTaskRunByRunId"]> {
return detachedTaskLifecycleRuntime.startTaskRunByRunId(...args);
return getDetachedTaskLifecycleRuntime().startTaskRunByRunId(...args);
}
export function recordTaskRunProgressByRunId(
...args: Parameters<DetachedTaskLifecycleRuntime["recordTaskRunProgressByRunId"]>
): ReturnType<DetachedTaskLifecycleRuntime["recordTaskRunProgressByRunId"]> {
return detachedTaskLifecycleRuntime.recordTaskRunProgressByRunId(...args);
return getDetachedTaskLifecycleRuntime().recordTaskRunProgressByRunId(...args);
}
export function completeTaskRunByRunId(
...args: Parameters<DetachedTaskLifecycleRuntime["completeTaskRunByRunId"]>
): ReturnType<DetachedTaskLifecycleRuntime["completeTaskRunByRunId"]> {
return detachedTaskLifecycleRuntime.completeTaskRunByRunId(...args);
return getDetachedTaskLifecycleRuntime().completeTaskRunByRunId(...args);
}
export function failTaskRunByRunId(
...args: Parameters<DetachedTaskLifecycleRuntime["failTaskRunByRunId"]>
): ReturnType<DetachedTaskLifecycleRuntime["failTaskRunByRunId"]> {
return detachedTaskLifecycleRuntime.failTaskRunByRunId(...args);
return getDetachedTaskLifecycleRuntime().failTaskRunByRunId(...args);
}
export function setDetachedTaskDeliveryStatusByRunId(
...args: Parameters<DetachedTaskLifecycleRuntime["setDetachedTaskDeliveryStatusByRunId"]>
): ReturnType<DetachedTaskLifecycleRuntime["setDetachedTaskDeliveryStatusByRunId"]> {
return detachedTaskLifecycleRuntime.setDetachedTaskDeliveryStatusByRunId(...args);
return getDetachedTaskLifecycleRuntime().setDetachedTaskDeliveryStatusByRunId(...args);
}
export function cancelDetachedTaskRunById(
...args: Parameters<DetachedTaskLifecycleRuntime["cancelDetachedTaskRunById"]>
): ReturnType<DetachedTaskLifecycleRuntime["cancelDetachedTaskRunById"]> {
return getDetachedTaskLifecycleRuntime().cancelDetachedTaskRunById(...args);
}

View File

@@ -3,6 +3,11 @@ import { resetAgentEventsForTest, resetAgentRunContextForTest } from "../infra/a
import { resetHeartbeatWakeStateForTests } from "../infra/heartbeat-wake.js";
import { resetSystemEventsForTest } from "../infra/system-events.js";
import { withStateDirEnv } from "../test-helpers/state-dir-env.js";
import {
getDetachedTaskLifecycleRuntime,
resetDetachedTaskLifecycleRuntimeForTests,
setDetachedTaskLifecycleRuntime,
} from "./detached-task-runtime.js";
import {
cancelFlowById,
cancelFlowByIdForOwner,
@@ -59,6 +64,7 @@ vi.mock("../agents/subagent-control.js", () => ({
async function withTaskExecutorStateDir(run: (stateDir: string) => Promise<void>): Promise<void> {
await withStateDirEnv("openclaw-task-executor-", async ({ stateDir }) => {
resetDetachedTaskLifecycleRuntimeForTests();
resetSystemEventsForTest();
resetHeartbeatWakeStateForTests();
resetAgentEventsForTest();
@@ -613,6 +619,147 @@ describe("task-executor", () => {
});
});
it("dispatches detached task cancellation through the registered runtime", async () => {
await withTaskExecutorStateDir(async () => {
hoisted.cancelSessionMock.mockResolvedValue(undefined);
const child = createRunningTaskRun({
runtime: "acp",
ownerKey: "agent:main:main",
scopeKind: "session",
childSessionKey: "agent:codex:acp:child",
runId: "run-external-cancel",
task: "Inspect a PR",
startedAt: 10,
deliveryStatus: "pending",
});
const defaultRuntime = getDetachedTaskLifecycleRuntime();
const cancelDetachedTaskRunByIdSpy = vi.fn(
(...args: Parameters<typeof defaultRuntime.cancelDetachedTaskRunById>) =>
defaultRuntime.cancelDetachedTaskRunById(...args),
);
setDetachedTaskLifecycleRuntime({
...defaultRuntime,
cancelDetachedTaskRunById: cancelDetachedTaskRunByIdSpy,
});
const cancelled = await cancelDetachedTaskRunById({
cfg: {} as never,
taskId: child.taskId,
});
expect(cancelDetachedTaskRunByIdSpy).toHaveBeenCalledWith({
cfg: {} as never,
taskId: child.taskId,
});
expect(cancelled).toMatchObject({
found: true,
cancelled: true,
});
});
});
it("falls back to the legacy canceller when the registered runtime declines task ownership", async () => {
await withTaskExecutorStateDir(async () => {
hoisted.cancelSessionMock.mockResolvedValue(undefined);
const child = createRunningTaskRun({
runtime: "acp",
ownerKey: "agent:main:main",
scopeKind: "session",
childSessionKey: "agent:codex:acp:child",
runId: "run-runtime-decline-cancel",
task: "Inspect a PR",
startedAt: 10,
deliveryStatus: "pending",
});
const cancelDetachedTaskRunByIdSpy = vi.fn(async () => ({
found: false,
cancelled: false,
reason: "not owned by runtime",
}));
setDetachedTaskLifecycleRuntime({
...getDetachedTaskLifecycleRuntime(),
cancelDetachedTaskRunById: cancelDetachedTaskRunByIdSpy,
});
const cancelled = await cancelDetachedTaskRunById({
cfg: {} as never,
taskId: child.taskId,
});
expect(cancelDetachedTaskRunByIdSpy).toHaveBeenCalledWith({
cfg: {} as never,
taskId: child.taskId,
});
expect(cancelled).toMatchObject({
found: true,
cancelled: true,
});
expect(getTaskById(child.taskId)).toMatchObject({
taskId: child.taskId,
status: "cancelled",
});
expect(hoisted.cancelSessionMock).toHaveBeenCalledWith({
cfg: {} as never,
sessionKey: "agent:codex:acp:child",
reason: "task-cancel",
});
});
});
it("does not fall back when the registered runtime claims task ownership", async () => {
await withTaskExecutorStateDir(async () => {
hoisted.cancelSessionMock.mockResolvedValue(undefined);
const child = createRunningTaskRun({
runtime: "acp",
ownerKey: "agent:main:main",
scopeKind: "session",
childSessionKey: "agent:codex:acp:child",
runId: "run-runtime-owned-cancel",
task: "Inspect a PR",
startedAt: 10,
deliveryStatus: "pending",
});
const cancelDetachedTaskRunByIdSpy = vi.fn(async () => ({
found: true,
cancelled: false,
reason: "runtime refused cancel",
}));
setDetachedTaskLifecycleRuntime({
...getDetachedTaskLifecycleRuntime(),
cancelDetachedTaskRunById: cancelDetachedTaskRunByIdSpy,
});
const cancelled = await cancelDetachedTaskRunById({
cfg: {} as never,
taskId: child.taskId,
});
expect(cancelled).toMatchObject({
found: true,
cancelled: false,
reason: "runtime refused cancel",
});
expect(cancelDetachedTaskRunByIdSpy).toHaveBeenCalledWith({
cfg: {} as never,
taskId: child.taskId,
});
expect(getTaskById(child.taskId)).toMatchObject({
taskId: child.taskId,
status: "running",
});
expect(hoisted.cancelSessionMock).not.toHaveBeenCalled();
});
});
it("cancels active subagent child tasks", async () => {
await withTaskExecutorStateDir(async () => {
hoisted.killSubagentRunAdminMock.mockResolvedValue({
@@ -651,6 +798,62 @@ describe("task-executor", () => {
});
});
it("routes TaskFlow cancellation through the registered detached runtime", async () => {
await withTaskExecutorStateDir(async () => {
hoisted.cancelSessionMock.mockResolvedValue(undefined);
const flow = createManagedTaskFlow({
ownerKey: "agent:main:main",
controllerId: "tests/cancel-flow",
goal: "Cancel linked tasks",
});
const child = runTaskInFlow({
flowId: flow.flowId,
runtime: "acp",
childSessionKey: "agent:codex:acp:child",
runId: "run-flow-cancel-via-runtime",
task: "Cancel flow child",
status: "running",
startedAt: 10,
});
if (!child.created) {
throw new Error("expected child task creation to succeed");
}
const childTask = child.task;
if (!childTask) {
throw new Error("expected child task payload");
}
const defaultRuntime = getDetachedTaskLifecycleRuntime();
const cancelDetachedTaskRunByIdSpy = vi.fn(
(...args: Parameters<typeof defaultRuntime.cancelDetachedTaskRunById>) =>
defaultRuntime.cancelDetachedTaskRunById(...args),
);
setDetachedTaskLifecycleRuntime({
...defaultRuntime,
cancelDetachedTaskRunById: cancelDetachedTaskRunByIdSpy,
});
const cancelled = await cancelFlowById({
cfg: {} as never,
flowId: flow.flowId,
});
expect(cancelDetachedTaskRunByIdSpy).toHaveBeenCalledWith({
cfg: {} as never,
taskId: childTask.taskId,
});
expect(cancelled).toMatchObject({
found: true,
cancelled: true,
flow: {
flowId: flow.flowId,
status: "cancelled",
},
});
});
});
it("scopes run-id updates to the matching runtime and session", async () => {
await withTaskExecutorStateDir(async () => {
const victim = createRunningTaskRun({

View File

@@ -1,9 +1,11 @@
import type { OpenClawConfig } from "../config/types.openclaw.js";
import { createSubsystemLogger } from "../logging/subsystem.js";
import { getRegisteredDetachedTaskLifecycleRuntime } from "./detached-task-runtime-state.js";
import {
cancelTaskById,
createTaskRecord,
findLatestTaskForFlowId,
getTaskById,
isParentFlowLinkError,
linkTaskToFlowById,
listTasksForFlowId,
@@ -636,7 +638,7 @@ export async function cancelFlowById(params: {
const linkedTasks = listTasksForFlowId(flow.flowId);
const activeTasks = linkedTasks.filter((task) => isActiveTaskStatus(task.status));
for (const task of activeTasks) {
await cancelTaskById({
await cancelDetachedTaskRunById({
cfg: params.cfg,
taskId: task.taskId,
});
@@ -707,5 +709,16 @@ export async function cancelFlowByIdForOwner(params: {
}
export async function cancelDetachedTaskRunById(params: { cfg: OpenClawConfig; taskId: string }) {
const task = getTaskById(params.taskId);
if (!task) {
return cancelTaskById(params);
}
const registeredRuntime = getRegisteredDetachedTaskLifecycleRuntime();
if (registeredRuntime) {
const cancelled = await registeredRuntime.cancelDetachedTaskRunById(params);
if (cancelled.found) {
return cancelled;
}
}
return cancelTaskById(params);
}

View File

@@ -41,6 +41,7 @@ export function createTestPluginApi(api: TestPluginApiInput = {}): OpenClawPlugi
registerContextEngine() {},
registerCompactionProvider() {},
registerAgentHarness() {},
registerDetachedTaskRuntime() {},
registerMemoryCapability() {},
registerMemoryPromptSection() {},
registerMemoryPromptSupplement() {},