fix(tasks): close stale terminal acp sessions

Peter Steinberger
2026-04-28 21:03:47 +01:00
parent 3cad579c4e
commit 5dfc14d49b
6 changed files with 369 additions and 4 deletions

@@ -17,7 +17,7 @@ Docs: https://docs.openclaw.ai
- Control UI/Talk: decode Google Live binary WebSocket JSON frames and stop queued browser audio on interruption or shutdown, so browser Talk leaves `Connecting Talk...` and barge-in no longer plays stale audio. Fixes #73601 and #73460; supersedes #73466. Thanks @Spolen23 and @WadydX.
- Channels/Discord: ignore stale route-shaped conversation bindings after a Discord channel is reconfigured to another agent, while preserving explicit focus and subagent bindings. Fixes #73626. Thanks @ramitrkar-hash.
- Agents/bootstrap: pass pending BOOTSTRAP.md contents through the first-run user prompt while keeping them out of privileged system context, and show limited bootstrap guidance when workspace file access is unavailable. Fixes #73622. Thanks @mark1010.
- ACP/tasks: classify parent-owned ACP sessions as background work regardless of persistent runtime mode, so delegated ACP output reports through the parent task notifier instead of acting like a normal foreground chat session. Refs #73609. Thanks @joerod26.
- ACP/tasks: classify parent-owned ACP sessions as background work regardless of persistent runtime mode, and close terminal stale ACP sessions when no active binding remains, so delegated ACP output reports through the parent task notifier instead of acting like a normal foreground chat session. Refs #73609. Thanks @joerod26.
- Gateway/sessions: add conservative stuck-session recovery that releases only stale session lanes while active embedded runs, reply operations, and lane tasks remain serialized, so queued follow-ups can drain without aborting legitimate long-running turns. Refs #73581, #73655, #73652, #73705, #73647, #73602, #73592, and #73601. Thanks @WS-Q0758, @bryangauvin, @spenceryang1996-dot, @bmilne1981, @mattmcintyre, @Vksh07, and @Spolen23.
- Plugins: cache unchanged plugin manifest loads by file signature, reducing repeated JSON/JSON5 parsing and manifest normalization in bursty startup and runtime registry paths. Refs #73532 and #73647; carries forward #73678. Thanks @TheDutchRuler.
- Agents/model selection: resolve slash-form aliases before provider/model parsing and keep alias-resolved primary models subject to transient provider cooldowns, so cron and persisted sessions do not retry cooled-down raw aliases. Fixes #73573 and #73657. Thanks @akai-shuuichi and @hashslingers.

@@ -311,12 +311,15 @@ autocheckpoint threshold plus periodic and shutdown `TRUNCATE` checkpoints.
### Automatic maintenance
A sweeper runs every **60 seconds** and handles three things:
A sweeper runs every **60 seconds** and handles four things:
<Steps>
<Step title="Reconciliation">
Checks whether active tasks still have authoritative runtime backing. ACP/subagent tasks use child-session state, cron tasks use active-job ownership, and chat-backed CLI tasks use the owning run context. If that backing state is gone for more than 5 minutes, the task is marked `lost`.
</Step>
<Step title="ACP session repair">
Closes terminal parent-owned one-shot ACP sessions, and closes stale terminal persistent ACP sessions only when no active conversation binding remains.
</Step>
<Step title="Cleanup stamping">
Sets a `cleanupAfter` timestamp on terminal tasks (endedAt + 7 days). During retention, lost tasks still appear in audit as warnings; after `cleanupAfter` expires or when cleanup metadata is missing, they are errors.
</Step>
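
As a rough illustration of the timing rules in these steps, the sketch below computes the `cleanupAfter` stamp and the audit severity of a lost task. The `SketchTask` shape and all three helper names are hypothetical and exist only for this example; the 5-minute grace window and the 7-day retention window are the only values taken from the text.

```typescript
// Hypothetical sketch: the constants mirror the documented windows,
// everything else is an assumption made for illustration.
const RECONCILE_GRACE_MS = 5 * 60_000; // backing state gone for more than 5 minutes -> lost
const RETENTION_MS = 7 * 24 * 60 * 60_000; // terminal tasks are retained for 7 days

type SketchTask = {
  status: "running" | "succeeded" | "failed" | "lost";
  endedAt?: number;
  cleanupAfter?: number;
};

// Reconciliation: only mark a task lost once the grace window has fully elapsed.
function shouldMarkLost(backingMissingSince: number, now: number): boolean {
  return now - backingMissingSince > RECONCILE_GRACE_MS;
}

// Cleanup stamping: endedAt + 7 days, never overwriting an existing stamp.
function stampCleanupAfter(task: SketchTask): SketchTask {
  if (task.endedAt === undefined || task.cleanupAfter !== undefined) {
    return task;
  }
  return { ...task, cleanupAfter: task.endedAt + RETENTION_MS };
}

// Audit: warning during retention, error once expired or when the stamp is missing.
function lostTaskAuditSeverity(task: SketchTask, now: number): "warning" | "error" {
  if (task.cleanupAfter === undefined) {
    return "error";
  }
  return now < task.cleanupAfter ? "warning" : "error";
}
```

Keeping the stamp idempotent means repeated 60-second sweeps do not push the retention deadline forward.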

@@ -111,6 +111,8 @@ Token resolution order is account-aware. In practice, config values win over env
- `open` (requires `allowFrom` to include `"*"`)
- `disabled`
`dmPolicy: "open"` with `allowFrom: ["*"]` lets any Telegram account that finds or guesses the bot username command the bot. Use it only for intentionally public bots with tightly restricted tools; one-owner bots should use `allowlist` with numeric user IDs.
`channels.telegram.allowFrom` accepts numeric Telegram user IDs. `telegram:` / `tg:` prefixes are accepted and normalized.
`dmPolicy: "allowlist"` with empty `allowFrom` blocks all DMs and is rejected by config validation.
Setup asks for numeric user IDs only.
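
The rules above can be sketched as a small validator. This is an illustrative sketch, not the project's config validation: `SketchDmPolicy`, `normalizeAllowFromEntry`, and `validateDmAccess` are hypothetical names, only the policies mentioned in this excerpt are modelled, and treating the `telegram:` / `tg:` prefix as case-insensitive is an assumption.

```typescript
// Hypothetical sketch of the allowFrom rules; not the real validator.
type SketchDmPolicy = "allowlist" | "open" | "disabled";

// Strip an optional "telegram:" / "tg:" prefix (case-insensitivity is assumed).
function normalizeAllowFromEntry(raw: string): string {
  return raw.trim().replace(/^(?:telegram|tg):/i, "");
}

// Returns a list of human-readable problems; an empty list means the config passes.
function validateDmAccess(policy: SketchDmPolicy, allowFrom: string[]): string[] {
  const errors: string[] = [];
  const entries = allowFrom.map(normalizeAllowFromEntry);

  if (policy === "open" && !entries.includes("*")) {
    errors.push('dmPolicy "open" requires allowFrom to include "*"');
  }
  if (policy === "allowlist" && entries.length === 0) {
    errors.push('dmPolicy "allowlist" with empty allowFrom blocks all DMs');
  }
  for (const entry of entries) {
    // Only the wildcard or a numeric Telegram user ID is accepted.
    if (entry !== "*" && !/^\d+$/.test(entry)) {
      errors.push(`allowFrom entry is not a numeric user ID: ${entry}`);
    }
  }
  return errors;
}
```

For a one-owner bot, `validateDmAccess("allowlist", ["tg:123456789"])` returns no errors in this sketch, while a username such as `@owner` is reported.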

@@ -143,6 +143,7 @@ Quick `/acp` flow from chat:
<Accordion title="Lifecycle details">
- Spawn creates or resumes an ACP runtime session, records ACP metadata in the OpenClaw session store, and may create a background task when the run is parent-owned.
- Parent-owned ACP sessions are treated as background work even when the runtime session is persistent; completion and cross-surface delivery go through the parent task notifier rather than acting like a normal user-facing chat session.
- Task maintenance closes terminal parent-owned one-shot ACP sessions. Persistent ACP sessions are preserved while an active conversation binding remains; stale persistent sessions without an active binding are closed so they cannot be silently resumed after the owning task is done.
- Bound follow-up messages go directly to the ACP session until the binding is closed, unfocused, reset, or expired.
- Gateway commands stay local. `/acp ...`, `/status`, and `/unfocus` are never sent as normal prompt text to a bound ACP harness.
- `cancel` aborts the active turn when the backend supports cancellation; it does not delete the binding or session metadata.
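
The maintenance rule for terminal ACP sessions can be summarized as a small decision function. The sketch below is a simplified reading of that rule, not the shipped code: `SketchSessionFacts` and `shouldCloseSession` are hypothetical names, and the facts are passed in as plain booleans instead of being read from the task registry and session store.

```typescript
// Hypothetical sketch of the close decision for a terminal ACP task's session.
type SketchAcpMode = "oneshot" | "persistent";

type SketchSessionFacts = {
  taskIsTerminal: boolean; // the owning task has finished (succeeded, failed, ...)
  parentOwned: boolean; // the session was spawned by the task's owner or requester
  hasOtherActiveTask: boolean; // another active task still uses the same child session
  mode: SketchAcpMode;
  hasActiveBinding: boolean; // a conversation binding that has not ended
};

function shouldCloseSession(facts: SketchSessionFacts): boolean {
  if (!facts.taskIsTerminal || !facts.parentOwned || facts.hasOtherActiveTask) {
    return false;
  }
  if (facts.mode === "oneshot") {
    // One-shot sessions have no reason to outlive their task.
    return true;
  }
  // Persistent sessions survive only while a conversation is still bound to them.
  return !facts.hasActiveBinding;
}
```

A persistent session with a live thread binding is therefore kept, which is what lets bound follow-up messages keep reaching it after the original task has ended.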

@@ -1,14 +1,21 @@
import { getAcpSessionManager } from "../acp/control-plane/manager.js";
import { readAcpSessionEntry } from "../acp/runtime/session-meta.js";
import { loadSessionStore, resolveStorePath } from "../config/sessions.js";
import type { OpenClawConfig } from "../config/types.openclaw.js";
import { isCronJobActive } from "../cron/active-jobs.js";
import { readCronRunLogEntriesSync, resolveCronRunLogPath } from "../cron/run-log.js";
import type { CronRunLogEntry } from "../cron/run-log.js";
import { loadCronStoreSync, resolveCronStorePath } from "../cron/store.js";
import type { CronJob, CronStoreFile } from "../cron/types.js";
import { getAgentRunContext } from "../infra/agent-events.js";
import { getSessionBindingService } from "../infra/outbound/session-binding-service.js";
import { createSubsystemLogger } from "../logging/subsystem.js";
import { parseAgentSessionKey } from "../routing/session-key.js";
import { deriveSessionChatType } from "../sessions/session-chat-type.js";
import { normalizeLowercaseStringOrEmpty } from "../shared/string-coerce.js";
import {
  normalizeLowercaseStringOrEmpty,
  normalizeOptionalString,
} from "../shared/string-coerce.js";
import { tryRecoverTaskBeforeMarkLost } from "./detached-task-runtime.js";
import {
  deleteTaskRecordById,
@@ -30,6 +37,7 @@ import type { TaskAuditSummary } from "./task-registry.audit.js";
import { summarizeTaskRecords } from "./task-registry.summary.js";
import type { TaskRecord, TaskRegistrySummary, TaskStatus } from "./task-registry.types.js";
const log = createSubsystemLogger("tasks/task-registry-maintenance");
const TASK_RECONCILE_GRACE_MS = 5 * 60_000;
const TASK_RETENTION_MS = 7 * 24 * 60 * 60_000;
const TASK_SWEEP_INTERVAL_MS = 60_000;
@@ -48,6 +56,13 @@ let configuredCronRuntimeAuthoritative = false;
type TaskRegistryMaintenanceRuntime = {
  readAcpSessionEntry: typeof readAcpSessionEntry;
  closeAcpSession?: (params: {
    cfg: OpenClawConfig;
    sessionKey: string;
    reason: string;
  }) => Promise<void>;
  listSessionBindingsBySession?: ReturnType<typeof getSessionBindingService>["listBySession"];
  unbindSessionBindings?: ReturnType<typeof getSessionBindingService>["unbind"];
  loadSessionStore: typeof loadSessionStore;
  resolveStorePath: typeof resolveStorePath;
  isCronJobActive: typeof isCronJobActive;
@@ -71,6 +86,20 @@ type TaskRegistryMaintenanceRuntime = {
const defaultTaskRegistryMaintenanceRuntime: TaskRegistryMaintenanceRuntime = {
  readAcpSessionEntry,
  closeAcpSession: async ({ cfg, sessionKey, reason }) => {
    await getAcpSessionManager().closeSession({
      cfg,
      sessionKey,
      reason,
      discardPersistentState: true,
      clearMeta: true,
      allowBackendUnavailable: true,
      requireAcpSession: false,
    });
  },
  listSessionBindingsBySession: (sessionKey) =>
    getSessionBindingService().listBySession(sessionKey),
  unbindSessionBindings: (input) => getSessionBindingService().unbind(input),
  loadSessionStore,
  resolveStorePath,
  isCronJobActive,
@@ -374,6 +403,115 @@ function resolveCleanupAfter(task: TaskRecord): number {
  return terminalAt + TASK_RETENTION_MS;
}

function getNormalizedTaskChildSessionKey(task: TaskRecord): string | undefined {
  return normalizeOptionalString(task.childSessionKey);
}

function isSameTaskChildSession(a: TaskRecord, b: TaskRecord): boolean {
  const left = getNormalizedTaskChildSessionKey(a);
  return Boolean(left && left === getNormalizedTaskChildSessionKey(b));
}

function hasActiveTaskForChildSession(task: TaskRecord, tasks: TaskRecord[]): boolean {
  return tasks.some(
    (candidate) =>
      candidate.taskId !== task.taskId &&
      isActiveTask(candidate) &&
      isSameTaskChildSession(task, candidate),
  );
}

function isParentOwnedAcpSessionTask(
  task: TaskRecord,
  acpEntry: ReturnType<typeof readAcpSessionEntry>,
): boolean {
  const entry = acpEntry?.entry;
  if (!entry) {
    return false;
  }
  const ownerKey = normalizeOptionalString(task.ownerKey);
  const requesterKey = normalizeOptionalString(task.requesterSessionKey);
  const parentKeys = [
    normalizeOptionalString(entry.spawnedBy),
    normalizeOptionalString(entry.parentSessionKey),
  ].filter((value): value is string => Boolean(value));
  return parentKeys.some((parentKey) => parentKey === ownerKey || parentKey === requesterKey);
}

function hasActiveSessionBinding(sessionKey: string): boolean {
  const listBindings = taskRegistryMaintenanceRuntime.listSessionBindingsBySession;
  if (!listBindings) {
    return true;
  }
  try {
    return listBindings(sessionKey).some((binding) => binding.status !== "ended");
  } catch {
    return true;
  }
}

function shouldCloseTerminalAcpSession(task: TaskRecord, tasks: TaskRecord[]): boolean {
  if (task.runtime !== "acp" || isActiveTask(task)) {
    return false;
  }
  const sessionKey = getNormalizedTaskChildSessionKey(task);
  if (!sessionKey || hasActiveTaskForChildSession(task, tasks)) {
    return false;
  }
  const acpEntry = taskRegistryMaintenanceRuntime.readAcpSessionEntry({ sessionKey });
  if (!acpEntry || acpEntry.storeReadFailed || !acpEntry.acp) {
    return false;
  }
  if (!isParentOwnedAcpSessionTask(task, acpEntry)) {
    return false;
  }
  if (acpEntry.acp.mode === "oneshot") {
    return true;
  }
  return !hasActiveSessionBinding(sessionKey);
}

async function cleanupTerminalAcpSession(task: TaskRecord, tasks: TaskRecord[]): Promise<void> {
  if (!shouldCloseTerminalAcpSession(task, tasks)) {
    return;
  }
  const sessionKey = getNormalizedTaskChildSessionKey(task);
  if (!sessionKey) {
    return;
  }
  const acpEntry = taskRegistryMaintenanceRuntime.readAcpSessionEntry({ sessionKey });
  const closeAcpSession = taskRegistryMaintenanceRuntime.closeAcpSession;
  if (!acpEntry || !closeAcpSession) {
    return;
  }
  try {
    await closeAcpSession({
      cfg: acpEntry.cfg,
      sessionKey,
      reason: "terminal-task-cleanup",
    });
  } catch (error) {
    log.warn("Failed to close terminal ACP session during task maintenance", {
      sessionKey,
      taskId: task.taskId,
      error,
    });
    return;
  }
  try {
    await taskRegistryMaintenanceRuntime.unbindSessionBindings?.({
      targetSessionKey: sessionKey,
      reason: "terminal-task-cleanup",
    });
  } catch (error) {
    log.warn("Failed to unbind terminal ACP session during task maintenance", {
      sessionKey,
      taskId: task.taskId,
      error,
    });
  }
}

function markTaskLost(task: TaskRecord, now: number): TaskRecord {
  const cleanupAfter = task.cleanupAfter ?? projectTaskLost(task, now).cleanupAfter;
  const updated =
@@ -590,6 +728,7 @@ export async function runTaskRegistryMaintenance(): Promise<TaskRegistryMaintena
      }
      continue;
    }
    await cleanupTerminalAcpSession(current, taskRegistryMaintenanceRuntime.listTaskRecords());
    if (
      shouldPruneTerminalTask(current, now) &&
      taskRegistryMaintenanceRuntime.deleteTaskRecordById(current.taskId)
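
One detail of the binding check in this file is that it errs toward keeping sessions: when the binding lookup is not configured or throws, the session is treated as still bound and is left open. A minimal standalone sketch of that behaviour follows, with hypothetical names (`SketchBinding`, `SketchListBindings`, `hasActiveBindingSafe`) and the lookup passed in as a parameter instead of being read from the maintenance runtime.

```typescript
// Hypothetical sketch of a binding check that prefers keeping a session
// over closing it when the lookup cannot be trusted.
type SketchBinding = { status: string };
type SketchListBindings = (sessionKey: string) => SketchBinding[];

function hasActiveBindingSafe(sessionKey: string, listBindings?: SketchListBindings): boolean {
  if (!listBindings) {
    // No way to check: assume a binding exists so the session is preserved.
    return true;
  }
  try {
    // Any binding that has not ended counts as active.
    return listBindings(sessionKey).some((binding) => binding.status !== "ended");
  } catch {
    // A failing lookup must not lead to closing a session that may still be in use.
    return true;
  }
}
```

The asymmetry is deliberate in this reading: wrongly keeping a stale session costs one more sweep, while wrongly closing a bound session would interrupt a live conversation.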

@@ -11,6 +11,7 @@ import {
  hasPendingHeartbeatWake,
  resetHeartbeatWakeStateForTests,
} from "../infra/heartbeat-wake.js";
import type { SessionBindingRecord } from "../infra/outbound/session-binding-service.js";
import { peekSystemEvents, resetSystemEventsForTest } from "../infra/system-events.js";
import type { ParsedAgentSessionKey } from "../routing/session-key.js";
import { withTempDir } from "../test-helpers/temp-dir.js";
@@ -89,6 +90,18 @@ vi.mock("../utils/message-channel.js", () => ({
function configureTaskRegistryMaintenanceRuntimeForTest(params: {
  currentTasks: Map<string, ReturnType<typeof createTaskRecord>>;
  snapshotTasks: ReturnType<typeof createTaskRecord>[];
  acpEntry?: AcpSessionStoreEntry;
  sessionBindings?: SessionBindingRecord[];
  closeAcpSession?: (params: {
    cfg: AcpSessionStoreEntry["cfg"];
    sessionKey: string;
    reason: string;
  }) => Promise<void>;
  unbindSessionBindings?: (params: {
    targetSessionKey?: string;
    bindingId?: string;
    reason: string;
  }) => Promise<SessionBindingRecord[]>;
}): void {
  const emptyAcpEntry = {
    cfg: {} as never,
@@ -99,7 +112,10 @@ function configureTaskRegistryMaintenanceRuntimeForTest(params: {
    storeReadFailed: false,
  } satisfies AcpSessionStoreEntry;
  setTaskRegistryMaintenanceRuntimeForTests({
    readAcpSessionEntry: () => emptyAcpEntry,
    readAcpSessionEntry: () => params.acpEntry ?? emptyAcpEntry,
    listSessionBindingsBySession: () => params.sessionBindings ?? [],
    closeAcpSession: params.closeAcpSession,
    unbindSessionBindings: params.unbindSessionBindings,
    loadSessionStore: () => ({}),
    resolveStorePath: () => "",
    parseAgentSessionKey: () => null as ParsedAgentSessionKey | null,
@@ -154,6 +170,54 @@ function configureTaskRegistryMaintenanceRuntimeForTest(params: {
  });
}

function createSessionBindingRecord(
  overrides: Partial<SessionBindingRecord> & Pick<SessionBindingRecord, "targetSessionKey">,
): SessionBindingRecord {
  return {
    bindingId: overrides.bindingId ?? "binding-1",
    targetSessionKey: overrides.targetSessionKey,
    targetKind: overrides.targetKind ?? "session",
    conversation: overrides.conversation ?? {
      channel: "telegram",
      accountId: "default",
      conversationId: "telegram:thread:1",
    },
    status: overrides.status ?? "active",
    boundAt: overrides.boundAt ?? Date.now(),
    ...(overrides.expiresAt !== undefined ? { expiresAt: overrides.expiresAt } : {}),
    ...(overrides.metadata !== undefined ? { metadata: overrides.metadata } : {}),
  };
}

function createAcpSessionStoreEntry(params: {
  sessionKey: string;
  parentSessionKey: string;
  mode: "persistent" | "oneshot";
}): AcpSessionStoreEntry {
  const acp = {
    backend: "acpx",
    agent: "claude",
    runtimeSessionName: `${params.sessionKey}:runtime`,
    mode: params.mode,
    state: "idle",
    lastActivityAt: Date.now(),
  } as const;
  return {
    cfg: {} as never,
    storePath: "/tmp/openclaw-test-sessions.json",
    sessionKey: params.sessionKey,
    storeSessionKey: params.sessionKey,
    entry: {
      sessionId: `${params.sessionKey}:session`,
      updatedAt: Date.now(),
      spawnedBy: params.parentSessionKey,
      acp,
    },
    acp,
    storeReadFailed: false,
  };
}

async function waitForAssertion(assertion: () => void, timeoutMs = 2_000, stepMs = 5) {
  const startedAt = Date.now();
  for (;;) {
@@ -1506,6 +1570,162 @@ describe("task-registry", () => {
    });
  });

  it("closes terminal parent-owned one-shot ACP sessions during maintenance", async () => {
    await withTaskRegistryTempDir(async (root) => {
      process.env.OPENCLAW_STATE_DIR = root;
      resetTaskRegistryForTests();
      const now = Date.now();
      const parentSessionKey = "agent:main:telegram:direct:owner";
      const childSessionKey = "agent:claude:acp:stale-oneshot";
      const task = createTaskRecord({
        runtime: "acp",
        ownerKey: parentSessionKey,
        requesterSessionKey: parentSessionKey,
        scopeKind: "session",
        childSessionKey,
        runId: "run-terminal-acp-oneshot",
        task: "Old ACP task",
        status: "succeeded",
        deliveryStatus: "delivered",
      });
      setTaskTimingById({
        taskId: task.taskId,
        endedAt: now - 60_000,
        lastEventAt: now - 60_000,
      });
      const current = getTaskById(task.taskId)!;
      const closeAcpSession = vi.fn().mockResolvedValue(undefined);
      const unbindSessionBindings = vi.fn().mockResolvedValue([]);
      configureTaskRegistryMaintenanceRuntimeForTest({
        currentTasks: new Map([[task.taskId, current]]),
        snapshotTasks: [current],
        acpEntry: createAcpSessionStoreEntry({
          sessionKey: childSessionKey,
          parentSessionKey,
          mode: "oneshot",
        }),
        closeAcpSession,
        unbindSessionBindings,
      });
      expect(await runTaskRegistryMaintenance()).toMatchObject({
        reconciled: 0,
        recovered: 0,
        pruned: 0,
      });
      expect(closeAcpSession).toHaveBeenCalledWith({
        cfg: {},
        sessionKey: childSessionKey,
        reason: "terminal-task-cleanup",
      });
      expect(unbindSessionBindings).toHaveBeenCalledWith({
        targetSessionKey: childSessionKey,
        reason: "terminal-task-cleanup",
      });
    });
  });

  it("closes stale terminal persistent ACP sessions only when no binding remains", async () => {
    await withTaskRegistryTempDir(async (root) => {
      process.env.OPENCLAW_STATE_DIR = root;
      resetTaskRegistryForTests();
      const now = Date.now();
      const parentSessionKey = "agent:main:telegram:direct:owner";
      const childSessionKey = "agent:claude:acp:stale-persistent";
      const task = createTaskRecord({
        runtime: "acp",
        ownerKey: parentSessionKey,
        requesterSessionKey: parentSessionKey,
        scopeKind: "session",
        childSessionKey,
        runId: "run-terminal-acp-persistent",
        task: "Old persistent ACP task",
        status: "failed",
        deliveryStatus: "failed",
      });
      setTaskTimingById({
        taskId: task.taskId,
        endedAt: now - 60_000,
        lastEventAt: now - 60_000,
      });
      const current = getTaskById(task.taskId)!;
      const closeAcpSession = vi.fn().mockResolvedValue(undefined);
      const unbindSessionBindings = vi.fn().mockResolvedValue([]);
      configureTaskRegistryMaintenanceRuntimeForTest({
        currentTasks: new Map([[task.taskId, current]]),
        snapshotTasks: [current],
        acpEntry: createAcpSessionStoreEntry({
          sessionKey: childSessionKey,
          parentSessionKey,
          mode: "persistent",
        }),
        closeAcpSession,
        unbindSessionBindings,
      });
      await runTaskRegistryMaintenance();
      expect(closeAcpSession).toHaveBeenCalledWith({
        cfg: {},
        sessionKey: childSessionKey,
        reason: "terminal-task-cleanup",
      });
      expect(unbindSessionBindings).toHaveBeenCalledWith({
        targetSessionKey: childSessionKey,
        reason: "terminal-task-cleanup",
      });
    });
  });

  it("keeps terminal persistent ACP sessions that still have an active binding", async () => {
    await withTaskRegistryTempDir(async (root) => {
      process.env.OPENCLAW_STATE_DIR = root;
      resetTaskRegistryForTests();
      const now = Date.now();
      const parentSessionKey = "agent:main:telegram:direct:owner";
      const childSessionKey = "agent:claude:acp:bound-persistent";
      const task = createTaskRecord({
        runtime: "acp",
        ownerKey: parentSessionKey,
        requesterSessionKey: parentSessionKey,
        scopeKind: "session",
        childSessionKey,
        runId: "run-terminal-acp-bound",
        task: "Thread-bound ACP session",
        status: "succeeded",
        deliveryStatus: "delivered",
      });
      setTaskTimingById({
        taskId: task.taskId,
        endedAt: now - 60_000,
        lastEventAt: now - 60_000,
      });
      const current = getTaskById(task.taskId)!;
      const closeAcpSession = vi.fn().mockResolvedValue(undefined);
      const unbindSessionBindings = vi.fn().mockResolvedValue([]);
      configureTaskRegistryMaintenanceRuntimeForTest({
        currentTasks: new Map([[task.taskId, current]]),
        snapshotTasks: [current],
        acpEntry: createAcpSessionStoreEntry({
          sessionKey: childSessionKey,
          parentSessionKey,
          mode: "persistent",
        }),
        sessionBindings: [createSessionBindingRecord({ targetSessionKey: childSessionKey })],
        closeAcpSession,
        unbindSessionBindings,
      });
      await runTaskRegistryMaintenance();
      expect(closeAcpSession).not.toHaveBeenCalled();
      expect(unbindSessionBindings).not.toHaveBeenCalled();
    });
  });

  it("prunes old terminal tasks during maintenance sweeps", async () => {
    await withTaskRegistryTempDir(async (root) => {
      process.env.OPENCLAW_STATE_DIR = root;