From f368d3b49fd4c8f77242a692d68356223c8ddd65 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Mon, 27 Apr 2026 22:35:15 +0100 Subject: [PATCH] refactor(channels): share route identity keys --- src/auto-reply/reply/inbound-dedupe.test.ts | 16 +++++++++- src/auto-reply/reply/inbound-dedupe.ts | 13 +++++--- src/auto-reply/reply/queue.dedupe.test.ts | 30 ++++++++++++++++++ src/auto-reply/reply/queue/enqueue.ts | 27 ++++++++-------- src/channels/route/ref.test.ts | 19 +++++++++++ src/channels/route/ref.ts | 21 +++++++++++++ src/infra/approval-native-target-key.test.ts | 14 +++++++++ src/infra/approval-native-target-key.ts | 9 +++--- src/infra/exec-approval-forwarder.test.ts | 33 +++++++++++++++++++- src/infra/exec-approval-forwarder.ts | 10 ++++-- src/infra/outbound/conversation-id.test.ts | 5 +++ src/infra/outbound/conversation-id.ts | 4 +-- src/infra/system-events.test.ts | 17 ++++++++++ src/infra/system-events.ts | 7 ++--- 14 files changed, 190 insertions(+), 35 deletions(-) diff --git a/src/auto-reply/reply/inbound-dedupe.test.ts b/src/auto-reply/reply/inbound-dedupe.test.ts index ba6d029a0aa..1034f428a90 100644 --- a/src/auto-reply/reply/inbound-dedupe.test.ts +++ b/src/auto-reply/reply/inbound-dedupe.test.ts @@ -1,7 +1,7 @@ import { afterEach, describe, expect, it } from "vitest"; import { importFreshModule } from "../../../test/helpers/import-fresh.js"; import type { MsgContext } from "../templating.js"; -import { resetInboundDedupe } from "./inbound-dedupe.js"; +import { buildInboundDedupeKey, resetInboundDedupe } from "./inbound-dedupe.js"; const sharedInboundContext: MsgContext = { Provider: "discord", @@ -41,6 +41,20 @@ describe("inbound dedupe", () => { } }); + it("deduplicates inbound messages with equivalent numeric and string thread ids", () => { + expect( + buildInboundDedupeKey({ + ...sharedInboundContext, + MessageThreadId: 77, + }), + ).toBe( + buildInboundDedupeKey({ + ...sharedInboundContext, + MessageThreadId: "77", + }), + ); + }); + it("shares claim/release state across distinct module instances", async () => { const inboundA = await importFreshModule( import.meta.url, diff --git a/src/auto-reply/reply/inbound-dedupe.ts b/src/auto-reply/reply/inbound-dedupe.ts index 90fd48f25cf..4036c15106f 100644 --- a/src/auto-reply/reply/inbound-dedupe.ts +++ b/src/auto-reply/reply/inbound-dedupe.ts @@ -1,3 +1,4 @@ +import { channelRouteIdentityKey } from "../../channels/route/ref.js"; import { logVerbose, shouldLogVerbose } from "../../globals.js"; import { resolveGlobalDedupeCache, type DedupeCache } from "../../infra/dedupe.js"; import { parseAgentSessionKey } from "../../sessions/session-key-utils.js"; @@ -68,11 +69,13 @@ export function buildInboundDedupeKey(ctx: MsgContext): string | null { } const sessionScope = resolveInboundDedupeSessionScope(ctx); const accountId = normalizeOptionalString(ctx.AccountId) ?? ""; - const threadId = - ctx.MessageThreadId !== undefined && ctx.MessageThreadId !== null - ? String(ctx.MessageThreadId) - : ""; - return [provider, accountId, sessionScope, peerId, threadId, messageId].filter(Boolean).join("|"); + const routeKey = channelRouteIdentityKey({ + channel: provider, + to: peerId, + accountId, + threadId: ctx.MessageThreadId, + }); + return JSON.stringify([sessionScope, routeKey, messageId]); } export function shouldSkipDuplicateInbound( diff --git a/src/auto-reply/reply/queue.dedupe.test.ts b/src/auto-reply/reply/queue.dedupe.test.ts index 22fc37e6085..bc3f5f5c330 100644 --- a/src/auto-reply/reply/queue.dedupe.test.ts +++ b/src/auto-reply/reply/queue.dedupe.test.ts @@ -90,6 +90,36 @@ describe("followup queue deduplication", () => { expect(calls[0]?.prompt).toContain("[Queued messages while agent was busy]"); }); + it("deduplicates message ids when numeric and string thread ids share a route", async () => { + const key = `test-dedup-thread-normalized-${Date.now()}`; + + const first = enqueueFollowupRun( + key, + createRun({ + prompt: "first", + messageId: "same-id", + originatingChannel: "telegram", + originatingTo: "-100123", + originatingThreadId: 42.9, + }), + collectSettings, + ); + expect(first).toBe(true); + + const second = enqueueFollowupRun( + key, + createRun({ + prompt: "second", + messageId: "same-id", + originatingChannel: "telegram", + originatingTo: "-100123", + originatingThreadId: "42", + }), + collectSettings, + ); + expect(second).toBe(false); + }); + it("deduplicates same message_id after queue drain restarts", async () => { const key = `test-dedup-after-drain-${Date.now()}`; const { calls, done, runFollowup } = createFollowupCollector(); diff --git a/src/auto-reply/reply/queue/enqueue.ts b/src/auto-reply/reply/queue/enqueue.ts index 55ccc0b64f2..3974913f2ba 100644 --- a/src/auto-reply/reply/queue/enqueue.ts +++ b/src/auto-reply/reply/queue/enqueue.ts @@ -1,3 +1,4 @@ +import { channelRouteIdentityKey } from "../../../channels/route/ref.js"; import { resolveGlobalDedupeCache } from "../../../infra/dedupe.js"; import { normalizeOptionalString } from "../../../shared/string-coerce.js"; import { applyQueueDropPolicy, shouldSkipQueueItem } from "../../../utils/queue-helpers.js"; @@ -16,6 +17,15 @@ const RECENT_QUEUE_MESSAGE_IDS = resolveGlobalDedupeCache(RECENT_QUEUE_MESSAGE_I maxSize: 10_000, }); +function followupRouteIdentityKey(run: FollowupRun): string { + return channelRouteIdentityKey({ + channel: run.originatingChannel, + to: run.originatingTo, + accountId: run.originatingAccountId, + threadId: run.originatingThreadId, + }); +} + function buildRecentMessageIdKey(run: FollowupRun, queueKey: string): string | undefined { const messageId = normalizeOptionalString(run.messageId); if (!messageId) { @@ -23,15 +33,7 @@ function buildRecentMessageIdKey(run: FollowupRun, queueKey: string): string | u } // Use JSON tuple serialization to avoid delimiter-collision edge cases when // channel/to/account values contain "|" characters. - return JSON.stringify([ - "queue", - queueKey, - run.originatingChannel ?? "", - run.originatingTo ?? "", - run.originatingAccountId ?? "", - run.originatingThreadId == null ? "" : String(run.originatingThreadId), - messageId, - ]); + return JSON.stringify(["queue", queueKey, followupRouteIdentityKey(run), messageId]); } function isRunAlreadyQueued( @@ -39,11 +41,8 @@ function isRunAlreadyQueued( items: FollowupRun[], allowPromptFallback = false, ): boolean { - const hasSameRouting = (item: FollowupRun) => - item.originatingChannel === run.originatingChannel && - item.originatingTo === run.originatingTo && - item.originatingAccountId === run.originatingAccountId && - item.originatingThreadId === run.originatingThreadId; + const routeKey = followupRouteIdentityKey(run); + const hasSameRouting = (item: FollowupRun) => followupRouteIdentityKey(item) === routeKey; const messageId = normalizeOptionalString(run.messageId); if (messageId) { diff --git a/src/channels/route/ref.test.ts b/src/channels/route/ref.test.ts index db3d30e1388..78b368e9339 100644 --- a/src/channels/route/ref.test.ts +++ b/src/channels/route/ref.test.ts @@ -1,5 +1,6 @@ import { describe, expect, it } from "vitest"; import { + channelRouteIdentityKey, channelRouteKey, channelRoutesMatchExact, channelRoutesShareConversation, @@ -41,6 +42,24 @@ describe("channel route refs", () => { expect(channelRouteKey(route)).toBe("telegram|-100123||42"); }); + it("builds a stable identity key from route-like input", () => { + expect( + channelRouteIdentityKey({ + channel: " Telegram ", + to: " -100123 ", + accountId: " Work ", + threadId: 42.9, + }), + ).toBe( + channelRouteIdentityKey({ + channel: "telegram", + to: "-100123", + accountId: "work", + threadId: "42", + }), + ); + }); + it("matches exact routes when numeric and string thread ids are equivalent", () => { expect( channelRoutesMatchExact({ diff --git a/src/channels/route/ref.ts b/src/channels/route/ref.ts index d61d5382a90..e0b7b9b67f5 100644 --- a/src/channels/route/ref.ts +++ b/src/channels/route/ref.ts @@ -36,6 +36,11 @@ export type ChannelRouteRefInput = { threadSource?: ChannelRouteThreadSource; }; +export type ChannelRouteTargetInput = Pick< + ChannelRouteRefInput, + "channel" | "accountId" | "to" | "rawTo" | "chatType" | "threadId" +>; + export function normalizeRouteThreadId(value: unknown): string | number | undefined { return normalizeOptionalThreadValue(value); } @@ -92,6 +97,22 @@ export function channelRouteThreadId(route?: ChannelRouteRef): string | number | return route?.thread?.id; } +export function normalizeChannelRouteTarget( + input?: ChannelRouteTargetInput | null, +): ChannelRouteRef | undefined { + return input ? normalizeChannelRouteRef(input) : undefined; +} + +export function channelRouteIdentityKey(input?: ChannelRouteTargetInput | null): string { + const route = normalizeChannelRouteTarget(input); + return JSON.stringify([ + route?.channel ?? "", + route?.target?.to ?? "", + route?.accountId ?? "", + stringifyRouteThreadId(route?.thread?.id) ?? "", + ]); +} + function threadIdsEqual(left?: string | number, right?: string | number): boolean { const normalizedLeft = stringifyRouteThreadId(left); const normalizedRight = stringifyRouteThreadId(right); diff --git a/src/infra/approval-native-target-key.test.ts b/src/infra/approval-native-target-key.test.ts index 9680c1ec39a..fc6fca40bd2 100644 --- a/src/infra/approval-native-target-key.test.ts +++ b/src/infra/approval-native-target-key.test.ts @@ -28,4 +28,18 @@ describe("buildChannelApprovalNativeTargetKey", () => { }), ); }); + + it("normalizes numeric thread ids through the shared route key", () => { + expect( + buildChannelApprovalNativeTargetKey({ + to: "telegram:-100123", + threadId: 42.9, + }), + ).toBe( + buildChannelApprovalNativeTargetKey({ + to: " telegram:-100123 ", + threadId: "42", + }), + ); + }); }); diff --git a/src/infra/approval-native-target-key.ts b/src/infra/approval-native-target-key.ts index f52950cf713..f2068b2f82e 100644 --- a/src/infra/approval-native-target-key.ts +++ b/src/infra/approval-native-target-key.ts @@ -1,8 +1,9 @@ import type { ChannelApprovalNativeTarget } from "../channels/plugins/approval-native.types.js"; -import { normalizeOptionalString } from "../shared/string-coerce.js"; +import { channelRouteIdentityKey } from "../channels/route/ref.js"; export function buildChannelApprovalNativeTargetKey(target: ChannelApprovalNativeTarget): string { - return `${normalizeOptionalString(target.to) ?? ""}\u0000${ - target.threadId == null ? "" : (normalizeOptionalString(String(target.threadId)) ?? "") - }`; + return channelRouteIdentityKey({ + to: target.to, + threadId: target.threadId, + }); } diff --git a/src/infra/exec-approval-forwarder.test.ts b/src/infra/exec-approval-forwarder.test.ts index 9def9322942..55979176969 100644 --- a/src/infra/exec-approval-forwarder.test.ts +++ b/src/infra/exec-approval-forwarder.test.ts @@ -211,7 +211,12 @@ const TARGETS_CFG = makeTargetsCfg([{ channel: "slack", to: "U123" }]); function createForwarder(params: { cfg: OpenClawConfig; deliver?: ReturnType; - resolveSessionTarget?: () => { channel: string; to: string } | null; + resolveSessionTarget?: () => { + channel: string; + to: string; + accountId?: string; + threadId?: string | number; + } | null; }) { const deliver = params.deliver ?? vi.fn().mockResolvedValue([]); const deps: NonNullable[0]> = { @@ -362,6 +367,32 @@ describe("exec approval forwarder", () => { expect(deliver).toHaveBeenCalledTimes(2); }); + it("deduplicates session and explicit approval targets through normalized route identity", async () => { + vi.useFakeTimers(); + const cfg = { + approvals: { + exec: { + enabled: true, + mode: "both", + targets: [{ channel: "telegram", to: "-100999", accountId: "bot", threadId: "77" }], + }, + }, + } as OpenClawConfig; + + const { deliver, forwarder } = createForwarder({ + cfg, + resolveSessionTarget: () => ({ + channel: "telegram", + to: "-100999", + accountId: "bot", + threadId: 77, + }), + }); + + await expect(forwarder.handleRequested(baseRequest)).resolves.toBe(true); + expect(deliver).toHaveBeenCalledTimes(1); + }); + it("calls outbound beforeDeliverPayload before exec approval delivery", async () => { const beforeDeliverPayload = vi.fn(); setActivePluginRegistry( diff --git a/src/infra/exec-approval-forwarder.ts b/src/infra/exec-approval-forwarder.ts index 97f4068614f..cc4ea05ec60 100644 --- a/src/infra/exec-approval-forwarder.ts +++ b/src/infra/exec-approval-forwarder.ts @@ -3,6 +3,7 @@ import { getLoadedChannelPlugin, resolveChannelApprovalAdapter, } from "../channels/plugins/index.js"; +import { channelRouteIdentityKey } from "../channels/route/ref.js"; import { getRuntimeConfig } from "../config/config.js"; import type { ExecApprovalForwardingConfig, @@ -169,9 +170,12 @@ function shouldForwardRoute(params: { function buildTargetKey(target: ExecApprovalForwardTarget): string { const channel = normalizeMessageChannel(target.channel) ?? target.channel; - const accountId = target.accountId ?? ""; - const threadId = target.threadId ?? ""; - return [channel, target.to, accountId, threadId].join(":"); + return channelRouteIdentityKey({ + channel, + to: target.to, + accountId: target.accountId, + threadId: target.threadId, + }); } function buildSyntheticApprovalRequest(routeRequest: ApprovalRouteRequest): ExecApprovalRequest { diff --git a/src/infra/outbound/conversation-id.test.ts b/src/infra/outbound/conversation-id.test.ts index d359c2b21e5..4c86c4c75a9 100644 --- a/src/infra/outbound/conversation-id.test.ts +++ b/src/infra/outbound/conversation-id.test.ts @@ -13,6 +13,11 @@ describe("resolveConversationIdFromTargets", () => { params: { threadId: 123456789, targets: ["channel:987654321"] }, expected: "123456789", }, + { + name: "truncates decimal numeric thread ids", + params: { threadId: 42.9, targets: ["channel:987654321"] }, + expected: "42", + }, { name: "falls back when the thread id is blank", params: { threadId: " ", targets: ["channel:987654321"] }, diff --git a/src/infra/outbound/conversation-id.ts b/src/infra/outbound/conversation-id.ts index df0d2080a76..b9c9068f8c8 100644 --- a/src/infra/outbound/conversation-id.ts +++ b/src/infra/outbound/conversation-id.ts @@ -1,3 +1,4 @@ +import { stringifyRouteThreadId } from "../../channels/route/ref.js"; import { normalizeLowercaseStringOrEmpty, normalizeOptionalString, @@ -16,8 +17,7 @@ export function resolveConversationIdFromTargets(params: { threadId?: string | number; targets: Array; }): string | undefined { - const threadId = - params.threadId != null ? normalizeOptionalString(String(params.threadId)) : undefined; + const threadId = stringifyRouteThreadId(params.threadId); if (threadId) { return threadId; } diff --git a/src/infra/system-events.test.ts b/src/infra/system-events.test.ts index 398cd9087a1..1bffc16d473 100644 --- a/src/infra/system-events.test.ts +++ b/src/infra/system-events.test.ts @@ -120,6 +120,23 @@ describe("system events (session routing)", () => { expect(peekSystemEvents(key)).toEqual(["second"]); }); + it("matches consumed delivery contexts through normalized route identity", () => { + const key = "agent:main:test-consume-route-context"; + enqueueSystemEvent("first", { + sessionKey: key, + deliveryContext: { + channel: "telegram", + to: "-100123", + threadId: 42.9, + }, + }); + const inspected = peekSystemEventEntries(key); + inspected[0].deliveryContext!.threadId = "42"; + + expect(consumeSystemEventEntries(key, inspected).map((entry) => entry.text)).toEqual(["first"]); + expect(peekSystemEvents(key)).toEqual([]); + }); + it("resolves the newest effective delivery context from queued events", () => { const key = "agent:main:test-delivery-context"; enqueueSystemEvent("Restarted", { diff --git a/src/infra/system-events.ts b/src/infra/system-events.ts index 2496481c4f1..01d1c2d75c3 100644 --- a/src/infra/system-events.ts +++ b/src/infra/system-events.ts @@ -2,6 +2,7 @@ // prefixed to the next prompt. We intentionally avoid persistence to keep // events ephemeral. Events are session-scoped and require an explicit key. +import { channelRouteIdentityKey } from "../channels/route/ref.js"; import { resolveGlobalMap } from "../shared/global-singleton.js"; import { normalizeOptionalLowercaseString, @@ -135,11 +136,7 @@ function areDeliveryContextsEqual(left?: DeliveryContext, right?: DeliveryContex if (!left || !right) { return false; } - return ( - (left.channel ?? undefined) === (right.channel ?? undefined) && - (left.to ?? undefined) === (right.to ?? undefined) && - (left.threadId ?? undefined) === (right.threadId ?? undefined) - ); + return channelRouteIdentityKey(left) === channelRouteIdentityKey(right); } function areSystemEventsEqual(left: SystemEvent, right: SystemEvent): boolean {