""" 帖子评估模块 V2 - 分离的知识评估和相关性评估 改进: 1. 知识评估: 6维度分层打分系统 (0-100分) 2. 相关性评估: 目的性(70%) + 品类(30%) 3. 并发评估: 两个API同时调用 4. 详细数据: 嵌套结构存储完整评估信息 """ import asyncio import json import os from datetime import datetime from typing import Optional from pydantic import BaseModel, Field import requests MODEL_NAME = "google/gemini-2.5-flash" MAX_IMAGES_PER_POST = 10 MAX_CONCURRENT_EVALUATIONS = 5 API_TIMEOUT = 120 # ============================================================================ # 数据模型 # ============================================================================ class KnowledgeEvaluation(BaseModel): """知识评估结果""" is_knowledge: bool = Field(..., description="是否是知识内容") quick_exclude: bool = Field(False, description="快速排除判定") dimension_scores: dict[str, int] = Field(default_factory=dict, description="6维度得分") weighted_score: float = Field(..., description="加权总分(0-100)") level: int = Field(..., description="满足度等级(1-5星)") evidence: list[str] = Field(default_factory=list, description="关键证据") issues: list[str] = Field(default_factory=list, description="存在问题") summary: str = Field(..., description="总结陈述") class RelevanceEvaluation(BaseModel): """相关性评估结果""" purpose_score: float = Field(..., description="目的性匹配得分(0-100)") category_score: float = Field(..., description="品类匹配得分(0-100)") total_score: float = Field(..., description="综合得分(0-100)") conclusion: str = Field(..., description="匹配结论") summary: str = Field(..., description="总结说明") # ============================================================================ # Prompt 定义 # ============================================================================ KNOWLEDGE_EVALUATION_PROMPT = """# 内容知识判定系统 ## 角色定义 你是一个多模态内容评估专家,专门判断社交媒体帖子是否属于"内容知识"类别。 ## 内容知识定义 **内容知识**是指对创作/制作有实际帮助的、具有通用性和可迁移性的知识,包括: - ✅ **原理型知识**: 讲解创作背后的原理、逻辑、方法论 - ✅ **体系型知识**: 提供完整的框架、流程、体系化方法 - ✅ **案例提炼型知识**: 通过多案例总结出通用规律和可复用方法 **非内容知识**(需严格排除): - ❌ **单案例展示**: 仅展示某一个作品/项目,无方法论提炼 - ❌ **单点细节**: 只讲某个具体细节的操作,缺乏系统性 - ❌ **纯元素展示**: 配色/字体/素材等单点展示,无创作方法 - ❌ **作品集型**: 纯粹的作品展示集合,无教学目的 --- ## 输入信息 - **标题**: {title} - **正文**: {body_text} - **图片数量**: {num_images}张 --- ## 判断流程 ### 第一步: 快速排除判断(任一项为"是"则直接判定为非内容知识) 1. 标题是否为纯展示型? (如:"我的XX作品"、"今天做了XX"、"分享一下") 2. 正文或者图片里内容是否缺乏方法/原理/步骤描述,仅是叙事或展示? 3. 图片是否全为作品展示,无原理型/体系型/知识提炼型内容元素? 4. 是否只讲一个具体项目的单次操作,无通用性? **输出**: "quick_exclude": true/false --- ### 第二步: 分层评估体系(满分10分) #### 维度1: 标题语义 (权重15%) - 10分: 明确包含"教程/方法/技巧/如何/原理/攻略/指南/X步"等教学词 - 7分: 包含"合集/总结/分享XX方法"等整理型词汇 - 4分: 描述性标题但暗示有方法论 - 0分: 纯展示型标题或单案例描述 #### 维度2: 封面首图 (权重60%) - 10分: 包含步骤编号/流程图/对比图/知识框架图 - 7分: 有明显的教学性文字标注或视觉引导 - 4分: 有多个知识点的视觉呈现 - 0分: 单一作品展示或纯美图 #### 维度3: 多图教学性 (权重60%) - 10分: 多图形成步骤/对比/原理说明体系,有标注/序号/箭头 - 7分: 多图展示不同方法/案例,有一定教学逻辑 - 4分: 多图但教学性不明显 - 0分: 多图仅为作品多角度展示 #### 维度4: 内容结构 (权重60%) - 10分: 有清晰的知识框架(原理→方法→案例,或问题→方案→总结) - 7分: 有分层次的内容组织(分章节/要点/步骤展示) - 4分: 有一定逻辑但不够系统 - 0分: 流水账式/单线性叙述 #### 维度5: 正文步骤性 (权重25%) - 10分: 有清晰的步骤序号和完整流程(≥3步) - 7分: 有步骤描述但不够系统化 - 4分: 有零散的方法提及 - 0分: 无步骤,纯叙事或展示 #### 维度6: 知识提炼度 (权重25%) - 10分: 有明确的总结/归纳/对比/框架化输出 - 7分: 有一定的知识整理 - 4分: 有零散总结 - 0分: 无任何知识提炼 --- ### 第三步: 综合计算 **加权总分计算**: ``` 加权分 = 维度1×0.15 + (维度2+维度3+维度4)×0.6/3 + (维度5+维度6)×0.25/2 最终得分(weighted_score) = 加权分 × 10 (转换为0-100分) ``` **满足度等级**: - 90-100分: 5星 ⭐⭐⭐⭐⭐ 优质内容知识 - 75-89分: 4星 ⭐⭐⭐⭐ 良好内容知识 - 60-74分: 3星 ⭐⭐⭐ 基础内容知识 - 45-59分: 2星 ⭐⭐ 弱内容知识倾向 - 0-44分: 1星 ⭐ 非内容知识 --- ## 输出格式 请严格按照以下JSON格式输出: {{ "is_knowledge": true/false, "quick_exclude": false, "dimension_scores": {{ "标题语义": 8, "封面首图": 9, "多图教学性": 10, "内容结构": 7, "正文步骤性": 9, "知识提炼度": 8 }}, "weighted_score": 85.5, "level": 4, "evidence": [ "证据1", "证据2" ], "issues": [ "问题1" ], "summary": "总结陈述(2-3句话)" }} ## 重要提示 - 严格按照评分标准打分 - 每个维度得分范围: 0-10分 - weighted_score必须是0-100分(维度加权分×10) - 图片层占60%权重,重点评估 - 综合得分>=60分才判定为知识内容 """ RELEVANCE_EVALUATION_PROMPT = """# 相关性评估系统 ## 角色定义 你是一位专业的多模态内容评估专家,擅长分析社交媒体UGC平台的帖子内容,能够精准判断帖子与用户搜索需求的匹配程度。 ## 任务说明 评估帖子与原始搜索需求的匹配程度。 --- ## 输入信息 **原始搜索需求:** {original_query} **多模态帖子内容:** - **标题:** {title} - **正文:** {body_text} - **图片数量:** {num_images}张 --- ## 评估维度 ### 1. 目的性匹配判断(权重:70%) **分析要点:** - 识别原始需求中的**核心动词/意图**(如:推荐、教程、评测、对比、寻找、了解等) - 判断帖子是否实质性地**解答或满足**了这个目的 - 评估帖子内容的**实用性和完整性** **评分标准(0-100分):** - 90-100分:完全解答需求,内容实用且完整 - 70-89分:基本解答需求,但信息不够全面或深入 - 40-69分:部分相关,但核心目的未充分满足 - 10-39分:仅有微弱关联,未真正解答需求 - 0-9分:完全不相关 --- ### 2. 品类匹配判断(权重:30%) **分析要点:** - 从**图片内容**中识别:产品类别、场景、属性特征 - 从**标题和正文**中提取:品类名称、产品类型、关键词 - 将提取的品类信息与**原始需求中的品类**进行对比 - 判断品类的**一致性、包含关系或相关性** **评分标准(0-100分):** - 90-100分:品类完全一致,精准匹配 - 70-89分:品类高度相关,属于同类或子类 - 40-69分:品类部分相关,有交叉但存在偏差 - 10-39分:品类关联较弱,仅边缘相关 - 0-9分:品类完全不匹配 --- ## 综合评分计算 **总分 = 目的性匹配得分 × 0.7 + 品类匹配得分 × 0.3** **匹配结论:** - 85-100分:高度匹配 - 65-84分:基本匹配 - 40-64分:部分匹配 - 0-39分:不匹配 --- ## 输出格式 请严格按照以下JSON格式输出: {{ "purpose_score": 85.0, "category_score": 90.0, "total_score": 86.5, "conclusion": "高度匹配", "summary": "总结说明(2-3句话)" }} ## 重要提示 - 目的性权重70%,是评估重点 - 综合考虑文本和图片信息 - 评分要客观公正,避免主观偏好 """ # ============================================================================ # 核心评估函数 # ============================================================================ async def evaluate_knowledge_v2( post, semaphore: Optional[asyncio.Semaphore] = None ) -> Optional[KnowledgeEvaluation]: """ 评估帖子的知识属性(新版6维度评估) """ if post.type == "video": return None image_urls = post.images[:MAX_IMAGES_PER_POST] if post.images else [] try: if semaphore: async with semaphore: result = await _evaluate_knowledge_internal(post, image_urls) else: result = await _evaluate_knowledge_internal(post, image_urls) return result except Exception as e: print(f" ❌ 知识评估失败: {post.note_id} - {str(e)[:100]}") return None async def _evaluate_knowledge_internal(post, image_urls: list[str]) -> KnowledgeEvaluation: """内部知识评估函数""" api_key = os.getenv("OPENROUTER_API_KEY") if not api_key: raise ValueError("OPENROUTER_API_KEY environment variable not set") prompt_text = KNOWLEDGE_EVALUATION_PROMPT.format( title=post.title, body_text=post.body_text or "", num_images=len(image_urls) ) content = [{"type": "text", "text": prompt_text}] for url in image_urls: content.append({"type": "image_url", "image_url": {"url": url}}) payload = { "model": MODEL_NAME, "messages": [{"role": "user", "content": content}], "response_format": {"type": "json_object"} } headers = { "Authorization": f"Bearer {api_key}", "Content-Type": "application/json" } loop = asyncio.get_event_loop() response = await loop.run_in_executor( None, lambda: requests.post( "https://openrouter.ai/api/v1/chat/completions", headers=headers, json=payload, timeout=API_TIMEOUT ) ) if response.status_code != 200: raise Exception(f"API error: {response.status_code} - {response.text[:200]}") result = response.json() content_text = result["choices"][0]["message"]["content"] # 清理JSON标记 content_text = content_text.strip() if content_text.startswith("```json"): content_text = content_text[7:] elif content_text.startswith("```"): content_text = content_text[3:] if content_text.endswith("```"): content_text = content_text[:-3] content_text = content_text.strip() data = json.loads(content_text) return KnowledgeEvaluation( is_knowledge=data.get("is_knowledge", False), quick_exclude=data.get("quick_exclude", False), dimension_scores=data.get("dimension_scores", {}), weighted_score=data.get("weighted_score", 0.0), level=data.get("level", 1), evidence=data.get("evidence", []), issues=data.get("issues", []), summary=data.get("summary", "") ) async def evaluate_relevance_v2( post, original_query: str, semaphore: Optional[asyncio.Semaphore] = None ) -> Optional[RelevanceEvaluation]: """ 评估帖子与原始query的相关性(新版双维度评估) """ if post.type == "video": return None image_urls = post.images[:MAX_IMAGES_PER_POST] if post.images else [] try: if semaphore: async with semaphore: result = await _evaluate_relevance_internal(post, original_query, image_urls) else: result = await _evaluate_relevance_internal(post, original_query, image_urls) return result except Exception as e: print(f" ❌ 相关性评估失败: {post.note_id} - {str(e)[:100]}") return None async def _evaluate_relevance_internal( post, original_query: str, image_urls: list[str] ) -> RelevanceEvaluation: """内部相关性评估函数""" api_key = os.getenv("OPENROUTER_API_KEY") if not api_key: raise ValueError("OPENROUTER_API_KEY environment variable not set") prompt_text = RELEVANCE_EVALUATION_PROMPT.format( original_query=original_query, title=post.title, body_text=post.body_text or "", num_images=len(image_urls) ) content = [{"type": "text", "text": prompt_text}] for url in image_urls: content.append({"type": "image_url", "image_url": {"url": url}}) payload = { "model": MODEL_NAME, "messages": [{"role": "user", "content": content}], "response_format": {"type": "json_object"} } headers = { "Authorization": f"Bearer {api_key}", "Content-Type": "application/json" } loop = asyncio.get_event_loop() response = await loop.run_in_executor( None, lambda: requests.post( "https://openrouter.ai/api/v1/chat/completions", headers=headers, json=payload, timeout=API_TIMEOUT ) ) if response.status_code != 200: raise Exception(f"API error: {response.status_code} - {response.text[:200]}") result = response.json() content_text = result["choices"][0]["message"]["content"] # 清理JSON标记 content_text = content_text.strip() if content_text.startswith("```json"): content_text = content_text[7:] elif content_text.startswith("```"): content_text = content_text[3:] if content_text.endswith("```"): content_text = content_text[:-3] content_text = content_text.strip() data = json.loads(content_text) return RelevanceEvaluation( purpose_score=data.get("purpose_score", 0.0), category_score=data.get("category_score", 0.0), total_score=data.get("total_score", 0.0), conclusion=data.get("conclusion", "不匹配"), summary=data.get("summary", "") ) async def evaluate_post_v2( post, original_query: str, semaphore: Optional[asyncio.Semaphore] = None ) -> tuple[Optional[KnowledgeEvaluation], Optional[RelevanceEvaluation]]: """ 串行评估帖子(先知识,分数>40再评估相关性) Returns: (KnowledgeEvaluation, RelevanceEvaluation) 或 (Knowledge, None) 或 (None, None) """ if post.type == "video": print(f" ⊗ 跳过视频帖子: {post.note_id}") return None, None print(f" 🔍 开始评估帖子: {post.note_id}") # 第一步:先评估知识 knowledge_eval = await evaluate_knowledge_v2(post, semaphore) if not knowledge_eval: print(f" ⚠️ 知识评估失败: {post.note_id}") return None, None # 第二步:只有知识分数>40才评估相关性 relevance_eval = None if knowledge_eval.weighted_score > 40: print(f" ✅ 知识:{knowledge_eval.weighted_score:.1f}分({knowledge_eval.level}⭐) - 继续评估相关性") relevance_eval = await evaluate_relevance_v2(post, original_query, semaphore) if relevance_eval: print(f" ✅ 评估完成 | 相关性:{relevance_eval.total_score:.1f}分({relevance_eval.conclusion})") else: print(f" ⚠️ 相关性评估失败") else: print(f" ⊗ 知识:{knowledge_eval.weighted_score:.1f}分({knowledge_eval.level}⭐) - 分数≤40,跳过相关性评估") return knowledge_eval, relevance_eval def apply_evaluation_v2_to_post( post, knowledge_eval: Optional[KnowledgeEvaluation], relevance_eval: Optional[RelevanceEvaluation] ): """ 将V2评估结果应用到Post对象 """ # 知识评估 if knowledge_eval: post.is_knowledge = knowledge_eval.is_knowledge post.knowledge_score = knowledge_eval.weighted_score post.knowledge_level = knowledge_eval.level post.knowledge_reason = knowledge_eval.summary[:100] # 简短版本 # 详细信息 post.knowledge_evaluation = { "quick_exclude": knowledge_eval.quick_exclude, "dimension_scores": knowledge_eval.dimension_scores, "weighted_score": knowledge_eval.weighted_score, "level": knowledge_eval.level, "level_text": "⭐" * knowledge_eval.level, "evidence": knowledge_eval.evidence, "issues": knowledge_eval.issues, "summary": knowledge_eval.summary } # 相关性评估 if relevance_eval: post.relevance_score = relevance_eval.total_score post.relevance_conclusion = relevance_eval.conclusion post.relevance_reason = relevance_eval.summary[:150] # 简短版本 # 设置相关性级别(兼容旧系统) if relevance_eval.total_score >= 85: post.relevance_level = "高度相关" elif relevance_eval.total_score >= 65: post.relevance_level = "中度相关" else: post.relevance_level = "低度相关" # 详细信息 post.relevance_evaluation = { "purpose_score": relevance_eval.purpose_score, "category_score": relevance_eval.category_score, "total_score": relevance_eval.total_score, "conclusion": relevance_eval.conclusion, "summary": relevance_eval.summary } # 设置评估时间和版本 post.evaluation_time = datetime.now().isoformat() post.evaluator_version = "v2.0" async def batch_evaluate_posts_v2( posts: list, original_query: str, max_concurrent: int = MAX_CONCURRENT_EVALUATIONS ) -> int: """ 批量评估多个帖子(V2版本) Returns: 成功评估的帖子数量 """ semaphore = asyncio.Semaphore(max_concurrent) print(f"\n📊 开始批量评估 {len(posts)} 个帖子(并发限制: {max_concurrent})...") tasks = [evaluate_post_v2(post, original_query, semaphore) for post in posts] results = await asyncio.gather(*tasks) success_count = 0 for i, (knowledge_eval, relevance_eval) in enumerate(results): if knowledge_eval and relevance_eval: apply_evaluation_v2_to_post(posts[i], knowledge_eval, relevance_eval) success_count += 1 print(f"✅ 批量评估完成: 成功 {success_count}/{len(posts)}") return success_count