# search_and_evaluate.py 27 KB
"""
Standalone script: multi-channel search for query terms + per-post qwen/LLM evaluation
(does not go through the agent and does not write source.json).

Purpose: a self-contained "search + evaluate" tool.
    Input   : a set of query terms (json file)
    Optional: first let an LLM (qwen by default) rewrite / expand the queries
    Search  : call each channel's search_impl directly to get post details (bypassing the agent)
    Evaluate: hand each post's details to the LLM and evaluate them one by one against the rubric
    Output  : evaluated.json -- each entry = post details + llm_evaluation

Difference from run_pipeline / llm_evaluate_sources: this script does not maintain
source.json / filtered_cases and does no cross-round de-duplication or write-back; it only
produces one evaluation result so it can be run and inspected on its own.
The rubric prompt and single-post evaluation logic are still reused from
llm_evaluate_sources (faithful rubric).

queries.json supports two formats:
    ["query1", "query2", ...]
    {"requirement": "description of the collection goal", "queries": ["query1", "query2", ...]}

Typical usage:
    # search with the given queries directly + evaluate with qwen
    python search_and_evaluate.py --queries q.json --platforms xhs,zhihu \\
        --output-dir scratch/run1 --eval-model qwen

    # let qwen rewrite the queries first, then search
    python search_and_evaluate.py --queries q.json --platforms xhs \\
        --output-dir scratch/run1 --gen-query --gen-model qwen --keep-original

    # search only, skip evaluation
    python search_and_evaluate.py --queries q.json --platforms xhs --output-dir d --no-eval
"""
import argparse
import asyncio
import json
import logging
import sys
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Tuple

# Put the project root on sys.path so the `examples.` / `agent.` packages resolve when this
# file is run as a script. parents[3] presumably matches the layout
# <root>/examples/process_pipeline/script/<this file> -- TODO confirm if the file moves.
_PROJECT_ROOT = Path(__file__).resolve().parents[3]
if str(_PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(_PROJECT_ROOT))

# Must stay below the sys.path tweak above.
from examples.process_pipeline.script.llm_helper import call_llm_with_retry  # noqa: E402

