search_eval.py 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191
  1. # -*- coding: utf-8 -*-
  2. """搜索 + 评估 · 任意 query → 多渠道搜索去重 → LLM 逐帖评估 → search_process/search_tools 表
  3. (方向由 --mode-type 决定:工序 → search_process,工具 → search_tools)
  4. ================================================================================
  5. 引擎函数全部只读复用 search_and_evaluate.py(搜索/去重/转写/评估/英平台翻译)。
  6. 用法(一般由 server.py 起子进程调):
  7. python pipeline/search_eval.py --query-id q0004 --query "AI 人像 图片 生成 怎么做"
  8. python pipeline/search_eval.py --query-id q0005 --query "GPT image2 评测" \
  9. --synonyms "GPT image2 测评,GPT image2 实测" --platforms xhs,gzh --max-count 10
  10. """
import argparse
import asyncio
import copy
import json
import sys
from pathlib import Path

# Ancestor directory of this file used as the import root
# (the original note identifies it as ".../Agent").
PROJECT_ROOT = Path(__file__).resolve().parents[3]  # .../Agent
# Must run before the `examples.…` imports below: they are resolved through
# this sys.path entry.
sys.path.insert(0, str(PROJECT_ROOT))
from dotenv import load_dotenv
# Called before the project modules are imported — presumably so they can read
# values from a .env file at import time (TODO confirm).
load_dotenv()
# Engine functions reused from the search/evaluate pipeline.
from examples.process_pipeline.script.search_eval.search_and_evaluate import (
    search_all, evaluate_posts, transcribe_video_posts, build_query_overrides,
)
from examples.process_pipeline.script.llm_evaluate_sources import (
    build_eval_llm_call, EVAL_MODELS, DEFAULT_EVAL_MODEL, _format_post_for_eval,
)
from examples.process_pipeline.script.llm_helper import call_llm_with_retry

