r"""Standalone script: multi-channel query search + per-post qwen/LLM evaluation.

This tool does not go through the agent and does not write source.json.

Purpose: a self-contained "search + evaluate" tool.
  Input    : a set of query terms (a json file)
  Optional : rewrite / expand the queries with an LLM first (qwen by default)
  Search   : call each channel's search_impl directly to get post details
             (bypassing the agent)
  Evaluate : hand each post's details to an LLM and score it against the rubric
  Output   : evaluated.json -- each entry = post details + llm_evaluation

Difference from run_pipeline / llm_evaluate_sources: this script does not
maintain source.json / filtered_cases and does no cross-round dedup or
write-back; it only produces one evaluation result so it can be run and
inspected on its own. The rubric prompt and the single-post evaluation logic
are still reused from llm_evaluate_sources (faithful to the rubric).

queries.json supports two formats:
  ["query1", "query2", ...]
  {"requirement": "collection goal description", "queries": ["query1", "query2", ...]}

Typical usage:
  # search with the given queries + evaluate with qwen
  python search_and_evaluate.py --queries q.json --platforms xhs,zhihu \
      --output-dir scratch/run1 --eval-model qwen

  # let qwen rewrite the queries first, then search
  python search_and_evaluate.py --queries q.json --platforms xhs \
      --output-dir scratch/run1 --gen-query --gen-model qwen --keep-original

  # search only, no evaluation
  python search_and_evaluate.py --queries q.json --platforms xhs --output-dir d --no-eval
"""

import argparse
import asyncio
import json
import logging
import sys
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Tuple

# Make the project root importable before the project-local import below.
_PROJECT_ROOT = Path(__file__).resolve().parents[3]
if str(_PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(_PROJECT_ROOT))

from examples.process_pipeline.script.llm_helper import call_llm_with_retry

logger = logging.getLogger(__name__)


# ── Query loading ───────────────────────────────────────────────────────────────


def _clean_queries(items: List[Any]) -> List[str]:
    """Return the stripped, non-blank string form of every non-null entry."""
    cleaned: List[str] = []
    for item in items:
        if item is None:
            # A JSON null must not become the literal query "None".
            continue
        text = str(item).strip()
        if text:
            cleaned.append(text)
    return cleaned


def load_queries(path: Path) -> Tuple[List[str], str]:
    """Read a queries json file and return ``(queries, requirement)``.

    Two layouts are accepted: a bare JSON array of queries, or an object with
    a ``queries`` array and an optional ``requirement`` string.

    Args:
        path: Location of the json file (read as UTF-8).

    Returns:
        The cleaned query list (blank and null entries dropped) and the
        requirement text, which is an empty string when absent.

    Raises:
        ValueError: The top-level value is neither an array nor an object, or
            ``queries`` is present but is not an array.
    """
    with open(path, "r", encoding="utf-8") as f:
        data = json.load(f)

    format_error = f"无法识别的 queries 文件格式: {path}(应为数组或 {{queries:[...]}})"

    if isinstance(data, list):
        return _clean_queries(data), ""

    if isinstance(data, dict):
        raw = data.get("queries") or []
        if not isinstance(raw, list):
            # e.g. a plain string would otherwise be iterated per character.
            raise ValueError(format_error)
        return _clean_queries(raw), str(data.get("requirement") or "").strip()

    raise ValueError(format_error)


# ── Query generation / rewriting
# ───────────────────────────────────────────────────────────────────────────────


def _validate_gen(data: Dict[str, Any]) -> Optional[str]:
    """Validate the query-generation response.

    Returns an error message when ``data["queries"]`` is not a non-empty list
    of non-blank strings, otherwise ``None``.
    """
    qs = data.get("queries")
    if not isinstance(qs, list) or not qs:
        return "queries 必须是非空数组"
    if not all(isinstance(q, str) and q.strip() for q in qs):
        return "queries 每一项必须是非空字符串"
    return None


