From 5dfc14d49b8429098812f0625fbe2ae455cf2911 Mon Sep 17 00:00:00 2001
From: Peter Steinberger
Date: Tue, 28 Apr 2026 21:03:47 +0100
Subject: [PATCH] fix(tasks): close stale terminal acp sessions

---
 CHANGELOG.md                           |   2 +-
 docs/automation/tasks.md               |   5 +-
 docs/channels/telegram.md              |   2 +
 docs/tools/acp-agents.md               |   1 +
 src/tasks/task-registry.maintenance.ts | 141 +++++++++++++++-
 src/tasks/task-registry.test.ts        | 222 ++++++++++++++++++++++++-
 6 files changed, 369 insertions(+), 4 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index d88f6457c18..e6e3ca7bcca 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -17,7 +17,7 @@ Docs: https://docs.openclaw.ai
 - Control UI/Talk: decode Google Live binary WebSocket JSON frames and stop queued browser audio on interruption or shutdown, so browser Talk leaves `Connecting Talk...` and barge-in no longer plays stale audio. Fixes #73601 and #73460; supersedes #73466. Thanks @Spolen23 and @WadydX.
 - Channels/Discord: ignore stale route-shaped conversation bindings after a Discord channel is reconfigured to another agent, while preserving explicit focus and subagent bindings. Fixes #73626. Thanks @ramitrkar-hash.
 - Agents/bootstrap: pass pending BOOTSTRAP.md contents through the first-run user prompt while keeping them out of privileged system context, and show limited bootstrap guidance when workspace file access is unavailable. Fixes #73622. Thanks @mark1010.
-- ACP/tasks: classify parent-owned ACP sessions as background work regardless of persistent runtime mode, so delegated ACP output reports through the parent task notifier instead of acting like a normal foreground chat session. Refs #73609. Thanks @joerod26.
+- ACP/tasks: classify parent-owned ACP sessions as background work regardless of persistent runtime mode, and close terminal stale ACP sessions when no active binding remains, so delegated ACP output reports through the parent task notifier instead of acting like a normal foreground chat session. Refs #73609. Thanks @joerod26.
 - Gateway/sessions: add conservative stuck-session recovery that releases only stale session lanes while active embedded runs, reply operations, and lane tasks remain serialized, so queued follow-ups can drain without aborting legitimate long-running turns. Refs #73581, #73655, #73652, #73705, #73647, #73602, #73592, and #73601. Thanks @WS-Q0758, @bryangauvin, @spenceryang1996-dot, @bmilne1981, @mattmcintyre, @Vksh07, and @Spolen23.
 - Plugins: cache unchanged plugin manifest loads by file signature, reducing repeated JSON/JSON5 parsing and manifest normalization in bursty startup and runtime registry paths. Refs #73532 and #73647; carries forward #73678. Thanks @TheDutchRuler.
 - Agents/model selection: resolve slash-form aliases before provider/model parsing and keep alias-resolved primary models subject to transient provider cooldowns, so cron and persisted sessions do not retry cooled-down raw aliases. Fixes #73573 and #73657. Thanks @akai-shuuichi and @hashslingers.
diff --git a/docs/automation/tasks.md b/docs/automation/tasks.md
index 06787115a88..8d1a9b75152 100644
--- a/docs/automation/tasks.md
+++ b/docs/automation/tasks.md
@@ -311,12 +311,15 @@ autocheckpoint threshold plus periodic and shutdown `TRUNCATE` checkpoints.
 ### Automatic maintenance
-A sweeper runs every **60 seconds** and handles three things:
+A sweeper runs every **60 seconds** and handles four things:
 Checks whether active tasks still have authoritative runtime backing. ACP/subagent tasks use child-session state, cron tasks use active-job ownership, and chat-backed CLI tasks use the owning run context. If that backing state is gone for more than 5 minutes, the task is marked `lost`.
+
+ Closes terminal parent-owned one-shot ACP sessions, and closes stale terminal persistent ACP sessions only when no active conversation binding remains.
+
 Sets a `cleanupAfter` timestamp on terminal tasks (endedAt + 7 days).
 During retention, lost tasks still appear in audit as warnings; after `cleanupAfter` expires or when cleanup metadata is missing, they are errors.
