diff --git a/CHANGELOG.md b/CHANGELOG.md index 2cc5777237a..ed0bdea4c3b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -42,6 +42,7 @@ Docs: https://docs.openclaw.ai - Gateway/startup: keep value-option foreground starts on the gateway fast path and skip proxy bootstrap unless proxy env is configured, reducing normal gateway startup RSS and avoiding full CLI graph loading. Thanks @vincentkoc. - Heartbeat/models: show heartbeat model bleed guidance on context-overflow resets when the last runtime model matches configured `heartbeat.model`, so smaller local heartbeat models point users to `isolatedSession` or `lightContext` instead of only compaction-buffer tuning. Fixes #67314. Thanks @Knightmare6890. - Subagents/models: persist `sessions_spawn.model` and configured subagent models as child-session model overrides before the first turn, so spawned subagents actually run on the requested provider/model instead of reverting to the target agent default. Fixes #73180. Thanks @danielzinhu99. +- Channels/Telegram: keep webhook-mode local listeners alive and retry Telegram `setWebhook` registration after recoverable startup network failures, so transient Bot API timeouts no longer leave reverse proxies pointing at a closed listener. Fixes #71834. Thanks @jinon86. - Backup: skip installed plugin `extensions/*/node_modules` dependency trees while keeping plugin manifests and source files in archives, so local backups avoid rebuildable npm payload bloat. Fixes #64144. Thanks @BrilliantWang. - Cron/models: fail isolated cron runs closed when an explicit `payload.model` is not allowed or cannot be resolved, so scheduled jobs do not silently fall back to an unrelated agent default or paid route before configured provider proxies such as LiteLLM can run. Fixes #73146. Thanks @oneandrewwang. - Memory/QMD: back off repeated chat-turn QMD open failures while still letting memory status and CLI probes recheck immediately, so a broken sidecar dependency cannot trigger active-memory or cron retry storms. Fixes #73188 and #73176. Thanks @leonlushgit and @w3i-William. diff --git a/extensions/telegram/src/webhook.test.ts b/extensions/telegram/src/webhook.test.ts index d89cc01f91a..600955fa3a8 100644 --- a/extensions/telegram/src/webhook.test.ts +++ b/extensions/telegram/src/webhook.test.ts @@ -442,6 +442,60 @@ describe("startTelegramWebhook", () => { ); }); + it("keeps local listener alive and retries when setWebhook has a recoverable startup failure", async () => { + const runtimeLog = vi.fn(); + const runtimeError = vi.fn(); + setWebhookSpy.mockRejectedValueOnce(new TypeError("fetch failed")).mockResolvedValueOnce(true); + + await withStartedWebhook( + { + secret: TELEGRAM_SECRET, + path: TELEGRAM_WEBHOOK_PATH, + runtime: { log: runtimeLog, error: runtimeError, exit: vi.fn() }, + webhookRegistrationRetryPolicy: { + initialMs: 0, + maxMs: 0, + factor: 1, + jitter: 0, + }, + }, + async ({ port }) => { + const health = await fetch(`http://127.0.0.1:${port}/healthz`); + expect(health.status).toBe(200); + expect(stopSpy).not.toHaveBeenCalled(); + expect(runtimeError).toHaveBeenCalledWith( + expect.stringContaining("telegram setWebhook failed: fetch failed"), + ); + await vi.waitFor(() => expect(setWebhookSpy).toHaveBeenCalledTimes(2)); + expect(runtimeLog).toHaveBeenCalledWith("telegram setWebhook retry 1 scheduled in 0ms"); + expect(runtimeLog).toHaveBeenCalledWith( + expect.stringContaining("webhook advertised to telegram on http://"), + ); + }, + ); + }); + + it("fails startup when setWebhook has a non-recoverable rejection", async () => { + const runtimeError = vi.fn(); + const error = Object.assign(new Error("unauthorized"), { error_code: 401 }); + setWebhookSpy.mockRejectedValueOnce(error); + + await expect( + startTelegramWebhook({ + token: TELEGRAM_TOKEN, + port: 0, + secret: TELEGRAM_SECRET, + path: TELEGRAM_WEBHOOK_PATH, + runtime: { log: vi.fn(), error: runtimeError, exit: vi.fn() }, + }), + ).rejects.toThrow("unauthorized"); + + expect(stopSpy).toHaveBeenCalledTimes(1); + expect(runtimeError).toHaveBeenCalledWith( + expect.stringContaining("telegram setWebhook failed: unauthorized"), + ); + }); + it("registers webhook with certificate when webhookCertPath is provided", async () => { setWebhookSpy.mockClear(); await withStartedWebhook( diff --git a/extensions/telegram/src/webhook.ts b/extensions/telegram/src/webhook.ts index abef596c0d7..00502d38531 100644 --- a/extensions/telegram/src/webhook.ts +++ b/extensions/telegram/src/webhook.ts @@ -4,8 +4,13 @@ import net from "node:net"; import * as grammy from "grammy"; import type { OpenClawConfig } from "openclaw/plugin-sdk/config-types"; import { isDiagnosticsEnabled } from "openclaw/plugin-sdk/diagnostic-runtime"; -import type { RuntimeEnv } from "openclaw/plugin-sdk/runtime-env"; -import { defaultRuntime } from "openclaw/plugin-sdk/runtime-env"; +import type { BackoffPolicy, RuntimeEnv } from "openclaw/plugin-sdk/runtime-env"; +import { + computeBackoff, + defaultRuntime, + formatDurationPrecise, + sleepWithAbort, +} from "openclaw/plugin-sdk/runtime-env"; import { safeEqualSecret } from "openclaw/plugin-sdk/security-runtime"; import { formatErrorMessage } from "openclaw/plugin-sdk/ssrf-runtime"; import { @@ -25,9 +30,20 @@ import { readJsonBodyWithLimit } from "openclaw/plugin-sdk/webhook-request-guard import { resolveTelegramAllowedUpdates } from "./allowed-updates.js"; import { withTelegramApiErrorLogging } from "./api-logging.js"; import { createTelegramBot } from "./bot.js"; +import { + isRecoverableTelegramNetworkError, + isTelegramRateLimitError, + isTelegramServerError, +} from "./network-errors.js"; const TELEGRAM_WEBHOOK_MAX_BODY_BYTES = 1024 * 1024; const TELEGRAM_WEBHOOK_BODY_TIMEOUT_MS = 30_000; +const TELEGRAM_WEBHOOK_REGISTRATION_RETRY_POLICY: BackoffPolicy = { + initialMs: 5_000, + maxMs: 60_000, + factor: 2, + jitter: 0.2, +}; const InputFileCtor: typeof grammy.InputFile = typeof grammy.InputFile === "function" ? grammy.InputFile @@ -245,6 +261,7 @@ export async function startTelegramWebhook(opts: { healthPath?: string; publicUrl?: string; webhookCertPath?: string; + webhookRegistrationRetryPolicy?: BackoffPolicy; }) { const path = opts.path ?? "/telegram-webhook"; const healthPath = opts.healthPath ?? "/healthz"; @@ -258,6 +275,8 @@ export async function startTelegramWebhook(opts: { ); } const runtime = opts.runtime ?? defaultRuntime; + const webhookRegistrationRetryPolicy = + opts.webhookRegistrationRetryPolicy ?? TELEGRAM_WEBHOOK_REGISTRATION_RETRY_POLICY; const diagnosticsEnabled = isDiagnosticsEnabled(opts.config); const bot = createTelegramBot({ token: opts.token, @@ -398,30 +417,8 @@ export async function startTelegramWebhook(opts: { port, }); - try { - await withTelegramApiErrorLogging({ - operation: "setWebhook", - runtime, - fn: () => - bot.api.setWebhook(publicUrl, { - secret_token: secret, - allowed_updates: resolveTelegramAllowedUpdates(), - certificate: opts.webhookCertPath ? new InputFileCtor(opts.webhookCertPath) : undefined, - }), - }); - } catch (err) { - server.close(); - void bot.stop(); - if (diagnosticsEnabled) { - stopDiagnosticHeartbeat(); - } - throw err; - } - - runtime.log?.(`webhook local listener on http://${host}:${boundPort}${path}`); - runtime.log?.(`webhook advertised to telegram on ${publicUrl}`); - let shutDown = false; + let webhookAdvertised = false; const shutdown = () => { if (shutDown) { return; @@ -440,9 +437,90 @@ export async function startTelegramWebhook(opts: { stopDiagnosticHeartbeat(); } }; - if (opts.abortSignal) { + if (opts.abortSignal?.aborted) { + shutdown(); + } else if (opts.abortSignal) { opts.abortSignal.addEventListener("abort", shutdown, { once: true }); } + const advertiseWebhook = async (): Promise => { + if (shutDown || opts.abortSignal?.aborted) { + return; + } + await withTelegramApiErrorLogging({ + operation: "setWebhook", + runtime, + fn: () => + bot.api.setWebhook(publicUrl, { + secret_token: secret, + allowed_updates: resolveTelegramAllowedUpdates(), + certificate: opts.webhookCertPath ? new InputFileCtor(opts.webhookCertPath) : undefined, + }), + }); + if (shutDown) { + return; + } + webhookAdvertised = true; + runtime.log?.(`webhook advertised to telegram on ${publicUrl}`); + }; + const shouldRetryWebhookRegistration = (err: unknown): boolean => + isRecoverableTelegramNetworkError(err, { context: "webhook" }) || + isTelegramServerError(err) || + isTelegramRateLimitError(err); + const retryWebhookRegistration = async (firstAttempt: number): Promise => { + let attempt = firstAttempt; + while (true) { + if (shutDown || opts.abortSignal?.aborted || webhookAdvertised) { + return; + } + const delayMs = computeBackoff(webhookRegistrationRetryPolicy, attempt); + runtime.log?.( + `telegram setWebhook retry ${attempt} scheduled in ${formatDurationPrecise(delayMs)}`, + ); + try { + await sleepWithAbort(delayMs, opts.abortSignal); + } catch { + return; + } + if (shutDown || opts.abortSignal?.aborted || webhookAdvertised) { + return; + } + try { + await advertiseWebhook(); + return; + } catch (err) { + if (!shouldRetryWebhookRegistration(err)) { + runtime.error?.( + `telegram setWebhook retry stopped after non-recoverable error: ${formatErrorMessage(err)}`, + ); + return; + } + } + attempt += 1; + } + }; + const closeAfterStartupFailure = () => { + shutDown = true; + server.close(); + void bot.stop(); + if (diagnosticsEnabled) { + stopDiagnosticHeartbeat(); + } + }; + + runtime.log?.(`webhook local listener on http://${host}:${boundPort}${path}`); + + if (!shutDown) { + try { + await advertiseWebhook(); + } catch (err) { + if (!shouldRetryWebhookRegistration(err)) { + closeAfterStartupFailure(); + throw err; + } + void retryWebhookRegistration(1); + } + } + return { server, bot, stop: shutdown }; }