| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347 |
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- """
- 评估Agent
- 功能: 从待评估的视频列表中筛选出和原视频最匹配的内容
- 核心任务: 根据原视频的解构内容,对每个候选视频进行相关性评分和筛选
- 特征: 使用LLM进行相关性评估,输出评分和是否入选
- """
- from typing import Any, Dict, List
- import json
- from src.components.agents.base import BaseLLMAgent
- from src.utils.logger import get_logger
- from src.utils.llm_invoker import LLMInvoker
- logger = get_logger(__name__)
- class EvaluateAgent(BaseLLMAgent):
- """评估Agent - 筛选和原视频最匹配的视频"""
- def __init__(
- self,
- name: str = "evaluate_agent",
- description: str = "评估Agent - 筛选和原视频最匹配的视频",
- model_provider: str = "google_genai",
- temperature: float = 0.3,
- max_tokens: int = 20480
- ):
- """
- 初始化评估Agent
- Args:
- name: Agent名称
- description: Agent描述
- model_provider: 模型提供商 ("openai" 或 "google_genai")
- temperature: 生成温度(较低,保持客观性)
- max_tokens: 最大token数
- """
- system_prompt = self._build_system_prompt()
- super().__init__(
- name=name,
- description=description,
- model_provider=model_provider,
- system_prompt=system_prompt,
- temperature=temperature,
- max_tokens=max_tokens
- )
- def _build_system_prompt(self) -> str:
- """构建系统提示词"""
- return """你是内容分析专家,擅长评估视频内容的相关性和匹配度。
- # 任务
- 根据原视频的解构内容(包括标题、灵感点、目的点、关键点、选题理解等),对候选视频列表进行相关性评估和筛选。
- # 评估维度
- 1. **内容主题匹配度**:候选视频的主题是否与原视频的主题一致或相关
- 2. **切入点相似度**:候选视频的切入角度是否与原视频相似
- 3. **受众重合度**:候选视频的目标受众是否与原视频重合
- 4. **表现形式相似度**:候选视频的表现形式(风格、结构等)是否与原视频相似
- # 评分标准
- - 相关性得分范围:0-100分
- - 90-100分:高度相关,主题、切入点、受众、表现形式都高度匹配
- - 70-89分:相关,在多个维度上匹配
- - 50-69分:中等相关,在某些维度上匹配
- - 30-49分:低相关,匹配度较低
- - 0-29分:不相关,基本不匹配
- # 输出要求
- 1. 对每个候选视频进行评分(0-100分)
- 2. 根据评分排序,默认选前50%进入候选
- 3. 输出结果保持与输入列表相同的顺序和字段,新增两个字段:
- - relevance_score: 相关性得分(0-100)
- - is_selected: 是否入选(true/false)
- """
- def process(self, state: Dict[str, Any], config=None) -> Dict[str, Any]:
- """处理状态 - 评估视频相关性并筛选
-
- Args:
- state: 状态字典,包含:
- - original_video_title: 原视频标题
- - original_video_content: 原视频解构内容(JSON格式)
- - search_result: 待评估的视频列表
-
- Returns:
- 更新后的状态,包含:
- - evaluate_result: 评估结果列表(每个视频包含原始字段 + relevance_score + is_selected)
- """
- if not self.is_initialized:
- self.initialize()
- logger.info("开始评估视频相关性")
- try:
- # 从state获取数据
- original_video_title = state.get("original_video_title", "")
- original_video_content = state.get("original_video_content", {})
- search_result = state.get("search_result", [])
- if not search_result:
- logger.warning("待评估的视频列表为空")
- return {
- "evaluate_result": []
- }
- if not original_video_title and not original_video_content:
- logger.warning("原视频信息为空,无法进行评估")
- return {
- "evaluate_result": []
- }
- # 构建评估提示词
- prompt = self._build_evaluate_prompt(
- original_video_title,
- original_video_content,
- search_result
- )
- messages = [
- {"role": "system", "content": self.system_prompt},
- {"role": "user", "content": prompt}
- ]
- # 调用LLM进行评估
- result = LLMInvoker.safe_invoke(
- self,
- "视频相关性评估",
- messages,
- fallback={"评估结果": []}
- )
- # 提取评估结果
- evaluate_result = result.get("评估结果", [])
-
- # 如果LLM返回的结果数量与输入不一致,进行修正
- if len(evaluate_result) != len(search_result):
- logger.warning(
- f"LLM返回结果数量({len(evaluate_result)})与输入数量({len(search_result)})不一致,"
- "将进行修正"
- )
- evaluate_result = self._fix_evaluate_result(search_result, evaluate_result)
- # 确保每个结果都有relevance_score和is_selected字段
- evaluate_result = self._ensure_evaluate_fields(search_result, evaluate_result)
- # 根据评分排序并标记前50%为入选
- evaluate_result = self._mark_selected_videos(evaluate_result)
- logger.info(f"评估完成,共评估{len(evaluate_result)}个视频")
- return {
- "evaluate_result": evaluate_result
- }
- except Exception as e:
- logger.error(f"视频评估失败: {e}", exc_info=True)
- # 返回原始列表,但添加默认的评分和选择状态
- search_result = state.get("search_result", [])
- return {
- "evaluate_result": [
- {**video, "relevance_score": 0, "is_selected": False}
- for video in search_result
- ]
- }
- def _build_evaluate_prompt(
- self,
- original_video_title: str,
- original_video_content: Dict[str, Any],
- search_result: List[Dict[str, Any]]
- ) -> str:
- """构建评估提示词"""
-
- # 格式化原视频内容
- content_str = json.dumps(original_video_content, ensure_ascii=False, indent=2)
-
- # 格式化候选视频列表
- candidates_text = ""
- for i, video in enumerate(search_result, 1):
- video_str = json.dumps(video, ensure_ascii=False, indent=2)
- candidates_text += f"\n## 候选视频 {i}\n{video_str}\n"
- prompt = f"""# 任务:评估视频相关性
- ## 原视频信息
- ### 标题
- {original_video_title}
- ### 解构内容
- {content_str}
- ## 候选视频列表
- {candidates_text}
- ## 评估要求
- 1. **对每个候选视频进行相关性评分**(0-100分)
- - 考虑内容主题匹配度、切入点相似度、受众重合度、表现形式相似度
- - 评分要客观、准确
- 2. **输出格式要求**
- - 保持与输入列表相同的顺序
- - 保留原始字段不变
- - 新增两个字段:
- - `relevance_score`: 相关性得分(整数,0-100)
- - `is_selected`: 是否入选(布尔值,暂时设为false,后续会根据评分排序后标记前50%)
- ## 输出格式(JSON)
- ```json
- {{
- "评估结果": [
- {{
- // 保留原始字段...
- "relevance_score": 85,
- "is_selected": false
- }}
- ]
- }}
- ```
- **重要**:
- - 输出结果的数量必须与输入列表的数量完全一致
- - 每个结果必须包含所有原始字段
- - 每个结果必须包含relevance_score和is_selected字段
- """
- return prompt
- def _fix_evaluate_result(
- self,
- original_list: List[Dict[str, Any]],
- llm_result: List[Dict[str, Any]]
- ) -> List[Dict[str, Any]]:
- """修正评估结果,确保数量一致"""
- fixed_result = []
-
- # 创建LLM结果的索引(通过某些唯一字段匹配)
- llm_result_map = {}
- for item in llm_result:
- # 尝试通过video_id或其他唯一字段匹配
- video_id = item.get("video_id") or item.get("id") or item.get("videoId")
- if video_id:
- llm_result_map[str(video_id)] = item
-
- # 遍历原始列表,匹配LLM结果
- for i, original in enumerate(original_list):
- video_id = original.get("video_id") or original.get("id") or original.get("videoId")
- if video_id and str(video_id) in llm_result_map:
- # 找到匹配的结果,合并字段
- matched = llm_result_map[str(video_id)]
- fixed_item = {**original, **matched}
- fixed_result.append(fixed_item)
- elif i < len(llm_result):
- # 按索引匹配
- matched = llm_result[i]
- fixed_item = {**original, **matched}
- fixed_result.append(fixed_item)
- else:
- # 没有匹配的结果,使用原始数据并添加默认评分
- fixed_item = {**original, "relevance_score": 0, "is_selected": False}
- fixed_result.append(fixed_item)
-
- return fixed_result
- def _ensure_evaluate_fields(
- self,
- original_list: List[Dict[str, Any]],
- evaluate_result: List[Dict[str, Any]]
- ) -> List[Dict[str, Any]]:
- """确保每个评估结果都有必要的字段"""
- ensured_result = []
-
- for i, original in enumerate(original_list):
- if i < len(evaluate_result):
- item = evaluate_result[i]
- # 合并原始字段和评估字段
- merged_item = {**original}
-
- # 确保有relevance_score
- if "relevance_score" in item:
- merged_item["relevance_score"] = item["relevance_score"]
- else:
- merged_item["relevance_score"] = 0
-
- # 确保有is_selected(暂时设为false,后续会重新标记)
- merged_item["is_selected"] = False
-
- ensured_result.append(merged_item)
- else:
- # 如果LLM结果不足,使用原始数据并添加默认值
- ensured_result.append({
- **original,
- "relevance_score": 0,
- "is_selected": False
- })
-
- return ensured_result
- def _mark_selected_videos(self, evaluate_result: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
- """根据评分排序并标记前50%为入选,然后恢复原始顺序"""
- if not evaluate_result:
- return evaluate_result
-
- # 为每个视频添加临时索引,以便后续恢复原始顺序
- indexed_result = [
- {**item, "_original_index": i}
- for i, item in enumerate(evaluate_result)
- ]
-
- # 按评分降序排序
- sorted_result = sorted(
- indexed_result,
- key=lambda x: x.get("relevance_score", 0),
- reverse=True
- )
-
- # 计算前50%的数量(向上取整)
- selected_count = max(1, (len(sorted_result) + 1) // 2)
-
- # 标记前50%为入选
- for i, item in enumerate(sorted_result):
- item["is_selected"] = (i < selected_count)
-
- # 恢复原始顺序
- sorted_result.sort(key=lambda x: x.get("_original_index", len(evaluate_result)))
-
- # 移除临时索引
- for item in sorted_result:
- item.pop("_original_index", None)
-
- return sorted_result
- def _build_messages(self, state: Dict[str, Any]) -> List[Dict[str, Any]]:
- """构建消息 - BaseLLMAgent要求实现(本Agent不使用此方法)"""
- return []
- def _update_state(self, state: Dict[str, Any], response: Any) -> Dict[str, Any]:
- """更新状态 - BaseLLMAgent要求实现(本Agent不使用此方法)"""
- return state
|