async def generate_queries(
    base_queries: List[str],
    requirement: str,
    llm_call: Callable,
    model: str,
    target_count: int,
) -> Tuple[List[str], float]:
    """Ask the LLM to rewrite / expand the existing queries.

    Args:
        base_queries: Queries to start from; embedded in the prompt as JSON.
        requirement: Collection goal; a placeholder text is used when empty.
        llm_call: Callable forwarded to ``call_llm_with_retry``.
        model: Model id forwarded to ``call_llm_with_retry``.
        target_count: Approximate number of queries requested in the prompt.

    Returns:
        ``(queries, cost)``. ``queries`` is stripped and de-duplicated in
        first-seen order; it is empty when the LLM call yields no data.
    """
    system = (
        "你是内容采集的搜索词优化器。基于采集需求和已有 query,产出一组更适合在"
        "社媒/内容平台搜索框直接使用的关键词:覆盖同义表达、具体工具名、典型用法场景,"
        "去掉过于宽泛或重复的词。只输出一个 JSON 对象,不要解释、不要 markdown。"
    )
    user = (
        f"【采集需求 / 目标格子】\n{requirement or '(未提供,参考已有 query 自行归纳)'}\n\n"
        f"【已有 query】\n{json.dumps(base_queries, ensure_ascii=False)}\n\n"
        f"【要求】产出约 {target_count} 个改写 / 扩展后的搜索词,输出格式:\n"
        '{"queries": ["词1", "词2", ...]}\n'
        "词应简短(适合搜索框)、彼此不同、贴合『制作做法』而非泛泛话题。只输出 JSON。"
    )
    data, cost = await call_llm_with_retry(
        llm_call=llm_call,
        messages=[{"role": "system", "content": system},
                  {"role": "user", "content": user}],
        model=model,
        temperature=0.5,
        max_tokens=1500,
        validate_fn=_validate_gen,
        task_name="GenQuery",
    )
    if not data:
        logger.warning("query 生成失败,回退使用原始 query")
        return [], cost

    out: List[str] = []
    seen = set()
    for q in data["queries"]:
        q = q.strip()
        if q and q not in seen:
            seen.add(q)
            out.append(q)
    return out, cost


# ── Channel search ──────────────────────────────────────────────────────────────

# Mostly-English platforms: Chinese queries are translated to English before
# searching (every other platform searches with the original wording).
_EN_PLATFORMS = {"youtube", "x"}


def _validate_translation(data: Dict[str, Any], n: int) -> Optional[str]:
    """Validate the translation response.

    Returns an error message unless ``data["translations"]`` is a list of
    exactly ``n`` non-blank strings, otherwise ``None``.
    """
    t = data.get("translations")
    if not isinstance(t, list) or len(t) != n:
        return f"translations 必须是长度 {n} 的数组"
    if not all(isinstance(x, str) and x.strip() for x in t):
        return "translations 每项必须是非空字符串"
    return None


