From a1759fccb84124e24b0244933455c11baaa2e447 Mon Sep 17 00:00:00 2001 From: "yunlu.wen" Date: Thu, 16 Apr 2026 17:41:21 +0800 Subject: [PATCH] feat: replayable message --- api/controllers/web/workflow_events.py | 4 +- .../app/apps/message_based_app_generator.py | 4 +- api/core/app/apps/message_generator.py | 4 +- api/core/app/apps/streaming_utils.py | 3 +- api/fields/message_fields.py | 1 + api/libs/broadcast_channel/channel.py | 9 +- api/libs/broadcast_channel/redis/channel.py | 2 +- .../redis/sharded_channel.py | 2 +- .../redis/streams_channel.py | 13 +- api/services/app_generate_service.py | 3 +- .../workflow_event_snapshot_service.py | 3 +- docker/docker-compose.yaml | 19 +- .../__tests__/chat-wrapper.spec.tsx | 90 +++++ .../chat/chat-with-history/chat-wrapper.tsx | 53 +-- .../base/chat/chat-with-history/hooks.tsx | 7 +- .../base/chat/chat/__tests__/hooks.spec.tsx | 102 +++++- web/app/components/base/chat/chat/hooks.ts | 321 +++++++++++++++++- web/app/components/base/chat/chat/type.ts | 1 + .../__tests__/chat-wrapper.spec.tsx | 47 +++ .../chat/embedded-chatbot/chat-wrapper.tsx | 53 +-- .../base/chat/embedded-chatbot/hooks.tsx | 2 + .../__tests__/hooks/handle-resume.spec.ts | 2 +- .../workflow/panel/debug-and-preview/hooks.ts | 6 +- 23 files changed, 675 insertions(+), 76 deletions(-) diff --git a/api/controllers/web/workflow_events.py b/api/controllers/web/workflow_events.py index 474f9c0957..4da144d6b1 100644 --- a/api/controllers/web/workflow_events.py +++ b/api/controllers/web/workflow_events.py @@ -81,6 +81,7 @@ class WorkflowEventsApi(WebApiResource): raise InvalidArgumentError(f"cannot subscribe to workflow run, workflow_run_id={workflow_run.id}") include_state_snapshot = request.args.get("include_state_snapshot", "false").lower() == "true" + replay = request.args.get("replay", "false").lower() == "true" def _generate_stream_events(): if include_state_snapshot: @@ -91,10 +92,11 @@ class WorkflowEventsApi(WebApiResource): tenant_id=app_model.tenant_id, app_id=app_model.id, session_maker=session_maker, + replay=replay, ) ) return generator.convert_to_event_stream( - msg_generator.retrieve_events(app_mode, workflow_run.id), + msg_generator.retrieve_events(app_mode, workflow_run.id, replay=replay), ) event_generator = _generate_stream_events diff --git a/api/core/app/apps/message_based_app_generator.py b/api/core/app/apps/message_based_app_generator.py index fe61224ada..a19829e344 100644 --- a/api/core/app/apps/message_based_app_generator.py +++ b/api/core/app/apps/message_based_app_generator.py @@ -311,12 +311,14 @@ class MessageBasedAppGenerator(BaseAppGenerator): cls, app_mode: AppMode, workflow_run_id: str, - idle_timeout=300, + idle_timeout: float = 300, on_subscribe: Callable[[], None] | None = None, + replay: bool = False, ) -> Generator[Mapping | str, None, None]: topic = cls.get_response_topic(app_mode, workflow_run_id) return stream_topic_events( topic=topic, idle_timeout=idle_timeout, on_subscribe=on_subscribe, + replay=replay, ) diff --git a/api/core/app/apps/message_generator.py b/api/core/app/apps/message_generator.py index 68631bb230..155f966d4d 100644 --- a/api/core/app/apps/message_generator.py +++ b/api/core/app/apps/message_generator.py @@ -23,9 +23,10 @@ class MessageGenerator: cls, app_mode: AppMode, workflow_run_id: str, - idle_timeout=300, + idle_timeout: float = 300, ping_interval: float = 10.0, on_subscribe: Callable[[], None] | None = None, + replay: bool = False, ) -> Generator[Mapping | str, None, None]: topic = cls.get_response_topic(app_mode, workflow_run_id) return stream_topic_events( @@ -33,4 +34,5 @@ class MessageGenerator: idle_timeout=idle_timeout, ping_interval=ping_interval, on_subscribe=on_subscribe, + replay=replay, ) diff --git a/api/core/app/apps/streaming_utils.py b/api/core/app/apps/streaming_utils.py index af3441aca3..6b76f6e807 100644 --- a/api/core/app/apps/streaming_utils.py +++ b/api/core/app/apps/streaming_utils.py @@ -17,6 +17,7 @@ def stream_topic_events( ping_interval: float | None = None, on_subscribe: Callable[[], None] | None = None, terminal_events: Iterable[str | StreamEvent] | None = None, + replay: bool = False, ) -> Generator[Mapping[str, Any] | str, None, None]: # send a PING event immediately to prevent the connection staying in pending state for a long time. # @@ -27,7 +28,7 @@ def stream_topic_events( terminal_values = _normalize_terminal_events(terminal_events) last_msg_time = time.time() last_ping_time = last_msg_time - with topic.subscribe() as sub: + with topic.subscribe(replay=replay) as sub: # on_subscribe fires only after the Redis subscription is active. # This is used to gate task start and reduce pub/sub race for the first event. if on_subscribe is not None: diff --git a/api/fields/message_fields.py b/api/fields/message_fields.py index ca18f1c203..bf42e5b224 100644 --- a/api/fields/message_fields.py +++ b/api/fields/message_fields.py @@ -58,6 +58,7 @@ class MessageListItem(ResponseModel): message_files: list[MessageFile] status: str error: str | None = None + workflow_run_id: str | None = None extra_contents: list[ExecutionExtraContentDomainModel] @field_validator("inputs", mode="before") diff --git a/api/libs/broadcast_channel/channel.py b/api/libs/broadcast_channel/channel.py index 8eeac37232..ed37b9bb92 100644 --- a/api/libs/broadcast_channel/channel.py +++ b/api/libs/broadcast_channel/channel.py @@ -92,7 +92,14 @@ class Subscriber(Protocol): """ @abstractmethod - def subscribe(self) -> Subscription: + def subscribe(self, *, replay: bool = False) -> Subscription: + """Create a new subscription. + + :param replay: When True and the underlying transport supports message retention + (e.g. Redis Streams), the subscription replays all buffered messages from + the beginning of the stream before switching to live tail. Transports + without retention (plain Pub/Sub) silently ignore this flag. + """ pass diff --git a/api/libs/broadcast_channel/redis/channel.py b/api/libs/broadcast_channel/redis/channel.py index b76a23eb3c..af59e50a7d 100644 --- a/api/libs/broadcast_channel/redis/channel.py +++ b/api/libs/broadcast_channel/redis/channel.py @@ -44,7 +44,7 @@ class Topic: def as_subscriber(self) -> Subscriber: return self - def subscribe(self) -> Subscription: + def subscribe(self, *, replay: bool = False) -> Subscription: return _RedisSubscription( client=self._client, pubsub=self._client.pubsub(), diff --git a/api/libs/broadcast_channel/redis/sharded_channel.py b/api/libs/broadcast_channel/redis/sharded_channel.py index 919d8d622e..bfe0bc1b6f 100644 --- a/api/libs/broadcast_channel/redis/sharded_channel.py +++ b/api/libs/broadcast_channel/redis/sharded_channel.py @@ -42,7 +42,7 @@ class ShardedTopic: def as_subscriber(self) -> Subscriber: return self - def subscribe(self) -> Subscription: + def subscribe(self, *, replay: bool = False) -> Subscription: return _RedisShardedSubscription( client=self._client, pubsub=self._client.pubsub(), diff --git a/api/libs/broadcast_channel/redis/streams_channel.py b/api/libs/broadcast_channel/redis/streams_channel.py index 55ff6cd4f9..3a0561370c 100644 --- a/api/libs/broadcast_channel/redis/streams_channel.py +++ b/api/libs/broadcast_channel/redis/streams_channel.py @@ -54,16 +54,17 @@ class StreamsTopic: def as_subscriber(self) -> Subscriber: return self - def subscribe(self) -> Subscription: - return _StreamsSubscription(self._client, self._key) + def subscribe(self, *, replay: bool = False) -> Subscription: + return _StreamsSubscription(self._client, self._key, replay=replay) class _StreamsSubscription(Subscription): _SENTINEL = object() - def __init__(self, client: Redis | RedisCluster, key: str): + def __init__(self, client: Redis | RedisCluster, key: str, *, replay: bool = False): self._client = client self._key = key + self._replay = replay self._queue: queue.Queue[object] = queue.Queue() @@ -76,7 +77,6 @@ class _StreamsSubscription(Subscription): # reading and writing the _listener / `_closed` attribute. self._lock = threading.Lock() self._closed: bool = False - # self._closed = threading.Event() self._listener: threading.Thread | None = None def _listen(self) -> None: @@ -89,10 +89,9 @@ class _StreamsSubscription(Subscription): # since this method runs in a dedicated thread, acquiring `_lock` inside this method won't cause # deadlock. - # Setting initial last id to `$` to signal redis that we only want new messages. - # + # `"0"` replays all retained entries; `"$"` tails only new messages. # ref: https://redis.io/docs/latest/commands/xread/#the-special--id - last_id = "$" + last_id = "0" if self._replay else "$" try: while True: with self._lock: diff --git a/api/services/app_generate_service.py b/api/services/app_generate_service.py index 5e8c7aa337..178de3bb9f 100644 --- a/api/services/app_generate_service.py +++ b/api/services/app_generate_service.py @@ -416,6 +416,7 @@ class AppGenerateService: cls, app_model: App, workflow_run: WorkflowRun, + replay: bool = False, ): if workflow_run.status.is_ended(): # TODO(QuantumGhost): handled the ended scenario. @@ -424,5 +425,5 @@ class AppGenerateService: generator = AdvancedChatAppGenerator() return generator.convert_to_event_stream( - generator.retrieve_events(AppMode(app_model.mode), workflow_run.id), + generator.retrieve_events(AppMode(app_model.mode), workflow_run.id, replay=replay), ) diff --git a/api/services/workflow_event_snapshot_service.py b/api/services/workflow_event_snapshot_service.py index 5fca444723..b133118e8f 100644 --- a/api/services/workflow_event_snapshot_service.py +++ b/api/services/workflow_event_snapshot_service.py @@ -61,6 +61,7 @@ def build_workflow_event_stream( session_maker: sessionmaker[Session], idle_timeout: float = 300, ping_interval: float = 10.0, + replay: bool = False, ) -> Generator[Mapping[str, Any] | str, None, None]: topic = MessageGenerator.get_response_topic(app_mode, workflow_run.id) workflow_run_repo = DifyAPIRepositoryFactory.create_api_workflow_run_repository(session_maker) @@ -103,7 +104,7 @@ def build_workflow_event_stream( last_msg_time = time.time() last_ping_time = last_msg_time - with topic.subscribe() as sub: + with topic.subscribe(replay=replay) as sub: buffer_state = _start_buffering(sub) try: task_id = _resolve_task_id(resumption_context, buffer_state, workflow_run.id) diff --git a/docker/docker-compose.yaml b/docker/docker-compose.yaml index a10fdf77c6..4d3058067d 100644 --- a/docker/docker-compose.yaml +++ b/docker/docker-compose.yaml @@ -34,7 +34,6 @@ x-shared-env: &shared-api-worker-env OPENAI_API_BASE: ${OPENAI_API_BASE:-https://api.openai.com/v1} MIGRATION_ENABLED: ${MIGRATION_ENABLED:-true} FILES_ACCESS_TIMEOUT: ${FILES_ACCESS_TIMEOUT:-300} - ENABLE_COLLABORATION_MODE: ${ENABLE_COLLABORATION_MODE:-false} ACCESS_TOKEN_EXPIRE_MINUTES: ${ACCESS_TOKEN_EXPIRE_MINUTES:-60} REFRESH_TOKEN_EXPIRE_DAYS: ${REFRESH_TOKEN_EXPIRE_DAYS:-30} APP_DEFAULT_ACTIVE_REQUESTS: ${APP_DEFAULT_ACTIVE_REQUESTS:-0} @@ -120,7 +119,6 @@ x-shared-env: &shared-api-worker-env CONSOLE_CORS_ALLOW_ORIGINS: ${CONSOLE_CORS_ALLOW_ORIGINS:-*} COOKIE_DOMAIN: ${COOKIE_DOMAIN:-} NEXT_PUBLIC_COOKIE_DOMAIN: ${NEXT_PUBLIC_COOKIE_DOMAIN:-} - NEXT_PUBLIC_SOCKET_URL: ${NEXT_PUBLIC_SOCKET_URL:-ws://localhost} NEXT_PUBLIC_BATCH_CONCURRENCY: ${NEXT_PUBLIC_BATCH_CONCURRENCY:-5} STORAGE_TYPE: ${STORAGE_TYPE:-opendal} OPENDAL_SCHEME: ${OPENDAL_SCHEME:-fs} @@ -742,7 +740,9 @@ services: # API service api: - image: langgenius/dify-api:1.13.3 + build: + context: ../api + dockerfile: Dockerfile restart: always environment: # Use the shared environment variables. @@ -790,7 +790,9 @@ services: # worker service # The Celery worker for processing all queues (dataset, workflow, mail, etc.) worker: - image: langgenius/dify-api:1.13.3 + build: + context: ../api + dockerfile: Dockerfile restart: always environment: # Use the shared environment variables. @@ -836,7 +838,9 @@ services: # worker_beat service # Celery beat for scheduling periodic tasks. worker_beat: - image: langgenius/dify-api:1.13.3 + build: + context: ../api + dockerfile: Dockerfile restart: always environment: # Use the shared environment variables. @@ -873,14 +877,15 @@ services: # Frontend web application. web: - image: langgenius/dify-web:1.13.3 + build: + context: .. + dockerfile: web/Dockerfile restart: always environment: CONSOLE_API_URL: ${CONSOLE_API_URL:-} APP_API_URL: ${APP_API_URL:-} AMPLITUDE_API_KEY: ${AMPLITUDE_API_KEY:-} NEXT_PUBLIC_COOKIE_DOMAIN: ${NEXT_PUBLIC_COOKIE_DOMAIN:-} - NEXT_PUBLIC_SOCKET_URL: ${NEXT_PUBLIC_SOCKET_URL:-ws://localhost} SENTRY_DSN: ${WEB_SENTRY_DSN:-} NEXT_TELEMETRY_DISABLED: ${NEXT_TELEMETRY_DISABLED:-0} EXPERIMENTAL_ENABLE_VINEXT: ${EXPERIMENTAL_ENABLE_VINEXT:-false} diff --git a/web/app/components/base/chat/chat-with-history/__tests__/chat-wrapper.spec.tsx b/web/app/components/base/chat/chat-with-history/__tests__/chat-wrapper.spec.tsx index bd5f01bcda..e8b77bf5b4 100644 --- a/web/app/components/base/chat/chat-with-history/__tests__/chat-wrapper.spec.tsx +++ b/web/app/components/base/chat/chat-with-history/__tests__/chat-wrapper.spec.tsx @@ -125,6 +125,7 @@ const defaultChatHookReturn: Partial = { handleSend: vi.fn(), handleStop: vi.fn(), handleSwitchSibling: vi.fn(), + handleReconnect: vi.fn(), isResponding: false, suggestedQuestions: [], } @@ -605,6 +606,95 @@ describe('ChatWrapper', () => { expect(handleSwitchSibling).not.toHaveBeenCalled() }) + it('should reconnect to a recent running workflow on mount', () => { + const handleReconnect = vi.fn() + vi.mocked(useChat).mockReturnValue({ + ...defaultChatHookReturn, + chatList: [], + handleReconnect, + } as unknown as ChatHookReturn) + + vi.mocked(useChatWithHistoryContext).mockReturnValue({ + ...defaultContextValue, + appPrevChatTree: [{ + id: 'running-answer', + isAnswer: true, + content: 'partial content', + workflow_run_id: 'run-active', + created_at: Math.floor(Date.now() / 1000) - 30, + children: [], + } as unknown as ChatItemInTree], + }) + + render() + expect(handleReconnect).toHaveBeenCalledWith( + 'running-answer', + 'run-active', + expect.objectContaining({ isPublicAPI: true }), + ) + }) + + it('should not reconnect to an old workflow beyond the retention window', () => { + const handleReconnect = vi.fn() + vi.mocked(useChat).mockReturnValue({ + ...defaultChatHookReturn, + chatList: [], + handleReconnect, + } as unknown as ChatHookReturn) + + vi.mocked(useChatWithHistoryContext).mockReturnValue({ + ...defaultContextValue, + appPrevChatTree: [{ + id: 'old-answer', + isAnswer: true, + content: 'old content', + workflow_run_id: 'run-old', + created_at: Math.floor(Date.now() / 1000) - 700, + children: [], + } as unknown as ChatItemInTree], + }) + + render() + expect(handleReconnect).not.toHaveBeenCalled() + }) + + it('should prefer paused workflow over running workflow for reconnection', () => { + const handleSwitchSibling = vi.fn() + const handleReconnect = vi.fn() + vi.mocked(useChat).mockReturnValue({ + ...defaultChatHookReturn, + chatList: [], + handleSwitchSibling, + handleReconnect, + } as unknown as ChatHookReturn) + + vi.mocked(useChatWithHistoryContext).mockReturnValue({ + ...defaultContextValue, + appPrevChatTree: [ + { + id: 'running-answer', + isAnswer: true, + content: '', + workflow_run_id: 'run-active', + created_at: Math.floor(Date.now() / 1000) - 10, + children: [], + } as unknown as ChatItemInTree, + { + id: 'paused-answer', + isAnswer: true, + content: '', + workflow_run_id: 'run-paused', + humanInputFormDataList: [{ node_id: 'n-1' }], + children: [], + } as unknown as ChatItemInTree, + ], + }) + + render() + expect(handleSwitchSibling).toHaveBeenCalledWith('paused-answer', expect.any(Object)) + expect(handleReconnect).not.toHaveBeenCalled() + }) + it('should call stopChatMessageResponding when handleStop is triggered', () => { const handleStop = vi.fn() vi.mocked(useChat).mockReturnValue({ diff --git a/web/app/components/base/chat/chat-with-history/chat-wrapper.tsx b/web/app/components/base/chat/chat-with-history/chat-wrapper.tsx index 1b59618c8c..2e3d6b5101 100644 --- a/web/app/components/base/chat/chat-with-history/chat-wrapper.tsx +++ b/web/app/components/base/chat/chat-with-history/chat-wrapper.tsx @@ -79,6 +79,7 @@ const ChatWrapper = () => { handleSend, handleStop, handleSwitchSibling, + handleReconnect, isResponding: respondingState, suggestedQuestions, } = useChat( @@ -140,37 +141,47 @@ const ChatWrapper = () => { setIsResponding(respondingState) }, [respondingState, setIsResponding]) - // Resume paused workflows when chat history is loaded + // Resume paused workflows or reconnect to running workflows when chat history is loaded useEffect(() => { if (!appPrevChatTree || appPrevChatTree.length === 0) return - // Find the last answer item with workflow_run_id that needs resumption (DFS - find deepest first) - let lastPausedNode: ChatItemInTree | undefined - const findLastPausedWorkflow = (nodes: ChatItemInTree[]) => { - nodes.forEach((node) => { - // DFS: recurse to children first - if (node.children && node.children.length > 0) - findLastPausedWorkflow(node.children) + const STREAM_RETENTION_SECONDS = 600 - // Track the last node with humanInputFormDataList - if (node.isAnswer && node.workflow_run_id && node.humanInputFormDataList && node.humanInputFormDataList.length > 0) - lastPausedNode = node + let lastPausedNode: ChatItemInTree | undefined + let lastRunningNode: ChatItemInTree | undefined + const findReconnectableWorkflow = (nodes: ChatItemInTree[]) => { + nodes.forEach((node) => { + if (node.isAnswer && node.workflow_run_id) { + if (node.humanInputFormDataList && node.humanInputFormDataList.length > 0) { + lastPausedNode = node + } + else if ( + node.created_at + && (Date.now() / 1000 - node.created_at) < STREAM_RETENTION_SECONDS + ) { + lastRunningNode = node + } + } + + if (node.children && node.children.length > 0) + findReconnectableWorkflow(node.children) }) } - findLastPausedWorkflow(appPrevChatTree) + findReconnectableWorkflow(appPrevChatTree) + + const callbacks = { + onGetSuggestedQuestions: (responseItemId: string) => fetchSuggestedQuestions(responseItemId, appSourceType, appId), + onConversationComplete: currentConversationId ? undefined : handleNewConversationCompleted, + isPublicAPI: appSourceType === AppSourceType.webApp, + } - // Only resume the last paused workflow if (lastPausedNode) { - handleSwitchSibling( - lastPausedNode.id, - { - onGetSuggestedQuestions: responseItemId => fetchSuggestedQuestions(responseItemId, appSourceType, appId), - onConversationComplete: currentConversationId ? undefined : handleNewConversationCompleted, - isPublicAPI: appSourceType === AppSourceType.webApp, - }, - ) + handleSwitchSibling(lastPausedNode.id, callbacks) + } + else if (lastRunningNode) { + handleReconnect(lastRunningNode.id, lastRunningNode.workflow_run_id!, callbacks) } }, []) diff --git a/web/app/components/base/chat/chat-with-history/hooks.tsx b/web/app/components/base/chat/chat-with-history/hooks.tsx index e6f5657ff5..5ba62d7a0d 100644 --- a/web/app/components/base/chat/chat-with-history/hooks.tsx +++ b/web/app/components/base/chat/chat-with-history/hooks.tsx @@ -35,12 +35,12 @@ function getFormattedChatList(messages: any[]) { const answerFiles = item.message_files?.filter((file: any) => file.belongs_to === 'assistant') || [] const humanInputFormDataList: HumanInputFormData[] = [] const humanInputFilledFormDataList: HumanInputFilledFormData[] = [] - let workflowRunId = '' + let workflowRunIdFromExtra = '' if (item.status === 'paused') { item.extra_contents?.forEach((content: ExtraContent) => { if (content.type === 'human_input' && !content.submitted) { humanInputFormDataList.push(content.form_definition) - workflowRunId = content.workflow_run_id + workflowRunIdFromExtra = content.workflow_run_id } }) } @@ -62,7 +62,8 @@ function getFormattedChatList(messages: any[]) { parentMessageId: `question-${item.id}`, humanInputFormDataList, humanInputFilledFormDataList, - workflow_run_id: workflowRunId, + workflow_run_id: item.workflow_run_id || workflowRunIdFromExtra, + created_at: item.created_at, }) }) return newChatList diff --git a/web/app/components/base/chat/chat/__tests__/hooks.spec.tsx b/web/app/components/base/chat/chat/__tests__/hooks.spec.tsx index 89327341de..9894c65dab 100644 --- a/web/app/components/base/chat/chat/__tests__/hooks.spec.tsx +++ b/web/app/components/base/chat/chat/__tests__/hooks.spec.tsx @@ -932,7 +932,7 @@ describe('useChat', () => { }) expect(sseGet).toHaveBeenCalledWith( - '/workflow/wr-1/events?include_state_snapshot=true', + '/workflow/wr-1/events?include_state_snapshot=true&replay=true', expect.any(Object), expect.any(Object), ) @@ -1264,6 +1264,102 @@ describe('useChat', () => { }) }) + describe('handleReconnect', () => { + it('should call sseGet with include_state_snapshot and rebuild from snapshot events', () => { + let callbacks: HookCallbacks + + vi.mocked(sseGet).mockImplementation(async (_url, _params, options) => { + callbacks = options as HookCallbacks + }) + + const prevChatTree = [{ + id: 'q-1', + content: 'query', + isAnswer: false, + children: [{ + id: 'm-reconnect', + content: 'stale partial content', + isAnswer: true, + siblingIndex: 0, + workflowProcess: { status: 'running', tracing: [{ node_id: 'old-node' }] }, + }], + }] + + const { result } = renderHook(() => useChat(undefined, undefined, prevChatTree as ChatItemInTree[])) + + act(() => { + result.current.handleReconnect('m-reconnect', 'wr-reconnect', { isPublicAPI: true }) + }) + + expect(sseGet).toHaveBeenCalledWith( + '/workflow/wr-reconnect/events?include_state_snapshot=true', + expect.any(Object), + expect.any(Object), + ) + + // Content is not reset until onWorkflowStarted fires + const beforeStart = result.current.chatList[1] + expect(beforeStart.content).toBe('stale partial content') + + act(() => { + callbacks.onWorkflowStarted({ workflow_run_id: 'wr-reconnect', task_id: 't-1' }) + }) + + // After onWorkflowStarted, content is reset and workflowProcess is fresh + const afterStart = result.current.chatList[1] + expect(afterStart.content).toBe('') + expect(afterStart.workflowProcess).toEqual({ status: WorkflowRunningStatus.Running, tracing: [] }) + + act(() => { + callbacks.onMessageReplace({ answer: 'full snapshot text' }) + callbacks.onNodeStarted({ data: { node_id: 'n-1', id: 'n-1', title: 'Node 1' } }) + callbacks.onNodeFinished({ data: { node_id: 'n-1', id: 'n-1', title: 'Node 1', status: 'succeeded' } }) + callbacks.onWorkflowFinished({ data: { status: 'succeeded' } }) + callbacks.onCompleted() + }) + + const lastResponse = result.current.chatList[1] + expect(lastResponse.content).toBe('full snapshot text') + expect(lastResponse.workflowProcess?.status).toBe('succeeded') + expect(lastResponse.workflowProcess?.tracing).toHaveLength(1) + expect(result.current.isResponding).toBe(false) + }) + + it('should abort previous stream when reconnecting again', () => { + const callbacksList: HookCallbacks[] = [] + vi.mocked(sseGet).mockImplementation(async (_url, _params, options) => { + callbacksList.push(options as HookCallbacks) + }) + + const prevChatTree = [{ + id: 'q-1', + content: 'query', + isAnswer: false, + children: [{ + id: 'm-rc', + content: 'partial', + isAnswer: true, + siblingIndex: 0, + }], + }] + + const { result } = renderHook(() => useChat(undefined, undefined, prevChatTree as ChatItemInTree[])) + const previousAbort = createAbortControllerMock() + + act(() => { + result.current.handleReconnect('m-rc', 'wr-1', { isPublicAPI: true }) + }) + act(() => { + callbacksList[0].getAbortController(previousAbort) + }) + act(() => { + result.current.handleReconnect('m-rc', 'wr-2', { isPublicAPI: true }) + }) + + expect(previousAbort.abort).toHaveBeenCalledTimes(1) + }) + }) + describe('createAudioPlayerManager branch cases', () => { it('should handle ttsUrl generation for appId with installed apps', async () => { vi.mocked(usePathname).mockReturnValue('/explore/installed/app') @@ -1331,7 +1427,7 @@ describe('useChat', () => { }) expect(sseGet).toHaveBeenCalledWith( - '/workflow/wr-tts-app/events?include_state_snapshot=true', + '/workflow/wr-tts-app/events?include_state_snapshot=true&replay=true', expect.any(Object), expect.any(Object), ) @@ -1493,7 +1589,7 @@ describe('useChat', () => { // Should automatically call handleResume -> sseGet for human input expect(sseGet).toHaveBeenCalledWith( - '/workflow/wr-1/events?include_state_snapshot=true', + '/workflow/wr-1/events?include_state_snapshot=true&replay=true', expect.any(Object), expect.any(Object), ) diff --git a/web/app/components/base/chat/chat/hooks.ts b/web/app/components/base/chat/chat/hooks.ts index 30dd7e0db1..6af8703d85 100644 --- a/web/app/components/base/chat/chat/hooks.ts +++ b/web/app/components/base/chat/chat/hooks.ts @@ -244,8 +244,10 @@ export const useChat = ( }: SendCallback, ) => { const getOrCreatePlayer = createAudioPlayerManager() - // Re-subscribe to workflow events for the specific message - const url = `/workflow/${workflowRunId}/events?include_state_snapshot=true` + // Re-subscribe to workflow events for the specific message. + // replay=true tells the backend to read from the beginning of the Redis Stream + // so all retained events are replayed on reconnection (e.g. page refresh). + const url = `/workflow/${workflowRunId}/events?include_state_snapshot=true&replay=true` const otherOptions: IOtherOptions = { isPublicAPI, @@ -583,6 +585,320 @@ export const useChat = ( ) }, [updateChatTreeNode, handleResponding, createAudioPlayerManager, config?.suggested_questions_after_answer]) + const handleReconnect = useCallback(( + messageId: string, + workflowRunId: string, + { + onGetSuggestedQuestions, + onConversationComplete, + isPublicAPI, + }: SendCallback, + ) => { + const getOrCreatePlayer = createAudioPlayerManager() + const url = `/workflow/${workflowRunId}/events?include_state_snapshot=true&replay=true` + + const otherOptions: IOtherOptions = { + isPublicAPI, + getAbortController: (abortController) => { + workflowEventsAbortControllerRef.current = abortController + }, + onData: (message: string, isFirstMessage: boolean, { conversationId: newConversationId, messageId: msgId, taskId }: IOnDataMoreInfo) => { + updateChatTreeNode(messageId, (responseItem) => { + const isAgentMode = responseItem.agent_thoughts && responseItem.agent_thoughts.length > 0 + if (!isAgentMode) { + responseItem.content = responseItem.content + message + } + else { + const lastThought = responseItem.agent_thoughts?.[responseItem.agent_thoughts?.length - 1] + if (lastThought) + lastThought.thought = lastThought.thought + message + } + if (msgId) + responseItem.id = msgId + }) + + if (isFirstMessage && newConversationId) + conversationIdRef.current = newConversationId + + if (taskId) + taskIdRef.current = taskId + }, + async onCompleted(hasError?: boolean) { + handleResponding(false) + + if (hasError) + return + + if (onConversationComplete) + onConversationComplete(conversationIdRef.current) + + if (config?.suggested_questions_after_answer?.enabled && !hasStopRespondedRef.current && onGetSuggestedQuestions) { + try { + const { data }: any = await onGetSuggestedQuestions( + messageId, + newAbortController => suggestedQuestionsAbortControllerRef.current = newAbortController, + ) + setSuggestedQuestions(data) + } + catch { + setSuggestedQuestions([]) + } + } + }, + onFile(file) { + const fileType = (file as { type?: string }).type || 'image' + const baseFile = ('transferMethod' in file) ? (file as Partial) : null + const convertedFile: FileEntity = { + id: baseFile?.id || (file as { id: string }).id, + type: baseFile?.type || (fileType === 'image' ? 'image/png' : fileType === 'video' ? 'video/mp4' : fileType === 'audio' ? 'audio/mpeg' : 'application/octet-stream'), + transferMethod: (baseFile?.transferMethod as FileEntity['transferMethod']) || (fileType === 'image' ? 'remote_url' : 'local_file'), + uploadedId: baseFile?.uploadedId || (file as { id: string }).id, + supportFileType: baseFile?.supportFileType || (fileType === 'image' ? 'image' : fileType === 'video' ? 'video' : fileType === 'audio' ? 'audio' : 'document'), + progress: baseFile?.progress ?? 100, + name: baseFile?.name || `generated_${fileType}.${fileType === 'image' ? 'png' : fileType === 'video' ? 'mp4' : fileType === 'audio' ? 'mp3' : 'bin'}`, + url: baseFile?.url || (file as { url?: string }).url, + size: baseFile?.size ?? 0, + } + updateChatTreeNode(messageId, (responseItem) => { + const lastThought = responseItem.agent_thoughts?.[responseItem.agent_thoughts?.length - 1] + if (lastThought) { + responseItem.agent_thoughts!.at(-1)!.message_files = [...(lastThought as any).message_files, convertedFile] + } + else { + const currentFiles = (responseItem.message_files as FileEntity[] | undefined) ?? [] + responseItem.message_files = [...currentFiles, convertedFile] + } + }) + }, + onThought(thought) { + updateChatTreeNode(messageId, (responseItem) => { + if (thought.message_id) + responseItem.id = thought.message_id + if (thought.conversation_id) + responseItem.conversationId = thought.conversation_id + if (!responseItem.agent_thoughts) + responseItem.agent_thoughts = [] + if (responseItem.agent_thoughts.length === 0) { + responseItem.agent_thoughts.push(thought) + } + else { + const lastThought = responseItem.agent_thoughts.at(-1) + if (lastThought?.id === thought.id) { + thought.thought = lastThought.thought + thought.message_files = lastThought.message_files + responseItem.agent_thoughts[responseItem.agent_thoughts.length - 1] = thought + } + else { + responseItem.agent_thoughts.push(thought) + } + } + }) + }, + onMessageEnd: (messageEnd) => { + updateChatTreeNode(messageId, (responseItem) => { + if (messageEnd.metadata?.annotation_reply) { + responseItem.annotation = ({ + id: messageEnd.metadata.annotation_reply.id, + authorName: messageEnd.metadata.annotation_reply.account.name, + }) + return + } + responseItem.citation = messageEnd.metadata?.retriever_resources || [] + const processedFilesFromResponse = getProcessedFilesFromResponse(messageEnd.files || []) + responseItem.allFiles = uniqBy([...(responseItem.allFiles || []), ...(processedFilesFromResponse || [])], 'id') + }) + }, + onMessageReplace: (messageReplace) => { + updateChatTreeNode(messageId, (responseItem) => { + responseItem.content = messageReplace.answer + }) + }, + onError() { + handleResponding(false) + }, + onWorkflowStarted: ({ workflow_run_id, task_id }) => { + handleResponding(true) + hasStopRespondedRef.current = false + updateChatTreeNode(messageId, (responseItem) => { + taskIdRef.current = task_id + responseItem.content = '' + responseItem.workflow_run_id = workflow_run_id + responseItem.workflowProcess = { + status: WorkflowRunningStatus.Running, + tracing: [], + } + }) + }, + onWorkflowFinished: ({ data: workflowFinishedData }) => { + updateChatTreeNode(messageId, (responseItem) => { + if (responseItem.workflowProcess) + responseItem.workflowProcess.status = workflowFinishedData.status as WorkflowRunningStatus + }) + }, + onIterationStart: ({ data: iterationStartedData }) => { + updateChatTreeNode(messageId, (responseItem) => { + if (!responseItem.workflowProcess) + return + if (!responseItem.workflowProcess.tracing) + responseItem.workflowProcess.tracing = [] + responseItem.workflowProcess.tracing.push({ + ...iterationStartedData, + status: WorkflowRunningStatus.Running, + }) + }) + }, + onIterationFinish: ({ data: iterationFinishedData }) => { + updateChatTreeNode(messageId, (responseItem) => { + if (!responseItem.workflowProcess?.tracing) + return + const tracing = responseItem.workflowProcess.tracing + const iterationIndex = tracing.findIndex(item => item.node_id === iterationFinishedData.node_id + && (item.execution_metadata?.parallel_id === iterationFinishedData.execution_metadata?.parallel_id || item.parallel_id === iterationFinishedData.execution_metadata?.parallel_id))! + if (iterationIndex > -1) { + tracing[iterationIndex] = { + ...tracing[iterationIndex], + ...iterationFinishedData, + status: WorkflowRunningStatus.Succeeded, + } + } + }) + }, + onNodeStarted: ({ data: nodeStartedData }) => { + updateChatTreeNode(messageId, (responseItem) => { + if (!responseItem.workflowProcess) + return + if (!responseItem.workflowProcess.tracing) + responseItem.workflowProcess.tracing = [] + const currentIndex = responseItem.workflowProcess.tracing.findIndex(item => item.node_id === nodeStartedData.node_id) + if (currentIndex > -1) { + responseItem.workflowProcess.tracing[currentIndex] = { + ...nodeStartedData, + status: NodeRunningStatus.Running, + } + } + else { + if (nodeStartedData.iteration_id) + return + responseItem.workflowProcess.tracing.push({ + ...nodeStartedData, + status: WorkflowRunningStatus.Running, + }) + } + }) + }, + onNodeFinished: ({ data: nodeFinishedData }) => { + updateChatTreeNode(messageId, (responseItem) => { + if (!responseItem.workflowProcess?.tracing) + return + if (nodeFinishedData.iteration_id) + return + const currentIndex = responseItem.workflowProcess.tracing.findIndex((item) => { + if (!item.execution_metadata?.parallel_id) + return item.id === nodeFinishedData.id + return item.id === nodeFinishedData.id && (item.execution_metadata?.parallel_id === nodeFinishedData.execution_metadata?.parallel_id) + }) + if (currentIndex > -1) + responseItem.workflowProcess.tracing[currentIndex] = nodeFinishedData as any + }) + }, + onTTSChunk: (msgId: string, audio: string) => { + if (!audio || audio === '') + return + const audioPlayer = getOrCreatePlayer() + if (audioPlayer) { + audioPlayer.playAudioWithAudio(audio, true) + AudioPlayerManager.getInstance().resetMsgId(msgId) + } + }, + onTTSEnd: (_msgId: string, audio: string) => { + const audioPlayer = getOrCreatePlayer() + if (audioPlayer) + audioPlayer.playAudioWithAudio(audio, false) + }, + onLoopStart: ({ data: loopStartedData }) => { + updateChatTreeNode(messageId, (responseItem) => { + if (!responseItem.workflowProcess) + return + if (!responseItem.workflowProcess.tracing) + responseItem.workflowProcess.tracing = [] + responseItem.workflowProcess.tracing.push({ + ...loopStartedData, + status: WorkflowRunningStatus.Running, + }) + }) + }, + onLoopFinish: ({ data: loopFinishedData }) => { + updateChatTreeNode(messageId, (responseItem) => { + if (!responseItem.workflowProcess?.tracing) + return + const tracing = responseItem.workflowProcess.tracing + const loopIndex = tracing.findIndex(item => item.node_id === loopFinishedData.node_id + && (item.execution_metadata?.parallel_id === loopFinishedData.execution_metadata?.parallel_id || item.parallel_id === loopFinishedData.execution_metadata?.parallel_id))! + if (loopIndex > -1) { + tracing[loopIndex] = { + ...tracing[loopIndex], + ...loopFinishedData, + status: WorkflowRunningStatus.Succeeded, + } + } + }) + }, + onHumanInputRequired: ({ data: humanInputRequiredData }) => { + updateChatTreeNode(messageId, (responseItem) => { + if (!responseItem.humanInputFormDataList) { + responseItem.humanInputFormDataList = [humanInputRequiredData] + } + else { + const currentFormIndex = responseItem.humanInputFormDataList.findIndex(item => item.node_id === humanInputRequiredData.node_id) + if (currentFormIndex > -1) + responseItem.humanInputFormDataList[currentFormIndex] = humanInputRequiredData + else + responseItem.humanInputFormDataList.push(humanInputRequiredData) + } + if (responseItem.workflowProcess?.tracing) { + const currentTracingIndex = responseItem.workflowProcess.tracing.findIndex(item => item.node_id === humanInputRequiredData.node_id) + if (currentTracingIndex > -1) + responseItem.workflowProcess.tracing[currentTracingIndex].status = NodeRunningStatus.Paused + } + }) + }, + onHumanInputFormFilled: ({ data: humanInputFilledFormData }) => { + updateChatTreeNode(messageId, (responseItem) => { + if (responseItem.humanInputFormDataList?.length) { + const currentFormIndex = responseItem.humanInputFormDataList.findIndex(item => item.node_id === humanInputFilledFormData.node_id) + if (currentFormIndex > -1) + responseItem.humanInputFormDataList.splice(currentFormIndex, 1) + } + if (!responseItem.humanInputFilledFormDataList) + responseItem.humanInputFilledFormDataList = [humanInputFilledFormData] + else + responseItem.humanInputFilledFormDataList.push(humanInputFilledFormData) + }) + }, + onHumanInputFormTimeout: ({ data: humanInputFormTimeoutData }) => { + updateChatTreeNode(messageId, (responseItem) => { + if (responseItem.humanInputFormDataList?.length) { + const currentFormIndex = responseItem.humanInputFormDataList.findIndex(item => item.node_id === humanInputFormTimeoutData.node_id) + responseItem.humanInputFormDataList[currentFormIndex].expiration_time = humanInputFormTimeoutData.expiration_time + } + }) + }, + onWorkflowPaused: ({ data: workflowPausedData }) => { + const resumeUrl = `/workflow/${workflowPausedData.workflow_run_id}/events` + pausedStateRef.current = true + sseGet(resumeUrl, {}, otherOptions) + updateChatTreeNode(messageId, (responseItem) => { + responseItem.workflowProcess!.status = WorkflowRunningStatus.Paused + }) + }, + } + + if (workflowEventsAbortControllerRef.current) + workflowEventsAbortControllerRef.current.abort() + + sseGet(url, {}, otherOptions) + }, [updateChatTreeNode, handleResponding, createAudioPlayerManager, config?.suggested_questions_after_answer]) + const updateCurrentQAOnTree = useCallback(({ parentId, responseItem, @@ -1275,6 +1591,7 @@ export const useChat = ( setIsResponding, handleSend, handleResume, + handleReconnect, handleSwitchSibling, suggestedQuestions, handleRestart, diff --git a/web/app/components/base/chat/chat/type.ts b/web/app/components/base/chat/chat/type.ts index 6ddb4f958e..0028eb7357 100644 --- a/web/app/components/base/chat/chat/type.ts +++ b/web/app/components/base/chat/chat/type.ts @@ -111,6 +111,7 @@ export type IChatItem = { agent_thoughts?: ThoughtItem[] message_files?: FileEntity[] workflow_run_id?: string + created_at?: number // for agent log conversationId?: string input?: any diff --git a/web/app/components/base/chat/embedded-chatbot/__tests__/chat-wrapper.spec.tsx b/web/app/components/base/chat/embedded-chatbot/__tests__/chat-wrapper.spec.tsx index 865023722e..5c7df93c4b 100644 --- a/web/app/components/base/chat/embedded-chatbot/__tests__/chat-wrapper.spec.tsx +++ b/web/app/components/base/chat/embedded-chatbot/__tests__/chat-wrapper.spec.tsx @@ -177,6 +177,7 @@ const createUseChatReturn = (overrides: Partial = {}): UseChatRet setTargetMessageId: vi.fn() as UseChatReturn['setTargetMessageId'], handleSend: vi.fn(), handleResume: vi.fn(), + handleReconnect: vi.fn(), setIsResponding: vi.fn() as UseChatReturn['setIsResponding'], handleStop: vi.fn(), handleSwitchSibling: vi.fn(), @@ -541,6 +542,52 @@ describe('EmbeddedChatbot chat-wrapper', () => { expect(handleSwitchSibling).toHaveBeenCalled() }) + it('should reconnect to a recent running workflow on mount', () => { + const handleReconnect = vi.fn() + vi.mocked(useChat).mockReturnValue(createUseChatReturn({ + handleReconnect, + })) + vi.mocked(useEmbeddedChatbotContext).mockReturnValue(createContextValue({ + appPrevChatList: [ + { + id: 'running-node', + isAnswer: true, + content: 'partial', + workflow_run_id: 'run-active', + created_at: Math.floor(Date.now() / 1000) - 30, + children: [], + } as unknown as ChatItemInTree, + ], + })) + render() + expect(handleReconnect).toHaveBeenCalledWith( + 'running-node', + 'run-active', + expect.objectContaining({ isPublicAPI: true }), + ) + }) + + it('should not reconnect to an old workflow beyond the retention window', () => { + const handleReconnect = vi.fn() + vi.mocked(useChat).mockReturnValue(createUseChatReturn({ + handleReconnect, + })) + vi.mocked(useEmbeddedChatbotContext).mockReturnValue(createContextValue({ + appPrevChatList: [ + { + id: 'old-node', + isAnswer: true, + content: 'done', + workflow_run_id: 'run-old', + created_at: Math.floor(Date.now() / 1000) - 700, + children: [], + } as unknown as ChatItemInTree, + ], + })) + render() + expect(handleReconnect).not.toHaveBeenCalled() + }) + it('should handle conversation completion and suggested questions in chat actions', async () => { const handleSend = vi.fn() vi.mocked(useChat).mockReturnValue(createUseChatReturn({ diff --git a/web/app/components/base/chat/embedded-chatbot/chat-wrapper.tsx b/web/app/components/base/chat/embedded-chatbot/chat-wrapper.tsx index b63784cc34..aa5e8db6b7 100644 --- a/web/app/components/base/chat/embedded-chatbot/chat-wrapper.tsx +++ b/web/app/components/base/chat/embedded-chatbot/chat-wrapper.tsx @@ -85,6 +85,7 @@ const ChatWrapper = () => { handleSend, handleStop, handleSwitchSibling, + handleReconnect, isResponding: respondingState, suggestedQuestions, } = useChat( @@ -142,37 +143,47 @@ const ChatWrapper = () => { setIsResponding(respondingState) }, [respondingState, setIsResponding]) - // Resume paused workflows when chat history is loaded + // Resume paused workflows or reconnect to running workflows when chat history is loaded useEffect(() => { if (!appPrevChatList || appPrevChatList.length === 0) return - // Find the last answer item with workflow_run_id that needs resumption (DFS - find deepest first) - let lastPausedNode: ChatItemInTree | undefined - const findLastPausedWorkflow = (nodes: ChatItemInTree[]) => { - nodes.forEach((node) => { - // DFS: recurse to children first - if (node.children && node.children.length > 0) - findLastPausedWorkflow(node.children) + const STREAM_RETENTION_SECONDS = 600 - // Track the last node with humanInputFormDataList - if (node.isAnswer && node.workflow_run_id && node.humanInputFormDataList && node.humanInputFormDataList.length > 0) - lastPausedNode = node + let lastPausedNode: ChatItemInTree | undefined + let lastRunningNode: ChatItemInTree | undefined + const findReconnectableWorkflow = (nodes: ChatItemInTree[]) => { + nodes.forEach((node) => { + if (node.isAnswer && node.workflow_run_id) { + if (node.humanInputFormDataList && node.humanInputFormDataList.length > 0) { + lastPausedNode = node + } + else if ( + node.created_at + && (Date.now() / 1000 - node.created_at) < STREAM_RETENTION_SECONDS + ) { + lastRunningNode = node + } + } + + if (node.children && node.children.length > 0) + findReconnectableWorkflow(node.children) }) } - findLastPausedWorkflow(appPrevChatList) + findReconnectableWorkflow(appPrevChatList) + + const callbacks = { + onGetSuggestedQuestions: (responseItemId: string) => fetchSuggestedQuestions(responseItemId, appSourceType, appId), + onConversationComplete: currentConversationId ? undefined : handleNewConversationCompleted, + isPublicAPI: appSourceType === AppSourceType.webApp, + } - // Only resume the last paused workflow if (lastPausedNode) { - handleSwitchSibling( - lastPausedNode.id, - { - onGetSuggestedQuestions: responseItemId => fetchSuggestedQuestions(responseItemId, appSourceType, appId), - onConversationComplete: currentConversationId ? undefined : handleNewConversationCompleted, - isPublicAPI: appSourceType === AppSourceType.webApp, - }, - ) + handleSwitchSibling(lastPausedNode.id, callbacks) + } + else if (lastRunningNode) { + handleReconnect(lastRunningNode.id, lastRunningNode.workflow_run_id!, callbacks) } }, []) diff --git a/web/app/components/base/chat/embedded-chatbot/hooks.tsx b/web/app/components/base/chat/embedded-chatbot/hooks.tsx index 67b5332434..f7d64c55eb 100644 --- a/web/app/components/base/chat/embedded-chatbot/hooks.tsx +++ b/web/app/components/base/chat/embedded-chatbot/hooks.tsx @@ -42,6 +42,8 @@ function getFormattedChatList(messages: any[]) { citation: item.retriever_resources, message_files: getProcessedFilesFromResponse(answerFiles.map((item: any) => ({ ...item, related_id: item.id }))), parentMessageId: `question-${item.id}`, + workflow_run_id: item.workflow_run_id, + created_at: item.created_at, }) }) return newChatList diff --git a/web/app/components/workflow/panel/debug-and-preview/__tests__/hooks/handle-resume.spec.ts b/web/app/components/workflow/panel/debug-and-preview/__tests__/hooks/handle-resume.spec.ts index 328de5a0af..c5c9198c3b 100644 --- a/web/app/components/workflow/panel/debug-and-preview/__tests__/hooks/handle-resume.spec.ts +++ b/web/app/components/workflow/panel/debug-and-preview/__tests__/hooks/handle-resume.spec.ts @@ -122,7 +122,7 @@ describe('useChat – handleResume', () => { }) expect(mockSseGet).toHaveBeenCalledWith( - '/workflow/wfr-1/events?include_state_snapshot=true', + '/workflow/wfr-1/events?include_state_snapshot=true&replay=true', {}, expect.any(Object), ) diff --git a/web/app/components/workflow/panel/debug-and-preview/hooks.ts b/web/app/components/workflow/panel/debug-and-preview/hooks.ts index 2529d505be..f18c7f26f0 100644 --- a/web/app/components/workflow/panel/debug-and-preview/hooks.ts +++ b/web/app/components/workflow/panel/debug-and-preview/hooks.ts @@ -680,8 +680,10 @@ export const useChat = ( onGetSuggestedQuestions, }: SendCallback, ) => { - // Re-subscribe to workflow events for the specific message - const url = `/workflow/${workflowRunId}/events?include_state_snapshot=true` + // Re-subscribe to workflow events for the specific message. + // replay=true tells the backend to read from the beginning of the Redis Stream + // so all retained events are replayed on reconnection (e.g. page refresh). + const url = `/workflow/${workflowRunId}/events?include_state_snapshot=true&replay=true` const otherOptions: IOtherOptions = { getAbortController: (abortController) => {