diff --git a/src/hooks/team-session-streamer/hook.test.ts b/src/hooks/team-session-streamer/hook.test.ts index cfb4f6a9f..d1994e154 100644 --- a/src/hooks/team-session-streamer/hook.test.ts +++ b/src/hooks/team-session-streamer/hook.test.ts @@ -258,4 +258,57 @@ describe("createTeamSessionStreamer", () => { expect(writeTeamSessionFifoMock).toHaveBeenCalledTimes(1) expect(writeTeamSessionFifoMock).toHaveBeenCalledWith("/tmp/omo-team/11111111-1111-4111-8111-111111111111/member-a.fifo", "orphan full") }) + + test("does not lose text when runtime state mapping is not yet available (pre-mapping race)", async () => { + // given + let mappingReady = false + const listActiveTeams = mock(async () => mappingReady ? [{ + teamRunId: "11111111-1111-4111-8111-111111111111", + teamName: "team-alpha", + status: "active" as const, + memberCount: 1, + scope: "project" as const, + }] : []) + const loadRuntimeState = mock(async () => createRuntimeState()) + const config = TeamModeConfigSchema.parse({ enabled: true, tmux_visualization: true }) + const streamer = createTeamSessionStreamer(config, { listActiveTeams, loadRuntimeState }) + + // when + await streamer.event({ + event: { + type: "message.part.updated", + properties: { + part: { + id: "part-race", + sessionID: "member-session", + messageID: "message-z", + type: "text", + text: "early", + }, + }, + }, + }) + expect(writeTeamSessionFifoMock).not.toHaveBeenCalled() + + mappingReady = true + + await streamer.event({ + event: { + type: "message.part.updated", + properties: { + part: { + id: "part-race", + sessionID: "member-session", + messageID: "message-z", + type: "text", + text: "early late", + }, + }, + }, + }) + + // then + expect(writeTeamSessionFifoMock).toHaveBeenCalledTimes(1) + expect(writeTeamSessionFifoMock).toHaveBeenCalledWith("/tmp/omo-team/11111111-1111-4111-8111-111111111111/member-a.fifo", "early late") + }) }) diff --git a/src/hooks/team-session-streamer/hook.ts b/src/hooks/team-session-streamer/hook.ts index cef046c66..436bb0e51 100644 --- a/src/hooks/team-session-streamer/hook.ts +++ b/src/hooks/team-session-streamer/hook.ts @@ -40,7 +40,7 @@ function extractCumulativeText(part: Part): string | undefined { return undefined } -function extractUpdateSegment( +function consumeUpdateSegment( event: EventMessagePartUpdated, partTextByKey: Map, ): { sessionID: string; text: string } | undefined { @@ -59,7 +59,7 @@ function extractUpdateSegment( return { sessionID: part.sessionID, text: appendedText } } -function extractDeltaSegment( +function consumeDeltaSegment( event: MessagePartDeltaEvent, partTextByKey: Map, ): { sessionID: string; text: string } | undefined { @@ -117,10 +117,7 @@ export function createTeamSessionStreamer(config: TeamModeConfig, stateStore: Te return undefined } - async function writeSegment(sessionID: string, text: string): Promise { - const target = await resolveStreamTarget(sessionID) - if (!target) return - + async function writeSegment(target: TeamSessionStreamTarget, sessionID: string, text: string): Promise { try { await writeTeamSessionFifo(target.fifoPath, text) } catch (error) { @@ -168,17 +165,21 @@ export function createTeamSessionStreamer(config: TeamModeConfig, stateStore: Te } if (event.type === "message.part.delta") { - const segment = extractDeltaSegment(event, partTextByKey) + const target = await resolveStreamTarget(event.properties.sessionID) + if (!target) return + const segment = consumeDeltaSegment(event, partTextByKey) if (!segment) return - await writeSegment(segment.sessionID, segment.text) + await writeSegment(target, segment.sessionID, segment.text) return } if (event.type !== "message.part.updated") return - const segment = extractUpdateSegment(event, partTextByKey) + const target = await resolveStreamTarget(event.properties.part.sessionID) + if (!target) return + const segment = consumeUpdateSegment(event, partTextByKey) if (!segment) return - await writeSegment(segment.sessionID, segment.text) + await writeSegment(target, segment.sessionID, segment.text) }, } }