From fa20c2b47cb191e4fd044172e2c2c564d52c26fe Mon Sep 17 00:00:00 2001 From: YeonGyu-Kim Date: Sat, 18 Apr 2026 07:09:47 +0900 Subject: [PATCH] fix(team-mode): resolve stream target before mutating part-text dedupe state team-session-streamer was mutating partTextByKey inside extractSegment BEFORE checking if the stream target could be resolved. If a member emitted output in the launch-before-mapping window (session created, events arriving, but runtimeState.members[.].sessionId not yet written), the first event recorded its cumulative text into the dedupe map but was silently dropped at the write step because no target existed. The next cumulative update would then diff against that recorded prefix and emit only the delta, permanently losing the earlier text. Reproduction (per Oracle round 8): event 1: message.part.updated text='early', mapping not ready -> partTextByKey['key']='early', write skipped event 2: message.part.updated text='early late', mapping ready -> previousText='early', appended=' late' -> FIFO received only ' late', 'early' lost forever Fix: swap the order. resolveStreamTarget first; if no target, return without touching partTextByKey. Once mapping appears, the next cumulative update diffs against '' and emits the full current text. Apply the same ordering to message.part.delta path (orphan/non-text short-circuits still run before target resolution since they carry no state mutation). Regression test in hook.test.ts: 'does not lose text when runtime state mapping is not yet available' -- listActiveTeams returns [] for event 1, then [team] for event 2 -- assert FIFO write sees full 'early late' cumulative, not just ' late' delta Oracle review blocker: pre-mapping startup gap losing earliest text. --- src/hooks/team-session-streamer/hook.test.ts | 53 ++++++++++++++++++++ src/hooks/team-session-streamer/hook.ts | 21 ++++---- 2 files changed, 64 insertions(+), 10 deletions(-) diff --git a/src/hooks/team-session-streamer/hook.test.ts b/src/hooks/team-session-streamer/hook.test.ts index cfb4f6a9f..d1994e154 100644 --- a/src/hooks/team-session-streamer/hook.test.ts +++ b/src/hooks/team-session-streamer/hook.test.ts @@ -258,4 +258,57 @@ describe("createTeamSessionStreamer", () => { expect(writeTeamSessionFifoMock).toHaveBeenCalledTimes(1) expect(writeTeamSessionFifoMock).toHaveBeenCalledWith("/tmp/omo-team/11111111-1111-4111-8111-111111111111/member-a.fifo", "orphan full") }) + + test("does not lose text when runtime state mapping is not yet available (pre-mapping race)", async () => { + // given + let mappingReady = false + const listActiveTeams = mock(async () => mappingReady ? [{ + teamRunId: "11111111-1111-4111-8111-111111111111", + teamName: "team-alpha", + status: "active" as const, + memberCount: 1, + scope: "project" as const, + }] : []) + const loadRuntimeState = mock(async () => createRuntimeState()) + const config = TeamModeConfigSchema.parse({ enabled: true, tmux_visualization: true }) + const streamer = createTeamSessionStreamer(config, { listActiveTeams, loadRuntimeState }) + + // when + await streamer.event({ + event: { + type: "message.part.updated", + properties: { + part: { + id: "part-race", + sessionID: "member-session", + messageID: "message-z", + type: "text", + text: "early", + }, + }, + }, + }) + expect(writeTeamSessionFifoMock).not.toHaveBeenCalled() + + mappingReady = true + + await streamer.event({ + event: { + type: "message.part.updated", + properties: { + part: { + id: "part-race", + sessionID: "member-session", + messageID: "message-z", + type: "text", + text: "early late", + }, + }, + }, + }) + + // then + expect(writeTeamSessionFifoMock).toHaveBeenCalledTimes(1) + expect(writeTeamSessionFifoMock).toHaveBeenCalledWith("/tmp/omo-team/11111111-1111-4111-8111-111111111111/member-a.fifo", "early late") + }) }) diff --git a/src/hooks/team-session-streamer/hook.ts b/src/hooks/team-session-streamer/hook.ts index cef046c66..436bb0e51 100644 --- a/src/hooks/team-session-streamer/hook.ts +++ b/src/hooks/team-session-streamer/hook.ts @@ -40,7 +40,7 @@ function extractCumulativeText(part: Part): string | undefined { return undefined } -function extractUpdateSegment( +function consumeUpdateSegment( event: EventMessagePartUpdated, partTextByKey: Map, ): { sessionID: string; text: string } | undefined { @@ -59,7 +59,7 @@ function extractUpdateSegment( return { sessionID: part.sessionID, text: appendedText } } -function extractDeltaSegment( +function consumeDeltaSegment( event: MessagePartDeltaEvent, partTextByKey: Map, ): { sessionID: string; text: string } | undefined { @@ -117,10 +117,7 @@ export function createTeamSessionStreamer(config: TeamModeConfig, stateStore: Te return undefined } - async function writeSegment(sessionID: string, text: string): Promise { - const target = await resolveStreamTarget(sessionID) - if (!target) return - + async function writeSegment(target: TeamSessionStreamTarget, sessionID: string, text: string): Promise { try { await writeTeamSessionFifo(target.fifoPath, text) } catch (error) { @@ -168,17 +165,21 @@ export function createTeamSessionStreamer(config: TeamModeConfig, stateStore: Te } if (event.type === "message.part.delta") { - const segment = extractDeltaSegment(event, partTextByKey) + const target = await resolveStreamTarget(event.properties.sessionID) + if (!target) return + const segment = consumeDeltaSegment(event, partTextByKey) if (!segment) return - await writeSegment(segment.sessionID, segment.text) + await writeSegment(target, segment.sessionID, segment.text) return } if (event.type !== "message.part.updated") return - const segment = extractUpdateSegment(event, partTextByKey) + const target = await resolveStreamTarget(event.properties.part.sessionID) + if (!target) return + const segment = consumeUpdateSegment(event, partTextByKey) if (!segment) return - await writeSegment(segment.sessionID, segment.text) + await writeSegment(target, segment.sessionID, segment.text) }, } }