diff --git a/extensions/active-memory/index.test.ts b/extensions/active-memory/index.test.ts index 6fc0b9ee10d..803d40b2e7d 100644 --- a/extensions/active-memory/index.test.ts +++ b/extensions/active-memory/index.test.ts @@ -3086,4 +3086,147 @@ describe("active-memory plugin", () => { ), ).toMatchObject({ status: "ok", summary: "memory 1" }); }); + + it("skips recall after consecutive timeouts when circuit breaker trips (#74054)", async () => { + __testing.setMinimumTimeoutMsForTests(1); + __testing.setSetupGraceTimeoutMsForTests(0); + api.pluginConfig = { + agents: ["main"], + timeoutMs: 1, + logging: true, + circuitBreakerMaxTimeouts: 2, + circuitBreakerCooldownMs: 60_000, + }; + plugin.register(api as unknown as OpenClawPluginApi); + runEmbeddedPiAgent.mockImplementation(async () => await new Promise(() => {})); + + // First two calls should actually attempt the subagent (and timeout). + await hooks.before_prompt_build( + { prompt: "circuit breaker test 1", messages: [] }, + { + agentId: "main", + trigger: "user", + sessionKey: "agent:main:cb-test", + messageProvider: "webchat", + }, + ); + await hooks.before_prompt_build( + { prompt: "circuit breaker test 2", messages: [] }, + { + agentId: "main", + trigger: "user", + sessionKey: "agent:main:cb-test", + messageProvider: "webchat", + }, + ); + expect(runEmbeddedPiAgent).toHaveBeenCalledTimes(2); + + // Third call should be skipped by the circuit breaker. + await hooks.before_prompt_build( + { prompt: "circuit breaker test 3", messages: [] }, + { + agentId: "main", + trigger: "user", + sessionKey: "agent:main:cb-test", + messageProvider: "webchat", + }, + ); + // The subagent should NOT have been called a third time. + expect(runEmbeddedPiAgent).toHaveBeenCalledTimes(2); + + const infoLines = vi + .mocked(api.logger.info) + .mock.calls.map((call: unknown[]) => String(call[0])); + expect(infoLines.some((line: string) => line.includes("circuit breaker open"))).toBe(true); + }); + + it("resets circuit breaker after a successful recall", async () => { + __testing.setMinimumTimeoutMsForTests(1); + __testing.setSetupGraceTimeoutMsForTests(0); + api.pluginConfig = { + agents: ["main"], + timeoutMs: 1, + logging: true, + circuitBreakerMaxTimeouts: 1, + circuitBreakerCooldownMs: 60_000, + }; + plugin.register(api as unknown as OpenClawPluginApi); + + // First call: timeout (trips the breaker with max=1). + runEmbeddedPiAgent.mockImplementationOnce(async () => await new Promise(() => {})); + await hooks.before_prompt_build( + { prompt: "cb reset test timeout", messages: [] }, + { + agentId: "main", + trigger: "user", + sessionKey: "agent:main:cb-reset", + messageProvider: "webchat", + }, + ); + expect(runEmbeddedPiAgent).toHaveBeenCalledTimes(1); + + // Second call should be skipped by circuit breaker. + await hooks.before_prompt_build( + { prompt: "cb reset test skipped", messages: [] }, + { + agentId: "main", + trigger: "user", + sessionKey: "agent:main:cb-reset", + messageProvider: "webchat", + }, + ); + expect(runEmbeddedPiAgent).toHaveBeenCalledTimes(1); + + // Simulate cooldown expiry by manipulating the circuit breaker entry. + const cbKey = __testing.buildCircuitBreakerKey("main", "github-copilot", "gpt-5.4-mini"); + const entry = __testing.getCircuitBreakerEntry(cbKey); + if (entry) { + entry.lastTimeoutAt = Date.now() - 120_000; + } + + // Third call should go through (cooldown expired) and succeed. + runEmbeddedPiAgent.mockImplementationOnce(async () => ({ + payloads: [{ text: "- lemon pepper wings" }], + })); + await hooks.before_prompt_build( + { prompt: "cb reset test success", messages: [] }, + { + agentId: "main", + trigger: "user", + sessionKey: "agent:main:cb-reset", + messageProvider: "webchat", + }, + ); + expect(runEmbeddedPiAgent).toHaveBeenCalledTimes(2); + + // Fourth call should also go through since the breaker was reset on success. + runEmbeddedPiAgent.mockImplementationOnce(async () => ({ + payloads: [{ text: "- buffalo wings" }], + })); + await hooks.before_prompt_build( + { prompt: "cb reset test still ok", messages: [] }, + { + agentId: "main", + trigger: "user", + sessionKey: "agent:main:cb-reset", + messageProvider: "webchat", + }, + ); + expect(runEmbeddedPiAgent).toHaveBeenCalledTimes(3); + }); + + it("normalizes circuit breaker config with defaults", () => { + const config = __testing.normalizePluginConfig({}); + expect(config.circuitBreakerMaxTimeouts).toBe(3); + expect(config.circuitBreakerCooldownMs).toBe(60_000); + }); + + it("clamps circuit breaker config within valid ranges", () => { + const config = __testing.normalizePluginConfig({ + circuitBreakerMaxTimeouts: 0, + circuitBreakerCooldownMs: 1000, + }); + expect(config.circuitBreakerMaxTimeouts).toBe(1); + expect(config.circuitBreakerCooldownMs).toBe(5000); + }); }); diff --git a/extensions/active-memory/index.ts b/extensions/active-memory/index.ts index dc3b2f59fab..ee89f59cc48 100644 --- a/extensions/active-memory/index.ts +++ b/extensions/active-memory/index.ts @@ -39,6 +39,8 @@ const DEFAULT_SETUP_GRACE_TIMEOUT_MS = 30_000; const DEFAULT_QUERY_MODE = "recent" as const; const DEFAULT_QMD_SEARCH_MODE = "search" as const; const DEFAULT_TRANSCRIPT_DIR = "active-memory"; +const DEFAULT_CIRCUIT_BREAKER_MAX_TIMEOUTS = 3; +const DEFAULT_CIRCUIT_BREAKER_COOLDOWN_MS = 60_000; const TOGGLE_STATE_FILE = "session-toggles.json"; const DEFAULT_PARTIAL_TRANSCRIPT_MAX_CHARS = 32_000; const DEFAULT_TRANSCRIPT_READ_MAX_LINES = 2_000; @@ -97,6 +99,8 @@ type ActiveRecallPluginConfig = { recentAssistantChars?: number; logging?: boolean; cacheTtlMs?: number; + circuitBreakerMaxTimeouts?: number; + circuitBreakerCooldownMs?: number; persistTranscripts?: boolean; transcriptDir?: string; qmd?: { @@ -134,6 +138,8 @@ type ResolvedActiveRecallPluginConfig = { recentAssistantChars: number; logging: boolean; cacheTtlMs: number; + circuitBreakerMaxTimeouts: number; + circuitBreakerCooldownMs: number; persistTranscripts: boolean; transcriptDir: string; qmd: { @@ -278,6 +284,44 @@ const MAX_LOG_VALUE_CHARS = 300; const activeRecallCache = new Map(); +type CircuitBreakerEntry = { + consecutiveTimeouts: number; + lastTimeoutAt: number; +}; + +const timeoutCircuitBreaker = new Map(); + +function buildCircuitBreakerKey(agentId: string, provider?: string, model?: string): string { + return `${agentId}:${provider ?? "unknown"}/${model ?? "unknown"}`; +} + +function isCircuitBreakerOpen(key: string, maxTimeouts: number, cooldownMs: number): boolean { + const entry = timeoutCircuitBreaker.get(key); + if (!entry || entry.consecutiveTimeouts < maxTimeouts) { + return false; + } + if (Date.now() - entry.lastTimeoutAt >= cooldownMs) { + // Cooldown expired — reset and allow one attempt through. + timeoutCircuitBreaker.delete(key); + return false; + } + return true; +} + +function recordCircuitBreakerTimeout(key: string): void { + const entry = timeoutCircuitBreaker.get(key); + if (entry) { + entry.consecutiveTimeouts++; + entry.lastTimeoutAt = Date.now(); + } else { + timeoutCircuitBreaker.set(key, { consecutiveTimeouts: 1, lastTimeoutAt: Date.now() }); + } +} + +function resetCircuitBreaker(key: string): void { + timeoutCircuitBreaker.delete(key); +} + function parseOptionalPositiveInt(value: unknown, fallback: number): number { const parsed = typeof value === "number" @@ -718,6 +762,18 @@ function normalizePluginConfig(pluginConfig: unknown): ResolvedActiveRecallPlugi ), logging: raw.logging === true, cacheTtlMs: clampInt(raw.cacheTtlMs, DEFAULT_CACHE_TTL_MS, 1000, 120_000), + circuitBreakerMaxTimeouts: clampInt( + raw.circuitBreakerMaxTimeouts, + DEFAULT_CIRCUIT_BREAKER_MAX_TIMEOUTS, + 1, + 20, + ), + circuitBreakerCooldownMs: clampInt( + raw.circuitBreakerCooldownMs, + DEFAULT_CIRCUIT_BREAKER_COOLDOWN_MS, + 5000, + 600_000, + ), persistTranscripts: raw.persistTranscripts === true, transcriptDir: normalizeTranscriptDir(raw.transcriptDir), qmd: { @@ -2181,6 +2237,39 @@ async function maybeResolveActiveRecall(params: { return cached; } + // Circuit breaker: skip recall when the same agent/model has timed out + // too many times in a row (#74054). + const cbKey = buildCircuitBreakerKey( + params.agentId, + resolvedModelRef?.provider, + resolvedModelRef?.model, + ); + if ( + isCircuitBreakerOpen( + cbKey, + params.config.circuitBreakerMaxTimeouts, + params.config.circuitBreakerCooldownMs, + ) + ) { + const result: ActiveRecallResult = { + status: "timeout", + elapsedMs: 0, + summary: null, + }; + if (params.config.logging) { + params.api.logger.info?.( + `${logPrefix} skipped (circuit breaker open after consecutive timeouts)`, + ); + } + await persistPluginStatusLines({ + api: params.api, + agentId: params.agentId, + sessionKey: params.sessionKey, + statusLine: `${buildPluginStatusLine({ result, config: params.config })} circuit-breaker`, + }); + return result; + } + if (params.config.logging) { params.api.logger.info?.( `${logPrefix} start timeoutMs=${String(params.config.timeoutMs)} queryChars=${String(params.query.length)}`, @@ -2241,6 +2330,7 @@ async function maybeResolveActiveRecall(params: { debugSummary: buildPersistedDebugSummary(result), searchDebug: result.searchDebug, }); + recordCircuitBreakerTimeout(cbKey); return result; } @@ -2283,6 +2373,7 @@ async function maybeResolveActiveRecall(params: { if (shouldCacheResult(result)) { setCachedResult(cacheKey, result, params.config.cacheTtlMs); } + resetCircuitBreaker(cbKey); return result; } catch (error) { if (controller.signal.aborted) { @@ -2307,6 +2398,7 @@ async function maybeResolveActiveRecall(params: { debugSummary: buildPersistedDebugSummary(result), searchDebug: result.searchDebug, }); + recordCircuitBreakerTimeout(cbKey); return result; } const message = toSingleLineLogValue(error instanceof Error ? error.message : String(error)); @@ -2544,16 +2636,19 @@ export default definePluginEntry({ export const __testing = { buildCacheKey, + buildCircuitBreakerKey, buildMetadata, buildPluginStatusLine, buildPromptPrefix, getCachedResult, + isCircuitBreakerOpen, normalizePluginConfig, readActiveMemorySearchDebug, readPartialAssistantText, shouldCacheResult, resetActiveRecallCacheForTests() { activeRecallCache.clear(); + timeoutCircuitBreaker.clear(); lastActiveRecallCacheSweepAt = 0; minimumTimeoutMs = DEFAULT_MIN_TIMEOUT_MS; setupGraceTimeoutMs = DEFAULT_SETUP_GRACE_TIMEOUT_MS; @@ -2565,4 +2660,7 @@ export const __testing = { setupGraceTimeoutMs = Math.max(0, Math.floor(value)); }, setCachedResult, + getCircuitBreakerEntry(key: string) { + return timeoutCircuitBreaker.get(key); + }, }; diff --git a/extensions/active-memory/openclaw.plugin.json b/extensions/active-memory/openclaw.plugin.json index bf5bbb4fea5..88675ae5f61 100644 --- a/extensions/active-memory/openclaw.plugin.json +++ b/extensions/active-memory/openclaw.plugin.json @@ -66,6 +66,8 @@ "persistTranscripts": { "type": "boolean" }, "transcriptDir": { "type": "string" }, "cacheTtlMs": { "type": "integer", "minimum": 1000, "maximum": 120000 }, + "circuitBreakerMaxTimeouts": { "type": "integer", "minimum": 1, "maximum": 20 }, + "circuitBreakerCooldownMs": { "type": "integer", "minimum": 5000, "maximum": 600000 }, "qmd": { "type": "object", "additionalProperties": false, @@ -142,6 +144,14 @@ "label": "Enable Logging", "help": "Emit active memory timing and result logs." }, + "circuitBreakerMaxTimeouts": { + "label": "Circuit Breaker Max Timeouts", + "help": "Skip recall after this many consecutive timeouts for the same agent/model. Resets on a successful recall or after the cooldown expires. Default: 3." + }, + "circuitBreakerCooldownMs": { + "label": "Circuit Breaker Cooldown (ms)", + "help": "How long to skip recall after the circuit breaker trips, in milliseconds. Default: 60000 (1 minute)." + }, "persistTranscripts": { "label": "Persist Transcripts", "help": "Keep blocking memory sub-agent session transcripts on disk in a separate plugin-owned directory."