logger = logging.getLogger(__name__)
  37. # ── queries 加载 ────────────────────────────────────────────────────────────────
  38. def load_queries(path: Path) -> Tuple[List[str], str]:
  39. """读 queries.json,返回 (queries, requirement)。requirement 可能为空串。"""
  40. with open(path, "r", encoding="utf-8") as f:
  41. data = json.load(f)
  42. if isinstance(data, list):
  43. return [str(q).strip() for q in data if str(q).strip()], ""
  44. if isinstance(data, dict):
  45. raw = data.get("queries") or []
  46. queries = [str(q).strip() for q in raw if str(q).strip()]
  47. return queries, str(data.get("requirement") or "").strip()
  48. raise ValueError(f"无法识别的 queries 文件格式: {path}(应为数组或 {{queries:[...]}})")
  49. # ── query 生成 / 改写 ───────────────────────────────────────────────────────────
  50. def _validate_gen(data: Dict[str, Any]) -> Optional[str]:
  51. qs = data.get("queries")
  52. if not isinstance(qs, list) or not qs:
  53. return "queries 必须是非空数组"
  54. if not all(isinstance(q, str) and q.strip() for q in qs):
  55. return "queries 每一项必须是非空字符串"
  56. return None
  57. async def generate_queries(
  58. base_queries: List[str],
  59. requirement: str,
  60. llm_call: Callable,
  61. model: str,
  62. target_count: int,
  63. ) -> Tuple[List[str], float]:
  64. """让 LLM 基于采集需求改写 / 扩展已有 query。返回 (新 query 列表, cost)。"""
  65. system = (
  66. "你是内容采集的搜索词优化器。基于采集需求和已有 query,产出一组更适合在"
  67. "社媒/内容平台搜索框直接使用的关键词:覆盖同义表达、具体工具名、典型用法场景,"
  68. "去掉过于宽泛或重复的词。只输出一个 JSON 对象,不要解释、不要 markdown。"
  69. )
  70. user = (
  71. f"【采集需求 / 目标格子】\n{requirement or '(未提供,参考已有 query 自行归纳)'}\n\n"
  72. f"【已有 query】\n{json.dumps(base_queries, ensure_ascii=False)}\n\n"
  73. f"【要求】产出约 {target_count} 个改写 / 扩展后的搜索词,输出格式:\n"
  74. '{"queries": ["词1", "词2", ...]}\n'
  75. "词应简短(适合搜索框)、彼此不同、贴合『制作做法』而非泛泛话题。只输出 JSON。"
  76. )
  77. data, cost = await call_llm_with_retry(
  78. llm_call=llm_call,
  79. messages=[{"role": "system", "content": system},
  80. {"role": "user", "content": user}],
  81. model=model, temperature=0.5, max_tokens=1500,
  82. validate_fn=_validate_gen, task_name="GenQuery",
  83. )
  84. if not data:
  85. logger.warning("query 生成失败,回退使用原始 query")
  86. return [], cost
  87. out, seen = [], set()
  88. for q in data["queries"]:
  89. q = q.strip()
  90. if q and q not in seen:
  91. seen.add(q)
  92. out.append(q)
  93. return out, cost
  94. # ── 渠道搜索 ────────────────────────────────────────────────────────────────────
  95. # 英文为主的平台:搜索前把中文 query 翻成英文(其余平台用中文原词)
  96. _EN_PLATFORMS = {"youtube", "x"}
  97. def _validate_translation(data: Dict[str, Any], n: int) -> Optional[str]:
  98. t = data.get("translations")
  99. if not isinstance(t, list) or len(t) != n:
  100. return f"translations 必须是长度 {n} 的数组"
  101. if not all(isinstance(x, str) and x.strip() for x in t):
  102. return "translations 每项必须是非空字符串"
  103. return None
  104. async def build_query_overrides(
  105. platforms: List[str], queries: List[str], llm_call: Callable, model: str,
  106. ) -> Dict[Tuple[str, str], str]:
  107. """为英文平台(youtube/x)把 queries 翻成英文搜索词,返回 {(platform, q): en_q}。
  108. 无英文平台 → 返回空 dict(不调用 LLM)。翻译失败 → 回退用原中文(返回空,按原词搜)。
  109. """
  110. en_plats = [p for p in platforms if p in _EN_PLATFORMS]
  111. if not en_plats or not queries:
  112. return {}
  113. uniq = list(dict.fromkeys(queries))
  114. system = (
  115. "你是搜索词翻译器。把每个中文搜索词组翻译成地道的英文搜索关键词,"
  116. "面向 YouTube / X(Twitter) 搜索框:用该领域英文母语者真实会搜的说法(含常用工具/术语英文名),"
  117. "简短、可直接搜,不要整句解释。只输出 JSON。"
  118. )
  119. user = (
  120. "把下面中文搜索词逐个翻译成英文搜索关键词,顺序一一对应,输出:\n"
  121. '{"translations": ["en1", "en2", ...]}\n\n'
  122. f"中文词(共 {len(uniq)} 个):\n{json.dumps(uniq, ensure_ascii=False, indent=2)}"
  123. )
  124. data, _ = await call_llm_with_retry(
  125. llm_call=llm_call, messages=[{"role": "system", "content": system},
  126. {"role": "user", "content": user}],
  127. model=model, temperature=0.2, max_tokens=1500,
  128. validate_fn=lambda d: _validate_translation(d, len(uniq)), task_name="TranslateQuery",
  129. )
  130. if not data:
  131. logger.warning("query 翻译失败,英文平台回退用中文原词")
  132. return {}
  133. zh2en = {zh: en.strip() for zh, en in zip(uniq, data["translations"])}
  134. overrides = {(p, q): zh2en[q] for p in en_plats for q in queries if q in zh2en}
  135. print(f"🌐 英文平台 {en_plats} query 翻译:" +
  136. ";".join(f"{q}→{zh2en[q]}" for q in uniq[:5]) + (" …" if len(uniq) > 5 else ""))
  137. return overrides
  138. def _post_cid(post: Dict[str, Any]) -> Optional[str]:
  139. cid = post.get("channel_content_id") or post.get("video_id")
  140. if cid:
  141. return str(cid)
  142. link = post.get("link") or post.get("url")
  143. return str(link) if link else None
  144. async def _search_one(pdef, keyword: str, orig_q: str, max_count: int, sem: asyncio.Semaphore):
  145. """跑一次搜索(实际用 keyword,但回溯仍记原始 orig_q)。返回 (platform, orig_q, posts)。
  146. keyword 可能是为英文平台翻译后的词;orig_q 是用户的原始 query,用于 found_by_queries
  147. 与跨渠道/跨形式对比的统一锚定。
  148. """
  149. async with sem:
  150. try:
  151. result = await pdef.search_impl(
  152. platform_id=pdef.id, keyword=keyword, max_count=max_count,
  153. cursor="", extras=None,
  154. )
  155. except Exception as e:
  156. logger.warning("search 失败 [%s/%s]: %s", pdef.id, keyword, e)
  157. return pdef.id, orig_q, []
  158. if getattr(result, "error", None):
  159. logger.warning("search 返回错误 [%s/%s]: %s", pdef.id, keyword, result.error)
  160. return pdef.id, orig_q, []
  161. posts = (result.metadata or {}).get("posts", []) or []
  162. return pdef.id, orig_q, posts
  163. async def search_all(
  164. platforms: List[str], queries: List[str], max_count: int, max_concurrent: int,
  165. query_overrides: Optional[Dict[Tuple[str, str], str]] = None,
  166. ) -> List[Dict[str, Any]]:
  167. """对所有 (platform × query) 组合并发搜索,按 (platform, cid) 去重。
  168. query_overrides: {(platform, query): 实际搜索词}。命中则用覆盖词搜(如英文平台用译文),
  169. 未命中用原 query。found_by_queries 始终记原 query。
  170. 返回 source_dict 列表,每条带:case_id / platform / channel_content_id /
  171. source_url / post / comments / found_by_queries(命中它的 query,用于回溯 query 质量)。
  172. """
  173. import agent.tools.builtin.content.tools # noqa: F401 触发平台自注册
  174. from agent.tools.builtin.content.registry import get_platform, all_platforms
  175. pdefs = []
  176. for p in platforms:
  177. pdef = get_platform(p)
  178. if not pdef:
  179. avail = ", ".join(x.id for x in all_platforms())
  180. raise ValueError(f"未知平台 '{p}'。可用: {avail}")
  181. if not pdef.search_impl:
  182. raise ValueError(f"平台 '{p}' 不支持搜索")
  183. pdefs.append(pdef)
  184. ov = query_overrides or {}
  185. sem = asyncio.Semaphore(max_concurrent)
  186. tasks = [_search_one(pdef, ov.get((pdef.id, q), q), q, max_count, sem)
  187. for pdef in pdefs for q in queries]
  188. print(f"🔎 搜索 {len(pdefs)} 渠道 × {len(queries)} query = {len(tasks)} 次请求 (并发 {max_concurrent})")
  189. results = await asyncio.gather(*tasks)
  190. # youtube / sph 的 post 字段与通用 schema 不同(youtube: video_id/description_snippet/
  191. # thumbnails/url),统一 normalize 成 channel_content_id/body_text/images/link,
  192. # 否则下游取 cid / 正文 / 图片会漏。其他平台 no-op。
  193. try:
  194. from examples.process_pipeline.script.extract_sources import _normalize_post_in_place
  195. except Exception:
  196. _normalize_post_in_place = None
  197. collected: Dict[Tuple[str, str], Dict[str, Any]] = {}
  198. per_query_counts: Dict[str, int] = {}
  199. for platform, query, posts in results:
  200. per_query_counts[f"{platform}/{query}"] = len(posts)
  201. for post in posts:
  202. if not isinstance(post, dict):
  203. continue
  204. if _normalize_post_in_place:
  205. try:
  206. _normalize_post_in_place(platform, post)
  207. except Exception:
  208. pass
  209. cid = _post_cid(post)
  210. if not cid:
  211. continue
  212. key = (platform, cid)
  213. if key in collected:
  214. collected[key]["found_by_queries"].append(query)
  215. continue
  216. link = post.get("link") or post.get("url") or ""
  217. collected[key] = {
  218. "case_id": f"{platform}_{cid}",
  219. "platform": platform,
  220. "channel_content_id": cid,
  221. "source_url": link,
  222. "post": post,
  223. "comments": post.get("author_comments", []) or [],
  224. "found_by_queries": [query],
  225. }
  226. print(" 每个 (渠道/query) 命中数:")
  227. for k, n in sorted(per_query_counts.items()):
  228. print(f" - {k}: {n}")
  229. print(f" 去重后唯一 post:{len(collected)}")
  230. return list(collected.values())
  231. # ── 图片获取(下载转 base64 data URL,喂多模态评估)──────────────────────────────
  232. import base64
  233. _MIME_BY_EXT = {".png": "image/png", ".webp": "image/webp", ".gif": "image/gif"}
  234. def _collect_post_image_urls(post: Dict[str, Any], max_images: int) -> List[str]:
  235. """从 post 收集图片 URL(复用 generate_case 的字段映射),截断到 max_images。"""
  236. try:
  237. from examples.process_pipeline.script.generate_case import _extract_raw_images
  238. urls = _extract_raw_images(post, post.get("channel") or "")
  239. except Exception:
  240. urls = post.get("images") or []
  241. urls = [u for u in urls if isinstance(u, str) and u.startswith("http")]
  242. return urls[:max_images]
  243. async def _fetch_data_url(url: str, sem: asyncio.Semaphore) -> Optional[str]:
  244. """下载单张图片转 base64 data URL(用项目 _download_image,带 Referer/UA 绕防盗链)。"""
  245. from agent.tools.builtin.file.image_cdn import _download_image
  246. async with sem:
  247. try:
  248. data = await _download_image(url)
  249. except Exception as e:
  250. logger.warning("图片下载失败 %s: %s", url[:60], e)
  251. return None
  252. if not data:
  253. return None
  254. ext = next((e for e in _MIME_BY_EXT if url.lower().split("?")[0].endswith(e)), "")
  255. mime = _MIME_BY_EXT.get(ext, "image/jpeg")
  256. b64 = base64.b64encode(data).decode("ascii")
  257. return f"data:{mime};base64,{b64}"
  258. async def _attach_image_refs(
  259. sources: List[Dict[str, Any]], max_images: int, max_concurrent: int, mode: str,
  260. ) -> int:
  261. """为每条 source 收集配图,挂到 s['_image_data_urls'](评估后会清掉,不写进报告)。
  262. mode="url" :直接用 http 图片 URL,让 OpenRouter/Gemini 服务端去抓(实测 xhs 可直连,最快)。
  263. mode="base64":下载图片转 base64 data URL(带 Referer/UA 绕防盗链,最稳,payload 较大)。
  264. """
  265. if mode == "url":
  266. total = 0
  267. for s in sources:
  268. urls = _collect_post_image_urls(s.get("post", {}) or {}, max_images)
  269. s["_image_data_urls"] = urls
  270. total += len(urls)
  271. return total
  272. # base64 模式:并发下载
  273. sem = asyncio.Semaphore(max_concurrent)
  274. plan: List[Tuple[Dict[str, Any], List[str]]] = []
  275. for s in sources:
  276. urls = _collect_post_image_urls(s.get("post", {}) or {}, max_images)
  277. plan.append((s, urls))
  278. flat = [(s, u) for s, urls in plan for u in urls]
  279. results = await asyncio.gather(*[_fetch_data_url(u, sem) for _, u in flat])
  280. bucket: Dict[int, List[str]] = {}
  281. for (s, _), data_url in zip(flat, results):
  282. if data_url:
  283. bucket.setdefault(id(s), []).append(data_url)
  284. total = 0
  285. for s in sources:
  286. s["_image_data_urls"] = bucket.get(id(s), [])
  287. total += len(s["_image_data_urls"])
  288. return total
  289. # ── 视频字幕:转写并并入正文(与 run_pipeline / extract_sources 同口径)──────────
  290. async def transcribe_video_posts(sources: List[Dict[str, Any]], concurrency: int = 3) -> int:
  291. """对视频帖跑 Deepgram 转写,把字幕并入 body_text(带 [视频字幕] 标记)再评估。
  292. 视频帖(抖音/视频号/youtube 等)正文常为空或只有 hashtag,真正内容在视频里。
  293. 复用 extract_sources 的逻辑:有视频源且还没 video_transcript 字段的帖子才转写,
  294. 转写结果合并进 body_text —— 这样评估(_format_post_for_eval 读 body_text)就能看到视频内容。
  295. 返回本次获得字幕的条数。
  296. """
  297. try:
  298. from examples.process_pipeline.script.extract_sources import (
  299. _transcribe_pending_async, _merge_transcript_into_body,
  300. )
  301. except Exception as e:
  302. logger.warning("转写模块导入失败,跳过: %s", e)
  303. return 0
  304. try:
  305. updates = await _transcribe_pending_async(sources, concurrency=concurrency)
  306. except Exception as e:
  307. logger.warning("视频转写失败,跳过: %s", e)
  308. return 0
  309. for s in sources:
  310. post = s.get("post")
  311. if isinstance(post, dict) and post.get("video_transcript"):
  312. merged = _merge_transcript_into_body(post)
  313. if merged is not post:
  314. post["body_text"] = merged.get("body_text", post.get("body_text", ""))
  315. return len(updates)
  316. # ── 逐条评估(直评,不写 source.json)─────────────────────────────────────────────
  317. async def evaluate_posts(
  318. sources: List[Dict[str, Any]],
  319. requirement: str,
  320. llm_call: Callable,
  321. model: str,
  322. max_concurrent: int,
  323. include_images: bool = True,
  324. max_images: int = 4,
  325. image_mode: str = "url",
  326. query: Optional[str] = None,
  327. ) -> Tuple[List[Dict[str, Any]], float]:
  328. """对每条 post 用 rubric 逐条评估,把 llm_evaluation 挂到 source 上。返回 (sources, total_cost)。
  329. 复用 llm_evaluate_sources 的 rubric 加载与单帖评估逻辑,保证与管线评估口径一致。
  330. query 非空时作为检索锚点喂给模型(判相关性看『这帖是否回答了这个检索词』)。
  331. include_images=True 时把帖子配图一并发给模型做多模态评估:
  332. image_mode="url" → 直接传图片 URL(最快,实测 xhs 可直连)
  333. image_mode="base64"→ 下载转 base64(最稳,绕防盗链)
  334. 评估失败(重试耗尽)的条目标 error 标记并保留,不丢。
  335. """
  336. from examples.process_pipeline.script.llm_evaluate_sources import _evaluate_one
  337. # rubric 详解 / 输出 schema 已固化在 eval_prompt_template.md, 不再 load 外部 rubric 文件
  338. sem = asyncio.Semaphore(max_concurrent)
  339. if include_images:
  340. n_img = await _attach_image_refs(sources, max_images, max_concurrent * 2, image_mode)
  341. verb = "直传 URL" if image_mode == "url" else "下载 base64"
  342. print(f"🖼️ 配图 {n_img} 张(每帖≤{max_images},{verb})用于多模态评估")
  343. print(f"🧠 逐条评估 {len(sources)} 条 (并发 {max_concurrent}) ...")
  344. results = await asyncio.gather(*[
  345. _evaluate_one(s, requirement, llm_call, model, sem,
  346. image_urls=(s.get("_image_data_urls") if include_images else None),
  347. query=query)
  348. for s in sources
  349. ])
  350. # mod.md 中文 schema 不再有 decision/instructive_pass 字段 —— rep/dis 统计基于
  351. # 制作相关性得分(1=明确无关 → 视作 discard 候选, 2/3 视作 report 候选)。失败仍按 error 标记。
  352. total_cost = 0.0
  353. rep = dis = failed = 0
  354. for s, (llm_eval, cost) in zip(sources, results):
  355. total_cost += cost
  356. if llm_eval is None:
  357. s["llm_evaluation"] = {"_error": True, "_reason": "llm_eval_failed"}
  358. failed += 1
  359. pr_score = "?"
  360. else:
  361. s["llm_evaluation"] = llm_eval
  362. pr_obj = llm_eval.get("制作相关性") or {}
  363. pr_score = pr_obj.get("得分", "?")
  364. try:
  365. if int(float(pr_score)) <= 1:
  366. dis += 1
  367. else:
  368. rep += 1
  369. except (TypeError, ValueError):
  370. rep += 1 # 无法解析得分时按 report 处理(prompt 引导失败,不强淘)
  371. title = (s.get("post", {}) or {}).get("title", "")[:30]
  372. print(f" - [制作相关性={pr_score}] {s['case_id'][:24]} {title}")
  373. print(f" 汇总: 制作相关>=2 {rep} / 明确无关(1) {dis} / 评估失败 {failed} / cost=${total_cost:.4f}")
  374. return sources, total_cost
  375. # ── 主流程 ────────────────────────────────────────────────────────────────────
  376. async def run(args: argparse.Namespace) -> None:
  377. from examples.process_pipeline.script.llm_evaluate_sources import build_eval_llm_call
  378. queries, req_from_file = load_queries(Path(args.queries))
  379. if not queries:
  380. print("❌ queries 为空"); sys.exit(1)
  381. platforms = [p.strip() for p in args.platforms.split(",") if p.strip()]
  382. if not platforms:
  383. print("❌ 未指定 --platforms"); sys.exit(1)
  384. requirement = args.requirement or req_from_file or (";".join(queries))[:200]
  385. output_dir = Path(args.output_dir)
  386. output_dir.mkdir(parents=True, exist_ok=True)
  387. print(f"📋 需求: {requirement[:80]}")
  388. print(f"📡 渠道: {platforms} | 原始 query 数: {len(queries)}")
  389. total_cost = 0.0
  390. # 1. 可选:LLM 改写 / 扩展 query
  391. if args.gen_query:
  392. gen_llm, gen_model_id = build_eval_llm_call(args.gen_model)
  393. print(f"✍️ query 生成模型: {args.gen_model} -> {gen_model_id}")
  394. gen_queries, gen_cost = await generate_queries(
  395. queries, requirement, gen_llm, gen_model_id, args.gen_count,
  396. )
  397. total_cost += gen_cost
  398. if gen_queries:
  399. search_queries = list(dict.fromkeys(
  400. (queries + gen_queries) if args.keep_original else gen_queries
  401. ))
  402. (output_dir / "generated_queries.json").write_text(
  403. json.dumps({"requirement": requirement, "original": queries,
  404. "generated": gen_queries, "used": search_queries},
  405. ensure_ascii=False, indent=2), encoding="utf-8",
  406. )
  407. print(f" 生成 {len(gen_queries)} 个 query,实际搜索 {len(search_queries)} 个"
  408. f"({'含' if args.keep_original else '不含'}原始)→ generated_queries.json")
  409. else:
  410. search_queries = queries
  411. print(" 生成失败,回退原始 query")
  412. else:
  413. search_queries = queries
  414. # 2. 多渠道搜索(英文平台 youtube/x 先把 query 翻成英文)
  415. tr_llm, tr_model = build_eval_llm_call("gemini-flash-lite")
  416. overrides = await build_query_overrides(platforms, search_queries, tr_llm, tr_model)
  417. sources = await search_all(platforms, search_queries, args.max_count, args.max_concurrent,
  418. query_overrides=overrides)
  419. # 3. 时间戳转可读
  420. try:
  421. from examples.process_pipeline.script.extract_sources import _convert_timestamps
  422. _convert_timestamps(sources)
  423. except Exception as e:
  424. logger.warning("时间戳转换跳过: %s", e)
  425. # 4. 视频帖转写:把字幕并入正文再评估(默认开,--no-transcribe 关)
  426. if not args.no_transcribe and sources:
  427. n = await transcribe_video_posts(sources, concurrency=args.max_concurrent)
  428. if n:
  429. print(f"🎙️ 视频转写: {n} 条获得字幕并并入正文")
  430. # 5. 评估(除非 --no-eval)
  431. eval_model_id = None
  432. include_images = not args.no_images
  433. if not args.no_eval and sources:
  434. eval_llm, eval_model_id = build_eval_llm_call(args.eval_model)
  435. print(f"🧠 评估模型: {args.eval_model} -> {eval_model_id} | 多模态图片: {'开' if include_images else '关'}")
  436. sources, eval_cost = await evaluate_posts(
  437. sources, requirement, eval_llm, eval_model_id, args.max_concurrent,
  438. include_images=include_images, max_images=args.max_images,
  439. image_mode=args.image_mode,
  440. )
  441. total_cost += eval_cost
  442. # 清掉评估用的临时 base64 图(别写进报告,否则文件爆炸),只留张数留痕
  443. for s in sources:
  444. imgs = s.pop("_image_data_urls", None)
  445. if imgs is not None:
  446. s["images_sent"] = len(imgs)
  447. # 6. 写 evaluated.json
  448. out_file = output_dir / "evaluated.json"
  449. # mod.md 中文 schema 无 decision 字段; 统计基于 制作相关性.得分
  450. # (>=2 视作 report 候选, ==1 视作 discard 候选, 失败/无字段不计)
  451. def _pr_score(s):
  452. ev = s.get("llm_evaluation") or {}
  453. if ev.get("_error"):
  454. return None
  455. try:
  456. return int(float((ev.get("制作相关性") or {}).get("得分")))
  457. except (TypeError, ValueError):
  458. return None
  459. rep = sum(1 for s in sources if (_pr_score(s) or 0) >= 2)
  460. dis = sum(1 for s in sources if _pr_score(s) == 1)
  461. out = {
  462. "requirement": requirement,
  463. "platforms": platforms,
  464. "queries_used": search_queries,
  465. "eval_model": eval_model_id,
  466. "total": len(sources),
  467. "report": rep,
  468. "discard": dis,
  469. "results": sources,
  470. }
  471. with open(out_file, "w", encoding="utf-8") as f:
  472. json.dump(out, f, ensure_ascii=False, indent=2)
  473. print(f"💾 evaluated.json: {len(sources)} 条 (report={rep} discard={dis}) → {out_file}")
  474. print(f"💰 累计成本: ${total_cost:.4f}")
  475. def main() -> None:
  476. from dotenv import load_dotenv
  477. load_dotenv()
  478. from examples.process_pipeline.script.llm_evaluate_sources import EVAL_MODELS, DEFAULT_EVAL_MODEL
  479. parser = argparse.ArgumentParser(description="query 词多渠道直搜 + LLM 逐条评估")
  480. parser.add_argument("--queries", required=True, help="query 词 json 文件(数组或 {queries:[...]})")
  481. parser.add_argument("--platforms", required=True, help="逗号分隔渠道,如 xhs,zhihu,gzh,douyin,sph,youtube")
  482. parser.add_argument("--output-dir", required=True, help="输出目录(写 evaluated.json)")
  483. parser.add_argument("--max-count", type=int, default=20, help="每个 (渠道,query) 返回条数上限(部分渠道如 xhs 由后端决定)")
  484. parser.add_argument("--requirement", default="", help="评估用采集目标描述(缺省取 queries 文件 requirement 或 query 拼接)")
  485. parser.add_argument("--eval-model", default=DEFAULT_EVAL_MODEL, choices=list(EVAL_MODELS),
  486. help=f"评估模型(默认 {DEFAULT_EVAL_MODEL})")
  487. parser.add_argument("--max-concurrent", type=int, default=3, help="搜索 / 评估并发上限")
  488. parser.add_argument("--no-eval", action="store_true", help="只搜索,跳过 LLM 评估")
  489. parser.add_argument("--no-images", action="store_true", help="不把帖子配图发给模型(默认发,多模态评估;纯文本模型请加此项)")
  490. parser.add_argument("--max-images", type=int, default=4, help="每帖最多发给模型几张配图(默认 4)")
  491. parser.add_argument("--image-mode", choices=["url", "base64"], default="url",
  492. help="图片传输方式:url 直传(快,默认) / base64 下载内嵌(稳,绕防盗链)")
  493. parser.add_argument("--no-transcribe", action="store_true",
  494. help="不对视频帖跑转写(默认会转写并把字幕并入正文再评估)")
  495. # query 生成模式
  496. parser.add_argument("--gen-query", action="store_true", help="搜索前用 LLM 改写 / 扩展 query")
  497. parser.add_argument("--gen-model", default=DEFAULT_EVAL_MODEL, choices=list(EVAL_MODELS),
  498. help=f"query 生成模型(默认 {DEFAULT_EVAL_MODEL})")
  499. parser.add_argument("--gen-count", type=int, default=10, help="生成 query 的目标数量")
  500. parser.add_argument("--keep-original", action="store_true", help="生成的 query 与原始 query 合并搜索(默认只用生成的)")
  501. args = parser.parse_args()
  502. asyncio.run(run(args))
  503. if __name__ == "__main__":
  504. main()