async def build_query_overrides(
    platforms: List[str],
    queries: List[str],
    llm_call: Callable,
    model: str,
) -> Dict[Tuple[str, str], str]:
    """Translate queries to English for the platforms in ``_EN_PLATFORMS``.

    Returns ``{(platform, query): english_query}``. When no requested platform
    is in ``_EN_PLATFORMS`` or ``queries`` is empty, returns ``{}`` without
    calling the LLM. When translation fails, also returns ``{}`` so callers
    search with the original wording.

    Note: the cost reported by the translation call is discarded here.
    """
    en_plats = [p for p in platforms if p in _EN_PLATFORMS]
    if not en_plats or not queries:
        return {}

    # Translate each distinct query once, preserving first-seen order.
    uniq = list(dict.fromkeys(queries))
    system = (
        "你是搜索词翻译器。把每个中文搜索词组翻译成地道的英文搜索关键词,"
        "面向 YouTube / X(Twitter) 搜索框:用该领域英文母语者真实会搜的说法(含常用工具/术语英文名),"
        "简短、可直接搜,不要整句解释。只输出 JSON。"
    )
    user = (
        "把下面中文搜索词逐个翻译成英文搜索关键词,顺序一一对应,输出:\n"
        '{"translations": ["en1", "en2", ...]}\n\n'
        f"中文词(共 {len(uniq)} 个):\n{json.dumps(uniq, ensure_ascii=False, indent=2)}"
    )
    data, _ = await call_llm_with_retry(
        llm_call=llm_call,
        messages=[{"role": "system", "content": system},
                  {"role": "user", "content": user}],
        model=model,
        temperature=0.2,
        max_tokens=1500,
        validate_fn=lambda d: _validate_translation(d, len(uniq)),
        task_name="TranslateQuery",
    )
    if not data:
        logger.warning("query 翻译失败,英文平台回退用中文原词")
        return {}

    zh2en = {zh: en.strip() for zh, en in zip(uniq, data["translations"])}
    overrides = {(p, q): zh2en[q] for p in en_plats for q in queries if q in zh2en}
    print(f"🌐 英文平台 {en_plats} query 翻译:"
          + ";".join(f"{q}→{zh2en[q]}" for q in uniq[:5])
          + (" …" if len(uniq) > 5 else ""))
    return overrides


def _post_cid(post: Dict[str, Any]) -> Optional[str]:
    """Return a content id for ``post`` as a string, or ``None``.

    Prefers ``channel_content_id`` / ``video_id`` and falls back to
    ``link`` / ``url``.
    """
    cid = post.get("channel_content_id") or post.get("video_id")
    if cid:
        return str(cid)
    link = post.get("link") or post.get("url")
    return str(link) if link else None


async def _search_one(pdef, keyword: str, orig_q: str, max_count: int,
                      sem: asyncio.Semaphore):
    """Run one search and return ``(platform_id, orig_q, posts)``.

    ``keyword`` is what is actually sent to the platform (possibly a
    translation); ``orig_q`` is the user's original query and is what gets
    reported back, so results from different channels stay anchored to the
    same query. A raised exception or an ``error`` on the result is logged and
    yields an empty post list.
    """
    async with sem:
        try:
            result = await pdef.search_impl(
                platform_id=pdef.id,
                keyword=keyword,
                max_count=max_count,
                cursor="",
                extras=None,
            )
        except Exception as e:
            logger.warning("search 失败 [%s/%s]: %s", pdef.id, keyword, e)
            return pdef.id, orig_q, []
    if getattr(result, "error", None):
        logger.warning("search 返回错误 [%s/%s]: %s", pdef.id, keyword, result.error)
        return pdef.id, orig_q, []
    posts = (result.metadata or {}).get("posts", []) or []
    return pdef.id, orig_q, posts


