diff --git a/api/services/retention/workflow_run/clear_free_plan_expired_workflow_run_logs.py b/api/services/retention/workflow_run/clear_free_plan_expired_workflow_run_logs.py index 021fa61d96..58e8ac57a8 100644 --- a/api/services/retention/workflow_run/clear_free_plan_expired_workflow_run_logs.py +++ b/api/services/retention/workflow_run/clear_free_plan_expired_workflow_run_logs.py @@ -3,7 +3,7 @@ import logging import random import time from collections.abc import Iterable, Sequence -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, TypedDict import click from sqlalchemy.orm import Session, sessionmaker @@ -12,7 +12,7 @@ from configs import dify_config from enums.cloud_plan import CloudPlan from extensions.ext_database import db from models.workflow import WorkflowRun -from repositories.api_workflow_run_repository import APIWorkflowRunRepository +from repositories.api_workflow_run_repository import APIWorkflowRunRepository, RunsWithRelatedCountsDict from repositories.factory import DifyAPIRepositoryFactory from repositories.sqlalchemy_workflow_trigger_log_repository import SQLAlchemyWorkflowTriggerLogRepository from services.billing_service import BillingService, SubscriptionPlan @@ -24,6 +24,15 @@ if TYPE_CHECKING: from opentelemetry.metrics import Counter, Histogram +class RelatedCountsDict(TypedDict): + node_executions: int + offloads: int + app_logs: int + trigger_logs: int + pauses: int + pause_reasons: int + + class WorkflowRunCleanupMetrics: """ Records low-cardinality OpenTelemetry metrics for workflow run cleanup jobs. @@ -233,7 +242,7 @@ class WorkflowRunCleanup: total_runs_deleted = 0 total_runs_targeted = 0 - related_totals = self._empty_related_counts() if self.dry_run else None + related_totals: RelatedCountsDict | None = self._empty_related_counts() if self.dry_run else None batch_index = 0 last_seen: tuple[datetime.datetime, str] | None = None status = "success" @@ -315,8 +324,7 @@ class WorkflowRunCleanup: int((time.monotonic() - count_start) * 1000), ) if related_totals is not None: - for k in _RELATED_RECORD_KEYS: - related_totals[k] += batch_counts.get(k, 0) # type: ignore[literal-required,operator] + self._accumulate_related_counts(related_totals, batch_counts) sample_ids = ", ".join(run.id for run in free_runs[:5]) click.echo( click.style( @@ -515,7 +523,7 @@ class WorkflowRunCleanup: return trigger_repo.count_by_run_ids(run_ids) @staticmethod - def _empty_related_counts() -> dict[str, int]: + def _empty_related_counts() -> RelatedCountsDict: return { "node_executions": 0, "offloads": 0, @@ -526,7 +534,7 @@ class WorkflowRunCleanup: } @staticmethod - def _format_related_counts(counts: dict[str, int]) -> str: + def _format_related_counts(counts: RelatedCountsDict) -> str: return ( f"node_executions {counts['node_executions']}, " f"offloads {counts['offloads']}, " @@ -536,6 +544,15 @@ class WorkflowRunCleanup: f"pause_reasons {counts['pause_reasons']}" ) + @staticmethod + def _accumulate_related_counts(totals: RelatedCountsDict, batch: RunsWithRelatedCountsDict) -> None: + totals["node_executions"] += batch.get("node_executions", 0) + totals["offloads"] += batch.get("offloads", 0) + totals["app_logs"] += batch.get("app_logs", 0) + totals["trigger_logs"] += batch.get("trigger_logs", 0) + totals["pauses"] += batch.get("pauses", 0) + totals["pause_reasons"] += batch.get("pause_reasons", 0) + def _count_node_executions(self, session: Session, runs: Sequence[WorkflowRun]) -> tuple[int, int]: run_ids = [run.id for run in runs] repo = DifyAPIRepositoryFactory.create_api_workflow_node_execution_repository(