evaluate_agent.py 12 KB


  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. """
  4. 评估Agent
  5. 功能: 从待评估的视频列表中筛选出和原视频最匹配的内容
  6. 核心任务: 根据原视频的解构内容,对每个候选视频进行相关性评分和筛选
  7. 特征: 使用LLM进行相关性评估,输出评分和是否入选
  8. """
  9. from typing import Any, Dict, List
  10. import json
  11. from src.components.agents.base import BaseLLMAgent
  12. from src.utils.logger import get_logger
  13. from src.utils.llm_invoker import LLMInvoker
  14. logger = get_logger(__name__)
  15. class EvaluateAgent(BaseLLMAgent):
  16. """评估Agent - 筛选和原视频最匹配的视频"""
  17. def __init__(
  18. self,
  19. name: str = "evaluate_agent",
  20. description: str = "评估Agent - 筛选和原视频最匹配的视频",
  21. model_provider: str = "google_genai",
  22. temperature: float = 0.3,
  23. max_tokens: int = 20480
  24. ):
  25. """
  26. 初始化评估Agent
  27. Args:
  28. name: Agent名称
  29. description: Agent描述
  30. model_provider: 模型提供商 ("openai" 或 "google_genai")
  31. temperature: 生成温度(较低,保持客观性)
  32. max_tokens: 最大token数
  33. """
  34. system_prompt = self._build_system_prompt()
  35. super().__init__(
  36. name=name,
  37. description=description,
  38. model_provider=model_provider,
  39. system_prompt=system_prompt,
  40. temperature=temperature,
  41. max_tokens=max_tokens
  42. )
  43. def _build_system_prompt(self) -> str:
  44. """构建系统提示词"""
  45. return """你是内容分析专家,擅长评估视频内容的相关性和匹配度。
  46. # 任务
  47. 根据原视频的解构内容(包括标题、灵感点、目的点、关键点、选题理解等),对候选视频列表进行相关性评估和筛选。
  48. # 评估维度
  49. 1. **内容主题匹配度**:候选视频的主题是否与原视频的主题一致或相关
  50. 2. **切入点相似度**:候选视频的切入角度是否与原视频相似
  51. 3. **受众重合度**:候选视频的目标受众是否与原视频重合
  52. 4. **表现形式相似度**:候选视频的表现形式(风格、结构等)是否与原视频相似
  53. # 评分标准
  54. - 相关性得分范围:0-100分
  55. - 90-100分:高度相关,主题、切入点、受众、表现形式都高度匹配
  56. - 70-89分:相关,在多个维度上匹配
  57. - 50-69分:中等相关,在某些维度上匹配
  58. - 30-49分:低相关,匹配度较低
  59. - 0-29分:不相关,基本不匹配
  60. # 输出要求
  61. 1. 对每个候选视频进行评分(0-100分)
  62. 2. 根据评分排序,默认选前50%进入候选
  63. 3. 输出结果保持与输入列表相同的顺序和字段,新增两个字段:
  64. - relevance_score: 相关性得分(0-100)
  65. - is_selected: 是否入选(true/false)
  66. """
  67. def process(self, state: Dict[str, Any], config=None) -> Dict[str, Any]:
  68. """处理状态 - 评估视频相关性并筛选
  69. Args:
  70. state: 状态字典,包含:
  71. - original_video_title: 原视频标题
  72. - original_video_content: 原视频解构内容(JSON格式)
  73. - search_result: 待评估的视频列表
  74. Returns:
  75. 更新后的状态,包含:
  76. - evaluate_result: 评估结果列表(每个视频包含原始字段 + relevance_score + is_selected)
  77. """
  78. if not self.is_initialized:
  79. self.initialize()
  80. logger.info("开始评估视频相关性")
  81. try:
  82. # 从state获取数据
  83. original_video_title = state.get("original_video_title", "")
  84. original_video_content = state.get("original_video_content", {})
  85. search_result = state.get("search_result", [])
  86. if not search_result:
  87. logger.warning("待评估的视频列表为空")
  88. return {
  89. "evaluate_result": []
  90. }
  91. if not original_video_title and not original_video_content:
  92. logger.warning("原视频信息为空,无法进行评估")
  93. return {
  94. "evaluate_result": []
  95. }
  96. # 构建评估提示词
  97. prompt = self._build_evaluate_prompt(
  98. original_video_title,
  99. original_video_content,
  100. search_result
  101. )
  102. messages = [
  103. {"role": "system", "content": self.system_prompt},
  104. {"role": "user", "content": prompt}
  105. ]
  106. # 调用LLM进行评估
  107. result = LLMInvoker.safe_invoke(
  108. self,
  109. "视频相关性评估",
  110. messages,
  111. fallback={"评估结果": []}
  112. )
  113. # 提取评估结果
  114. evaluate_result = result.get("评估结果", [])
  115. # 如果LLM返回的结果数量与输入不一致,进行修正
  116. if len(evaluate_result) != len(search_result):
  117. logger.warning(
  118. f"LLM返回结果数量({len(evaluate_result)})与输入数量({len(search_result)})不一致,"
  119. "将进行修正"
  120. )
  121. evaluate_result = self._fix_evaluate_result(search_result, evaluate_result)
  122. # 确保每个结果都有relevance_score和is_selected字段
  123. evaluate_result = self._ensure_evaluate_fields(search_result, evaluate_result)
  124. # 根据评分排序并标记前50%为入选
  125. evaluate_result = self._mark_selected_videos(evaluate_result)
  126. logger.info(f"评估完成,共评估{len(evaluate_result)}个视频")
  127. return {
  128. "evaluate_result": evaluate_result
  129. }
  130. except Exception as e:
  131. logger.error(f"视频评估失败: {e}", exc_info=True)
  132. # 返回原始列表,但添加默认的评分和选择状态
  133. search_result = state.get("search_result", [])
  134. return {
  135. "evaluate_result": [
  136. {**video, "relevance_score": 0, "is_selected": False}
  137. for video in search_result
  138. ]
  139. }
  140. def _build_evaluate_prompt(
  141. self,
  142. original_video_title: str,
  143. original_video_content: Dict[str, Any],
  144. search_result: List[Dict[str, Any]]
  145. ) -> str:
  146. """构建评估提示词"""
  147. # 格式化原视频内容
  148. content_str = json.dumps(original_video_content, ensure_ascii=False, indent=2)
  149. # 格式化候选视频列表
  150. candidates_text = ""
  151. for i, video in enumerate(search_result, 1):
  152. video_str = json.dumps(video, ensure_ascii=False, indent=2)
  153. candidates_text += f"\n## 候选视频 {i}\n{video_str}\n"
  154. prompt = f"""# 任务:评估视频相关性
  155. ## 原视频信息
  156. ### 标题
  157. {original_video_title}
  158. ### 解构内容
  159. {content_str}
  160. ## 候选视频列表
  161. {candidates_text}
  162. ## 评估要求
  163. 1. **对每个候选视频进行相关性评分**(0-100分)
  164. - 考虑内容主题匹配度、切入点相似度、受众重合度、表现形式相似度
  165. - 评分要客观、准确
  166. 2. **输出格式要求**
  167. - 保持与输入列表相同的顺序
  168. - 保留原始字段不变
  169. - 新增两个字段:
  170. - `relevance_score`: 相关性得分(整数,0-100)
  171. - `is_selected`: 是否入选(布尔值,暂时设为false,后续会根据评分排序后标记前50%)
  172. ## 输出格式(JSON)
  173. ```json
  174. {{
  175. "评估结果": [
  176. {{
  177. // 保留原始字段...
  178. "relevance_score": 85,
  179. "is_selected": false
  180. }}
  181. ]
  182. }}
  183. ```
  184. **重要**:
  185. - 输出结果的数量必须与输入列表的数量完全一致
  186. - 每个结果必须包含所有原始字段
  187. - 每个结果必须包含relevance_score和is_selected字段
  188. """
  189. return prompt
  190. def _fix_evaluate_result(
  191. self,
  192. original_list: List[Dict[str, Any]],
  193. llm_result: List[Dict[str, Any]]
  194. ) -> List[Dict[str, Any]]:
  195. """修正评估结果,确保数量一致"""
  196. fixed_result = []
  197. # 创建LLM结果的索引(通过某些唯一字段匹配)
  198. llm_result_map = {}
  199. for item in llm_result:
  200. # 尝试通过video_id或其他唯一字段匹配
  201. video_id = item.get("video_id") or item.get("id") or item.get("videoId")
  202. if video_id:
  203. llm_result_map[str(video_id)] = item
  204. # 遍历原始列表,匹配LLM结果
  205. for i, original in enumerate(original_list):
  206. video_id = original.get("video_id") or original.get("id") or original.get("videoId")
  207. if video_id and str(video_id) in llm_result_map:
  208. # 找到匹配的结果,合并字段
  209. matched = llm_result_map[str(video_id)]
  210. fixed_item = {**original, **matched}
  211. fixed_result.append(fixed_item)
  212. elif i < len(llm_result):
  213. # 按索引匹配
  214. matched = llm_result[i]
  215. fixed_item = {**original, **matched}
  216. fixed_result.append(fixed_item)
  217. else:
  218. # 没有匹配的结果,使用原始数据并添加默认评分
  219. fixed_item = {**original, "relevance_score": 0, "is_selected": False}
  220. fixed_result.append(fixed_item)
  221. return fixed_result
  222. def _ensure_evaluate_fields(
  223. self,
  224. original_list: List[Dict[str, Any]],
  225. evaluate_result: List[Dict[str, Any]]
  226. ) -> List[Dict[str, Any]]:
  227. """确保每个评估结果都有必要的字段"""
  228. ensured_result = []
  229. for i, original in enumerate(original_list):
  230. if i < len(evaluate_result):
  231. item = evaluate_result[i]
  232. # 合并原始字段和评估字段
  233. merged_item = {**original}
  234. # 确保有relevance_score
  235. if "relevance_score" in item:
  236. merged_item["relevance_score"] = item["relevance_score"]
  237. else:
  238. merged_item["relevance_score"] = 0
  239. # 确保有is_selected(暂时设为false,后续会重新标记)
  240. merged_item["is_selected"] = False
  241. ensured_result.append(merged_item)
  242. else:
  243. # 如果LLM结果不足,使用原始数据并添加默认值
  244. ensured_result.append({
  245. **original,
  246. "relevance_score": 0,
  247. "is_selected": False
  248. })
  249. return ensured_result
  250. def _mark_selected_videos(self, evaluate_result: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
  251. """根据评分排序并标记前50%为入选,然后恢复原始顺序"""
  252. if not evaluate_result:
  253. return evaluate_result
  254. # 为每个视频添加临时索引,以便后续恢复原始顺序
  255. indexed_result = [
  256. {**item, "_original_index": i}
  257. for i, item in enumerate(evaluate_result)
  258. ]
  259. # 按评分降序排序
  260. sorted_result = sorted(
  261. indexed_result,
  262. key=lambda x: x.get("relevance_score", 0),
  263. reverse=True
  264. )
  265. # 计算前50%的数量(向上取整)
  266. selected_count = max(1, (len(sorted_result) + 1) // 2)
  267. # 标记前50%为入选
  268. for i, item in enumerate(sorted_result):
  269. item["is_selected"] = (i < selected_count)
  270. # 恢复原始顺序
  271. sorted_result.sort(key=lambda x: x.get("_original_index", len(evaluate_result)))
  272. # 移除临时索引
  273. for item in sorted_result:
  274. item.pop("_original_index", None)
  275. return sorted_result
  276. def _build_messages(self, state: Dict[str, Any]) -> List[Dict[str, Any]]:
  277. """构建消息 - BaseLLMAgent要求实现(本Agent不使用此方法)"""
  278. return []
  279. def _update_state(self, state: Dict[str, Any], response: Any) -> Dict[str, Any]:
  280. """更新状态 - BaseLLMAgent要求实现(本Agent不使用此方法)"""
  281. return state