||
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- """
- 实质提取Agent (SubstanceExtractionAgent)
- 功能:
- - 从视频内容中提取实质元素(具体元素、具象概念、抽象概念)
- - Step 1: 提取具体元素(只看视频画面中的实体)
- - Step 2: 提取具象概念(只看画面中的文字 + 口播内容中的文字)
- - Step 3: 总结抽象概念(基于Step 1+2)
- - Step 4: 共性分析(频次、段落覆盖率)
- - Step 5: 多维度评分(vs 灵感点/目的点/实质关键点)
- - Step 6: 筛选(基于频次+覆盖率+相似度)
- - Step 7: 分类
- - Step 8: 合并所有信息
- 参考:元素提取新方案设计文档.md
- """
- import json
- from typing import List, Dict, Any
- from concurrent.futures import ThreadPoolExecutor
- from src.components.agents.base import BaseLLMAgent
- from src.utils.logger import get_logger
- from src.utils.llm_invoker import LLMInvoker, get_video_file_from_state
- logger = get_logger(__name__)
# Module-level thread pool used by this agent for its parallel work
# (batched commonality analysis, intention-support evaluation, categorisation).
_GLOBAL_THREAD_POOL = ThreadPoolExecutor(max_workers=16, thread_name_prefix="SubstanceExtraction")
- class ScriptSubstanceExtractionAgent(BaseLLMAgent):
- """实质提取Agent - 自底向上的归纳过程
- 提取流程(视频版):
- - Step 1: 提取具体元素(只看视频画面中的实体)
- - Step 2: 提取具象概念(只看画面中的文字 + 口播内容中的文字)
- - Step 3: 总结抽象概念(基于Step 1+2)
- - Step 4: 共性分析(频次、段落覆盖率)
- - Step 5: 多维度评分(vs 灵感点/目的点/实质关键点)
- - Step 6: 筛选(基于频次+覆盖率+相似度)
- - Step 7: 分类
- - Step 8: 合并所有信息
- """
def __init__(
    self,
    name: str = "substance_extraction_agent",
    description: str = "实质提取Agent",
    model_provider: str = "google_genai",
    temperature: float = 0.1,
    max_tokens: int = 40960
):
    """Create the agent and hand its configuration to the base class.

    All arguments are forwarded unchanged to the parent constructor; the
    system prompt is the fixed text produced by ``_build_system_prompt``.
    """
    # The prompt is built first, before the parent constructor runs.
    base_kwargs = dict(
        name=name,
        description=description,
        model_provider=model_provider,
        system_prompt=self._build_system_prompt(),
        temperature=temperature,
        max_tokens=max_tokens,
    )
    super().__init__(**base_kwargs)
- def _build_system_prompt(self) -> str:
- """构建系统提示词"""
- return """你是一个专业的内容分析专家,擅长从图文内容中提取实质性元素。
- # 核心定义
- ## 实质(Substance):"是什么"
- 内容本身,独立于表达方式而存在
- ### 具体元素
- - 定义:从图像中直接观察到的单一视觉实体对象
- - 判断标准:可以指着图片说"这是一个X"
- ### 具象概念
- - 定义:画面或者口播内容中出现的名词
- - 判断标准:画面或者口播内容中实际出现,禁止语义推导
- ### 抽象概念
- - 定义:从具体元素和具象概念中理解到的上位抽象
- - 类型1:上位抽象(归类)- 是下位元素的类别、分类
- - 类型2:引申含义 - 需要理解上下文的深层含义
- ## 区分方法:"剥离测试"
- 问题:如果去掉所有表达手法/风格/技巧,这个特征还存在吗?
- - 存在 → 实质(内容本身)
- - 不存在/失去意义 → 形式(表达方式)
- """
def process(self, state: dict) -> dict:
    """Run the whole substance-extraction pipeline (steps 1-8) for one video.

    Args:
        state: Shared pipeline state. The video handle is resolved through
            ``get_video_file_from_state``; the keys ``text``,
            ``section_division``, ``inspiration_points``, ``purpose_point``
            and ``key_points`` are read as optional context.

    Returns:
        A dict with the raw extraction lists (steps 1-3), every intermediate
        result (steps 4-7) and the merged final elements (step 8). When no
        video can be resolved the same keys are returned with empty values.
    """
    logger.info("=== 开始实质元素提取(完整流程) ===")

    # The video is the core input; without it nothing can be extracted.
    video_file = get_video_file_from_state(state)
    if not video_file:
        logger.error("无法从 state 中获取视频文件,实质提取终止")
        return {
            "concrete_elements": [],
            "concrete_concepts": [],
            "implicit_concepts": [],
            "abstract_concepts": [],
            "substance_elements": [],
            "substance_analyzed_result": [],
            "substance_scored_result": {},
            # Present in the normal return value as well, so callers can
            # read it unconditionally.
            "substance_intention_support_result": {
                "灵感点": [],
                "目的点": [],
                "关键点": [],
            },
            "substance_filtered_ids": [],
            "substance_categorized_result": {},
            "substance_final_elements": []
        }

    def _as_point_list(raw, dict_key: str) -> list:
        """Accept either a bare list or a dict wrapping the list under dict_key."""
        if isinstance(raw, dict):
            raw = raw.get(dict_key, [])
        # Anything that is not a list (None, str, ...) is treated as "no points".
        return raw if isinstance(raw, list) else []

    text_data = state.get("text", {})
    section_division = state.get("section_division", {})

    # Each point collection may arrive as a list or as a legacy dict wrapper.
    inspiration_points = _as_point_list(state.get("inspiration_points", {}), "points")
    purpose_points = _as_point_list(state.get("purpose_point", {}), "purposes")
    key_points = _as_point_list(state.get("key_points", {}), "key_points")

    # Only key points of the "substance" dimension are relevant here.
    substance_key_points = [
        kp for kp in key_points
        if isinstance(kp, dict) and kp.get("维度大类") == "实质"
    ]

    logger.info(
        f"意图支撑评估输入: 灵感点={len(inspiration_points)}, "
        f"目的点={len(purpose_points)}, 关键点(实质类)={len(substance_key_points)}"
    )

    # Step 1: concrete elements (entities visible in the frames).
    logger.info("▶ Step 1: 提取具体元素")
    concrete_elements = self._step1_extract_concrete_elements(video_file)

    # Step 2: concrete concepts (on-screen text and narration).
    logger.info("▶ Step 2: 提取具象概念")
    concrete_concepts = self._step2_extract_concrete_concepts(
        video_file, text_data, concrete_elements
    )

    # Implicit concepts are no longer extracted; the key is kept empty.
    implicit_concepts: List[dict] = []

    # Step 3: abstract concepts summarised from steps 1 and 2.
    logger.info("▶ Step 3: 总结抽象概念")
    abstract_concepts = self._step3_summarize_abstract_concepts(
        video_file, concrete_elements, concrete_concepts, implicit_concepts
    )

    all_substance_elements = (
        concrete_elements + concrete_concepts + abstract_concepts
    )
    logger.info(
        "Step 1-3 完成 - 总计: %d 个元素 (具体:%d, 具象:%d, 抽象:%d)",
        len(all_substance_elements),
        len(concrete_elements),
        len(concrete_concepts),
        len(abstract_concepts),
    )

    # Step 4: frequency / section-coverage analysis.
    logger.info("▶ Step 4: 共性分析")
    analyzed_result = self._step4_commonality_analysis(
        video_file, all_substance_elements, text_data, section_division
    )

    # Step 5: similarity scoring is retired; the call returns an empty structure.
    logger.info("▶ Step 5: 多维度评分(已停用相似度计算,仅返回空结果)")
    scored_result = self._step5_multi_dimensional_scoring(
        all_substance_elements, analyzed_result, inspiration_points, purpose_points, substance_key_points
    )

    # Step 5.1: intention-support evaluation against the three point types.
    logger.info("▶ Step 5.1: 意图支撑评估")
    intention_support_result = self._step5_1_intention_support_evaluation(
        video_file,
        all_substance_elements,
        analyzed_result,
        inspiration_points,
        purpose_points,
        substance_key_points,
        text_data,
    )

    # Step 6: filtering.
    logger.info("▶ Step 6: 筛选")
    filtered_ids = self._step6_filter_elements(
        analyzed_result,
        scored_result,
        intention_support_result,
    )

    # Step 7: categorisation of the surviving elements.
    logger.info("▶ Step 7: 分类")
    categorized_result = self._step7_categorize_elements(
        all_substance_elements, filtered_ids
    )

    # Step 8: merge every intermediate result into one record per element.
    logger.info("▶ Step 8: 合并信息")
    final_elements = self._merge_all_info(
        all_substance_elements,
        analyzed_result,
        scored_result,
        intention_support_result,
        filtered_ids,
        categorized_result,
    )
    logger.info(f"实质元素提取完成 - 最终元素数: {len(final_elements)}")

    return {
        # Raw extraction results (steps 1-3)
        "concrete_elements": concrete_elements,
        "concrete_concepts": concrete_concepts,
        "implicit_concepts": implicit_concepts,
        "abstract_concepts": abstract_concepts,
        "substance_elements": all_substance_elements,
        # Intermediate results (steps 4-7)
        "substance_analyzed_result": analyzed_result,
        "substance_scored_result": scored_result,
        "substance_intention_support_result": intention_support_result,
        "substance_filtered_ids": filtered_ids,
        "substance_categorized_result": categorized_result,
        # Final merged result (step 8)
        "substance_final_elements": final_elements
    }
