#!/usr/bin/env python # -*- coding: utf-8 -*- """ 实质提取Agent (SubstanceExtractionAgent) 功能: - 从视频内容中提取实质元素(具体元素、具象概念、抽象概念) - Step 1: 提取具体元素(只看视频画面中的实体) - Step 2: 提取具象概念(只看画面中的文字 + 口播内容中的文字) - Step 3: 总结抽象概念(基于Step 1+2) - Step 4: 共性分析(频次、段落覆盖率) - Step 5: 多维度评分(vs 灵感点/目的点/实质关键点) - Step 6: 筛选(基于频次+覆盖率+相似度) - Step 7: 分类 - Step 8: 合并所有信息 参考:元素提取新方案设计文档.md """ import json from typing import List, Dict, Any from concurrent.futures import ThreadPoolExecutor from src.components.agents.base import BaseLLMAgent from src.utils.logger import get_logger from src.utils.llm_invoker import LLMInvoker, get_video_file_from_state logger = get_logger(__name__) # 全局线程池 - 用于并行处理 _GLOBAL_THREAD_POOL = ThreadPoolExecutor(max_workers=16, thread_name_prefix="SubstanceExtraction") class ScriptSubstanceExtractionAgent(BaseLLMAgent): """实质提取Agent - 自底向上的归纳过程 提取流程(视频版): - Step 1: 提取具体元素(只看视频画面中的实体) - Step 2: 提取具象概念(只看画面中的文字 + 口播内容中的文字) - Step 3: 总结抽象概念(基于Step 1+2) - Step 4: 共性分析(频次、段落覆盖率) - Step 5: 多维度评分(vs 灵感点/目的点/实质关键点) - Step 6: 筛选(基于频次+覆盖率+相似度) - Step 7: 分类 - Step 8: 合并所有信息 """ def __init__( self, name: str = "substance_extraction_agent", description: str = "实质提取Agent", model_provider: str = "google_genai", temperature: float = 0.1, max_tokens: int = 40960 ): system_prompt = self._build_system_prompt() super().__init__( name=name, description=description, model_provider=model_provider, system_prompt=system_prompt, temperature=temperature, max_tokens=max_tokens ) def _build_system_prompt(self) -> str: """构建系统提示词""" return """你是一个专业的内容分析专家,擅长从图文内容中提取实质性元素。 # 核心定义 ## 实质(Substance):"是什么" 内容本身,独立于表达方式而存在 ### 具体元素 - 定义:从图像中直接观察到的单一视觉实体对象 - 判断标准:可以指着图片说"这是一个X" ### 具象概念 - 定义:画面或者口播内容中出现的名词 - 判断标准:画面或者口播内容中实际出现,禁止语义推导 ### 抽象概念 - 定义:从具体元素和具象概念中理解到的上位抽象 - 类型1:上位抽象(归类)- 是下位元素的类别、分类 - 类型2:引申含义 - 需要理解上下文的深层含义 ## 区分方法:"剥离测试" 问题:如果去掉所有表达手法/风格/技巧,这个特征还存在吗? - 存在 → 实质(内容本身) - 不存在/失去意义 → 形式(表达方式) """ def process(self, state: dict) -> dict: """执行完整的实质提取流程(Step 1-8)""" logger.info("=== 开始实质元素提取(完整流程) ===") # 从 state 中获取视频文件(视频版本核心输入) video_file = get_video_file_from_state(state) if not video_file: logger.error("无法从 state 中获取视频文件,实质提取终止") return { "concrete_elements": [], "concrete_concepts": [], "implicit_concepts": [], "abstract_concepts": [], "substance_elements": [], "substance_analyzed_result": [], "substance_scored_result": {}, "substance_filtered_ids": [], "substance_categorized_result": {}, "substance_final_elements": [] } # 从state中提取其他文本/上下文数据 text_data = state.get("text", {}) section_division = state.get("section_division", {}) # 处理灵感点:支持列表和字典两种格式 inspiration_points_raw = state.get("inspiration_points", {}) if isinstance(inspiration_points_raw, list): inspiration_points = inspiration_points_raw elif isinstance(inspiration_points_raw, dict): # 兼容旧格式:{"points": [...]} 或直接是列表 inspiration_points = inspiration_points_raw.get("points", []) else: inspiration_points = [] purpose_points = state.get("purpose_points", []) key_points = state.get("key_points", []) # 只保留实质类关键点 substance_key_points = [ kp for kp in key_points if kp.get("维度大类") == "实质" ] if key_points else [] logger.info( f"意图支撑评估输入: 灵感点={len(inspiration_points)}, " f"目的点={len(purpose_points)}, 关键点(实质类)={len(substance_key_points)}" ) # Step 1: 提取具体元素(画面中的实体) logger.info("▶ Step 1: 提取具体元素") concrete_elements = self._step1_extract_concrete_elements(video_file) # Step 2: 提取具象概念(画面中的文字 + 口播内容中的文字) logger.info("▶ Step 2: 提取具象概念") concrete_concepts = self._step2_extract_concrete_concepts( video_file, text_data, concrete_elements ) # 隐含概念相关逻辑已移除,不再单独提取 implicit_concepts: List[dict] = [] # Step 3: 总结抽象概念(基于Step 1+2) logger.info("▶ Step 3: 总结抽象概念") abstract_concepts = self._step3_summarize_abstract_concepts( video_file, concrete_elements, concrete_concepts, implicit_concepts ) # 合并所有实质元素(不再包含隐含概念) all_substance_elements = ( concrete_elements + concrete_concepts + abstract_concepts ) logger.info( "Step 1-3 完成 - 总计: %d 个元素 (具体:%d, 具象:%d, 抽象:%d)", len(all_substance_elements), len(concrete_elements), len(concrete_concepts), len(abstract_concepts), ) # Step 4: 共性分析 logger.info("▶ Step 4: 共性分析") analyzed_result = self._step4_commonality_analysis( video_file, all_substance_elements, text_data, section_division ) # Step 5: 多维度评分(已废弃相似度比较逻辑,当前不再进行相似度评分) logger.info("▶ Step 5: 多维度评分(已停用相似度计算,仅返回空结果)") scored_result = self._step5_multi_dimensional_scoring( all_substance_elements, analyzed_result, inspiration_points, purpose_points, substance_key_points ) # Step 5.1: 意图支撑评估(基于视频与文本) logger.info("▶ Step 5.1: 意图支撑评估") intention_support_result = self._step5_1_intention_support_evaluation( video_file, all_substance_elements, analyzed_result, inspiration_points, purpose_points, substance_key_points, text_data, ) # Step 6: 筛选 logger.info("▶ Step 6: 筛选") filtered_ids = self._step6_filter_elements( analyzed_result, scored_result, intention_support_result, ) # Step 7: 分类 logger.info("▶ Step 7: 分类") categorized_result = self._step7_categorize_elements( all_substance_elements, filtered_ids ) # Step 8: 合并信息 logger.info("▶ Step 8: 合并信息") final_elements = self._merge_all_info( all_substance_elements, analyzed_result, scored_result, intention_support_result, filtered_ids, categorized_result, ) logger.info(f"实质元素提取完成 - 最终元素数: {len(final_elements)}") # 返回所有结果 return { # Step 1-3 原始提取结果 "concrete_elements": concrete_elements, "concrete_concepts": concrete_concepts, "implicit_concepts": implicit_concepts, "abstract_concepts": abstract_concepts, "substance_elements": all_substance_elements, # Step 4-8 处理结果 "substance_analyzed_result": analyzed_result, "substance_scored_result": scored_result, "substance_intention_support_result": intention_support_result, "substance_filtered_ids": filtered_ids, "substance_categorized_result": categorized_result, # 最终结果 "substance_final_elements": final_elements } # ========== Step 1-3: 实质提取 ========== def _step1_extract_concrete_elements( self, video_file ) -> List[dict]: """Step 1: 提取具体元素 - 从图像中直接观察到的单一视觉实体对象""" if not self.is_initialized: self.initialize() if not video_file: logger.warning("⚠️ 没有视频文件,跳过具体元素提取") return [] prompt = """# 任务 从视频中提取"具体元素" # 核心定义 ## 具体元素 - **定义**: -- 1.从视频画面中直接观察到的、可独立存在的**单一视觉实体对象** -- 2.视频的背景音乐、音效等非口播内容的声音 - **判断标准**: -- 1.可以指着画面说"这是一个X"(单一、具体、可见的实体) -- 2.有背景音乐、音效等非口播内容的声音,直接用"背景音乐/音效声"作为名称即可,不要重复提取 - **示例**: -- 1.胡萝卜、青椒、西兰花(每个都是单独的实体) -- 2.背景音乐/音效声 - **禁止**: - 归类词(蔬菜、水果) - 概念性名词(食物、植物、人) - 文字内容(只关注视觉实体) ## 提取原则(仅针对画面中的视觉实体对象) - 只从视频画面中提取,不关注文字 - 每个元素必须是单一的、具体的视觉实体 - 使用"剥离测试":去掉表达方式后,这个实体仍然存在 # 命名规范 - 原子性:单一原子名词,不可再拆分 - 名词性:纯名词,严禁形容词、动词、副词 - 具体性:直接指向可观察的实体 # 输出json结构 [ { "id": "从1开始的自增序列", "名称": "单一原子名词", "描述": "说明这个元素是什么,外观特征", "维度": {"一级": "实质", "二级": "具体元素"}, "来源": ["视频画面"], "推理": "为什么识别这个具体元素" }, { "id": "从1开始的自增序列", "名称": "背景音乐/音效声", "描述": "说明背景音乐/音效声是什么", "维度": {"一级": "实质", "二级": "具体元素"}, "来源": ["视频"], "推理": "为什么识别这个背景音乐/音效声" } ] 注意:只提取具体的视觉实体对象,不要提取抽象概念或归类词 """ # 使用视频分析接口 result = LLMInvoker.safe_invoke_video_analysis( operation_name="具体元素提取", video_file=video_file, prompt=prompt, agent=self, fallback=[] ) # 为每个具体元素添加id for idx, element in enumerate(result, 1): element["id"] = f"具体元素-{idx}" return result def _step2_extract_concrete_concepts( self, video_file, text_data: dict, concrete_elements: List[dict], ) -> List[dict]: """Step 2: 提取具象概念 - 文字中字面出现的名词""" if not self.is_initialized: self.initialize() # 从第一步结果中提取已识别的具体元素名称,供本步骤排除使用 element_names = [ e.get("名称") for e in (concrete_elements or []) if e.get("名称") ] element_names_text = ( json.dumps(element_names, ensure_ascii=False, indent=2) if element_names else "[]" ) prompt = f"""# 任务 从视频中提取"具象概念" # 核心定义 ## 具象概念 - **定义**:视频画面内的文字或者口播内容中明确提到的完整名词 ## 排除的名称(来自第一步,仅用于排除) **禁止提取的名称**:{element_names_text} ## 判断标准 - **视频画面内的文字或者口播内容**中实际出现的**完整名词** - **不能是视频画面中出现的元素的名称等归类词** - 去掉表达方式后,这个概念仍然存在 # 约束 - 禁止通过语义推导、联想、理解得出的名词 - **禁止归类词(蔬菜、水果、人等)** - **禁止使用第一步中已提取的具体元素名称** - 禁止拆分复合词 - 禁止提取形容词、动词 - 禁止提取谓语、定语、状语、补语 - 禁止提取副词 ## 提取原则 - **词语完整性**:必须提取完整的**名词**,不允许拆分复合词 - **严格约束**:必须是**画面文字或者口播内容中实际出现**的完整名词 - **严格的名词验证**(必须同时满足以下两个条件): - 条件1:词性是名词(词典意义上的名词) - 条件2:在当前上下文中作为名词使用(语境判断) **验证方法**: - 找到该词在视频画面内的文字或者口播内容中的具体位置 - 分析该词在句子中的语法成分和实际作用 - 判断:该词是否在这个语境中充当"事物/对象/概念"的角色? # 输出json结构 [ {{ "id": "从1开始的自增序列", "名称": "字面原词(完整名词)", "描述": "说明这个概念是什么", "维度": {{"一级": "实质", "二级": "具象概念"}}, "来源": "HH:MM:SS", "上下文验证": {{ "原文位置": "该词在原视频画面内的文字或者口播内容中的具体句子", "语法成分": "该词在句子中的语法成分(主语/宾语/定语中心语等)", "语境判断": "说明该词在此语境中确实作为名词使用的理由" }}, "推理": "为什么这个名词被认为是具象概念" }} ] 注意:只输出同时满足"词性是名词"和"上下文中作为名词使用"两个条件的概念 """ # 使用视频分析接口(可综合语音与画面中的文字) result = LLMInvoker.safe_invoke_video_analysis( operation_name="具象概念提取", video_file=video_file, prompt=prompt, agent=self, fallback=[] ) # 为每个具象概念添加id for idx, concept in enumerate(result, 1): concept["id"] = f"具象概念-{idx}" return result def _step3_summarize_abstract_concepts( self, video_file, concrete_elements: List[dict], concrete_concepts: List[dict], implicit_concepts: List[dict] ) -> List[dict]: """Step 3: 总结抽象概念 - 从具体元素和具象概念中归纳上位抽象""" if not self.is_initialized: self.initialize() if not concrete_elements and not concrete_concepts: logger.warning("⚠️ 没有具体元素或具象概念,跳过抽象概念提取") return [] # 构建已提取的元素文本 elements_text = json.dumps([ {"id": e.get("id"), "名称": e.get("名称"), "描述": e.get("描述")} for e in concrete_elements ], ensure_ascii=False, indent=2) if concrete_elements else "无" concepts_text = json.dumps([ {"id": c.get("id"), "名称": c.get("名称"), "描述": c.get("描述")} for c in concrete_concepts ], ensure_ascii=False, indent=2) if concrete_concepts else "无" prompt = f"""# 任务 基于已提取的具体元素和具象概念,总结新的"抽象概念" # 已提取的具体元素 {elements_text} # 已提取的具象概念 {concepts_text} # 核心定义 # 定义与分类 **抽象概念**分两类: **类型1-上位抽象**:对具体元素/具象概念的归类 **类型2-引申含义**:具体元素/具象概念无法直接表达的深层含义 # 提取原则 - 对具体元素/具象概念的归类 - 具体元素和具象概念无法直接表达的深层含义 - 基于归纳:基于已提取的具体元素/具象概念 - 来源追溯:准确标明所有来源ID(具体元素ID、具象概念ID),必须完整可追溯 # 命名规范 - 有完整独立语义的概念 - 单一原子名词,不可拆分 - 纯名词,禁止形容词、动词、副词 - 精准描述概念,不做修饰 # 判断标准 - 去掉表达方式后,概念仍存在 # 输出json结构 [ {{ "id": "从1开始的自增序列", "名称": "单一名词或短语", "描述": "说明这个抽象概念是什么", "维度": {{"一级": "实质", "二级": "抽象概念"}}, "类型": "上位抽象 | 引申含义", "来源": {{ "具体元素": [{{"id":"具体元素-X", "名称":"具体元素-X的名称"}}, {{"id":"具体元素-Y", "名称":"具体元素-Y的名称"}}], "具象概念": [{{"id":"具象概念-A", "名称":"具象概念-A的名称"}}, {{"id":"具象概念-B", "名称":"具象概念-B的名称"}}] }}, "推理过程": "明确说明如何从上述来源(具体哪些元素ID和概念ID)推导出这个抽象概念", }} ] 注意:只输出验证全部通过的概念 """ # 使用视频分析接口总结抽象概念 result = LLMInvoker.safe_invoke_video_analysis( operation_name="抽象概念总结", video_file=video_file, prompt=prompt, agent=self, fallback=[] ) # 为每个抽象概念添加id for idx, concept in enumerate(result, 1): concept["id"] = f"抽象概念-{idx}" return result # ========== Step 4-8: 后续处理 ========== def _step4_commonality_analysis( self, video_file, substance_elements: List[dict], text_data: dict, section_division: dict ) -> List[dict]: """Step 4: 共性分析 - 统计频次和段落覆盖率""" if not substance_elements: return [] total_sections = self._count_sections(section_division) # 分批处理 analyzed_items = self._commonality_analysis_in_batches( video_file, substance_elements, text_data, section_division, total_sections, max_batch_size=100 ) return analyzed_items def _commonality_analysis_in_batches( self, video_file, substance_elements: list, text_data: dict, section_division: dict, total_sections: int, max_batch_size: int = 100 ) -> list: """分批处理共性分析""" if not self.is_initialized: self.initialize() num_elements = len(substance_elements) if num_elements == 0: return [] # 如果元素数少于批次大小,一次性处理 if num_elements <= max_batch_size: return self._commonality_analysis_single_batch( video_file, substance_elements, text_data, section_division, total_sections ) # 分批处理 num_batches = (num_elements + max_batch_size - 1) // max_batch_size batch_futures = {} for batch_idx in range(num_batches): start_idx = batch_idx * max_batch_size end_idx = min(start_idx + max_batch_size, num_elements) batch_elements = substance_elements[start_idx:end_idx] future = _GLOBAL_THREAD_POOL.submit( self._commonality_analysis_single_batch, video_file, batch_elements, text_data, section_division, total_sections ) batch_futures[batch_idx] = future # 收集结果 all_results = [] for batch_idx, future in batch_futures.items(): try: batch_result = future.result() if batch_result: all_results.extend(batch_result) except Exception as e: logger.error(f"批次 {batch_idx + 1} 失败: {e}") return all_results def _commonality_analysis_single_batch( self, video_file, batch_elements: list, text_data: dict, section_division: dict, total_sections: int ) -> list: """单批次共性分析""" if not self.is_initialized: self.initialize() section_text = self._build_section_text(section_division) elements_text = self._build_simple_items_text(batch_elements) prompt = f"""# 段落列表 {section_text} # 元素列表 {elements_text} # 任务 对每个元素统计出现的段落和频次 ## 统计规则 ### 1. 具体元素统计(只统计视觉实体) - **出现频次**: 统计该**单一视觉实体对象**在视频图像中直接观察到的次数 - **出现段落列表**: 只统计能在视频图像中**直接看到该视觉实体**的段落 ### 2. 具象概念统计(只统计文字字面) - **出现频次**: 统计该名词在视频画面文字和口播内容中**画面或者口播内容中出现**的次数 - **出现段落列表**: 只统计**视频画面文字或者口播内容中包含该名词**的段落 ### 3. 抽象概念统计(统计语义归类) - **出现频次**: 统计该概念被**隐含表达**的总次数 - **出现段落列表**: 统计**包含该概念所归类的具体元素/具象概念**的段落 # 输出(JSON) [ {{ "id": "元素id", "名称": "元素名称", "出现频次": 0, "出现段落列表": [ {{ "段落ID": "段落id", "如何体现": "描述该元素在这个段落中的具体体现方式" }} ] }} ] """ # 使用视频分析接口做共性分析 llm_result = LLMInvoker.safe_invoke_video_analysis( operation_name="共性分析", video_file=video_file, prompt=prompt, agent=self, fallback=[] ) # 计算覆盖率 analyzed_items = [] for analysis in llm_result: section_list = analysis.get("出现段落列表", []) unique_paragraph_ids = set() for item in section_list: unique_paragraph_ids.add(item.get("段落ID", "")) coverage_count = len(unique_paragraph_ids) coverage_rate = round(coverage_count / total_sections, 4) if total_sections > 0 else 0 analyzed_items.append({ "id": analysis.get("id", 0), "名称": analysis.get("名称", ""), "出现频次": analysis.get("出现频次", 0), "出现段落列表": section_list, "出现段落数": coverage_count, "段落覆盖率": coverage_rate }) return analyzed_items def _step5_multi_dimensional_scoring( self, substance_elements: List[dict], analyzed_result: list, inspiration_points: dict, purpose_points: list, substance_key_points: list ) -> dict: """Step 5: 多维度评分(相似度逻辑已废弃) 说明: - 不再进行任何相似度计算,完全依赖后续的“意图支撑”进行筛选 - 保留函数与返回结构,仅返回空结果,避免下游依赖崩溃 """ logger.info( "【多维度评分】相似度比较逻辑已关闭,当前不进行评分,仅返回空结果。" ) return { "灵感点": [], "目的点": [], "关键点": [], } def _step5_1_intention_support_evaluation( self, video_file, substance_elements: List[dict], analyzed_result: list, inspiration_points: dict, purpose_points: list, substance_key_points: list, text_data: dict, ) -> dict: """Step 5.1: 意图支撑评估 说明: - 在保留相似度评分的基础上,增加一套“意图支撑”视角的评估 - 不再使用频次/覆盖率作为筛选条件,仅用于日志与统计 - 结果以元素-点的支撑关系形式返回,不直接参与筛选决策 """ if not substance_elements: return {"灵感点": [], "目的点": [], "关键点": []} logger.info(f"【意图支撑评估】输入: {len(substance_elements)} 个实质元素") # 按二级维度分组(不做频次过滤,全部评估) dimension_groups = { "具体元素": [], "具象概念": [], "抽象概念": [], } for elem in substance_elements: second_level = elem.get("维度", {}).get("二级", "") if second_level in dimension_groups: dimension_groups[second_level].append(elem) logger.info( "维度分组(意图支撑): 具体元素=%d, 具象概念=%d, 抽象概念=%d", len(dimension_groups["具体元素"]), len(dimension_groups["具象概念"]), len(dimension_groups["抽象概念"]), ) # 并行评估(各维度 x 3个点类型) futures = {} def submit_if_needed(dimension_name: str, point_type: str, points_list): if not points_list: logger.info( f"⏭️ 跳过意图支撑评估: {dimension_name}-{point_type} " f"(原因: 点列表为空, len={len(points_list) if isinstance(points_list, list) else 'N/A'})" ) return if not dimension_groups.get(dimension_name): logger.info( f"⏭️ 跳过意图支撑评估: {dimension_name}-{point_type} " f"(原因: 该维度无元素, len={len(dimension_groups.get(dimension_name, []))})" ) return key = (dimension_name, point_type) logger.info( f"📤 提交意图支撑评估任务: {dimension_name}-{point_type} " f"(元素数={len(dimension_groups[dimension_name])}, 点数={len(points_list)})" ) futures[key] = _GLOBAL_THREAD_POOL.submit( self._evaluate_support_by_dimension, video_file, dimension_name, dimension_groups[dimension_name], points_list, point_type, text_data, ) # 具体元素 / 具象概念 / 抽象概念 × 灵感点 / 目的点 / 关键点 for dim in ["具体元素", "具象概念", "抽象概念"]: submit_if_needed(dim, "灵感点", inspiration_points if isinstance(inspiration_points, list) else []) submit_if_needed(dim, "目的点", purpose_points if isinstance(purpose_points, list) else []) submit_if_needed(dim, "关键点", substance_key_points if isinstance(substance_key_points, list) else []) # 收集结果(按点类型汇总) result = { "灵感点": [], "目的点": [], "关键点": [], } for (dimension_name, point_type), future in futures.items(): try: dimension_result = future.result() if dimension_result: result[point_type].extend(dimension_result) logger.info( f"✅ 意图支撑-{dimension_name}-{point_type} 评估完成: {len(dimension_result)} 条支撑关系" ) except Exception as e: logger.error(f"❌ 意图支撑-{dimension_name}-{point_type} 评估失败: {e}") return result def _evaluate_support_by_dimension( self, video_file, dimension_name: str, elements: list, points: list, point_type: str, text_data: dict, ) -> list: """按维度评估意图支撑关系(分批处理)""" if not self.is_initialized: self.initialize() if not elements or not points: return [] # 分批控制:元素数 × 点数 ≈ 100 以内 num_elements = len(elements) num_points = len(points) max_batch_product = 100 max_elements_per_batch = max(1, int(max_batch_product / max(1, num_points))) num_batches = (num_elements + max_elements_per_batch - 1) // max_elements_per_batch batch_futures = {} for batch_idx in range(num_batches): start_idx = batch_idx * max_elements_per_batch end_idx = min(start_idx + max_elements_per_batch, num_elements) batch_elements = elements[start_idx:end_idx] future = _GLOBAL_THREAD_POOL.submit( self._evaluate_support_single_batch_by_dimension, video_file, dimension_name, batch_elements, points, point_type, text_data, ) batch_futures[batch_idx] = future # 收集结果 all_results = [] for batch_idx, future in batch_futures.items(): try: batch_result = future.result() if batch_result: all_results.extend(batch_result) except Exception as e: logger.error(f"【意图支撑-{dimension_name}】批次 {batch_idx + 1} 失败: {e}") # 合并结果(支撑结果) merged_results = self._merge_support_batch_results(all_results) return merged_results def _evaluate_support_single_batch_by_dimension( self, video_file, dimension_name: str, batch_elements: list, points: list, point_type: str, text_data: dict, ) -> list: """单批次意图支撑评估(按维度)""" if not self.is_initialized: self.initialize() post_content = self._build_post_content(text_data) elements_text = self._build_simple_items_text_dimension(batch_elements, dimension_name) points_text = self._build_points_text(point_type, points) # 根据维度选择不同的 prompt if dimension_name == "具体元素": prompt = self._build_concrete_element_support_prompt(post_content, elements_text, points_text) elif dimension_name == "具象概念": prompt = self._build_concrete_concept_support_prompt(post_content, elements_text, points_text) elif dimension_name == "抽象概念": prompt = self._build_abstract_concept_support_prompt(post_content, elements_text, points_text) else: logger.error(f"未知维度(意图支撑): {dimension_name}") return [] # 使用视频分析接口,多模态评估意图支撑 result = LLMInvoker.safe_invoke_video_analysis( operation_name=f"意图支撑评估-{dimension_name}-{point_type}", video_file=video_file, prompt=prompt, agent=self, fallback=[], ) return result def _evaluate_support_in_batches( self, elements: list, points: list, point_type: str, max_batch_product: int = 100 ) -> list: """分批评估相似度""" if not self.is_initialized: self.initialize() if not points: return [] num_elements = len(elements) num_points = len(points) max_elements_per_batch = max(1, int(max_batch_product / num_points)) num_batches = (num_elements + max_elements_per_batch - 1) // max_elements_per_batch # 分批处理 batch_futures = {} for batch_idx in range(num_batches): start_idx = batch_idx * max_elements_per_batch end_idx = min(start_idx + max_elements_per_batch, num_elements) batch_elements = elements[start_idx:end_idx] future = _GLOBAL_THREAD_POOL.submit( self._evaluate_support_single_batch, batch_elements, points, point_type ) batch_futures[batch_idx] = future # 收集结果 all_results = [] for batch_idx, future in batch_futures.items(): try: batch_result = future.result() if batch_result: all_results.extend(batch_result) except Exception as e: logger.error(f"批次 {batch_idx + 1} 失败: {e}") # 合并并筛选(每个元素保留最相关的1-2个点) merged_results = self._merge_batch_results(all_results) return merged_results def _evaluate_support_single_batch( self, batch_elements: list, points: list, point_type: str ) -> list: """单批次评估""" if not self.is_initialized: self.initialize() elements_text = self._build_simple_items_text(batch_elements) points_text = self._build_points_text(point_type, points) prompt = f"""# 元素列表 {elements_text} # 点列表 {points_text} # 任务 对每个元素计算元素与点的文本相似度和语义相似度 # 输出(JSON) [ {{ "id": "元素id", "名称": "元素名称", "相似度结果": [ {{ "点":"点的名称", "语义相似度":0.21, "语义相似度理由": "理由", "文本相似度":0.33, "文本相似度理由": "理由" }} ] }} ] """ messages = [ {"role": "system", "content": self.system_prompt}, {"role": "user", "content": prompt} ] result = LLMInvoker.safe_invoke( self, f"评估支撑{point_type}", messages, fallback=[] ) return result def _merge_batch_results(self, all_results: list) -> list: """合并批次结果,每个元素只保留最相关的1-2个点""" if not all_results: return [] merged_map = {} for item in all_results: element_id = item.get("id") if element_id not in merged_map: merged_map[element_id] = { "id": element_id, "名称": item.get("名称"), "相似度结果": [] } if not merged_map[element_id]["相似度结果"]: merged_map[element_id]["相似度结果"] = item.get("相似度结果", []) # 筛选每个元素的相似度结果 for element_data in merged_map.values(): similarity_results = element_data.get("相似度结果", []) if not similarity_results: continue max_text_sim_point = max(similarity_results, key=lambda x: x.get("文本相似度", 0)) max_semantic_sim_point = max(similarity_results, key=lambda x: x.get("语义相似度", 0)) if max_text_sim_point.get("点") == max_semantic_sim_point.get("点"): filtered_results = [max_text_sim_point] else: filtered_results = [max_text_sim_point, max_semantic_sim_point] element_data["相似度结果"] = filtered_results return list(merged_map.values()) def _merge_support_batch_results(self, all_results: list) -> list: """合并批次结果(意图支撑),直接合并支撑的元素-点对""" if not all_results: return [] merged_map = {} for item in all_results: element_id = item.get("id") if element_id not in merged_map: merged_map[element_id] = { "id": element_id, "名称": item.get("名称"), "支撑结果": [], } # 这里假设下游会控制去重,只在首次合并时写入 if not merged_map[element_id]["支撑结果"]: merged_map[element_id]["支撑结果"] = item.get("支撑结果", []) return list(merged_map.values()) def _step6_filter_elements( self, analyzed_result: list, scored_result: dict, intention_support_result: dict, ) -> list: """Step 6: 筛选实质元素 新的保留策略(基于意图支撑关系 + 覆盖率进行筛选): - 覆盖率和频次主要用于统计展示,但会作为必要条件之一 - 必须**同时**满足以下三个条件才保留: - 出现频次 > 1 - 存在任意“意图支撑”关系 - 段落覆盖率 > 0.3(30%) - 相似度评分相关逻辑已全部停用,不再参与筛选 """ if not analyzed_result: return [] # 创建 analyzed_result 的映射 analyzed_map = {item.get("id"): item for item in analyzed_result} # 创建意图支撑映射:只要某个元素在任一维度、任一点类型下有支撑关系,即视为“有支撑” intention_support_map = {} if intention_support_result: for point_type in ["灵感点", "目的点", "关键点"]: dimension_data = intention_support_result.get(point_type, []) for item in dimension_data: if not isinstance(item, dict): continue element_id = item.get("id") support_results = item.get("支撑结果", []) if not element_id: continue if element_id not in intention_support_map: intention_support_map[element_id] = [] # 只要有一条支撑结果就认为该元素“有支撑关系” if support_results: intention_support_map[element_id].extend(support_results) # 筛选 filtered_ids = [] rejected_ids = [] for element_id, analyzed_data in analyzed_map.items(): element_name = analyzed_data.get("名称", "N/A") # 声音类型的实质元素(背景音乐、音效声等)直接通过筛选,不参与后续判断 sound_type_names = ["背景音乐", "音效声"] if element_name in sound_type_names: filtered_ids.append(element_id) logger.info( f"✅ 保留: id={element_id}, name={element_name}, 原因=声音类型元素,豁免筛选" ) continue # 确保 frequency 是整数类型 frequency_raw = analyzed_data.get("出现频次", 0) try: frequency = int(frequency_raw) if frequency_raw is not None else 0 except (ValueError, TypeError): frequency = 0 # 确保 coverage_rate 是浮点数类型 coverage_rate_raw = analyzed_data.get("段落覆盖率", 0.0) try: coverage_rate = float(coverage_rate_raw) if coverage_rate_raw is not None else 0.0 except (ValueError, TypeError): coverage_rate = 0.0 # 频次过滤:出现频次<=1 的直接过滤(不再继续做支撑和覆盖率判断) if frequency <= 1: rejected_ids.append(element_id) logger.info( "❌ 过滤: id=%s, name=%s, 原因=出现频次<=1 (frequency=%d)", element_id, element_name, frequency, ) continue support_info = intention_support_map.get(element_id, []) has_support = bool(support_info) has_high_coverage = coverage_rate > 0.3 # 出现频次>1 且 有意图支撑关系 且 段落覆盖率 > 30%:直接保留,不进行相似度比较 if has_support and has_high_coverage: filtered_ids.append(element_id) logger.info( f"✅ 保留: id={element_id}, name={element_name}, " f"support_count={len(support_info)}, coverage={coverage_rate}" ) continue # 不满足“出现频次>1 + 有意图支撑 + 覆盖率>30%”的元素全部过滤 rejected_ids.append(element_id) logger.info( "❌ 过滤: id=%s, name=%s, 原因=无隐含概念豁免且" "未同时满足出现频次>1、有意图支撑关系和段落覆盖率>0.3 " "(frequency=%d, coverage=%.4f)", element_id, element_name, frequency, coverage_rate, ) logger.info(f"筛选完成: {len(filtered_ids)}/{len(analyzed_result)} 通过") return filtered_ids def _step7_categorize_elements( self, substance_elements: List[dict], filtered_ids: list ) -> dict: """Step 7: 元素分类 - 按二级维度分别分类""" if not filtered_ids: return {} # 只保留筛选后的元素 filtered_elements = [ elem for elem in substance_elements if elem.get("id") in filtered_ids ] # 按二级维度分组 dimension_groups = { "具体元素": [], "具象概念": [], "抽象概念": [], } for elem in filtered_elements: second_level = elem.get("维度", {}).get("二级", "") if second_level in dimension_groups: dimension_groups[second_level].append(elem) # 并行分类 categorization_results = {} futures = {} for dimension_name, elements in dimension_groups.items(): if not elements: continue future = _GLOBAL_THREAD_POOL.submit( self._categorize_single_dimension, dimension_name, elements ) futures[dimension_name] = future # 收集结果 for dimension_name, future in futures.items(): try: categorization_results[dimension_name] = future.result() except Exception as e: logger.error(f"{dimension_name} 分类失败: {e}") categorization_results[dimension_name] = {"元素分类": []} return categorization_results def _categorize_single_dimension( self, dimension_name: str, elements: list ) -> dict: """对单个维度的元素进行分类""" if not self.is_initialized: self.initialize() elements_text = json.dumps([ {"id": elem.get("id"), "名称": elem.get("名称"), "描述": elem.get("描述")} for elem in elements ], ensure_ascii=False, indent=2) prompt = f"""# 任务 对"{dimension_name}"维度的元素进行分类 # 元素列表 {elements_text} # 分类要求 ## 核心原则 1. **单一原子名词**: 分类名称必须是单一的原子名词 2. **MECE原则**: 分类之间相互独立、完全穷尽 3. **确定性归属**: 每个元素只能归属唯一一个分类 4. **层级限制**: 最多2层 5. 元素可以没有分类,不要强行归类 6. 分类下面至少要有2个元素,否则不要分类 ## 实质维度的分类逻辑 - **核心原则**:按照**内容本质、属性特征、功能作用**等角度来分类 - **包含**: - 物理特征:形态、材质、颜色等 - 功能用途:工具、装饰、食物等 - 概念类别:情感、价值观、技能等 - 领域归属:科技、艺术、健康等 # 输出格式(JSON) {{ "元素分类": [ {{ "元素id": "元素的ID", "元素名称": "元素名称", "分类": ["一级分类","二级分类","..."] }} ] }} """ messages = [ {"role": "system", "content": self.system_prompt}, {"role": "user", "content": prompt} ] result = LLMInvoker.safe_invoke( self, f"分类-{dimension_name}", messages, fallback={} ) return result def _merge_all_info( self, substance_elements: List[dict], analyzed_result: list, scored_result: dict, intention_support_result: dict, filtered_ids: list, categorized_result: dict ) -> list: """Step 8: 合并所有信息 - 每个元素包含所有中间信息""" if not filtered_ids: return [] # 创建映射 extraction_map = {item.get("id"): item for item in substance_elements} analyzed_map = {item.get("id"): item for item in analyzed_result} # 创建评分映射(相似度) scored_map = {} for dimension in ["灵感点", "目的点", "关键点"]: dimension_data = scored_result.get(dimension, []) for item in dimension_data: if not isinstance(item, dict): continue element_id = item.get("id") if element_id not in scored_map: scored_map[element_id] = {} similarity_results = item.get("相似度结果", []) sorted_results = sorted( similarity_results, key=lambda x: (x.get("文本相似度", 0), x.get("语义相似度", 0)), reverse=True ) scored_map[element_id][dimension] = sorted_results # 创建意图支撑映射 intention_support_map = {} if intention_support_result: for dimension in ["灵感点", "目的点", "关键点"]: dimension_data = intention_support_result.get(dimension, []) for item in dimension_data: if not isinstance(item, dict): continue element_id = item.get("id") if element_id not in intention_support_map: intention_support_map[element_id] = {} support_results = item.get("支撑结果", []) intention_support_map[element_id][dimension] = support_results # 创建分类映射 category_map = {} for dimension_data in categorized_result.values(): element_classifications = dimension_data.get("元素分类", []) for classification in element_classifications: element_id = classification.get("元素id") category_info = classification.get("分类", {}) if element_id: category_map[element_id] = category_info # 合并信息 final_elements = [] for element_id in filtered_ids: base_info = extraction_map.get(element_id, {}) analysis_info = analyzed_map.get(element_id, {}) scoring_info = scored_map.get(element_id, {}) intention_info = intention_support_map.get(element_id, {}) category_info = category_map.get(element_id, {}) merged_element = { "id": base_info.get("id"), "名称": base_info.get("名称"), "描述": base_info.get("描述"), "维度": base_info.get("维度", {}), "分类": category_info, "共性分析": { "出现频次": analysis_info.get("出现频次", 0), "出现段落列表": analysis_info.get("出现段落列表", []), "出现段落数": analysis_info.get("出现段落数", 0), "段落覆盖率": analysis_info.get("段落覆盖率", 0.0) }, "多维度评分": { "灵感点": scoring_info.get("灵感点", []), "目的点": scoring_info.get("目的点", []), "关键点": scoring_info.get("关键点", []) }, "意图支撑": { "灵感点": intention_info.get("灵感点", []), "目的点": intention_info.get("目的点", []), "关键点": intention_info.get("关键点", []) } } # 根据不同类型添加特定字段 second_level = base_info.get("维度", {}).get("二级", "") if second_level == "具体元素": merged_element["来源"] = base_info.get("来源", []) elif second_level == "具象概念": merged_element["来源"] = base_info.get("来源", []) merged_element["字面位置"] = base_info.get("字面位置", []) elif second_level == "抽象概念" or second_level == "隐含概念": merged_element["类型"] = base_info.get("类型", "") merged_element["来源"] = base_info.get("来源", {}) merged_element["推理过程"] = base_info.get("推理过程", "") merged_element["推理层次"] = base_info.get("推理层次", 1) final_elements.append(merged_element) return final_elements # ========== 辅助方法 ========== def _build_section_text(self, section_division: dict) -> str: """构建段落划分文本""" if not section_division: return "无段落划分信息" sections = section_division.get("段落列表", []) if not sections: return "无段落信息" def build_section_list(section_list, indent=0): text = "" for section in section_list: if section.get('子项'): text += build_section_list(section['子项'], indent + 1) else: section_id = section.get('id', 'N/A') section_desc = section.get('描述', 'N/A') content_range = section.get('内容范围', 'N/A') text += f"{section_id}: {section_desc}\n内容范围: {content_range}\n" return text return "段落列表:\n" + build_section_list(sections) def _build_post_content(self, text_data: dict) -> str: """构建原文内容文本(用于意图支撑判断) 这里不假设具体结构,直接以 JSON 形式展开,保证信息完整可见。 """ if not text_data: return "无文本内容" try: return json.dumps(text_data, ensure_ascii=False, indent=2) except TypeError: # 避免非序列化对象导致报错 return str(text_data) def _build_simple_items_text_dimension(self, elements: list, dimension_name: str) -> str: """构建某个维度下元素列表文本(用于意图支撑判断)""" simple_items = [ { "id": elem.get("id", "N/A"), "名称": elem.get("名称", "N/A"), "描述": elem.get("描述", "N/A"), } for elem in elements ] return json.dumps( {"维度": dimension_name, "元素列表": simple_items}, ensure_ascii=False, indent=2, ) def _build_simple_items_text(self, elements: list) -> str: """构建元素列表文本""" grouped_elements = { "具体元素": [], "具象概念": [], "抽象概念": [], } for elem in elements: element_type = elem.get('维度', {}).get('二级', 'N/A') element_data = { "id": elem.get('id', 'N/A'), "名称": elem.get('名称', 'N/A'), "描述": elem.get('描述', 'N/A') } if element_type in grouped_elements: grouped_elements[element_type].append(element_data) filtered_groups = {k: v for k, v in grouped_elements.items() if v} return json.dumps(filtered_groups, ensure_ascii=False, indent=2) def _build_points_text(self, point_type: str, points_data) -> str: """构建点列表文本""" if not points_data: return f"无{point_type}信息" filtered_points = [ {"名称": item.get(point_type, 'N/A')} for item in points_data if isinstance(item, dict) ] return json.dumps(filtered_points, ensure_ascii=False, indent=2) def _build_concrete_element_support_prompt( self, post_content: str, elements_text: str, points_text: str ) -> str: """构建具体元素的意图支撑判断 prompt(基于视频画面)""" return f"""# 原文内容 {post_content} # 具体元素列表 {elements_text} # 点列表 {points_text} # 任务 判断每个**具体元素**是否对点有关键支撑 ## 具体元素定义(重要!) - 定义:视频画面中直接观察到的单一视觉实体对象 - 判断标准:可以指着画面说"这是一个X" - 剥离测试:去掉表达方式后,该视觉实体仍然存在 ## 核心判断原则:仅基于视频画面语境 ### 关键约束 1. 只看视频画面:具体元素的支撑判断**只能基于视频中的视觉实体**,不能基于文字论述 2. 视觉实体角色:该视觉实体在视频画面中的作用是什么? - ✅ 核心展示对象:该视觉实体是画面的核心展示内容 - ❌ 辅助/装饰:该视觉实体只是背景、装饰、示意 3. 关键支撑:该视觉实体对点的表达是否关键?去掉它是否会明显削弱点的支撑? ### 判断流程 1. 理解点的意图,点想表达什么 2. 在视频画面中找到该视觉实体 3. 判断:去掉该视觉实体,是否无法完整表达点 - 如果是,支撑 - 如果不是,不支撑 ### 严格标准 - 禁止使用文字内容来判断具体元素的支撑 - 禁止仅凭名称字面匹配判定支撑 - 必须基于该视觉实体在画面中的实际角色 # 输出(JSON) 只输出有关键支撑的元素-点对,不支撑的不输出 [ {{ "id": "元素id", "名称": "元素名称", "支撑结果": [ {{ "点": "点的名称", "点的意图": "点想表达什么", "支撑理由": "说明为什么去掉该视觉实体,会削弱点的表达,程度达到30%以上" }} ] }} ] 注意: 1. 只基于视频画面判断 2. 只输出"关键支撑"的元素-点对 3. 辅助/装饰元素直接排除,不输出 4. 必须基于视频画面中的视觉实体判断,不能做字面匹配""" def _build_concrete_concept_support_prompt( self, post_content: str, elements_text: str, points_text: str ) -> str: """构建具象概念的意图支撑判断 prompt(基于文字语境)""" return f"""# 原文内容 {post_content} # 具象概念列表 {elements_text} # 点列表 {points_text} # 任务 判断每个**具象概念**是否对点有关键支撑 ## 具象概念定义(重要!) - 定义:文字中字面出现的名词(包括标题、正文、字幕、视频画面中的文字) - 判断标准:文字中实际出现,禁止语义推导 ## 核心判断原则:仅基于文字语境(包含视频中的文字) ### 关键约束 1. 只看文字:具象概念的支撑判断**只能基于文字中的概念论述**,不能基于视频中的视觉实体 2. 概念角色:该概念在文字论述中的作用是什么? - ✅ 核心论述概念:该概念是文字论述的核心对象、关键主题 - ❌ 次要提及:该概念只是顺带提及、举例说明 3. 关键支撑:该概念对点的表达是否关键?去掉它是否会明显削弱点的支撑? ### 判断流程 1. 理解点的意图,点想表达什么 2. 在标题、正文、字幕、画面文字中找到该概念出现的位置 3. 判断:去掉该段文字,是否无法完整表达点 - 如果是,支撑 - 如果不是,不支撑 ### 严格标准 - 禁止用视频画面中的视觉实体来判断具象概念的支撑 - 禁止仅凭名称字面匹配判定支撑 - 必须判断该概念在文字论述中的实际角色 # 输出(JSON) 只输出有关键支撑的元素-点对,不支撑的不输出 [ {{ "id": "元素id", "名称": "元素名称", "支撑结果": [ {{ "点": "点的名称", "点的意图": "点想表达什么", "支撑理由": "说明为什么去掉该概念,会削弱点的表达,程度达到30%以上" }} ] }} ] 注意: 1. 只基于文字判断 2. 只输出"关键支撑"的元素-点对 3. 次要提及的概念直接排除,不输出 4. 必须基于文字中的概念论述判断,不能做字面匹配""" def _build_abstract_concept_support_prompt( self, post_content: str, elements_text: str, points_text: str ) -> str: """构建抽象概念的意图支撑判断 prompt""" return f"""# 原文内容 {post_content} # 抽象概念列表 {elements_text} # 点列表 {points_text} # 任务 判断每个**抽象概念**是否对点有关键支撑 ## 抽象概念定义(重要!) - 定义:从具体元素和具象概念中理解到的上位抽象 - 类型1-上位抽象(归类):是下位元素的类别、分类 - 类型2-引申含义:需要理解上下文的深层含义 - 剥离测试:去掉表达方式后,该抽象概念仍然存在 ## 核心判断原则:基于来源语境 ### 关键约束 1. 追溯来源:抽象概念来源于具体元素和/或具象概念,必须追溯到来源 2. 继承语境:抽象概念的语境继承自其来源 - 如果来源主要是具体元素 → 语境偏向视频画面 - 如果来源主要是具象概念 → 语境偏向文字 - 如果来源混合 → 综合判断 3. 关键支撑:该抽象概念对点的表达是否关键? ### 判断流程 1. 理解点的意图:点想表达什么? 2. 根据来源确定该抽象概念的主要语境 3. 判断:去掉该抽象概念,是否无法完整表达点 - 如果是,支撑 - 如果不是,不支撑 ### 严格标准 - 必须基于来源的语境来判断 - 禁止仅凭名称字面匹配判定支撑 - 必须能够追溯到来源元素,验证支撑关系 # 输出(JSON) 只输出有关键支撑的元素-点对,不支撑的不输出 [ {{ "id": "元素id", "名称": "元素名称", "支撑结果": [ {{ "点": "点的名称", "来源追溯": "该抽象概念的来源(具体元素/具象概念)及其语境", "语境分析": "基于来源确定的语境(画面/文字/混合)", "支撑理由": "说明该抽象概念为什么对该点有关键支撑" }} ] }} ] 注意: 1. 必须追溯到来源元素 2. 必须继承来源的语境来判断 3. 只输出"关键支撑"的元素-点对 4. 禁止字面匹配""" # 隐含概念相关的意图支撑判断已移除 def _count_sections(self, section_division: dict) -> int: """统计段落总数(只统计叶子节点)""" if not section_division: return 0 sections = section_division.get("段落列表", []) if not sections: return 0 def count_leaf_nodes(section_list): count = 0 for section in section_list: children = section.get("子项", []) if children: count += count_leaf_nodes(children) else: count += 1 return count return count_leaf_nodes(sections) def _build_messages(self, state: dict) -> List[dict]: """构建消息 - 本Agent不使用此方法""" return [] def _update_state(self, state: dict, response) -> dict: """更新状态 - 本Agent不使用此方法""" return state