diff --git a/src/features/tmux-subagent/manager.test.ts b/src/features/tmux-subagent/manager.test.ts index e6523f592..1efe999e3 100644 --- a/src/features/tmux-subagent/manager.test.ts +++ b/src/features/tmux-subagent/manager.test.ts @@ -17,6 +17,11 @@ type SpawnTmuxContainerResult = { paneId?: string } +type SessionReadyWaitParams = { + client: unknown + sessionId: string +} + const mockQueryWindowState = mock<(paneId: string) => Promise>( async () => ({ windowWidth: 212, @@ -38,6 +43,13 @@ const mockExecuteAction = mock<( action: PaneAction, ctx: ExecuteContext ) => Promise>(async () => ({ success: true })) +const mockSpawnTmuxPane = mock(async (_sessionId?: string) => ({ + success: true, + paneId: '%mock', +})) +const mockWaitForSessionReady = mock<( + params: SessionReadyWaitParams, +) => Promise>(async () => true) const mockSpawnTmuxWindow = mock<( sessionId: string, description: string, @@ -88,6 +100,10 @@ mock.module('./action-executor', () => ({ executeActionWithDeps: mockExecuteAction, })) +mock.module('./session-ready-waiter', () => ({ + waitForSessionReady: mockWaitForSessionReady, +})) + mock.module('../../shared/tmux', () => { const { isInsideTmux, getCurrentPaneId } = require('../../shared/tmux/tmux-utils') const { POLL_INTERVAL_BACKGROUND_MS, SESSION_TIMEOUT_MS, SESSION_MISSING_GRACE_MS } = require('../../shared/tmux/constants') @@ -161,6 +177,28 @@ function createWindowState(overrides?: Partial): WindowState { } } +function createDeferred() { + let resolvePromise!: (value: TValue | PromiseLike) => void + let rejectPromise!: (reason?: unknown) => void + + const promise = new Promise((resolve, reject) => { + resolvePromise = resolve + rejectPromise = reject + }) + + return { + promise, + resolve: resolvePromise, + reject: rejectPromise, + } +} + +async function flushMicrotasks(turns: number = 5): Promise { + for (let index = 0; index < turns; index += 1) { + await Promise.resolve() + } +} + function createTmuxConfig(overrides?: Partial): TmuxConfig { return { enabled: true, @@ -183,6 +221,8 @@ describe('TmuxSessionManager', () => { mockPaneExists.mockClear() mockExecuteActions.mockClear() mockExecuteAction.mockClear() + mockSpawnTmuxPane.mockClear() + mockWaitForSessionReady.mockClear() mockSpawnTmuxWindow.mockClear() mockSpawnTmuxSession.mockClear() mockIsInsideTmux.mockClear() @@ -190,16 +230,32 @@ describe('TmuxSessionManager', () => { trackedSessions.clear() mockQueryWindowState.mockImplementation(async () => createWindowState()) - mockExecuteActions.mockImplementation(async (actions: PaneAction[]) => { for (const action of actions) { - if (action.type === 'spawn') { - trackedSessions.add(action.sessionId) + mockExecuteActions.mockImplementation(async (actions: PaneAction[]) => { + const results: ExecuteActionsResult['results'] = [] + let spawnedPaneId: string | undefined + + for (const action of actions) { + if (action.type === 'spawn') { + const spawnResult = await mockSpawnTmuxPane(action.sessionId) + if (!spawnResult.success) { + return { + success: false, + results: [{ action, result: { success: false, error: 'spawn failed' } }], + } + } + trackedSessions.add(action.sessionId) + spawnedPaneId = spawnResult.paneId + results.push({ action, result: { success: true, paneId: spawnResult.paneId } }) + } } - } - return { - success: true, - spawnedPaneId: '%mock', - results: [], - } }) + + return { + success: true, + spawnedPaneId: spawnedPaneId ?? '%mock', + results, + } + }) + mockWaitForSessionReady.mockImplementation(async () => true) mockSpawnTmuxWindow.mockImplementation(async (sessionId: string) => { trackedSessions.add(sessionId) return { @@ -1106,55 +1162,112 @@ describe('TmuxSessionManager', () => { }) }) - test('#given session.status never reports session ready #when onSessionCreated runs #then pane is tracked immediately without blocking', async () => { + test('#given session readiness is pending #when onSessionCreated runs #then pane spawn waits until readiness resolves', async () => { // given mockIsInsideTmux.mockReturnValue(true) mockQueryWindowState.mockImplementation(async () => createWindowState()) + const readiness = createDeferred() + mockWaitForSessionReady.mockImplementationOnce(async () => readiness.promise) const { TmuxSessionManager } = await import('./manager') - const ctx = createMockContext({ sessionStatusResult: { data: {} } }) + const ctx = createMockContext() const config = createTmuxConfig({ enabled: true }) const manager = new TmuxSessionManager(ctx, config, mockTmuxDeps) - const event = createSessionCreatedEvent('ses_fast_track', 'ses_parent', 'Fast Track') + const event = createSessionCreatedEvent('ses_wait', 'ses_parent', 'Wait For Ready') // when - const start = Date.now() - await manager.onSessionCreated(event) - const elapsed = Date.now() - start + const onSessionCreatedPromise = manager.onSessionCreated(event) + await flushMicrotasks() // then - expect(elapsed < 500).toBe(true) - expect(getTrackedSessions(manager).has('ses_fast_track')).toBe(true) + expect(mockWaitForSessionReady).toHaveBeenCalledTimes(1) + expect(mockExecuteActions).toHaveBeenCalledTimes(0) + expect(mockSpawnTmuxPane).toHaveBeenCalledTimes(0) + + // when + readiness.resolve(true) + await onSessionCreatedPromise + + // then + expect(mockExecuteActions).toHaveBeenCalledTimes(1) + expect(mockSpawnTmuxPane).toHaveBeenCalledTimes(1) + expect(getTrackedSessions(manager).has('ses_wait')).toBe(true) + }) + + test('#given readiness probe fails #when onSessionCreated runs #then it logs the structured error and does not spawn a pane', async () => { + // given + mockIsInsideTmux.mockReturnValue(true) + const readinessError = new Error('session readiness timed out') + mockWaitForSessionReady.mockImplementationOnce(async () => { + throw readinessError + }) + const logSpy = spyOn(sharedModule, 'log').mockImplementation(() => {}) + + const { TmuxSessionManager } = await import('./manager') + const manager = new TmuxSessionManager(createMockContext(), createTmuxConfig({ enabled: true }), mockTmuxDeps) + + // when + await manager.onSessionCreated( + createSessionCreatedEvent('ses_timeout', 'ses_parent', 'Timeout Task') + ) + + // then + expect(mockExecuteActions).toHaveBeenCalledTimes(0) + expect(mockSpawnTmuxPane).toHaveBeenCalledTimes(0) + expect(logSpy).toHaveBeenCalledWith( + '[tmux-session-manager] session readiness failed before spawn', + expect.objectContaining({ + sessionId: 'ses_timeout', + stage: 'session.created', + error: String(readinessError), + }), + ) + + logSpy.mockRestore() + }) + + test('#given duplicate session.created triggers while readiness is pending #when readiness resolves #then only one pane spawn runs', async () => { + // given + mockIsInsideTmux.mockReturnValue(true) + const readiness = createDeferred() + mockWaitForSessionReady.mockImplementationOnce(async () => readiness.promise) + + const { TmuxSessionManager } = await import('./manager') + const manager = new TmuxSessionManager(createMockContext(), createTmuxConfig({ enabled: true }), mockTmuxDeps) + const event = createSessionCreatedEvent('ses_dup_pending', 'ses_parent', 'Duplicate Pending') + + // when + const firstSpawnPromise = manager.onSessionCreated(event) + const secondSpawnPromise = manager.onSessionCreated(event) + await flushMicrotasks() + + // then + expect(mockWaitForSessionReady).toHaveBeenCalledTimes(1) + expect(mockExecuteActions).toHaveBeenCalledTimes(0) + expect(mockSpawnTmuxPane).toHaveBeenCalledTimes(0) + + // when + readiness.resolve(true) + await Promise.all([firstSpawnPromise, secondSpawnPromise]) + + // then + expect(mockWaitForSessionReady).toHaveBeenCalledTimes(1) + expect(mockExecuteActions).toHaveBeenCalledTimes(1) + expect(mockSpawnTmuxPane).toHaveBeenCalledTimes(1) + expect(getTrackedSessions(manager).has('ses_dup_pending')).toBe(true) }) }) describe('onSessionDeleted', () => { - test('does not track session when readiness timed out', async () => { + test('does nothing when session creation stopped before tracking due to readiness failure', async () => { // given mockIsInsideTmux.mockReturnValue(true) - let stateCallCount = 0 - mockQueryWindowState.mockImplementation(async () => { - stateCallCount++ - if (stateCallCount === 1) { - return createWindowState() - } - return createWindowState({ - agentPanes: [ - { - paneId: '%mock', - width: 40, - height: 44, - left: 100, - top: 0, - title: 'omo-subagent-Timeout Task', - isActive: false, - }, - ], - }) + mockWaitForSessionReady.mockImplementationOnce(async () => { + throw new Error('readiness failed') }) const { TmuxSessionManager } = await import('./manager') - const ctx = createMockContext({ sessionStatusResult: { data: {} } }) + const ctx = createMockContext() const config = createTmuxConfig({ enabled: true, layout: 'main-vertical', main_pane_size: 60, @@ -1171,7 +1284,7 @@ describe('TmuxSessionManager', () => { await manager.onSessionDeleted({ sessionID: 'ses_timeout' }) // then - expect(mockExecuteAction).toHaveBeenCalledTimes(1) + expect(mockExecuteAction).toHaveBeenCalledTimes(0) }) test('closes pane when tracked session is deleted', async () => { @@ -2033,7 +2146,8 @@ describe('TmuxSessionManager', () => { const cleanupPromise = manager.cleanup() // then - await expect(cleanupPromise).resolves.toBeUndefined() + const cleanupResult = await cleanupPromise + expect(cleanupResult).toBeUndefined() expect(mockKillTmuxSessionIfExists).toHaveBeenCalledTimes(1) }) }) diff --git a/src/features/tmux-subagent/manager.ts b/src/features/tmux-subagent/manager.ts index 965488053..260103037 100644 --- a/src/features/tmux-subagent/manager.ts +++ b/src/features/tmux-subagent/manager.ts @@ -1,13 +1,11 @@ import type { PluginInput } from "@opencode-ai/plugin" import type { TmuxConfig } from "../../config/schema" import type { TrackedSession, CapacityConfig, WindowState } from "./types" -import { log, normalizeSDKResponse } from "../../shared" +import { log } from "../../shared" import { isInsideTmux as defaultIsInsideTmux, getCurrentPaneId as defaultGetCurrentPaneId, POLL_INTERVAL_BACKGROUND_MS, - SESSION_READY_POLL_INTERVAL_MS, - SESSION_READY_TIMEOUT_MS, spawnTmuxWindow, spawnTmuxSession, killTmuxSessionIfExists, @@ -19,6 +17,7 @@ import { decideSpawnActions, decideCloseAction, type SessionMapping } from "./de import { executeActions, executeAction } from "./action-executor" import { TmuxPollingManager } from "./polling-manager" import { createTrackedSession, markTrackedSessionClosePending } from "./tracked-session-state" +import { waitForSessionReady } from "./session-ready-waiter" type OpencodeClient = PluginInput["client"] interface SessionCreatedEvent { @@ -490,6 +489,51 @@ export class TmuxSessionManager { log("[tmux-session-manager] deferred attach polling stopped") } + private beginPendingSession(sessionId: string): boolean { + if ( + this.sessions.has(sessionId) + || this.pendingSessions.has(sessionId) + || this.deferredSessions.has(sessionId) + ) { + log("[tmux-session-manager] session already tracked or pending", { sessionId }) + return false + } + + this.pendingSessions.add(sessionId) + return true + } + + private async ensureSessionReadyBeforeSpawn( + sessionId: string, + stage: "session.created" | "deferred.attach" | "deferred.isolated-container", + ): Promise { + try { + const ready = await waitForSessionReady({ + client: this.client, + sessionId, + }) + + if (ready) { + return true + } + + const readinessError = new Error("Session readiness timed out") + log("[tmux-session-manager] session readiness failed before spawn", { + sessionId, + stage, + error: String(readinessError), + }) + return false + } catch (error) { + log("[tmux-session-manager] session readiness failed before spawn", { + sessionId, + stage, + error: String(error), + }) + return false + } + } + private async tryAttachDeferredSession(): Promise { const sessionId = this.deferredQueue[0] if (!sessionId) { @@ -519,6 +563,15 @@ export class TmuxSessionManager { } if (deferred.retryIsolatedContainer) { + const readyForIsolatedContainer = await this.ensureSessionReadyBeforeSpawn( + sessionId, + "deferred.isolated-container", + ) + if (!readyForIsolatedContainer) { + this.removeDeferredSession(sessionId) + return + } + const isolatedPaneId = await this.spawnInIsolatedContainer(sessionId, deferred.title) if (isolatedPaneId) { this.sessions.set( @@ -535,7 +588,6 @@ export class TmuxSessionManager { sessionId, paneId: isolatedPaneId, }) - this.logSessionReadinessInBackground(sessionId) return } } @@ -575,6 +627,15 @@ export class TmuxSessionManager { return } + const readyForDeferredAttach = await this.ensureSessionReadyBeforeSpawn( + sessionId, + "deferred.attach", + ) + if (!readyForDeferredAttach) { + this.removeDeferredSession(sessionId) + return + } + const result = await executeActions(decision.actions, { config: this.tmuxConfig, serverUrl: this.serverUrl, @@ -608,46 +669,6 @@ export class TmuxSessionManager { sessionId, paneId: result.spawnedPaneId, }) - this.logSessionReadinessInBackground(sessionId) - } - - private logSessionReadinessInBackground(sessionId: string): void { - void this.waitForSessionReady(sessionId).catch((error) => { - log("[tmux-session-manager] background readiness probe failed", { - sessionId, - error: String(error), - }) - }) - } - - private async waitForSessionReady(sessionId: string): Promise { - const startTime = Date.now() - - while (Date.now() - startTime < SESSION_READY_TIMEOUT_MS) { - try { - const statusResult = await this.client.session.status({ path: undefined }) - const allStatuses = normalizeSDKResponse(statusResult, {} as Record) - - if (allStatuses[sessionId]) { - log("[tmux-session-manager] session ready", { - sessionId, - status: allStatuses[sessionId].type, - waitedMs: Date.now() - startTime, - }) - return true - } - } catch (err) { - log("[tmux-session-manager] session status check error", { error: String(err) }) - } - - await new Promise((resolve) => setTimeout(resolve, SESSION_READY_POLL_INTERVAL_MS)) - } - - log("[tmux-session-manager] session ready timeout", { - sessionId, - timeoutMs: SESSION_READY_TIMEOUT_MS, - }) - return false } async onSessionCreated(event: SessionCreatedEvent): Promise { @@ -675,156 +696,156 @@ export class TmuxSessionManager { return } - await this.sweepStaleIsolatedSessionsOnce() - await this.retryPendingCloses() - - if ( - this.sessions.has(sessionId) || - this.pendingSessions.has(sessionId) || - this.deferredSessions.has(sessionId) - ) { - log("[tmux-session-manager] session already tracked or pending", { sessionId }) + if (!this.beginPendingSession(sessionId)) { return } - this.pendingSessions.add(sessionId) + try { + await this.sweepStaleIsolatedSessionsOnce() + await this.retryPendingCloses() - await this.enqueueSpawn(async () => { - try { - const isolatedPaneId = await this.spawnInIsolatedContainer(sessionId, title) - if (isolatedPaneId) { - this.sessions.set( - sessionId, - createTrackedSession({ sessionId, paneId: isolatedPaneId, description: title }), - ) - this.pollingManager.startPolling() - log("[tmux-session-manager] first subagent spawned in isolated window", { - sessionId, - paneId: isolatedPaneId, + await this.enqueueSpawn(async () => { + try { + const readyForSpawn = await this.ensureSessionReadyBeforeSpawn(sessionId, "session.created") + if (!readyForSpawn) { + return + } + + const isolatedPaneId = await this.spawnInIsolatedContainer(sessionId, title) + if (isolatedPaneId) { + this.sessions.set( + sessionId, + createTrackedSession({ sessionId, paneId: isolatedPaneId, description: title }), + ) + this.pollingManager.startPolling() + log("[tmux-session-manager] first subagent spawned in isolated window", { + sessionId, + paneId: isolatedPaneId, + }) + return + } + + if (this.isIsolated() && !this.isolatedWindowPaneId) { + log("[tmux-session-manager] isolated container failed, deferring session for retry", { sessionId }) + this.enqueueDeferredSession(sessionId, title, true) + return + } + const sourcePaneId = this.getEffectiveSourcePaneId() + if (!sourcePaneId) { + log("[tmux-session-manager] no effective source pane id") + return + } + + const state = await queryWindowState(sourcePaneId) + if (!state) { + log("[tmux-session-manager] failed to query window state, deferring session") + this.enqueueDeferredSession(sessionId, title) + return + } + + log("[tmux-session-manager] window state queried", { + windowWidth: state.windowWidth, + mainPane: state.mainPane?.paneId, + agentPaneCount: state.agentPanes.length, + agentPanes: state.agentPanes.map((p) => p.paneId), }) - this.logSessionReadinessInBackground(sessionId) - return - } - if (this.isIsolated() && !this.isolatedWindowPaneId) { - log("[tmux-session-manager] isolated container failed, deferring session for retry", { sessionId }) - this.enqueueDeferredSession(sessionId, title, true) - return - } - const sourcePaneId = this.getEffectiveSourcePaneId() - if (!sourcePaneId) { - log("[tmux-session-manager] no effective source pane id") - return - } - - const state = await queryWindowState(sourcePaneId) - if (!state) { - log("[tmux-session-manager] failed to query window state, deferring session") - this.enqueueDeferredSession(sessionId, title) - return - } - - log("[tmux-session-manager] window state queried", { - windowWidth: state.windowWidth, - mainPane: state.mainPane?.paneId, - agentPaneCount: state.agentPanes.length, - agentPanes: state.agentPanes.map((p) => p.paneId), - }) - - const decision = decideSpawnActions( - state, - sessionId, - title, - this.getCapacityConfig(), - this.getSessionMappings() - ) - - log("[tmux-session-manager] spawn decision", { - canSpawn: decision.canSpawn, - reason: decision.reason, - actionCount: decision.actions.length, - actions: decision.actions.map((a) => { - if (a.type === "close") return { type: "close", paneId: a.paneId } - if (a.type === "replace") return { type: "replace", paneId: a.paneId, newSessionId: a.newSessionId } - return { type: "spawn", sessionId: a.sessionId } - }), - }) - - if (!decision.canSpawn) { - log("[tmux-session-manager] cannot spawn", { reason: decision.reason }) - this.enqueueDeferredSession(sessionId, title) - return - } - - const result = await executeActions( - decision.actions, - { - config: this.tmuxConfig, - serverUrl: this.serverUrl, - windowState: state, - sourcePaneId, - } - ) - - for (const { action, result: actionResult } of result.results) { - if (action.type === "close" && actionResult.success) { - this.sessions.delete(action.sessionId) - log("[tmux-session-manager] removed closed session from cache", { - sessionId: action.sessionId, - }) - } - if (action.type === "replace" && actionResult.success) { - this.sessions.delete(action.oldSessionId) - log("[tmux-session-manager] removed replaced session from cache", { - oldSessionId: action.oldSessionId, - newSessionId: action.newSessionId, - }) - } - } - - if (result.success && result.spawnedPaneId) { - this.sessions.set( + const decision = decideSpawnActions( + state, sessionId, - createTrackedSession({ + title, + this.getCapacityConfig(), + this.getSessionMappings() + ) + + log("[tmux-session-manager] spawn decision", { + canSpawn: decision.canSpawn, + reason: decision.reason, + actionCount: decision.actions.length, + actions: decision.actions.map((a) => { + if (a.type === "close") return { type: "close", paneId: a.paneId } + if (a.type === "replace") return { type: "replace", paneId: a.paneId, newSessionId: a.newSessionId } + return { type: "spawn", sessionId: a.sessionId } + }), + }) + + if (!decision.canSpawn) { + log("[tmux-session-manager] cannot spawn", { reason: decision.reason }) + this.enqueueDeferredSession(sessionId, title) + return + } + + const result = await executeActions( + decision.actions, + { + config: this.tmuxConfig, + serverUrl: this.serverUrl, + windowState: state, + sourcePaneId, + } + ) + + for (const { action, result: actionResult } of result.results) { + if (action.type === "close" && actionResult.success) { + this.sessions.delete(action.sessionId) + log("[tmux-session-manager] removed closed session from cache", { + sessionId: action.sessionId, + }) + } + if (action.type === "replace" && actionResult.success) { + this.sessions.delete(action.oldSessionId) + log("[tmux-session-manager] removed replaced session from cache", { + oldSessionId: action.oldSessionId, + newSessionId: action.newSessionId, + }) + } + } + + if (result.success && result.spawnedPaneId) { + this.sessions.set( + sessionId, + createTrackedSession({ + sessionId, + paneId: result.spawnedPaneId, + description: title, + }), + ) + log("[tmux-session-manager] pane spawned and tracked", { sessionId, paneId: result.spawnedPaneId, - description: title, - }), - ) - log("[tmux-session-manager] pane spawned and tracked", { - sessionId, - paneId: result.spawnedPaneId, - }) - this.pollingManager.startPolling() - this.logSessionReadinessInBackground(sessionId) - } else { - log("[tmux-session-manager] spawn failed", { - success: result.success, - results: result.results.map((r) => ({ - type: r.action.type, - success: r.result.success, - error: r.result.error, - })), - }) + }) + this.pollingManager.startPolling() + } else { + log("[tmux-session-manager] spawn failed", { + success: result.success, + results: result.results.map((r) => ({ + type: r.action.type, + success: r.result.success, + error: r.result.error, + })), + }) - log("[tmux-session-manager] re-queueing deferred session after spawn failure", { - sessionId, - }) - this.enqueueDeferredSession(sessionId, title) + log("[tmux-session-manager] re-queueing deferred session after spawn failure", { + sessionId, + }) + this.enqueueDeferredSession(sessionId, title) - if (result.spawnedPaneId) { - await executeAction( - { type: "close", paneId: result.spawnedPaneId, sessionId }, - { config: this.tmuxConfig, serverUrl: this.serverUrl, windowState: state } - ) + if (result.spawnedPaneId) { + await executeAction( + { type: "close", paneId: result.spawnedPaneId, sessionId }, + { config: this.tmuxConfig, serverUrl: this.serverUrl, windowState: state } + ) + } + + return } - - return + } finally { + this.pendingSessions.delete(sessionId) } - } finally { - this.pendingSessions.delete(sessionId) - } - }) + }) + } finally { + this.pendingSessions.delete(sessionId) + } } private async enqueueSpawn(run: () => Promise): Promise {