fix(agents): persist compaction token snapshots

This commit is contained in:
Peter Steinberger
2026-04-27 14:58:01 +01:00
parent f9946eb069
commit f3e8a8a319
19 changed files with 275 additions and 11 deletions

View File

@@ -45,6 +45,7 @@ Docs: https://docs.openclaw.ai
- Agents/tools: scope tool-loop detection history to the active run when available, so scheduled heartbeat cycles no longer inherit stale repeated-call counts from previous runs. Fixes #40144. Thanks @mattbrown319.
- Agents/subagents: preserve requester delivery for completion announces when a child agent is bound to a different channel account while keeping same-channel thread completions routed to the child thread. Thanks @sfuminya.
- Agents/subagents: fail closed instead of selecting a single child thread binding when completion delivery lacks requester conversation signal. Thanks @suyua9.
- Agents/status: persist the post-compaction token estimate from auto-compaction when providers omit usage metadata, so `/status` and session lists keep showing fresh context usage after compaction. Fixes #67667; carries forward #72822. Thanks @Jimmy-xuzimo and @skylight-9.
- Control UI: show loading, reload, and retry states when a lazy dashboard panel cannot load after an upgrade, so the Logs tab no longer appears blank on stale browser bundles. Fixes #72450. Thanks @sobergou.
- Gateway/plugins: start the Gateway in degraded mode when a single plugin entry has invalid schema config, and let `openclaw doctor --fix` quarantine that plugin config instead of crash-looping every channel. Fixes #62976 and #70371. Thanks @Doraemon-Claw and @pksidekyk.
- Agents/plugins: skip malformed plugin tools with missing schema objects and report plugin diagnostics, so one broken tool no longer crashes Anthropic agent runs. Fixes #69423. Thanks @jmnickels.

View File

