mirror of
https://fastgit.cc/github.com/openclaw/openclaw
synced 2026-04-30 22:12:32 +08:00
fix: gate startup history and model requests (#65365)
This commit is contained in:
@@ -8,7 +8,7 @@ Docs: https://docs.openclaw.ai
|
||||
|
||||
### Fixes
|
||||
|
||||
- Gateway/startup: defer heartbeat, cron, and pending delivery recovery until sidecars finish so Sandbox wake and chat history startup gates cannot block channel resume. (#65365) Thanks @lml2468.
|
||||
- Gateway/startup: defer scheduled services until sidecars finish, gate chat history and model listing during sidecar resume, and let Control UI retry startup-gated history loads so Sandbox wake resumes channels first. (#65365) Thanks @lml2468.
|
||||
|
||||
## 2026.4.12-beta.1
|
||||
|
||||
|
||||
@@ -139,11 +139,11 @@ describe("gateway control-plane write rate limit", () => {
|
||||
};
|
||||
const context = {
|
||||
...buildContext(),
|
||||
unavailableGatewayMethods: new Set(["chat.history"]),
|
||||
unavailableGatewayMethods: new Set(["chat.history", "models.list"]),
|
||||
} as Parameters<typeof handleGatewayRequest>[0]["context"];
|
||||
const client = buildClient();
|
||||
|
||||
const blocked = await runRequest({ method: "chat.history", context, client, handler });
|
||||
const blocked = await runRequest({ method: "models.list", context, client, handler });
|
||||
|
||||
expect(handlerCalls).not.toHaveBeenCalled();
|
||||
expect(blocked).toHaveBeenCalledWith(
|
||||
@@ -152,6 +152,8 @@ describe("gateway control-plane write rate limit", () => {
|
||||
expect.objectContaining({
|
||||
code: "UNAVAILABLE",
|
||||
retryable: true,
|
||||
retryAfterMs: 500,
|
||||
details: { method: "models.list" },
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
@@ -114,6 +114,7 @@ export async function handleGatewayRequest(
|
||||
undefined,
|
||||
errorShape(ErrorCodes.UNAVAILABLE, `${req.method} unavailable during gateway startup`, {
|
||||
retryable: true,
|
||||
retryAfterMs: 500,
|
||||
details: { method: req.method },
|
||||
}),
|
||||
);
|
||||
|
||||
@@ -123,8 +123,8 @@ describe("startGatewayPostAttachRuntime", () => {
|
||||
hoisted.reconcilePendingSessionIdentities.mockClear();
|
||||
});
|
||||
|
||||
it("re-enables chat.history after post-attach sidecars start", async () => {
|
||||
const unavailableGatewayMethods = new Set<string>(["chat.history"]);
|
||||
it("re-enables startup-gated methods after post-attach sidecars start", async () => {
|
||||
const unavailableGatewayMethods = new Set<string>(["chat.history", "models.list"]);
|
||||
|
||||
await startGatewayPostAttachRuntime({
|
||||
minimalTestGateway: false,
|
||||
@@ -168,7 +168,7 @@ describe("startGatewayPostAttachRuntime", () => {
|
||||
unavailableGatewayMethods,
|
||||
});
|
||||
|
||||
expect(unavailableGatewayMethods.has("chat.history")).toBe(false);
|
||||
expect([...unavailableGatewayMethods]).toEqual([]);
|
||||
expect(hoisted.startPluginServices).toHaveBeenCalledTimes(1);
|
||||
expect(hoisted.setInternalHooksEnabled).toHaveBeenCalledWith(false);
|
||||
expect(hoisted.logGatewayStartup).toHaveBeenCalledWith(
|
||||
|
||||
@@ -43,6 +43,7 @@ import {
|
||||
} from "./server-restart-sentinel.js";
|
||||
import { logGatewayStartup } from "./server-startup-log.js";
|
||||
import { startGatewayMemoryBackend } from "./server-startup-memory.js";
|
||||
import { STARTUP_UNAVAILABLE_GATEWAY_METHODS } from "./server-startup-unavailable-methods.js";
|
||||
import { startGatewayTailscaleExposure } from "./server-tailscale.js";
|
||||
|
||||
const SESSION_LOCK_STALE_MS = 30 * 60 * 1000;
|
||||
@@ -322,7 +323,9 @@ export async function startGatewayPostAttachRuntime(params: {
|
||||
logHooks: params.logHooks,
|
||||
logChannels: params.logChannels,
|
||||
}));
|
||||
params.unavailableGatewayMethods.delete("chat.history");
|
||||
for (const method of STARTUP_UNAVAILABLE_GATEWAY_METHODS) {
|
||||
params.unavailableGatewayMethods.delete(method);
|
||||
}
|
||||
}
|
||||
|
||||
if (!params.minimalTestGateway) {
|
||||
|
||||
1
src/gateway/server-startup-unavailable-methods.ts
Normal file
1
src/gateway/server-startup-unavailable-methods.ts
Normal file
@@ -0,0 +1 @@
|
||||
export const STARTUP_UNAVAILABLE_GATEWAY_METHODS = ["chat.history", "models.list"] as const;
|
||||
@@ -74,6 +74,7 @@ import {
|
||||
prepareGatewayStartupConfig,
|
||||
} from "./server-startup-config.js";
|
||||
import { prepareGatewayPluginBootstrap } from "./server-startup-plugins.js";
|
||||
import { STARTUP_UNAVAILABLE_GATEWAY_METHODS } from "./server-startup-unavailable-methods.js";
|
||||
import { startGatewayEarlyRuntime, startGatewayPostAttachRuntime } from "./server-startup.js";
|
||||
import { createWizardSessionTracker } from "./server-wizard-sessions.js";
|
||||
import { attachGatewayWsHandlers } from "./server-ws-runtime.js";
|
||||
@@ -625,7 +626,9 @@ export async function startGatewayServer(
|
||||
|
||||
const canvasHostServerPort = (canvasHostServer as CanvasHostServer | null)?.port;
|
||||
|
||||
const unavailableGatewayMethods = new Set<string>(minimalTestGateway ? [] : ["chat.history"]);
|
||||
const unavailableGatewayMethods = new Set<string>(
|
||||
minimalTestGateway ? [] : STARTUP_UNAVAILABLE_GATEWAY_METHODS,
|
||||
);
|
||||
const gatewayRequestContext = createGatewayRequestContext({
|
||||
deps,
|
||||
runtimeState,
|
||||
@@ -756,10 +759,7 @@ export async function startGatewayServer(
|
||||
unavailableGatewayMethods,
|
||||
}));
|
||||
|
||||
// Activate cron scheduler, heartbeat runner, and pending delivery
|
||||
// recovery now that sidecars are ready and chat.history is available.
|
||||
// Previously these ran before sidecars finished, causing a race.
|
||||
// See #65322.
|
||||
// Keep scheduled work inert until post-attach sidecars finish.
|
||||
const activated = activateGatewayScheduledServices({
|
||||
minimalTestGateway,
|
||||
cfgAtStart,
|
||||
|
||||
@@ -681,6 +681,49 @@ describe("abortChatRun", () => {
|
||||
});
|
||||
|
||||
describe("loadChatHistory", () => {
|
||||
it("retries retryable startup unavailability before showing history", async () => {
|
||||
vi.useFakeTimers();
|
||||
try {
|
||||
const request = vi
|
||||
.fn()
|
||||
.mockRejectedValueOnce(
|
||||
new GatewayRequestError({
|
||||
code: "UNAVAILABLE",
|
||||
message: "chat.history unavailable during gateway startup",
|
||||
details: { method: "chat.history" },
|
||||
retryable: true,
|
||||
retryAfterMs: 250,
|
||||
}),
|
||||
)
|
||||
.mockResolvedValueOnce({
|
||||
messages: [{ role: "assistant", content: [{ type: "text", text: "awake" }] }],
|
||||
thinkingLevel: "low",
|
||||
});
|
||||
const state = createState({
|
||||
connected: true,
|
||||
client: { request } as unknown as ChatState["client"],
|
||||
});
|
||||
|
||||
const load = loadChatHistory(state);
|
||||
await vi.waitFor(() => expect(request).toHaveBeenCalledTimes(1));
|
||||
expect(state.chatLoading).toBe(true);
|
||||
expect(state.lastError).toBeNull();
|
||||
|
||||
await vi.advanceTimersByTimeAsync(250);
|
||||
await load;
|
||||
|
||||
expect(request).toHaveBeenCalledTimes(2);
|
||||
expect(state.chatMessages).toEqual([
|
||||
{ role: "assistant", content: [{ type: "text", text: "awake" }] },
|
||||
]);
|
||||
expect(state.chatThinkingLevel).toBe("low");
|
||||
expect(state.chatLoading).toBe(false);
|
||||
expect(state.lastError).toBeNull();
|
||||
} finally {
|
||||
vi.useRealTimers();
|
||||
}
|
||||
});
|
||||
|
||||
it("filters assistant NO_REPLY messages and keeps user NO_REPLY messages", async () => {
|
||||
const request = vi.fn().mockResolvedValue({
|
||||
messages: [
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
import { resetToolStream } from "../app-tool-stream.ts";
|
||||
import { extractText } from "../chat/message-extract.ts";
|
||||
import { formatConnectError } from "../connect-error.ts";
|
||||
import type { GatewayBrowserClient } from "../gateway.ts";
|
||||
import { GatewayRequestError, type GatewayBrowserClient } from "../gateway.ts";
|
||||
import { normalizeLowercaseStringOrEmpty } from "../string-coerce.ts";
|
||||
import type { ChatAttachment } from "../ui-types.ts";
|
||||
import { generateUUID } from "../uuid.ts";
|
||||
@@ -13,6 +13,9 @@ import {
|
||||
const SILENT_REPLY_PATTERN = /^\s*NO_REPLY\s*$/;
|
||||
const SYNTHETIC_TRANSCRIPT_REPAIR_RESULT =
|
||||
"[openclaw] missing tool result in session history; inserted synthetic error result for transcript repair.";
|
||||
const STARTUP_CHAT_HISTORY_RETRY_TIMEOUT_MS = 60_000;
|
||||
const STARTUP_CHAT_HISTORY_DEFAULT_RETRY_MS = 500;
|
||||
const STARTUP_CHAT_HISTORY_MAX_RETRY_MS = 5_000;
|
||||
const chatHistoryRequestVersions = new WeakMap<object, number>();
|
||||
|
||||
function beginChatHistoryRequest(state: ChatState): number {
|
||||
@@ -72,6 +75,31 @@ function shouldHideHistoryMessage(message: unknown): boolean {
|
||||
return isAssistantSilentReply(message) || isSyntheticTranscriptRepairToolResult(message);
|
||||
}
|
||||
|
||||
function isRetryableStartupUnavailable(err: unknown, method: string): err is GatewayRequestError {
|
||||
if (!(err instanceof GatewayRequestError)) {
|
||||
return false;
|
||||
}
|
||||
if (err.gatewayCode !== "UNAVAILABLE" || !err.retryable) {
|
||||
return false;
|
||||
}
|
||||
const details = err.details;
|
||||
if (!details || typeof details !== "object") {
|
||||
return true;
|
||||
}
|
||||
const detailMethod = (details as { method?: unknown }).method;
|
||||
return typeof detailMethod !== "string" || detailMethod === method;
|
||||
}
|
||||
|
||||
function resolveStartupRetryDelayMs(err: GatewayRequestError): number {
|
||||
const retryAfterMs =
|
||||
typeof err.retryAfterMs === "number" ? err.retryAfterMs : STARTUP_CHAT_HISTORY_DEFAULT_RETRY_MS;
|
||||
return Math.min(Math.max(retryAfterMs, 100), STARTUP_CHAT_HISTORY_MAX_RETRY_MS);
|
||||
}
|
||||
|
||||
function sleep(ms: number): Promise<void> {
|
||||
return new Promise((resolve) => setTimeout(resolve, ms));
|
||||
}
|
||||
|
||||
export type ChatState = {
|
||||
client: GatewayBrowserClient | null;
|
||||
connected: boolean;
|
||||
@@ -114,16 +142,37 @@ export async function loadChatHistory(state: ChatState) {
|
||||
}
|
||||
const sessionKey = state.sessionKey;
|
||||
const requestVersion = beginChatHistoryRequest(state);
|
||||
const startedAt = Date.now();
|
||||
state.chatLoading = true;
|
||||
state.lastError = null;
|
||||
try {
|
||||
const res = await state.client.request<{ messages?: Array<unknown>; thinkingLevel?: string }>(
|
||||
"chat.history",
|
||||
{
|
||||
sessionKey,
|
||||
limit: 200,
|
||||
},
|
||||
);
|
||||
let res: { messages?: Array<unknown>; thinkingLevel?: string };
|
||||
for (;;) {
|
||||
try {
|
||||
res = await state.client.request<{ messages?: Array<unknown>; thinkingLevel?: string }>(
|
||||
"chat.history",
|
||||
{
|
||||
sessionKey,
|
||||
limit: 200,
|
||||
},
|
||||
);
|
||||
break;
|
||||
} catch (err) {
|
||||
if (!shouldApplyChatHistoryResult(state, requestVersion, sessionKey)) {
|
||||
return;
|
||||
}
|
||||
const withinStartupRetryWindow =
|
||||
Date.now() - startedAt < STARTUP_CHAT_HISTORY_RETRY_TIMEOUT_MS;
|
||||
if (withinStartupRetryWindow && isRetryableStartupUnavailable(err, "chat.history")) {
|
||||
await sleep(resolveStartupRetryDelayMs(err));
|
||||
if (!state.client || !state.connected) {
|
||||
return;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
if (!shouldApplyChatHistoryResult(state, requestVersion, sessionKey)) {
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -27,24 +27,36 @@ export type GatewayResponseFrame = {
|
||||
id: string;
|
||||
ok: boolean;
|
||||
payload?: unknown;
|
||||
error?: { code: string; message: string; details?: unknown };
|
||||
error?: {
|
||||
code: string;
|
||||
message: string;
|
||||
details?: unknown;
|
||||
retryable?: boolean;
|
||||
retryAfterMs?: number;
|
||||
};
|
||||
};
|
||||
|
||||
export type GatewayErrorInfo = {
|
||||
code: string;
|
||||
message: string;
|
||||
details?: unknown;
|
||||
retryable?: boolean;
|
||||
retryAfterMs?: number;
|
||||
};
|
||||
|
||||
export class GatewayRequestError extends Error {
|
||||
readonly gatewayCode: string;
|
||||
readonly details?: unknown;
|
||||
readonly retryable: boolean;
|
||||
readonly retryAfterMs?: number;
|
||||
|
||||
constructor(error: GatewayErrorInfo) {
|
||||
super(error.message);
|
||||
this.name = "GatewayRequestError";
|
||||
this.gatewayCode = error.code;
|
||||
this.details = error.details;
|
||||
this.retryable = error.retryable === true;
|
||||
this.retryAfterMs = error.retryAfterMs;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -478,6 +490,8 @@ export class GatewayBrowserClient {
|
||||
code: err.gatewayCode,
|
||||
message: err.message,
|
||||
details: err.details,
|
||||
retryable: err.retryable,
|
||||
retryAfterMs: err.retryAfterMs,
|
||||
};
|
||||
} else {
|
||||
this.pendingConnectError = undefined;
|
||||
@@ -555,6 +569,8 @@ export class GatewayBrowserClient {
|
||||
code: res.error?.code ?? "UNAVAILABLE",
|
||||
message: res.error?.message ?? "request failed",
|
||||
details: res.error?.details,
|
||||
retryable: res.error?.retryable,
|
||||
retryAfterMs: res.error?.retryAfterMs,
|
||||
}),
|
||||
);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user