From ee87289917953362b67719c1ae09bfa5d17377dc Mon Sep 17 00:00:00 2001 From: agenthaulk Date: Sun, 5 Apr 2026 03:23:51 -0700 Subject: [PATCH] refactor: convert AppMode if/elif to match/case in app_generate_service (#30001) (#34563) Co-authored-by: agenthaulk Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com> --- api/services/app_generate_service.py | 314 +++++++++++++++------------ 1 file changed, 174 insertions(+), 140 deletions(-) diff --git a/api/services/app_generate_service.py b/api/services/app_generate_service.py index 40013f2b66..17ed98d301 100644 --- a/api/services/app_generate_service.py +++ b/api/services/app_generate_service.py @@ -116,139 +116,143 @@ class AppGenerateService: request_id = RateLimit.gen_request_key() try: request_id = rate_limit.enter(request_id) - if app_model.mode == AppMode.COMPLETION: - return rate_limit.generate( - CompletionAppGenerator.convert_to_event_stream( - CompletionAppGenerator().generate( - app_model=app_model, user=user, args=args, invoke_from=invoke_from, streaming=streaming - ), - ), - request_id=request_id, - ) - elif app_model.mode == AppMode.AGENT_CHAT or app_model.is_agent: - return rate_limit.generate( - AgentChatAppGenerator.convert_to_event_stream( - AgentChatAppGenerator().generate( - app_model=app_model, user=user, args=args, invoke_from=invoke_from, streaming=streaming - ), - ), - request_id, - ) - elif app_model.mode == AppMode.CHAT: - return rate_limit.generate( - ChatAppGenerator.convert_to_event_stream( - ChatAppGenerator().generate( - app_model=app_model, user=user, args=args, invoke_from=invoke_from, streaming=streaming - ), - ), - request_id=request_id, - ) - elif app_model.mode == AppMode.ADVANCED_CHAT: - workflow_id = args.get("workflow_id") - workflow = cls._get_workflow(app_model, invoke_from, workflow_id) - - if streaming: - # Streaming mode: subscribe to SSE and enqueue the execution on first subscriber - with rate_limit_context(rate_limit, request_id): - payload = AppExecutionParams.new( - app_model=app_model, - workflow=workflow, - user=user, - args=args, - invoke_from=invoke_from, - streaming=True, - call_depth=0, - ) - payload_json = payload.model_dump_json() - - def on_subscribe(): - workflow_based_app_execution_task.delay(payload_json) - - on_subscribe = cls._build_streaming_task_on_subscribe(on_subscribe) - generator = AdvancedChatAppGenerator() + effective_mode = ( + AppMode.AGENT_CHAT if app_model.is_agent and app_model.mode != AppMode.AGENT_CHAT else app_model.mode + ) + match effective_mode: + case AppMode.COMPLETION: return rate_limit.generate( - generator.convert_to_event_stream( - generator.retrieve_events( - AppMode.ADVANCED_CHAT, - payload.workflow_run_id, - on_subscribe=on_subscribe, + CompletionAppGenerator.convert_to_event_stream( + CompletionAppGenerator().generate( + app_model=app_model, user=user, args=args, invoke_from=invoke_from, streaming=streaming ), ), request_id=request_id, ) - else: - # Blocking mode: run synchronously and return JSON instead of SSE - # Keep behaviour consistent with WORKFLOW blocking branch. - advanced_generator = AdvancedChatAppGenerator() + case AppMode.AGENT_CHAT: return rate_limit.generate( - advanced_generator.convert_to_event_stream( - advanced_generator.generate( + AgentChatAppGenerator.convert_to_event_stream( + AgentChatAppGenerator().generate( + app_model=app_model, user=user, args=args, invoke_from=invoke_from, streaming=streaming + ), + ), + request_id, + ) + case AppMode.CHAT: + return rate_limit.generate( + ChatAppGenerator.convert_to_event_stream( + ChatAppGenerator().generate( + app_model=app_model, user=user, args=args, invoke_from=invoke_from, streaming=streaming + ), + ), + request_id=request_id, + ) + case AppMode.ADVANCED_CHAT: + workflow_id = args.get("workflow_id") + workflow = cls._get_workflow(app_model, invoke_from, workflow_id) + + if streaming: + # Streaming mode: subscribe to SSE and enqueue the execution on first subscriber + with rate_limit_context(rate_limit, request_id): + payload = AppExecutionParams.new( app_model=app_model, workflow=workflow, user=user, args=args, invoke_from=invoke_from, - workflow_run_id=str(uuid.uuid4()), - streaming=False, + streaming=True, + call_depth=0, ) - ), - request_id=request_id, - ) - elif app_model.mode == AppMode.WORKFLOW: - workflow_id = args.get("workflow_id") - workflow = cls._get_workflow(app_model, invoke_from, workflow_id) - if streaming: - with rate_limit_context(rate_limit, request_id): - payload = AppExecutionParams.new( - app_model=app_model, - workflow=workflow, - user=user, - args=args, - invoke_from=invoke_from, - streaming=True, - call_depth=0, - root_node_id=root_node_id, - workflow_run_id=str(uuid.uuid4()), + payload_json = payload.model_dump_json() + + def on_subscribe(): + workflow_based_app_execution_task.delay(payload_json) + + on_subscribe = cls._build_streaming_task_on_subscribe(on_subscribe) + generator = AdvancedChatAppGenerator() + return rate_limit.generate( + generator.convert_to_event_stream( + generator.retrieve_events( + AppMode.ADVANCED_CHAT, + payload.workflow_run_id, + on_subscribe=on_subscribe, + ), + ), + request_id=request_id, ) - payload_json = payload.model_dump_json() + else: + # Blocking mode: run synchronously and return JSON instead of SSE + # Keep behaviour consistent with WORKFLOW blocking branch. + advanced_generator = AdvancedChatAppGenerator() + return rate_limit.generate( + advanced_generator.convert_to_event_stream( + advanced_generator.generate( + app_model=app_model, + workflow=workflow, + user=user, + args=args, + invoke_from=invoke_from, + workflow_run_id=str(uuid.uuid4()), + streaming=False, + ) + ), + request_id=request_id, + ) + case AppMode.WORKFLOW: + workflow_id = args.get("workflow_id") + workflow = cls._get_workflow(app_model, invoke_from, workflow_id) + if streaming: + with rate_limit_context(rate_limit, request_id): + payload = AppExecutionParams.new( + app_model=app_model, + workflow=workflow, + user=user, + args=args, + invoke_from=invoke_from, + streaming=True, + call_depth=0, + root_node_id=root_node_id, + workflow_run_id=str(uuid.uuid4()), + ) + payload_json = payload.model_dump_json() - def on_subscribe(): - workflow_based_app_execution_task.delay(payload_json) + def on_subscribe(): + workflow_based_app_execution_task.delay(payload_json) - on_subscribe = cls._build_streaming_task_on_subscribe(on_subscribe) + on_subscribe = cls._build_streaming_task_on_subscribe(on_subscribe) + return rate_limit.generate( + WorkflowAppGenerator.convert_to_event_stream( + MessageBasedAppGenerator.retrieve_events( + AppMode.WORKFLOW, + payload.workflow_run_id, + on_subscribe=on_subscribe, + ), + ), + request_id, + ) + + pause_config = PauseStateLayerConfig( + session_factory=session_factory.get_session_maker(), + state_owner_user_id=workflow.created_by, + ) return rate_limit.generate( WorkflowAppGenerator.convert_to_event_stream( - MessageBasedAppGenerator.retrieve_events( - AppMode.WORKFLOW, - payload.workflow_run_id, - on_subscribe=on_subscribe, + WorkflowAppGenerator().generate( + app_model=app_model, + workflow=workflow, + user=user, + args=args, + invoke_from=invoke_from, + streaming=False, + root_node_id=root_node_id, + call_depth=0, + pause_state_config=pause_config, ), ), request_id, ) - - pause_config = PauseStateLayerConfig( - session_factory=session_factory.get_session_maker(), - state_owner_user_id=workflow.created_by, - ) - return rate_limit.generate( - WorkflowAppGenerator.convert_to_event_stream( - WorkflowAppGenerator().generate( - app_model=app_model, - workflow=workflow, - user=user, - args=args, - invoke_from=invoke_from, - streaming=False, - root_node_id=root_node_id, - call_depth=0, - pause_state_config=pause_config, - ), - ), - request_id, - ) - else: - raise ValueError(f"Invalid app mode {app_model.mode}") + case _: + raise ValueError(f"Invalid app mode {app_model.mode}") except Exception: quota_charge.refund() rate_limit.exit(request_id) @@ -280,43 +284,73 @@ class AppGenerateService: @classmethod def generate_single_iteration(cls, app_model: App, user: Account, node_id: str, args: Any, streaming: bool = True): - if app_model.mode == AppMode.ADVANCED_CHAT: - workflow = cls._get_workflow(app_model, InvokeFrom.DEBUGGER) - return AdvancedChatAppGenerator.convert_to_event_stream( - AdvancedChatAppGenerator().single_iteration_generate( - app_model=app_model, workflow=workflow, node_id=node_id, user=user, args=args, streaming=streaming + match app_model.mode: + case AppMode.COMPLETION | AppMode.CHAT | AppMode.AGENT_CHAT: + raise ValueError(f"Invalid app mode {app_model.mode}") + case AppMode.ADVANCED_CHAT: + workflow = cls._get_workflow(app_model, InvokeFrom.DEBUGGER) + return AdvancedChatAppGenerator.convert_to_event_stream( + AdvancedChatAppGenerator().single_iteration_generate( + app_model=app_model, + workflow=workflow, + node_id=node_id, + user=user, + args=args, + streaming=streaming, + ) ) - ) - elif app_model.mode == AppMode.WORKFLOW: - workflow = cls._get_workflow(app_model, InvokeFrom.DEBUGGER) - return AdvancedChatAppGenerator.convert_to_event_stream( - WorkflowAppGenerator().single_iteration_generate( - app_model=app_model, workflow=workflow, node_id=node_id, user=user, args=args, streaming=streaming + case AppMode.WORKFLOW: + workflow = cls._get_workflow(app_model, InvokeFrom.DEBUGGER) + return AdvancedChatAppGenerator.convert_to_event_stream( + WorkflowAppGenerator().single_iteration_generate( + app_model=app_model, + workflow=workflow, + node_id=node_id, + user=user, + args=args, + streaming=streaming, + ) ) - ) - else: - raise ValueError(f"Invalid app mode {app_model.mode}") + case AppMode.CHANNEL | AppMode.RAG_PIPELINE: + raise ValueError(f"Invalid app mode {app_model.mode}") + case _: + raise ValueError(f"Invalid app mode {app_model.mode}") @classmethod def generate_single_loop( cls, app_model: App, user: Account, node_id: str, args: LoopNodeRunPayload, streaming: bool = True ): - if app_model.mode == AppMode.ADVANCED_CHAT: - workflow = cls._get_workflow(app_model, InvokeFrom.DEBUGGER) - return AdvancedChatAppGenerator.convert_to_event_stream( - AdvancedChatAppGenerator().single_loop_generate( - app_model=app_model, workflow=workflow, node_id=node_id, user=user, args=args, streaming=streaming + match app_model.mode: + case AppMode.COMPLETION | AppMode.CHAT | AppMode.AGENT_CHAT: + raise ValueError(f"Invalid app mode {app_model.mode}") + case AppMode.ADVANCED_CHAT: + workflow = cls._get_workflow(app_model, InvokeFrom.DEBUGGER) + return AdvancedChatAppGenerator.convert_to_event_stream( + AdvancedChatAppGenerator().single_loop_generate( + app_model=app_model, + workflow=workflow, + node_id=node_id, + user=user, + args=args, + streaming=streaming, + ) ) - ) - elif app_model.mode == AppMode.WORKFLOW: - workflow = cls._get_workflow(app_model, InvokeFrom.DEBUGGER) - return AdvancedChatAppGenerator.convert_to_event_stream( - WorkflowAppGenerator().single_loop_generate( - app_model=app_model, workflow=workflow, node_id=node_id, user=user, args=args, streaming=streaming + case AppMode.WORKFLOW: + workflow = cls._get_workflow(app_model, InvokeFrom.DEBUGGER) + return AdvancedChatAppGenerator.convert_to_event_stream( + WorkflowAppGenerator().single_loop_generate( + app_model=app_model, + workflow=workflow, + node_id=node_id, + user=user, + args=args, + streaming=streaming, + ) ) - ) - else: - raise ValueError(f"Invalid app mode {app_model.mode}") + case AppMode.CHANNEL | AppMode.RAG_PIPELINE: + raise ValueError(f"Invalid app mode {app_model.mode}") + case _: + raise ValueError(f"Invalid app mode {app_model.mode}") @classmethod def generate_more_like_this(