use workflow_kind as business type (snippet/evaluation), not workflow_type.

This commit is contained in:
FFXN
2026-04-20 13:55:26 +08:00
parent b60533dcc3
commit 2b5dbbc7ff
11 changed files with 122 additions and 31 deletions

View File

@@ -34,6 +34,7 @@ from libs.helper import build_icon_url
from libs.login import current_account_with_tenant, login_required
from models import App, DatasetPermissionEnum, Workflow
from models.model import IconType
from models.workflow import resolve_workflow_kind
from services.app_dsl_service import AppDslService
from services.app_service import AppService
from services.enterprise.enterprise_service import EnterpriseService
@@ -330,6 +331,7 @@ class AppPartial(ResponseModel):
author_name: str | None = None
has_draft_trigger: bool | None = None
workflow_type: str | None = None
workflow_kind: str | None = None
@computed_field(return_type=str | None) # type: ignore
@property
@@ -365,6 +367,7 @@ class AppDetail(ResponseModel):
updated_at: int | None = None
access_mode: str | None = None
workflow_type: str | None = None
workflow_kind: str | None = None
tags: list[Tag] = Field(default_factory=list)
@field_validator("created_at", "updated_at", mode="before")
@@ -508,15 +511,23 @@ class AppListApi(Resource):
app.has_draft_trigger = str(app.id) in draft_trigger_app_ids
workflow_ids = [str(app.workflow_id) for app in app_pagination.items if app.workflow_id]
workflow_type_map: dict[str, str] = {}
workflow_info_map: dict[str, tuple[str, str]] = {}
if workflow_ids:
rows = db.session.execute(
select(Workflow.id, Workflow.type).where(Workflow.id.in_(workflow_ids))
select(Workflow.id, Workflow.type, Workflow.kind).where(Workflow.id.in_(workflow_ids))
).all()
workflow_type_map = {str(row.id): row.type for row in rows}
workflow_info_map = {
str(row.id): (
row.type.value if hasattr(row.type, "value") else str(row.type),
resolve_workflow_kind(row.kind).value,
)
for row in rows
}
for app in app_pagination.items:
app.workflow_type = workflow_type_map.get(str(app.workflow_id)) if app.workflow_id else None
workflow_info = workflow_info_map.get(str(app.workflow_id)) if app.workflow_id else None
app.workflow_type = workflow_info[0] if workflow_info else None
app.workflow_kind = workflow_info[1] if workflow_info else None
pagination_model = AppPagination.model_validate(app_pagination, from_attributes=True)
return pagination_model.model_dump(mode="json"), 200
@@ -566,11 +577,15 @@ class AppApi(Resource):
if app_model.workflow_id:
row = db.session.execute(
select(Workflow.type).where(Workflow.id == app_model.workflow_id)
).scalar()
app_model.workflow_type = row if row else None
select(Workflow.type, Workflow.kind).where(Workflow.id == app_model.workflow_id)
).first()
app_model.workflow_type = (
(row.type.value if hasattr(row.type, "value") else str(row.type)) if row else None
)
app_model.workflow_kind = resolve_workflow_kind(row.kind).value if row else None
else:
app_model.workflow_type = None
app_model.workflow_kind = None
response_model = AppDetailWithSite.model_validate(app_model, from_attributes=True)
return response_model.model_dump(mode="json")

View File

@@ -48,7 +48,7 @@ from libs.helper import TimestampField, uuid_value
from libs.login import current_account_with_tenant, login_required
from models import App
from models.model import AppMode
from models.workflow import Workflow, WorkflowType
from models.workflow import Workflow, WorkflowKind
from repositories.workflow_collaboration_repository import WORKFLOW_ONLINE_USERS_PREFIX
from services.app_generate_service import AppGenerateService
from services.errors.app import IsDraftWorkflowError, WorkflowHashNotEqualError, WorkflowNotFoundError
@@ -1140,7 +1140,7 @@ class WorkflowTypeConvertApi(Resource):
def post(self, app_model: App):
current_user, _ = current_account_with_tenant()
args = WorkflowTypeConvertQuery.model_validate(request.args.to_dict(flat=True)) # type: ignore
target_type = WorkflowType.value_of(args.target_type)
target_type = WorkflowKind.EVALUATION if args.target_type == "evaluation" else WorkflowKind.STANDARD
workflow_service = WorkflowService()
with Session(db.engine) as session:
@@ -1164,6 +1164,7 @@ class WorkflowTypeConvertApi(Resource):
"result": "success",
"workflow_id": workflow.id,
"type": workflow.type.value,
"kind": workflow.kind_or_standard,
"updated_at": TimestampField().format(workflow.updated_at or workflow.created_at),
}