@@ -564,6 +564,95 @@ describe("updateSessionStoreAfterAgentRun", () => {
});
});
it("persists compaction tokensAfter when provider usage is unavailable", async () => {
await withTempSessionStore(async ({ storePath }) => {
const cfg = {} as OpenClawConfig;
const sessionKey = "agent:main:explicit:test-compaction-tokens-after";
const sessionId = "test-compaction-tokens-after-session";
const sessionStore: Record<string, SessionEntry> = {
[sessionKey]: {
sessionId,
updatedAt: 1,
},
};
await fs.writeFile(storePath, JSON.stringify(sessionStore, null, 2));
const result: EmbeddedPiRunResult = {
meta: {
durationMs: 500,
agentMeta: {
sessionId,
provider: "minimax",
model: "MiniMax-M2.7",
compactionCount: 1,
compactionTokensAfter: 21_225,
},
},
};
await updateSessionStoreAfterAgentRun({
cfg,
sessionId,
sessionKey,
storePath,
sessionStore,
defaultProvider: "minimax",
defaultModel: "MiniMax-M2.7",
result,
});
expect(sessionStore[sessionKey]?.totalTokens).toBe(21_225);
expect(sessionStore[sessionKey]?.totalTokensFresh).toBe(true);
expect(sessionStore[sessionKey]?.compactionCount).toBe(1);
const persisted = loadSessionStore(storePath);
expect(persisted[sessionKey]?.totalTokens).toBe(21_225);
expect(persisted[sessionKey]?.totalTokensFresh).toBe(true);
});
});
it("ignores non-finite compaction tokensAfter values", async () => {
await withTempSessionStore(async ({ storePath }) => {
const cfg = {} as OpenClawConfig;
const sessionKey = "agent:main:explicit:test-compaction-tokens-after-invalid";
const sessionId = "test-compaction-tokens-after-invalid-session";
const sessionStore: Record<string, SessionEntry> = {
[sessionKey]: {
sessionId,
updatedAt: 1,
totalTokens: 12_000,
totalTokensFresh: true,
},
};
await fs.writeFile(storePath, JSON.stringify(sessionStore, null, 2));
await updateSessionStoreAfterAgentRun({
cfg,
sessionId,
sessionKey,
storePath,
sessionStore,
defaultProvider: "minimax",
defaultModel: "MiniMax-M2.7",
result: {
meta: {
durationMs: 500,
agentMeta: {
sessionId,
provider: "minimax",
model: "MiniMax-M2.7",
compactionCount: 1,
compactionTokensAfter: Number.POSITIVE_INFINITY,
},
},
},
});
expect(sessionStore[sessionKey]?.totalTokens).toBe(12_000);
expect(sessionStore[sessionKey]?.totalTokensFresh).toBe(false);
});
});
it("snapshots cost instead of accumulating (fixes #69347)", async () => {
await withTempSessionStore(async ({ storePath }) => {
const cfg = {

View File

@@ -68,6 +68,12 @@ export async function updateSessionStoreAfterAgentRun(params: {
const usage = result.meta.agentMeta?.usage;
const promptTokens = result.meta.agentMeta?.promptTokens;
const compactionTokensAfter =
typeof result.meta.agentMeta?.compactionTokensAfter === "number" &&
Number.isFinite(result.meta.agentMeta.compactionTokensAfter) &&
result.meta.agentMeta.compactionTokensAfter > 0
? Math.floor(result.meta.agentMeta.compactionTokensAfter)
: undefined;
const compactionsThisRun = Math.max(0, result.meta.agentMeta?.compactionCount ?? 0);
const modelUsed = result.meta.agentMeta?.model ?? fallbackModel ?? defaultModel;
const providerUsed = result.meta.agentMeta?.provider ?? fallbackProvider ?? defaultProvider;
@@ -147,6 +153,9 @@ export async function updateSessionStoreAfterAgentRun(params: {
if (typeof totalTokens === "number" && Number.isFinite(totalTokens) && totalTokens > 0) {
next.totalTokens = totalTokens;
next.totalTokensFresh = true;
} else if (compactionTokensAfter !== undefined) {
next.totalTokens = compactionTokensAfter;
next.totalTokensFresh = true;
} else {
next.totalTokens = undefined;
next.totalTokensFresh = false;
@@ -159,6 +168,9 @@ export async function updateSessionStoreAfterAgentRun(params: {
if (runEstimatedCostUsd !== undefined) {
next.estimatedCostUsd = runEstimatedCostUsd;
}
} else if (compactionTokensAfter !== undefined) {
next.totalTokens = compactionTokensAfter;
next.totalTokensFresh = true;
} else if (
typeof entry.totalTokens === "number" &&
Number.isFinite(entry.totalTokens) &&

View File

@@ -448,7 +448,7 @@ describe("runEmbeddedPiAgent overflow compaction trigger routing", () => {
}),
);
await runEmbeddedPiAgent(overflowBaseRunParams);
const result = await runEmbeddedPiAgent(overflowBaseRunParams);
expect(mockedCompactDirect).toHaveBeenCalledWith(
expect.objectContaining({
@@ -469,6 +469,7 @@ describe("runEmbeddedPiAgent overflow compaction trigger routing", () => {
}),
}),
);
expect(result.meta.agentMeta?.compactionTokensAfter).toBe(80_000);
});
it("passes observed overflow token counts into compaction when providers report them", async () => {

View File

@@ -100,6 +100,7 @@ describe("timeout-triggered compaction", () => {
);
expect(mockedRunEmbeddedAttempt).toHaveBeenCalledTimes(2);
expect(result.meta.error).toBeUndefined();
expect(result.meta.agentMeta?.compactionTokensAfter).toBe(80_000);
});
it("retries the prompt after successful timeout compaction", async () => {

View File

@@ -639,6 +639,7 @@ export async function runEmbeddedPiAgent(
const usageAccumulator = createUsageAccumulator();
let lastRunPromptUsage: ReturnType<typeof normalizeUsage> | undefined;
let autoCompactionCount = 0;
let lastCompactionTokensAfter: number | undefined;
let runLoopIterations = 0;
let overloadProfileRotations = 0;
let planningOnlyRetryAttempts = 0;
@@ -1033,6 +1034,13 @@ export async function runEmbeddedPiAgent(
lastTurnTotal = lastAssistantUsage?.total ?? attemptUsage?.total;
const attemptCompactionCount = Math.max(0, attempt.compactionCount ?? 0);
autoCompactionCount += attemptCompactionCount;
if (
typeof attempt.compactionTokensAfter === "number" &&
Number.isFinite(attempt.compactionTokensAfter) &&
attempt.compactionTokensAfter > 0
) {
lastCompactionTokensAfter = Math.floor(attempt.compactionTokensAfter);
}
const activeErrorContext = resolveActiveErrorContext({
provider,
model: modelId,
@@ -1180,6 +1188,13 @@ export async function runEmbeddedPiAgent(
await runOwnsCompactionAfterHook("timeout recovery", timeoutCompactResult);
if (timeoutCompactResult.compacted) {
autoCompactionCount += 1;
if (
typeof timeoutCompactResult.result?.tokensAfter === "number" &&
Number.isFinite(timeoutCompactResult.result.tokensAfter) &&
timeoutCompactResult.result.tokensAfter > 0
) {
lastCompactionTokensAfter = Math.floor(timeoutCompactResult.result.tokensAfter);
}
if (contextEngine.info.ownsCompaction === true) {
await runPostCompactionSideEffects({
config: params.config,
@@ -1339,6 +1354,13 @@ export async function runEmbeddedPiAgent(
await runOwnsCompactionAfterHook("overflow recovery", compactResult);
if (compactResult.compacted) {
adoptCompactionTranscript(compactResult);
if (
typeof compactResult.result?.tokensAfter === "number" &&
Number.isFinite(compactResult.result.tokensAfter) &&
compactResult.result.tokensAfter > 0
) {
lastCompactionTokensAfter = Math.floor(compactResult.result.tokensAfter);
}
if (preflightRecovery?.route === "compact_then_truncate") {
const truncResult = await truncateOversizedToolResultsInSession({
sessionFile: activeSessionFile,
@@ -1856,6 +1878,7 @@ export async function runEmbeddedPiAgent(
lastCallUsage: usageMeta.lastCallUsage,
promptTokens: usageMeta.promptTokens,
compactionCount: autoCompactionCount > 0 ? autoCompactionCount : undefined,
compactionTokensAfter: lastCompactionTokensAfter,
};
const finalAssistantVisibleText = resolveFinalAssistantVisibleText(sessionLastAssistant);
const finalAssistantRawText = resolveFinalAssistantRawText(sessionLastAssistant);

View File

@@ -101,6 +101,7 @@ export function createSubscriptionMock(): SubscriptionMock {
getLastToolError: () => undefined,
getUsageTotals: () => undefined,
getCompactionCount: () => 0,
getLastCompactionTokensAfter: () => undefined,
getItemLifecycle: () => ({ startedCount: 0, completedCount: 0, activeCount: 0 }),
isCompacting: () => false,
isCompactionInFlight: () => false,

View File

@@ -2093,6 +2093,7 @@ export async function runEmbeddedAttempt(
setTerminalLifecycleMeta,
getUsageTotals,
getCompactionCount,
getLastCompactionTokensAfter,
} = subscription;
const queueHandle: EmbeddedPiQueueHandle & {
@@ -3195,6 +3196,7 @@ export async function runEmbeddedAttempt(
attemptUsage,
promptCache,
compactionCount: getCompactionCount(),
compactionTokensAfter: getLastCompactionTokensAfter(),
// Client tool call detected (OpenResponses hosted tools)
clientToolCall: clientToolCallDetected ?? undefined,
yieldDetected: yieldDetected || undefined,

View File

@@ -102,6 +102,7 @@ export type EmbeddedRunAttemptResult = {
attemptUsage?: NormalizedUsage;
promptCache?: ContextEnginePromptCacheInfo;
compactionCount?: number;
compactionTokensAfter?: number;
/** Client tool call detected (OpenResponses hosted tools). */
clientToolCall?: { name: string; params: Record<string, unknown> };
/** True when sessions_yield tool was called during this attempt. */

View File

@@ -11,6 +11,12 @@ export type EmbeddedPiAgentMeta = {
agentHarnessId?: string;
cliSessionBinding?: CliSessionBinding;
compactionCount?: number;
/**
* Token count estimate after the most recent successful auto-compaction.
* Used as the freshest context snapshot when the follow-up model call omits
* usage metadata.
*/
compactionTokensAfter?: number;
/**
* Prompt/context snapshot from the latest model request. Prefer this for
* context-window utilization because provider usage totals can include cached

View File

@@ -53,6 +53,11 @@ export function handleCompactionEnd(
const wasAborted = Boolean(evt.aborted);
if (hasResult && !wasAborted) {
ctx.incrementCompactionCount();
const tokensAfter =
typeof evt.result === "object" && evt.result
? (evt.result as { tokensAfter?: unknown }).tokensAfter
: undefined;
ctx.noteCompactionTokensAfter(tokensAfter);
const observedCompactionCount = ctx.getCompactionCount();
void reconcileSessionStoreCompactionCountAfterSuccess({
sessionKey: ctx.params.sessionKey,

View File

@@ -64,6 +64,7 @@ export type EmbeddedPiSubscribeState = {
assistantUsageCommitted: boolean;
compactionInFlight: boolean;
lastCompactionTokensAfter?: number;
pendingCompactionRetry: number;
compactionRetryResolve?: () => void;
compactionRetryReject?: (reason?: unknown) => void;
@@ -137,8 +138,10 @@ export type EmbeddedPiSubscribeContext = {
recordAssistantUsage: (usage: unknown) => void;
commitAssistantUsage: () => void;
incrementCompactionCount: () => void;
noteCompactionTokensAfter: (value: unknown) => void;
getUsageTotals: () => NormalizedUsage | undefined;
getCompactionCount: () => number;
getLastCompactionTokensAfter: () => number | undefined;
emitBlockReply: (payload: BlockReplyPayload) => void;
};

View File

@@ -39,12 +39,22 @@ describe("subscribeEmbeddedPiSession", () => {
expect(subscription.getCompactionCount()).toBe(0);
// willRetry with result — counter IS incremented (overflow compaction succeeded)
emit({ type: "compaction_end", willRetry: true, result: { summary: "s" } });
emit({
type: "compaction_end",
willRetry: true,
result: { summary: "s", tokensAfter: 12_345 },
});
expect(subscription.getCompactionCount()).toBe(1);
expect(subscription.getLastCompactionTokensAfter()).toBe(12_345);
// willRetry=false with result — counter incremented again
emit({ type: "compaction_end", willRetry: false, result: { summary: "s2" } });
emit({
type: "compaction_end",
willRetry: false,
result: { summary: "s2", tokensAfter: 6_789 },
});
expect(subscription.getCompactionCount()).toBe(2);
expect(subscription.getLastCompactionTokensAfter()).toBe(6_789);
});
it("does not count compaction when result is absent", async () => {

View File

@@ -119,6 +119,7 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
pendingAssistantUsage: undefined,
assistantUsageCommitted: false,
compactionInFlight: false,
lastCompactionTokensAfter: undefined,
pendingCompactionRetry: 0,
compactionRetryResolve: undefined,
compactionRetryReject: undefined,
@@ -444,6 +445,12 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
const incrementCompactionCount = () => {
compactionCount += 1;
};
const noteCompactionTokensAfter = (value: unknown) => {
if (typeof value !== "number" || !Number.isFinite(value) || value <= 0) {
return;
}
state.lastCompactionTokensAfter = Math.floor(value);
};
const blockChunking = params.blockReplyChunking;
const blockChunker = blockChunking ? new EmbeddedBlockChunker(blockChunking) : null;
@@ -826,8 +833,10 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
recordAssistantUsage,
commitAssistantUsage,
incrementCompactionCount,
noteCompactionTokensAfter,
getUsageTotals,
getCompactionCount: () => compactionCount,
getLastCompactionTokensAfter: () => state.lastCompactionTokensAfter,
};
const sessionUnsubscribe = params.session.subscribe(createEmbeddedPiSessionEventHandler(ctx));
@@ -896,6 +905,7 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
getLastToolError: () => (state.lastToolError ? { ...state.lastToolError } : undefined),
getUsageTotals,
getCompactionCount: () => compactionCount,
getLastCompactionTokensAfter: () => state.lastCompactionTokensAfter,
getItemLifecycle: () => ({
startedCount: state.itemStartedCount,
completedCount: state.itemCompletedCount,

View File

@@ -1534,6 +1534,7 @@ export async function runReplyAgent(params: {
sessionKey,
storePath,
amount: autoCompactionCount,
compactionTokensAfter: runResult.meta?.agentMeta?.compactionTokensAfter,
lastCallUsage: runResult.meta?.agentMeta?.lastCallUsage,
contextTokensUsed,
newSessionId: runResult.meta?.agentMeta?.sessionId,

View File

@@ -452,6 +452,7 @@ export function createFollowupRunner(params: {
sessionKey,
storePath,
amount: autoCompactionCount,
compactionTokensAfter: runResult.meta?.agentMeta?.compactionTokensAfter,
lastCallUsage: runResult.meta?.agentMeta?.lastCallUsage,
contextTokensUsed,
newSessionId: runResult.meta?.agentMeta?.sessionId,

View File

@@ -20,6 +20,7 @@ import {
shouldRunPreflightCompaction,
} from "./memory-flush.js";
import { CURRENT_MESSAGE_MARKER } from "./mentions.js";
import { incrementRunCompactionCount } from "./session-run-accounting.js";
import { incrementCompactionCount } from "./session-updates.js";
const tempDirs: string[] = [];
@@ -458,6 +459,86 @@ describe("incrementCompactionCount", () => {
expect(stored[sessionKey].outputTokens).toBeUndefined();
});
it("prefers explicit compactionTokensAfter over last-call usage for run accounting", async () => {
const entry = {
sessionId: "s1",
updatedAt: Date.now(),
compactionCount: 0,
totalTokens: 180_000,
} as SessionEntry;
const { storePath, sessionKey, sessionStore } = await createCompactionSessionFixture(entry);
await incrementRunCompactionCount({
sessionEntry: entry,
sessionStore,
sessionKey,
storePath,
compactionTokensAfter: 12_000,
lastCallUsage: {
input: 90_000,
output: 1_000,
total: 91_000,
},
contextTokensUsed: 200_000,
});
const stored = JSON.parse(await fs.readFile(storePath, "utf-8"));
expect(stored[sessionKey].totalTokens).toBe(12_000);
expect(stored[sessionKey].totalTokensFresh).toBe(true);
});
it("falls back to last-call usage when run compactionTokensAfter is non-finite", async () => {
const entry = {
sessionId: "s1",
updatedAt: Date.now(),
compactionCount: 0,
totalTokens: 180_000,
} as SessionEntry;
const { storePath, sessionKey, sessionStore } = await createCompactionSessionFixture(entry);
await incrementRunCompactionCount({
sessionEntry: entry,
sessionStore,
sessionKey,
storePath,
compactionTokensAfter: Number.POSITIVE_INFINITY,
lastCallUsage: {
input: 90_000,
output: 1_000,
total: 91_000,
},
contextTokensUsed: 200_000,
});
const stored = JSON.parse(await fs.readFile(storePath, "utf-8"));
expect(stored[sessionKey].totalTokens).toBe(90_000);
expect(stored[sessionKey].totalTokensFresh).toBe(true);
});
it("ignores non-finite tokensAfter values", async () => {
const entry = {
sessionId: "s1",
updatedAt: Date.now(),
compactionCount: 0,
totalTokens: 180_000,
totalTokensFresh: true,
} as SessionEntry;
const { storePath, sessionKey, sessionStore } = await createCompactionSessionFixture(entry);
await incrementCompactionCount({
sessionEntry: entry,
sessionStore,
sessionKey,
storePath,
tokensAfter: Number.POSITIVE_INFINITY,
});
const stored = JSON.parse(await fs.readFile(storePath, "utf-8"));
expect(stored[sessionKey].compactionCount).toBe(1);
expect(stored[sessionKey].totalTokens).toBe(180_000);
expect(stored[sessionKey].totalTokensFresh).toBe(true);
});
it("updates sessionId and sessionFile when compaction rotated transcripts", async () => {
const { stored, sessionKey, expectedDir } = await rotateCompactionSessionFile({
tempPrefix: "openclaw-compact-rotate-",

View File

@@ -11,12 +11,19 @@ type IncrementRunCompactionCountParams = Omit<
> & {
amount?: number;
cfg?: OpenClawConfig;
compactionTokensAfter?: number;
lastCallUsage?: NormalizedUsage;
contextTokensUsed?: number;
newSessionId?: string;
newSessionFile?: string;
};
function resolvePositiveTokenCount(value: number | undefined): number | undefined {
return typeof value === "number" && Number.isFinite(value) && value > 0
? Math.floor(value)
: undefined;
}
export async function persistRunSessionUsage(params: PersistRunSessionUsageParams): Promise<void> {
await persistSessionUsageUpdate(params);
}
@@ -24,12 +31,14 @@ export async function persistRunSessionUsage(params: PersistRunSessionUsageParam
export async function incrementRunCompactionCount(
params: IncrementRunCompactionCountParams,
): Promise<number | undefined> {
const tokensAfterCompaction = params.lastCallUsage
? deriveSessionTotalTokens({
usage: params.lastCallUsage,
contextTokens: params.contextTokensUsed,
})
: undefined;
const tokensAfterCompaction =
resolvePositiveTokenCount(params.compactionTokensAfter) ??
(params.lastCallUsage
? deriveSessionTotalTokens({
usage: params.lastCallUsage,
contextTokens: params.contextTokensUsed,
})
: undefined);
return incrementCompactionCount({
sessionEntry: params.sessionEntry,
sessionStore: params.sessionStore,

View File

@@ -93,6 +93,12 @@ function emitCompactionSessionLifecycleHooks(params: {
}
}
function resolvePositiveTokenCount(value: number | undefined): number | undefined {
return typeof value === "number" && Number.isFinite(value) && value > 0
? Math.floor(value)
: undefined;
}
export async function ensureSkillSnapshot(params: {
sessionEntry?: SessionEntry;
sessionStore?: Record<string, SessionEntry>;
@@ -267,8 +273,9 @@ export async function incrementCompactionCount(params: {
updates.sessionFile = explicitNewSessionFile;
}
// If tokensAfter is provided, update the cached token counts to reflect post-compaction state
if (tokensAfter != null && tokensAfter > 0) {
updates.totalTokens = tokensAfter;
const tokensAfterCompaction = resolvePositiveTokenCount(tokensAfter);
if (tokensAfterCompaction !== undefined) {
updates.totalTokens = tokensAfterCompaction;
updates.totalTokensFresh = true;
// Clear input/output breakdown since we only have the total estimate after compaction
updates.inputTokens = undefined;