- # ========== Step 1-3: 实质提取 ==========
- def _step1_extract_concrete_elements(
- self,
- video_file
- ) -> List[dict]:
- """Step 1: 提取具体元素 - 从图像中直接观察到的单一视觉实体对象"""
- if not self.is_initialized:
- self.initialize()
- if not video_file:
- logger.warning("⚠️ 没有视频文件,跳过具体元素提取")
- return []
- prompt = """# 任务
- 从视频中提取"具体元素"
- # 核心定义
- ## 具体元素
- - **定义**:
- -- 1.从视频画面中直接观察到的、可独立存在的**单一视觉实体对象**
- -- 2.视频的背景音乐、音效等非口播内容的声音
- - **判断标准**:
- -- 1.可以指着画面说"这是一个X"(单一、具体、可见的实体)
- -- 2.有背景音乐、音效等非口播内容的声音,直接用"背景音乐/音效声"作为名称即可,不要重复提取
- - **示例**:
- -- 1.胡萝卜、青椒、西兰花(每个都是单独的实体)
- -- 2.背景音乐/音效声
- - **禁止**:
- - 归类词(蔬菜、水果)
- - 概念性名词(食物、植物、人)
- - 文字内容(只关注视觉实体)
- ## 提取原则(仅针对画面中的视觉实体对象)
- - 只从视频画面中提取,不关注文字
- - 每个元素必须是单一的、具体的视觉实体
- - 使用"剥离测试":去掉表达方式后,这个实体仍然存在
- # 命名规范
- - 原子性:单一原子名词,不可再拆分
- - 名词性:纯名词,严禁形容词、动词、副词
- - 具体性:直接指向可观察的实体
- # 输出json结构
- [
- {
- "id": "从1开始的自增序列",
- "名称": "单一原子名词",
- "描述": "说明这个元素是什么,外观特征",
- "维度": {"一级": "实质", "二级": "具体元素"},
- "来源": ["视频画面"],
- "推理": "为什么识别这个具体元素"
- },
- {
- "id": "从1开始的自增序列",
- "名称": "背景音乐/音效声",
- "描述": "说明背景音乐/音效声是什么",
- "维度": {"一级": "实质", "二级": "具体元素"},
- "来源": ["视频"],
- "推理": "为什么识别这个背景音乐/音效声"
- }
- ]
- 注意:只提取具体的视觉实体对象,不要提取抽象概念或归类词
- """
- # 使用视频分析接口
- result = LLMInvoker.safe_invoke_video_analysis(
- operation_name="具体元素提取",
- video_file=video_file,
- prompt=prompt,
- agent=self,
- fallback=[]
- )
- # 为每个具体元素添加id
- for idx, element in enumerate(result, 1):
- element["id"] = f"具体元素-{idx}"
- return result
def _step2_extract_concrete_concepts(
    self,
    video_file,
    text_data: dict,
    concrete_elements: List[dict],
) -> List[dict]:
    """Step 2: ask the model for concrete concepts (nouns literally present).

    The names of the step-1 elements are embedded in the prompt as an
    exclusion list. ``text_data`` is accepted but not read here. Every
    returned item gets its ``id`` overwritten with ``具象概念-<n>`` (1-based).
    """
    if not self.is_initialized:
        self.initialize()

    # Names already taken by step 1; the prompt forbids extracting them again.
    excluded_names = []
    for element in (concrete_elements or []):
        if element.get("名称"):
            excluded_names.append(element.get("名称"))
    if excluded_names:
        excluded_names_json = json.dumps(excluded_names, ensure_ascii=False, indent=2)
    else:
        excluded_names_json = "[]"

    extraction_prompt = f"""# 任务
从视频中提取"具象概念"
# 核心定义
## 具象概念
- **定义**:视频画面内的文字或者口播内容中明确提到的完整名词
## 排除的名称(来自第一步,仅用于排除)
**禁止提取的名称**:{excluded_names_json}
## 判断标准
- **视频画面内的文字或者口播内容**中实际出现的**完整名词**
- **不能是视频画面中出现的元素的名称等归类词**
- 去掉表达方式后,这个概念仍然存在
# 约束
- 禁止通过语义推导、联想、理解得出的名词
- **禁止归类词(蔬菜、水果、人等)**
- **禁止使用第一步中已提取的具体元素名称**
- 禁止拆分复合词
- 禁止提取形容词、动词
- 禁止提取谓语、定语、状语、补语
- 禁止提取副词
## 提取原则
- **词语完整性**:必须提取完整的**名词**,不允许拆分复合词
- **严格约束**:必须是**画面文字或者口播内容中实际出现**的完整名词
- **严格的名词验证**(必须同时满足以下两个条件):
- 条件1:词性是名词(词典意义上的名词)
- 条件2:在当前上下文中作为名词使用(语境判断)
**验证方法**:
- 找到该词在视频画面内的文字或者口播内容中的具体位置
- 分析该词在句子中的语法成分和实际作用
- 判断:该词是否在这个语境中充当"事物/对象/概念"的角色?
# 输出json结构
[
{{
"id": "从1开始的自增序列",
"名称": "字面原词(完整名词)",
"描述": "说明这个概念是什么",
"维度": {{"一级": "实质", "二级": "具象概念"}},
"来源": "HH:MM:SS",
"上下文验证": {{
"原文位置": "该词在原视频画面内的文字或者口播内容中的具体句子",
"语法成分": "该词在句子中的语法成分(主语/宾语/定语中心语等)",
"语境判断": "说明该词在此语境中确实作为名词使用的理由"
}},
"推理": "为什么这个名词被认为是具象概念"
}}
]
注意:只输出同时满足"词性是名词"和"上下文中作为名词使用"两个条件的概念
"""

    extracted = LLMInvoker.safe_invoke_video_analysis(
        operation_name="具象概念提取",
        video_file=video_file,
        prompt=extraction_prompt,
        agent=self,
        fallback=[]
    )

    # Replace whatever id the model produced with a stable, prefixed one.
    position = 0
    for entry in extracted:
        position += 1
        entry["id"] = f"具象概念-{position}"
    return extracted
