From 4c6fc974fc6c4dd03ff6809abe7376ccd92b595c Mon Sep 17 00:00:00 2001 From: Vincent Koc Date: Mon, 13 Apr 2026 15:14:29 +0100 Subject: [PATCH] refactor(feishu): share synthetic event dedupe claims --- extensions/feishu/src/dedup.ts | 19 ++++++++++ extensions/feishu/src/monitor.account.ts | 37 +++++++++++-------- extensions/feishu/src/monitor.comment.test.ts | 3 +- 3 files changed, 43 insertions(+), 16 deletions(-) diff --git a/extensions/feishu/src/dedup.ts b/extensions/feishu/src/dedup.ts index adcbd40526d..65c3b10e9a3 100644 --- a/extensions/feishu/src/dedup.ts +++ b/extensions/feishu/src/dedup.ts @@ -66,6 +66,25 @@ export function releaseFeishuMessageProcessing( processingClaims.delete(resolveEventDedupeKey(namespace, messageId)); } +export async function claimUnprocessedFeishuMessage(params: { + messageId: string | undefined | null; + namespace?: string; + log?: (...args: unknown[]) => void; +}): Promise<"claimed" | "duplicate" | "inflight" | "invalid"> { + const { messageId, namespace = "global", log } = params; + const normalizedMessageId = normalizeMessageId(messageId); + if (!normalizedMessageId) { + return "invalid"; + } + if (await hasProcessedFeishuMessage(normalizedMessageId, namespace, log)) { + return "duplicate"; + } + if (!tryBeginFeishuMessageProcessing(normalizedMessageId, namespace)) { + return "inflight"; + } + return "claimed"; +} + export async function finalizeFeishuMessageProcessing(params: { messageId: string | undefined | null; namespace?: string; diff --git a/extensions/feishu/src/monitor.account.ts b/extensions/feishu/src/monitor.account.ts index c3a012012f0..3f4b89fa0f0 100644 --- a/extensions/feishu/src/monitor.account.ts +++ b/extensions/feishu/src/monitor.account.ts @@ -15,6 +15,7 @@ import { createEventDispatcher } from "./client.js"; import { handleFeishuCommentEvent } from "./comment-handler.js"; import { isRecord, readString } from "./comment-shared.js"; import { + claimUnprocessedFeishuMessage, hasProcessedFeishuMessage, recordProcessedFeishuMessage, releaseFeishuMessageProcessing, @@ -604,19 +605,20 @@ function registerEventHandlers( } const eventId = event.event_id?.trim(); const syntheticMessageId = eventId ? `drive-comment:${eventId}` : undefined; - if ( - syntheticMessageId && - (await hasProcessedFeishuMessage(syntheticMessageId, accountId, log)) - ) { - log(`feishu[${accountId}]: dropping duplicate comment event ${syntheticMessageId}`); - return; - } - if ( - syntheticMessageId && - !tryBeginFeishuMessageProcessing(syntheticMessageId, accountId) - ) { - log(`feishu[${accountId}]: dropping in-flight comment event ${syntheticMessageId}`); - return; + if (syntheticMessageId) { + const claim = await claimUnprocessedFeishuMessage({ + messageId: syntheticMessageId, + namespace: accountId, + log, + }); + if (claim === "duplicate") { + log(`feishu[${accountId}]: dropping duplicate comment event ${syntheticMessageId}`); + return; + } + if (claim === "inflight") { + log(`feishu[${accountId}]: dropping in-flight comment event ${syntheticMessageId}`); + return; + } } log( `feishu[${accountId}]: received drive comment notice ` + @@ -739,11 +741,16 @@ function registerEventHandlers( }, }; const syntheticMessageId = syntheticEvent.message.message_id; - if (await hasProcessedFeishuMessage(syntheticMessageId, accountId, log)) { + const claim = await claimUnprocessedFeishuMessage({ + messageId: syntheticMessageId, + namespace: accountId, + log, + }); + if (claim === "duplicate") { log(`feishu[${accountId}]: dropping duplicate bot-menu event for ${syntheticMessageId}`); return; } - if (!tryBeginFeishuMessageProcessing(syntheticMessageId, accountId)) { + if (claim === "inflight") { log(`feishu[${accountId}]: dropping in-flight bot-menu event for ${syntheticMessageId}`); return; } diff --git a/extensions/feishu/src/monitor.comment.test.ts b/extensions/feishu/src/monitor.comment.test.ts index faa179096aa..7815f65866f 100644 --- a/extensions/feishu/src/monitor.comment.test.ts +++ b/extensions/feishu/src/monitor.comment.test.ts @@ -876,6 +876,7 @@ describe("drive.notice.comment_add_v1 monitor handler", () => { createFeishuThreadBindingManagerMock.mockReset().mockImplementation(() => ({ stop: vi.fn(), })); + vi.spyOn(dedup, "claimUnprocessedFeishuMessage").mockResolvedValue("claimed"); vi.spyOn(dedup, "tryBeginFeishuMessageProcessing").mockReturnValue(true); vi.spyOn(dedup, "recordProcessedFeishuMessage").mockResolvedValue(true); vi.spyOn(dedup, "hasProcessedFeishuMessage").mockResolvedValue(false); @@ -951,7 +952,7 @@ describe("drive.notice.comment_add_v1 monitor handler", () => { }); it("drops duplicate comment events before dispatch", async () => { - vi.spyOn(dedup, "hasProcessedFeishuMessage").mockResolvedValue(true); + vi.spyOn(dedup, "claimUnprocessedFeishuMessage").mockResolvedValue("duplicate"); const onComment = await setupCommentMonitorHandler(); await onComment(makeDriveCommentEvent());