#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
LLM evaluation module.

Evaluates search-word quality (via text prompts) and search-result
relevance (via multimodal prompts) using an OpenRouter-backed LLM.
"""

import logging
from collections import Counter
from typing import List, Dict, Any, Optional
from concurrent.futures import ThreadPoolExecutor, as_completed

from openrouter_client import OpenRouterClient

logger = logging.getLogger(__name__)


class LLMEvaluator:
    """LLM-based evaluator for search words and search results."""

    def __init__(self, openrouter_client: OpenRouterClient):
        """
        Initialize the evaluator.

        Args:
            openrouter_client: OpenRouter client instance used for all
                LLM calls (text and multimodal).
        """
        self.client = openrouter_client

    def evaluate_search_word(
        self,
        original_feature: str,
        search_word: str
    ) -> Dict[str, Any]:
        """
        Evaluate the quality of a single search word (pipeline stage 4).

        Args:
            original_feature: Name of the original target feature.
            search_word: Combined search word to evaluate.

        Returns:
            Dict with keys ``score`` (float), ``reasoning`` (str) and
            ``original_feature``. On LLM failure, score is 0.0.
        """
        # NOTE(review): the task description in this prompt talks about
        # generating query words, while the JSON schema asks only for a
        # score + reasoning — kept verbatim since the downstream parser
        # relies on the score/reasoning shape.
        prompt = f"""你是一个小红书内容分析专家。

# 任务说明
从给定关键词中提取并组合适合在小红书搜索的query词(目标是找到【{original_feature}】相关内容,但query中不能直接出现"{original_feature}")

## 可选词汇
{search_word}

## 要求
1. 只能使用可选词汇中的词,可以进行以下变化:
   - 直接使用原词或括号内的同义词
   - 多个词组合
   - 适当精简
2. 不能添加可选词汇以外的新词
3. 按推荐程度排序(越靠前越推荐)

## 输出格式(JSON)
{{
    "score": 0.75,
    "reasoning": "评估理由"
}}

