  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. """
  4. LLM评估模块
  5. 用于评估搜索词质量和搜索结果相关度
  6. """
  7. import logging
  8. from typing import List, Dict, Any, Optional
  9. from concurrent.futures import ThreadPoolExecutor, as_completed
  10. from openrouter_client import OpenRouterClient
  11. logger = logging.getLogger(__name__)
  12. class LLMEvaluator:
  13. """LLM评估器"""
  14. def __init__(self, openrouter_client: OpenRouterClient):
  15. """
  16. 初始化评估器
  17. Args:
  18. openrouter_client: OpenRouter客户端实例
  19. """
  20. self.client = openrouter_client
  21. def evaluate_search_word(
  22. self,
  23. original_feature: str,
  24. search_word: str
  25. ) -> Dict[str, Any]:
  26. """
  27. 评估搜索词质量(阶段4)
  28. Args:
  29. original_feature: 原始特征名称
  30. search_word: 组合搜索词
  31. Returns:
  32. 评估结果
  33. """
  34. prompt = f"""你是一个小红书内容分析专家。
  35. # 任务说明
  36. 从给定关键词中提取并组合适合在小红书搜索的query词(目标是找到【{original_feature}】相关内容,但query中不能直接出现"{original_feature}")
  37. ## 可选词汇
  38. {search_word}
  39. ## 要求
  40. 1. 只能使用可选词汇中的词,可以进行以下变化:
  41. - 直接使用原词或括号内的同义词
  42. - 多个词组合
  43. - 适当精简
  44. 2. 不能添加可选词汇以外的新词
  45. 3. 按推荐程度排序(越靠前越推荐)
  46. ## 输出格式(JSON)
  47. {{
  48. "score": 0.75,
  49. "reasoning": "评估理由"
  50. }}
  51. 注意:只返回JSON,不要其他内容。"""
  52. result = self.client.chat_json(prompt=prompt, max_retries=3)
  53. if result:
  54. return {
  55. "score": result.get("score", 0.0),
  56. "reasoning": result.get("reasoning", ""),
  57. "original_feature": original_feature
  58. }
  59. else:
  60. logger.error(f"评估搜索词失败: {search_word}")
  61. return {
  62. "score": 0.0,
  63. "reasoning": "LLM评估失败",
  64. "original_feature": original_feature
  65. }
  66. def evaluate_search_words_batch(
  67. self,
  68. original_feature: str,
  69. search_words: List[str],
  70. max_workers: int = 5
  71. ) -> List[Dict[str, Any]]:
  72. """
  73. 批量评估搜索词(并行)
  74. Args:
  75. original_feature: 原始特征
  76. search_words: 搜索词列表
  77. max_workers: 最大并发数
  78. Returns:
  79. 评估结果列表(已排序)
  80. """
  81. logger.info(f"开始批量评估 {len(search_words)} 个搜索词...")
  82. results = []
  83. with ThreadPoolExecutor(max_workers=max_workers) as executor:
  84. # 提交任务
  85. future_to_word = {
  86. executor.submit(self.evaluate_search_word, original_feature, word): word
  87. for word in search_words
  88. }
  89. # 收集结果
  90. for idx, future in enumerate(as_completed(future_to_word), 1):
  91. word = future_to_word[future]
  92. try:
  93. result = future.result()
  94. result["search_word"] = word
  95. results.append(result)
  96. logger.info(f" [{idx}/{len(search_words)}] {word}: {result['score']:.3f}")
  97. except Exception as e:
  98. logger.error(f" 评估失败: {word}, 错误: {e}")
  99. results.append({
  100. "search_word": word,
  101. "score": 0.0,
  102. "reasoning": f"评估异常: {str(e)}",
  103. "original_feature": original_feature
  104. })
  105. # 按分数排序
  106. results.sort(key=lambda x: x["score"], reverse=True)
  107. # 添加排名
  108. for rank, result in enumerate(results, 1):
  109. result["rank"] = rank
  110. logger.info(f"批量评估完成,最高分: {results[0]['score']:.3f}")
  111. return results
  112. def evaluate_search_words_in_batches(
  113. self,
  114. original_feature: str,
  115. search_words: List[str],
  116. batch_size: int = 50
  117. ) -> List[Dict[str, Any]]:
  118. """
  119. 分批评估搜索词(每批N个,减少API调用)
  120. Args:
  121. original_feature: 原始特征
  122. search_words: 搜索词列表
  123. batch_size: 每批处理的搜索词数量,默认10
  124. Returns:
  125. 评估结果列表(已排序)
  126. """
  127. logger.info(f"开始分批评估 {len(search_words)} 个搜索词(每批 {batch_size} 个)...")
  128. all_results = []
  129. total_batches = (len(search_words) + batch_size - 1) // batch_size
  130. # 分批处理
  131. for batch_idx in range(total_batches):
  132. start_idx = batch_idx * batch_size
  133. end_idx = min(start_idx + batch_size, len(search_words))
  134. batch_words = search_words[start_idx:end_idx]
  135. logger.info(f" 处理第 {batch_idx + 1}/{total_batches} 批({len(batch_words)} 个搜索词)")
  136. # 从搜索词中提取所有独特的词作为可选词汇
  137. available_words_set = set()
  138. for word in batch_words:
  139. # 分割搜索词,提取单个词
  140. parts = word.split()
  141. available_words_set.update(parts)
  142. # 转换为列表并排序(保证稳定性)
  143. available_words = sorted(list(available_words_set))
  144. # 构建可选词汇字符串(逗号分隔)
  145. available_words_str = "、".join(available_words)
  146. prompt = f"""
  147. # 任务说明
  148. 从给定关键词中提取并组合适合在小红书搜索的query词(目标是找到【{original_feature}】相关内容,但query中不能直接出现"{original_feature}"二字)
  149. ## 可选词汇
  150. {available_words_str}
  151. ## 要求
  152. 1. 只能使用可选词汇中的词,可以进行以下变化:
  153. - 直接使用原词或括号内的同义词
  154. - 多个词组合
  155. - 适当精简
  156. 2. 不能添加可选词汇以外的新词
  157. 3. 按推荐程度排序(越靠前越推荐),取top10
  158. ## 输出格式(JSON):
  159. [
  160. {{
  161. "rank": 1,
  162. "search_word": "组合的搜索词",
  163. "source_word": "组合来源词,空格分割",
  164. "score": 0.85,
  165. "reasoning": "推荐理由"
  166. }},
  167. {{
  168. "index": 2,
  169. "search_word": "组合的搜索词",
  170. "source_word": "组合来源词,空格分割",
  171. "score": 0.80,
  172. "reasoning": "推荐理由"
  173. }}
  174. ]
  175. - 只返回JSON数组,不要其他内容"""
  176. # 调用LLM
  177. result = self.client.chat_json(prompt=prompt, max_retries=3)
  178. if result and isinstance(result, list):
  179. # 处理结果 - 新格式直接包含search_word
  180. for idx, item in enumerate(result):
  181. search_word = item.get("search_word", "")
  182. if search_word: # 确保有搜索词
  183. all_results.append({
  184. "search_word": search_word,
  185. "score": item.get("score", 0.0),
  186. "reasoning": item.get("reasoning", ""),
  187. "original_feature": original_feature
  188. })
  189. logger.info(f" [{start_idx + idx + 1}/{len(search_words)}] "
  190. f"{search_word}: {item.get('score', 0.0):.3f}")
  191. else:
  192. logger.error(f" 第 {batch_idx + 1} 批评估失败,跳过")
  193. # 为失败的批次添加默认结果(使用原搜索词)
  194. for word in batch_words:
  195. all_results.append({
  196. "search_word": word,
  197. "score": 0.0,
  198. "reasoning": "批量评估失败",
  199. "original_feature": original_feature
  200. })
  201. # 按分数排序
  202. all_results.sort(key=lambda x: x["score"], reverse=True)
  203. # 添加排名
  204. for rank, result in enumerate(all_results, 1):
  205. result["rank"] = rank
  206. logger.info(f"分批评估完成,最高分: {all_results[0]['score']:.3f} (总API调用: {total_batches} 次)")
  207. return all_results
  208. def evaluate_single_note(
  209. self,
  210. original_feature: str,
  211. search_word: str,
  212. note: Dict[str, Any],
  213. note_index: int = 0
  214. ) -> Dict[str, Any]:
  215. """
  216. 评估单个帖子(阶段6,多模态)
  217. Args:
  218. original_feature: 原始特征
  219. search_word: 搜索词
  220. note: 单个帖子
  221. note_index: 帖子索引
  222. Returns:
  223. 单个帖子的评估结果
  224. """
  225. card = note.get("note_card", {})
  226. title = card.get("display_title", "")
  227. desc = card.get("desc", "")[:500] # 限制长度
  228. images = card.get("image_list", [])[:10] # 最多10张图
  229. prompt = f"""你是一个小红书内容分析专家。
  230. 任务:评估这个帖子是否包含目标特征"{original_feature}"的元素
  231. 原始特征:"{original_feature}"
  232. 搜索词:"{search_word}"
  233. 帖子内容:
  234. 标题: {title}
  235. 正文: {desc}
  236. 请分析帖子的文字和图片内容,返回JSON格式:
  237. {{
  238. "relevance": 0.85, // 0.0-1.0,相关度
  239. "matched_elements": ["元素1", "元素2"], // 匹配的元素列表
  240. "reasoning": "简短的匹配理由"
  241. }}
  242. 只返回JSON,不要其他内容。"""
  243. result = self.client.chat_json(
  244. prompt=prompt,
  245. images=images if images else None,
  246. max_retries=3
  247. )
  248. if result:
  249. return {
  250. "note_index": note_index,
  251. "relevance": result.get("relevance", 0.0),
  252. "matched_elements": result.get("matched_elements", []),
  253. "reasoning": result.get("reasoning", "")
  254. }
  255. else:
  256. logger.error(f" 评估帖子 {note_index} 失败: {search_word}")
  257. return {
  258. "note_index": note_index,
  259. "relevance": 0.0,
  260. "matched_elements": [],
  261. "reasoning": "评估失败"
  262. }
  263. def evaluate_search_results_parallel(
  264. self,
  265. original_feature: str,
  266. search_word: str,
  267. notes: List[Dict[str, Any]],
  268. max_notes: int = 20,
  269. max_workers: int = 20
  270. ) -> Dict[str, Any]:
  271. """
  272. 并行评估搜索结果(每个帖子独立评估)
  273. Args:
  274. original_feature: 原始特征
  275. search_word: 搜索词
  276. notes: 帖子列表
  277. max_notes: 最多评估几条帖子
  278. max_workers: 最大并发数
  279. Returns:
  280. 评估结果汇总
  281. """
  282. if not notes:
  283. return {
  284. "overall_relevance": 0.0,
  285. "extracted_elements": [],
  286. "evaluated_notes": []
  287. }
  288. notes_to_eval = notes[:max_notes]
  289. evaluated_notes = []
  290. logger.info(f" 并行评估 {len(notes_to_eval)} 个帖子({max_workers}并发)")
  291. # 20并发评估每个帖子
  292. with ThreadPoolExecutor(max_workers=max_workers) as executor:
  293. futures = []
  294. for idx, note in enumerate(notes_to_eval):
  295. future = executor.submit(
  296. self.evaluate_single_note,
  297. original_feature,
  298. search_word,
  299. note,
  300. idx
  301. )
  302. futures.append(future)
  303. # 收集结果
  304. for future in as_completed(futures):
  305. try:
  306. result = future.result()
  307. evaluated_notes.append(result)
  308. except Exception as e:
  309. logger.error(f" 评估帖子失败: {e}")
  310. # 按note_index排序
  311. evaluated_notes.sort(key=lambda x: x['note_index'])
  312. # 汇总:计算整体相关度和提取元素
  313. if evaluated_notes:
  314. overall_relevance = sum(n['relevance'] for n in evaluated_notes) / len(evaluated_notes)
  315. # 提取所有元素并统计频次
  316. element_counts = {}
  317. for note in evaluated_notes:
  318. for elem in note['matched_elements']:
  319. element_counts[elem] = element_counts.get(elem, 0) + 1
  320. # 按频次排序,取前5个
  321. extracted_elements = sorted(
  322. element_counts.keys(),
  323. key=lambda x: element_counts[x],
  324. reverse=True
  325. )[:5]
  326. else:
  327. overall_relevance = 0.0
  328. extracted_elements = []
  329. return {
  330. "overall_relevance": overall_relevance,
  331. "extracted_elements": extracted_elements,
  332. "evaluated_notes": evaluated_notes
  333. }
  334. def evaluate_search_results(
  335. self,
  336. original_feature: str,
  337. search_word: str,
  338. notes: List[Dict[str, Any]],
  339. max_notes: int = 5,
  340. max_images_per_note: int = 10
  341. ) -> Dict[str, Any]:
  342. """
  343. 评估搜索结果(阶段6,多模态)
  344. Args:
  345. original_feature: 原始特征
  346. search_word: 搜索词
  347. notes: 帖子列表
  348. max_notes: 最多评估几条帖子
  349. max_images_per_note: 每条帖子最多取几张图片
  350. Returns:
  351. 评估结果
  352. """
  353. if not notes:
  354. return {
  355. "overall_relevance": 0.0,
  356. "extracted_elements": [],
  357. "recommended_extension": None,
  358. "evaluated_notes": []
  359. }
  360. # 限制评估数量
  361. notes_to_eval = notes[:max_notes]
  362. # 准备文本信息
  363. notes_info = []
  364. all_images = []
  365. for idx, note in enumerate(notes_to_eval):
  366. card = note.get("note_card", {})
  367. title = card.get("display_title", "")
  368. desc = card.get("desc", "")[:300] # 限制长度
  369. notes_info.append({
  370. "index": idx,
  371. "title": title,
  372. "desc": desc
  373. })
  374. # 收集图片
  375. images = card.get("image_list", [])[:max_images_per_note]
  376. all_images.extend(images)
  377. # 构建提示词
  378. notes_text = "\n\n".join([
  379. f"帖子 {n['index']}:\n标题: {n['title']}\n正文: {n['desc']}"
  380. for n in notes_info
  381. ])
  382. prompt = f"""你是一个小红书内容分析专家。
  383. 任务:评估搜索结果是否包含目标特征的元素
  384. 原始特征:"{original_feature}"
  385. 搜索词:"{search_word}"
  386. 帖子数量:{len(notes_to_eval)} 条
  387. 帖子内容:
  388. {notes_text}
  389. 请综合分析帖子的文字和图片内容,判断:
  390. 1. 这些搜索结果中是否包含与"{original_feature}"相似的元素
  391. 2. 提取最相关的元素关键词(2-4个字的词组)
  392. 3. 推荐最适合用于扩展搜索的关键词
  393. 返回JSON格式:
  394. {{
  395. "overall_relevance": 0.72, // 0.0-1.0,整体相关度
  396. "extracted_elements": ["关键词1", "关键词2", "关键词3"], // 提取的相似元素,按相关度排序
  397. "recommended_extension": "关键词1", // 最优的扩展关键词
  398. "evaluated_notes": [
  399. {{
  400. "note_index": 0, // 帖子索引
  401. "relevance": 0.85, // 该帖子的相关度
  402. "matched_elements": ["元素1", "元素2"], // 该帖子匹配的元素
  403. "reasoning": "简短的匹配理由"
  404. }}
  405. ]
  406. }}
  407. 注意:
  408. - extracted_elements 应该是帖子中实际包含的、与原始特征相似的元素
  409. - 优先提取在图片或文字中明显出现的元素
  410. - 只返回JSON,不要其他内容"""
  411. # 调用LLM(带图片)
  412. result = self.client.chat_json(
  413. prompt=prompt,
  414. images=all_images if all_images else None,
  415. max_retries=3
  416. )
  417. if result:
  418. # 确保返回完整格式
  419. return {
  420. "overall_relevance": result.get("overall_relevance", 0.0),
  421. "extracted_elements": result.get("extracted_elements", []),
  422. "recommended_extension": result.get("recommended_extension"),
  423. "evaluated_notes": result.get("evaluated_notes", [])
  424. }
  425. else:
  426. logger.error(f"评估搜索结果失败: {search_word}")
  427. return {
  428. "overall_relevance": 0.0,
  429. "extracted_elements": [],
  430. "recommended_extension": None,
  431. "evaluated_notes": []
  432. }
  433. def batch_evaluate_search_results(
  434. self,
  435. features_with_results: List[Dict[str, Any]],
  436. max_workers: int = 3
  437. ) -> List[Dict[str, Any]]:
  438. """
  439. 批量评估搜索结果(并行,但并发数较低以避免超时)
  440. Args:
  441. features_with_results: 带搜索结果的特征列表
  442. max_workers: 最大并发数
  443. Returns:
  444. 带评估结果的特征列表
  445. """
  446. logger.info(f"开始批量评估 {len(features_with_results)} 个搜索结果...")
  447. results = []
  448. with ThreadPoolExecutor(max_workers=max_workers) as executor:
  449. # 提交任务
  450. future_to_feature = {}
  451. for feature in features_with_results:
  452. if not feature.get("search_result"):
  453. # 无搜索结果,跳过
  454. feature["result_evaluation"] = None
  455. results.append(feature)
  456. continue
  457. original_feature = self._get_original_feature(feature)
  458. search_word = feature.get("search_word", "")
  459. notes = feature["search_result"].get("data", {}).get("data", [])
  460. future = executor.submit(
  461. self.evaluate_search_results,
  462. original_feature,
  463. search_word,
  464. notes
  465. )
  466. future_to_feature[future] = feature
  467. # 收集结果
  468. for idx, future in enumerate(as_completed(future_to_feature), 1):
  469. feature = future_to_feature[future]
  470. try:
  471. evaluation = future.result()
  472. feature["result_evaluation"] = evaluation
  473. results.append(feature)
  474. logger.info(f" [{idx}/{len(future_to_feature)}] {feature.get('search_word')}: "
  475. f"relevance={evaluation['overall_relevance']:.3f}")
  476. except Exception as e:
  477. logger.error(f" 评估失败: {feature.get('search_word')}, 错误: {e}")
  478. feature["result_evaluation"] = None
  479. results.append(feature)
  480. logger.info(f"批量评估完成")
  481. return results
  482. def _get_original_feature(self, feature_node: Dict[str, Any]) -> str:
  483. """
  484. 从特征节点中获取原始特征名称
  485. Args:
  486. feature_node: 特征节点
  487. Returns:
  488. 原始特征名称
  489. """
  490. # 尝试从llm_evaluation中获取
  491. if "llm_evaluation" in feature_node:
  492. return feature_node["llm_evaluation"].get("original_feature", "")
  493. # 尝试从其他字段获取
  494. return feature_node.get("原始特征名称", feature_node.get("特征名称", ""))
  495. def test_evaluator():
  496. """测试评估器"""
  497. import os
  498. # 初始化客户端
  499. client = OpenRouterClient()
  500. evaluator = LLMEvaluator(client)
  501. # 测试搜索词评估
  502. print("\n=== 测试搜索词评估 ===")
  503. result = evaluator.evaluate_search_word(
  504. original_feature="拟人",
  505. search_word="宠物猫 猫咪"
  506. )
  507. print(f"评分: {result['score']:.3f}")
  508. print(f"理由: {result['reasoning']}")
  509. # 测试批量评估
  510. print("\n=== 测试批量评估 ===")
  511. results = evaluator.evaluate_search_words_batch(
  512. original_feature="拟人",
  513. search_words=["宠物猫 猫咪", "宠物猫 猫孩子", "宠物猫 猫"],
  514. max_workers=2
  515. )
  516. for r in results:
  517. print(f"{r['search_word']}: {r['score']:.3f} (rank={r['rank']})")
  518. if __name__ == "__main__":
  519. logging.basicConfig(
  520. level=logging.INFO,
  521. format='%(asctime)s - %(levelname)s - %(message)s'
  522. )
  523. test_evaluator()