async def search_all(
    platforms: List[str],
    queries: List[str],
    max_count: int,
    max_concurrent: int,
    query_overrides: Optional[Dict[Tuple[str, str], str]] = None,
) -> List[Dict[str, Any]]:
    """Search every (platform x query) pair concurrently and de-duplicate.

    Posts are de-duplicated by ``(platform, content id)``.

    Args:
        platforms: Platform ids; each must be registered and searchable.
        queries: Original queries.
        max_count: Per-search result cap passed to the platform.
        max_concurrent: Concurrency limit; values below 1 are treated as 1.
        query_overrides: ``{(platform, query): keyword actually searched}``.
            A hit replaces the search keyword (e.g. a translation); a miss
            searches the original query. ``found_by_queries`` always records
            the original query.

    Returns:
        A list of source dicts with ``case_id``, ``platform``,
        ``channel_content_id``, ``source_url``, ``post``, ``comments`` and
        ``found_by_queries`` (the distinct queries that hit the post).

    Raises:
        ValueError: A platform is unknown or has no ``search_impl``.
    """
    import agent.tools.builtin.content.tools  # noqa: F401  import triggers platform self-registration
    from agent.tools.builtin.content.registry import get_platform, all_platforms

    pdefs = []
    for p in platforms:
        pdef = get_platform(p)
        if not pdef:
            avail = ", ".join(x.id for x in all_platforms())
            raise ValueError(f"未知平台 '{p}'。可用: {avail}")
        if not pdef.search_impl:
            raise ValueError(f"平台 '{p}' 不支持搜索")
        pdefs.append(pdef)

    ov = query_overrides or {}
    # Semaphore(0) would block forever and a negative value raises.
    sem = asyncio.Semaphore(max(1, max_concurrent))
    tasks = [
        _search_one(pdef, ov.get((pdef.id, q), q), q, max_count, sem)
        for pdef in pdefs
        for q in queries
    ]
    print(f"🔎 搜索 {len(pdefs)} 渠道 × {len(queries)} query = {len(tasks)} 次请求 (并发 {max_concurrent})")
    results = await asyncio.gather(*tasks)

    # Per the original author's note, some platforms (youtube / sph) use post
    # fields that differ from the common schema, so posts are normalized when
    # the helper is importable; otherwise this step is skipped.
    try:
        from examples.process_pipeline.script.extract_sources import _normalize_post_in_place
    except Exception:
        _normalize_post_in_place = None

    collected: Dict[Tuple[str, str], Dict[str, Any]] = {}
    per_query_counts: Dict[str, int] = {}
    for platform, query, posts in results:
        per_query_counts[f"{platform}/{query}"] = len(posts)
        for post in posts:
            if not isinstance(post, dict):
                continue
            if _normalize_post_in_place:
                try:
                    _normalize_post_in_place(platform, post)
                except Exception as e:
                    # Best effort: keep the raw post, but leave a trace.
                    logger.debug("post normalize 失败 [%s]: %s", platform, e)
            cid = _post_cid(post)
            if not cid:
                continue
            key = (platform, cid)
            if key in collected:
                hits = collected[key]["found_by_queries"]
                if query not in hits:  # do not record the same query twice
                    hits.append(query)
                continue
            link = post.get("link") or post.get("url") or ""
            collected[key] = {
                "case_id": f"{platform}_{cid}",
                "platform": platform,
                "channel_content_id": cid,
                "source_url": link,
                "post": post,
                "comments": post.get("author_comments", []) or [],
                "found_by_queries": [query],
            }

    print(" 每个 (渠道/query) 命中数:")
    for k, n in sorted(per_query_counts.items()):
        print(f" - {k}: {n}")
    print(f" 去重后唯一 post:{len(collected)}")
    return list(collected.values())


# ── Image fetching (download -> base64 data URL for multimodal evaluation) ─────

import base64

_MIME_BY_EXT = {".png": "image/png", ".webp": "image/webp", ".gif": "image/gif"}


def _collect_post_image_urls(post: Dict[str, Any], max_images: int) -> List[str]:
    """Collect up to ``max_images`` http(s) image URLs from ``post``.

    Uses ``generate_case._extract_raw_images`` when it can be imported and
    called; otherwise falls back to ``post["images"]``. Non-string entries and
    strings not starting with ``http`` are dropped. A non-positive
    ``max_images`` yields an empty list.
    """
    try:
        from examples.process_pipeline.script.generate_case import _extract_raw_images
        urls = _extract_raw_images(post, post.get("channel") or "")
    except Exception:
        urls = post.get("images") or []
    urls = [u for u in (urls or []) if isinstance(u, str) and u.startswith("http")]
    # Clamp: a negative limit would otherwise slice from the end of the list.
    return urls[:max(0, max_images)]


