| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294 |
- """
- 帖子评估模块
- 功能:
- 1. 评估帖子是否包含面向创作的内容知识
- 2. 评估帖子与原始query的相关性
- 3. 支持多模态评估(文本+图片)
- 4. 支持批量并发评估
- """
- import asyncio
- import json
- import os
- from datetime import datetime
- from typing import Optional, Tuple
- from pydantic import BaseModel, Field
- import requests
- MODEL_NAME = "google/gemini-2.5-flash"
- MAX_IMAGES_PER_POST = 10 # 最大处理图片数
- MAX_CONCURRENT_EVALUATIONS = 5 # 最大并发评估数
- API_TIMEOUT = 120 # API 超时时间(秒)
- # ============================================================================
- # 数据模型
- # ============================================================================
- class EvaluationResult(BaseModel):
- """评估结果"""
- is_knowledge: bool = Field(..., description="是否是知识内容")
- knowledge_reason: str = Field(..., description="知识判定理由")
- relevance_score: float = Field(..., description="相关性得分(0-1)")
- relevance_reason: str = Field(..., description="相关性评分理由")
- # ============================================================================
- # Prompt 定义
- # ============================================================================
- EVALUATION_PROMPT_TEMPLATE = """
- 你是一名专业的内容分析专家,请对以下小红书帖子进行两项评估。
- ## 原始问题
- {original_query}
- ## 帖子信息
- **标题**: {title}
- **正文**: {body_text}
- **图片数量**: {num_images}张
- ## 评估任务
- ### 任务1: 知识判定
- 判断这个帖子是否包含"面向创作的内容知识"。
- **判断标准:**
- - **是知识内容**: 包含可复用的创作方法、技巧、工具使用、流程步骤、教程指导等实用知识
- - **非知识内容**: 纯个人分享、日常记录、商品推广、无实质性创作指导、纯情感表达等
- **要求**:
- - 综合分析标题、正文和图片(如有)
- - 重点关注是否有可学习、可复用的创作知识
- ### 任务2: 相关性评估
- 评估这个帖子与原始问题的相关性。
- **评估维度:**
- 1. 帖子内容能否解决原始问题?
- 2. 是否包含原始问题所需的知识/方法/工具?
- 3. 内容的针对性和完整性如何?
- **评分标准:**
- - 0.7-1.0: 高度相关 - 直接回答问题,内容针对性强
- - 0.4-0.7: 中度相关 - 部分相关,有一定参考价值
- - 0.0-0.4: 低度相关 - 相关性弱,参考价值有限
- ## 输出要求
- 必须返回一个JSON对象,包含以下字段:
- {{
- "is_knowledge": true/false,
- "knowledge_reason": "知识判定理由(100字以内,简明扼要)",
- "relevance_score": 0.85,
- "relevance_reason": "相关性分析(150字以内,说明相关程度和原因)"
- }}
- ## 重要提示
- - 两项评估相互独立:一个帖子可以是知识内容但与问题不相关,也可以相关但不是知识内容
- - 理由要具体,指出关键要素
- - 分数要准确,体现真实的相关程度
- """.strip()
- # ============================================================================
- # 核心评估函数
- # ============================================================================
- async def evaluate_post(
- post, # Post对象
- original_query: str,
- semaphore: Optional[asyncio.Semaphore] = None
- ) -> Optional[EvaluationResult]:
- """
- 评估单个帖子(知识判定 + 相关性评估)
- Args:
- post: Post对象(包含title, body_text, images, type等)
- original_query: 原始问题
- semaphore: 可选的信号量用于并发控制
- Returns:
- EvaluationResult对象,评估失败返回None
- """
- # 视频帖子跳过
- if post.type == "video":
- print(f" ⊗ 跳过视频帖子: {post.note_id}")
- return None
- # 准备图片列表(限制数量)
- image_urls = post.images[:MAX_IMAGES_PER_POST] if post.images else []
- image_count = len(image_urls)
- print(f" 🔍 开始评估帖子: {post.note_id} ({image_count}张图片)")
- try:
- # 如果有信号量,使用它进行并发控制
- if semaphore:
- async with semaphore:
- result = await _evaluate_post_internal(post, original_query, image_urls)
- else:
- result = await _evaluate_post_internal(post, original_query, image_urls)
- print(f" ✅ 评估完成: {post.note_id} | 知识:{result.is_knowledge} | 相关性:{result.relevance_score:.2f}")
- return result
- except Exception as e:
- print(f" ❌ 评估失败: {post.note_id} - {str(e)[:100]}")
- return None
- async def _evaluate_post_internal(post, original_query: str, image_urls: list[str]) -> EvaluationResult:
- """
- 实际执行评估的内部函数 - 直接调用OpenRouter API
- """
- # 获取API密钥
- api_key = os.getenv("OPENROUTER_API_KEY")
- if not api_key:
- raise ValueError("OPENROUTER_API_KEY environment variable not set")
- # 构建提示文本
- prompt_text = EVALUATION_PROMPT_TEMPLATE.format(
- original_query=original_query,
- title=post.title,
- body_text=post.body_text or "",
- num_images=len(image_urls)
- )
- # 构建消息内容:文本 + 多张图片
- content = [{"type": "text", "text": prompt_text}]
- for url in image_urls:
- content.append({
- "type": "image_url",
- "image_url": {"url": url}
- })
- # 构建API请求
- payload = {
- "model": MODEL_NAME,
- "messages": [{"role": "user", "content": content}],
- "response_format": {"type": "json_object"}
- }
- headers = {
- "Authorization": f"Bearer {api_key}",
- "Content-Type": "application/json"
- }
- # 在异步上下文中执行同步请求
- loop = asyncio.get_event_loop()
- response = await loop.run_in_executor(
- None,
- lambda: requests.post(
- "https://openrouter.ai/api/v1/chat/completions",
- headers=headers,
- json=payload,
- timeout=API_TIMEOUT
- )
- )
- # 检查响应
- if response.status_code != 200:
- raise Exception(f"OpenRouter API error: {response.status_code} - {response.text[:200]}")
- # 解析响应
- result = response.json()
- content_text = result["choices"][0]["message"]["content"]
- # 去除Markdown代码块标记(Gemini即使设置了json_object也会返回带```json标记的内容)
- content_text = content_text.strip()
- if content_text.startswith("```json"):
- content_text = content_text[7:]
- elif content_text.startswith("```"):
- content_text = content_text[3:]
- if content_text.endswith("```"):
- content_text = content_text[:-3]
- content_text = content_text.strip()
- # 解析JSON
- evaluation_data = json.loads(content_text)
- # 构建EvaluationResult
- evaluation = EvaluationResult(
- is_knowledge=evaluation_data.get("is_knowledge", False),
- knowledge_reason=evaluation_data.get("knowledge_reason", ""),
- relevance_score=evaluation_data.get("relevance_score", 0.0),
- relevance_reason=evaluation_data.get("relevance_reason", "")
- )
- return evaluation
- async def evaluate_all_posts(
- posts: list, # list[Post]
- original_query: str,
- max_concurrent: int = MAX_CONCURRENT_EVALUATIONS
- ) -> dict[str, EvaluationResult]:
- """
- 批量评估多个帖子(带并发控制)
- Args:
- posts: Post对象列表
- original_query: 原始问题
- max_concurrent: 最大并发数
- Returns:
- dict: {note_id: EvaluationResult}
- """
- semaphore = asyncio.Semaphore(max_concurrent)
- print(f"\n开始批量评估 {len(posts)} 个帖子(并发限制: {max_concurrent})...")
- tasks = [evaluate_post(post, original_query, semaphore) for post in posts]
- results = await asyncio.gather(*tasks)
- # 构建字典(过滤None)
- evaluation_dict = {}
- success_count = 0
- for i, evaluation in enumerate(results):
- if evaluation is not None:
- evaluation_dict[posts[i].note_id] = evaluation
- success_count += 1
- print(f"批量评估完成: 成功 {success_count}/{len(posts)}")
- return evaluation_dict
- def get_relevance_level(score: float) -> str:
- """
- 根据相关性分数获取分级标签
- Args:
- score: 相关性分数(0-1)
- Returns:
- 分级标签: "高度相关" | "中度相关" | "低度相关"
- """
- if score >= 0.7:
- return "高度相关"
- elif score >= 0.4:
- return "中度相关"
- else:
- return "低度相关"
- # ============================================================================
- # 辅助函数
- # ============================================================================
- def apply_evaluation_to_post(post, evaluation: EvaluationResult):
- """
- 将评估结果应用到Post对象
- Args:
- post: Post对象
- evaluation: EvaluationResult对象
- """
- post.is_knowledge = evaluation.is_knowledge
- post.knowledge_reason = evaluation.knowledge_reason
- post.relevance_score = evaluation.relevance_score
- post.relevance_level = get_relevance_level(evaluation.relevance_score)
- post.relevance_reason = evaluation.relevance_reason
- post.evaluation_time = datetime.now().isoformat()
|