post_evaluator.py 9.1 KB


  1. """
  2. 帖子评估模块
  3. 功能:
  4. 1. 评估帖子是否包含面向创作的内容知识
  5. 2. 评估帖子与原始query的相关性
  6. 3. 支持多模态评估(文本+图片)
  7. 4. 支持批量并发评估
  8. """
  9. import asyncio
  10. import json
  11. import os
  12. from datetime import datetime
  13. from typing import Optional, Tuple
  14. from pydantic import BaseModel, Field
  15. import requests
  16. MODEL_NAME = "google/gemini-2.5-flash"
  17. MAX_IMAGES_PER_POST = 10 # 最大处理图片数
  18. MAX_CONCURRENT_EVALUATIONS = 5 # 最大并发评估数
  19. API_TIMEOUT = 120 # API 超时时间(秒)
  20. # ============================================================================
  21. # 数据模型
  22. # ============================================================================
  23. class EvaluationResult(BaseModel):
  24. """评估结果"""
  25. is_knowledge: bool = Field(..., description="是否是知识内容")
  26. knowledge_reason: str = Field(..., description="知识判定理由")
  27. relevance_score: float = Field(..., description="相关性得分(0-1)")
  28. relevance_reason: str = Field(..., description="相关性评分理由")
  29. # ============================================================================
  30. # Prompt 定义
  31. # ============================================================================
  32. EVALUATION_PROMPT_TEMPLATE = """
  33. 你是一名专业的内容分析专家,请对以下小红书帖子进行两项评估。
  34. ## 原始问题
  35. {original_query}
  36. ## 帖子信息
  37. **标题**: {title}
  38. **正文**: {body_text}
  39. **图片数量**: {num_images}张
  40. ## 评估任务
  41. ### 任务1: 知识判定
  42. 判断这个帖子是否包含"面向创作的内容知识"。
  43. **判断标准:**
  44. - **是知识内容**: 包含可复用的创作方法、技巧、工具使用、流程步骤、教程指导等实用知识
  45. - **非知识内容**: 纯个人分享、日常记录、商品推广、无实质性创作指导、纯情感表达等
  46. **要求**:
  47. - 综合分析标题、正文和图片(如有)
  48. - 重点关注是否有可学习、可复用的创作知识
  49. ### 任务2: 相关性评估
  50. 评估这个帖子与原始问题的相关性。
  51. **评估维度:**
  52. 1. 帖子内容能否解决原始问题?
  53. 2. 是否包含原始问题所需的知识/方法/工具?
  54. 3. 内容的针对性和完整性如何?
  55. **评分标准:**
  56. - 0.7-1.0: 高度相关 - 直接回答问题,内容针对性强
  57. - 0.4-0.7: 中度相关 - 部分相关,有一定参考价值
  58. - 0.0-0.4: 低度相关 - 相关性弱,参考价值有限
  59. ## 输出要求
  60. 必须返回一个JSON对象,包含以下字段:
  61. {{
  62. "is_knowledge": true/false,
  63. "knowledge_reason": "知识判定理由(100字以内,简明扼要)",
  64. "relevance_score": 0.85,
  65. "relevance_reason": "相关性分析(150字以内,说明相关程度和原因)"
  66. }}
  67. ## 重要提示
  68. - 两项评估相互独立:一个帖子可以是知识内容但与问题不相关,也可以相关但不是知识内容
  69. - 理由要具体,指出关键要素
  70. - 分数要准确,体现真实的相关程度
  71. """.strip()
  72. # ============================================================================
  73. # 核心评估函数
  74. # ============================================================================
  75. async def evaluate_post(
  76. post, # Post对象
  77. original_query: str,
  78. semaphore: Optional[asyncio.Semaphore] = None
  79. ) -> Optional[EvaluationResult]:
  80. """
  81. 评估单个帖子(知识判定 + 相关性评估)
  82. Args:
  83. post: Post对象(包含title, body_text, images, type等)
  84. original_query: 原始问题
  85. semaphore: 可选的信号量用于并发控制
  86. Returns:
  87. EvaluationResult对象,评估失败返回None
  88. """
  89. # 视频帖子跳过
  90. if post.type == "video":
  91. print(f" ⊗ 跳过视频帖子: {post.note_id}")
  92. return None
  93. # 准备图片列表(限制数量)
  94. image_urls = post.images[:MAX_IMAGES_PER_POST] if post.images else []
  95. image_count = len(image_urls)
  96. print(f" 🔍 开始评估帖子: {post.note_id} ({image_count}张图片)")
  97. try:
  98. # 如果有信号量,使用它进行并发控制
  99. if semaphore:
  100. async with semaphore:
  101. result = await _evaluate_post_internal(post, original_query, image_urls)
  102. else:
  103. result = await _evaluate_post_internal(post, original_query, image_urls)
  104. print(f" ✅ 评估完成: {post.note_id} | 知识:{result.is_knowledge} | 相关性:{result.relevance_score:.2f}")
  105. return result
  106. except Exception as e:
  107. print(f" ❌ 评估失败: {post.note_id} - {str(e)[:100]}")
  108. return None
  109. async def _evaluate_post_internal(post, original_query: str, image_urls: list[str]) -> EvaluationResult:
  110. """
  111. 实际执行评估的内部函数 - 直接调用OpenRouter API
  112. """
  113. # 获取API密钥
  114. api_key = os.getenv("OPENROUTER_API_KEY")
  115. if not api_key:
  116. raise ValueError("OPENROUTER_API_KEY environment variable not set")
  117. # 构建提示文本
  118. prompt_text = EVALUATION_PROMPT_TEMPLATE.format(
  119. original_query=original_query,
  120. title=post.title,
  121. body_text=post.body_text or "",
  122. num_images=len(image_urls)
  123. )
  124. # 构建消息内容:文本 + 多张图片
  125. content = [{"type": "text", "text": prompt_text}]
  126. for url in image_urls:
  127. content.append({
  128. "type": "image_url",
  129. "image_url": {"url": url}
  130. })
  131. # 构建API请求
  132. payload = {
  133. "model": MODEL_NAME,
  134. "messages": [{"role": "user", "content": content}],
  135. "response_format": {"type": "json_object"}
  136. }
  137. headers = {
  138. "Authorization": f"Bearer {api_key}",
  139. "Content-Type": "application/json"
  140. }
  141. # 在异步上下文中执行同步请求
  142. loop = asyncio.get_event_loop()
  143. response = await loop.run_in_executor(
  144. None,
  145. lambda: requests.post(
  146. "https://openrouter.ai/api/v1/chat/completions",
  147. headers=headers,
  148. json=payload,
  149. timeout=API_TIMEOUT
  150. )
  151. )
  152. # 检查响应
  153. if response.status_code != 200:
  154. raise Exception(f"OpenRouter API error: {response.status_code} - {response.text[:200]}")
  155. # 解析响应
  156. result = response.json()
  157. content_text = result["choices"][0]["message"]["content"]
  158. # 去除Markdown代码块标记(Gemini即使设置了json_object也会返回带```json标记的内容)
  159. content_text = content_text.strip()
  160. if content_text.startswith("```json"):
  161. content_text = content_text[7:]
  162. elif content_text.startswith("```"):
  163. content_text = content_text[3:]
  164. if content_text.endswith("```"):
  165. content_text = content_text[:-3]
  166. content_text = content_text.strip()
  167. # 解析JSON
  168. evaluation_data = json.loads(content_text)
  169. # 构建EvaluationResult
  170. evaluation = EvaluationResult(
  171. is_knowledge=evaluation_data.get("is_knowledge", False),
  172. knowledge_reason=evaluation_data.get("knowledge_reason", ""),
  173. relevance_score=evaluation_data.get("relevance_score", 0.0),
  174. relevance_reason=evaluation_data.get("relevance_reason", "")
  175. )
  176. return evaluation
  177. async def evaluate_all_posts(
  178. posts: list, # list[Post]
  179. original_query: str,
  180. max_concurrent: int = MAX_CONCURRENT_EVALUATIONS
  181. ) -> dict[str, EvaluationResult]:
  182. """
  183. 批量评估多个帖子(带并发控制)
  184. Args:
  185. posts: Post对象列表
  186. original_query: 原始问题
  187. max_concurrent: 最大并发数
  188. Returns:
  189. dict: {note_id: EvaluationResult}
  190. """
  191. semaphore = asyncio.Semaphore(max_concurrent)
  192. print(f"\n开始批量评估 {len(posts)} 个帖子(并发限制: {max_concurrent})...")
  193. tasks = [evaluate_post(post, original_query, semaphore) for post in posts]
  194. results = await asyncio.gather(*tasks)
  195. # 构建字典(过滤None)
  196. evaluation_dict = {}
  197. success_count = 0
  198. for i, evaluation in enumerate(results):
  199. if evaluation is not None:
  200. evaluation_dict[posts[i].note_id] = evaluation
  201. success_count += 1
  202. print(f"批量评估完成: 成功 {success_count}/{len(posts)}")
  203. return evaluation_dict
  204. def get_relevance_level(score: float) -> str:
  205. """
  206. 根据相关性分数获取分级标签
  207. Args:
  208. score: 相关性分数(0-1)
  209. Returns:
  210. 分级标签: "高度相关" | "中度相关" | "低度相关"
  211. """
  212. if score >= 0.7:
  213. return "高度相关"
  214. elif score >= 0.4:
  215. return "中度相关"
  216. else:
  217. return "低度相关"
  218. # ============================================================================
  219. # 辅助函数
  220. # ============================================================================
  221. def apply_evaluation_to_post(post, evaluation: EvaluationResult):
  222. """
  223. 将评估结果应用到Post对象
  224. Args:
  225. post: Post对象
  226. evaluation: EvaluationResult对象
  227. """
  228. post.is_knowledge = evaluation.is_knowledge
  229. post.knowledge_reason = evaluation.knowledge_reason
  230. post.relevance_score = evaluation.relevance_score
  231. post.relevance_level = get_relevance_level(evaluation.relevance_score)
  232. post.relevance_reason = evaluation.relevance_reason
  233. post.evaluation_time = datetime.now().isoformat()