async def _fetch_data_url(url: str, sem: asyncio.Semaphore) -> Optional[str]:
    """Download one image and return it as a base64 ``data:`` URL.

    Uses the project's ``_download_image`` helper. Returns ``None`` when the
    download raises or yields no data. The MIME type is picked from the URL
    path suffix (query string ignored) and defaults to ``image/jpeg``.
    """
    from agent.tools.builtin.file.image_cdn import _download_image

    async with sem:
        try:
            data = await _download_image(url)
        except Exception as e:
            logger.warning("图片下载失败 %s: %s", url[:60], e)
            return None
    if not data:
        return None
    path_part = url.lower().split("?")[0]
    ext = next((suffix for suffix in _MIME_BY_EXT if path_part.endswith(suffix)), "")
    mime = _MIME_BY_EXT.get(ext, "image/jpeg")
    b64 = base64.b64encode(data).decode("ascii")
    return f"data:{mime};base64,{b64}"


async def _attach_image_refs(
    sources: List[Dict[str, Any]],
    max_images: int,
    max_concurrent: int,
    mode: str,
) -> int:
    """Attach image references to each source under ``_image_data_urls``.

    ``mode == "url"`` stores the http image URLs as they are; any other mode
    downloads every image concurrently and stores base64 data URLs (failed
    downloads are skipped).

    Returns the total number of image references attached.
    """
    if mode == "url":
        total = 0
        for s in sources:
            urls = _collect_post_image_urls(s.get("post", {}) or {}, max_images)
            s["_image_data_urls"] = urls
            total += len(urls)
        return total

    # base64 mode: download concurrently.
    sem = asyncio.Semaphore(max(1, max_concurrent))
    plan: List[Tuple[Dict[str, Any], List[str]]] = []
    for s in sources:
        urls = _collect_post_image_urls(s.get("post", {}) or {}, max_images)
        plan.append((s, urls))
    flat = [(s, u) for s, urls in plan for u in urls]
    results = await asyncio.gather(*[_fetch_data_url(u, sem) for _, u in flat])

    # Group results per source by object identity (the dicts are not hashable).
    bucket: Dict[int, List[str]] = {}
    for (s, _), data_url in zip(flat, results):
        if data_url:
            bucket.setdefault(id(s), []).append(data_url)

    total = 0
    for s in sources:
        s["_image_data_urls"] = bucket.get(id(s), [])
        total += len(s["_image_data_urls"])
    return total


# ── Video subtitles: transcribe and merge into the body text ───────────────────


async def transcribe_video_posts(sources: List[Dict[str, Any]], concurrency: int = 3) -> int:
    """Transcribe video posts and merge the transcript into ``body_text``.

    Delegates to ``extract_sources._transcribe_pending_async`` and
    ``_merge_transcript_into_body``; which posts get transcribed is decided by
    those helpers (per the original author's note: posts with a video source
    and no ``video_transcript`` yet). Afterwards, every post that has a
    ``video_transcript`` gets its ``body_text`` replaced by the merged text.

    Returns the number of updates reported by the transcription helper, or 0
    when the helpers cannot be imported or transcription fails.
    """
    try:
        from examples.process_pipeline.script.extract_sources import (
            _transcribe_pending_async,
            _merge_transcript_into_body,
        )
    except Exception as e:
        logger.warning("转写模块导入失败,跳过: %s", e)
        return 0

    try:
        updates = await _transcribe_pending_async(sources, concurrency=concurrency)
    except Exception as e:
        logger.warning("视频转写失败,跳过: %s", e)
        return 0

    for s in sources:
        post = s.get("post")
        if isinstance(post, dict) and post.get("video_transcript"):
            merged = _merge_transcript_into_body(post)
            if merged is not post:
                post["body_text"] = merged.get("body_text", post.get("body_text", ""))
    return len(updates) if updates else 0


# ── Per-post evaluation (direct, does not write source.json) ───────────────────