# Directory containing this script, and its parent directory.
HERE = Path(__file__).resolve().parent
MW = HERE.parent
# The parent directory is put on sys.path right before `import db`;
# presumably `db` lives there (verify against the repository layout).
sys.path.insert(0, str(MW))
import db
  32. async def _rescore_query_relevance(source, query, llm_call, model, sem):
  33. """评估去重的轻量重算:只判「和 query 相关」{得分,理由}。
  34. 纯文本(不带图)、低 token —— 比整套多模态评估便宜得多。返回 (dict|None, cost)。"""
  35. post_block = _format_post_for_eval(source)
  36. system = ("你是内容评估器。只判断这篇帖子与给定【检索词】的相关程度——"
  37. "即「这帖是否回答/命中了这个检索词」。严格只输出一个 JSON 对象,"
  38. '形如 {"得分": <0到10的数字>, "理由": "<一句话>"},不要任何额外字段或解释。')
  39. user = f"【检索词】{query}\n\n【帖子】\n{post_block}"
  40. messages = [{"role": "system", "content": system},
  41. {"role": "user", "content": user}]
  42. def _v(d):
  43. if not isinstance(d, dict):
  44. return "需 JSON 对象"
  45. try:
  46. v = float(d.get("得分"))
  47. except (TypeError, ValueError):
  48. return "得分 缺失或非数字"
  49. return None if 0 <= v <= 10 else "得分需在 0-10"
  50. async with sem:
  51. data, cost = await call_llm_with_retry(
  52. llm_call=llm_call, messages=messages, model=model,
  53. temperature=0.1, max_tokens=300, validate_fn=_v,
  54. task_name=f"QueryRel[{source.get('case_id', '?')}]")
  55. return data, cost
  56. async def run(args):
  57. phrasings = [args.query] + [s.strip() for s in (args.synonyms or "").split(",") if s.strip()]
  58. # 去重保序
  59. seen, uniq = set(), []
  60. for q in phrasings:
  61. if q not in seen:
  62. seen.add(q); uniq.append(q)
  63. phrasings = uniq
  64. platforms = [p.strip() for p in args.platforms.split(",") if p.strip()]
  65. eval_llm, eval_model_id = build_eval_llm_call(args.eval_model)
  66. print(f"▶ {args.query_id} query={args.query!r} 措辞={phrasings} 渠道={platforms}")
  67. overrides = await build_query_overrides(platforms, phrasings, eval_llm, eval_model_id)
  68. sources = await search_all(platforms, phrasings, args.max_count,
  69. args.max_concurrent, query_overrides=overrides)
  70. print(f"🔎 去重后 {len(sources)} 帖")
  71. if not sources:
  72. print("❌ 搜索无结果"); return 1
  73. try:
  74. from examples.process_pipeline.script.extract_sources import _convert_timestamps
  75. _convert_timestamps(sources)
  76. except Exception:
  77. pass
  78. if not args.no_transcribe:
  79. n = await transcribe_video_posts(sources, concurrency=args.max_concurrent)
  80. if n:
  81. print(f"🎙️ 视频转写 {n} 条")
  82. table = "search_tools" if args.mode_type == "工具" else "search_process"
  83. # ── 评估去重 ────────────────────────────────────────────────────────────────
  84. # 评估的相关性含两子项:「和内容制作知识相关」(与 query 无关)与「和 query 相关」
  85. # (query 专属)。同帖在别的相似 query 下评过时,质量/通用相关/时效等 query 无关分
  86. # 可直接复用,只需用一次轻量纯文本调用重算「和 query 相关」,免去整套多模态评估,省钱。
  87. # --force-eval 跳过去重,全部走完整评估。
  88. cost = 0.0
  89. if not args.no_eval:
  90. prior = {}
  91. if not args.force_eval:
  92. for s in sources:
  93. e = db.fetch_existing_eval(s["case_id"], table)
  94. if e:
  95. prior[s["case_id"]] = e
  96. fresh = [s for s in sources if s["case_id"] not in prior]
  97. reused = [s for s in sources if s["case_id"] in prior]
  98. if reused:
  99. print(f"♻️ 评估去重:{len(reused)} 帖已评过 → 复用通用分+重算 query 相关分;"
  100. f"{len(fresh)} 帖走完整评估")
  101. if fresh:
  102. esc_llm = esc_model = None
  103. if args.escalate_model:
  104. esc_llm, esc_model = build_eval_llm_call(args.escalate_model)
  105. print(f"⬆️ 启用模糊带升级:{eval_model_id} 初评 → {esc_model} "
  106. f"复核(带 [{args.escalate_band[0]:g},{args.escalate_band[1]:g}])")
  107. _, c = await evaluate_posts(
  108. fresh, "", eval_llm, eval_model_id, args.max_concurrent,
  109. include_images=not args.no_images, max_images=args.max_images,
  110. image_mode=args.image_mode, query=args.query,
  111. escalate_llm=esc_llm, escalate_model=esc_model,
  112. escalate_band=tuple(args.escalate_band),
  113. ) # evaluate_posts 就地把 llm_evaluation 挂到各 source 上
  114. cost += c
  115. if reused:
  116. sem = asyncio.Semaphore(args.max_concurrent)
  117. rr = await asyncio.gather(*[
  118. _rescore_query_relevance(s, args.query, eval_llm, eval_model_id, sem)
  119. for s in reused])
  120. for s, (qr, c) in zip(reused, rr):
  121. cost += c
  122. blob = copy.deepcopy(prior[s["case_id"]])
  123. if qr is not None: # 重算成功才覆盖,失败则沿用旧 query 相关分
  124. blob.setdefault("相关性", {})["和 query 相关"] = {
  125. "得分": qr.get("得分"), "理由": qr.get("理由", "")}
  126. s["llm_evaluation"] = blob
  127. qr_s = (blob.get("相关性", {}).get("和 query 相关") or {}).get("得分", "?")
  128. print(f" ♻️ [query={qr_s}] {s['case_id'][:24]}")
  129. for s in sources:
  130. s.pop("_image_data_urls", None)
  131. n = db.upsert_search_posts(args.query_id, args.query, sources, table=table)
  132. print(f"🗄️ {table} 入库 {n} 行 · 方向 {args.mode_type or '工序'} · 评估成本 ${cost:.4f}")
  133. out_dir = MW / "runs" / table
  134. out_dir.mkdir(parents=True, exist_ok=True)
  135. (out_dir / f"{args.query_id}.json").write_text(json.dumps({
  136. "query_id": args.query_id, "query": args.query, "phrasings": phrasings,
  137. "platforms": platforms, "total": len(sources), "results": sources,
  138. }, ensure_ascii=False, indent=2), encoding="utf-8")
  139. return 0
  140. def main():
  141. p = argparse.ArgumentParser(description="搜索+评估 → search_process/search_tools")
  142. p.add_argument("--query-id", required=True, help="如 q0004(server 自动分配)")
  143. p.add_argument("--query", required=True, help="基准 query(评估锚点)")
  144. p.add_argument("--synonyms", default="", help="逗号分隔的同义措辞(可选)")
  145. p.add_argument("--mode-type", default="", choices=["", "工序", "工具"],
  146. help="解构方向,决定写哪张表(工具 → search_tools;其余 → search_process)")
  147. p.add_argument("--platforms", default="xhs,gzh")
  148. p.add_argument("--max-count", type=int, default=10)
  149. p.add_argument("--eval-model", default=DEFAULT_EVAL_MODEL, choices=list(EVAL_MODELS))
  150. p.add_argument("--escalate-model", default="", choices=[""] + list(EVAL_MODELS),
  151. help="模糊带升级用的强模型(如 sonnet);留空=不升级。初评落在 --escalate-band "
  152. "的可复现性/意图可控性帖子交此模型复核")
  153. p.add_argument("--escalate-band", type=float, nargs=2, default=[4.0, 6.0],
  154. metavar=("LO", "HI"), help="升级触发的闭区间,默认 4 6")
  155. p.add_argument("--max-concurrent", type=int, default=3)
  156. p.add_argument("--max-images", type=int, default=4)
  157. p.add_argument("--image-mode", choices=["url", "base64"], default="url")
  158. p.add_argument("--no-transcribe", action="store_true")
  159. p.add_argument("--no-eval", action="store_true")
  160. p.add_argument("--no-images", action="store_true")
  161. p.add_argument("--force-eval", action="store_true",
  162. help="跳过评估去重,所有帖都走完整评估(换 prompt/模型对比时用)")
  163. args = p.parse_args()
  164. raise SystemExit(asyncio.run(run(args)))
  165. if __name__ == "__main__":
  166. main()