fix(tasks): clean orphaned parent-owned acp sessions

This commit is contained in:
Peter Steinberger
2026-04-29 04:35:00 +01:00
parent d130a77a3b
commit 5a9c0efa54
6 changed files with 209 additions and 7 deletions

View File

@@ -47,6 +47,7 @@ Docs: https://docs.openclaw.ai
- NVIDIA/NIM: persist the `NVIDIA_API_KEY` provider marker and mark bundled NVIDIA Chat Completions models as string-content compatible, so NIM models load from `models.json` and OpenAI-compatible subagent calls send plain text content. Fixes #73013 and #50107; refs #73014. Thanks @bautrey, @iot2edge, @ifearghal, and @futhgar.
- Channels/Discord: let text-only configs drop the `GuildVoiceStates` gateway intent and expose a bounded `/gateway/bot` metadata timeout with rate-limited fallback logs, reducing idle CPU and warning floods. Fixes #73709 and #73585. Thanks @sanchezm86 and @trac3r00.
- Agents/sessions: mark same-turn `sessions_send` and A2A reply prompts with an inter-session `isUser=false` envelope before they reach the model, so foreign session output no longer lands as bare active user text. Fixes #73702; refs #73698, #73609, #73595, and #73622. Thanks @alvelda.
- ACP/tasks: sweep orphaned parent-owned ACP sessions whose task records are gone, preserving bound persistent sessions but clearing unbound stale ACPX metadata so old child sessions cannot silently respawn into chat. Fixes #73609. Thanks @joerod26.
- Outbound/security: strip known internal runtime scaffolding such as `<system-reminder>` and `<previous_response>` at the final channel delivery boundary and keep Discord output on targeted tag stripping, so degraded harness replies cannot leak those tags to users. Fixes #73595. Thanks @gabrielexito-stack and @martingarramon.
- Security/Telegram: load Telegram security adapters in read-only audit/doctor, audit malformed Telegram DM `allowFrom` entries even when groups are disabled, and keep allowlist DM audits from counting stale pairing-store senders, so public/shared-DM risk checks stay accurate. Refs #73698. Thanks @xace1825.
- Plugins: remove hidden manifest, provider-owner, bootstrap, and channel metadata caches so plugin installs, manifest edits, and bundled-root changes are visible on the next metadata read while keeping runtime/module loader caches for actual plugin code. Thanks @shakkernerd.

View File

@@ -318,7 +318,7 @@ A sweeper runs every **60 seconds** and handles four things:
Checks whether active tasks still have authoritative runtime backing. ACP/subagent tasks use child-session state, cron tasks use active-job ownership, and chat-backed CLI tasks use the owning run context. If that backing state is gone for more than 5 minutes, the task is marked `lost`.
</Step>
<Step title="ACP session repair">
Closes terminal parent-owned one-shot ACP sessions, and closes stale terminal persistent ACP sessions only when no active conversation binding remains.
Closes terminal or orphaned parent-owned one-shot ACP sessions, and closes stale terminal or orphaned persistent ACP sessions only when no active conversation binding remains.
</Step>
<Step title="Cleanup stamping">
Sets a `cleanupAfter` timestamp on terminal tasks (endedAt + 7 days). During retention, lost tasks still appear in audit as warnings; after `cleanupAfter` expires or when cleanup metadata is missing, they are errors.

View File

@@ -143,7 +143,7 @@ Quick `/acp` flow from chat:
<Accordion title="Lifecycle details">
- Spawn creates or resumes an ACP runtime session, records ACP metadata in the OpenClaw session store, and may create a background task when the run is parent-owned.
- Parent-owned ACP sessions are treated as background work even when the runtime session is persistent; completion and cross-surface delivery go through the parent task notifier rather than acting like a normal user-facing chat session.
- Task maintenance closes terminal parent-owned one-shot ACP sessions. Persistent ACP sessions are preserved while an active conversation binding remains; stale persistent sessions without an active binding are closed so they cannot be silently resumed after the owning task is done.
- Task maintenance closes terminal or orphaned parent-owned one-shot ACP sessions. Persistent ACP sessions are preserved while an active conversation binding remains; stale persistent sessions without an active binding are closed so they cannot be silently resumed after the owning task is done or its task record is gone.
- Bound follow-up messages go directly to the ACP session until the binding is closed, unfocused, reset, or expired.
- Gateway commands stay local. `/acp ...`, `/status`, and `/unfocus` are never sent as normal prompt text to a bound ACP harness.
- `cancel` aborts the active turn when the backend supports cancellation; it does not delete the binding or session metadata.

