- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- LLM评估模块
- 用于评估搜索词质量和搜索结果相关度
- """
- import logging
- from typing import List, Dict, Any, Optional
- from concurrent.futures import ThreadPoolExecutor, as_completed
- from openrouter_client import OpenRouterClient
- logger = logging.getLogger(__name__)
- class LLMEvaluator:
- """LLM评估器"""
- def __init__(self, openrouter_client: OpenRouterClient):
- """
- 初始化评估器
- Args:
- openrouter_client: OpenRouter客户端实例
- """
- self.client = openrouter_client
- def evaluate_search_word(
- self,
- original_feature: str,
- search_word: str
- ) -> Dict[str, Any]:
- """
- 评估搜索词质量(阶段4)
- Args:
- original_feature: 原始特征名称
- search_word: 组合搜索词
- Returns:
- 评估结果
- """
- prompt = f"""你是一个小红书内容分析专家。
- # 任务说明
- 从给定关键词中提取并组合适合在小红书搜索的query词(目标是找到【{original_feature}】相关内容,但query中不能直接出现"{original_feature}")
- ## 可选词汇
- {search_word}
- ## 要求
- 1. 只能使用可选词汇中的词,可以进行以下变化:
- - 直接使用原词或括号内的同义词
- - 多个词组合
- - 适当精简
- 2. 不能添加可选词汇以外的新词
- 3. 按推荐程度排序(越靠前越推荐)
- ## 输出格式(JSON)
- {{
- "score": 0.75,
- "reasoning": "评估理由"
- }}
- 注意:只返回JSON,不要其他内容。"""
- result = self.client.chat_json(prompt=prompt, max_retries=3)
- if result:
- return {
- "score": result.get("score", 0.0),
- "reasoning": result.get("reasoning", ""),
- "original_feature": original_feature
- }
- else:
- logger.error(f"评估搜索词失败: {search_word}")
- return {
- "score": 0.0,
- "reasoning": "LLM评估失败",
- "original_feature": original_feature
- }
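- # Illustrative usage sketch (assumes the OpenRouterClient is already configured, e.g. API key via environment):
- #   evaluator = LLMEvaluator(OpenRouterClient())
- #   result = evaluator.evaluate_search_word("拟人", "宠物猫 猫咪")
- #   print(result["score"], result["reasoning"])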
- def evaluate_search_words_batch(
- self,
- original_feature: str,
- search_words: List[str],
- max_workers: int = 5
- ) -> List[Dict[str, Any]]:
- """
- 批量评估搜索词(并行)
- Args:
- original_feature: 原始特征
- search_words: 搜索词列表
- max_workers: 最大并发数
- Returns:
- 评估结果列表(已排序)
- """
- logger.info(f"开始批量评估 {len(search_words)} 个搜索词...")
- results = []
- with ThreadPoolExecutor(max_workers=max_workers) as executor:
- # Submit evaluation tasks
- future_to_word = {
- executor.submit(self.evaluate_search_word, original_feature, word): word
- for word in search_words
- }
- # Collect results as they complete
- for idx, future in enumerate(as_completed(future_to_word), 1):
- word = future_to_word[future]
- try:
- result = future.result()
- result["search_word"] = word
- results.append(result)
- logger.info(f" [{idx}/{len(search_words)}] {word}: {result['score']:.3f}")
- except Exception as e:
- logger.error(f" 评估失败: {word}, 错误: {e}")
- results.append({
- "search_word": word,
- "score": 0.0,
- "reasoning": f"评估异常: {str(e)}",
- "original_feature": original_feature
- })
- # Sort by score (descending)
- results.sort(key=lambda x: x["score"], reverse=True)
- # Attach 1-based rank
- for rank, result in enumerate(results, 1):
- result["rank"] = rank
- if results:
- logger.info(f"Batch evaluation complete; top score: {results[0]['score']:.3f}")
- return results
- def evaluate_search_words_in_batches(
- self,
- original_feature: str,
- search_words: List[str],
- batch_size: int = 50
- ) -> List[Dict[str, Any]]:
- """
- 分批评估搜索词(每批N个,减少API调用)
- Args:
- original_feature: 原始特征
- search_words: 搜索词列表
- batch_size: 每批处理的搜索词数量,默认10
- Returns:
- 评估结果列表(已排序)
- """
- logger.info(f"开始分批评估 {len(search_words)} 个搜索词(每批 {batch_size} 个)...")
- all_results = []
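- # Number of batches via ceiling division, e.g. 120 words with batch_size=50 -> 3 batches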
- total_batches = (len(search_words) + batch_size - 1) // batch_size
- # Process in batches
- for batch_idx in range(total_batches):
- start_idx = batch_idx * batch_size
- end_idx = min(start_idx + batch_size, len(search_words))
- batch_words = search_words[start_idx:end_idx]
- logger.info(f" 处理第 {batch_idx + 1}/{total_batches} 批({len(batch_words)} 个搜索词)")
- # 从搜索词中提取所有独特的词作为可选词汇
- available_words_set = set()
- for word in batch_words:
- # 分割搜索词,提取单个词
- parts = word.split()
- available_words_set.update(parts)
- # 转换为列表并排序(保证稳定性)
- available_words = sorted(list(available_words_set))
- # 构建可选词汇字符串(逗号分隔)
- available_words_str = "、".join(available_words)
- prompt = f"""
- # 任务说明
- 从给定关键词中提取并组合适合在小红书搜索的query词(目标是找到【{original_feature}】相关内容,但query中不能直接出现"{original_feature}"二字)
- ## 可选词汇
- {available_words_str}
- ## 要求
- 1. 只能使用可选词汇中的词,可以进行以下变化:
- - 直接使用原词或括号内的同义词
- - 多个词组合
- - 适当精简
- 2. 不能添加可选词汇以外的新词
- 3. 按推荐程度排序(越靠前越推荐)
- ## 输出格式(JSON):
- [
- {{
- "index": 1,
- "search_word": "组合的搜索词",
- "score": 0.85,
- "reasoning": "推荐理由"
- }},
- {{
- "index": 2,
- "search_word": "组合的搜索词",
- "score": 0.80,
- "reasoning": "推荐理由"
- }}
- ]
- - 只返回JSON数组,不要其他内容"""
- # Call the LLM
- result = self.client.chat_json(prompt=prompt, max_retries=3)
- if result and isinstance(result, list):
- # Process results - each item in the new format carries search_word directly
- for idx, item in enumerate(result):
- search_word = item.get("search_word", "")
- if search_word: # Skip items without a search word
- all_results.append({
- "search_word": search_word,
- "score": item.get("score", 0.0),
- "reasoning": item.get("reasoning", ""),
- "original_feature": original_feature
- })
- logger.info(f" [{start_idx + idx + 1}/{len(search_words)}] "
- f"{search_word}: {item.get('score', 0.0):.3f}")
- else:
- logger.error(f" 第 {batch_idx + 1} 批评估失败,跳过")
- # 为失败的批次添加默认结果(使用原搜索词)
- for word in batch_words:
- all_results.append({
- "search_word": word,
- "score": 0.0,
- "reasoning": "批量评估失败",
- "original_feature": original_feature
- })
- # Sort by score (descending)
- all_results.sort(key=lambda x: x["score"], reverse=True)
- # Attach 1-based rank
- for rank, result in enumerate(all_results, 1):
- result["rank"] = rank
- if all_results:
- logger.info(f"Batched evaluation complete; top score: {all_results[0]['score']:.3f} (total API calls: {total_batches})")
- return all_results
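- # Illustrative sketch of batched evaluation (the candidate words below are hypothetical examples):
- #   ranked = evaluator.evaluate_search_words_in_batches("拟人", ["宠物猫 猫咪", "宠物猫 猫孩子"], batch_size=50)
- #   best = ranked[0]["search_word"] if ranked else None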
- def evaluate_single_note(
- self,
- original_feature: str,
- search_word: str,
- note: Dict[str, Any],
- note_index: int = 0
- ) -> Dict[str, Any]:
- """
- 评估单个帖子(阶段6,多模态)
- Args:
- original_feature: 原始特征
- search_word: 搜索词
- note: 单个帖子
- note_index: 帖子索引
- Returns:
- 单个帖子的评估结果
- """
- card = note.get("note_card", {})
- title = card.get("display_title", "")
- desc = card.get("desc", "")[:500] # 限制长度
- images = card.get("image_list", [])[:10] # 最多10张图
- prompt = f"""你是一个小红书内容分析专家。
- 任务:评估这个帖子是否包含目标特征"{original_feature}"的元素
- 原始特征:"{original_feature}"
- 搜索词:"{search_word}"
- 帖子内容:
- 标题: {title}
- 正文: {desc}
- 请分析帖子的文字和图片内容,返回JSON格式:
- {{
- "relevance": 0.85, // 0.0-1.0,相关度
- "matched_elements": ["元素1", "元素2"], // 匹配的元素列表
- "reasoning": "简短的匹配理由"
- }}
- 只返回JSON,不要其他内容。"""
- result = self.client.chat_json(
- prompt=prompt,
- images=images if images else None,
- max_retries=3
- )
- if result:
- return {
- "note_index": note_index,
- "relevance": result.get("relevance", 0.0),
- "matched_elements": result.get("matched_elements", []),
- "reasoning": result.get("reasoning", "")
- }
- else:
- logger.error(f" 评估帖子 {note_index} 失败: {search_word}")
- return {
- "note_index": note_index,
- "relevance": 0.0,
- "matched_elements": [],
- "reasoning": "评估失败"
- }
- def evaluate_search_results_parallel(
- self,
- original_feature: str,
- search_word: str,
- notes: List[Dict[str, Any]],
- max_notes: int = 20,
- max_workers: int = 20
- ) -> Dict[str, Any]:
- """
- 并行评估搜索结果(每个帖子独立评估)
- Args:
- original_feature: 原始特征
- search_word: 搜索词
- notes: 帖子列表
- max_notes: 最多评估几条帖子
- max_workers: 最大并发数
- Returns:
- 评估结果汇总
- """
- if not notes:
- return {
- "overall_relevance": 0.0,
- "extracted_elements": [],
- "evaluated_notes": []
- }
- notes_to_eval = notes[:max_notes]
- evaluated_notes = []
- logger.info(f" 并行评估 {len(notes_to_eval)} 个帖子({max_workers}并发)")
- # 20并发评估每个帖子
- with ThreadPoolExecutor(max_workers=max_workers) as executor:
- futures = []
- for idx, note in enumerate(notes_to_eval):
- future = executor.submit(
- self.evaluate_single_note,
- original_feature,
- search_word,
- note,
- idx
- )
- futures.append(future)
- # Collect results as they complete
- for future in as_completed(futures):
- try:
- result = future.result()
- evaluated_notes.append(result)
- except Exception as e:
- logger.error(f" 评估帖子失败: {e}")
- # 按note_index排序
- evaluated_notes.sort(key=lambda x: x['note_index'])
- # 汇总:计算整体相关度和提取元素
- if evaluated_notes:
- overall_relevance = sum(n['relevance'] for n in evaluated_notes) / len(evaluated_notes)
- # Collect all matched elements and count their frequency
- element_counts = {}
- for note in evaluated_notes:
- for elem in note['matched_elements']:
- element_counts[elem] = element_counts.get(elem, 0) + 1
- # Sort by frequency and keep the top 5
- extracted_elements = sorted(
- element_counts.keys(),
- key=lambda x: element_counts[x],
- reverse=True
- )[:5]
- else:
- overall_relevance = 0.0
- extracted_elements = []
- return {
- "overall_relevance": overall_relevance,
- "extracted_elements": extracted_elements,
- "evaluated_notes": evaluated_notes
- }
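- # Illustrative sketch (the notes list would come from a search/crawl step not shown in this module):
- #   summary = evaluator.evaluate_search_results_parallel("拟人", "宠物猫 猫咪", notes, max_notes=20)
- #   print(summary["overall_relevance"], summary["extracted_elements"])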
- def evaluate_search_results(
- self,
- original_feature: str,
- search_word: str,
- notes: List[Dict[str, Any]],
- max_notes: int = 5,
- max_images_per_note: int = 10
- ) -> Dict[str, Any]:
- """
- 评估搜索结果(阶段6,多模态)
- Args:
- original_feature: 原始特征
- search_word: 搜索词
- notes: 帖子列表
- max_notes: 最多评估几条帖子
- max_images_per_note: 每条帖子最多取几张图片
- Returns:
- 评估结果
- """
- if not notes:
- return {
- "overall_relevance": 0.0,
- "extracted_elements": [],
- "recommended_extension": None,
- "evaluated_notes": []
- }
- # Limit how many notes are evaluated
- notes_to_eval = notes[:max_notes]
- # Prepare the text portion of each note
- notes_info = []
- all_images = []
- for idx, note in enumerate(notes_to_eval):
- card = note.get("note_card", {})
- title = card.get("display_title", "")
- desc = card.get("desc", "")[:300] # 限制长度
- notes_info.append({
- "index": idx,
- "title": title,
- "desc": desc
- })
- # Collect this note's images
- images = card.get("image_list", [])[:max_images_per_note]
- all_images.extend(images)
- # Build the prompt
- notes_text = "\n\n".join([
- f"帖子 {n['index']}:\n标题: {n['title']}\n正文: {n['desc']}"
- for n in notes_info
- ])
- prompt = f"""你是一个小红书内容分析专家。
- 任务:评估搜索结果是否包含目标特征的元素
- 原始特征:"{original_feature}"
- 搜索词:"{search_word}"
- 帖子数量:{len(notes_to_eval)} 条
- 帖子内容:
- {notes_text}
- 请综合分析帖子的文字和图片内容,判断:
- 1. 这些搜索结果中是否包含与"{original_feature}"相似的元素
- 2. 提取最相关的元素关键词(2-4个字的词组)
- 3. 推荐最适合用于扩展搜索的关键词
- 返回JSON格式:
- {{
- "overall_relevance": 0.72, // 0.0-1.0,整体相关度
- "extracted_elements": ["关键词1", "关键词2", "关键词3"], // 提取的相似元素,按相关度排序
- "recommended_extension": "关键词1", // 最优的扩展关键词
- "evaluated_notes": [
- {{
- "note_index": 0, // 帖子索引
- "relevance": 0.85, // 该帖子的相关度
- "matched_elements": ["元素1", "元素2"], // 该帖子匹配的元素
- "reasoning": "简短的匹配理由"
- }}
- ]
- }}
- 注意:
- - extracted_elements 应该是帖子中实际包含的、与原始特征相似的元素
- - 优先提取在图片或文字中明显出现的元素
- - 只返回JSON,不要其他内容"""
- # Call the LLM (with images)
- result = self.client.chat_json(
- prompt=prompt,
- images=all_images if all_images else None,
- max_retries=3
- )
- if result:
- # Normalize the response to the full result schema
- return {
- "overall_relevance": result.get("overall_relevance", 0.0),
- "extracted_elements": result.get("extracted_elements", []),
- "recommended_extension": result.get("recommended_extension"),
- "evaluated_notes": result.get("evaluated_notes", [])
- }
- else:
- logger.error(f"评估搜索结果失败: {search_word}")
- return {
- "overall_relevance": 0.0,
- "extracted_elements": [],
- "recommended_extension": None,
- "evaluated_notes": []
- }
- def batch_evaluate_search_results(
- self,
- features_with_results: List[Dict[str, Any]],
- max_workers: int = 3
- ) -> List[Dict[str, Any]]:
- """
- 批量评估搜索结果(并行,但并发数较低以避免超时)
- Args:
- features_with_results: 带搜索结果的特征列表
- max_workers: 最大并发数
- Returns:
- 带评估结果的特征列表
- """
- logger.info(f"开始批量评估 {len(features_with_results)} 个搜索结果...")
- results = []
- with ThreadPoolExecutor(max_workers=max_workers) as executor:
- # Submit evaluation tasks
- future_to_feature = {}
- for feature in features_with_results:
- if not feature.get("search_result"):
- # No search result; skip LLM evaluation for this feature
- feature["result_evaluation"] = None
- results.append(feature)
- continue
- original_feature = self._get_original_feature(feature)
- search_word = feature.get("search_word", "")
- notes = feature["search_result"].get("data", {}).get("data", [])
- future = executor.submit(
- self.evaluate_search_results,
- original_feature,
- search_word,
- notes
- )
- future_to_feature[future] = feature
- # Collect results as they complete
- for idx, future in enumerate(as_completed(future_to_feature), 1):
- feature = future_to_feature[future]
- try:
- evaluation = future.result()
- feature["result_evaluation"] = evaluation
- results.append(feature)
- logger.info(f" [{idx}/{len(future_to_feature)}] {feature.get('search_word')}: "
- f"relevance={evaluation['overall_relevance']:.3f}")
- except Exception as e:
- logger.error(f" 评估失败: {feature.get('search_word')}, 错误: {e}")
- feature["result_evaluation"] = None
- results.append(feature)
- logger.info(f"批量评估完成")
- return results
- def _get_original_feature(self, feature_node: Dict[str, Any]) -> str:
- """
- 从特征节点中获取原始特征名称
- Args:
- feature_node: 特征节点
- Returns:
- 原始特征名称
- """
- # 尝试从llm_evaluation中获取
- if "llm_evaluation" in feature_node:
- return feature_node["llm_evaluation"].get("original_feature", "")
- # 尝试从其他字段获取
- return feature_node.get("原始特征名称", feature_node.get("特征名称", ""))
- def test_evaluator():
- """测试评估器"""
- import os
- # 初始化客户端
- client = OpenRouterClient()
- evaluator = LLMEvaluator(client)
- # 测试搜索词评估
- print("\n=== 测试搜索词评估 ===")
- result = evaluator.evaluate_search_word(
- original_feature="拟人",
- search_word="宠物猫 猫咪"
- )
- print(f"评分: {result['score']:.3f}")
- print(f"理由: {result['reasoning']}")
- # 测试批量评估
- print("\n=== 测试批量评估 ===")
- results = evaluator.evaluate_search_words_batch(
- original_feature="拟人",
- search_words=["宠物猫 猫咪", "宠物猫 猫孩子", "宠物猫 猫"],
- max_workers=2
- )
- for r in results:
- print(f"{r['search_word']}: {r['score']:.3f} (rank={r['rank']})")
- if __name__ == "__main__":
- logging.basicConfig(
- level=logging.INFO,
- format='%(asctime)s - %(levelname)s - %(message)s'
- )
- test_evaluator()