feat: knowledgebase summary index (#31600)
Some checks failed
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/amd64, build-api-amd64) (push) Has been cancelled
Build and Push API & Web / build (api, DIFY_API_IMAGE_NAME, linux/arm64, build-api-arm64) (push) Has been cancelled
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/amd64, build-web-amd64) (push) Has been cancelled
Build and Push API & Web / build (web, DIFY_WEB_IMAGE_NAME, linux/arm64, build-web-arm64) (push) Has been cancelled
Build and Push API & Web / create-manifest (api, DIFY_API_IMAGE_NAME, merge-api-images) (push) Has been cancelled
Build and Push API & Web / create-manifest (web, DIFY_WEB_IMAGE_NAME, merge-web-images) (push) Has been cancelled

This commit is contained in:
FFXN
2026-01-27 16:12:57 +08:00
committed by GitHub
5 changed files with 69 additions and 14 deletions

View File

@@ -12,6 +12,7 @@ from core.entities.knowledge_entities import PreviewDetail
from core.file import File, FileTransferMethod, FileType, file_manager
from core.llm_generator.prompts import DEFAULT_GENERATOR_SUMMARY_PROMPT
from core.model_manager import ModelInstance
from core.model_runtime.entities.llm_entities import LLMUsage
from core.model_runtime.entities.message_entities import (
ImagePromptMessageContent,
PromptMessageContentUnionTypes,
@@ -295,11 +296,11 @@ class ParagraphIndexProcessor(BaseIndexProcessor):
if flask_app:
# Ensure Flask app context in worker thread
with flask_app.app_context():
summary = self.generate_summary(tenant_id, preview.content, summary_index_setting)
summary, _ = self.generate_summary(tenant_id, preview.content, summary_index_setting)
preview.summary = summary
else:
# Fallback: try without app context (may fail)
summary = self.generate_summary(tenant_id, preview.content, summary_index_setting)
summary, _ = self.generate_summary(tenant_id, preview.content, summary_index_setting)
preview.summary = summary
# Generate summaries concurrently using ThreadPoolExecutor
@@ -356,7 +357,7 @@ class ParagraphIndexProcessor(BaseIndexProcessor):
text: str,
summary_index_setting: dict | None = None,
segment_id: str | None = None,
) -> str:
) -> tuple[str, LLMUsage]:
"""
Generate summary for the given text using ModelInstance.invoke_llm and the default or custom summary prompt,
and supports vision models by including images from the segment attachments or text content.
@@ -366,6 +367,9 @@ class ParagraphIndexProcessor(BaseIndexProcessor):
text: Text content to summarize
summary_index_setting: Summary index configuration
segment_id: Optional segment ID to fetch attachments from SegmentAttachmentBinding table
Returns:
Tuple of (summary_content, llm_usage) where llm_usage is LLMUsage object
"""
if not summary_index_setting or not summary_index_setting.get("enable"):
raise ValueError("summary_index_setting is required and must be enabled to generate summary.")
@@ -432,7 +436,19 @@ class ParagraphIndexProcessor(BaseIndexProcessor):
result = model_instance.invoke_llm(prompt_messages=prompt_messages, model_parameters={}, stream=False)
return getattr(result.message, "content", "")
summary_content = getattr(result.message, "content", "")
usage = result.usage
# Deduct quota for summary generation (same as workflow nodes)
from core.workflow.nodes.llm import llm_utils
try:
llm_utils.deduct_llm_quota(tenant_id=tenant_id, model_instance=model_instance, usage=usage)
except Exception as e:
# Log but don't fail summary generation if quota deduction fails
logger.warning("Failed to deduct quota for summary generation: %s", str(e))
return summary_content, usage
@staticmethod
def _extract_images_from_text(tenant_id: str, text: str) -> list[File]:

View File