View File

@@ -68,6 +68,7 @@ function createTaskRegistryMaintenanceHarness(params: {
const currentTasks = new Map(params.tasks.map((task) => [task.taskId, { ...task }]));
const runtime: TaskRegistryMaintenanceRuntime = {
listAcpSessionEntries: async () => [],
readAcpSessionEntry: () =>
acpEntry !== undefined
? ({

View File

@@ -1,5 +1,9 @@
import { getAcpSessionManager } from "../acp/control-plane/manager.js";
import { readAcpSessionEntry } from "../acp/runtime/session-meta.js";
import {
listAcpSessionEntries,
readAcpSessionEntry,
type AcpSessionStoreEntry,
} from "../acp/runtime/session-meta.js";
import { loadSessionStore, resolveStorePath } from "../config/sessions.js";
import type { OpenClawConfig } from "../config/types.openclaw.js";
import { isCronJobActive } from "../cron/active-jobs.js";
@@ -55,6 +59,7 @@ let configuredCronStorePath: string | undefined;
let configuredCronRuntimeAuthoritative = false;
type TaskRegistryMaintenanceRuntime = {
listAcpSessionEntries: typeof listAcpSessionEntries;
readAcpSessionEntry: typeof readAcpSessionEntry;
closeAcpSession?: (params: {
cfg: OpenClawConfig;
@@ -85,6 +90,7 @@ type TaskRegistryMaintenanceRuntime = {
};
const defaultTaskRegistryMaintenanceRuntime: TaskRegistryMaintenanceRuntime = {
listAcpSessionEntries,
readAcpSessionEntry,
closeAcpSession: async ({ cfg, sessionKey, reason }) => {
await getAcpSessionManager().closeSession({
@@ -421,6 +427,24 @@ function hasActiveTaskForChildSession(task: TaskRecord, tasks: TaskRecord[]): bo
);
}
function hasActiveTaskForChildSessionKey(sessionKey: string, tasks: TaskRecord[]): boolean {
const normalized = normalizeOptionalString(sessionKey);
if (!normalized) {
return false;
}
return tasks.some(
(candidate) =>
isActiveTask(candidate) && getNormalizedTaskChildSessionKey(candidate) === normalized,
);
}
function getAcpSessionParentKeys(acpEntry: Pick<AcpSessionStoreEntry, "entry">): string[] {
return [
normalizeOptionalString(acpEntry.entry?.spawnedBy),
normalizeOptionalString(acpEntry.entry?.parentSessionKey),
].filter((value): value is string => Boolean(value));
}
function isParentOwnedAcpSessionTask(
task: TaskRecord,
acpEntry: ReturnType<typeof readAcpSessionEntry>,
@@ -431,13 +455,14 @@ function isParentOwnedAcpSessionTask(
}
const ownerKey = normalizeOptionalString(task.ownerKey);
const requesterKey = normalizeOptionalString(task.requesterSessionKey);
const parentKeys = [
normalizeOptionalString(entry.spawnedBy),
normalizeOptionalString(entry.parentSessionKey),
].filter((value): value is string => Boolean(value));
const parentKeys = getAcpSessionParentKeys({ entry });
return parentKeys.some((parentKey) => parentKey === ownerKey || parentKey === requesterKey);
}
function isParentOwnedAcpSessionEntry(acpEntry: Pick<AcpSessionStoreEntry, "entry">): boolean {
return getAcpSessionParentKeys(acpEntry).length > 0;
}
function hasActiveSessionBinding(sessionKey: string): boolean {
const listBindings = taskRegistryMaintenanceRuntime.listSessionBindingsBySession;
if (!listBindings) {
@@ -471,6 +496,23 @@ function shouldCloseTerminalAcpSession(task: TaskRecord, tasks: TaskRecord[]): b
return !hasActiveSessionBinding(sessionKey);
}
function shouldCloseOrphanedParentOwnedAcpSession(
acpEntry: AcpSessionStoreEntry,
tasks: TaskRecord[],
): boolean {
if (!acpEntry.entry || !acpEntry.acp || !isParentOwnedAcpSessionEntry(acpEntry)) {
return false;
}
const sessionKey = normalizeOptionalString(acpEntry.sessionKey);
if (!sessionKey || hasActiveTaskForChildSessionKey(sessionKey, tasks)) {
return false;
}
if (acpEntry.acp.mode === "oneshot") {
return true;
}
return !hasActiveSessionBinding(sessionKey);
}
async function cleanupTerminalAcpSession(task: TaskRecord, tasks: TaskRecord[]): Promise<void> {
if (!shouldCloseTerminalAcpSession(task, tasks)) {
return;
@@ -512,6 +554,55 @@ async function cleanupTerminalAcpSession(task: TaskRecord, tasks: TaskRecord[]):
}
}
async function cleanupOrphanedParentOwnedAcpSessions(tasks: TaskRecord[]): Promise<void> {
let acpSessions: AcpSessionStoreEntry[];
try {
acpSessions = await taskRegistryMaintenanceRuntime.listAcpSessionEntries({});
} catch (error) {
log.warn("Failed to list ACP sessions during task maintenance", { error });
return;
}
const seenSessionKeys = new Set<string>();
for (const acpEntry of acpSessions) {
const sessionKey = normalizeOptionalString(acpEntry.sessionKey);
if (!sessionKey || seenSessionKeys.has(sessionKey)) {
continue;
}
seenSessionKeys.add(sessionKey);
if (!shouldCloseOrphanedParentOwnedAcpSession(acpEntry, tasks)) {
continue;
}
const closeAcpSession = taskRegistryMaintenanceRuntime.closeAcpSession;
if (!closeAcpSession) {
continue;
}
try {
await closeAcpSession({
cfg: acpEntry.cfg,
sessionKey,
reason: "orphaned-parent-task-cleanup",
});
} catch (error) {
log.warn("Failed to close orphaned parent-owned ACP session during task maintenance", {
sessionKey,
error,
});
continue;
}
try {
await taskRegistryMaintenanceRuntime.unbindSessionBindings?.({
targetSessionKey: sessionKey,
reason: "orphaned-parent-task-cleanup",
});
} catch (error) {
log.warn("Failed to unbind orphaned parent-owned ACP session during task maintenance", {
sessionKey,
error,
});
}
}
}
function markTaskLost(task: TaskRecord, now: number): TaskRecord {
const cleanupAfter = task.cleanupAfter ?? projectTaskLost(task, now).cleanupAfter;
const updated =
@@ -754,6 +845,7 @@ export async function runTaskRegistryMaintenance(): Promise<TaskRegistryMaintena
await yieldToEventLoop();
}
}
await cleanupOrphanedParentOwnedAcpSessions(taskRegistryMaintenanceRuntime.listTaskRecords());
return { reconciled, recovered, cleanupStamped, pruned };
}

View File

@@ -91,6 +91,7 @@ function configureTaskRegistryMaintenanceRuntimeForTest(params: {
currentTasks: Map<string, ReturnType<typeof createTaskRecord>>;
snapshotTasks: ReturnType<typeof createTaskRecord>[];
acpEntry?: AcpSessionStoreEntry;
acpEntries?: AcpSessionStoreEntry[];
sessionBindings?: SessionBindingRecord[];
closeAcpSession?: (params: {
cfg: AcpSessionStoreEntry["cfg"];
@@ -112,6 +113,7 @@ function configureTaskRegistryMaintenanceRuntimeForTest(params: {
storeReadFailed: false,
} satisfies AcpSessionStoreEntry;
setTaskRegistryMaintenanceRuntimeForTests({
listAcpSessionEntries: async () => params.acpEntries ?? [],
readAcpSessionEntry: () => params.acpEntry ?? emptyAcpEntry,
listSessionBindingsBySession: () => params.sessionBindings ?? [],
closeAcpSession: params.closeAcpSession,
@@ -1726,6 +1728,111 @@ describe("task-registry", () => {
});
});
it("closes orphaned parent-owned one-shot ACP sessions after task records are gone", async () => {
await withTaskRegistryTempDir(async (root) => {
process.env.OPENCLAW_STATE_DIR = root;
resetTaskRegistryForTests();
const parentSessionKey = "agent:main:telegram:direct:owner";
const childSessionKey = "agent:claude:acp:orphaned-oneshot";
const closeAcpSession = vi.fn().mockResolvedValue(undefined);
const unbindSessionBindings = vi.fn().mockResolvedValue([]);
configureTaskRegistryMaintenanceRuntimeForTest({
currentTasks: new Map(),
snapshotTasks: [],
acpEntries: [
createAcpSessionStoreEntry({
sessionKey: childSessionKey,
parentSessionKey,
mode: "oneshot",
}),
],
closeAcpSession,
unbindSessionBindings,
});
await runTaskRegistryMaintenance();
expect(closeAcpSession).toHaveBeenCalledWith({
cfg: {},
sessionKey: childSessionKey,
reason: "orphaned-parent-task-cleanup",
});
expect(unbindSessionBindings).toHaveBeenCalledWith({
targetSessionKey: childSessionKey,
reason: "orphaned-parent-task-cleanup",
});
});
});
it("keeps orphaned parent-owned persistent ACP sessions while a binding is active", async () => {
await withTaskRegistryTempDir(async (root) => {
process.env.OPENCLAW_STATE_DIR = root;
resetTaskRegistryForTests();
const parentSessionKey = "agent:main:telegram:direct:owner";
const childSessionKey = "agent:claude:acp:bound-orphaned-persistent";
const closeAcpSession = vi.fn().mockResolvedValue(undefined);
const unbindSessionBindings = vi.fn().mockResolvedValue([]);
configureTaskRegistryMaintenanceRuntimeForTest({
currentTasks: new Map(),
snapshotTasks: [],
acpEntries: [
createAcpSessionStoreEntry({
sessionKey: childSessionKey,
parentSessionKey,
mode: "persistent",
}),
],
sessionBindings: [createSessionBindingRecord({ targetSessionKey: childSessionKey })],
closeAcpSession,
unbindSessionBindings,
});
await runTaskRegistryMaintenance();
expect(closeAcpSession).not.toHaveBeenCalled();
expect(unbindSessionBindings).not.toHaveBeenCalled();
});
});
it("closes orphaned parent-owned persistent ACP sessions without active bindings", async () => {
await withTaskRegistryTempDir(async (root) => {
process.env.OPENCLAW_STATE_DIR = root;
resetTaskRegistryForTests();
const parentSessionKey = "agent:main:telegram:direct:owner";
const childSessionKey = "agent:claude:acp:unbound-orphaned-persistent";
const closeAcpSession = vi.fn().mockResolvedValue(undefined);
const unbindSessionBindings = vi.fn().mockResolvedValue([]);
configureTaskRegistryMaintenanceRuntimeForTest({
currentTasks: new Map(),
snapshotTasks: [],
acpEntries: [
createAcpSessionStoreEntry({
sessionKey: childSessionKey,
parentSessionKey,
mode: "persistent",
}),
],
closeAcpSession,
unbindSessionBindings,
});
await runTaskRegistryMaintenance();
expect(closeAcpSession).toHaveBeenCalledWith({
cfg: {},
sessionKey: childSessionKey,
reason: "orphaned-parent-task-cleanup",
});
expect(unbindSessionBindings).toHaveBeenCalledWith({
targetSessionKey: childSessionKey,
reason: "orphaned-parent-task-cleanup",
});
});
});
it("prunes old terminal tasks during maintenance sweeps", async () => {
await withTaskRegistryTempDir(async (root) => {
process.env.OPENCLAW_STATE_DIR = root;
@@ -1856,6 +1963,7 @@ describe("task-registry", () => {
process.on("unhandledRejection", onUnhandledRejection);
setTaskRegistryMaintenanceRuntimeForTests({
listAcpSessionEntries: async () => [],
readAcpSessionEntry: () => ({
cfg: {} as never,
storePath: "",