async def evaluate_posts(
    sources: List[Dict[str, Any]],
    requirement: str,
    llm_call: Callable,
    model: str,
    max_concurrent: int,
    include_images: bool = True,
    max_images: int = 4,
    image_mode: str = "url",
    query: Optional[str] = None,
) -> Tuple[List[Dict[str, Any]], float]:
    """Evaluate every post with the rubric and attach ``llm_evaluation``.

    Reuses ``llm_evaluate_sources._evaluate_one`` so scoring stays consistent
    with the pipeline evaluation.

    Args:
        sources: Source dicts; mutated in place.
        requirement: Collection goal passed to the evaluator.
        llm_call: Callable passed to the evaluator.
        model: Model id passed to the evaluator.
        max_concurrent: Concurrency limit; values below 1 are treated as 1.
        include_images: When true, image references are attached under
            ``_image_data_urls`` and passed to the evaluator.
        max_images: Per-post image cap.
        image_mode: ``"url"`` passes image URLs directly; ``"base64"``
            downloads and inlines them.
        query: Optional retrieval anchor forwarded to the evaluator.

    Returns:
        ``(sources, total_cost)``. Entries whose evaluation returned ``None``
        are kept and marked with ``{"_error": True, ...}``.
    """
    from examples.process_pipeline.script.llm_evaluate_sources import _evaluate_one

    limit = max(1, max_concurrent)  # Semaphore(0) would never release
    sem = asyncio.Semaphore(limit)

    if include_images:
        n_img = await _attach_image_refs(sources, max_images, limit * 2, image_mode)
        verb = "直传 URL" if image_mode == "url" else "下载 base64"
        print(f"🖼️ 配图 {n_img} 张(每帖≤{max_images},{verb})用于多模态评估")

    print(f"🧠 逐条评估 {len(sources)} 条 (并发 {max_concurrent}) ...")
    results = await asyncio.gather(*[
        _evaluate_one(
            s, requirement, llm_call, model, sem,
            image_urls=(s.get("_image_data_urls") if include_images else None),
            query=query,
        )
        for s in sources
    ])

    # The summary is derived from the "制作相关性" score: <= 1 counts as a
    # discard candidate, anything else as a report candidate. Failures are
    # marked with an error flag and counted separately.
    total_cost = 0.0
    rep = dis = failed = 0
    for s, (llm_eval, cost) in zip(sources, results):
        total_cost += cost
        if llm_eval is None:
            s["llm_evaluation"] = {"_error": True, "_reason": "llm_eval_failed"}
            failed += 1
            pr_score = "?"
        else:
            s["llm_evaluation"] = llm_eval
            pr_obj = llm_eval.get("制作相关性")
            pr_score = pr_obj.get("得分", "?") if isinstance(pr_obj, dict) else "?"
            try:
                if int(float(pr_score)) <= 1:
                    dis += 1
                else:
                    rep += 1
            except (TypeError, ValueError):
                # An unparseable score is treated as "report" rather than
                # discarding the post.
                rep += 1
        # ``title`` may be present but None; never slice None.
        title = str((s.get("post") or {}).get("title") or "")[:30]
        print(f" - [制作相关性={pr_score}] {s['case_id'][:24]} {title}")

    print(f" 汇总: 制作相关>=2 {rep} / 明确无关(1) {dis} / 评估失败 {failed} / cost=${total_cost:.4f}")
    return sources, total_cost


# ── Main flow ───────────────────────────────────────────────────────────────────


