yangxiaohui il y a 1 mois
Parent
commit
a899fb95e2
1 fichiers modifiés avec 126 ajouts et 18 suppressions
  1. 126 18
      sug_v6_3_with_annotation.py

+ 126 - 18
sug_v6_3_with_annotation.py

@@ -262,19 +262,128 @@ def generate_query_combinations(keywords: list[str], max_combination_size: int)
 
         result[f"{size}-word"] = queries
 
-        print(f"\n{size}词组合:{len(queries)} 个")
-        if len(queries) <= 10:
-            for q in queries:
-                print(f"  - {q}")
-        else:
-            print(f"  - {queries[0]}")
-            print(f"  - {queries[1]}")
-            print(f"  ...")
-            print(f"  - {queries[-1]}")
+        print(f"\n{size}词组合:共 {len(queries)} 个")
+        # 打印所有query,带序号
+        for i, q in enumerate(queries, 1):
+            print(f"  {i}. {q}")
 
     return result
 
 
+async def fetch_and_evaluate_streaming(
+    queries: list[str],
+    original_question: str,
+    question_annotation: str,
+    context: RunContext
+) -> tuple[list[dict], list[dict]]:
+    """
+    流式处理:边获取推荐词边评估
+
+    Returns:
+        (sug_results, evaluations)
+    """
+    xiaohongshu_api = XiaohongshuSearchRecommendations()
+
+    # 创建信号量
+    api_semaphore = asyncio.Semaphore(API_CONCURRENCY_LIMIT)
+    model_semaphore = asyncio.Semaphore(MODEL_CONCURRENCY_LIMIT)
+
+    # 结果收集
+    sug_results = []
+    all_evaluations = []
+
+    # 统计
+    total_queries = len(queries)
+    completed_queries = 0
+    total_sugs = 0
+    completed_evals = 0
+
+    async def get_and_evaluate_single_query(query: str):
+        nonlocal completed_queries, total_sugs, completed_evals
+
+        # 步骤1:获取推荐词
+        async with api_semaphore:
+            suggestions = xiaohongshu_api.get_recommendations(keyword=query)
+            sug_count = len(suggestions) if suggestions else 0
+
+            completed_queries += 1
+            total_sugs += sug_count
+
+            print(f"  [{completed_queries}/{total_queries}] {query} → {sug_count} 个推荐词")
+
+            sug_result = {
+                "query": query,
+                "suggestions": suggestions or [],
+                "timestamp": datetime.now().isoformat()
+            }
+            sug_results.append(sug_result)
+
+        # 步骤2:立即评估这些推荐词
+        if suggestions:
+            eval_tasks = []
+            for sug in suggestions:
+                eval_tasks.append(evaluate_single_sug_with_semaphore(
+                    query, sug, original_question, question_annotation, model_semaphore
+                ))
+
+            if eval_tasks:
+                evals = await asyncio.gather(*eval_tasks)
+                all_evaluations.extend(evals)
+                completed_evals += len(evals)
+                print(f"      ↳ 已评估 {len(evals)} 个,累计评估 {completed_evals} 个")
+
+    # 并发处理所有query
+    await asyncio.gather(*[get_and_evaluate_single_query(q) for q in queries])
+
+    # 保存到context
+    context.all_sug_queries = sug_results
+    context.evaluation_results = all_evaluations
+
+    print(f"\n总计:获取 {total_sugs} 个推荐词,完成 {completed_evals} 个评估")
+
+    return sug_results, all_evaluations
+
+
+async def evaluate_single_sug_with_semaphore(
+    source_query: str,
+    sug_query: str,
+    original_question: str,
+    question_annotation: str,
+    semaphore: asyncio.Semaphore
+) -> dict:
+    """带信号量的单个推荐词评估"""
+    async with semaphore:
+        eval_input = f"""
+<原始问题>
+{original_question}
+</原始问题>
+
+<问题标注(三层)>
+{question_annotation}
+</问题标注(三层)>
+
+<待评估的推荐query>
+{sug_query}
+</待评估的推荐query>
+
+请评估该推荐query:
+1. intent_match: 意图是否匹配(true/false)
+2. relevance_score: 相关性分数(0-1)
+3. reason: 详细的评估理由
+
+评估时请参考问题标注中的[本质]、[硬]、[软]标记。
+"""
+        result = await Runner.run(evaluator, eval_input)
+        evaluation: RelevanceEvaluation = result.final_output
+        return {
+            "source_query": source_query,
+            "sug_query": sug_query,
+            "intent_match": evaluation.intent_match,
+            "relevance_score": evaluation.relevance_score,
+            "reason": evaluation.reason,
+        }
+
+
 async def fetch_suggestions_for_queries(queries: list[str], context: RunContext) -> list[dict]:
     """
     并发获取所有query的推荐词(带并发控制)
@@ -450,20 +559,19 @@ async def combinatorial_search(context: RunContext, max_combination_size: int =
     query_combinations = generate_query_combinations(context.keywords, max_combination_size)
     context.query_combinations = query_combinations
 
-    # 步骤4:获取所有query的推荐词
+    # 步骤4:获取所有query的推荐词,并流式评估
     all_queries = []
     for level, queries in query_combinations.items():
         all_queries.extend(queries)
 
-    sug_results = await fetch_suggestions_for_queries(all_queries, context)
-    context.all_sug_queries = sug_results
-
-    # 统计
-    total_sugs = sum(len(r["suggestions"]) for r in sug_results)
-    print(f"\n总共获取到 {total_sugs} 个推荐词")
+    print(f"\n{'='*60}")
+    print(f"流式处理:边获取推荐词边评估(API并发度:{API_CONCURRENCY_LIMIT},模型并发度:{MODEL_CONCURRENCY_LIMIT})")
+    print(f"{'='*60}")
 
-    # 步骤5:评估所有推荐词(使用原始问题和标注)
-    evaluations = await evaluate_all_suggestions(sug_results, context.q, annotation, context)
+    # 流式处理:边获取边评估
+    sug_results, evaluations = await fetch_and_evaluate_streaming(
+        all_queries, context.q, annotation, context
+    )
 
     # 步骤6:筛选合格query
     qualified = find_qualified_queries(evaluations, min_relevance_score=0.7)