mirror of
https://fastgit.cc/github.com/openclaw/openclaw
synced 2026-05-01 06:36:23 +08:00
refactor(feishu): share synthetic event dedupe claims
This commit is contained in:
@@ -66,6 +66,25 @@ export function releaseFeishuMessageProcessing(
|
||||
processingClaims.delete(resolveEventDedupeKey(namespace, messageId));
|
||||
}
|
||||
|
||||
export async function claimUnprocessedFeishuMessage(params: {
|
||||
messageId: string | undefined | null;
|
||||
namespace?: string;
|
||||
log?: (...args: unknown[]) => void;
|
||||
}): Promise<"claimed" | "duplicate" | "inflight" | "invalid"> {
|
||||
const { messageId, namespace = "global", log } = params;
|
||||
const normalizedMessageId = normalizeMessageId(messageId);
|
||||
if (!normalizedMessageId) {
|
||||
return "invalid";
|
||||
}
|
||||
if (await hasProcessedFeishuMessage(normalizedMessageId, namespace, log)) {
|
||||
return "duplicate";
|
||||
}
|
||||
if (!tryBeginFeishuMessageProcessing(normalizedMessageId, namespace)) {
|
||||
return "inflight";
|
||||
}
|
||||
return "claimed";
|
||||
}
|
||||
|
||||
export async function finalizeFeishuMessageProcessing(params: {
|
||||
messageId: string | undefined | null;
|
||||
namespace?: string;
|
||||
|
||||
@@ -15,6 +15,7 @@ import { createEventDispatcher } from "./client.js";
|
||||
import { handleFeishuCommentEvent } from "./comment-handler.js";
|
||||
import { isRecord, readString } from "./comment-shared.js";
|
||||
import {
|
||||
claimUnprocessedFeishuMessage,
|
||||
hasProcessedFeishuMessage,
|
||||
recordProcessedFeishuMessage,
|
||||
releaseFeishuMessageProcessing,
|
||||
@@ -604,19 +605,20 @@ function registerEventHandlers(
|
||||
}
|
||||
const eventId = event.event_id?.trim();
|
||||
const syntheticMessageId = eventId ? `drive-comment:${eventId}` : undefined;
|
||||
if (
|
||||
syntheticMessageId &&
|
||||
(await hasProcessedFeishuMessage(syntheticMessageId, accountId, log))
|
||||
) {
|
||||
log(`feishu[${accountId}]: dropping duplicate comment event ${syntheticMessageId}`);
|
||||
return;
|
||||
}
|
||||
if (
|
||||
syntheticMessageId &&
|
||||
!tryBeginFeishuMessageProcessing(syntheticMessageId, accountId)
|
||||
) {
|
||||
log(`feishu[${accountId}]: dropping in-flight comment event ${syntheticMessageId}`);
|
||||
return;
|
||||
if (syntheticMessageId) {
|
||||
const claim = await claimUnprocessedFeishuMessage({
|
||||
messageId: syntheticMessageId,
|
||||
namespace: accountId,
|
||||
log,
|
||||
});
|
||||
if (claim === "duplicate") {
|
||||
log(`feishu[${accountId}]: dropping duplicate comment event ${syntheticMessageId}`);
|
||||
return;
|
||||
}
|
||||
if (claim === "inflight") {
|
||||
log(`feishu[${accountId}]: dropping in-flight comment event ${syntheticMessageId}`);
|
||||
return;
|
||||
}
|
||||
}
|
||||
log(
|
||||
`feishu[${accountId}]: received drive comment notice ` +
|
||||
@@ -739,11 +741,16 @@ function registerEventHandlers(
|
||||
},
|
||||
};
|
||||
const syntheticMessageId = syntheticEvent.message.message_id;
|
||||
if (await hasProcessedFeishuMessage(syntheticMessageId, accountId, log)) {
|
||||
const claim = await claimUnprocessedFeishuMessage({
|
||||
messageId: syntheticMessageId,
|
||||
namespace: accountId,
|
||||
log,
|
||||
});
|
||||
if (claim === "duplicate") {
|
||||
log(`feishu[${accountId}]: dropping duplicate bot-menu event for ${syntheticMessageId}`);
|
||||
return;
|
||||
}
|
||||
if (!tryBeginFeishuMessageProcessing(syntheticMessageId, accountId)) {
|
||||
if (claim === "inflight") {
|
||||
log(`feishu[${accountId}]: dropping in-flight bot-menu event for ${syntheticMessageId}`);
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -876,6 +876,7 @@ describe("drive.notice.comment_add_v1 monitor handler", () => {
|
||||
createFeishuThreadBindingManagerMock.mockReset().mockImplementation(() => ({
|
||||
stop: vi.fn(),
|
||||
}));
|
||||
vi.spyOn(dedup, "claimUnprocessedFeishuMessage").mockResolvedValue("claimed");
|
||||
vi.spyOn(dedup, "tryBeginFeishuMessageProcessing").mockReturnValue(true);
|
||||
vi.spyOn(dedup, "recordProcessedFeishuMessage").mockResolvedValue(true);
|
||||
vi.spyOn(dedup, "hasProcessedFeishuMessage").mockResolvedValue(false);
|
||||
@@ -951,7 +952,7 @@ describe("drive.notice.comment_add_v1 monitor handler", () => {
|
||||
});
|
||||
|
||||
it("drops duplicate comment events before dispatch", async () => {
|
||||
vi.spyOn(dedup, "hasProcessedFeishuMessage").mockResolvedValue(true);
|
||||
vi.spyOn(dedup, "claimUnprocessedFeishuMessage").mockResolvedValue("duplicate");
|
||||
const onComment = await setupCommentMonitorHandler();
|
||||
|
||||
await onComment(makeDriveCommentEvent());
|
||||
|
||||
Reference in New Issue
Block a user