diff --git a/docs/channels/telegram.md b/docs/channels/telegram.md
index fca04800a15..5542403b8d6 100644
--- a/docs/channels/telegram.md
+++ b/docs/channels/telegram.md
@@ -111,6 +111,8 @@ Token resolution order is account-aware. In practice, config values win over env
 - `open` (requires `allowFrom` to include `"*"`)
 - `disabled`
+ `dmPolicy: "open"` with `allowFrom: ["*"]` lets any Telegram account that finds or guesses the bot username command the bot. Use it only for intentionally public bots with tightly restricted tools; one-owner bots should use `allowlist` with numeric user IDs.
+
 `channels.telegram.allowFrom` accepts numeric Telegram user IDs. `telegram:` / `tg:` prefixes are accepted and normalized.
 `dmPolicy: "allowlist"` with empty `allowFrom` blocks all DMs and is rejected by config validation.
 Setup asks for numeric user IDs only.
diff --git a/docs/tools/acp-agents.md b/docs/tools/acp-agents.md
index 74952fd79cf..4b53e39b722 100644
--- a/docs/tools/acp-agents.md
+++ b/docs/tools/acp-agents.md
@@ -143,6 +143,7 @@ Quick `/acp` flow from chat:
 - Spawn creates or resumes an ACP runtime session, records ACP metadata in the OpenClaw session store, and may create a background task when the run is parent-owned.
 - Parent-owned ACP sessions are treated as background work even when the runtime session is persistent; completion and cross-surface delivery go through the parent task notifier rather than acting like a normal user-facing chat session.
+- Task maintenance closes terminal parent-owned one-shot ACP sessions. Persistent ACP sessions are preserved while an active conversation binding remains; stale persistent sessions without an active binding are closed so they cannot be silently resumed after the owning task is done.
 - Bound follow-up messages go directly to the ACP session until the binding is closed, unfocused, reset, or expired.
 - Gateway commands stay local. `/acp ...`, `/status`, and `/unfocus` are never sent as normal prompt text to a bound ACP harness.
 - `cancel` aborts the active turn when the backend supports cancellation; it does not delete the binding or session metadata.
diff --git a/src/tasks/task-registry.maintenance.ts b/src/tasks/task-registry.maintenance.ts
index fc60cf988df..c07bdab1a56 100644
--- a/src/tasks/task-registry.maintenance.ts
+++ b/src/tasks/task-registry.maintenance.ts
@@ -1,14 +1,21 @@
+import { getAcpSessionManager } from "../acp/control-plane/manager.js";
 import { readAcpSessionEntry } from "../acp/runtime/session-meta.js";
 import { loadSessionStore, resolveStorePath } from "../config/sessions.js";
+import type { OpenClawConfig } from "../config/types.openclaw.js";
 import { isCronJobActive } from "../cron/active-jobs.js";
 import { readCronRunLogEntriesSync, resolveCronRunLogPath } from "../cron/run-log.js";
 import type { CronRunLogEntry } from "../cron/run-log.js";
 import { loadCronStoreSync, resolveCronStorePath } from "../cron/store.js";
 import type { CronJob, CronStoreFile } from "../cron/types.js";
 import { getAgentRunContext } from "../infra/agent-events.js";
+import { getSessionBindingService } from "../infra/outbound/session-binding-service.js";
+import { createSubsystemLogger } from "../logging/subsystem.js";
 import { parseAgentSessionKey } from "../routing/session-key.js";
 import { deriveSessionChatType } from "../sessions/session-chat-type.js";
