""" 帖子评估模块 功能: 1. 评估帖子是否包含面向创作的内容知识 2. 评估帖子与原始query的相关性 3. 支持多模态评估(文本+图片) 4. 支持批量并发评估 """ import asyncio import json import os from datetime import datetime from typing import Optional, Tuple from pydantic import BaseModel, Field import requests MODEL_NAME = "google/gemini-2.5-flash" MAX_IMAGES_PER_POST = 10 # 最大处理图片数 MAX_CONCURRENT_EVALUATIONS = 5 # 最大并发评估数 API_TIMEOUT = 120 # API 超时时间(秒) # ============================================================================ # 数据模型 # ============================================================================ class EvaluationResult(BaseModel): """评估结果""" is_knowledge: bool = Field(..., description="是否是知识内容") knowledge_reason: str = Field(..., description="知识判定理由") relevance_score: float = Field(..., description="相关性得分(0-1)") relevance_reason: str = Field(..., description="相关性评分理由") # ============================================================================ # Prompt 定义 # ============================================================================ EVALUATION_PROMPT_TEMPLATE = """ 你是一名专业的内容分析专家,请对以下小红书帖子进行两项评估。 ## 原始问题 {original_query} ## 帖子信息 **标题**: {title} **正文**: {body_text} **图片数量**: {num_images}张 ## 评估任务 ### 任务1: 知识判定 判断这个帖子是否包含"面向创作的内容知识"。 **判断标准:** - **是知识内容**: 包含可复用的创作方法、技巧、工具使用、流程步骤、教程指导等实用知识 - **非知识内容**: 纯个人分享、日常记录、商品推广、无实质性创作指导、纯情感表达等 **要求**: - 综合分析标题、正文和图片(如有) - 重点关注是否有可学习、可复用的创作知识 ### 任务2: 相关性评估 评估这个帖子与原始问题的相关性。 **评估维度:** 1. 帖子内容能否解决原始问题? 2. 是否包含原始问题所需的知识/方法/工具? 3. 内容的针对性和完整性如何? **评分标准:** - 0.7-1.0: 高度相关 - 直接回答问题,内容针对性强 - 0.4-0.7: 中度相关 - 部分相关,有一定参考价值 - 0.0-0.4: 低度相关 - 相关性弱,参考价值有限 ## 输出要求 必须返回一个JSON对象,包含以下字段: {{ "is_knowledge": true/false, "knowledge_reason": "知识判定理由(100字以内,简明扼要)", "relevance_score": 0.85, "relevance_reason": "相关性分析(150字以内,说明相关程度和原因)" }} ## 重要提示 - 两项评估相互独立:一个帖子可以是知识内容但与问题不相关,也可以相关但不是知识内容 - 理由要具体,指出关键要素 - 分数要准确,体现真实的相关程度 """.strip() # ============================================================================ # 核心评估函数 # ============================================================================ async def evaluate_post( post, # Post对象 original_query: str, semaphore: Optional[asyncio.Semaphore] = None ) -> Optional[EvaluationResult]: """ 评估单个帖子(知识判定 + 相关性评估) Args: post: Post对象(包含title, body_text, images, type等) original_query: 原始问题 semaphore: 可选的信号量用于并发控制 Returns: EvaluationResult对象,评估失败返回None """ # 视频帖子跳过 if post.type == "video": print(f" ⊗ 跳过视频帖子: {post.note_id}") return None # 准备图片列表(限制数量) image_urls = post.images[:MAX_IMAGES_PER_POST] if post.images else [] image_count = len(image_urls) print(f" 🔍 开始评估帖子: {post.note_id} ({image_count}张图片)") try: # 如果有信号量,使用它进行并发控制 if semaphore: async with semaphore: result = await _evaluate_post_internal(post, original_query, image_urls) else: result = await _evaluate_post_internal(post, original_query, image_urls) print(f" ✅ 评估完成: {post.note_id} | 知识:{result.is_knowledge} | 相关性:{result.relevance_score:.2f}") return result except Exception as e: print(f" ❌ 评估失败: {post.note_id} - {str(e)[:100]}") return None async def _evaluate_post_internal(post, original_query: str, image_urls: list[str]) -> EvaluationResult: """ 实际执行评估的内部函数 - 直接调用OpenRouter API """ # 获取API密钥 api_key = os.getenv("OPENROUTER_API_KEY") if not api_key: raise ValueError("OPENROUTER_API_KEY environment variable not set") # 构建提示文本 prompt_text = EVALUATION_PROMPT_TEMPLATE.format( original_query=original_query, title=post.title, body_text=post.body_text or "", num_images=len(image_urls) ) # 构建消息内容:文本 + 多张图片 content = [{"type": "text", "text": prompt_text}] for url in image_urls: content.append({ "type": "image_url", "image_url": {"url": url} }) # 构建API请求 payload = { "model": MODEL_NAME, "messages": [{"role": "user", "content": content}], "response_format": {"type": "json_object"} } headers = { "Authorization": f"Bearer {api_key}", "Content-Type": "application/json" } # 在异步上下文中执行同步请求 loop = asyncio.get_event_loop() response = await loop.run_in_executor( None, lambda: requests.post( "https://openrouter.ai/api/v1/chat/completions", headers=headers, json=payload, timeout=API_TIMEOUT ) ) # 检查响应 if response.status_code != 200: raise Exception(f"OpenRouter API error: {response.status_code} - {response.text[:200]}") # 解析响应 result = response.json() content_text = result["choices"][0]["message"]["content"] # 去除Markdown代码块标记(Gemini即使设置了json_object也会返回带```json标记的内容) content_text = content_text.strip() if content_text.startswith("```json"): content_text = content_text[7:] elif content_text.startswith("```"): content_text = content_text[3:] if content_text.endswith("```"): content_text = content_text[:-3] content_text = content_text.strip() # 解析JSON evaluation_data = json.loads(content_text) # 构建EvaluationResult evaluation = EvaluationResult( is_knowledge=evaluation_data.get("is_knowledge", False), knowledge_reason=evaluation_data.get("knowledge_reason", ""), relevance_score=evaluation_data.get("relevance_score", 0.0), relevance_reason=evaluation_data.get("relevance_reason", "") ) return evaluation async def evaluate_all_posts( posts: list, # list[Post] original_query: str, max_concurrent: int = MAX_CONCURRENT_EVALUATIONS ) -> dict[str, EvaluationResult]: """ 批量评估多个帖子(带并发控制) Args: posts: Post对象列表 original_query: 原始问题 max_concurrent: 最大并发数 Returns: dict: {note_id: EvaluationResult} """ semaphore = asyncio.Semaphore(max_concurrent) print(f"\n开始批量评估 {len(posts)} 个帖子(并发限制: {max_concurrent})...") tasks = [evaluate_post(post, original_query, semaphore) for post in posts] results = await asyncio.gather(*tasks) # 构建字典(过滤None) evaluation_dict = {} success_count = 0 for i, evaluation in enumerate(results): if evaluation is not None: evaluation_dict[posts[i].note_id] = evaluation success_count += 1 print(f"批量评估完成: 成功 {success_count}/{len(posts)}") return evaluation_dict def get_relevance_level(score: float) -> str: """ 根据相关性分数获取分级标签 Args: score: 相关性分数(0-1) Returns: 分级标签: "高度相关" | "中度相关" | "低度相关" """ if score >= 0.7: return "高度相关" elif score >= 0.4: return "中度相关" else: return "低度相关" # ============================================================================ # 辅助函数 # ============================================================================ def apply_evaluation_to_post(post, evaluation: EvaluationResult): """ 将评估结果应用到Post对象 Args: post: Post对象 evaluation: EvaluationResult对象 """ post.is_knowledge = evaluation.is_knowledge post.knowledge_reason = evaluation.knowledge_reason post.relevance_score = evaluation.relevance_score post.relevance_level = get_relevance_level(evaluation.relevance_score) post.relevance_reason = evaluation.relevance_reason post.evaluation_time = datetime.now().isoformat()