From 74a667f119cf675c9301503d36ef09a2c293b541 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Tue, 28 Apr 2026 06:01:46 +0100 Subject: [PATCH] fix(telegram): retry startup control calls on fallback transport --- CHANGELOG.md | 1 + extensions/telegram/src/bot-core.ts | 118 ++++++++++++------ .../telegram/src/bot.fetch-abort.test.ts | 49 ++++++++ extensions/telegram/src/fetch.test.ts | 24 ++++ extensions/telegram/src/fetch.ts | 25 +++- 5 files changed, 173 insertions(+), 44 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3436c836961..2af83d20cc4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -48,6 +48,7 @@ Docs: https://docs.openclaw.ai ### Fixes +- Channels/Telegram: keep Bot API network fallbacks sticky after failed attempts and retry timed-out startup control calls once on the fallback route, so `deleteWebhook` IPv6 stalls no longer trigger slow multi-account retry storms. Fixes #73255. Thanks @ttomiczek and @sktbrd. - Export/session: keep inline export HTML scripts and vendor libraries injected after template formatting so generated session exports open with the app code, markdown renderer, and syntax highlighter present. Fixes #41862 and #49957; carries forward #41861 and #68947. Thanks @briannewman, @martenzi, and @armanddp. - Agents/ACPX: stage the patched Claude ACP adapter as an ACPX runtime dependency and route known Codex/Claude ACP commands through local wrappers, so Gateway runtime no longer depends on live `npx` adapter resolution. Fixes #73202. Thanks @joerod26. - Memory/compaction: let pre-compaction memory flush use an exact `agents.defaults.compaction.memoryFlush.model` override such as `ollama/qwen3:8b` without inheriting the active session fallback chain, so local housekeeping can avoid paid conversation models. Fixes #53772. Thanks @limen96. diff --git a/extensions/telegram/src/bot-core.ts b/extensions/telegram/src/bot-core.ts index f841221f6ec..09dd6d28099 100644 --- a/extensions/telegram/src/bot-core.ts +++ b/extensions/telegram/src/bot-core.ts @@ -127,6 +127,12 @@ function extractTelegramApiMethod(input: TelegramFetchInput): string | null { } } +const TELEGRAM_TIMEOUT_FALLBACK_METHODS = new Set(["deletewebhook", "getme", "setwebhook"]); + +function shouldRetryTimedOutTelegramControlRequest(method: string | null): boolean { + return method !== null && TELEGRAM_TIMEOUT_FALLBACK_METHODS.has(method); +} + export function createTelegramBotCore( opts: TelegramBotOptions & { telegramDeps: TelegramBotDeps }, ): TelegramBotInstance { @@ -185,54 +191,86 @@ export function createTelegramBotCore( // Use manual event forwarding instead of AbortSignal.any() to avoid the cross-realm // AbortSignal issue in Node.js (grammY's signal may come from a different module context, // causing "signals[0] must be an instance of AbortSignal" errors). - finalFetch = (input: TelegramFetchInput, init?: TelegramFetchInit) => { - const controller = new AbortController(); - const abortWith = (signal: Pick) => - controller.abort(signal.reason); + finalFetch = async (input: TelegramFetchInput, init?: TelegramFetchInit) => { + const method = extractTelegramApiMethod(input); + const requestTimeoutMs = resolveTelegramRequestTimeoutMs(method); const shutdownSignal = isTelegramAbortSignalLike(opts.fetchAbortSignal) ? opts.fetchAbortSignal : undefined; - const onShutdown = () => { - if (shutdownSignal) { + const requestSignal = isTelegramAbortSignalLike(init?.signal) ? init.signal : undefined; + + const runFetch = async () => { + const controller = new AbortController(); + const abortWith = (signal: Pick) => + controller.abort(signal.reason); + const onShutdown = () => { + if (shutdownSignal) { + abortWith(shutdownSignal); + } + }; + let requestTimeout: ReturnType | undefined; + let onRequestAbort: (() => void) | undefined; + let requestTimedOut = false; + const timeoutError = + requestTimeoutMs !== undefined + ? new Error(`Telegram ${method} timed out after ${requestTimeoutMs}ms`) + : undefined; + + if (shutdownSignal?.aborted) { abortWith(shutdownSignal); + } else if (shutdownSignal) { + shutdownSignal.addEventListener("abort", onShutdown, { once: true }); + } + if (requestSignal) { + if (requestSignal.aborted) { + abortWith(requestSignal); + } else { + onRequestAbort = () => abortWith(requestSignal); + requestSignal.addEventListener("abort", onRequestAbort); + } + } + if (requestTimeoutMs && timeoutError) { + requestTimeout = setTimeout(() => { + requestTimedOut = true; + controller.abort(timeoutError); + }, requestTimeoutMs); + requestTimeout.unref?.(); + } + try { + return await callFetch(input, { + ...init, + signal: controller.signal, + }); + } catch (err) { + if (requestTimedOut && timeoutError) { + throw timeoutError; + } + throw err; + } finally { + if (requestTimeout) { + clearTimeout(requestTimeout); + } + shutdownSignal?.removeEventListener("abort", onShutdown); + if (requestSignal && onRequestAbort) { + requestSignal.removeEventListener("abort", onRequestAbort); + } } }; - const method = extractTelegramApiMethod(input); - const requestTimeoutMs = resolveTelegramRequestTimeoutMs(method); - let requestTimeout: ReturnType | undefined; - let onRequestAbort: (() => void) | undefined; - const requestSignal = isTelegramAbortSignalLike(init?.signal) ? init.signal : undefined; - if (shutdownSignal?.aborted) { - abortWith(shutdownSignal); - } else if (shutdownSignal) { - shutdownSignal.addEventListener("abort", onShutdown, { once: true }); - } - if (requestSignal) { - if (requestSignal.aborted) { - abortWith(requestSignal); - } else { - onRequestAbort = () => abortWith(requestSignal); - requestSignal.addEventListener("abort", onRequestAbort); + + try { + return await runFetch(); + } catch (err) { + if ( + requestTimeoutMs && + shouldRetryTimedOutTelegramControlRequest(method) && + !shutdownSignal?.aborted && + !requestSignal?.aborted && + telegramTransport.forceFallback?.("request-timeout") + ) { + return await runFetch(); } + throw err; } - if (requestTimeoutMs) { - requestTimeout = setTimeout(() => { - controller.abort(new Error(`Telegram ${method} timed out after ${requestTimeoutMs}ms`)); - }, requestTimeoutMs); - requestTimeout.unref?.(); - } - return callFetch(input, { - ...init, - signal: controller.signal, - }).finally(() => { - if (requestTimeout) { - clearTimeout(requestTimeout); - } - shutdownSignal?.removeEventListener("abort", onShutdown); - if (requestSignal && onRequestAbort) { - requestSignal.removeEventListener("abort", onRequestAbort); - } - }); }; } if (finalFetch) { diff --git a/extensions/telegram/src/bot.fetch-abort.test.ts b/extensions/telegram/src/bot.fetch-abort.test.ts index 49d2b3486c3..ca491dd2fbc 100644 --- a/extensions/telegram/src/bot.fetch-abort.test.ts +++ b/extensions/telegram/src/bot.fetch-abort.test.ts @@ -28,6 +28,28 @@ function createWrappedTelegramClientFetch(proxyFetch: typeof fetch) { return { clientFetch, shutdown }; } +function createWrappedTelegramClientFetchWithTransport(params: { + fetch: typeof fetch; + forceFallback?: (reason: string) => boolean; +}) { + const shutdown = new AbortController(); + botCtorSpy.mockClear(); + createTelegramBot({ + token: "tok", + fetchAbortSignal: shutdown.signal, + telegramTransport: { + fetch: params.fetch, + sourceFetch: params.fetch, + close: async () => undefined, + ...(params.forceFallback ? { forceFallback: params.forceFallback } : {}), + }, + }); + const clientFetch = (botCtorSpy.mock.calls.at(-1)?.[1] as { client?: { fetch?: unknown } }) + ?.client?.fetch as (input: RequestInfo | URL, init?: RequestInit) => Promise; + expect(clientFetch).toBeTypeOf("function"); + return { clientFetch, shutdown }; +} + describe("createTelegramBot fetch abort", () => { it("aborts wrapped client fetch when fetchAbortSignal aborts", async () => { const fetchSpy = vi.fn( @@ -89,6 +111,33 @@ describe("createTelegramBot fetch abort", () => { vi.useRealTimers(); }); + it("retries timed-out control calls once after forcing transport fallback", async () => { + vi.useFakeTimers(); + const forceFallback = vi.fn(() => true); + const fetchSpy = vi + .fn() + .mockImplementationOnce( + (_input: RequestInfo | URL, init?: RequestInit) => + new Promise((_resolve, reject) => { + const signal = init?.signal as AbortSignal; + signal.addEventListener("abort", () => reject(signal.reason), { once: true }); + }), + ) + .mockResolvedValueOnce({ ok: true } as Response); + const { clientFetch } = createWrappedTelegramClientFetchWithTransport({ + fetch: fetchSpy as unknown as typeof fetch, + forceFallback, + }); + + const resultPromise = clientFetch("https://api.telegram.org/bot123456:ABC/deleteWebhook"); + await vi.advanceTimersByTimeAsync(15_000); + + await expect(resultPromise).resolves.toEqual({ ok: true }); + expect(forceFallback).toHaveBeenCalledWith("request-timeout"); + expect(fetchSpy).toHaveBeenCalledTimes(2); + vi.useRealTimers(); + }); + it("preserves the original fetch error when tagging cannot attach metadata", async () => { const frozenError = Object.freeze( Object.assign(new TypeError("fetch failed"), { diff --git a/extensions/telegram/src/fetch.test.ts b/extensions/telegram/src/fetch.test.ts index 4c04ba7a22b..0b57e954524 100644 --- a/extensions/telegram/src/fetch.test.ts +++ b/extensions/telegram/src/fetch.test.ts @@ -756,6 +756,30 @@ describe("resolveTelegramFetch", () => { expectPinnedFallbackIpDispatcher(3); }); + it("keeps the armed fallback sticky when all attempts fail", async () => { + undiciFetch + .mockRejectedValueOnce(buildFetchFallbackError("ETIMEDOUT")) + .mockRejectedValueOnce(buildFetchFallbackError("EHOSTUNREACH")) + .mockRejectedValueOnce(buildFetchFallbackError("ETIMEDOUT")) + .mockResolvedValueOnce({ ok: true } as Response); + + const resolved = resolveTelegramFetchOrThrow(undefined, { + network: { + autoSelectFamily: true, + dnsResultOrder: "ipv4first", + }, + }); + + await expect(resolved("https://api.telegram.org/botx/deleteWebhook")).rejects.toThrow( + "fetch failed", + ); + await resolved("https://api.telegram.org/botx/getMe"); + + expect(undiciFetch).toHaveBeenCalledTimes(4); + expectPinnedFallbackIpDispatcher(3); + expect(getDispatcherFromUndiciCall(4)).toBe(getDispatcherFromUndiciCall(3)); + }); + it("preserves caller-provided dispatcher across fallback retry", async () => { const fetchError = buildFetchFallbackError("EHOSTUNREACH"); undiciFetch.mockRejectedValueOnce(fetchError).mockResolvedValueOnce({ ok: true } as Response); diff --git a/extensions/telegram/src/fetch.ts b/extensions/telegram/src/fetch.ts index d5501620e72..8a8c226fc1e 100644 --- a/extensions/telegram/src/fetch.ts +++ b/extensions/telegram/src/fetch.ts @@ -457,6 +457,11 @@ export type TelegramTransport = { fetch: typeof fetch; sourceFetch: typeof fetch; dispatcherAttempts?: TelegramDispatcherAttempt[]; + /** + * Promote this transport to its next fallback dispatcher before the next + * request. Returns false when no fallback path exists. + */ + forceFallback?: (reason: string) => boolean; /** * Release all dispatchers owned by this transport and the TCP sockets they * hold. Safe to call multiple times; subsequent calls resolve immediately. @@ -617,6 +622,19 @@ export function resolveTelegramTransport( }); let stickyAttemptIndex = 0; + const promoteStickyAttempt = (nextIndex: number, err: unknown, reason?: string): boolean => { + if (nextIndex <= stickyAttemptIndex || nextIndex >= transportAttempts.length) { + return false; + } + const nextAttempt = transportAttempts[nextIndex]; + if (nextAttempt.logMessage) { + const reasonText = reason ? `, reason=${reason}` : ""; + log.warn(`${nextAttempt.logMessage} (codes=${formatErrorCodes(err)}${reasonText})`); + } + stickyAttemptIndex = nextIndex; + return true; + }; + const resolvedFetch = (async (input: RequestInfo | URL, init?: RequestInit) => { const callerProvidedDispatcher = Boolean( (init as RequestInitWithDispatcher | undefined)?.dispatcher, @@ -652,9 +670,7 @@ export function resolveTelegramTransport( for (let nextIndex = startIndex + 1; nextIndex < transportAttempts.length; nextIndex += 1) { const nextAttempt = transportAttempts[nextIndex]; - if (nextAttempt.logMessage) { - log.warn(`${nextAttempt.logMessage} (codes=${formatErrorCodes(err)})`); - } + promoteStickyAttempt(nextIndex, err); try { const response = await sourceFetch( input, @@ -669,7 +685,6 @@ export function resolveTelegramTransport( flowId: randomUUID(), meta: { subsystem: "telegram-fetch", fallbackAttempt: nextIndex }, }); - stickyAttemptIndex = nextIndex; return response; } catch (caught) { err = caught; @@ -697,6 +712,8 @@ export function resolveTelegramTransport( fetch: resolvedFetch, sourceFetch, dispatcherAttempts: transportAttempts.map((attempt) => attempt.exportAttempt), + forceFallback: (reason: string) => + promoteStickyAttempt(stickyAttemptIndex + 1, new Error("forced fallback"), reason), close, }; }