注意:只返回JSON,不要其他内容。"""

        result = self.client.chat_json(prompt=prompt, max_retries=3)

        if result:
            return {
                "score": result.get("score", 0.0),
                "reasoning": result.get("reasoning", ""),
                "original_feature": original_feature
            }
        else:
            logger.error(f"评估搜索词失败: {search_word}")
            return {
                "score": 0.0,
                "reasoning": "LLM评估失败",
                "original_feature": original_feature
            }

    def evaluate_search_words_batch(
        self,
        original_feature: str,
        search_words: List[str],
        max_workers: int = 5
    ) -> List[Dict[str, Any]]:
        """
        Evaluate search words in parallel (one LLM call per word).

        Args:
            original_feature: Original target feature.
            search_words: List of search words to evaluate.
            max_workers: Maximum number of concurrent LLM calls.

        Returns:
            Evaluation results sorted by score (descending) with a
            1-based ``rank`` field added. Empty input yields an empty
            list (previously raised IndexError on the final log line).
        """
        logger.info(f"开始批量评估 {len(search_words)} 个搜索词...")

        # Guard: avoid results[0] IndexError on empty input.
        if not search_words:
            return []

        results = []

        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            # Submit one evaluation task per search word.
            future_to_word = {
                executor.submit(self.evaluate_search_word, original_feature, word): word
                for word in search_words
            }

            # Collect results as they complete (completion order, not
            # submission order — sorted below anyway).
            for idx, future in enumerate(as_completed(future_to_word), 1):
                word = future_to_word[future]
                try:
                    result = future.result()
                    result["search_word"] = word
                    results.append(result)
                    logger.info(f"  [{idx}/{len(search_words)}] {word}: {result['score']:.3f}")
                except Exception as e:
                    # Keep the failed word in the output with a zero score
                    # so callers still see every input word.
                    logger.error(f"  评估失败: {word}, 错误: {e}")
                    results.append({
                        "search_word": word,
                        "score": 0.0,
                        "reasoning": f"评估异常: {str(e)}",
                        "original_feature": original_feature
                    })

        # Sort by score, best first.
        results.sort(key=lambda x: x["score"], reverse=True)

        # Attach 1-based rank.
        for rank, result in enumerate(results, 1):
            result["rank"] = rank

        logger.info(f"批量评估完成,最高分: {results[0]['score']:.3f}")
        return results

    def evaluate_search_words_in_batches(
        self,
        original_feature: str,
        search_words: List[str],
        batch_size: int = 50
    ) -> List[Dict[str, Any]]:
        """
        Evaluate search words in batches (N words per LLM call) to
        reduce the number of API calls.

        Args:
            original_feature: Original target feature.
            search_words: List of search words to evaluate.
            batch_size: Number of search words per batch (default 50).

        Returns:
            Evaluation results sorted by score (descending) with a
            1-based ``rank`` field added. Empty input yields an empty
            list (previously raised IndexError on the final log line).
        """
        logger.info(f"开始分批评估 {len(search_words)} 个搜索词(每批 {batch_size} 个)...")

        # Guard: avoid all_results[0] IndexError on empty input.
        if not search_words:
            return []

        all_results = []
        total_batches = (len(search_words) + batch_size - 1) // batch_size

        # Process each batch with a single LLM call.
        for batch_idx in range(total_batches):
            start_idx = batch_idx * batch_size
            end_idx = min(start_idx + batch_size, len(search_words))
            batch_words = search_words[start_idx:end_idx]

            logger.info(f"  处理第 {batch_idx + 1}/{total_batches} 批({len(batch_words)} 个搜索词)")

            # Extract the unique individual words across this batch to
            # serve as the LLM's vocabulary.
            available_words_set = set()
            for word in batch_words:
                # Whitespace-split each search word into component words.
                available_words_set.update(word.split())

            # Sorted for stable, reproducible prompt text.
            available_words = sorted(available_words_set)

            # Joined with the Chinese enumeration comma.
            available_words_str = "、".join(available_words)

            prompt = f"""
# 任务说明
从给定关键词中提取并组合适合在小红书搜索的query词(目标是找到【{original_feature}】相关内容,但query中不能直接出现"{original_feature}"二字)

## 可选词汇
{available_words_str}

## 要求
1. 只能使用可选词汇中的词,可以进行以下变化:
   - 直接使用原词或括号内的同义词
   - 多个词组合
   - 适当精简
2. 不能添加可选词汇以外的新词
3. 按推荐程度排序(越靠前越推荐),取top10

## 输出格式(JSON):
[
  {{
    "rank": 1,
    "search_word": "组合的搜索词",
    "source_word": "组合来源词,空格分割",
    "score": 0.85,
    "reasoning": "推荐理由"
  }},
  {{
    "index": 2,
    "search_word": "组合的搜索词",
    "source_word": "组合来源词,空格分割",
    "score": 0.80,
    "reasoning": "推荐理由"
  }}
]