- def _step3_summarize_abstract_concepts(
- self,
- video_file,
- concrete_elements: List[dict],
- concrete_concepts: List[dict],
- implicit_concepts: List[dict]
- ) -> List[dict]:
- """Step 3: 总结抽象概念 - 从具体元素和具象概念中归纳上位抽象"""
- if not self.is_initialized:
- self.initialize()
- if not concrete_elements and not concrete_concepts:
- logger.warning("⚠️ 没有具体元素或具象概念,跳过抽象概念提取")
- return []
- # 构建已提取的元素文本
- elements_text = json.dumps([
- {"id": e.get("id"), "名称": e.get("名称"), "描述": e.get("描述")}
- for e in concrete_elements
- ], ensure_ascii=False, indent=2) if concrete_elements else "无"
- concepts_text = json.dumps([
- {"id": c.get("id"), "名称": c.get("名称"), "描述": c.get("描述")}
- for c in concrete_concepts
- ], ensure_ascii=False, indent=2) if concrete_concepts else "无"
- prompt = f"""# 任务
- 基于已提取的具体元素和具象概念,总结新的"抽象概念"
- # 已提取的具体元素
- {elements_text}
- # 已提取的具象概念
- {concepts_text}
- # 核心定义
- # 定义与分类
- **抽象概念**分两类:
- **类型1-上位抽象**:对具体元素/具象概念的归类
- **类型2-引申含义**:具体元素/具象概念无法直接表达的深层含义
- # 提取原则
- - 对具体元素/具象概念的归类
- - 具体元素和具象概念无法直接表达的深层含义
- - 基于归纳:基于已提取的具体元素/具象概念
- - 来源追溯:准确标明所有来源ID(具体元素ID、具象概念ID),必须完整可追溯
- # 命名规范
- - 有完整独立语义的概念
- - 单一原子名词,不可拆分
- - 纯名词,禁止形容词、动词、副词
- - 精准描述概念,不做修饰
- # 判断标准
- - 去掉表达方式后,概念仍存在
- # 输出json结构
- [
- {{
- "id": "从1开始的自增序列",
- "名称": "单一名词或短语",
- "描述": "说明这个抽象概念是什么",
- "维度": {{"一级": "实质", "二级": "抽象概念"}},
- "类型": "上位抽象 | 引申含义",
- "来源": {{
- "具体元素": [{{"id":"具体元素-X", "名称":"具体元素-X的名称"}}, {{"id":"具体元素-Y", "名称":"具体元素-Y的名称"}}],
- "具象概念": [{{"id":"具象概念-A", "名称":"具象概念-A的名称"}}, {{"id":"具象概念-B", "名称":"具象概念-B的名称"}}]
- }},
- "推理过程": "明确说明如何从上述来源(具体哪些元素ID和概念ID)推导出这个抽象概念",
- }}
- ]
- 注意:只输出验证全部通过的概念
- """
- # 使用视频分析接口总结抽象概念
- result = LLMInvoker.safe_invoke_video_analysis(
- operation_name="抽象概念总结",
- video_file=video_file,
- prompt=prompt,
- agent=self,
- fallback=[]
- )
- # 为每个抽象概念添加id
- for idx, concept in enumerate(result, 1):
- concept["id"] = f"抽象概念-{idx}"
- return result
- # ========== Step 4-8: 后续处理 ==========
- def _step4_commonality_analysis(
- self,
- video_file,
- substance_elements: List[dict],
- text_data: dict,
- section_division: dict
- ) -> List[dict]:
- """Step 4: 共性分析 - 统计频次和段落覆盖率"""
- if not substance_elements:
- return []
- total_sections = self._count_sections(section_division)
- # 分批处理
- analyzed_items = self._commonality_analysis_in_batches(
- video_file, substance_elements, text_data, section_division, total_sections,
- max_batch_size=100
- )
- return analyzed_items
- def _commonality_analysis_in_batches(
- self,
- video_file,
- substance_elements: list,
- text_data: dict,
- section_division: dict,
- total_sections: int,
- max_batch_size: int = 100
- ) -> list:
- """分批处理共性分析"""
- if not self.is_initialized:
- self.initialize()
- num_elements = len(substance_elements)
- if num_elements == 0:
- return []
- # 如果元素数少于批次大小,一次性处理
- if num_elements <= max_batch_size:
- return self._commonality_analysis_single_batch(
- video_file, substance_elements, text_data, section_division, total_sections
- )
- # 分批处理
- num_batches = (num_elements + max_batch_size - 1) // max_batch_size
- batch_futures = {}
- for batch_idx in range(num_batches):
- start_idx = batch_idx * max_batch_size
- end_idx = min(start_idx + max_batch_size, num_elements)
- batch_elements = substance_elements[start_idx:end_idx]
- future = _GLOBAL_THREAD_POOL.submit(
- self._commonality_analysis_single_batch,
- video_file, batch_elements, text_data, section_division, total_sections
- )
- batch_futures[batch_idx] = future
- # 收集结果
- all_results = []
- for batch_idx, future in batch_futures.items():
- try:
- batch_result = future.result()
- if batch_result:
- all_results.extend(batch_result)
- except Exception as e:
- logger.error(f"批次 {batch_idx + 1} 失败: {e}")
- return all_results
def _commonality_analysis_single_batch(
    self,
    video_file,
    batch_elements: list,
    text_data: dict,
    section_division: dict,
    total_sections: int
) -> list:
    """Analyse one batch of elements: frequency plus section coverage.

    Args:
        video_file: Video handle passed to the video-analysis call.
        batch_elements: Elements of this batch, rendered into the prompt.
        text_data: Accepted for signature symmetry; not read here.
        section_division: Section tree rendered into the prompt.
        total_sections: Denominator of the coverage rate; a value <= 0
            yields a coverage rate of 0.

    Returns:
        One dict per analysed element with the keys ``id``, ``名称``,
        ``出现频次``, ``出现段落列表``, ``出现段落数`` and ``段落覆盖率``.
    """
    if not self.is_initialized:
        self.initialize()
    section_text = self._build_section_text(section_division)
    elements_text = self._build_simple_items_text(batch_elements)
    prompt = f"""# 段落列表
{section_text}
# 元素列表
{elements_text}
# 任务
对每个元素统计出现的段落和频次
## 统计规则
### 1. 具体元素统计(只统计视觉实体)
- **出现频次**: 统计该**单一视觉实体对象**在视频图像中直接观察到的次数
- **出现段落列表**: 只统计能在视频图像中**直接看到该视觉实体**的段落
### 2. 具象概念统计(只统计文字字面)
- **出现频次**: 统计该名词在视频画面文字和口播内容中**画面或者口播内容中出现**的次数
- **出现段落列表**: 只统计**视频画面文字或者口播内容中包含该名词**的段落
### 3. 抽象概念统计(统计语义归类)
- **出现频次**: 统计该概念被**隐含表达**的总次数
- **出现段落列表**: 统计**包含该概念所归类的具体元素/具象概念**的段落
# 输出(JSON)
[
{{
"id": "元素id",
"名称": "元素名称",
"出现频次": 0,
"出现段落列表": [
{{
"段落ID": "段落id",
"如何体现": "描述该元素在这个段落中的具体体现方式"
}}
]
}}
]
"""
    llm_result = LLMInvoker.safe_invoke_video_analysis(
        operation_name="共性分析",
        video_file=video_file,
        prompt=prompt,
        agent=self,
        fallback=[]
    )

    analyzed_items = []
    for analysis in llm_result or []:
        # Model output is untrusted: skip anything that is not a record.
        if not isinstance(analysis, dict):
            continue
        section_list = analysis.get("出现段落列表") or []

        # Coverage counts distinct, non-empty section ids only; an entry
        # without an id must not be counted as one more covered section.
        unique_paragraph_ids = set()
        for item in section_list:
            if not isinstance(item, dict):
                continue
            paragraph_id = item.get("段落ID")
            if paragraph_id:
                unique_paragraph_ids.add(paragraph_id)
        coverage_count = len(unique_paragraph_ids)
        coverage_rate = round(coverage_count / total_sections, 4) if total_sections > 0 else 0

        analyzed_items.append({
            "id": analysis.get("id", 0),
            "名称": analysis.get("名称", ""),
            "出现频次": analysis.get("出现频次", 0),
            "出现段落列表": section_list,
            "出现段落数": coverage_count,
            "段落覆盖率": coverage_rate
        })
    return analyzed_items
