diff --git a/src/agents/tools/sessions-announce-target.ts b/src/agents/tools/sessions-announce-target.ts index 401cdc40c61..095bb39c5c9 100644 --- a/src/agents/tools/sessions-announce-target.ts +++ b/src/agents/tools/sessions-announce-target.ts @@ -2,6 +2,7 @@ import { getChannelPlugin, normalizeChannelId } from "../../channels/plugins/ind import type { CallGatewayOptions } from "../../gateway/call.js"; import { parseThreadSessionSuffix } from "../../sessions/session-key-utils.js"; import { normalizeOptionalStringifiedId } from "../../shared/string-coerce.js"; +import { deliveryContextFromSession } from "../../utils/delivery-context.shared.js"; import type { SessionListRow } from "./sessions-helpers.js"; import type { AnnounceTarget } from "./sessions-send-helpers.js"; import { resolveAnnounceTargetFromKey } from "./sessions-send-helpers.js"; @@ -45,30 +46,10 @@ export async function resolveAnnounceTarget(params: { sessions.find((entry) => entry?.key === params.sessionKey) ?? sessions.find((entry) => entry?.key === params.displayKey); - const deliveryContext = - match?.deliveryContext && typeof match.deliveryContext === "object" - ? (match.deliveryContext as Record) - : undefined; - const origin = - match?.origin && typeof match.origin === "object" - ? (match.origin as Record) - : undefined; - const channel = - (typeof deliveryContext?.channel === "string" ? deliveryContext.channel : undefined) ?? - (typeof match?.lastChannel === "string" ? match.lastChannel : undefined) ?? - (typeof origin?.provider === "string" ? origin.provider : undefined); - const to = - (typeof deliveryContext?.to === "string" ? deliveryContext.to : undefined) ?? - (typeof match?.lastTo === "string" ? match.lastTo : undefined); - const accountId = - (typeof deliveryContext?.accountId === "string" ? deliveryContext.accountId : undefined) ?? - (typeof match?.lastAccountId === "string" ? match.lastAccountId : undefined) ?? - (typeof origin?.accountId === "string" ? origin.accountId : undefined); - const threadId = normalizeOptionalStringifiedId( - deliveryContext?.threadId ?? match?.lastThreadId ?? origin?.threadId ?? fallbackThreadId, - ); - if (channel && to) { - return { channel, to, accountId, threadId }; + const context = deliveryContextFromSession(match); + const threadId = normalizeOptionalStringifiedId(context?.threadId ?? fallbackThreadId); + if (context?.channel && context.to) { + return { channel: context.channel, to: context.to, accountId: context.accountId, threadId }; } } catch { // ignore diff --git a/src/auto-reply/reply/queue.collect.test.ts b/src/auto-reply/reply/queue.collect.test.ts index af38525b7cf..6be6bdd5b91 100644 --- a/src/auto-reply/reply/queue.collect.test.ts +++ b/src/auto-reply/reply/queue.collect.test.ts @@ -622,6 +622,50 @@ describe("followup queue collect routing", () => { expect(calls[0]?.originatingThreadId).toBe("1706000000.000001"); }); + it("collects messages when numeric and string thread ids share the route key", async () => { + const key = `test-collect-thread-normalized-${Date.now()}`; + const calls: FollowupRun[] = []; + const done = createDeferred(); + const runFollowup = async (run: FollowupRun) => { + calls.push(run); + done.resolve(); + }; + const settings: QueueSettings = { + mode: "collect", + debounceMs: 0, + cap: 50, + dropPolicy: "summarize", + }; + + enqueueFollowupRun( + key, + createRun({ + prompt: "one", + originatingChannel: "telegram", + originatingTo: "-100123", + originatingThreadId: 42.9, + }), + settings, + ); + enqueueFollowupRun( + key, + createRun({ + prompt: "two", + originatingChannel: "telegram", + originatingTo: "-100123", + originatingThreadId: "42", + }), + settings, + ); + + scheduleFollowupDrain(key, runFollowup); + await done.promise; + expect(calls).toHaveLength(1); + expect(calls[0]?.prompt).toContain("[Queued messages while agent was busy]"); + expect(calls[0]?.prompt).toContain("one"); + expect(calls[0]?.prompt).toContain("two"); + }); + it("does not collect Slack messages when thread ids differ", async () => { const key = `test-collect-slack-thread-diff-${Date.now()}`; const calls: FollowupRun[] = []; diff --git a/src/auto-reply/reply/queue/drain.ts b/src/auto-reply/reply/queue/drain.ts index fcaf5ed2404..31cb3f13acc 100644 --- a/src/auto-reply/reply/queue/drain.ts +++ b/src/auto-reply/reply/queue/drain.ts @@ -1,3 +1,4 @@ +import { channelRouteKey, normalizeChannelRouteRef } from "../../../channels/route/ref.js"; import { defaultRuntime } from "../../../runtime.js"; import { resolveGlobalMap } from "../../../shared/global-singleton.js"; import { @@ -134,11 +135,8 @@ function resolveCrossChannelKey(item: FollowupRun): { cross?: true; key?: string if (!isRoutableChannel(channel) || !to) { return { cross: true }; } - // Support both number (Telegram topic IDs) and string (Slack thread_ts) thread IDs. - const threadKey = threadId != null && threadId !== "" ? String(threadId) : ""; - return { - key: [channel, to, accountId || "", threadKey].join("|"), - }; + const key = channelRouteKey(normalizeChannelRouteRef({ channel, to, accountId, threadId })); + return key ? { key } : { cross: true }; } export function scheduleFollowupDrain(