diff --git a/src/hooks/team-session-streamer/hook.test.ts b/src/hooks/team-session-streamer/hook.test.ts index 9c77627fa..41f0bd485 100644 --- a/src/hooks/team-session-streamer/hook.test.ts +++ b/src/hooks/team-session-streamer/hook.test.ts @@ -651,4 +651,151 @@ describe("createTeamSessionStreamer", () => { expect(writeTeamSessionFifoMock).toHaveBeenCalledTimes(1) expect(writeTeamSessionFifoMock).toHaveBeenCalledWith("/tmp/omo-team/11111111-1111-4111-8111-111111111111/member-a.fifo", "fresh") }) + + test("does not restore cleared dedupe state when session.deleted arrives during an in-flight write", async () => { + // given + const listActiveTeams = mock(async () => [{ + teamRunId: "11111111-1111-4111-8111-111111111111", + teamName: "team-alpha", + status: "active" as const, + memberCount: 1, + scope: "project" as const, + }]) + const loadRuntimeState = mock(async () => createRuntimeState()) + let releaseSecondWrite: (() => void) | undefined + const secondWriteGate = new Promise((resolve) => { + releaseSecondWrite = resolve + }) + let callCount = 0 + writeTeamSessionFifoMock.mockImplementation(async () => { + callCount += 1 + if (callCount === 2) await secondWriteGate + }) + const config = TeamModeConfigSchema.parse({ enabled: true, tmux_visualization: true }) + const streamer = createTeamSessionStreamer(config, { listActiveTeams, loadRuntimeState }) + + // when + await streamer.event({ + event: { + type: "message.part.updated", + properties: { + part: { id: "part-a", sessionID: "member-session", messageID: "msg-a", type: "text", text: "A\n" }, + }, + }, + }) + const gatedEventPromise = streamer.event({ + event: { + type: "message.part.updated", + properties: { + part: { id: "part-a", sessionID: "member-session", messageID: "msg-a", type: "text", text: "A\nB\n" }, + }, + }, + }) + await new Promise((resolvePromise) => setTimeout(resolvePromise, 10)) + await streamer.event({ + event: { + type: "session.deleted", + properties: { info: { id: "member-session" } as never }, + }, + }) + releaseSecondWrite?.() + await gatedEventPromise + await streamer.event({ + event: { + type: "message.part.updated", + properties: { + part: { id: "part-a", sessionID: "member-session", messageID: "msg-a", type: "text", text: "A\nB\nC\n" }, + }, + }, + }) + streamer.dispose() + + // then + expect(writeTeamSessionFifoMock).toHaveBeenCalledTimes(3) + expect(writeTeamSessionFifoMock).toHaveBeenNthCalledWith(1, "/tmp/omo-team/11111111-1111-4111-8111-111111111111/member-a.fifo", "A\n") + expect(writeTeamSessionFifoMock).toHaveBeenNthCalledWith(2, "/tmp/omo-team/11111111-1111-4111-8111-111111111111/member-a.fifo", "B\n") + expect(writeTeamSessionFifoMock).toHaveBeenNthCalledWith(3, "/tmp/omo-team/11111111-1111-4111-8111-111111111111/member-a.fifo", "A\nB\nC\n") + }) + + test("skips state mutation when message.part.removed arrives for the drained partID during write", async () => { + // given + const listActiveTeams = mock(async () => [{ + teamRunId: "11111111-1111-4111-8111-111111111111", + teamName: "team-alpha", + status: "active" as const, + memberCount: 1, + scope: "project" as const, + }]) + const loadRuntimeState = mock(async () => createRuntimeState()) + let releaseSecondWrite: (() => void) | undefined + const secondWriteGate = new Promise((resolve) => { + releaseSecondWrite = resolve + }) + let callCount = 0 + writeTeamSessionFifoMock.mockImplementation(async () => { + callCount += 1 + if (callCount === 2) await secondWriteGate + }) + const config = TeamModeConfigSchema.parse({ enabled: true, tmux_visualization: true }) + const streamer = createTeamSessionStreamer(config, { listActiveTeams, loadRuntimeState }) + + // when + await streamer.event({ + event: { + type: "message.part.updated", + properties: { + part: { + id: "part-a", + sessionID: "member-session", + messageID: "msg-a", + type: "text", + text: "first-wave\n", + }, + }, + }, + }) + const slowEvent = streamer.event({ + event: { + type: "message.part.updated", + properties: { + part: { + id: "part-a", + sessionID: "member-session", + messageID: "msg-a", + type: "text", + text: "first-wave\nin-flight\n", + }, + }, + }, + }) + await new Promise((resolvePromise) => setTimeout(resolvePromise, 10)) + await streamer.event({ + event: { + type: "message.part.removed", + properties: { sessionID: "member-session", messageID: "msg-a", partID: "part-a" }, + }, + }) + releaseSecondWrite?.() + await slowEvent + await streamer.event({ + event: { + type: "message.part.updated", + properties: { + part: { + id: "part-a", + sessionID: "member-session", + messageID: "msg-a", + type: "text", + text: "first-wave\nin-flight\n", + }, + }, + }, + }) + streamer.dispose() + + // then + expect(writeTeamSessionFifoMock).toHaveBeenCalledTimes(3) + expect(writeTeamSessionFifoMock).toHaveBeenNthCalledWith(1, "/tmp/omo-team/11111111-1111-4111-8111-111111111111/member-a.fifo", "first-wave\n") + expect(writeTeamSessionFifoMock).toHaveBeenNthCalledWith(3, "/tmp/omo-team/11111111-1111-4111-8111-111111111111/member-a.fifo", "first-wave\nin-flight\n") + }) }) diff --git a/src/hooks/team-session-streamer/hook.ts b/src/hooks/team-session-streamer/hook.ts index ae5d882d3..bc6121f1d 100644 --- a/src/hooks/team-session-streamer/hook.ts +++ b/src/hooks/team-session-streamer/hook.ts @@ -4,11 +4,12 @@ import type { TeamModeConfig } from "../../config/schema/team-mode" import { getTeamMemberFifoPath } from "../../features/team-mode/team-layout-tmux/fifo-path" import * as teamStateStore from "../../features/team-mode/team-state-store" import { log } from "../../shared/logger" -import { clearSessionPartState, previewPendingEvent } from "./apply-pending-event" +import { clearSessionPartState, type PendingEventPreview, previewPendingEvent } from "./apply-pending-event" import { type MessagePartDeltaEvent, normalizeDeltaEventForBuffer, normalizeUpdateEventForBuffer } from "./event-normalizer" import { writeTeamSessionFifo } from "./fifo-writer" import { createPendingDeltaBuffer, type PendingStreamEvent } from "./pending-delta-buffer" import { createPendingRetryScheduler } from "./pending-retry-scheduler" +import { createStreamGeneration } from "./stream-generation" type TeamStateStore = Pick @@ -23,6 +24,8 @@ type TeamSessionStreamTarget = { type HookInput = { event: StreamEvent } type HookImpl = { event: (input: HookInput) => Promise; dispose: () => void } +type AttemptWriteResult = { written: boolean; preview?: PendingEventPreview } + const DROPPABLE_FIFO_ERROR_CODES = new Set(["ENXIO", "ENOENT", "EPIPE"]) const PENDING_RESOLVE_RETRY_MS = 250 @@ -34,6 +37,7 @@ export function createTeamSessionStreamer(config: TeamModeConfig, stateStore: Te const streamTargetsBySession = new Map() const partTextByKey = new Map() const pendingDeltaBuffer = createPendingDeltaBuffer() + const generation = createStreamGeneration() async function resolveStreamTarget(sessionID: string): Promise { const cachedTarget = streamTargetsBySession.get(sessionID) @@ -90,26 +94,41 @@ export function createTeamSessionStreamer(config: TeamModeConfig, stateStore: Te } } - async function writePendingEvent( + async function attemptWrite( target: TeamSessionStreamTarget, sessionID: string, pending: PendingStreamEvent, - ): Promise { + ): Promise { const preview = previewPendingEvent(pending, sessionID, partTextByKey) - if (!preview) return true + if (!preview) return { written: true } const written = await writeSegment(target, sessionID, preview.appendedText) - if (written) partTextByKey.set(preview.partKey, preview.nextState) - else pendingDeltaBuffer.enqueue(sessionID, pending) - return written + return { written, preview: written ? preview : undefined } } async function drainAndWrite(target: TeamSessionStreamTarget, sessionID: string): Promise { + const drainSessionGen = generation.captureSession(sessionID) const drained = pendingDeltaBuffer.drain(sessionID) + const drainPartGens = new Map() + for (const pending of drained) { + if (!drainPartGens.has(pending.partID)) { + drainPartGens.set(pending.partID, generation.capturePart(sessionID, pending.partID)) + } + } for (let i = 0; i < drained.length; i++) { - const written = await writePendingEvent(target, sessionID, drained[i]) - if (!written) { + const pending = drained[i] + const startPartGen = drainPartGens.get(pending.partID) ?? 0 + const result = await attemptWrite(target, sessionID, pending) + if (!generation.isSessionCurrent(sessionID, drainSessionGen)) return false + if (!generation.isPartCurrent(sessionID, pending.partID, startPartGen)) continue + if (result.preview) partTextByKey.set(result.preview.partKey, result.preview.nextState) + if (!result.written) { + pendingDeltaBuffer.enqueue(sessionID, pending) for (let j = i + 1; j < drained.length; j++) { - pendingDeltaBuffer.enqueue(sessionID, drained[j]) + const remaining = drained[j] + const remainingStart = drainPartGens.get(remaining.partID) ?? 0 + if (generation.isPartCurrent(sessionID, remaining.partID, remainingStart)) { + pendingDeltaBuffer.enqueue(sessionID, remaining) + } } return false } @@ -127,7 +146,11 @@ export function createTeamSessionStreamer(config: TeamModeConfig, stateStore: Te }) async function handlePendingEvent(sessionID: string, pending: PendingStreamEvent): Promise { + const sessionGenAtStart = generation.captureSession(sessionID) + const partGenAtStart = generation.capturePart(sessionID, pending.partID) const target = await resolveStreamTarget(sessionID) + if (!generation.isSessionCurrent(sessionID, sessionGenAtStart)) return + if (!generation.isPartCurrent(sessionID, pending.partID, partGenAtStart)) return if (!target) { pendingDeltaBuffer.enqueue(sessionID, pending) retryScheduler.schedule() @@ -144,6 +167,7 @@ export function createTeamSessionStreamer(config: TeamModeConfig, stateStore: Te if (event.type === "session.deleted") { const sessionID = event.properties.info.id + generation.bumpSession(sessionID) streamTargetsBySession.delete(sessionID) pendingDeltaBuffer.clearSession(sessionID) clearSessionPartState(sessionID, partTextByKey) @@ -153,6 +177,7 @@ export function createTeamSessionStreamer(config: TeamModeConfig, stateStore: Te if (event.type === "message.part.removed") { const { sessionID, partID } = event.properties + generation.bumpPart(sessionID, partID) pendingDeltaBuffer.removePart(sessionID, partID) partTextByKey.delete(`${sessionID}:${partID}`) if (pendingDeltaBuffer.getPendingSessions().length === 0) retryScheduler.stop() @@ -162,6 +187,7 @@ export function createTeamSessionStreamer(config: TeamModeConfig, stateStore: Te if (event.type === "session.error") { const sessionID = event.properties.sessionID if (sessionID) { + generation.bumpSession(sessionID) streamTargetsBySession.delete(sessionID) pendingDeltaBuffer.clearSession(sessionID) clearSessionPartState(sessionID, partTextByKey) diff --git a/src/hooks/team-session-streamer/pending-retry-scheduler.test.ts b/src/hooks/team-session-streamer/pending-retry-scheduler.test.ts index e5d96b32e..6e72d6564 100644 --- a/src/hooks/team-session-streamer/pending-retry-scheduler.test.ts +++ b/src/hooks/team-session-streamer/pending-retry-scheduler.test.ts @@ -70,4 +70,35 @@ describe("createPendingRetryScheduler", () => { expect(pending.size).toBe(0) scheduler.stop() }) + + test("aborts the in-flight run and does not reschedule when stop is called during drain", async () => { + // given + const pending = new Set(["session-a", "session-b", "session-c"]) + let release: (() => void) | undefined + const gate = new Promise((resolve) => { + release = resolve + }) + const drained: string[] = [] + const drainSession = mock(async (sessionID: string) => { + drained.push(sessionID) + if (sessionID === "session-a") await gate + pending.delete(sessionID) + }) + const scheduler = createPendingRetryScheduler({ + intervalMs: 10, + getPendingSessions: () => Array.from(pending), + drainSession, + }) + + // when + scheduler.schedule() + await new Promise((resolvePromise) => setTimeout(resolvePromise, 30)) + scheduler.stop() + release?.() + await new Promise((resolvePromise) => setTimeout(resolvePromise, 80)) + + // then + expect(drained).toEqual(["session-a"]) + expect(pending.size).toBe(2) + }) }) diff --git a/src/hooks/team-session-streamer/pending-retry-scheduler.ts b/src/hooks/team-session-streamer/pending-retry-scheduler.ts index 137b60e09..2533949ea 100644 --- a/src/hooks/team-session-streamer/pending-retry-scheduler.ts +++ b/src/hooks/team-session-streamer/pending-retry-scheduler.ts @@ -13,12 +13,15 @@ export type PendingRetrySchedulerOptions = { export function createPendingRetryScheduler(options: PendingRetrySchedulerOptions): PendingRetryScheduler { let timer: ReturnType | undefined + let stopped = false async function run(): Promise { timer = undefined + if (stopped) return try { const sessions = options.getPendingSessions() for (const sessionID of sessions) { + if (stopped) return try { await options.drainSession(sessionID) } catch (error) { @@ -30,18 +33,19 @@ export function createPendingRetryScheduler(options: PendingRetrySchedulerOption } } } finally { - if (options.getPendingSessions().length > 0) schedule() + if (!stopped && options.getPendingSessions().length > 0) schedule() } } function schedule(): void { - if (timer) return + if (stopped || timer) return timer = setTimeout(() => { void run() }, options.intervalMs) } function stop(): void { + stopped = true if (!timer) return clearTimeout(timer) timer = undefined diff --git a/src/hooks/team-session-streamer/stream-generation.ts b/src/hooks/team-session-streamer/stream-generation.ts new file mode 100644 index 000000000..664892703 --- /dev/null +++ b/src/hooks/team-session-streamer/stream-generation.ts @@ -0,0 +1,34 @@ +export type StreamGeneration = { + bumpSession: (sessionID: string) => void + bumpPart: (sessionID: string, partID: string) => void + captureSession: (sessionID: string) => number + capturePart: (sessionID: string, partID: string) => number + isSessionCurrent: (sessionID: string, generation: number) => boolean + isPartCurrent: (sessionID: string, partID: string, generation: number) => boolean +} + +export function createStreamGeneration(): StreamGeneration { + const sessionGen = new Map() + const partGen = new Map() + + function sessionValue(sessionID: string): number { + return sessionGen.get(sessionID) ?? 0 + } + + function partValue(sessionID: string, partID: string): number { + return partGen.get(`${sessionID}:${partID}`) ?? 0 + } + + return { + bumpSession: (sessionID) => { + sessionGen.set(sessionID, sessionValue(sessionID) + 1) + }, + bumpPart: (sessionID, partID) => { + partGen.set(`${sessionID}:${partID}`, partValue(sessionID, partID) + 1) + }, + captureSession: sessionValue, + capturePart: partValue, + isSessionCurrent: (sessionID, generation) => sessionValue(sessionID) === generation, + isPartCurrent: (sessionID, partID, generation) => partValue(sessionID, partID) === generation, + } +}