- 只返回JSON数组,不要其他内容"""

            # Single LLM call for the whole batch.
            result = self.client.chat_json(prompt=prompt, max_retries=3)

            if result and isinstance(result, list):
                # New format: each item carries its own search_word.
                for idx, item in enumerate(result):
                    search_word = item.get("search_word", "")
                    if search_word:  # Skip entries without a search word.
                        all_results.append({
                            "search_word": search_word,
                            "source_word": item.get("source_word", ""),
                            "score": item.get("score", 0.0),
                            "reasoning": item.get("reasoning", ""),
                            "original_feature": original_feature
                        })
                        logger.info(f"  [{start_idx + idx + 1}/{len(search_words)}] "
                                    f"{search_word}: {item.get('score', 0.0):.3f}")
            else:
                logger.error(f"  第 {batch_idx + 1} 批评估失败,跳过")
                # Fall back to the raw input words with zero scores so
                # the batch is not silently dropped.
                for word in batch_words:
                    all_results.append({
                        "search_word": word,
                        "score": 0.0,
                        "reasoning": "批量评估失败",
                        "original_feature": original_feature
                    })

        # Sort by score, best first.
        all_results.sort(key=lambda x: x["score"], reverse=True)

        # Attach 1-based rank.
        for rank, result in enumerate(all_results, 1):
            result["rank"] = rank

        # Guard: every batch could have returned a list of items all
        # missing "search_word", leaving all_results empty.
        if all_results:
            logger.info(f"分批评估完成,最高分: {all_results[0]['score']:.3f} (总API调用: {total_batches} 次)")
        return all_results

    def evaluate_single_note(
        self,
        original_feature: str,
        search_word: str,
        note: Dict[str, Any],
        note_index: int = 0
    ) -> Dict[str, Any]:
        """
        Evaluate a single note (pipeline stage 6, multimodal).

        Args:
            original_feature: Original target feature.
            search_word: Search word that produced this note.
            note: Single note dict; text/images are read from its
                ``note_card`` entry (title, desc, image_list).
            note_index: Index of the note, echoed back in the result.

        Returns:
            Dict with ``note_index``, ``relevance`` (0.0-1.0),
            ``matched_elements`` (list) and ``reasoning``.
        """
        card = note.get("note_card", {})
        title = card.get("display_title", "")
        desc = card.get("desc", "")[:500]  # Cap description length for the prompt.
        images = card.get("image_list", [])[:10]  # At most 10 images per note.

        prompt = f"""你是一个小红书内容分析专家。

任务:评估这个帖子是否包含目标特征"{original_feature}"的元素

原始特征:"{original_feature}"
搜索词:"{search_word}"

帖子内容:
标题: {title}
正文: {desc}

请分析帖子的文字和图片内容,返回JSON格式:
{{
    "relevance": 0.85,  // 0.0-1.0,相关度
    "matched_elements": ["元素1", "元素2"],  // 匹配的元素列表
    "reasoning": "简短的匹配理由"
}}