@@ -382,7 +382,7 @@ class ParentChildIndexProcessor(BaseIndexProcessor):
if flask_app:
# Ensure Flask app context in worker thread
with flask_app.app_context():
summary = ParagraphIndexProcessor.generate_summary(
summary, _ = ParagraphIndexProcessor.generate_summary(
tenant_id=tenant_id,
text=preview.content,
summary_index_setting=summary_index_setting,
@@ -390,7 +390,7 @@ class ParentChildIndexProcessor(BaseIndexProcessor):
preview.summary = summary
else:
# Fallback: try without app context (may fail)
summary = ParagraphIndexProcessor.generate_summary(
summary, _ = ParagraphIndexProcessor.generate_summary(
tenant_id=tenant_id,
text=preview.content,
summary_index_setting=summary_index_setting,

View File

@@ -364,7 +364,7 @@ class KnowledgeIndexNode(Node[KnowledgeIndexNodeData]):
# Set Flask application context in worker thread
if flask_app:
with flask_app.app_context():
summary = ParagraphIndexProcessor.generate_summary(
summary, _ = ParagraphIndexProcessor.generate_summary(
tenant_id=dataset.tenant_id,
text=preview_item["content"],
summary_index_setting=summary_index_setting,
@@ -373,7 +373,7 @@ class KnowledgeIndexNode(Node[KnowledgeIndexNodeData]):
preview_item["summary"] = summary
else:
# Fallback: try without app context (may fail)
summary = ParagraphIndexProcessor.generate_summary(
summary, _ = ParagraphIndexProcessor.generate_summary(
tenant_id=dataset.tenant_id,
text=preview_item["content"],
summary_index_setting=summary_index_setting,

View File

@@ -1597,6 +1597,7 @@ class DocumentSegmentSummary(Base):
summary_content: Mapped[str] = mapped_column(LongText, nullable=True)
summary_index_node_id: Mapped[str] = mapped_column(String(255), nullable=True)
summary_index_node_hash: Mapped[str] = mapped_column(String(255), nullable=True)
tokens: Mapped[int | None] = mapped_column(sa.Integer, nullable=True)
status: Mapped[str] = mapped_column(String(32), nullable=False, server_default=sa.text("'generating'"))
error: Mapped[str] = mapped_column(LongText, nullable=True)
enabled: Mapped[bool] = mapped_column(sa.Boolean, nullable=False, server_default=sa.text("true"))

View File

@@ -5,6 +5,9 @@ import time
import uuid
from datetime import UTC, datetime
from core.model_manager import ModelManager
from core.model_runtime.entities.llm_entities import LLMUsage
from core.model_runtime.entities.model_entities import ModelType
from core.rag.datasource.vdb.vector_factory import Vector
from core.rag.index_processor.constant.doc_type import DocType
from core.rag.models.document import Document
@@ -24,7 +27,7 @@ class SummaryIndexService:
segment: DocumentSegment,
dataset: Dataset,
summary_index_setting: dict,
) -> str:
) -> tuple[str, LLMUsage]:
"""
Generate summary for a single segment.
@@ -34,7 +37,7 @@ class SummaryIndexService:
summary_index_setting: Summary index configuration
Returns:
Generated summary text
Tuple of (summary_content, llm_usage) where llm_usage is LLMUsage object
Raises:
ValueError: If summary_index_setting is invalid or generation fails
@@ -43,7 +46,7 @@ class SummaryIndexService:
# Use lazy import to avoid circular import
from core.rag.index_processor.processor.paragraph_index_processor import ParagraphIndexProcessor
summary_content = ParagraphIndexProcessor.generate_summary(
summary_content, usage = ParagraphIndexProcessor.generate_summary(
tenant_id=dataset.tenant_id,
text=segment.content,
summary_index_setting=summary_index_setting,
@@ -53,7 +56,7 @@ class SummaryIndexService:
if not summary_content:
raise ValueError("Generated summary is empty")
return summary_content
return summary_content, usage
@staticmethod
def create_summary_record(
@@ -153,6 +156,22 @@ class SummaryIndexService:
str(e),
)
# Calculate embedding tokens for summary (for logging and statistics)
embedding_tokens = 0
try:
model_manager = ModelManager()
embedding_model = model_manager.get_model_instance(
tenant_id=dataset.tenant_id,
provider=dataset.embedding_model_provider,
model_type=ModelType.TEXT_EMBEDDING,
model=dataset.embedding_model,
)
if embedding_model:
tokens_list = embedding_model.get_text_embedding_num_tokens([summary_record.summary_content])
embedding_tokens = tokens_list[0] if tokens_list else 0
except Exception as e:
logger.warning("Failed to calculate embedding tokens for summary: %s", str(e))
# Create document with summary content and metadata
summary_document = Document(
page_content=summary_record.summary_content,
@@ -179,9 +198,18 @@ class SummaryIndexService:
# we still want to re-vectorize (upsert will overwrite)
vector.add_texts([summary_document], duplicate_check=False)
# Log embedding token usage
if embedding_tokens > 0:
logger.info(
"Summary embedding for segment %s used %s tokens",
segment.id,
embedding_tokens,
)
# Success - update summary record with index node info
summary_record.summary_index_node_id = summary_index_node_id
summary_record.summary_index_node_hash = summary_hash
summary_record.tokens = embedding_tokens # Save embedding tokens
summary_record.status = "completed"
# Explicitly update updated_at to ensure it's refreshed even if other fields haven't changed
summary_record.updated_at = datetime.now(UTC).replace(tzinfo=None)
@@ -364,14 +392,24 @@ class SummaryIndexService:
db.session.add(summary_record)
db.session.flush()
# Generate summary
summary_content = SummaryIndexService.generate_summary_for_segment(
# Generate summary (returns summary_content and llm_usage)
summary_content, llm_usage = SummaryIndexService.generate_summary_for_segment(
segment, dataset, summary_index_setting
)
# Update summary content
summary_record.summary_content = summary_content
# Log LLM usage for summary generation
if llm_usage and llm_usage.total_tokens > 0:
logger.info(
"Summary generation for segment %s used %s tokens (prompt: %s, completion: %s)",
segment.id,
llm_usage.total_tokens,
llm_usage.prompt_tokens,
llm_usage.completion_tokens,
)
# Vectorize summary (will delete old vector if exists before creating new one)
SummaryIndexService.vectorize_summary(summary_record, segment, dataset)