- def _step5_multi_dimensional_scoring(
- self,
- substance_elements: List[dict],
- analyzed_result: list,
- inspiration_points: dict,
- purpose_points: list,
- substance_key_points: list
- ) -> dict:
- """Step 5: 多维度评分(相似度逻辑已废弃)
- 说明:
- - 不再进行任何相似度计算,完全依赖后续的“意图支撑”进行筛选
- - 保留函数与返回结构,仅返回空结果,避免下游依赖崩溃
- """
- logger.info(
- "【多维度评分】相似度比较逻辑已关闭,当前不进行评分,仅返回空结果。"
- )
- return {
- "灵感点": [],
- "目的点": [],
- "关键点": [],
- }
- def _step5_1_intention_support_evaluation(
- self,
- video_file,
- substance_elements: List[dict],
- analyzed_result: list,
- inspiration_points: dict,
- purpose_points: list,
- substance_key_points: list,
- text_data: dict,
- ) -> dict:
- """Step 5.1: 意图支撑评估
- 说明:
- - 在保留相似度评分的基础上,增加一套“意图支撑”视角的评估
- - 不再使用频次/覆盖率作为筛选条件,仅用于日志与统计
- - 结果以元素-点的支撑关系形式返回,不直接参与筛选决策
- """
- if not substance_elements:
- return {"灵感点": [], "目的点": [], "关键点": []}
- logger.info(f"【意图支撑评估】输入: {len(substance_elements)} 个实质元素")
- # 按二级维度分组(不做频次过滤,全部评估)
- dimension_groups = {
- "具体元素": [],
- "具象概念": [],
- "抽象概念": [],
- }
- for elem in substance_elements:
- second_level = elem.get("维度", {}).get("二级", "")
- if second_level in dimension_groups:
- dimension_groups[second_level].append(elem)
- logger.info(
- "维度分组(意图支撑): 具体元素=%d, 具象概念=%d, 抽象概念=%d",
- len(dimension_groups["具体元素"]),
- len(dimension_groups["具象概念"]),
- len(dimension_groups["抽象概念"]),
- )
- # 并行评估(各维度 x 3个点类型)
- futures = {}
- def submit_if_needed(dimension_name: str, point_type: str, points_list):
- if not points_list:
- logger.info(
- f"⏭️ 跳过意图支撑评估: {dimension_name}-{point_type} "
- f"(原因: 点列表为空, len={len(points_list) if isinstance(points_list, list) else 'N/A'})"
- )
- return
- if not dimension_groups.get(dimension_name):
- logger.info(
- f"⏭️ 跳过意图支撑评估: {dimension_name}-{point_type} "
- f"(原因: 该维度无元素, len={len(dimension_groups.get(dimension_name, []))})"
- )
- return
- key = (dimension_name, point_type)
- logger.info(
- f"📤 提交意图支撑评估任务: {dimension_name}-{point_type} "
- f"(元素数={len(dimension_groups[dimension_name])}, 点数={len(points_list)})"
- )
- futures[key] = _GLOBAL_THREAD_POOL.submit(
- self._evaluate_support_by_dimension,
- video_file,
- dimension_name,
- dimension_groups[dimension_name],
- points_list,
- point_type,
- text_data,
- )
- # 具体元素 / 具象概念 / 抽象概念 × 灵感点 / 目的点 / 关键点
- for dim in ["具体元素", "具象概念", "抽象概念"]:
- submit_if_needed(dim, "灵感点", inspiration_points if isinstance(inspiration_points, list) else [])
- submit_if_needed(dim, "目的点", purpose_points if isinstance(purpose_points, list) else [])
- submit_if_needed(dim, "关键点", substance_key_points if isinstance(substance_key_points, list) else [])
- # 收集结果(按点类型汇总)
- result = {
- "灵感点": [],
- "目的点": [],
- "关键点": [],
- }
- for (dimension_name, point_type), future in futures.items():
- try:
- dimension_result = future.result()
- if dimension_result:
- result[point_type].extend(dimension_result)
- logger.info(
- f"✅ 意图支撑-{dimension_name}-{point_type} 评估完成: {len(dimension_result)} 条支撑关系"
- )
- except Exception as e:
- logger.error(f"❌ 意图支撑-{dimension_name}-{point_type} 评估失败: {e}")
- return result
- def _evaluate_support_by_dimension(
- self,
- video_file,
- dimension_name: str,
- elements: list,
- points: list,
- point_type: str,
- text_data: dict,
- ) -> list:
- """按维度评估意图支撑关系(分批处理)"""
- if not self.is_initialized:
- self.initialize()
- if not elements or not points:
- return []
- # 分批控制:元素数 × 点数 ≈ 100 以内
- num_elements = len(elements)
- num_points = len(points)
- max_batch_product = 100
- max_elements_per_batch = max(1, int(max_batch_product / max(1, num_points)))
- num_batches = (num_elements + max_elements_per_batch - 1) // max_elements_per_batch
- batch_futures = {}
- for batch_idx in range(num_batches):
- start_idx = batch_idx * max_elements_per_batch
- end_idx = min(start_idx + max_elements_per_batch, num_elements)
- batch_elements = elements[start_idx:end_idx]
- future = _GLOBAL_THREAD_POOL.submit(
- self._evaluate_support_single_batch_by_dimension,
- video_file,
- dimension_name,
- batch_elements,
- points,
- point_type,
- text_data,
- )
- batch_futures[batch_idx] = future
- # 收集结果
- all_results = []
- for batch_idx, future in batch_futures.items():
- try:
- batch_result = future.result()
- if batch_result:
- all_results.extend(batch_result)
- except Exception as e:
- logger.error(f"【意图支撑-{dimension_name}】批次 {batch_idx + 1} 失败: {e}")
- # 合并结果(支撑结果)
- merged_results = self._merge_support_batch_results(all_results)
- return merged_results
- def _evaluate_support_single_batch_by_dimension(
- self,
- video_file,
- dimension_name: str,
- batch_elements: list,
- points: list,
- point_type: str,
- text_data: dict,
- ) -> list:
- """单批次意图支撑评估(按维度)"""
- if not self.is_initialized:
- self.initialize()
- post_content = self._build_post_content(text_data)
- elements_text = self._build_simple_items_text_dimension(batch_elements, dimension_name)
- points_text = self._build_points_text(point_type, points)
- # 根据维度选择不同的 prompt
- if dimension_name == "具体元素":
- prompt = self._build_concrete_element_support_prompt(post_content, elements_text, points_text)
- elif dimension_name == "具象概念":
- prompt = self._build_concrete_concept_support_prompt(post_content, elements_text, points_text)
- elif dimension_name == "抽象概念":
- prompt = self._build_abstract_concept_support_prompt(post_content, elements_text, points_text)
- else:
- logger.error(f"未知维度(意图支撑): {dimension_name}")
- return []
- # 使用视频分析接口,多模态评估意图支撑
- result = LLMInvoker.safe_invoke_video_analysis(
- operation_name=f"意图支撑评估-{dimension_name}-{point_type}",
- video_file=video_file,
- prompt=prompt,
- agent=self,
- fallback=[],
- )
- return result
- def _evaluate_support_in_batches(
- self,
- elements: list,
- points: list,
- point_type: str,
- max_batch_product: int = 100
- ) -> list:
- """分批评估相似度"""
- if not self.is_initialized:
- self.initialize()
- if not points:
- return []
- num_elements = len(elements)
- num_points = len(points)
- max_elements_per_batch = max(1, int(max_batch_product / num_points))
- num_batches = (num_elements + max_elements_per_batch - 1) // max_elements_per_batch
- # 分批处理
- batch_futures = {}
- for batch_idx in range(num_batches):
- start_idx = batch_idx * max_elements_per_batch
- end_idx = min(start_idx + max_elements_per_batch, num_elements)
- batch_elements = elements[start_idx:end_idx]
- future = _GLOBAL_THREAD_POOL.submit(
- self._evaluate_support_single_batch,
- batch_elements, points, point_type
- )
- batch_futures[batch_idx] = future
- # 收集结果
- all_results = []
- for batch_idx, future in batch_futures.items():
- try:
- batch_result = future.result()
- if batch_result:
- all_results.extend(batch_result)
- except Exception as e:
- logger.error(f"批次 {batch_idx + 1} 失败: {e}")
- # 合并并筛选(每个元素保留最相关的1-2个点)
- merged_results = self._merge_batch_results(all_results)
- return merged_results
def _evaluate_support_single_batch(
    self,
    batch_elements: list,
    points: list,
    point_type: str
) -> list:
    """Ask the model for text and semantic similarity between a batch and the points.

    Elements and points are rendered to text, embedded in the prompt and
    sent as a system + user message pair; the invoker's result (or the
    empty-list fallback) is returned as is.
    """
    if not self.is_initialized:
        self.initialize()

    elements_text = self._build_simple_items_text(batch_elements)
    points_text = self._build_points_text(point_type, points)
    user_prompt = f"""# 元素列表
{elements_text}
# 点列表
{points_text}
# 任务
对每个元素计算元素与点的文本相似度和语义相似度
# 输出(JSON)
[
{{
"id": "元素id",
"名称": "元素名称",
"相似度结果": [
{{
"点":"点的名称",
"语义相似度":0.21,
"语义相似度理由": "理由",
"文本相似度":0.33,
"文本相似度理由": "理由"
}}
]
}}
]
"""
    conversation = [
        {"role": "system", "content": self.system_prompt},
        {"role": "user", "content": user_prompt},
    ]
    return LLMInvoker.safe_invoke(
        self,
        f"评估支撑{point_type}",
        conversation,
        fallback=[]
    )