只返回JSON,不要其他内容。"""

        result = self.client.chat_json(
            prompt=prompt,
            images=images if images else None,
            max_retries=3
        )

        if result:
            return {
                "note_index": note_index,
                "relevance": result.get("relevance", 0.0),
                "matched_elements": result.get("matched_elements", []),
                "reasoning": result.get("reasoning", "")
            }
        else:
            logger.error(f"  评估帖子 {note_index} 失败: {search_word}")
            return {
                "note_index": note_index,
                "relevance": 0.0,
                "matched_elements": [],
                "reasoning": "评估失败"
            }

    def evaluate_search_results_parallel(
        self,
        original_feature: str,
        search_word: str,
        notes: List[Dict[str, Any]],
        max_notes: int = 20,
        max_workers: int = 20
    ) -> Dict[str, Any]:
        """
        Evaluate search results in parallel (each note independently).

        Args:
            original_feature: Original target feature.
            search_word: Search word that produced these notes.
            notes: List of note dicts.
            max_notes: Maximum number of notes to evaluate.
            max_workers: Maximum number of concurrent LLM calls.

        Returns:
            Dict with ``overall_relevance`` (mean of per-note
            relevance), ``extracted_elements`` (top-5 matched elements
            by frequency) and ``evaluated_notes`` (per-note results,
            ordered by note index).
        """
        if not notes:
            return {
                "overall_relevance": 0.0,
                "extracted_elements": [],
                "evaluated_notes": []
            }

        notes_to_eval = notes[:max_notes]
        evaluated_notes = []

        logger.info(f"  并行评估 {len(notes_to_eval)} 个帖子({max_workers}并发)")

        # Evaluate each note concurrently.
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = [
                executor.submit(
                    self.evaluate_single_note,
                    original_feature,
                    search_word,
                    note,
                    idx
                )
                for idx, note in enumerate(notes_to_eval)
            ]

            # Collect results; failed notes are logged and skipped.
            for future in as_completed(futures):
                try:
                    evaluated_notes.append(future.result())
                except Exception as e:
                    logger.error(f"  评估帖子失败: {e}")

        # Restore note order.
        evaluated_notes.sort(key=lambda x: x['note_index'])

        # Aggregate: overall relevance and most frequent elements.
        if evaluated_notes:
            overall_relevance = sum(n['relevance'] for n in evaluated_notes) / len(evaluated_notes)

            # Count element frequencies across all evaluated notes;
            # most_common preserves first-seen order on ties, matching
            # the previous stable reverse sort.
            element_counts = Counter(
                elem
                for note in evaluated_notes
                for elem in note['matched_elements']
            )
            extracted_elements = [elem for elem, _ in element_counts.most_common(5)]
        else:
            overall_relevance = 0.0
            extracted_elements = []

        return {
            "overall_relevance": overall_relevance,
            "extracted_elements": extracted_elements,
            "evaluated_notes": evaluated_notes
        }

    def evaluate_search_results(
        self,
        original_feature: str,
        search_word: str,
        notes: List[Dict[str, Any]],
        max_notes: int = 5,
        max_images_per_note: int = 10
    ) -> Dict[str, Any]:
        """
        Evaluate search results in a single multimodal LLM call
        (pipeline stage 6).

        Args:
            original_feature: Original target feature.
            search_word: Search word that produced these notes.
            notes: List of note dicts.
            max_notes: Maximum number of notes to evaluate.
            max_images_per_note: Maximum images taken from each note.

        Returns:
            Dict with ``overall_relevance``, ``extracted_elements``,
            ``recommended_extension`` and ``evaluated_notes``; all
            zero/empty on missing input or LLM failure.
        """
        if not notes:
            return {
                "overall_relevance": 0.0,
                "extracted_elements": [],
                "recommended_extension": None,
                "evaluated_notes": []
            }

        # Cap the number of notes sent to the LLM.
        notes_to_eval = notes[:max_notes]

        # Gather text snippets and images from each note.
        notes_info = []
        all_images = []
        for idx, note in enumerate(notes_to_eval):
            card = note.get("note_card", {})
            notes_info.append({
                "index": idx,
                "title": card.get("display_title", ""),
                "desc": card.get("desc", "")[:300]  # Cap description length.
            })
            all_images.extend(card.get("image_list", [])[:max_images_per_note])

        # Render the notes into the prompt body.
        notes_text = "\n\n".join([
            f"帖子 {n['index']}:\n标题: {n['title']}\n正文: {n['desc']}"
            for n in notes_info
        ])

        prompt = f"""你是一个小红书内容分析专家。

任务:评估搜索结果是否包含目标特征的元素

原始特征:"{original_feature}"
搜索词:"{search_word}"
帖子数量:{len(notes_to_eval)} 条

帖子内容:
{notes_text}

请综合分析帖子的文字和图片内容,判断:
1. 这些搜索结果中是否包含与"{original_feature}"相似的元素
2. 提取最相关的元素关键词(2-4个字的词组)
3. 推荐最适合用于扩展搜索的关键词

返回JSON格式:
{{
    "overall_relevance": 0.72,  // 0.0-1.0,整体相关度
    "extracted_elements": ["关键词1", "关键词2", "关键词3"],  // 提取的相似元素,按相关度排序
    "recommended_extension": "关键词1",  // 最优的扩展关键词
    "evaluated_notes": [
        {{
            "note_index": 0,  // 帖子索引
            "relevance": 0.85,  // 该帖子的相关度
            "matched_elements": ["元素1", "元素2"],  // 该帖子匹配的元素
            "reasoning": "简短的匹配理由"
        }}
    ]
}}