async def run(args: argparse.Namespace) -> None:
    """Run the whole flow: load queries, optionally generate, search, evaluate.

    Writes ``evaluated.json`` (and ``generated_queries.json`` when query
    generation succeeds) into ``args.output_dir``. Exits with status 1 when
    no queries or no platforms are given.
    """
    from examples.process_pipeline.script.llm_evaluate_sources import build_eval_llm_call

    queries, req_from_file = load_queries(Path(args.queries))
    if not queries:
        print("❌ queries 为空")
        sys.exit(1)
    platforms = [p.strip() for p in args.platforms.split(",") if p.strip()]
    if not platforms:
        print("❌ 未指定 --platforms")
        sys.exit(1)

    requirement = args.requirement or req_from_file or (";".join(queries))[:200]
    output_dir = Path(args.output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    print(f"📋 需求: {requirement[:80]}")
    print(f"📡 渠道: {platforms} | 原始 query 数: {len(queries)}")

    total_cost = 0.0

    # 1. Optional: rewrite / expand the queries with an LLM.
    if args.gen_query:
        gen_llm, gen_model_id = build_eval_llm_call(args.gen_model)
        print(f"✍️ query 生成模型: {args.gen_model} -> {gen_model_id}")
        gen_queries, gen_cost = await generate_queries(
            queries, requirement, gen_llm, gen_model_id, args.gen_count,
        )
        total_cost += gen_cost
        if gen_queries:
            search_queries = list(dict.fromkeys(
                (queries + gen_queries) if args.keep_original else gen_queries
            ))
            (output_dir / "generated_queries.json").write_text(
                json.dumps(
                    {"requirement": requirement, "original": queries,
                     "generated": gen_queries, "used": search_queries},
                    ensure_ascii=False, indent=2,
                ),
                encoding="utf-8",
            )
            print(f" 生成 {len(gen_queries)} 个 query,实际搜索 {len(search_queries)} 个"
                  f"({'含' if args.keep_original else '不含'}原始)→ generated_queries.json")
        else:
            search_queries = queries
            print(" 生成失败,回退原始 query")
    else:
        search_queries = queries

    # 2. Multi-channel search. Queries are translated to English only when an
    #    English platform is requested, so no translation client is built
    #    otherwise.
    overrides: Dict[Tuple[str, str], str] = {}
    if any(p in _EN_PLATFORMS for p in platforms):
        tr_llm, tr_model = build_eval_llm_call("gemini-flash-lite")
        overrides = await build_query_overrides(platforms, search_queries, tr_llm, tr_model)
    sources = await search_all(platforms, search_queries, args.max_count,
                               args.max_concurrent, query_overrides=overrides)

    # 3. Convert timestamps to a readable form (best effort).
    try:
        from examples.process_pipeline.script.extract_sources import _convert_timestamps
        _convert_timestamps(sources)
    except Exception as e:
        logger.warning("时间戳转换跳过: %s", e)

    # 4. Video posts: merge the transcript into the body before evaluating
    #    (on by default, disabled with --no-transcribe).
    if not args.no_transcribe and sources:
        n = await transcribe_video_posts(sources, concurrency=args.max_concurrent)
        if n:
            print(f"🎙️ 视频转写: {n} 条获得字幕并并入正文")

    # 5. Evaluate (unless --no-eval).
    eval_model_id = None
    include_images = not args.no_images
    if not args.no_eval and sources:
        eval_llm, eval_model_id = build_eval_llm_call(args.eval_model)
        print(f"🧠 评估模型: {args.eval_model} -> {eval_model_id} | 多模态图片: {'开' if include_images else '关'}")
        sources, eval_cost = await evaluate_posts(
            sources, requirement, eval_llm, eval_model_id, args.max_concurrent,
            include_images=include_images, max_images=args.max_images,
            image_mode=args.image_mode,
        )
        total_cost += eval_cost

    # Drop the temporary image references used for evaluation so they do not
    # bloat the report; only the count is kept.
    for s in sources:
        imgs = s.pop("_image_data_urls", None)
        if imgs is not None:
            s["images_sent"] = len(imgs)

    # 6. Write evaluated.json.
    out_file = output_dir / "evaluated.json"

    # Report/discard counts are based on the "制作相关性" score: >= 2 counts as
    # report, == 1 as discard; failed or unparseable entries are not counted.
    def _pr_score(s):
        ev = s.get("llm_evaluation") or {}
        if ev.get("_error"):
            return None
        try:
            return int(float((ev.get("制作相关性") or {}).get("得分")))
        except (TypeError, ValueError, AttributeError):
            return None

    rep = sum(1 for s in sources if (_pr_score(s) or 0) >= 2)
    dis = sum(1 for s in sources if _pr_score(s) == 1)
    out = {
        "requirement": requirement,
        "platforms": platforms,
        "queries_used": search_queries,
        "eval_model": eval_model_id,
        "total": len(sources),
        "report": rep,
        "discard": dis,
        "results": sources,
    }
    with open(out_file, "w", encoding="utf-8") as f:
        json.dump(out, f, ensure_ascii=False, indent=2)

    print(f"💾 evaluated.json: {len(sources)} 条 (report={rep} discard={dis}) → {out_file}")
    print(f"💰 累计成本: ${total_cost:.4f}")


def main() -> None:
    """Parse the command line and run the flow."""
    from dotenv import load_dotenv
    load_dotenv()

    from examples.process_pipeline.script.llm_evaluate_sources import EVAL_MODELS, DEFAULT_EVAL_MODEL

    parser = argparse.ArgumentParser(description="query 词多渠道直搜 + LLM 逐条评估")
    parser.add_argument("--queries", required=True,
                        help="query 词 json 文件(数组或 {queries:[...]})")
    parser.add_argument("--platforms", required=True,
                        help="逗号分隔渠道,如 xhs,zhihu,gzh,douyin,sph,youtube")
    parser.add_argument("--output-dir", required=True,
                        help="输出目录(写 evaluated.json)")
    parser.add_argument("--max-count", type=int, default=20,
                        help="每个 (渠道,query) 返回条数上限(部分渠道如 xhs 由后端决定)")
    parser.add_argument("--requirement", default="",
                        help="评估用采集目标描述(缺省取 queries 文件 requirement 或 query 拼接)")
    parser.add_argument("--eval-model", default=DEFAULT_EVAL_MODEL, choices=list(EVAL_MODELS),
                        help=f"评估模型(默认 {DEFAULT_EVAL_MODEL})")
    parser.add_argument("--max-concurrent", type=int, default=3,
                        help="搜索 / 评估并发上限")
    parser.add_argument("--no-eval", action="store_true",
                        help="只搜索,跳过 LLM 评估")
    parser.add_argument("--no-images", action="store_true",
                        help="不把帖子配图发给模型(默认发,多模态评估;纯文本模型请加此项)")
    parser.add_argument("--max-images", type=int, default=4,
                        help="每帖最多发给模型几张配图(默认 4)")
    parser.add_argument("--image-mode", choices=["url", "base64"], default="url",
                        help="图片传输方式:url 直传(快,默认) / base64 下载内嵌(稳,绕防盗链)")
    parser.add_argument("--no-transcribe", action="store_true",
                        help="不对视频帖跑转写(默认会转写并把字幕并入正文再评估)")
    # Query generation mode.
    parser.add_argument("--gen-query", action="store_true",
                        help="搜索前用 LLM 改写 / 扩展 query")
    parser.add_argument("--gen-model", default=DEFAULT_EVAL_MODEL, choices=list(EVAL_MODELS),
                        help=f"query 生成模型(默认 {DEFAULT_EVAL_MODEL})")
    parser.add_argument("--gen-count", type=int, default=10,
                        help="生成 query 的目标数量")
    parser.add_argument("--keep-original", action="store_true",
                        help="生成的 query 与原始 query 合并搜索(默认只用生成的)")
    args = parser.parse_args()

    # A zero limit would make every semaphore wait forever; fail fast instead.
    if args.max_concurrent < 1:
        parser.error("--max-concurrent 必须 >= 1")

    asyncio.run(run(args))


if __name__ == "__main__":
    main()