llm_evaluator.py 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. """
  4. LLM评估模块
  5. 用于评估搜索词质量和搜索结果相关度
  6. """
  7. import logging
  8. from typing import List, Dict, Any, Optional
  9. from concurrent.futures import ThreadPoolExecutor, as_completed
  10. from openrouter_client import OpenRouterClient
  11. logger = logging.getLogger(__name__)
  12. class LLMEvaluator:
  13. """LLM评估器"""
  14. def __init__(self, openrouter_client: OpenRouterClient):
  15. """
  16. 初始化评估器
  17. Args:
  18. openrouter_client: OpenRouter客户端实例
  19. """
  20. self.client = openrouter_client
  21. def evaluate_search_word(
  22. self,
  23. original_feature: str,
  24. search_word: str
  25. ) -> Dict[str, Any]:
  26. """
  27. 评估搜索词质量(阶段4)
  28. Args:
  29. original_feature: 原始特征名称
  30. search_word: 组合搜索词
  31. Returns:
  32. 评估结果
  33. """
  34. prompt = f"""你是一个小红书内容分析专家。
  35. # 任务说明
  36. 从给定关键词中提取并组合适合在小红书搜索的query词(目标是找到【{original_feature}】相关内容,但query中不能直接出现"{original_feature}")
  37. ## 可选词汇
  38. {search_word}
  39. ## 要求
  40. 1. 只能使用可选词汇中的词,可以进行以下变化:
  41. - 直接使用原词或括号内的同义词
  42. - 多个词组合
  43. - 适当精简
  44. 2. 不能添加可选词汇以外的新词
  45. 3. 按推荐程度排序(越靠前越推荐)
  46. ## 输出格式(JSON)
  47. {{
  48. "score": 0.75,
  49. "reasoning": "评估理由"
  50. }}
  51. 注意:只返回JSON,不要其他内容。"""
  52. result = self.client.chat_json(prompt=prompt, max_retries=3)
  53. if result:
  54. return {
  55. "score": result.get("score", 0.0),
  56. "reasoning": result.get("reasoning", ""),
  57. "original_feature": original_feature
  58. }
  59. else:
  60. logger.error(f"评估搜索词失败: {search_word}")
  61. return {
  62. "score": 0.0,
  63. "reasoning": "LLM评估失败",
  64. "original_feature": original_feature
  65. }
  66. def evaluate_search_words_batch(
  67. self,
  68. original_feature: str,
  69. search_words: List[str],
  70. max_workers: int = 5
  71. ) -> List[Dict[str, Any]]:
  72. """
  73. 批量评估搜索词(并行)
  74. Args:
  75. original_feature: 原始特征
  76. search_words: 搜索词列表
  77. max_workers: 最大并发数
  78. Returns:
  79. 评估结果列表(已排序)
  80. """
  81. logger.info(f"开始批量评估 {len(search_words)} 个搜索词...")
  82. results = []
  83. with ThreadPoolExecutor(max_workers=max_workers) as executor:
  84. # 提交任务
  85. future_to_word = {
  86. executor.submit(self.evaluate_search_word, original_feature, word): word
  87. for word in search_words
  88. }
  89. # 收集结果
  90. for idx, future in enumerate(as_completed(future_to_word), 1):
  91. word = future_to_word[future]
  92. try:
  93. result = future.result()
  94. result["search_word"] = word
  95. results.append(result)
  96. logger.info(f" [{idx}/{len(search_words)}] {word}: {result['score']:.3f}")
  97. except Exception as e:
  98. logger.error(f" 评估失败: {word}, 错误: {e}")
  99. results.append({
  100. "search_word": word,
  101. "score": 0.0,
  102. "reasoning": f"评估异常: {str(e)}",
  103. "original_feature": original_feature
  104. })
  105. # 按分数排序
  106. results.sort(key=lambda x: x["score"], reverse=True)
  107. # 添加排名
  108. for rank, result in enumerate(results, 1):
  109. result["rank"] = rank
  110. logger.info(f"批量评估完成,最高分: {results[0]['score']:.3f}")
  111. return results
  112. def evaluate_search_words_in_batches(
  113. self,
  114. original_feature: str,
  115. search_words: List[str],
  116. batch_size: int = 50
  117. ) -> List[Dict[str, Any]]:
  118. """
  119. 分批评估搜索词(每批N个,减少API调用)
  120. Args:
  121. original_feature: 原始特征
  122. search_words: 搜索词列表
  123. batch_size: 每批处理的搜索词数量,默认10
  124. Returns:
  125. 评估结果列表(已排序)
  126. """
  127. logger.info(f"开始分批评估 {len(search_words)} 个搜索词(每批 {batch_size} 个)...")
  128. all_results = []
  129. total_batches = (len(search_words) + batch_size - 1) // batch_size
  130. # 分批处理
  131. for batch_idx in range(total_batches):
  132. start_idx = batch_idx * batch_size
  133. end_idx = min(start_idx + batch_size, len(search_words))
  134. batch_words = search_words[start_idx:end_idx]
  135. logger.info(f" 处理第 {batch_idx + 1}/{total_batches} 批({len(batch_words)} 个搜索词)")
  136. # 从搜索词中提取所有独特的词作为可选词汇
  137. available_words_set = set()
  138. for word in batch_words:
  139. # 分割搜索词,提取单个词
  140. parts = word.split()
  141. available_words_set.update(parts)
  142. # 转换为列表并排序(保证稳定性)
  143. available_words = sorted(list(available_words_set))
  144. # 构建可选词汇字符串(逗号分隔)
  145. available_words_str = "、".join(available_words)
  146. prompt = f"""
  147. # 任务说明
  148. 从给定关键词中提取并组合适合在小红书搜索的query词(目标是找到【{original_feature}】相关内容,但query中不能直接出现"{original_feature}"二字)
  149. ## 可选词汇
  150. {available_words_str}
  151. ## 要求
  152. 1. 只能使用可选词汇中的词,可以进行以下变化:
  153. - 直接使用原词或括号内的同义词
  154. - 多个词组合
  155. - 适当精简
  156. 2. 不能添加可选词汇以外的新词
  157. 3. 按推荐程度排序(越靠前越推荐),取top10
  158. ## 输出格式(JSON):
  159. [
  160. {{
  161. "rank": 1,
  162. "search_word": "组合的搜索词",
  163. "source_word": "组合来源词,空格分割",
  164. "score": 0.85,
  165. "reasoning": "推荐理由"
  166. }},
  167. {{
  168. "index": 2,
  169. "search_word": "组合的搜索词",
  170. "source_word": "组合来源词,空格分割",
  171. "score": 0.80,
  172. "reasoning": "推荐理由"
  173. }}
  174. ]
  175. - 只返回JSON数组,不要其他内容"""
  176. # 调用LLM
  177. result = self.client.chat_json(prompt=prompt, max_retries=3)
  178. if result and isinstance(result, list):
  179. # 处理结果 - 新格式直接包含search_word
  180. for idx, item in enumerate(result):
  181. search_word = item.get("search_word", "")
  182. if search_word: # 确保有搜索词
  183. all_results.append({
  184. "search_word": search_word,
  185. "source_word": item.get("source_word", ""),
  186. "score": item.get("score", 0.0),
  187. "reasoning": item.get("reasoning", ""),
  188. "original_feature": original_feature
  189. })
  190. logger.info(f" [{start_idx + idx + 1}/{len(search_words)}] "
  191. f"{search_word}: {item.get('score', 0.0):.3f}")
  192. else:
  193. logger.error(f" 第 {batch_idx + 1} 批评估失败,跳过")
  194. # 为失败的批次添加默认结果(使用原搜索词)
  195. for word in batch_words:
  196. all_results.append({
  197. "search_word": word,
  198. "score": 0.0,
  199. "reasoning": "批量评估失败",
  200. "original_feature": original_feature
  201. })
  202. # 按分数排序
  203. all_results.sort(key=lambda x: x["score"], reverse=True)
  204. # 添加排名
  205. for rank, result in enumerate(all_results, 1):
  206. result["rank"] = rank
  207. logger.info(f"分批评估完成,最高分: {all_results[0]['score']:.3f} (总API调用: {total_batches} 次)")
  208. return all_results
  209. def evaluate_single_note(
  210. self,
  211. original_feature: str,
  212. search_word: str,
  213. note: Dict[str, Any],
  214. note_index: int = 0
  215. ) -> Dict[str, Any]:
  216. """
  217. 评估单个帖子(阶段6,多模态)
  218. Args:
  219. original_feature: 原始特征
  220. search_word: 搜索词
  221. note: 单个帖子
  222. note_index: 帖子索引
  223. Returns:
  224. 单个帖子的评估结果
  225. """
  226. card = note.get("note_card", {})
  227. title = card.get("display_title", "")
  228. desc = card.get("desc", "")[:500] # 限制长度
  229. images = card.get("image_list", [])[:10] # 最多10张图
  230. prompt = f"""你是一个小红书内容分析专家。
  231. 任务:评估这个帖子是否包含目标特征"{original_feature}"的元素
  232. 原始特征:"{original_feature}"
  233. 搜索词:"{search_word}"
  234. 帖子内容:
  235. 标题: {title}
  236. 正文: {desc}
  237. 请分析帖子的文字和图片内容,返回JSON格式:
  238. {{
  239. "relevance": 0.85, // 0.0-1.0,相关度
  240. "matched_elements": ["元素1", "元素2"], // 匹配的元素列表
  241. "reasoning": "简短的匹配理由"
  242. }}
  243. 只返回JSON,不要其他内容。"""
  244. result = self.client.chat_json(
  245. prompt=prompt,
  246. images=images if images else None,
  247. max_retries=3
  248. )
  249. if result:
  250. return {
  251. "note_index": note_index,
  252. "relevance": result.get("relevance", 0.0),
  253. "matched_elements": result.get("matched_elements", []),
  254. "reasoning": result.get("reasoning", "")
  255. }
  256. else:
  257. logger.error(f" 评估帖子 {note_index} 失败: {search_word}")
  258. return {
  259. "note_index": note_index,
  260. "relevance": 0.0,
  261. "matched_elements": [],
  262. "reasoning": "评估失败"
  263. }
  264. def evaluate_search_results_parallel(
  265. self,
  266. original_feature: str,
  267. search_word: str,
  268. notes: List[Dict[str, Any]],
  269. max_notes: int = 20,
  270. max_workers: int = 20
  271. ) -> Dict[str, Any]:
  272. """
  273. 并行评估搜索结果(每个帖子独立评估)
  274. Args:
  275. original_feature: 原始特征
  276. search_word: 搜索词
  277. notes: 帖子列表
  278. max_notes: 最多评估几条帖子
  279. max_workers: 最大并发数
  280. Returns:
  281. 评估结果汇总
  282. """
  283. if not notes:
  284. return {
  285. "overall_relevance": 0.0,
  286. "extracted_elements": [],
  287. "evaluated_notes": []
  288. }
  289. notes_to_eval = notes[:max_notes]
  290. evaluated_notes = []
  291. logger.info(f" 并行评估 {len(notes_to_eval)} 个帖子({max_workers}并发)")
  292. # 20并发评估每个帖子
  293. with ThreadPoolExecutor(max_workers=max_workers) as executor:
  294. futures = []
  295. for idx, note in enumerate(notes_to_eval):
  296. future = executor.submit(
  297. self.evaluate_single_note,
  298. original_feature,
  299. search_word,
  300. note,
  301. idx
  302. )
  303. futures.append(future)
  304. # 收集结果
  305. for future in as_completed(futures):
  306. try:
  307. result = future.result()
  308. evaluated_notes.append(result)
  309. except Exception as e:
  310. logger.error(f" 评估帖子失败: {e}")
  311. # 按note_index排序
  312. evaluated_notes.sort(key=lambda x: x['note_index'])
  313. # 汇总:计算整体相关度和提取元素
  314. if evaluated_notes:
  315. overall_relevance = sum(n['relevance'] for n in evaluated_notes) / len(evaluated_notes)
  316. # 提取所有元素并统计频次
  317. element_counts = {}
  318. for note in evaluated_notes:
  319. for elem in note['matched_elements']:
  320. element_counts[elem] = element_counts.get(elem, 0) + 1
  321. # 按频次排序,取前5个
  322. extracted_elements = sorted(
  323. element_counts.keys(),
  324. key=lambda x: element_counts[x],
  325. reverse=True
  326. )[:5]
  327. else:
  328. overall_relevance = 0.0
  329. extracted_elements = []
  330. return {
  331. "overall_relevance": overall_relevance,
  332. "extracted_elements": extracted_elements,
  333. "evaluated_notes": evaluated_notes
  334. }
  335. def evaluate_search_results(
  336. self,
  337. original_feature: str,
  338. search_word: str,
  339. notes: List[Dict[str, Any]],
  340. max_notes: int = 5,
  341. max_images_per_note: int = 10
  342. ) -> Dict[str, Any]:
  343. """
  344. 评估搜索结果(阶段6,多模态)
  345. Args:
  346. original_feature: 原始特征
  347. search_word: 搜索词
  348. notes: 帖子列表
  349. max_notes: 最多评估几条帖子
  350. max_images_per_note: 每条帖子最多取几张图片
  351. Returns:
  352. 评估结果
  353. """
  354. if not notes:
  355. return {
  356. "overall_relevance": 0.0,
  357. "extracted_elements": [],
  358. "recommended_extension": None,
  359. "evaluated_notes": []
  360. }
  361. # 限制评估数量
  362. notes_to_eval = notes[:max_notes]
  363. # 准备文本信息
  364. notes_info = []
  365. all_images = []
  366. for idx, note in enumerate(notes_to_eval):
  367. card = note.get("note_card", {})
  368. title = card.get("display_title", "")
  369. desc = card.get("desc", "")[:300] # 限制长度
  370. notes_info.append({
  371. "index": idx,
  372. "title": title,
  373. "desc": desc
  374. })
  375. # 收集图片
  376. images = card.get("image_list", [])[:max_images_per_note]
  377. all_images.extend(images)
  378. # 构建提示词
  379. notes_text = "\n\n".join([
  380. f"帖子 {n['index']}:\n标题: {n['title']}\n正文: {n['desc']}"
  381. for n in notes_info
  382. ])
  383. prompt = f"""你是一个小红书内容分析专家。
  384. 任务:评估搜索结果是否包含目标特征的元素
  385. 原始特征:"{original_feature}"
  386. 搜索词:"{search_word}"
  387. 帖子数量:{len(notes_to_eval)} 条
  388. 帖子内容:
  389. {notes_text}
  390. 请综合分析帖子的文字和图片内容,判断:
  391. 1. 这些搜索结果中是否包含与"{original_feature}"相似的元素
  392. 2. 提取最相关的元素关键词(2-4个字的词组)
  393. 3. 推荐最适合用于扩展搜索的关键词
  394. 返回JSON格式:
  395. {{
  396. "overall_relevance": 0.72, // 0.0-1.0,整体相关度
  397. "extracted_elements": ["关键词1", "关键词2", "关键词3"], // 提取的相似元素,按相关度排序
  398. "recommended_extension": "关键词1", // 最优的扩展关键词
  399. "evaluated_notes": [
  400. {{
  401. "note_index": 0, // 帖子索引
  402. "relevance": 0.85, // 该帖子的相关度
  403. "matched_elements": ["元素1", "元素2"], // 该帖子匹配的元素
  404. "reasoning": "简短的匹配理由"
  405. }}
  406. ]
  407. }}
  408. 注意:
  409. - extracted_elements 应该是帖子中实际包含的、与原始特征相似的元素
  410. - 优先提取在图片或文字中明显出现的元素
  411. - 只返回JSON,不要其他内容"""
  412. # 调用LLM(带图片)
  413. result = self.client.chat_json(
  414. prompt=prompt,
  415. images=all_images if all_images else None,
  416. max_retries=3
  417. )
  418. if result:
  419. # 确保返回完整格式
  420. return {
  421. "overall_relevance": result.get("overall_relevance", 0.0),
  422. "extracted_elements": result.get("extracted_elements", []),
  423. "recommended_extension": result.get("recommended_extension"),
  424. "evaluated_notes": result.get("evaluated_notes", [])
  425. }
  426. else:
  427. logger.error(f"评估搜索结果失败: {search_word}")
  428. return {
  429. "overall_relevance": 0.0,
  430. "extracted_elements": [],
  431. "recommended_extension": None,
  432. "evaluated_notes": []
  433. }
  434. def batch_evaluate_search_results(
  435. self,
  436. features_with_results: List[Dict[str, Any]],
  437. max_workers: int = 3
  438. ) -> List[Dict[str, Any]]:
  439. """
  440. 批量评估搜索结果(并行,但并发数较低以避免超时)
  441. Args:
  442. features_with_results: 带搜索结果的特征列表
  443. max_workers: 最大并发数
  444. Returns:
  445. 带评估结果的特征列表
  446. """
  447. logger.info(f"开始批量评估 {len(features_with_results)} 个搜索结果...")
  448. results = []
  449. with ThreadPoolExecutor(max_workers=max_workers) as executor:
  450. # 提交任务
  451. future_to_feature = {}
  452. for feature in features_with_results:
  453. if not feature.get("search_result"):
  454. # 无搜索结果,跳过
  455. feature["result_evaluation"] = None
  456. results.append(feature)
  457. continue
  458. original_feature = self._get_original_feature(feature)
  459. search_word = feature.get("search_word", "")
  460. notes = feature["search_result"].get("data", {}).get("data", [])
  461. future = executor.submit(
  462. self.evaluate_search_results,
  463. original_feature,
  464. search_word,
  465. notes
  466. )
  467. future_to_feature[future] = feature
  468. # 收集结果
  469. for idx, future in enumerate(as_completed(future_to_feature), 1):
  470. feature = future_to_feature[future]
  471. try:
  472. evaluation = future.result()
  473. feature["result_evaluation"] = evaluation
  474. results.append(feature)
  475. logger.info(f" [{idx}/{len(future_to_feature)}] {feature.get('search_word')}: "
  476. f"relevance={evaluation['overall_relevance']:.3f}")
  477. except Exception as e:
  478. logger.error(f" 评估失败: {feature.get('search_word')}, 错误: {e}")
  479. feature["result_evaluation"] = None
  480. results.append(feature)
  481. logger.info(f"批量评估完成")
  482. return results
  483. def _get_original_feature(self, feature_node: Dict[str, Any]) -> str:
  484. """
  485. 从特征节点中获取原始特征名称
  486. Args:
  487. feature_node: 特征节点
  488. Returns:
  489. 原始特征名称
  490. """
  491. # 尝试从llm_evaluation中获取
  492. if "llm_evaluation" in feature_node:
  493. return feature_node["llm_evaluation"].get("original_feature", "")
  494. # 尝试从其他字段获取
  495. return feature_node.get("原始特征名称", feature_node.get("特征名称", ""))
  496. def test_evaluator():
  497. """测试评估器"""
  498. import os
  499. # 初始化客户端
  500. client = OpenRouterClient()
  501. evaluator = LLMEvaluator(client)
  502. # 测试搜索词评估
  503. print("\n=== 测试搜索词评估 ===")
  504. result = evaluator.evaluate_search_word(
  505. original_feature="拟人",
  506. search_word="宠物猫 猫咪"
  507. )
  508. print(f"评分: {result['score']:.3f}")
  509. print(f"理由: {result['reasoning']}")
  510. # 测试批量评估
  511. print("\n=== 测试批量评估 ===")
  512. results = evaluator.evaluate_search_words_batch(
  513. original_feature="拟人",
  514. search_words=["宠物猫 猫咪", "宠物猫 猫孩子", "宠物猫 猫"],
  515. max_workers=2
  516. )
  517. for r in results:
  518. print(f"{r['search_word']}: {r['score']:.3f} (rank={r['rank']})")
  519. if __name__ == "__main__":
  520. logging.basicConfig(
  521. level=logging.INFO,
  522. format='%(asctime)s - %(levelname)s - %(message)s'
  523. )
  524. test_evaluator()