From 2b5dbbc7ff0da553abf0278cb8fc58847171180d Mon Sep 17 00:00:00 2001 From: FFXN Date: Mon, 20 Apr 2026 13:55:26 +0800 Subject: [PATCH] use workflow_kind as business type (snippet/evaluation), not workflow_type. --- api/controllers/console/app/app.py | 29 ++++++++--- api/controllers/console/app/workflow.py | 5 +- .../console/evaluation/evaluation.py | 2 + api/core/app/apps/workflow/app_generator.py | 2 +- api/fields/workflow_fields.py | 1 + api/models/__init__.py | 2 + api/models/workflow.py | 48 ++++++++++++++++++- api/services/snippet_service.py | 22 ++++++--- api/services/workflow/workflow_converter.py | 3 +- api/services/workflow_restore.py | 2 + api/services/workflow_service.py | 37 +++++++++----- 11 files changed, 122 insertions(+), 31 deletions(-) diff --git a/api/controllers/console/app/app.py b/api/controllers/console/app/app.py index df5c94a867..27c8008e37 100644 --- a/api/controllers/console/app/app.py +++ b/api/controllers/console/app/app.py @@ -34,6 +34,7 @@ from libs.helper import build_icon_url from libs.login import current_account_with_tenant, login_required from models import App, DatasetPermissionEnum, Workflow from models.model import IconType +from models.workflow import resolve_workflow_kind from services.app_dsl_service import AppDslService from services.app_service import AppService from services.enterprise.enterprise_service import EnterpriseService @@ -330,6 +331,7 @@ class AppPartial(ResponseModel): author_name: str | None = None has_draft_trigger: bool | None = None workflow_type: str | None = None + workflow_kind: str | None = None @computed_field(return_type=str | None) # type: ignore @property @@ -365,6 +367,7 @@ class AppDetail(ResponseModel): updated_at: int | None = None access_mode: str | None = None workflow_type: str | None = None + workflow_kind: str | None = None tags: list[Tag] = Field(default_factory=list) @field_validator("created_at", "updated_at", mode="before") @@ -508,15 +511,23 @@ class AppListApi(Resource): app.has_draft_trigger = str(app.id) in draft_trigger_app_ids workflow_ids = [str(app.workflow_id) for app in app_pagination.items if app.workflow_id] - workflow_type_map: dict[str, str] = {} + workflow_info_map: dict[str, tuple[str, str]] = {} if workflow_ids: rows = db.session.execute( - select(Workflow.id, Workflow.type).where(Workflow.id.in_(workflow_ids)) + select(Workflow.id, Workflow.type, Workflow.kind).where(Workflow.id.in_(workflow_ids)) ).all() - workflow_type_map = {str(row.id): row.type for row in rows} + workflow_info_map = { + str(row.id): ( + row.type.value if hasattr(row.type, "value") else str(row.type), + resolve_workflow_kind(row.kind).value, + ) + for row in rows + } for app in app_pagination.items: - app.workflow_type = workflow_type_map.get(str(app.workflow_id)) if app.workflow_id else None + workflow_info = workflow_info_map.get(str(app.workflow_id)) if app.workflow_id else None + app.workflow_type = workflow_info[0] if workflow_info else None + app.workflow_kind = workflow_info[1] if workflow_info else None pagination_model = AppPagination.model_validate(app_pagination, from_attributes=True) return pagination_model.model_dump(mode="json"), 200 @@ -566,11 +577,15 @@ class AppApi(Resource): if app_model.workflow_id: row = db.session.execute( - select(Workflow.type).where(Workflow.id == app_model.workflow_id) - ).scalar() - app_model.workflow_type = row if row else None + select(Workflow.type, Workflow.kind).where(Workflow.id == app_model.workflow_id) + ).first() + app_model.workflow_type = ( + (row.type.value if hasattr(row.type, "value") else str(row.type)) if row else None + ) + app_model.workflow_kind = resolve_workflow_kind(row.kind).value if row else None else: app_model.workflow_type = None + app_model.workflow_kind = None response_model = AppDetailWithSite.model_validate(app_model, from_attributes=True) return response_model.model_dump(mode="json") diff --git a/api/controllers/console/app/workflow.py b/api/controllers/console/app/workflow.py index 1f25beeefc..e1e0b1eef0 100644 --- a/api/controllers/console/app/workflow.py +++ b/api/controllers/console/app/workflow.py @@ -48,7 +48,7 @@ from libs.helper import TimestampField, uuid_value from libs.login import current_account_with_tenant, login_required from models import App from models.model import AppMode -from models.workflow import Workflow, WorkflowType +from models.workflow import Workflow, WorkflowKind from repositories.workflow_collaboration_repository import WORKFLOW_ONLINE_USERS_PREFIX from services.app_generate_service import AppGenerateService from services.errors.app import IsDraftWorkflowError, WorkflowHashNotEqualError, WorkflowNotFoundError @@ -1140,7 +1140,7 @@ class WorkflowTypeConvertApi(Resource): def post(self, app_model: App): current_user, _ = current_account_with_tenant() args = WorkflowTypeConvertQuery.model_validate(request.args.to_dict(flat=True)) # type: ignore - target_type = WorkflowType.value_of(args.target_type) + target_type = WorkflowKind.EVALUATION if args.target_type == "evaluation" else WorkflowKind.STANDARD workflow_service = WorkflowService() with Session(db.engine) as session: @@ -1164,6 +1164,7 @@ class WorkflowTypeConvertApi(Resource): "result": "success", "workflow_id": workflow.id, "type": workflow.type.value, + "kind": workflow.kind_or_standard, "updated_at": TimestampField().format(workflow.updated_at or workflow.created_at), } diff --git a/api/controllers/console/evaluation/evaluation.py b/api/controllers/console/evaluation/evaluation.py index 31490020c3..4ecccc05f1 100644 --- a/api/controllers/console/evaluation/evaluation.py +++ b/api/controllers/console/evaluation/evaluation.py @@ -152,6 +152,7 @@ available_evaluation_workflow_list_fields = { "app_id": fields.String, "app_name": fields.String, "type": fields.String, + "kind": fields.String, "version": fields.String, "marked_name": fields.String, "marked_comment": fields.String, @@ -743,6 +744,7 @@ class AvailableEvaluationWorkflowsApi(Resource): "app_id": wf.app_id, "app_name": app_names.get(wf.app_id, ""), "type": wf.type.value, + "kind": wf.kind_or_standard, "version": wf.version, "marked_name": wf.marked_name, "marked_comment": wf.marked_comment, diff --git a/api/core/app/apps/workflow/app_generator.py b/api/core/app/apps/workflow/app_generator.py index 5a1e7e117f..1acb1acaf3 100644 --- a/api/core/app/apps/workflow/app_generator.py +++ b/api/core/app/apps/workflow/app_generator.py @@ -57,7 +57,7 @@ class WorkflowAppGenerator(BaseAppGenerator): @staticmethod def _ensure_snippet_start_node_in_worker(*, session: Session, workflow: Workflow) -> Workflow: """Re-apply snippet virtual Start injection after worker reloads workflow from DB.""" - if workflow.type != "snippet": + if workflow.kind_or_standard != "snippet": return workflow from models.snippet import CustomizedSnippet diff --git a/api/fields/workflow_fields.py b/api/fields/workflow_fields.py index f9b5e98936..94549d2152 100644 --- a/api/fields/workflow_fields.py +++ b/api/fields/workflow_fields.py @@ -68,6 +68,7 @@ pipeline_variable_fields = { workflow_fields = { "id": fields.String, + "kind": fields.String(attribute="kind_or_standard"), "graph": fields.Raw(attribute="graph_dict"), "features": fields.Raw(attribute="features_dict"), "hash": fields.String(attribute="unique_hash"), diff --git a/api/models/__init__.py b/api/models/__init__.py index f6e43bb4ea..2055a6b005 100644 --- a/api/models/__init__.py +++ b/api/models/__init__.py @@ -118,6 +118,7 @@ from .workflow import ( WorkflowAppLog, WorkflowAppLogCreatedFrom, WorkflowArchiveLog, + WorkflowKind, WorkflowNodeExecutionModel, WorkflowNodeExecutionOffload, WorkflowNodeExecutionTriggeredFrom, @@ -240,5 +241,6 @@ __all__ = [ "WorkflowSchedulePlan", "WorkflowToolProvider", "WorkflowTriggerStatus", + "WorkflowKind", "WorkflowType", ] diff --git a/api/models/workflow.py b/api/models/workflow.py index 467d60f6ac..adedecfc9f 100644 --- a/api/models/workflow.py +++ b/api/models/workflow.py @@ -135,6 +135,34 @@ class WorkflowType(StrEnum): return cls.WORKFLOW if app_mode == AppMode.WORKFLOW else cls.CHAT +class WorkflowKind(StrEnum): + """ + Workflow business kind. + + Runtime execution type and product business kind are intentionally separated: + - ``Workflow.type`` is consumed by the graph runtime layer. + - ``Workflow.kind`` is consumed by product logic (snippet/evaluation/standard). + """ + + STANDARD = "standard" + SNIPPET = "snippet" + EVALUATION = "evaluation" + + @classmethod + def value_of(cls, value: str) -> "WorkflowKind": + for kind in cls: + if kind.value == value: + return kind + raise ValueError(f"invalid workflow kind value {value}") + + +def resolve_workflow_kind(kind: "WorkflowKind | str | None") -> WorkflowKind: + """Resolve workflow business kind, defaulting empty values to ``standard``.""" + if kind: + return kind if isinstance(kind, WorkflowKind) else WorkflowKind.value_of(kind) + return WorkflowKind.STANDARD + + class _InvalidGraphDefinitionError(Exception): pass @@ -148,12 +176,14 @@ class Workflow(Base): # bug - id (uuid) Workflow ID, pk - tenant_id (uuid) Workspace ID - app_id (uuid) App ID - - type (string) Workflow type + - type (string) Runtime workflow type `workflow` for `Workflow App` `chat` for `Chat App workflow mode` + - kind (string) Business workflow kind (`standard`/`snippet`/`evaluation`) + - version (string) Version `draft` for draft version (only one for each app), other for version number (redundant) @@ -182,6 +212,12 @@ class Workflow(Base): # bug tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False) app_id: Mapped[str] = mapped_column(StringUUID, nullable=False) type: Mapped[WorkflowType] = mapped_column(EnumText(WorkflowType, length=255), nullable=False) + kind: Mapped[WorkflowKind | None] = mapped_column( + EnumText(WorkflowKind, length=255), + nullable=True, + default=WorkflowKind.STANDARD, + server_default=sa.text("'standard'"), + ) version: Mapped[str] = mapped_column(String(255), nullable=False) marked_name: Mapped[str] = mapped_column(String(255), default="", server_default="") marked_comment: Mapped[str] = mapped_column(String(255), default="", server_default="") @@ -221,6 +257,7 @@ class Workflow(Base): # bug environment_variables: Sequence[VariableBase], conversation_variables: Sequence[VariableBase], rag_pipeline_variables: list[dict], + kind: str | None = WorkflowKind.STANDARD.value, marked_name: str = "", marked_comment: str = "", ) -> "Workflow": @@ -229,6 +266,7 @@ class Workflow(Base): # bug workflow.tenant_id = tenant_id workflow.app_id = app_id workflow.type = WorkflowType(type) + workflow.kind = WorkflowKind(kind) if kind else WorkflowKind.STANDARD workflow.version = version workflow.graph = graph workflow.features = features @@ -250,6 +288,14 @@ class Workflow(Base): # bug def updated_by_account(self): return db.session.get(Account, self.updated_by) if self.updated_by else None + @property + def kind_or_standard(self) -> str: + return self.resolved_kind.value + + @property + def resolved_kind(self) -> WorkflowKind: + return resolve_workflow_kind(self.kind) + @property def graph_dict(self) -> Mapping[str, Any]: # TODO(QuantumGhost): Consider caching `graph_dict` to avoid repeated JSON decoding. diff --git a/api/services/snippet_service.py b/api/services/snippet_service.py index a2cdc23f3d..0d525a6248 100644 --- a/api/services/snippet_service.py +++ b/api/services/snippet_service.py @@ -16,6 +16,7 @@ from models.enums import WorkflowRunTriggeredFrom from models.snippet import CustomizedSnippet, SnippetType from models.workflow import ( Workflow, + WorkflowKind, WorkflowNodeExecutionModel, WorkflowRun, WorkflowType, @@ -47,6 +48,11 @@ class SnippetService: ) self._workflow_run_repo = DifyAPIRepositoryFactory.create_api_workflow_run_repository(session_maker) + @staticmethod + def _snippet_kind_filter(): + """Match snippet workflows by business kind.""" + return Workflow.kind == WorkflowKind.SNIPPET.value + @staticmethod def validate_snippet_graph_forbidden_nodes(graph: Mapping[str, Any]) -> None: """Reject graphs that contain node types not allowed in snippets.""" @@ -271,7 +277,7 @@ class SnippetService: .where( Workflow.tenant_id == snippet.tenant_id, Workflow.app_id == snippet.id, - Workflow.type == WorkflowType.SNIPPET.value, + self._snippet_kind_filter(), Workflow.version == "draft", ) .first() @@ -293,7 +299,7 @@ class SnippetService: .where( Workflow.tenant_id == snippet.tenant_id, Workflow.app_id == snippet.id, - Workflow.type == WorkflowType.SNIPPET.value, + self._snippet_kind_filter(), Workflow.id == snippet.workflow_id, ) .first() @@ -336,7 +342,8 @@ class SnippetService: tenant_id=snippet.tenant_id, app_id=snippet.id, features="{}", - type=WorkflowType.SNIPPET.value, + type=WorkflowType.WORKFLOW.value, + kind=WorkflowKind.SNIPPET.value, version="draft", graph=json.dumps(graph), created_by=account.id, @@ -348,6 +355,8 @@ class SnippetService: else: # Update existing draft workflow workflow.graph = json.dumps(graph) + workflow.type = WorkflowType.WORKFLOW.value + workflow.kind = WorkflowKind.SNIPPET workflow.updated_by = account.id workflow.updated_at = datetime.now(UTC).replace(tzinfo=None) workflow.environment_variables = [] @@ -381,7 +390,7 @@ class SnippetService: draft_workflow_stmt = select(Workflow).where( Workflow.tenant_id == snippet.tenant_id, Workflow.app_id == snippet.id, - Workflow.type == WorkflowType.SNIPPET.value, + self._snippet_kind_filter(), Workflow.version == "draft", ) draft_workflow = session.scalar(draft_workflow_stmt) @@ -394,7 +403,7 @@ class SnippetService: workflow = Workflow.new( tenant_id=snippet.tenant_id, app_id=snippet.id, - type=draft_workflow.type, + type=WorkflowType.WORKFLOW.value, version=str(datetime.now(UTC).replace(tzinfo=None)), graph=draft_workflow.graph, features=draft_workflow.features, @@ -402,6 +411,7 @@ class SnippetService: environment_variables=[], conversation_variables=[], rag_pipeline_variables=draft_workflow.rag_pipeline_variables, + kind=WorkflowKind.SNIPPET.value, marked_name="", marked_comment="", ) @@ -440,7 +450,7 @@ class SnippetService: select(Workflow) .where( Workflow.app_id == snippet.id, - Workflow.type == WorkflowType.SNIPPET.value, + self._snippet_kind_filter(), Workflow.version != "draft", ) .order_by(Workflow.version.desc()) diff --git a/api/services/workflow/workflow_converter.py b/api/services/workflow/workflow_converter.py index 5dedb9e372..1658bdf99f 100644 --- a/api/services/workflow/workflow_converter.py +++ b/api/services/workflow/workflow_converter.py @@ -27,7 +27,7 @@ from graphon.variables.input_entities import VariableEntity from models import Account from models.api_based_extension import APIBasedExtension, APIBasedExtensionPoint from models.model import App, AppMode, AppModelConfig, IconType -from models.workflow import Workflow, WorkflowType +from models.workflow import Workflow, WorkflowKind, WorkflowType class _NodeType(TypedDict): @@ -208,6 +208,7 @@ class WorkflowConverter: tenant_id=app_model.tenant_id, app_id=app_model.id, type=WorkflowType.from_app_mode(new_app_mode).value, + kind=WorkflowKind.STANDARD.value, version=Workflow.VERSION_DRAFT, graph=json.dumps(graph), features=json.dumps(features), diff --git a/api/services/workflow_restore.py b/api/services/workflow_restore.py index 083235d228..062c8c2a6b 100644 --- a/api/services/workflow_restore.py +++ b/api/services/workflow_restore.py @@ -41,6 +41,7 @@ def apply_published_workflow_snapshot_to_draft( tenant_id=tenant_id, app_id=app_id, type=workflow_type, + kind=source_workflow.kind_or_standard, version=Workflow.VERSION_DRAFT, graph=source_workflow.graph, features=source_workflow.serialized_features, @@ -51,6 +52,7 @@ def apply_published_workflow_snapshot_to_draft( draft_workflow.graph = source_workflow.graph draft_workflow.features = source_workflow.serialized_features + draft_workflow.kind = source_workflow.resolved_kind draft_workflow.updated_by = account.id draft_workflow.updated_at = updated_at_factory() draft_workflow.copy_serialized_variable_storage_from(source_workflow) diff --git a/api/services/workflow_service.py b/api/services/workflow_service.py index 6786623d7a..a24d121c76 100644 --- a/api/services/workflow_service.py +++ b/api/services/workflow_service.py @@ -5,7 +5,7 @@ import uuid from collections.abc import Callable, Generator, Mapping, Sequence from typing import Any, cast -from sqlalchemy import exists, select +from sqlalchemy import and_, exists, or_, select from sqlalchemy.orm import Session, sessionmaker from configs import dify_config @@ -70,7 +70,13 @@ from models import Account from models.human_input import HumanInputFormRecipient, RecipientType from models.model import App, AppMode from models.tools import WorkflowToolProvider -from models.workflow import Workflow, WorkflowNodeExecutionModel, WorkflowNodeExecutionTriggeredFrom, WorkflowType +from models.workflow import ( + Workflow, + WorkflowKind, + WorkflowNodeExecutionModel, + WorkflowNodeExecutionTriggeredFrom, + WorkflowType, +) from repositories.factory import DifyAPIRepositoryFactory from services.billing_service import BillingService from services.errors.app import ( @@ -276,7 +282,7 @@ class WorkflowService: """ stmt = select(Workflow).where( Workflow.tenant_id == tenant_id, - Workflow.type == WorkflowType.EVALUATION, + Workflow.kind == WorkflowKind.EVALUATION.value, Workflow.version != Workflow.VERSION_DRAFT, ) @@ -343,6 +349,7 @@ class WorkflowService: tenant_id=app_model.tenant_id, app_id=app_model.id, type=WorkflowType.from_app_mode(app_model.mode).value, + kind=WorkflowKind.STANDARD.value, version=Workflow.VERSION_DRAFT, graph=json.dumps(graph), features=json.dumps(features), @@ -359,6 +366,8 @@ class WorkflowService: workflow.updated_at = naive_utc_now() workflow.environment_variables = environment_variables workflow.conversation_variables = conversation_variables + if workflow.resolved_kind == WorkflowKind.STANDARD: + workflow.kind = WorkflowKind.STANDARD # commit db session changes db.session.commit() @@ -534,6 +543,7 @@ class WorkflowService: marked_comment=marked_comment, rag_pipeline_variables=draft_workflow.rag_pipeline_variables, features=draft_workflow.features, + kind=draft_workflow.kind_or_standard, ) # commit db session changes @@ -557,7 +567,7 @@ class WorkflowService: """Publish draft workflow as an evaluation workflow version. Compared to standard publish: - - force published workflow type to ``evaluation``; + - set business kind to ``evaluation``; - reject graphs containing trigger or human-input nodes. """ draft_workflow_stmt = select(Workflow).where( @@ -582,7 +592,7 @@ class WorkflowService: workflow = Workflow.new( tenant_id=app_model.tenant_id, app_id=app_model.id, - type=WorkflowType.EVALUATION.value, + type=draft_workflow.type.value, version=Workflow.version_from_datetime(naive_utc_now()), graph=draft_workflow.graph, created_by=account.id, @@ -592,6 +602,7 @@ class WorkflowService: marked_comment=marked_comment, rag_pipeline_variables=draft_workflow.rag_pipeline_variables, features=draft_workflow.features, + kind=WorkflowKind.EVALUATION.value, ) session.add(workflow) @@ -606,16 +617,16 @@ class WorkflowService: *, session: Session, app_model: App, - target_type: WorkflowType, + target_type: WorkflowKind, account: Account, ) -> Workflow: """ - Convert a published workflow type in-place. + Convert a published workflow business kind in-place. This endpoint only supports conversion between standard workflow and evaluation workflow. """ - if target_type not in {WorkflowType.WORKFLOW, WorkflowType.EVALUATION}: - raise ValueError("target_type must be either 'workflow' or 'evaluation'") + if target_type not in {WorkflowKind.STANDARD, WorkflowKind.EVALUATION}: + raise ValueError("target_type must be either 'standard' or 'evaluation'") if not app_model.workflow_id: raise WorkflowNotFoundError("Published workflow not found") @@ -632,13 +643,13 @@ class WorkflowService: if workflow.version == Workflow.VERSION_DRAFT: raise IsDraftWorkflowError("Current effective workflow cannot be a draft version.") - if workflow.type == target_type: + if workflow.resolved_kind == target_type: return workflow - if target_type == WorkflowType.EVALUATION: + if target_type == WorkflowKind.EVALUATION: self._validate_evaluation_workflow_nodes(workflow) - workflow.type = target_type + workflow.kind = target_type workflow.updated_by = account.id workflow.updated_at = naive_utc_now() @@ -1817,7 +1828,7 @@ def _setup_variable_pool( } # Only add chatflow-specific variables for chat-like workflow types. - if workflow.type not in {WorkflowType.WORKFLOW, WorkflowType.EVALUATION, WorkflowType.SNIPPET}: + if workflow.type != WorkflowType.WORKFLOW: system_variable_values.update( { "query": query,