mirror of
https://fastgit.cc/github.com/openclaw/openclaw
synced 2026-05-01 06:36:23 +08:00
feat(active-memory): add timeout circuit breaker to skip recall after consecutive failures (#74054) (#74158)
This commit is contained in:
@@ -3086,4 +3086,147 @@ describe("active-memory plugin", () => {
|
||||
),
|
||||
).toMatchObject({ status: "ok", summary: "memory 1" });
|
||||
});
|
||||
|
||||
it("skips recall after consecutive timeouts when circuit breaker trips (#74054)", async () => {
|
||||
__testing.setMinimumTimeoutMsForTests(1);
|
||||
__testing.setSetupGraceTimeoutMsForTests(0);
|
||||
api.pluginConfig = {
|
||||
agents: ["main"],
|
||||
timeoutMs: 1,
|
||||
logging: true,
|
||||
circuitBreakerMaxTimeouts: 2,
|
||||
circuitBreakerCooldownMs: 60_000,
|
||||
};
|
||||
plugin.register(api as unknown as OpenClawPluginApi);
|
||||
runEmbeddedPiAgent.mockImplementation(async () => await new Promise<never>(() => {}));
|
||||
|
||||
// First two calls should actually attempt the subagent (and timeout).
|
||||
await hooks.before_prompt_build(
|
||||
{ prompt: "circuit breaker test 1", messages: [] },
|
||||
{
|
||||
agentId: "main",
|
||||
trigger: "user",
|
||||
sessionKey: "agent:main:cb-test",
|
||||
messageProvider: "webchat",
|
||||
},
|
||||
);
|
||||
await hooks.before_prompt_build(
|
||||
{ prompt: "circuit breaker test 2", messages: [] },
|
||||
{
|
||||
agentId: "main",
|
||||
trigger: "user",
|
||||
sessionKey: "agent:main:cb-test",
|
||||
messageProvider: "webchat",
|
||||
},
|
||||
);
|
||||
expect(runEmbeddedPiAgent).toHaveBeenCalledTimes(2);
|
||||
|
||||
// Third call should be skipped by the circuit breaker.
|
||||
await hooks.before_prompt_build(
|
||||
{ prompt: "circuit breaker test 3", messages: [] },
|
||||
{
|
||||
agentId: "main",
|
||||
trigger: "user",
|
||||
sessionKey: "agent:main:cb-test",
|
||||
messageProvider: "webchat",
|
||||
},
|
||||
);
|
||||
// The subagent should NOT have been called a third time.
|
||||
expect(runEmbeddedPiAgent).toHaveBeenCalledTimes(2);
|
||||
|
||||
const infoLines = vi
|
||||
.mocked(api.logger.info)
|
||||
.mock.calls.map((call: unknown[]) => String(call[0]));
|
||||
expect(infoLines.some((line: string) => line.includes("circuit breaker open"))).toBe(true);
|
||||
});
|
||||
|
||||
it("resets circuit breaker after a successful recall", async () => {
|
||||
__testing.setMinimumTimeoutMsForTests(1);
|
||||
__testing.setSetupGraceTimeoutMsForTests(0);
|
||||
api.pluginConfig = {
|
||||
agents: ["main"],
|
||||
timeoutMs: 1,
|
||||
logging: true,
|
||||
circuitBreakerMaxTimeouts: 1,
|
||||
circuitBreakerCooldownMs: 60_000,
|
||||
};
|
||||
plugin.register(api as unknown as OpenClawPluginApi);
|
||||
|
||||
// First call: timeout (trips the breaker with max=1).
|
||||
runEmbeddedPiAgent.mockImplementationOnce(async () => await new Promise<never>(() => {}));
|
||||
await hooks.before_prompt_build(
|
||||
{ prompt: "cb reset test timeout", messages: [] },
|
||||
{
|
||||
agentId: "main",
|
||||
trigger: "user",
|
||||
sessionKey: "agent:main:cb-reset",
|
||||
messageProvider: "webchat",
|
||||
},
|
||||
);
|
||||
expect(runEmbeddedPiAgent).toHaveBeenCalledTimes(1);
|
||||
|
||||
// Second call should be skipped by circuit breaker.
|
||||
await hooks.before_prompt_build(
|
||||
{ prompt: "cb reset test skipped", messages: [] },
|
||||
{
|
||||
agentId: "main",
|
||||
trigger: "user",
|
||||
sessionKey: "agent:main:cb-reset",
|
||||
messageProvider: "webchat",
|
||||
},
|
||||
);
|
||||
expect(runEmbeddedPiAgent).toHaveBeenCalledTimes(1);
|
||||
|
||||
// Simulate cooldown expiry by manipulating the circuit breaker entry.
|
||||
const cbKey = __testing.buildCircuitBreakerKey("main", "github-copilot", "gpt-5.4-mini");
|
||||
const entry = __testing.getCircuitBreakerEntry(cbKey);
|
||||
if (entry) {
|
||||
entry.lastTimeoutAt = Date.now() - 120_000;
|
||||
}
|
||||
|
||||
// Third call should go through (cooldown expired) and succeed.
|
||||
runEmbeddedPiAgent.mockImplementationOnce(async () => ({
|
||||
payloads: [{ text: "- lemon pepper wings" }],
|
||||
}));
|
||||
await hooks.before_prompt_build(
|
||||
{ prompt: "cb reset test success", messages: [] },
|
||||
{
|
||||
agentId: "main",
|
||||
trigger: "user",
|
||||
sessionKey: "agent:main:cb-reset",
|
||||
messageProvider: "webchat",
|
||||
},
|
||||
);
|
||||
expect(runEmbeddedPiAgent).toHaveBeenCalledTimes(2);
|
||||
|
||||
// Fourth call should also go through since the breaker was reset on success.
|
||||
runEmbeddedPiAgent.mockImplementationOnce(async () => ({
|
||||
payloads: [{ text: "- buffalo wings" }],
|
||||
}));
|
||||
await hooks.before_prompt_build(
|
||||
{ prompt: "cb reset test still ok", messages: [] },
|
||||
{
|
||||
agentId: "main",
|
||||
trigger: "user",
|
||||
sessionKey: "agent:main:cb-reset",
|
||||
messageProvider: "webchat",
|
||||
},
|
||||
);
|
||||
expect(runEmbeddedPiAgent).toHaveBeenCalledTimes(3);
|
||||
});
|
||||
|
||||
it("normalizes circuit breaker config with defaults", () => {
|
||||
const config = __testing.normalizePluginConfig({});
|
||||
expect(config.circuitBreakerMaxTimeouts).toBe(3);
|
||||
expect(config.circuitBreakerCooldownMs).toBe(60_000);
|
||||
});
|
||||
|
||||
it("clamps circuit breaker config within valid ranges", () => {
|
||||
const config = __testing.normalizePluginConfig({
|
||||
circuitBreakerMaxTimeouts: 0,
|
||||
circuitBreakerCooldownMs: 1000,
|
||||
});
|
||||
expect(config.circuitBreakerMaxTimeouts).toBe(1);
|
||||
expect(config.circuitBreakerCooldownMs).toBe(5000);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -39,6 +39,8 @@ const DEFAULT_SETUP_GRACE_TIMEOUT_MS = 30_000;
|
||||
const DEFAULT_QUERY_MODE = "recent" as const;
|
||||
const DEFAULT_QMD_SEARCH_MODE = "search" as const;
|
||||
const DEFAULT_TRANSCRIPT_DIR = "active-memory";
|
||||
const DEFAULT_CIRCUIT_BREAKER_MAX_TIMEOUTS = 3;
|
||||
const DEFAULT_CIRCUIT_BREAKER_COOLDOWN_MS = 60_000;
|
||||
const TOGGLE_STATE_FILE = "session-toggles.json";
|
||||
const DEFAULT_PARTIAL_TRANSCRIPT_MAX_CHARS = 32_000;
|
||||
const DEFAULT_TRANSCRIPT_READ_MAX_LINES = 2_000;
|
||||
@@ -97,6 +99,8 @@ type ActiveRecallPluginConfig = {
|
||||
recentAssistantChars?: number;
|
||||
logging?: boolean;
|
||||
cacheTtlMs?: number;
|
||||
circuitBreakerMaxTimeouts?: number;
|
||||
circuitBreakerCooldownMs?: number;
|
||||
persistTranscripts?: boolean;
|
||||
transcriptDir?: string;
|
||||
qmd?: {
|
||||
@@ -134,6 +138,8 @@ type ResolvedActiveRecallPluginConfig = {
|
||||
recentAssistantChars: number;
|
||||
logging: boolean;
|
||||
cacheTtlMs: number;
|
||||
circuitBreakerMaxTimeouts: number;
|
||||
circuitBreakerCooldownMs: number;
|
||||
persistTranscripts: boolean;
|
||||
transcriptDir: string;
|
||||
qmd: {
|
||||
@@ -278,6 +284,44 @@ const MAX_LOG_VALUE_CHARS = 300;
|
||||
|
||||
const activeRecallCache = new Map<string, CachedActiveRecallResult>();
|
||||
|
||||
type CircuitBreakerEntry = {
|
||||
consecutiveTimeouts: number;
|
||||
lastTimeoutAt: number;
|
||||
};
|
||||
|
||||
const timeoutCircuitBreaker = new Map<string, CircuitBreakerEntry>();
|
||||
|
||||
function buildCircuitBreakerKey(agentId: string, provider?: string, model?: string): string {
|
||||
return `${agentId}:${provider ?? "unknown"}/${model ?? "unknown"}`;
|
||||
}
|
||||
|
||||
function isCircuitBreakerOpen(key: string, maxTimeouts: number, cooldownMs: number): boolean {
|
||||
const entry = timeoutCircuitBreaker.get(key);
|
||||
if (!entry || entry.consecutiveTimeouts < maxTimeouts) {
|
||||
return false;
|
||||
}
|
||||
if (Date.now() - entry.lastTimeoutAt >= cooldownMs) {
|
||||
// Cooldown expired — reset and allow one attempt through.
|
||||
timeoutCircuitBreaker.delete(key);
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
function recordCircuitBreakerTimeout(key: string): void {
|
||||
const entry = timeoutCircuitBreaker.get(key);
|
||||
if (entry) {
|
||||
entry.consecutiveTimeouts++;
|
||||
entry.lastTimeoutAt = Date.now();
|
||||
} else {
|
||||
timeoutCircuitBreaker.set(key, { consecutiveTimeouts: 1, lastTimeoutAt: Date.now() });
|
||||
}
|
||||
}
|
||||
|
||||
function resetCircuitBreaker(key: string): void {
|
||||
timeoutCircuitBreaker.delete(key);
|
||||
}
|
||||
|
||||
function parseOptionalPositiveInt(value: unknown, fallback: number): number {
|
||||
const parsed =
|
||||
typeof value === "number"
|
||||
@@ -718,6 +762,18 @@ function normalizePluginConfig(pluginConfig: unknown): ResolvedActiveRecallPlugi
|
||||
),
|
||||
logging: raw.logging === true,
|
||||
cacheTtlMs: clampInt(raw.cacheTtlMs, DEFAULT_CACHE_TTL_MS, 1000, 120_000),
|
||||
circuitBreakerMaxTimeouts: clampInt(
|
||||
raw.circuitBreakerMaxTimeouts,
|
||||
DEFAULT_CIRCUIT_BREAKER_MAX_TIMEOUTS,
|
||||
1,
|
||||
20,
|
||||
),
|
||||
circuitBreakerCooldownMs: clampInt(
|
||||
raw.circuitBreakerCooldownMs,
|
||||
DEFAULT_CIRCUIT_BREAKER_COOLDOWN_MS,
|
||||
5000,
|
||||
600_000,
|
||||
),
|
||||
persistTranscripts: raw.persistTranscripts === true,
|
||||
transcriptDir: normalizeTranscriptDir(raw.transcriptDir),
|
||||
qmd: {
|
||||
@@ -2181,6 +2237,39 @@ async function maybeResolveActiveRecall(params: {
|
||||
return cached;
|
||||
}
|
||||
|
||||
// Circuit breaker: skip recall when the same agent/model has timed out
|
||||
// too many times in a row (#74054).
|
||||
const cbKey = buildCircuitBreakerKey(
|
||||
params.agentId,
|
||||
resolvedModelRef?.provider,
|
||||
resolvedModelRef?.model,
|
||||
);
|
||||
if (
|
||||
isCircuitBreakerOpen(
|
||||
cbKey,
|
||||
params.config.circuitBreakerMaxTimeouts,
|
||||
params.config.circuitBreakerCooldownMs,
|
||||
)
|
||||
) {
|
||||
const result: ActiveRecallResult = {
|
||||
status: "timeout",
|
||||
elapsedMs: 0,
|
||||
summary: null,
|
||||
};
|
||||
if (params.config.logging) {
|
||||
params.api.logger.info?.(
|
||||
`${logPrefix} skipped (circuit breaker open after consecutive timeouts)`,
|
||||
);
|
||||
}
|
||||
await persistPluginStatusLines({
|
||||
api: params.api,
|
||||
agentId: params.agentId,
|
||||
sessionKey: params.sessionKey,
|
||||
statusLine: `${buildPluginStatusLine({ result, config: params.config })} circuit-breaker`,
|
||||
});
|
||||
return result;
|
||||
}
|
||||
|
||||
if (params.config.logging) {
|
||||
params.api.logger.info?.(
|
||||
`${logPrefix} start timeoutMs=${String(params.config.timeoutMs)} queryChars=${String(params.query.length)}`,
|
||||
@@ -2241,6 +2330,7 @@ async function maybeResolveActiveRecall(params: {
|
||||
debugSummary: buildPersistedDebugSummary(result),
|
||||
searchDebug: result.searchDebug,
|
||||
});
|
||||
recordCircuitBreakerTimeout(cbKey);
|
||||
return result;
|
||||
}
|
||||
|
||||
@@ -2283,6 +2373,7 @@ async function maybeResolveActiveRecall(params: {
|
||||
if (shouldCacheResult(result)) {
|
||||
setCachedResult(cacheKey, result, params.config.cacheTtlMs);
|
||||
}
|
||||
resetCircuitBreaker(cbKey);
|
||||
return result;
|
||||
} catch (error) {
|
||||
if (controller.signal.aborted) {
|
||||
@@ -2307,6 +2398,7 @@ async function maybeResolveActiveRecall(params: {
|
||||
debugSummary: buildPersistedDebugSummary(result),
|
||||
searchDebug: result.searchDebug,
|
||||
});
|
||||
recordCircuitBreakerTimeout(cbKey);
|
||||
return result;
|
||||
}
|
||||
const message = toSingleLineLogValue(error instanceof Error ? error.message : String(error));
|
||||
@@ -2544,16 +2636,19 @@ export default definePluginEntry({
|
||||
|
||||
export const __testing = {
|
||||
buildCacheKey,
|
||||
buildCircuitBreakerKey,
|
||||
buildMetadata,
|
||||
buildPluginStatusLine,
|
||||
buildPromptPrefix,
|
||||
getCachedResult,
|
||||
isCircuitBreakerOpen,
|
||||
normalizePluginConfig,
|
||||
readActiveMemorySearchDebug,
|
||||
readPartialAssistantText,
|
||||
shouldCacheResult,
|
||||
resetActiveRecallCacheForTests() {
|
||||
activeRecallCache.clear();
|
||||
timeoutCircuitBreaker.clear();
|
||||
lastActiveRecallCacheSweepAt = 0;
|
||||
minimumTimeoutMs = DEFAULT_MIN_TIMEOUT_MS;
|
||||
setupGraceTimeoutMs = DEFAULT_SETUP_GRACE_TIMEOUT_MS;
|
||||
@@ -2565,4 +2660,7 @@ export const __testing = {
|
||||
setupGraceTimeoutMs = Math.max(0, Math.floor(value));
|
||||
},
|
||||
setCachedResult,
|
||||
getCircuitBreakerEntry(key: string) {
|
||||
return timeoutCircuitBreaker.get(key);
|
||||
},
|
||||
};
|
||||
|
||||
@@ -66,6 +66,8 @@
|
||||
"persistTranscripts": { "type": "boolean" },
|
||||
"transcriptDir": { "type": "string" },
|
||||
"cacheTtlMs": { "type": "integer", "minimum": 1000, "maximum": 120000 },
|
||||
"circuitBreakerMaxTimeouts": { "type": "integer", "minimum": 1, "maximum": 20 },
|
||||
"circuitBreakerCooldownMs": { "type": "integer", "minimum": 5000, "maximum": 600000 },
|
||||
"qmd": {
|
||||
"type": "object",
|
||||
"additionalProperties": false,
|
||||
@@ -142,6 +144,14 @@
|
||||
"label": "Enable Logging",
|
||||
"help": "Emit active memory timing and result logs."
|
||||
},
|
||||
"circuitBreakerMaxTimeouts": {
|
||||
"label": "Circuit Breaker Max Timeouts",
|
||||
"help": "Skip recall after this many consecutive timeouts for the same agent/model. Resets on a successful recall or after the cooldown expires. Default: 3."
|
||||
},
|
||||
"circuitBreakerCooldownMs": {
|
||||
"label": "Circuit Breaker Cooldown (ms)",
|
||||
"help": "How long to skip recall after the circuit breaker trips, in milliseconds. Default: 60000 (1 minute)."
|
||||
},
|
||||
"persistTranscripts": {
|
||||
"label": "Persist Transcripts",
|
||||
"help": "Keep blocking memory sub-agent session transcripts on disk in a separate plugin-owned directory."
|
||||
|
||||
Reference in New Issue
Block a user