#!/usr/bin/env python # -*- coding: utf-8 -*- """ 评估Agent 功能: 从待评估的视频列表中筛选出和原视频最匹配的内容 核心任务: 根据原视频的解构内容,对每个候选视频进行相关性评分和筛选 特征: 使用LLM进行相关性评估,输出评分和是否入选 """ from typing import Any, Dict, List import json from src.components.agents.base import BaseLLMAgent from src.utils.logger import get_logger from src.utils.llm_invoker import LLMInvoker logger = get_logger(__name__) class EvaluateAgent(BaseLLMAgent): """评估Agent - 筛选和原视频最匹配的视频""" def __init__( self, name: str = "evaluate_agent", description: str = "评估Agent - 筛选和原视频最匹配的视频", model_provider: str = "google_genai", temperature: float = 0.3, max_tokens: int = 20480 ): """ 初始化评估Agent Args: name: Agent名称 description: Agent描述 model_provider: 模型提供商 ("openai" 或 "google_genai") temperature: 生成温度(较低,保持客观性) max_tokens: 最大token数 """ system_prompt = self._build_system_prompt() super().__init__( name=name, description=description, model_provider=model_provider, system_prompt=system_prompt, temperature=temperature, max_tokens=max_tokens ) def _build_system_prompt(self) -> str: """构建系统提示词""" return """你是内容分析专家,擅长评估视频内容的相关性和匹配度。 # 任务 根据原视频的解构内容(包括标题、灵感点、目的点、关键点、选题理解等),对候选视频列表进行相关性评估和筛选。 # 评估维度 1. **内容主题匹配度**:候选视频的主题是否与原视频的主题一致或相关 2. **切入点相似度**:候选视频的切入角度是否与原视频相似 3. **受众重合度**:候选视频的目标受众是否与原视频重合 4. **表现形式相似度**:候选视频的表现形式(风格、结构等)是否与原视频相似 # 评分标准 - 相关性得分范围:0-100分 - 90-100分:高度相关,主题、切入点、受众、表现形式都高度匹配 - 70-89分:相关,在多个维度上匹配 - 50-69分:中等相关,在某些维度上匹配 - 30-49分:低相关,匹配度较低 - 0-29分:不相关,基本不匹配 # 输出要求 1. 对每个候选视频进行评分(0-100分) 2. 根据评分排序,默认选前50%进入候选 3. 输出结果保持与输入列表相同的顺序和字段,新增两个字段: - relevance_score: 相关性得分(0-100) - is_selected: 是否入选(true/false) """ def process(self, state: Dict[str, Any], config=None) -> Dict[str, Any]: """处理状态 - 评估视频相关性并筛选 Args: state: 状态字典,包含: - original_video_title: 原视频标题 - original_video_content: 原视频解构内容(JSON格式) - search_result: 待评估的视频列表 Returns: 更新后的状态,包含: - evaluate_result: 评估结果列表(每个视频包含原始字段 + relevance_score + is_selected) """ if not self.is_initialized: self.initialize() logger.info("开始评估视频相关性") try: # 从state获取数据 original_video_title = state.get("original_video_title", "") original_video_content = state.get("original_video_content", {}) search_result = state.get("search_result", []) if not search_result: logger.warning("待评估的视频列表为空") return { "evaluate_result": [] } if not original_video_title and not original_video_content: logger.warning("原视频信息为空,无法进行评估") return { "evaluate_result": [] } # 构建评估提示词 prompt = self._build_evaluate_prompt( original_video_title, original_video_content, search_result ) messages = [ {"role": "system", "content": self.system_prompt}, {"role": "user", "content": prompt} ] # 调用LLM进行评估 result = LLMInvoker.safe_invoke( self, "视频相关性评估", messages, fallback={"评估结果": []} ) # 提取评估结果 evaluate_result = result.get("评估结果", []) # 如果LLM返回的结果数量与输入不一致,进行修正 if len(evaluate_result) != len(search_result): logger.warning( f"LLM返回结果数量({len(evaluate_result)})与输入数量({len(search_result)})不一致," "将进行修正" ) evaluate_result = self._fix_evaluate_result(search_result, evaluate_result) # 确保每个结果都有relevance_score和is_selected字段 evaluate_result = self._ensure_evaluate_fields(search_result, evaluate_result) # 根据评分排序并标记前50%为入选 evaluate_result = self._mark_selected_videos(evaluate_result) logger.info(f"评估完成,共评估{len(evaluate_result)}个视频") return { "evaluate_result": evaluate_result } except Exception as e: logger.error(f"视频评估失败: {e}", exc_info=True) # 返回原始列表,但添加默认的评分和选择状态 search_result = state.get("search_result", []) return { "evaluate_result": [ {**video, "relevance_score": 0, "is_selected": False} for video in search_result ] } def _build_evaluate_prompt( self, original_video_title: str, original_video_content: Dict[str, Any], search_result: List[Dict[str, Any]] ) -> str: """构建评估提示词""" # 格式化原视频内容 content_str = json.dumps(original_video_content, ensure_ascii=False, indent=2) # 格式化候选视频列表 candidates_text = "" for i, video in enumerate(search_result, 1): video_str = json.dumps(video, ensure_ascii=False, indent=2) candidates_text += f"\n## 候选视频 {i}\n{video_str}\n" prompt = f"""# 任务:评估视频相关性 ## 原视频信息 ### 标题 {original_video_title} ### 解构内容 {content_str} ## 候选视频列表 {candidates_text} ## 评估要求 1. **对每个候选视频进行相关性评分**(0-100分) - 考虑内容主题匹配度、切入点相似度、受众重合度、表现形式相似度 - 评分要客观、准确 2. **输出格式要求** - 保持与输入列表相同的顺序 - 保留原始字段不变 - 新增两个字段: - `relevance_score`: 相关性得分(整数,0-100) - `is_selected`: 是否入选(布尔值,暂时设为false,后续会根据评分排序后标记前50%) ## 输出格式(JSON) ```json {{ "评估结果": [ {{ // 保留原始字段... "relevance_score": 85, "is_selected": false }} ] }} ``` **重要**: - 输出结果的数量必须与输入列表的数量完全一致 - 每个结果必须包含所有原始字段 - 每个结果必须包含relevance_score和is_selected字段 """ return prompt def _fix_evaluate_result( self, original_list: List[Dict[str, Any]], llm_result: List[Dict[str, Any]] ) -> List[Dict[str, Any]]: """修正评估结果,确保数量一致""" fixed_result = [] # 创建LLM结果的索引(通过某些唯一字段匹配) llm_result_map = {} for item in llm_result: # 尝试通过video_id或其他唯一字段匹配 video_id = item.get("video_id") or item.get("id") or item.get("videoId") if video_id: llm_result_map[str(video_id)] = item # 遍历原始列表,匹配LLM结果 for i, original in enumerate(original_list): video_id = original.get("video_id") or original.get("id") or original.get("videoId") if video_id and str(video_id) in llm_result_map: # 找到匹配的结果,合并字段 matched = llm_result_map[str(video_id)] fixed_item = {**original, **matched} fixed_result.append(fixed_item) elif i < len(llm_result): # 按索引匹配 matched = llm_result[i] fixed_item = {**original, **matched} fixed_result.append(fixed_item) else: # 没有匹配的结果,使用原始数据并添加默认评分 fixed_item = {**original, "relevance_score": 0, "is_selected": False} fixed_result.append(fixed_item) return fixed_result def _ensure_evaluate_fields( self, original_list: List[Dict[str, Any]], evaluate_result: List[Dict[str, Any]] ) -> List[Dict[str, Any]]: """确保每个评估结果都有必要的字段""" ensured_result = [] for i, original in enumerate(original_list): if i < len(evaluate_result): item = evaluate_result[i] # 合并原始字段和评估字段 merged_item = {**original} # 确保有relevance_score if "relevance_score" in item: merged_item["relevance_score"] = item["relevance_score"] else: merged_item["relevance_score"] = 0 # 确保有is_selected(暂时设为false,后续会重新标记) merged_item["is_selected"] = False ensured_result.append(merged_item) else: # 如果LLM结果不足,使用原始数据并添加默认值 ensured_result.append({ **original, "relevance_score": 0, "is_selected": False }) return ensured_result def _mark_selected_videos(self, evaluate_result: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """根据评分排序并标记前50%为入选,然后恢复原始顺序""" if not evaluate_result: return evaluate_result # 为每个视频添加临时索引,以便后续恢复原始顺序 indexed_result = [ {**item, "_original_index": i} for i, item in enumerate(evaluate_result) ] # 按评分降序排序 sorted_result = sorted( indexed_result, key=lambda x: x.get("relevance_score", 0), reverse=True ) # 计算前50%的数量(向上取整) selected_count = max(1, (len(sorted_result) + 1) // 2) # 标记前50%为入选 for i, item in enumerate(sorted_result): item["is_selected"] = (i < selected_count) # 恢复原始顺序 sorted_result.sort(key=lambda x: x.get("_original_index", len(evaluate_result))) # 移除临时索引 for item in sorted_result: item.pop("_original_index", None) return sorted_result def _build_messages(self, state: Dict[str, Any]) -> List[Dict[str, Any]]: """构建消息 - BaseLLMAgent要求实现(本Agent不使用此方法)""" return [] def _update_state(self, state: Dict[str, Any], response: Any) -> Dict[str, Any]: """更新状态 - BaseLLMAgent要求实现(本Agent不使用此方法)""" return state