注意:
- extracted_elements 应该是帖子中实际包含的、与原始特征相似的元素
- 优先提取在图片或文字中明显出现的元素
- 只返回JSON,不要其他内容"""

        # Single multimodal LLM call (images attached when present).
        result = self.client.chat_json(
            prompt=prompt,
            images=all_images if all_images else None,
            max_retries=3
        )

        if result:
            # Normalize to the full result shape regardless of what the
            # LLM actually returned.
            return {
                "overall_relevance": result.get("overall_relevance", 0.0),
                "extracted_elements": result.get("extracted_elements", []),
                "recommended_extension": result.get("recommended_extension"),
                "evaluated_notes": result.get("evaluated_notes", [])
            }
        else:
            logger.error(f"评估搜索结果失败: {search_word}")
            return {
                "overall_relevance": 0.0,
                "extracted_elements": [],
                "recommended_extension": None,
                "evaluated_notes": []
            }

    def batch_evaluate_search_results(
        self,
        features_with_results: List[Dict[str, Any]],
        max_workers: int = 3
    ) -> List[Dict[str, Any]]:
        """
        Batch-evaluate search results in parallel (low concurrency to
        avoid request timeouts).

        Args:
            features_with_results: Feature dicts carrying a
                ``search_result`` entry (notes under
                ``search_result.data.data``).
            max_workers: Maximum number of concurrent LLM calls.

        Returns:
            The same feature dicts with a ``result_evaluation`` entry
            added (None for features without results or on failure).
        """
        logger.info(f"开始批量评估 {len(features_with_results)} 个搜索结果...")

        results = []

        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            # Submit one evaluation per feature that has search results.
            future_to_feature = {}
            for feature in features_with_results:
                if not feature.get("search_result"):
                    # No search result: pass through with a null evaluation.
                    feature["result_evaluation"] = None
                    results.append(feature)
                    continue

                original_feature = self._get_original_feature(feature)
                search_word = feature.get("search_word", "")
                notes = feature["search_result"].get("data", {}).get("data", [])

                future = executor.submit(
                    self.evaluate_search_results,
                    original_feature,
                    search_word,
                    notes
                )
                future_to_feature[future] = feature

            # Collect results as evaluations complete.
            for idx, future in enumerate(as_completed(future_to_feature), 1):
                feature = future_to_feature[future]
                try:
                    evaluation = future.result()
                    feature["result_evaluation"] = evaluation
                    results.append(feature)
                    logger.info(f"  [{idx}/{len(future_to_feature)}] {feature.get('search_word')}: "
                                f"relevance={evaluation['overall_relevance']:.3f}")
                except Exception as e:
                    # Record the failure but keep the feature in the output.
                    logger.error(f"  评估失败: {feature.get('search_word')}, 错误: {e}")
                    feature["result_evaluation"] = None
                    results.append(feature)

        logger.info("批量评估完成")
        return results

    def _get_original_feature(self, feature_node: Dict[str, Any]) -> str:
        """
        Extract the original feature name from a feature node.

        Args:
            feature_node: Feature node dict.

        Returns:
            The original feature name, or "" if none of the known
            fields are present.
        """
        # Prefer the name recorded by a previous llm_evaluation step.
        if "llm_evaluation" in feature_node:
            return feature_node["llm_evaluation"].get("original_feature", "")

        # Fall back to the raw (Chinese-keyed) feature name fields.
        return feature_node.get("原始特征名称", feature_node.get("特征名称", ""))


def test_evaluator():
    """Smoke-test the evaluator against the live OpenRouter client."""
    # Set up client and evaluator.
    client = OpenRouterClient()
    evaluator = LLMEvaluator(client)

    # Single search-word evaluation.
    print("\n=== 测试搜索词评估 ===")
    result = evaluator.evaluate_search_word(
        original_feature="拟人",
        search_word="宠物猫 猫咪"
    )
    print(f"评分: {result['score']:.3f}")
    print(f"理由: {result['reasoning']}")

    # Parallel batch evaluation.
    print("\n=== 测试批量评估 ===")
    results = evaluator.evaluate_search_words_batch(
        original_feature="拟人",
        search_words=["宠物猫 猫咪", "宠物猫 猫孩子", "宠物猫 猫"],
        max_workers=2
    )
    for r in results:
        print(f"{r['search_word']}: {r['score']:.3f} (rank={r['rank']})")


if __name__ == "__main__":
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s'
    )
    test_evaluator()