mirror of
https://fastgit.cc/github.com/openclaw/openclaw
synced 2026-04-30 22:12:32 +08:00
fix(telegram): retry startup control calls on fallback transport
This commit is contained in:
@@ -48,6 +48,7 @@ Docs: https://docs.openclaw.ai
|
||||
|
||||
### Fixes
|
||||
|
||||
- Channels/Telegram: keep Bot API network fallbacks sticky after failed attempts and retry timed-out startup control calls once on the fallback route, so `deleteWebhook` IPv6 stalls no longer trigger slow multi-account retry storms. Fixes #73255. Thanks @ttomiczek and @sktbrd.
|
||||
- Export/session: keep inline export HTML scripts and vendor libraries injected after template formatting so generated session exports open with the app code, markdown renderer, and syntax highlighter present. Fixes #41862 and #49957; carries forward #41861 and #68947. Thanks @briannewman, @martenzi, and @armanddp.
|
||||
- Agents/ACPX: stage the patched Claude ACP adapter as an ACPX runtime dependency and route known Codex/Claude ACP commands through local wrappers, so Gateway runtime no longer depends on live `npx` adapter resolution. Fixes #73202. Thanks @joerod26.
|
||||
- Memory/compaction: let pre-compaction memory flush use an exact `agents.defaults.compaction.memoryFlush.model` override such as `ollama/qwen3:8b` without inheriting the active session fallback chain, so local housekeeping can avoid paid conversation models. Fixes #53772. Thanks @limen96.
|
||||
|
||||
@@ -127,6 +127,12 @@ function extractTelegramApiMethod(input: TelegramFetchInput): string | null {
|
||||
}
|
||||
}
|
||||
|
||||
const TELEGRAM_TIMEOUT_FALLBACK_METHODS = new Set(["deletewebhook", "getme", "setwebhook"]);
|
||||
|
||||
function shouldRetryTimedOutTelegramControlRequest(method: string | null): boolean {
|
||||
return method !== null && TELEGRAM_TIMEOUT_FALLBACK_METHODS.has(method);
|
||||
}
|
||||
|
||||
export function createTelegramBotCore(
|
||||
opts: TelegramBotOptions & { telegramDeps: TelegramBotDeps },
|
||||
): TelegramBotInstance {
|
||||
@@ -185,54 +191,86 @@ export function createTelegramBotCore(
|
||||
// Use manual event forwarding instead of AbortSignal.any() to avoid the cross-realm
|
||||
// AbortSignal issue in Node.js (grammY's signal may come from a different module context,
|
||||
// causing "signals[0] must be an instance of AbortSignal" errors).
|
||||
finalFetch = (input: TelegramFetchInput, init?: TelegramFetchInit) => {
|
||||
const controller = new AbortController();
|
||||
const abortWith = (signal: Pick<TelegramAbortSignalLike, "reason">) =>
|
||||
controller.abort(signal.reason);
|
||||
finalFetch = async (input: TelegramFetchInput, init?: TelegramFetchInit) => {
|
||||
const method = extractTelegramApiMethod(input);
|
||||
const requestTimeoutMs = resolveTelegramRequestTimeoutMs(method);
|
||||
const shutdownSignal = isTelegramAbortSignalLike(opts.fetchAbortSignal)
|
||||
? opts.fetchAbortSignal
|
||||
: undefined;
|
||||
const onShutdown = () => {
|
||||
if (shutdownSignal) {
|
||||
const requestSignal = isTelegramAbortSignalLike(init?.signal) ? init.signal : undefined;
|
||||
|
||||
const runFetch = async () => {
|
||||
const controller = new AbortController();
|
||||
const abortWith = (signal: Pick<TelegramAbortSignalLike, "reason">) =>
|
||||
controller.abort(signal.reason);
|
||||
const onShutdown = () => {
|
||||
if (shutdownSignal) {
|
||||
abortWith(shutdownSignal);
|
||||
}
|
||||
};
|
||||
let requestTimeout: ReturnType<typeof setTimeout> | undefined;
|
||||
let onRequestAbort: (() => void) | undefined;
|
||||
let requestTimedOut = false;
|
||||
const timeoutError =
|
||||
requestTimeoutMs !== undefined
|
||||
? new Error(`Telegram ${method} timed out after ${requestTimeoutMs}ms`)
|
||||
: undefined;
|
||||
|
||||
if (shutdownSignal?.aborted) {
|
||||
abortWith(shutdownSignal);
|
||||
} else if (shutdownSignal) {
|
||||
shutdownSignal.addEventListener("abort", onShutdown, { once: true });
|
||||
}
|
||||
if (requestSignal) {
|
||||
if (requestSignal.aborted) {
|
||||
abortWith(requestSignal);
|
||||
} else {
|
||||
onRequestAbort = () => abortWith(requestSignal);
|
||||
requestSignal.addEventListener("abort", onRequestAbort);
|
||||
}
|
||||
}
|
||||
if (requestTimeoutMs && timeoutError) {
|
||||
requestTimeout = setTimeout(() => {
|
||||
requestTimedOut = true;
|
||||
controller.abort(timeoutError);
|
||||
}, requestTimeoutMs);
|
||||
requestTimeout.unref?.();
|
||||
}
|
||||
try {
|
||||
return await callFetch(input, {
|
||||
...init,
|
||||
signal: controller.signal,
|
||||
});
|
||||
} catch (err) {
|
||||
if (requestTimedOut && timeoutError) {
|
||||
throw timeoutError;
|
||||
}
|
||||
throw err;
|
||||
} finally {
|
||||
if (requestTimeout) {
|
||||
clearTimeout(requestTimeout);
|
||||
}
|
||||
shutdownSignal?.removeEventListener("abort", onShutdown);
|
||||
if (requestSignal && onRequestAbort) {
|
||||
requestSignal.removeEventListener("abort", onRequestAbort);
|
||||
}
|
||||
}
|
||||
};
|
||||
const method = extractTelegramApiMethod(input);
|
||||
const requestTimeoutMs = resolveTelegramRequestTimeoutMs(method);
|
||||
let requestTimeout: ReturnType<typeof setTimeout> | undefined;
|
||||
let onRequestAbort: (() => void) | undefined;
|
||||
const requestSignal = isTelegramAbortSignalLike(init?.signal) ? init.signal : undefined;
|
||||
if (shutdownSignal?.aborted) {
|
||||
abortWith(shutdownSignal);
|
||||
} else if (shutdownSignal) {
|
||||
shutdownSignal.addEventListener("abort", onShutdown, { once: true });
|
||||
}
|
||||
if (requestSignal) {
|
||||
if (requestSignal.aborted) {
|
||||
abortWith(requestSignal);
|
||||
} else {
|
||||
onRequestAbort = () => abortWith(requestSignal);
|
||||
requestSignal.addEventListener("abort", onRequestAbort);
|
||||
|
||||
try {
|
||||
return await runFetch();
|
||||
} catch (err) {
|
||||
if (
|
||||
requestTimeoutMs &&
|
||||
shouldRetryTimedOutTelegramControlRequest(method) &&
|
||||
!shutdownSignal?.aborted &&
|
||||
!requestSignal?.aborted &&
|
||||
telegramTransport.forceFallback?.("request-timeout")
|
||||
) {
|
||||
return await runFetch();
|
||||
}
|
||||
throw err;
|
||||
}
|
||||
if (requestTimeoutMs) {
|
||||
requestTimeout = setTimeout(() => {
|
||||
controller.abort(new Error(`Telegram ${method} timed out after ${requestTimeoutMs}ms`));
|
||||
}, requestTimeoutMs);
|
||||
requestTimeout.unref?.();
|
||||
}
|
||||
return callFetch(input, {
|
||||
...init,
|
||||
signal: controller.signal,
|
||||
}).finally(() => {
|
||||
if (requestTimeout) {
|
||||
clearTimeout(requestTimeout);
|
||||
}
|
||||
shutdownSignal?.removeEventListener("abort", onShutdown);
|
||||
if (requestSignal && onRequestAbort) {
|
||||
requestSignal.removeEventListener("abort", onRequestAbort);
|
||||
}
|
||||
});
|
||||
};
|
||||
}
|
||||
if (finalFetch) {
|
||||
|
||||
@@ -28,6 +28,28 @@ function createWrappedTelegramClientFetch(proxyFetch: typeof fetch) {
|
||||
return { clientFetch, shutdown };
|
||||
}
|
||||
|
||||
function createWrappedTelegramClientFetchWithTransport(params: {
|
||||
fetch: typeof fetch;
|
||||
forceFallback?: (reason: string) => boolean;
|
||||
}) {
|
||||
const shutdown = new AbortController();
|
||||
botCtorSpy.mockClear();
|
||||
createTelegramBot({
|
||||
token: "tok",
|
||||
fetchAbortSignal: shutdown.signal,
|
||||
telegramTransport: {
|
||||
fetch: params.fetch,
|
||||
sourceFetch: params.fetch,
|
||||
close: async () => undefined,
|
||||
...(params.forceFallback ? { forceFallback: params.forceFallback } : {}),
|
||||
},
|
||||
});
|
||||
const clientFetch = (botCtorSpy.mock.calls.at(-1)?.[1] as { client?: { fetch?: unknown } })
|
||||
?.client?.fetch as (input: RequestInfo | URL, init?: RequestInit) => Promise<unknown>;
|
||||
expect(clientFetch).toBeTypeOf("function");
|
||||
return { clientFetch, shutdown };
|
||||
}
|
||||
|
||||
describe("createTelegramBot fetch abort", () => {
|
||||
it("aborts wrapped client fetch when fetchAbortSignal aborts", async () => {
|
||||
const fetchSpy = vi.fn(
|
||||
@@ -89,6 +111,33 @@ describe("createTelegramBot fetch abort", () => {
|
||||
vi.useRealTimers();
|
||||
});
|
||||
|
||||
it("retries timed-out control calls once after forcing transport fallback", async () => {
|
||||
vi.useFakeTimers();
|
||||
const forceFallback = vi.fn(() => true);
|
||||
const fetchSpy = vi
|
||||
.fn()
|
||||
.mockImplementationOnce(
|
||||
(_input: RequestInfo | URL, init?: RequestInit) =>
|
||||
new Promise((_resolve, reject) => {
|
||||
const signal = init?.signal as AbortSignal;
|
||||
signal.addEventListener("abort", () => reject(signal.reason), { once: true });
|
||||
}),
|
||||
)
|
||||
.mockResolvedValueOnce({ ok: true } as Response);
|
||||
const { clientFetch } = createWrappedTelegramClientFetchWithTransport({
|
||||
fetch: fetchSpy as unknown as typeof fetch,
|
||||
forceFallback,
|
||||
});
|
||||
|
||||
const resultPromise = clientFetch("https://api.telegram.org/bot123456:ABC/deleteWebhook");
|
||||
await vi.advanceTimersByTimeAsync(15_000);
|
||||
|
||||
await expect(resultPromise).resolves.toEqual({ ok: true });
|
||||
expect(forceFallback).toHaveBeenCalledWith("request-timeout");
|
||||
expect(fetchSpy).toHaveBeenCalledTimes(2);
|
||||
vi.useRealTimers();
|
||||
});
|
||||
|
||||
it("preserves the original fetch error when tagging cannot attach metadata", async () => {
|
||||
const frozenError = Object.freeze(
|
||||
Object.assign(new TypeError("fetch failed"), {
|
||||
|
||||
@@ -756,6 +756,30 @@ describe("resolveTelegramFetch", () => {
|
||||
expectPinnedFallbackIpDispatcher(3);
|
||||
});
|
||||
|
||||
it("keeps the armed fallback sticky when all attempts fail", async () => {
|
||||
undiciFetch
|
||||
.mockRejectedValueOnce(buildFetchFallbackError("ETIMEDOUT"))
|
||||
.mockRejectedValueOnce(buildFetchFallbackError("EHOSTUNREACH"))
|
||||
.mockRejectedValueOnce(buildFetchFallbackError("ETIMEDOUT"))
|
||||
.mockResolvedValueOnce({ ok: true } as Response);
|
||||
|
||||
const resolved = resolveTelegramFetchOrThrow(undefined, {
|
||||
network: {
|
||||
autoSelectFamily: true,
|
||||
dnsResultOrder: "ipv4first",
|
||||
},
|
||||
});
|
||||
|
||||
await expect(resolved("https://api.telegram.org/botx/deleteWebhook")).rejects.toThrow(
|
||||
"fetch failed",
|
||||
);
|
||||
await resolved("https://api.telegram.org/botx/getMe");
|
||||
|
||||
expect(undiciFetch).toHaveBeenCalledTimes(4);
|
||||
expectPinnedFallbackIpDispatcher(3);
|
||||
expect(getDispatcherFromUndiciCall(4)).toBe(getDispatcherFromUndiciCall(3));
|
||||
});
|
||||
|
||||
it("preserves caller-provided dispatcher across fallback retry", async () => {
|
||||
const fetchError = buildFetchFallbackError("EHOSTUNREACH");
|
||||
undiciFetch.mockRejectedValueOnce(fetchError).mockResolvedValueOnce({ ok: true } as Response);
|
||||
|
||||
@@ -457,6 +457,11 @@ export type TelegramTransport = {
|
||||
fetch: typeof fetch;
|
||||
sourceFetch: typeof fetch;
|
||||
dispatcherAttempts?: TelegramDispatcherAttempt[];
|
||||
/**
|
||||
* Promote this transport to its next fallback dispatcher before the next
|
||||
* request. Returns false when no fallback path exists.
|
||||
*/
|
||||
forceFallback?: (reason: string) => boolean;
|
||||
/**
|
||||
* Release all dispatchers owned by this transport and the TCP sockets they
|
||||
* hold. Safe to call multiple times; subsequent calls resolve immediately.
|
||||
@@ -617,6 +622,19 @@ export function resolveTelegramTransport(
|
||||
});
|
||||
|
||||
let stickyAttemptIndex = 0;
|
||||
const promoteStickyAttempt = (nextIndex: number, err: unknown, reason?: string): boolean => {
|
||||
if (nextIndex <= stickyAttemptIndex || nextIndex >= transportAttempts.length) {
|
||||
return false;
|
||||
}
|
||||
const nextAttempt = transportAttempts[nextIndex];
|
||||
if (nextAttempt.logMessage) {
|
||||
const reasonText = reason ? `, reason=${reason}` : "";
|
||||
log.warn(`${nextAttempt.logMessage} (codes=${formatErrorCodes(err)}${reasonText})`);
|
||||
}
|
||||
stickyAttemptIndex = nextIndex;
|
||||
return true;
|
||||
};
|
||||
|
||||
const resolvedFetch = (async (input: RequestInfo | URL, init?: RequestInit) => {
|
||||
const callerProvidedDispatcher = Boolean(
|
||||
(init as RequestInitWithDispatcher | undefined)?.dispatcher,
|
||||
@@ -652,9 +670,7 @@ export function resolveTelegramTransport(
|
||||
|
||||
for (let nextIndex = startIndex + 1; nextIndex < transportAttempts.length; nextIndex += 1) {
|
||||
const nextAttempt = transportAttempts[nextIndex];
|
||||
if (nextAttempt.logMessage) {
|
||||
log.warn(`${nextAttempt.logMessage} (codes=${formatErrorCodes(err)})`);
|
||||
}
|
||||
promoteStickyAttempt(nextIndex, err);
|
||||
try {
|
||||
const response = await sourceFetch(
|
||||
input,
|
||||
@@ -669,7 +685,6 @@ export function resolveTelegramTransport(
|
||||
flowId: randomUUID(),
|
||||
meta: { subsystem: "telegram-fetch", fallbackAttempt: nextIndex },
|
||||
});
|
||||
stickyAttemptIndex = nextIndex;
|
||||
return response;
|
||||
} catch (caught) {
|
||||
err = caught;
|
||||
@@ -697,6 +712,8 @@ export function resolveTelegramTransport(
|
||||
fetch: resolvedFetch,
|
||||
sourceFetch,
|
||||
dispatcherAttempts: transportAttempts.map((attempt) => attempt.exportAttempt),
|
||||
forceFallback: (reason: string) =>
|
||||
promoteStickyAttempt(stickyAttemptIndex + 1, new Error("forced fallback"), reason),
|
||||
close,
|
||||
};
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user