refactor(api): type workflow run related counts with RelatedCountsDict TypedDict (#34530)
Some checks failed
autofix.ci / autofix (push) Has been cancelled
Build and Push API & Web / build (api, {{defaultContext}}:api, Dockerfile, DIFY_API_IMAGE_NAME, linux/amd64, ubuntu-latest, build-api-amd64) (push) Has been cancelled
Build and Push API & Web / build (api, {{defaultContext}}:api, Dockerfile, DIFY_API_IMAGE_NAME, linux/arm64, ubuntu-24.04-arm, build-api-arm64) (push) Has been cancelled
Build and Push API & Web / build (web, {{defaultContext}}, web/Dockerfile, DIFY_WEB_IMAGE_NAME, linux/amd64, ubuntu-latest, build-web-amd64) (push) Has been cancelled
Build and Push API & Web / build (web, {{defaultContext}}, web/Dockerfile, DIFY_WEB_IMAGE_NAME, linux/arm64, ubuntu-24.04-arm, build-web-arm64) (push) Has been cancelled
Build and Push API & Web / create-manifest (api, DIFY_API_IMAGE_NAME, merge-api-images) (push) Has been cancelled
Build and Push API & Web / create-manifest (web, DIFY_WEB_IMAGE_NAME, merge-web-images) (push) Has been cancelled
Main CI Pipeline / Skip Duplicate Checks (push) Has been cancelled
Main CI Pipeline / Check Changed Files (push) Has been cancelled
Main CI Pipeline / Run API Tests (push) Has been cancelled
Main CI Pipeline / Skip API Tests (push) Has been cancelled
Main CI Pipeline / API Tests (push) Has been cancelled
Main CI Pipeline / Run Web Tests (push) Has been cancelled
Main CI Pipeline / Skip Web Tests (push) Has been cancelled
Main CI Pipeline / Web Tests (push) Has been cancelled
Main CI Pipeline / Run Web Full-Stack E2E (push) Has been cancelled
Main CI Pipeline / Skip Web Full-Stack E2E (push) Has been cancelled
Main CI Pipeline / Web Full-Stack E2E (push) Has been cancelled
Main CI Pipeline / Style Check (push) Has been cancelled
Main CI Pipeline / Run VDB Tests (push) Has been cancelled
Main CI Pipeline / Skip VDB Tests (push) Has been cancelled
Main CI Pipeline / VDB Tests (push) Has been cancelled
Main CI Pipeline / Run DB Migration Test (push) Has been cancelled
Main CI Pipeline / Skip DB Migration Test (push) Has been cancelled
Main CI Pipeline / DB Migration Test (push) Has been cancelled

This commit is contained in:
YBoy
2026-04-06 15:17:01 +02:00
committed by GitHub
parent 5b862a43e0
commit 5ad906ea6a

View File

@@ -3,7 +3,7 @@ import logging
import random
import time
from collections.abc import Iterable, Sequence
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, TypedDict
import click
from sqlalchemy.orm import Session, sessionmaker
@@ -12,7 +12,7 @@ from configs import dify_config
from enums.cloud_plan import CloudPlan
from extensions.ext_database import db
from models.workflow import WorkflowRun
from repositories.api_workflow_run_repository import APIWorkflowRunRepository
from repositories.api_workflow_run_repository import APIWorkflowRunRepository, RunsWithRelatedCountsDict
from repositories.factory import DifyAPIRepositoryFactory
from repositories.sqlalchemy_workflow_trigger_log_repository import SQLAlchemyWorkflowTriggerLogRepository
from services.billing_service import BillingService, SubscriptionPlan
@@ -24,6 +24,15 @@ if TYPE_CHECKING:
from opentelemetry.metrics import Counter, Histogram
class RelatedCountsDict(TypedDict):
node_executions: int
offloads: int
app_logs: int
trigger_logs: int
pauses: int
pause_reasons: int
class WorkflowRunCleanupMetrics:
"""
Records low-cardinality OpenTelemetry metrics for workflow run cleanup jobs.
@@ -233,7 +242,7 @@ class WorkflowRunCleanup:
total_runs_deleted = 0
total_runs_targeted = 0
related_totals = self._empty_related_counts() if self.dry_run else None
related_totals: RelatedCountsDict | None = self._empty_related_counts() if self.dry_run else None
batch_index = 0
last_seen: tuple[datetime.datetime, str] | None = None
status = "success"
@@ -315,8 +324,7 @@ class WorkflowRunCleanup:
int((time.monotonic() - count_start) * 1000),
)
if related_totals is not None:
for k in _RELATED_RECORD_KEYS:
related_totals[k] += batch_counts.get(k, 0) # type: ignore[literal-required,operator]
self._accumulate_related_counts(related_totals, batch_counts)
sample_ids = ", ".join(run.id for run in free_runs[:5])
click.echo(
click.style(
@@ -515,7 +523,7 @@ class WorkflowRunCleanup:
return trigger_repo.count_by_run_ids(run_ids)
@staticmethod
def _empty_related_counts() -> dict[str, int]:
def _empty_related_counts() -> RelatedCountsDict:
return {
"node_executions": 0,
"offloads": 0,
@@ -526,7 +534,7 @@ class WorkflowRunCleanup:
}
@staticmethod
def _format_related_counts(counts: dict[str, int]) -> str:
def _format_related_counts(counts: RelatedCountsDict) -> str:
return (
f"node_executions {counts['node_executions']}, "
f"offloads {counts['offloads']}, "
@@ -536,6 +544,15 @@ class WorkflowRunCleanup:
f"pause_reasons {counts['pause_reasons']}"
)
@staticmethod
def _accumulate_related_counts(totals: RelatedCountsDict, batch: RunsWithRelatedCountsDict) -> None:
totals["node_executions"] += batch.get("node_executions", 0)
totals["offloads"] += batch.get("offloads", 0)
totals["app_logs"] += batch.get("app_logs", 0)
totals["trigger_logs"] += batch.get("trigger_logs", 0)
totals["pauses"] += batch.get("pauses", 0)
totals["pause_reasons"] += batch.get("pause_reasons", 0)
def _count_node_executions(self, session: Session, runs: Sequence[WorkflowRun]) -> tuple[int, int]:
run_ids = [run.id for run in runs]
repo = DifyAPIRepositoryFactory.create_api_workflow_node_execution_repository(