- def _merge_batch_results(self, all_results: list) -> list:
- """合并批次结果,每个元素只保留最相关的1-2个点"""
- if not all_results:
- return []
- merged_map = {}
- for item in all_results:
- element_id = item.get("id")
- if element_id not in merged_map:
- merged_map[element_id] = {
- "id": element_id,
- "名称": item.get("名称"),
- "相似度结果": []
- }
- if not merged_map[element_id]["相似度结果"]:
- merged_map[element_id]["相似度结果"] = item.get("相似度结果", [])
- # 筛选每个元素的相似度结果
- for element_data in merged_map.values():
- similarity_results = element_data.get("相似度结果", [])
- if not similarity_results:
- continue
- max_text_sim_point = max(similarity_results, key=lambda x: x.get("文本相似度", 0))
- max_semantic_sim_point = max(similarity_results, key=lambda x: x.get("语义相似度", 0))
- if max_text_sim_point.get("点") == max_semantic_sim_point.get("点"):
- filtered_results = [max_text_sim_point]
- else:
- filtered_results = [max_text_sim_point, max_semantic_sim_point]
- element_data["相似度结果"] = filtered_results
- return list(merged_map.values())
- def _merge_support_batch_results(self, all_results: list) -> list:
- """合并批次结果(意图支撑),直接合并支撑的元素-点对"""
- if not all_results:
- return []
- merged_map = {}
- for item in all_results:
- element_id = item.get("id")
- if element_id not in merged_map:
- merged_map[element_id] = {
- "id": element_id,
- "名称": item.get("名称"),
- "支撑结果": [],
- }
- # 这里假设下游会控制去重,只在首次合并时写入
- if not merged_map[element_id]["支撑结果"]:
- merged_map[element_id]["支撑结果"] = item.get("支撑结果", [])
- return list(merged_map.values())
- def _step6_filter_elements(
- self,
- analyzed_result: list,
- scored_result: dict,
- intention_support_result: dict,
- ) -> list:
- """Step 6: 筛选实质元素
- 新的保留策略(基于意图支撑关系 + 覆盖率进行筛选):
- - 覆盖率和频次主要用于统计展示,但会作为必要条件之一
- - 必须**同时**满足以下三个条件才保留:
- - 出现频次 > 1
- - 存在任意“意图支撑”关系
- - 段落覆盖率 > 0.3(30%)
- - 相似度评分相关逻辑已全部停用,不再参与筛选
- """
- if not analyzed_result:
- return []
- # 创建 analyzed_result 的映射
- analyzed_map = {item.get("id"): item for item in analyzed_result}
- # 创建意图支撑映射:只要某个元素在任一维度、任一点类型下有支撑关系,即视为“有支撑”
- intention_support_map = {}
- if intention_support_result:
- for point_type in ["灵感点", "目的点", "关键点"]:
- dimension_data = intention_support_result.get(point_type, [])
- for item in dimension_data:
- if not isinstance(item, dict):
- continue
- element_id = item.get("id")
- support_results = item.get("支撑结果", [])
- if not element_id:
- continue
- if element_id not in intention_support_map:
- intention_support_map[element_id] = []
- # 只要有一条支撑结果就认为该元素“有支撑关系”
- if support_results:
- intention_support_map[element_id].extend(support_results)
- # 筛选
- filtered_ids = []
- rejected_ids = []
- for element_id, analyzed_data in analyzed_map.items():
- element_name = analyzed_data.get("名称", "N/A")
-
- # 声音类型的实质元素(背景音乐、音效声等)直接通过筛选,不参与后续判断
- sound_type_names = ["背景音乐", "音效声"]
- if element_name in sound_type_names:
- filtered_ids.append(element_id)
- logger.info(
- f"✅ 保留: id={element_id}, name={element_name}, 原因=声音类型元素,豁免筛选"
- )
- continue
-
- # 确保 frequency 是整数类型
- frequency_raw = analyzed_data.get("出现频次", 0)
- try:
- frequency = int(frequency_raw) if frequency_raw is not None else 0
- except (ValueError, TypeError):
- frequency = 0
- # 确保 coverage_rate 是浮点数类型
- coverage_rate_raw = analyzed_data.get("段落覆盖率", 0.0)
- try:
- coverage_rate = float(coverage_rate_raw) if coverage_rate_raw is not None else 0.0
- except (ValueError, TypeError):
- coverage_rate = 0.0
- # 频次过滤:出现频次<=1 的直接过滤(不再继续做支撑和覆盖率判断)
- if frequency <= 1:
- rejected_ids.append(element_id)
- logger.info(
- "❌ 过滤: id=%s, name=%s, 原因=出现频次<=1 (frequency=%d)",
- element_id,
- element_name,
- frequency,
- )
- continue
- support_info = intention_support_map.get(element_id, [])
- has_support = bool(support_info)
- has_high_coverage = coverage_rate > 0.3
- # 出现频次>1 且 有意图支撑关系 且 段落覆盖率 > 30%:直接保留,不进行相似度比较
- if has_support and has_high_coverage:
- filtered_ids.append(element_id)
- logger.info(
- f"✅ 保留: id={element_id}, name={element_name}, "
- f"support_count={len(support_info)}, coverage={coverage_rate}"
- )
- continue
- # 不满足“出现频次>1 + 有意图支撑 + 覆盖率>30%”的元素全部过滤
- rejected_ids.append(element_id)
- logger.info(
- "❌ 过滤: id=%s, name=%s, 原因=无隐含概念豁免且"
- "未同时满足出现频次>1、有意图支撑关系和段落覆盖率>0.3 "
- "(frequency=%d, coverage=%.4f)",
- element_id,
- element_name,
- frequency,
- coverage_rate,
- )
- logger.info(f"筛选完成: {len(filtered_ids)}/{len(analyzed_result)} 通过")
- return filtered_ids
- def _step7_categorize_elements(
- self,
- substance_elements: List[dict],
- filtered_ids: list
- ) -> dict:
- """Step 7: 元素分类 - 按二级维度分别分类"""
- if not filtered_ids:
- return {}
- # 只保留筛选后的元素
- filtered_elements = [
- elem for elem in substance_elements
- if elem.get("id") in filtered_ids
- ]
- # 按二级维度分组
- dimension_groups = {
- "具体元素": [],
- "具象概念": [],
- "抽象概念": [],
- }
- for elem in filtered_elements:
- second_level = elem.get("维度", {}).get("二级", "")
- if second_level in dimension_groups:
- dimension_groups[second_level].append(elem)
- # 并行分类
- categorization_results = {}
- futures = {}
- for dimension_name, elements in dimension_groups.items():
- if not elements:
- continue
- future = _GLOBAL_THREAD_POOL.submit(
- self._categorize_single_dimension,
- dimension_name,
- elements
- )
- futures[dimension_name] = future
- # 收集结果
- for dimension_name, future in futures.items():
- try:
- categorization_results[dimension_name] = future.result()
- except Exception as e:
- logger.error(f"{dimension_name} 分类失败: {e}")
- categorization_results[dimension_name] = {"元素分类": []}
- return categorization_results
def _categorize_single_dimension(
    self,
    dimension_name: str,
    elements: list
) -> dict:
    """Ask the model to categorise the elements of one dimension.

    Id, name and description of every element are embedded in the prompt as
    JSON; the invoker's result (or the empty-dict fallback) is returned as is.
    """
    if not self.is_initialized:
        self.initialize()

    summaries = []
    for element in elements:
        summaries.append({
            "id": element.get("id"),
            "名称": element.get("名称"),
            "描述": element.get("描述"),
        })
    elements_text = json.dumps(summaries, ensure_ascii=False, indent=2)

    user_prompt = f"""# 任务
对"{dimension_name}"维度的元素进行分类
# 元素列表
{elements_text}
# 分类要求
## 核心原则
1. **单一原子名词**: 分类名称必须是单一的原子名词
2. **MECE原则**: 分类之间相互独立、完全穷尽
3. **确定性归属**: 每个元素只能归属唯一一个分类
4. **层级限制**: 最多2层
5. 元素可以没有分类,不要强行归类
6. 分类下面至少要有2个元素,否则不要分类
## 实质维度的分类逻辑
- **核心原则**:按照**内容本质、属性特征、功能作用**等角度来分类
- **包含**:
- 物理特征:形态、材质、颜色等
- 功能用途:工具、装饰、食物等
- 概念类别:情感、价值观、技能等
- 领域归属:科技、艺术、健康等
# 输出格式(JSON)
{{
"元素分类": [
{{
"元素id": "元素的ID",
"元素名称": "元素名称",
"分类": ["一级分类","二级分类","..."]
}}
]
}}
"""
    conversation = [
        {"role": "system", "content": self.system_prompt},
        {"role": "user", "content": user_prompt},
    ]
    return LLMInvoker.safe_invoke(
        self,
        f"分类-{dimension_name}",
        conversation,
        fallback={}
    )