View File

@@ -152,6 +152,7 @@ available_evaluation_workflow_list_fields = {
"app_id": fields.String,
"app_name": fields.String,
"type": fields.String,
"kind": fields.String,
"version": fields.String,
"marked_name": fields.String,
"marked_comment": fields.String,
@@ -743,6 +744,7 @@ class AvailableEvaluationWorkflowsApi(Resource):
"app_id": wf.app_id,
"app_name": app_names.get(wf.app_id, ""),
"type": wf.type.value,
"kind": wf.kind_or_standard,
"version": wf.version,
"marked_name": wf.marked_name,
"marked_comment": wf.marked_comment,

View File

@@ -57,7 +57,7 @@ class WorkflowAppGenerator(BaseAppGenerator):
@staticmethod
def _ensure_snippet_start_node_in_worker(*, session: Session, workflow: Workflow) -> Workflow:
"""Re-apply snippet virtual Start injection after worker reloads workflow from DB."""
if workflow.type != "snippet":
if workflow.kind_or_standard != "snippet":
return workflow
from models.snippet import CustomizedSnippet

View File

@@ -68,6 +68,7 @@ pipeline_variable_fields = {
workflow_fields = {
"id": fields.String,
"kind": fields.String(attribute="kind_or_standard"),
"graph": fields.Raw(attribute="graph_dict"),
"features": fields.Raw(attribute="features_dict"),
"hash": fields.String(attribute="unique_hash"),

View File

@@ -118,6 +118,7 @@ from .workflow import (
WorkflowAppLog,
WorkflowAppLogCreatedFrom,
WorkflowArchiveLog,
WorkflowKind,
WorkflowNodeExecutionModel,
WorkflowNodeExecutionOffload,
WorkflowNodeExecutionTriggeredFrom,
@@ -240,5 +241,6 @@ __all__ = [
"WorkflowSchedulePlan",
"WorkflowToolProvider",
"WorkflowTriggerStatus",
"WorkflowKind",
"WorkflowType",
]

View File

@@ -135,6 +135,34 @@ class WorkflowType(StrEnum):
return cls.WORKFLOW if app_mode == AppMode.WORKFLOW else cls.CHAT
class WorkflowKind(StrEnum):
"""
Workflow business kind.
Runtime execution type and product business kind are intentionally separated:
- ``Workflow.type`` is consumed by the graph runtime layer.
- ``Workflow.kind`` is consumed by product logic (snippet/evaluation/standard).
"""
STANDARD = "standard"
SNIPPET = "snippet"
EVALUATION = "evaluation"
@classmethod
def value_of(cls, value: str) -> "WorkflowKind":
for kind in cls:
if kind.value == value:
return kind
raise ValueError(f"invalid workflow kind value {value}")
def resolve_workflow_kind(kind: "WorkflowKind | str | None") -> WorkflowKind:
"""Resolve workflow business kind, defaulting empty values to ``standard``."""
if kind:
return kind if isinstance(kind, WorkflowKind) else WorkflowKind.value_of(kind)
return WorkflowKind.STANDARD
class _InvalidGraphDefinitionError(Exception):
pass
@@ -148,12 +176,14 @@ class Workflow(Base): # bug
- id (uuid) Workflow ID, pk
- tenant_id (uuid) Workspace ID
- app_id (uuid) App ID
- type (string) Workflow type
- type (string) Runtime workflow type
`workflow` for `Workflow App`
`chat` for `Chat App workflow mode`
- kind (string) Business workflow kind (`standard`/`snippet`/`evaluation`)
- version (string) Version
`draft` for draft version (only one for each app), other for version number (redundant)
@@ -182,6 +212,12 @@ class Workflow(Base): # bug
tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
app_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
type: Mapped[WorkflowType] = mapped_column(EnumText(WorkflowType, length=255), nullable=False)
kind: Mapped[WorkflowKind | None] = mapped_column(
EnumText(WorkflowKind, length=255),
nullable=True,
default=WorkflowKind.STANDARD,
server_default=sa.text("'standard'"),
)
version: Mapped[str] = mapped_column(String(255), nullable=False)
marked_name: Mapped[str] = mapped_column(String(255), default="", server_default="")
marked_comment: Mapped[str] = mapped_column(String(255), default="", server_default="")
@@ -221,6 +257,7 @@ class Workflow(Base): # bug
environment_variables: Sequence[VariableBase],
conversation_variables: Sequence[VariableBase],
rag_pipeline_variables: list[dict],
kind: str | None = WorkflowKind.STANDARD.value,
marked_name: str = "",
marked_comment: str = "",
) -> "Workflow":
@@ -229,6 +266,7 @@ class Workflow(Base): # bug
workflow.tenant_id = tenant_id
workflow.app_id = app_id
workflow.type = WorkflowType(type)
workflow.kind = WorkflowKind(kind) if kind else WorkflowKind.STANDARD
workflow.version = version
workflow.graph = graph
workflow.features = features
@@ -250,6 +288,14 @@ class Workflow(Base): # bug
def updated_by_account(self):
return db.session.get(Account, self.updated_by) if self.updated_by else None
@property
def kind_or_standard(self) -> str:
return self.resolved_kind.value
@property
def resolved_kind(self) -> WorkflowKind:
return resolve_workflow_kind(self.kind)
@property
def graph_dict(self) -> Mapping[str, Any]:
# TODO(QuantumGhost): Consider caching `graph_dict` to avoid repeated JSON decoding.

View File

@@ -16,6 +16,7 @@ from models.enums import WorkflowRunTriggeredFrom
from models.snippet import CustomizedSnippet, SnippetType
from models.workflow import (
Workflow,
WorkflowKind,
WorkflowNodeExecutionModel,
WorkflowRun,
WorkflowType,
@@ -47,6 +48,11 @@ class SnippetService:
)
self._workflow_run_repo = DifyAPIRepositoryFactory.create_api_workflow_run_repository(session_maker)
@staticmethod
def _snippet_kind_filter():
"""Match snippet workflows by business kind."""
return Workflow.kind == WorkflowKind.SNIPPET.value
@staticmethod
def validate_snippet_graph_forbidden_nodes(graph: Mapping[str, Any]) -> None:
"""Reject graphs that contain node types not allowed in snippets."""
@@ -271,7 +277,7 @@ class SnippetService:
.where(
Workflow.tenant_id == snippet.tenant_id,
Workflow.app_id == snippet.id,
Workflow.type == WorkflowType.SNIPPET.value,
self._snippet_kind_filter(),
Workflow.version == "draft",
)
.first()
@@ -293,7 +299,7 @@ class SnippetService:
.where(
Workflow.tenant_id == snippet.tenant_id,
Workflow.app_id == snippet.id,
Workflow.type == WorkflowType.SNIPPET.value,
self._snippet_kind_filter(),
Workflow.id == snippet.workflow_id,
)
.first()
@@ -336,7 +342,8 @@ class SnippetService:
tenant_id=snippet.tenant_id,
app_id=snippet.id,
features="{}",
type=WorkflowType.SNIPPET.value,
type=WorkflowType.WORKFLOW.value,
kind=WorkflowKind.SNIPPET.value,
version="draft",
graph=json.dumps(graph),
created_by=account.id,
@@ -348,6 +355,8 @@ class SnippetService:
else:
# Update existing draft workflow
workflow.graph = json.dumps(graph)
workflow.type = WorkflowType.WORKFLOW.value
workflow.kind = WorkflowKind.SNIPPET
workflow.updated_by = account.id
workflow.updated_at = datetime.now(UTC).replace(tzinfo=None)
workflow.environment_variables = []
@@ -381,7 +390,7 @@ class SnippetService:
draft_workflow_stmt = select(Workflow).where(
Workflow.tenant_id == snippet.tenant_id,
Workflow.app_id == snippet.id,
Workflow.type == WorkflowType.SNIPPET.value,
self._snippet_kind_filter(),
Workflow.version == "draft",
)
draft_workflow = session.scalar(draft_workflow_stmt)
@@ -394,7 +403,7 @@ class SnippetService:
workflow = Workflow.new(
tenant_id=snippet.tenant_id,
app_id=snippet.id,
type=draft_workflow.type,
type=WorkflowType.WORKFLOW.value,
version=str(datetime.now(UTC).replace(tzinfo=None)),
graph=draft_workflow.graph,
features=draft_workflow.features,
@@ -402,6 +411,7 @@ class SnippetService:
environment_variables=[],
conversation_variables=[],
rag_pipeline_variables=draft_workflow.rag_pipeline_variables,
kind=WorkflowKind.SNIPPET.value,
marked_name="",
marked_comment="",
)
@@ -440,7 +450,7 @@ class SnippetService:
select(Workflow)
.where(
Workflow.app_id == snippet.id,
Workflow.type == WorkflowType.SNIPPET.value,
self._snippet_kind_filter(),
Workflow.version != "draft",
)
.order_by(Workflow.version.desc())

View File

@@ -27,7 +27,7 @@ from graphon.variables.input_entities import VariableEntity
from models import Account
from models.api_based_extension import APIBasedExtension, APIBasedExtensionPoint
from models.model import App, AppMode, AppModelConfig, IconType
from models.workflow import Workflow, WorkflowType
from models.workflow import Workflow, WorkflowKind, WorkflowType
class _NodeType(TypedDict):
@@ -208,6 +208,7 @@ class WorkflowConverter:
tenant_id=app_model.tenant_id,
app_id=app_model.id,
type=WorkflowType.from_app_mode(new_app_mode).value,
kind=WorkflowKind.STANDARD.value,
version=Workflow.VERSION_DRAFT,
graph=json.dumps(graph),
features=json.dumps(features),

View File

@@ -41,6 +41,7 @@ def apply_published_workflow_snapshot_to_draft(
tenant_id=tenant_id,
app_id=app_id,
type=workflow_type,
kind=source_workflow.kind_or_standard,
version=Workflow.VERSION_DRAFT,
graph=source_workflow.graph,
features=source_workflow.serialized_features,
@@ -51,6 +52,7 @@ def apply_published_workflow_snapshot_to_draft(
draft_workflow.graph = source_workflow.graph
draft_workflow.features = source_workflow.serialized_features
draft_workflow.kind = source_workflow.resolved_kind
draft_workflow.updated_by = account.id
draft_workflow.updated_at = updated_at_factory()
draft_workflow.copy_serialized_variable_storage_from(source_workflow)

View File

@@ -5,7 +5,7 @@ import uuid
from collections.abc import Callable, Generator, Mapping, Sequence
from typing import Any, cast
from sqlalchemy import exists, select
from sqlalchemy import and_, exists, or_, select
from sqlalchemy.orm import Session, sessionmaker
from configs import dify_config
@@ -70,7 +70,13 @@ from models import Account
from models.human_input import HumanInputFormRecipient, RecipientType
from models.model import App, AppMode
from models.tools import WorkflowToolProvider
from models.workflow import Workflow, WorkflowNodeExecutionModel, WorkflowNodeExecutionTriggeredFrom, WorkflowType
from models.workflow import (
Workflow,
WorkflowKind,
WorkflowNodeExecutionModel,
WorkflowNodeExecutionTriggeredFrom,
WorkflowType,
)
from repositories.factory import DifyAPIRepositoryFactory
from services.billing_service import BillingService
from services.errors.app import (
@@ -276,7 +282,7 @@ class WorkflowService:
"""
stmt = select(Workflow).where(
Workflow.tenant_id == tenant_id,
Workflow.type == WorkflowType.EVALUATION,
Workflow.kind == WorkflowKind.EVALUATION.value,
Workflow.version != Workflow.VERSION_DRAFT,
)
@@ -343,6 +349,7 @@ class WorkflowService:
tenant_id=app_model.tenant_id,
app_id=app_model.id,
type=WorkflowType.from_app_mode(app_model.mode).value,
kind=WorkflowKind.STANDARD.value,
version=Workflow.VERSION_DRAFT,
graph=json.dumps(graph),
features=json.dumps(features),
@@ -359,6 +366,8 @@ class WorkflowService:
workflow.updated_at = naive_utc_now()
workflow.environment_variables = environment_variables
workflow.conversation_variables = conversation_variables
if workflow.resolved_kind == WorkflowKind.STANDARD:
workflow.kind = WorkflowKind.STANDARD
# commit db session changes
db.session.commit()
@@ -534,6 +543,7 @@ class WorkflowService:
marked_comment=marked_comment,
rag_pipeline_variables=draft_workflow.rag_pipeline_variables,
features=draft_workflow.features,
kind=draft_workflow.kind_or_standard,
)
# commit db session changes
@@ -557,7 +567,7 @@ class WorkflowService:
"""Publish draft workflow as an evaluation workflow version.
Compared to standard publish:
- force published workflow type to ``evaluation``;
- set business kind to ``evaluation``;
- reject graphs containing trigger or human-input nodes.
"""
draft_workflow_stmt = select(Workflow).where(
@@ -582,7 +592,7 @@ class WorkflowService:
workflow = Workflow.new(
tenant_id=app_model.tenant_id,
app_id=app_model.id,
type=WorkflowType.EVALUATION.value,
type=draft_workflow.type.value,
version=Workflow.version_from_datetime(naive_utc_now()),
graph=draft_workflow.graph,
created_by=account.id,
@@ -592,6 +602,7 @@ class WorkflowService:
marked_comment=marked_comment,
rag_pipeline_variables=draft_workflow.rag_pipeline_variables,
features=draft_workflow.features,
kind=WorkflowKind.EVALUATION.value,
)
session.add(workflow)
@@ -606,16 +617,16 @@ class WorkflowService:
*,
session: Session,
app_model: App,
target_type: WorkflowType,
target_type: WorkflowKind,
account: Account,
) -> Workflow:
"""
Convert a published workflow type in-place.
Convert a published workflow business kind in-place.
This endpoint only supports conversion between standard workflow and evaluation workflow.
"""
if target_type not in {WorkflowType.WORKFLOW, WorkflowType.EVALUATION}:
raise ValueError("target_type must be either 'workflow' or 'evaluation'")
if target_type not in {WorkflowKind.STANDARD, WorkflowKind.EVALUATION}:
raise ValueError("target_type must be either 'standard' or 'evaluation'")
if not app_model.workflow_id:
raise WorkflowNotFoundError("Published workflow not found")
@@ -632,13 +643,13 @@ class WorkflowService:
if workflow.version == Workflow.VERSION_DRAFT:
raise IsDraftWorkflowError("Current effective workflow cannot be a draft version.")
if workflow.type == target_type:
if workflow.resolved_kind == target_type:
return workflow
if target_type == WorkflowType.EVALUATION:
if target_type == WorkflowKind.EVALUATION:
self._validate_evaluation_workflow_nodes(workflow)
workflow.type = target_type
workflow.kind = target_type
workflow.updated_by = account.id
workflow.updated_at = naive_utc_now()
@@ -1817,7 +1828,7 @@ def _setup_variable_pool(
}
# Only add chatflow-specific variables for chat-like workflow types.
if workflow.type not in {WorkflowType.WORKFLOW, WorkflowType.EVALUATION, WorkflowType.SNIPPET}:
if workflow.type != WorkflowType.WORKFLOW:
system_variable_values.update(
{
"query": query,