刘立冬 1 week ago
parent
commit
9066d771f2
2 files changed, 253 additions and 72 deletions
  1. post_evaluator_v3.py (+219 −1)
  2. test_evaluation_v3.py (+34 −71)

+ 219 - 1
post_evaluator_v3.py

@@ -22,7 +22,7 @@ import requests
 
 MODEL_NAME = "google/gemini-2.5-flash"
 MAX_IMAGES_PER_POST = 10
-MAX_CONCURRENT_EVALUATIONS = 5
+MAX_CONCURRENT_EVALUATIONS = 15  # raise the concurrency limit to speed up evaluation
 API_TIMEOUT = 120
 
 # cache configuration
@@ -1864,3 +1864,221 @@ async def batch_evaluate_posts_v3(
     print(f"✅ 批量评估完成: {success_count}/{len(posts)} 帖子已评估")
 
     return success_count
+
+
+# ============================================================================
+# Two-stage evaluation: quick pass + detailed pass
+# ============================================================================
+
+async def _prepare_media_content_lite(post) -> tuple[list[str], None, str]:
+    """
+    Lightweight media preparation (quick pass only).
+    Uses only the cover image, skips video processing, and works with a truncated body.
+
+    Args:
+        post: Post object
+
+    Returns:
+        (image_urls, None, "video/mp4")  # no video is returned
+    """
+    # use only the first image as the cover
+    image_urls = [post.images[0]] if post.images else []
+
+    # skip video processing; return None in the video slot
+    return image_urls, None, "video/mp4"
+
+
+async def evaluate_post_quick(
+    post,
+    original_query: str,
+    semaphore: Optional[asyncio.Semaphore] = None
+) -> tuple:
+    """
+    Quick-pass evaluation: runs only Prompt1 + Prompt2 on simplified data.
+
+    Used to quickly filter out posts that are not knowledge or not content knowledge.
+
+    Args:
+        post: Post object
+        original_query: original search query (currently unused, kept for interface consistency)
+        semaphore: semaphore for concurrency control
+
+    Returns:
+        (knowledge_eval, content_eval, should_proceed_to_detail)
+        - knowledge_eval: Prompt1 evaluation result
+        - content_eval: Prompt2 evaluation result (if Prompt1 passed)
+        - should_proceed_to_detail: whether the post needs the detailed pass
+    """
+    # back up the original data
+    original_body_text = post.body_text
+    original_prepare_func = globals()['_prepare_media_content']
+
+    try:
+        # temporarily swap in the lightweight media-preparation function
+        globals()['_prepare_media_content'] = _prepare_media_content_lite
+
+        # truncate the body text to 500 characters
+        if post.body_text:
+            post.body_text = post.body_text[:500]
+
+        # Step 1: decide whether the post is knowledge
+        knowledge_eval = await evaluate_is_knowledge(post, semaphore)
+
+        if not knowledge_eval or not knowledge_eval.is_knowledge:
+            # not knowledge, skip the detailed pass
+            return (knowledge_eval, None, False)
+
+        # Step 2: decide whether it is content knowledge
+        content_eval = await evaluate_is_content_knowledge(post, semaphore)
+
+        if not content_eval or not content_eval.is_content_knowledge:
+            # not content knowledge, skip the detailed pass
+            return (knowledge_eval, content_eval, False)
+
+        # passed the quick pass, proceed to the detailed pass
+        return (knowledge_eval, content_eval, True)
+
+    finally:
+        # restore the original data and function
+        post.body_text = original_body_text
+        globals()['_prepare_media_content'] = original_prepare_func
+
+
+async def two_stage_batch_evaluate(
+    posts: list,
+    original_query: str,
+    quick_concurrent: int = 15,
+    detail_concurrent: int = 15
+) -> int:
+    """
+    Two-stage batch evaluation: quick-pass filtering + detailed scoring.
+
+    Args:
+        posts: list of Post objects
+        original_query: original search query
+        quick_concurrent: quick-pass concurrency
+        detail_concurrent: detailed-pass concurrency
+
+    Returns:
+        number of successfully evaluated posts
+    """
+    import time
+
+    # record the overall start time
+    total_start_time = time.time()
+
+    print(f"\n{'='*80}")
+    print(f"🚀 两阶段评估模式")
+    print(f"{'='*80}\n")
+
+    # ========== Stage 1: quick pass (fast filtering) ==========
+    print(f"📊 Stage 1/2: quick pass (fast filtering)")
+    print(f"  - Data: title + body text (first 500 chars) + cover image")
+    print(f"  - Evaluation: Prompt1 + Prompt2")
+    print(f"  - Concurrency: {quick_concurrent}")
+    print(f"  - Posts: {len(posts)}\n")
+
+    # quick-pass start time
+    quick_start_time = time.time()
+    print("⏳ 正在执行粗评...")
+
+    quick_semaphore = asyncio.Semaphore(quick_concurrent)
+    quick_tasks = [evaluate_post_quick(post, original_query, quick_semaphore) for post in posts]
+    quick_results = await asyncio.gather(*quick_tasks)
+
+    # quick-pass end time
+    quick_end_time = time.time()
+    quick_duration = quick_end_time - quick_start_time
+
+    # filter the results
+    print("\n📋 Quick-pass results:\n")
+    posts_for_detail = []
+    quick_stats = {"rejected": 0, "passed": 0}
+
+    for i, (knowledge_eval, content_eval, should_detail) in enumerate(quick_results):
+        post = posts[i]
+        progress_percent = (i + 1) / len(posts) * 100
+        if should_detail:
+            posts_for_detail.append(post)
+            quick_stats["通过"] += 1
+            print(f"  ✅ [{i+1:3d}/{len(posts)} {progress_percent:5.1f}%] {post.title[:45]:<45} - 通过粗评")
+        else:
+            quick_stats["淘汰"] += 1
+            if not knowledge_eval or not knowledge_eval.is_knowledge:
+                reason = "非知识"
+            else:
+                reason = f"非内容知识(得分:{content_eval.final_score if content_eval else 0})"
+            print(f"  ❌ [{i+1:3d}/{len(posts)} {progress_percent:5.1f}%] {post.title[:45]:<45} - {reason}")
+
+            # 保存粗评结果(标记为淘汰)
+            apply_evaluation_v3_to_post(
+                post, knowledge_eval, content_eval, None, None, None, "粗评淘汰"
+            )
+
+    print(f"\n📈 粗评统计:")
+    print(f"  通过: {quick_stats['通过']:3d}/{len(posts)} ({quick_stats['通过']/len(posts)*100:5.1f}%)")
+    print(f"  淘汰: {quick_stats['淘汰']:3d}/{len(posts)} ({quick_stats['淘汰']/len(posts)*100:5.1f}%)")
+    print(f"  ⏱️  耗时: {quick_duration:.2f}秒 (平均 {quick_duration/len(posts):.2f}秒/帖)")
+
+    if not posts_for_detail:
+        total_duration = time.time() - total_start_time
+        print(f"\n⚠️  没有帖子通过粗评,评估结束")
+        print(f"⏱️  总耗时: {total_duration:.2f}秒")
+        return len(posts) - quick_stats["淘汰"]
+
+    # ========== Stage 2: detailed pass (full evaluation) ==========
+    print(f"\n{'='*80}")
+    print(f"📊 Stage 2/2: detailed pass (full evaluation)")
+    print(f"  - Data: all images (up to 10) + full body text + video")
+    print(f"  - Evaluation: full 4-step pipeline (Prompt1-4)")
+    print(f"  - Concurrency: {detail_concurrent}")
+    print(f"  - Posts: {len(posts_for_detail)}\n")
+
+    # detailed-pass start time
+    detail_start_time = time.time()
+    print("⏳ Running detailed pass...")
+
+    detail_semaphore = asyncio.Semaphore(detail_concurrent)
+    detail_tasks = [evaluate_post_v3(post, original_query, detail_semaphore)
+                    for post in posts_for_detail]
+    detail_results = await asyncio.gather(*detail_tasks)
+
+    # detailed-pass end time
+    detail_end_time = time.time()
+    detail_duration = detail_end_time - detail_start_time
+
+    print("\n📋 细评结果:\n")
+    success_count = 0
+    for i, result in enumerate(detail_results):
+        knowledge_eval, content_eval, purpose_eval, category_eval, final_score, match_level = result
+        progress_percent = (i + 1) / len(posts_for_detail) * 100
+
+        if knowledge_eval:
+            apply_evaluation_v3_to_post(
+                posts_for_detail[i],
+                knowledge_eval, content_eval, purpose_eval, category_eval,
+                final_score, match_level
+            )
+            success_count += 1
+
+            post = posts_for_detail[i]
+            score_str = f"{final_score:.1f}" if final_score is not None else "N/A"
+            level_str = match_level if match_level else "incomplete"
+            print(f"  ✅ [{i+1:3d}/{len(posts_for_detail)} {progress_percent:5.1f}%] {post.title[:40]:<40} - {score_str} pts ({level_str})")
+
+    # compute the total elapsed time
+    total_duration = time.time() - total_start_time
+
+    print(f"\n{'='*80}")
+    print(f"✅ 两阶段评估完成:")
+    print(f"  粗评通过: {len(posts_for_detail):3d}/{len(posts)}")
+    print(f"  细评成功: {success_count:3d}/{len(posts_for_detail)}")
+    print(f"  最终有效: {success_count:3d}/{len(posts)} ({success_count/len(posts)*100:5.1f}%)")
+    print(f"\n⏱️  耗时统计:")
+    print(f"  粗评阶段: {quick_duration:.2f}秒 ({len(posts)}个帖子, 平均 {quick_duration/len(posts):.2f}秒/帖)")
+    print(f"  细评阶段: {detail_duration:.2f}秒 ({len(posts_for_detail)}个帖子, 平均 {detail_duration/len(posts_for_detail):.2f}秒/帖)" if len(posts_for_detail) > 0 else f"  细评阶段: {detail_duration:.2f}秒")
+    print(f"  总耗时: {total_duration:.2f}秒 ({total_duration/60:.2f}分钟)")
+    print(f"  平均速度: {total_duration/len(posts):.2f}秒/帖")
+    print(f"{'='*80}\n")
+
+    return len(posts) - quick_stats["淘汰"] + success_count
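
For reference, a minimal driver sketch showing how the new two_stage_batch_evaluate coroutine could be called from a script entry point. The load_posts() helper and the placeholder query are hypothetical and not part of this commit; only the two_stage_batch_evaluate call itself mirrors the committed API.

import asyncio

from post_evaluator_v3 import two_stage_batch_evaluate

async def main() -> None:
    posts = load_posts()  # hypothetical helper: returns a non-empty list of Post objects
    query = "example search query"  # placeholder for original_query
    evaluated = await two_stage_batch_evaluate(
        posts,
        query,
        quick_concurrent=15,   # stage 1 (quick pass) concurrency
        detail_concurrent=15,  # stage 2 (detailed pass) concurrency
    )
    print(f"Posts with evaluation results applied: {evaluated}")

if __name__ == "__main__":
    asyncio.run(main())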

+ 34 - 71
test_evaluation_v3.py

@@ -12,7 +12,7 @@ from collections import defaultdict
 
 # import required modules
 from knowledge_search_traverse import Post
-from post_evaluator_v3 import evaluate_post_v3, apply_evaluation_v3_to_post
+from post_evaluator_v3 import evaluate_post_v3, apply_evaluation_v3_to_post, two_stage_batch_evaluate
 
 
 async def test_evaluation_v3(run_context_path: str, max_posts: int = 10):
@@ -79,98 +79,61 @@ async def test_evaluation_v3(run_context_path: str, max_posts: int = 10):
         )
         posts.append((round_idx, search_idx, post_id, post))
 
-    # batch evaluation
-    print(f"🚀 Starting parallel evaluation (up to {len(posts)} tasks, concurrency limit: 5)...\n")
+    # extract a plain post list for two-stage evaluation
+    post_list = [post for _, _, _, post in posts]
 
-    semaphore = asyncio.Semaphore(5)
-    tasks = []
+    # use two-stage batch evaluation
+    await two_stage_batch_evaluate(post_list, original_query, quick_concurrent=15, detail_concurrent=15)
 
-    # 1. create all tasks
-    for round_idx, search_idx, post_id, post in posts:
-        task = evaluate_post_v3(post, original_query, semaphore)
-        tasks.append((round_idx, search_idx, post_id, post, task))
-
-    # 2. run all tasks in parallel
-    task_coroutines = [task for _, _, _, _, task in tasks]
-    all_eval_results = await asyncio.gather(*task_coroutines)
-
-    # 3. process results
+    # process the evaluation results (two-stage evaluation has already applied them to the post objects)
     results = []
     detailed_reports = []  # collect detailed evaluation reports
-    print(f"📊 Processing evaluation results...\n")
-    for i, ((round_idx, search_idx, post_id, post, _), eval_result) in enumerate(zip(tasks, all_eval_results), 1):
-        knowledge_eval, content_eval, purpose_eval, category_eval, final_score, match_level = eval_result
 
-        print(f"  [{i}/{len(tasks)}] {post.note_id} - {post.title[:40]}", end="")
+    print(f"📊 收集评估报告...\n")
+    for i, (round_idx, search_idx, post_id, post) in enumerate(posts, 1):
+        # extract evaluation results from the post object's nested fields
+        knowledge_eval = post.knowledge_evaluation
+        content_eval = post.content_knowledge_evaluation
+        purpose_eval = post.purpose_evaluation
+        category_eval = post.category_evaluation
+
+        # collect the detailed report
         if knowledge_eval:
-            if final_score is not None:
-                print(f" → {match_level} ({final_score:.1f}分)")
-            elif content_eval and not content_eval.is_content_knowledge:
-                print(f" → 非内容知识")
-            elif knowledge_eval and not knowledge_eval.is_knowledge:
-                print(f" → 非知识")
-            else:
-                print(f" → 评估未完成")
-
-            # 打印详细判断原因
-            print(f"      📝 知识评估: {knowledge_eval.conclusion if knowledge_eval.conclusion else '无'}")
-            if content_eval and content_eval.is_content_knowledge:
-                print(f"      📚 内容知识: {content_eval.summary[:80] if content_eval.summary else '无'}...")
-            if purpose_eval:
-                print(f"      🎯 目的匹配: {purpose_eval.core_basis[:80] if purpose_eval.core_basis else '无'}...")
-            if category_eval:
-                print(f"      🏷️  品类匹配: {category_eval.core_basis[:80] if category_eval.core_basis else '无'}...")
-            print()
-
-            # 收集详细报告
             detailed_report = {
                 'post_index': i,
                 'note_id': post.note_id,
                 'title': post.title,
-                'final_score': final_score,
-                'match_level': match_level,
-                'is_knowledge': knowledge_eval.is_knowledge if knowledge_eval else None,
-                'is_content_knowledge': content_eval.is_content_knowledge if content_eval else None,
-                'knowledge_score': content_eval.final_score if content_eval else None,
+                'final_score': post.final_score,
+                'match_level': post.match_level,
+                'is_knowledge': post.is_knowledge,
+                'is_content_knowledge': post.is_content_knowledge,
+                'knowledge_score': post.knowledge_score,
                 'evaluations': {
                     'knowledge': {
-                        'conclusion': knowledge_eval.conclusion if knowledge_eval else None,
-                        'core_evidence': knowledge_eval.core_evidence if knowledge_eval and hasattr(knowledge_eval, 'core_evidence') else None,
-                        'issues': knowledge_eval.issues if knowledge_eval and hasattr(knowledge_eval, 'issues') else None
+                        'conclusion': knowledge_eval.get('conclusion') if isinstance(knowledge_eval, dict) else getattr(knowledge_eval, 'conclusion', None),
+                        'core_evidence': knowledge_eval.get('core_evidence') if isinstance(knowledge_eval, dict) else getattr(knowledge_eval, 'core_evidence', None),
+                        'issues': knowledge_eval.get('issues') if isinstance(knowledge_eval, dict) else getattr(knowledge_eval, 'issues', None)
                     },
                     'content_knowledge': {
-                        'summary': content_eval.summary if content_eval else None,
-                        'final_score': content_eval.final_score if content_eval else None,
-                        'level': content_eval.level if content_eval else None
-                    } if content_eval and content_eval.is_content_knowledge else None,
+                        'summary': content_eval.get('summary') if isinstance(content_eval, dict) else getattr(content_eval, 'summary', None),
+                        'final_score': content_eval.get('final_score') if isinstance(content_eval, dict) else getattr(content_eval, 'final_score', None),
+                        'level': content_eval.get('level') if isinstance(content_eval, dict) else getattr(content_eval, 'level', None)
+                    } if content_eval and post.is_content_knowledge else None,
                     'purpose': {
-                        'score': purpose_eval.purpose_score if purpose_eval else None,
-                        'core_motivation': purpose_eval.core_motivation if purpose_eval else None,
-                        'core_basis': purpose_eval.core_basis if purpose_eval else None,
-                        'match_level': purpose_eval.match_level if purpose_eval else None
+                        'score': purpose_eval.get('purpose_score') if isinstance(purpose_eval, dict) else getattr(purpose_eval, 'purpose_score', None),
+                        'core_motivation': purpose_eval.get('core_motivation') if isinstance(purpose_eval, dict) else getattr(purpose_eval, 'core_motivation', None),
+                        'core_basis': purpose_eval.get('core_basis') if isinstance(purpose_eval, dict) else getattr(purpose_eval, 'core_basis', None),
+                        'match_level': purpose_eval.get('match_level') if isinstance(purpose_eval, dict) else getattr(purpose_eval, 'match_level', None)
                     } if purpose_eval else None,
                     'category': {
-                        'score': category_eval.category_score if category_eval else None,
-                        'core_basis': category_eval.core_basis if category_eval else None,
-                        'match_level': category_eval.match_level if category_eval else None
+                        'score': category_eval.get('category_score') if isinstance(category_eval, dict) else getattr(category_eval, 'category_score', None),
+                        'core_basis': category_eval.get('core_basis') if isinstance(category_eval, dict) else getattr(category_eval, 'core_basis', None),
+                        'match_level': category_eval.get('match_level') if isinstance(category_eval, dict) else getattr(category_eval, 'match_level', None)
                     } if category_eval else None
                 }
             }
             detailed_reports.append(detailed_report)
-
-            # apply the evaluation results
-            apply_evaluation_v3_to_post(
-                post,
-                knowledge_eval,
-                content_eval,
-                purpose_eval,
-                category_eval,
-                final_score,
-                match_level
-            )
             results.append((round_idx, search_idx, post_id, post))
-        else:
-            print(f" → ❌ 评估失败\n")
 
     print(f"\n✅ 评估完成: {len(results)}/{len(posts)} 成功\n")