-import { normalizeLowercaseStringOrEmpty } from "../shared/string-coerce.js";
+import {
+  normalizeLowercaseStringOrEmpty,
+  normalizeOptionalString,
+} from "../shared/string-coerce.js";
 import { tryRecoverTaskBeforeMarkLost } from "./detached-task-runtime.js";
 import {
   deleteTaskRecordById,
@@ -30,6 +37,7 @@ import type { TaskAuditSummary } from "./task-registry.audit.js";
 import { summarizeTaskRecords } from "./task-registry.summary.js";
 import type { TaskRecord, TaskRegistrySummary, TaskStatus } from "./task-registry.types.js";
+const log = createSubsystemLogger("tasks/task-registry-maintenance");
 const TASK_RECONCILE_GRACE_MS = 5 * 60_000;
 const TASK_RETENTION_MS = 7 * 24 * 60 * 60_000;
 const TASK_SWEEP_INTERVAL_MS = 60_000;
@@ -48,6 +56,13 @@ let configuredCronRuntimeAuthoritative = false;
 type TaskRegistryMaintenanceRuntime = {
   readAcpSessionEntry: typeof readAcpSessionEntry;
+  closeAcpSession?: (params: {
+    cfg: OpenClawConfig;
+    sessionKey: string;
+    reason: string;
+  }) => Promise<void>;
+  listSessionBindingsBySession?: ReturnType<typeof getSessionBindingService>["listBySession"];
+  unbindSessionBindings?: ReturnType<typeof getSessionBindingService>["unbind"];
   loadSessionStore: typeof loadSessionStore;
   resolveStorePath: typeof resolveStorePath;
   isCronJobActive: typeof isCronJobActive;
@@ -71,6 +86,20 @@ type TaskRegistryMaintenanceRuntime = {
 const defaultTaskRegistryMaintenanceRuntime: TaskRegistryMaintenanceRuntime = {
   readAcpSessionEntry,
+  closeAcpSession: async ({ cfg, sessionKey, reason }) => {
+    await getAcpSessionManager().closeSession({
+      cfg,
+      sessionKey,
+      reason,
+      discardPersistentState: true,
+      clearMeta: true,
+      allowBackendUnavailable: true,
+      requireAcpSession: false,
+    });
+  },
+  listSessionBindingsBySession: (sessionKey) =>
+    getSessionBindingService().listBySession(sessionKey),
+  unbindSessionBindings: (input) => getSessionBindingService().unbind(input),
   loadSessionStore,
   resolveStorePath,
   isCronJobActive,
@@ -374,6 +403,115 @@ function resolveCleanupAfter(task: TaskRecord): number {
   return terminalAt + TASK_RETENTION_MS;
 }
+function getNormalizedTaskChildSessionKey(task: TaskRecord): string | undefined {
+  return normalizeOptionalString(task.childSessionKey);
+}
+
+function isSameTaskChildSession(a: TaskRecord, b: TaskRecord): boolean {
+  const left = getNormalizedTaskChildSessionKey(a);
+  return Boolean(left && left === getNormalizedTaskChildSessionKey(b));
+}
+
+function hasActiveTaskForChildSession(task: TaskRecord, tasks: TaskRecord[]): boolean {
+  return tasks.some(
+    (candidate) =>
+      candidate.taskId !== task.taskId &&
+      isActiveTask(candidate) &&
+      isSameTaskChildSession(task, candidate),
+  );
+}
+
+function isParentOwnedAcpSessionTask(
+  task: TaskRecord,
+  acpEntry: ReturnType<typeof readAcpSessionEntry>,
+): boolean {
+  const entry = acpEntry?.entry;
+  if (!entry) {
+    return false;
+  }
+  const ownerKey = normalizeOptionalString(task.ownerKey);
+  const requesterKey = normalizeOptionalString(task.requesterSessionKey);
+  const parentKeys = [
+    normalizeOptionalString(entry.spawnedBy),
+    normalizeOptionalString(entry.parentSessionKey),
+  ].filter((value): value is string => Boolean(value));
+  return parentKeys.some((parentKey) => parentKey === ownerKey || parentKey === requesterKey);
+}
+
+function hasActiveSessionBinding(sessionKey: string): boolean {
+  const listBindings = taskRegistryMaintenanceRuntime.listSessionBindingsBySession;
+  if (!listBindings) {
+    return true;
+  }
+  try {
+    return listBindings(sessionKey).some((binding) => binding.status !== "ended");
+  } catch {
+    return true;
+  }
+}
+
+function shouldCloseTerminalAcpSession(task: TaskRecord, tasks: TaskRecord[]): boolean {
+  if (task.runtime !== "acp" || isActiveTask(task)) {
+    return false;
+  }
+  const sessionKey = getNormalizedTaskChildSessionKey(task);
+  if (!sessionKey || hasActiveTaskForChildSession(task, tasks)) {
+    return false;
+  }
+  const acpEntry = taskRegistryMaintenanceRuntime.readAcpSessionEntry({ sessionKey });
+  if (!acpEntry || acpEntry.storeReadFailed || !acpEntry.acp) {
+    return false;
+  }
+  if (!isParentOwnedAcpSessionTask(task, acpEntry)) {
+    return false;
+  }
+  if (acpEntry.acp.mode === "oneshot") {
+    return true;
+  }
+  return !hasActiveSessionBinding(sessionKey);
+}
+
+async function cleanupTerminalAcpSession(task: TaskRecord, tasks: TaskRecord[]): Promise<void> {
+  if (!shouldCloseTerminalAcpSession(task, tasks)) {
+    return;
+  }
+  const sessionKey = getNormalizedTaskChildSessionKey(task);
+  if (!sessionKey) {
+    return;
+  }
+  const acpEntry = taskRegistryMaintenanceRuntime.readAcpSessionEntry({ sessionKey });
+  const closeAcpSession = taskRegistryMaintenanceRuntime.closeAcpSession;
+  if (!acpEntry || !closeAcpSession) {
+    return;
+  }
+  try {
+    await closeAcpSession({
+      cfg: acpEntry.cfg,
+      sessionKey,
+      reason: "terminal-task-cleanup",
+    });
+  } catch (error) {
+    log.warn("Failed to close terminal ACP session during task maintenance", {
+      sessionKey,
+      taskId: task.taskId,
+      error,
+    });
+    return;
+  }
+  try {
+    await taskRegistryMaintenanceRuntime.unbindSessionBindings?.({
+      targetSessionKey: sessionKey,
+      reason: "terminal-task-cleanup",
+    });
+  } catch (error) {
+    log.warn("Failed to unbind terminal ACP session during task maintenance", {
+      sessionKey,
+      taskId: task.taskId,
+      error,
+    });
+  }
+}
+
 function markTaskLost(task: TaskRecord, now: number): TaskRecord {
   const cleanupAfter = task.cleanupAfter ?? projectTaskLost(task, now).cleanupAfter;
   const updated =
@@ -590,6 +728,7 @@ export async function runTaskRegistryMaintenance(): Promise ({
 function configureTaskRegistryMaintenanceRuntimeForTest(params: {
   currentTasks: Map>;
   snapshotTasks: ReturnType[];
+  acpEntry?: AcpSessionStoreEntry;
+  sessionBindings?: SessionBindingRecord[];
+  closeAcpSession?: (params: {
+    cfg: AcpSessionStoreEntry["cfg"];
+    sessionKey: string;
+    reason: string;
+  }) => Promise<void>;
+  unbindSessionBindings?: (params: {
+    targetSessionKey?: string;
+    bindingId?: string;
+    reason: string;
+  }) => Promise<SessionBindingRecord[]>;
 }): void {
   const emptyAcpEntry = {
     cfg: {} as never,
@@ -99,7 +112,10 @@ function configureTaskRegistryMaintenanceRuntimeForTest(params: {
     storeReadFailed: false,
   } satisfies AcpSessionStoreEntry;
   setTaskRegistryMaintenanceRuntimeForTests({
-    readAcpSessionEntry: () => emptyAcpEntry,
+    readAcpSessionEntry: () => params.acpEntry ?? emptyAcpEntry,
+    listSessionBindingsBySession: () => params.sessionBindings ?? [],
+    closeAcpSession: params.closeAcpSession,
+    unbindSessionBindings: params.unbindSessionBindings,
     loadSessionStore: () => ({}),
     resolveStorePath: () => "",
     parseAgentSessionKey: () => null as ParsedAgentSessionKey | null,
@@ -154,6 +170,54 @@ function configureTaskRegistryMaintenanceRuntimeForTest(params: {
   });
 }
+function createSessionBindingRecord(
+  overrides: Partial<SessionBindingRecord> & Pick<SessionBindingRecord, "targetSessionKey">,
+): SessionBindingRecord {
+  return {
+    bindingId: overrides.bindingId ?? "binding-1",
+    targetSessionKey: overrides.targetSessionKey,
+    targetKind: overrides.targetKind ?? "session",
+    conversation: overrides.conversation ?? {
+      channel: "telegram",
+      accountId: "default",
+      conversationId: "telegram:thread:1",
+    },
+    status: overrides.status ?? "active",
+    boundAt: overrides.boundAt ?? Date.now(),
+    ...(overrides.expiresAt !== undefined ? { expiresAt: overrides.expiresAt } : {}),
+    ...(overrides.metadata !== undefined ? { metadata: overrides.metadata } : {}),
+  };
+}
+
+function createAcpSessionStoreEntry(params: {
+  sessionKey: string;
+  parentSessionKey: string;
+  mode: "persistent" | "oneshot";
+}): AcpSessionStoreEntry {
+  const acp = {
+    backend: "acpx",
+    agent: "claude",
+    runtimeSessionName: `${params.sessionKey}:runtime`,
+    mode: params.mode,
+    state: "idle",
+    lastActivityAt: Date.now(),
+  } as const;
+  return {
+    cfg: {} as never,
+    storePath: "/tmp/openclaw-test-sessions.json",
+    sessionKey: params.sessionKey,
+    storeSessionKey: params.sessionKey,
+    entry: {
+      sessionId: `${params.sessionKey}:session`,
+      updatedAt: Date.now(),
+      spawnedBy: params.parentSessionKey,
+      acp,
+    },
+    acp,
+    storeReadFailed: false,
+  };
+}
+
 async function waitForAssertion(assertion: () => void, timeoutMs = 2_000, stepMs = 5) {
   const startedAt = Date.now();
   for (;;) {
@@ -1506,6 +1570,162 @@ describe("task-registry", () => {
     });
   });
+  it("closes terminal parent-owned one-shot ACP sessions during maintenance", async () => {
+    await withTaskRegistryTempDir(async (root) => {
+      process.env.OPENCLAW_STATE_DIR = root;
+      resetTaskRegistryForTests();
+      const now = Date.now();
+      const parentSessionKey = "agent:main:telegram:direct:owner";
+      const childSessionKey = "agent:claude:acp:stale-oneshot";
+      const task = createTaskRecord({
+        runtime: "acp",
+        ownerKey: parentSessionKey,
+        requesterSessionKey: parentSessionKey,
+        scopeKind: "session",
+        childSessionKey,
+        runId: "run-terminal-acp-oneshot",
+        task: "Old ACP task",
+        status: "succeeded",
+        deliveryStatus: "delivered",
+      });
+      setTaskTimingById({
+        taskId: task.taskId,
+        endedAt: now - 60_000,
+        lastEventAt: now - 60_000,
+      });
+      const current = getTaskById(task.taskId)!;
+      const closeAcpSession = vi.fn().mockResolvedValue(undefined);
+      const unbindSessionBindings = vi.fn().mockResolvedValue([]);
+
+      configureTaskRegistryMaintenanceRuntimeForTest({
+        currentTasks: new Map([[task.taskId, current]]),
+        snapshotTasks: [current],
+        acpEntry: createAcpSessionStoreEntry({
+          sessionKey: childSessionKey,
+          parentSessionKey,
+          mode: "oneshot",
+        }),
+        closeAcpSession,
+        unbindSessionBindings,
+      });
+
+      expect(await runTaskRegistryMaintenance()).toMatchObject({
+        reconciled: 0,
+        recovered: 0,
+        pruned: 0,
+      });
+      expect(closeAcpSession).toHaveBeenCalledWith({
+        cfg: {},
+        sessionKey: childSessionKey,
+        reason: "terminal-task-cleanup",
+      });
+      expect(unbindSessionBindings).toHaveBeenCalledWith({
+        targetSessionKey: childSessionKey,
+        reason: "terminal-task-cleanup",
+      });
+    });
+  });
+
+  it("closes stale terminal persistent ACP sessions only when no binding remains", async () => {
+    await withTaskRegistryTempDir(async (root) => {
+      process.env.OPENCLAW_STATE_DIR = root;
+      resetTaskRegistryForTests();
+      const now = Date.now();
+      const parentSessionKey = "agent:main:telegram:direct:owner";
+      const childSessionKey = "agent:claude:acp:stale-persistent";
+      const task = createTaskRecord({
+        runtime: "acp",
+        ownerKey: parentSessionKey,
+        requesterSessionKey: parentSessionKey,
+        scopeKind: "session",
+        childSessionKey,
+        runId: "run-terminal-acp-persistent",
+        task: "Old persistent ACP task",
+        status: "failed",
+        deliveryStatus: "failed",
+      });
+      setTaskTimingById({
+        taskId: task.taskId,
+        endedAt: now - 60_000,
+        lastEventAt: now - 60_000,
+      });
+      const current = getTaskById(task.taskId)!;
+      const closeAcpSession = vi.fn().mockResolvedValue(undefined);
+      const unbindSessionBindings = vi.fn().mockResolvedValue([]);
+
+      configureTaskRegistryMaintenanceRuntimeForTest({
+        currentTasks: new Map([[task.taskId, current]]),
+        snapshotTasks: [current],
+        acpEntry: createAcpSessionStoreEntry({
+          sessionKey: childSessionKey,
+          parentSessionKey,
+          mode: "persistent",
+        }),
+        closeAcpSession,
+        unbindSessionBindings,
+      });
+
+      await runTaskRegistryMaintenance();
+
+      expect(closeAcpSession).toHaveBeenCalledWith({
+        cfg: {},
+        sessionKey: childSessionKey,
+        reason: "terminal-task-cleanup",
+      });
+      expect(unbindSessionBindings).toHaveBeenCalledWith({
+        targetSessionKey: childSessionKey,
+        reason: "terminal-task-cleanup",
+      });
+    });
+  });
+
+  it("keeps terminal persistent ACP sessions that still have an active binding", async () => {
+    await withTaskRegistryTempDir(async (root) => {
+      process.env.OPENCLAW_STATE_DIR = root;
+      resetTaskRegistryForTests();
+      const now = Date.now();
+      const parentSessionKey = "agent:main:telegram:direct:owner";
+      const childSessionKey = "agent:claude:acp:bound-persistent";
+      const task = createTaskRecord({
+        runtime: "acp",
+        ownerKey: parentSessionKey,
+        requesterSessionKey: parentSessionKey,
+        scopeKind: "session",
+        childSessionKey,
+        runId: "run-terminal-acp-bound",
+        task: "Thread-bound ACP session",
+        status: "succeeded",
+        deliveryStatus: "delivered",
+      });
+      setTaskTimingById({
+        taskId: task.taskId,
+        endedAt: now - 60_000,
+        lastEventAt: now - 60_000,
+      });
+      const current = getTaskById(task.taskId)!;
+      const closeAcpSession = vi.fn().mockResolvedValue(undefined);
+      const unbindSessionBindings = vi.fn().mockResolvedValue([]);
+
+      configureTaskRegistryMaintenanceRuntimeForTest({
+        currentTasks: new Map([[task.taskId, current]]),
+        snapshotTasks: [current],
+        acpEntry: createAcpSessionStoreEntry({
+          sessionKey: childSessionKey,
+          parentSessionKey,
+          mode: "persistent",
+        }),
+        sessionBindings: [createSessionBindingRecord({ targetSessionKey: childSessionKey })],
+        closeAcpSession,
+        unbindSessionBindings,
+      });
+
+      await runTaskRegistryMaintenance();
+
+      expect(closeAcpSession).not.toHaveBeenCalled();
+      expect(unbindSessionBindings).not.toHaveBeenCalled();
+    });
+  });
+
   it("prunes old terminal tasks during maintenance sweeps", async () => {
     await withTaskRegistryTempDir(async (root) => {
       process.env.OPENCLAW_STATE_DIR = root;