- def _merge_all_info(
- self,
- substance_elements: List[dict],
- analyzed_result: list,
- scored_result: dict,
- intention_support_result: dict,
- filtered_ids: list,
- categorized_result: dict
- ) -> list:
- """Step 8: 合并所有信息 - 每个元素包含所有中间信息"""
- if not filtered_ids:
- return []
- # 创建映射
- extraction_map = {item.get("id"): item for item in substance_elements}
- analyzed_map = {item.get("id"): item for item in analyzed_result}
- # 创建评分映射(相似度)
- scored_map = {}
- for dimension in ["灵感点", "目的点", "关键点"]:
- dimension_data = scored_result.get(dimension, [])
- for item in dimension_data:
- if not isinstance(item, dict):
- continue
- element_id = item.get("id")
- if element_id not in scored_map:
- scored_map[element_id] = {}
- similarity_results = item.get("相似度结果", [])
- sorted_results = sorted(
- similarity_results,
- key=lambda x: (x.get("文本相似度", 0), x.get("语义相似度", 0)),
- reverse=True
- )
- scored_map[element_id][dimension] = sorted_results
- # 创建意图支撑映射
- intention_support_map = {}
- if intention_support_result:
- for dimension in ["灵感点", "目的点", "关键点"]:
- dimension_data = intention_support_result.get(dimension, [])
- for item in dimension_data:
- if not isinstance(item, dict):
- continue
- element_id = item.get("id")
- if element_id not in intention_support_map:
- intention_support_map[element_id] = {}
- support_results = item.get("支撑结果", [])
- intention_support_map[element_id][dimension] = support_results
- # 创建分类映射
- category_map = {}
- for dimension_data in categorized_result.values():
- element_classifications = dimension_data.get("元素分类", [])
- for classification in element_classifications:
- element_id = classification.get("元素id")
- category_info = classification.get("分类", {})
- if element_id:
- category_map[element_id] = category_info
- # 合并信息
- final_elements = []
- for element_id in filtered_ids:
- base_info = extraction_map.get(element_id, {})
- analysis_info = analyzed_map.get(element_id, {})
- scoring_info = scored_map.get(element_id, {})
- intention_info = intention_support_map.get(element_id, {})
- category_info = category_map.get(element_id, {})
- merged_element = {
- "id": base_info.get("id"),
- "名称": base_info.get("名称"),
- "描述": base_info.get("描述"),
- "维度": base_info.get("维度", {}),
- "分类": category_info,
- "共性分析": {
- "出现频次": analysis_info.get("出现频次", 0),
- "出现段落列表": analysis_info.get("出现段落列表", []),
- "出现段落数": analysis_info.get("出现段落数", 0),
- "段落覆盖率": analysis_info.get("段落覆盖率", 0.0)
- },
- "多维度评分": {
- "灵感点": scoring_info.get("灵感点", []),
- "目的点": scoring_info.get("目的点", []),
- "关键点": scoring_info.get("关键点", [])
- },
- "意图支撑": {
- "灵感点": intention_info.get("灵感点", []),
- "目的点": intention_info.get("目的点", []),
- "关键点": intention_info.get("关键点", [])
- }
- }
- # 根据不同类型添加特定字段
- second_level = base_info.get("维度", {}).get("二级", "")
- if second_level == "具体元素":
- merged_element["来源"] = base_info.get("来源", [])
- elif second_level == "具象概念":
- merged_element["来源"] = base_info.get("来源", [])
- merged_element["字面位置"] = base_info.get("字面位置", [])
- elif second_level == "抽象概念" or second_level == "隐含概念":
- merged_element["类型"] = base_info.get("类型", "")
- merged_element["来源"] = base_info.get("来源", {})
- merged_element["推理过程"] = base_info.get("推理过程", "")
- merged_element["推理层次"] = base_info.get("推理层次", 1)
- final_elements.append(merged_element)
- return final_elements
- # ========== 辅助方法 ==========
- def _build_section_text(self, section_division: dict) -> str:
- """构建段落划分文本"""
- if not section_division:
- return "无段落划分信息"
- sections = section_division.get("段落列表", [])
- if not sections:
- return "无段落信息"
- def build_section_list(section_list, indent=0):
- text = ""
- for section in section_list:
- if section.get('子项'):
- text += build_section_list(section['子项'], indent + 1)
- else:
- section_id = section.get('id', 'N/A')
- section_desc = section.get('描述', 'N/A')
- content_range = section.get('内容范围', 'N/A')
- text += f"{section_id}: {section_desc}\n内容范围: {content_range}\n"
- return text
- return "段落列表:\n" + build_section_list(sections)
- def _build_post_content(self, text_data: dict) -> str:
- """构建原文内容文本(用于意图支撑判断)
- 这里不假设具体结构,直接以 JSON 形式展开,保证信息完整可见。
- """
- if not text_data:
- return "无文本内容"
- try:
- return json.dumps(text_data, ensure_ascii=False, indent=2)
- except TypeError:
- # 避免非序列化对象导致报错
- return str(text_data)
- def _build_simple_items_text_dimension(self, elements: list, dimension_name: str) -> str:
- """构建某个维度下元素列表文本(用于意图支撑判断)"""
- simple_items = [
- {
- "id": elem.get("id", "N/A"),
- "名称": elem.get("名称", "N/A"),
- "描述": elem.get("描述", "N/A"),
- }
- for elem in elements
- ]
- return json.dumps(
- {"维度": dimension_name, "元素列表": simple_items},
- ensure_ascii=False,
- indent=2,
- )
- def _build_simple_items_text(self, elements: list) -> str:
- """构建元素列表文本"""
- grouped_elements = {
- "具体元素": [],
- "具象概念": [],
- "抽象概念": [],
- }
- for elem in elements:
- element_type = elem.get('维度', {}).get('二级', 'N/A')
- element_data = {
- "id": elem.get('id', 'N/A'),
- "名称": elem.get('名称', 'N/A'),
- "描述": elem.get('描述', 'N/A')
- }
- if element_type in grouped_elements:
- grouped_elements[element_type].append(element_data)
- filtered_groups = {k: v for k, v in grouped_elements.items() if v}
- return json.dumps(filtered_groups, ensure_ascii=False, indent=2)
- def _build_points_text(self, point_type: str, points_data) -> str:
- """构建点列表文本"""
- if not points_data:
- return f"无{point_type}信息"
- filtered_points = [
- {"名称": item.get(point_type, 'N/A')}
- for item in points_data if isinstance(item, dict)
- ]
- return json.dumps(filtered_points, ensure_ascii=False, indent=2)
- def _build_concrete_element_support_prompt(
- self, post_content: str, elements_text: str, points_text: str
- ) -> str:
- """Build the intent-support judgment prompt for concrete elements (based on video frames).
- Interpolates post_content, elements_text and points_text verbatim under the
- prompt's first three headings, followed by fixed task instructions and the
- expected JSON output schema. Returns the assembled prompt string.
- """
- return f"""# 原文内容
- {post_content}
- # 具体元素列表
- {elements_text}
- # 点列表
- {points_text}
- # 任务
- 判断每个**具体元素**是否对点有关键支撑
- ## 具体元素定义(重要!)
- - 定义:视频画面中直接观察到的单一视觉实体对象
- - 判断标准:可以指着画面说"这是一个X"
- - 剥离测试:去掉表达方式后,该视觉实体仍然存在
- ## 核心判断原则:仅基于视频画面语境
- ### 关键约束
- 1. 只看视频画面:具体元素的支撑判断**只能基于视频中的视觉实体**,不能基于文字论述
- 2. 视觉实体角色:该视觉实体在视频画面中的作用是什么?
- - ✅ 核心展示对象:该视觉实体是画面的核心展示内容
- - ❌ 辅助/装饰:该视觉实体只是背景、装饰、示意
- 3. 关键支撑:该视觉实体对点的表达是否关键?去掉它是否会明显削弱点的支撑?
- ### 判断流程
- 1. 理解点的意图,点想表达什么
- 2. 在视频画面中找到该视觉实体
- 3. 判断:去掉该视觉实体,是否无法完整表达点
- - 如果是,支撑
- - 如果不是,不支撑
- ### 严格标准
- - 禁止使用文字内容来判断具体元素的支撑
- - 禁止仅凭名称字面匹配判定支撑
- - 必须基于该视觉实体在画面中的实际角色
- # 输出(JSON)
- 只输出有关键支撑的元素-点对,不支撑的不输出
- [
- {{
- "id": "元素id",
- "名称": "元素名称",
- "支撑结果": [
- {{
- "点": "点的名称",
- "点的意图": "点想表达什么",
- "支撑理由": "说明为什么去掉该视觉实体,会削弱点的表达,程度达到30%以上"
- }}
- ]
- }}
- ]
- 注意:
- 1. 只基于视频画面判断
- 2. 只输出"关键支撑"的元素-点对
- 3. 辅助/装饰元素直接排除,不输出
- 4. 必须基于视频画面中的视觉实体判断,不能做字面匹配"""
- def _build_concrete_concept_support_prompt(
- self, post_content: str, elements_text: str, points_text: str
- ) -> str:
- """Build the intent-support judgment prompt for concrete concepts (based on textual context).
- Interpolates post_content, elements_text and points_text verbatim under the
- prompt's first three headings, followed by fixed task instructions and the
- expected JSON output schema. Returns the assembled prompt string.
- """
- return f"""# 原文内容
- {post_content}
- # 具象概念列表
- {elements_text}
- # 点列表
- {points_text}
- # 任务
- 判断每个**具象概念**是否对点有关键支撑
- ## 具象概念定义(重要!)
- - 定义:文字中字面出现的名词(包括标题、正文、字幕、视频画面中的文字)
- - 判断标准:文字中实际出现,禁止语义推导
- ## 核心判断原则:仅基于文字语境(包含视频中的文字)
- ### 关键约束
- 1. 只看文字:具象概念的支撑判断**只能基于文字中的概念论述**,不能基于视频中的视觉实体
- 2. 概念角色:该概念在文字论述中的作用是什么?
- - ✅ 核心论述概念:该概念是文字论述的核心对象、关键主题
- - ❌ 次要提及:该概念只是顺带提及、举例说明
- 3. 关键支撑:该概念对点的表达是否关键?去掉它是否会明显削弱点的支撑?
- ### 判断流程
- 1. 理解点的意图,点想表达什么
- 2. 在标题、正文、字幕、画面文字中找到该概念出现的位置
- 3. 判断:去掉该段文字,是否无法完整表达点
- - 如果是,支撑
- - 如果不是,不支撑
- ### 严格标准
- - 禁止用视频画面中的视觉实体来判断具象概念的支撑
- - 禁止仅凭名称字面匹配判定支撑
- - 必须判断该概念在文字论述中的实际角色
- # 输出(JSON)
- 只输出有关键支撑的元素-点对,不支撑的不输出
- [
- {{
- "id": "元素id",
- "名称": "元素名称",
- "支撑结果": [
- {{
- "点": "点的名称",
- "点的意图": "点想表达什么",
- "支撑理由": "说明为什么去掉该概念,会削弱点的表达,程度达到30%以上"
- }}
- ]
- }}
- ]
- 注意:
- 1. 只基于文字判断
- 2. 只输出"关键支撑"的元素-点对
- 3. 次要提及的概念直接排除,不输出
- 4. 必须基于文字中的概念论述判断,不能做字面匹配"""
- def _build_abstract_concept_support_prompt(
- self, post_content: str, elements_text: str, points_text: str
- ) -> str:
- """Build the intent-support judgment prompt for abstract concepts.
- Interpolates post_content, elements_text and points_text verbatim under the
- prompt's first three headings, followed by fixed task instructions and the
- expected JSON output schema. Returns the assembled prompt string.
- """
- return f"""# 原文内容
- {post_content}
- # 抽象概念列表
- {elements_text}
- # 点列表
- {points_text}
- # 任务
- 判断每个**抽象概念**是否对点有关键支撑
- ## 抽象概念定义(重要!)
- - 定义:从具体元素和具象概念中理解到的上位抽象
- - 类型1-上位抽象(归类):是下位元素的类别、分类
- - 类型2-引申含义:需要理解上下文的深层含义
- - 剥离测试:去掉表达方式后,该抽象概念仍然存在
- ## 核心判断原则:基于来源语境
- ### 关键约束
- 1. 追溯来源:抽象概念来源于具体元素和/或具象概念,必须追溯到来源
- 2. 继承语境:抽象概念的语境继承自其来源
- - 如果来源主要是具体元素 → 语境偏向视频画面
- - 如果来源主要是具象概念 → 语境偏向文字
- - 如果来源混合 → 综合判断
- 3. 关键支撑:该抽象概念对点的表达是否关键?
- ### 判断流程
- 1. 理解点的意图:点想表达什么?
- 2. 根据来源确定该抽象概念的主要语境
- 3. 判断:去掉该抽象概念,是否无法完整表达点
- - 如果是,支撑
- - 如果不是,不支撑
- ### 严格标准
- - 必须基于来源的语境来判断
- - 禁止仅凭名称字面匹配判定支撑
- - 必须能够追溯到来源元素,验证支撑关系
- # 输出(JSON)
- 只输出有关键支撑的元素-点对,不支撑的不输出
- [
- {{
- "id": "元素id",
- "名称": "元素名称",
- "支撑结果": [
- {{
- "点": "点的名称",
- "来源追溯": "该抽象概念的来源(具体元素/具象概念)及其语境",
- "语境分析": "基于来源确定的语境(画面/文字/混合)",
- "支撑理由": "说明该抽象概念为什么对该点有关键支撑"
- }}
- ]
- }}
- ]
- 注意:
- 1. 必须追溯到来源元素
- 2. 必须继承来源的语境来判断
- 3. 只输出"关键支撑"的元素-点对
- 4. 禁止字面匹配"""
- # 隐含概念相关的意图支撑判断已移除
- def _count_sections(self, section_division: dict) -> int:
- """统计段落总数(只统计叶子节点)"""
- if not section_division:
- return 0
- sections = section_division.get("段落列表", [])
- if not sections:
- return 0
- def count_leaf_nodes(section_list):
- count = 0
- for section in section_list:
- children = section.get("子项", [])
- if children:
- count += count_leaf_nodes(children)
- else:
- count += 1
- return count
- return count_leaf_nodes(sections)
- def _build_messages(self, state: dict) -> List[dict]:
- """构建消息 - 本Agent不使用此方法"""
- return []
- def _update_state(self, state: dict, response) -> dict:
- """更新状态 - 本Agent不使用此方法"""
- return state
|