# -*- coding: utf-8 -*-
"""Search + evaluate: any query -> multi-platform search with dedup -> per-post LLM evaluation.

Results land in the ``search_process`` or ``search_tools`` table; the target is
chosen by ``--mode-type`` (工序 -> search_process, 工具 -> search_tools).

All engine functions (search / dedup / transcription / evaluation / translation
for English platforms) are reused read-only from search_and_evaluate.py.

Usage (normally spawned as a subprocess by server.py):
    python pipeline/search_eval.py --query-id q0004 --query "AI 人像 图片 生成 怎么做"
    python pipeline/search_eval.py --query-id q0005 --query "GPT image2 评测" \\
        --synonyms "GPT image2 测评,GPT image2 实测" --platforms xhs,gzh --max-count 10
"""
import argparse
import asyncio
import json
import sys
from pathlib import Path

PROJECT_ROOT = Path(__file__).resolve().parents[3]  # …/Agent
# Must run before the `examples.*` imports below, which resolve via this path.
sys.path.insert(0, str(PROJECT_ROOT))

from dotenv import load_dotenv

# Load .env before importing the project modules, preserving the original order.
load_dotenv()

from examples.process_pipeline.script.search_eval.search_and_evaluate import (
    search_all,
    evaluate_posts,
    transcribe_video_posts,
    build_query_overrides,
)
from examples.process_pipeline.script.llm_evaluate_sources import (
    build_eval_llm_call,
    EVAL_MODELS,
    DEFAULT_EVAL_MODEL,
)

HERE = Path(__file__).resolve().parent
MW = HERE.parent
# Makes the sibling `db` module importable.
sys.path.insert(0, str(MW))
import db


def _split_csv(text: "str | None") -> "list[str]":
    """Split a comma-separated string into stripped, non-empty items."""
    return [item.strip() for item in (text or "").split(",") if item.strip()]


async def run(args: argparse.Namespace) -> int:
    """Run one search + evaluation pass and persist the results.

    Steps: build the phrasing list (base query + synonyms, de-duplicated in
    order), search all platforms, optionally transcribe video posts and
    evaluate every post, then upsert into the DB table selected by
    ``args.mode_type`` and dump a JSON snapshot under ``runs/<table>/``.

    Args:
        args: Parsed CLI namespace as produced by ``main``.

    Returns:
        Process exit code: 0 on success, 1 when the search returned nothing.
    """
    # dict.fromkeys keeps first-seen order, so the base query stays first.
    phrasings = list(dict.fromkeys([args.query, *_split_csv(args.synonyms)]))
    platforms = _split_csv(args.platforms)

    eval_llm, eval_model_id = build_eval_llm_call(args.eval_model)
    print(f"▶ {args.query_id} query={args.query!r} 措辞={phrasings} 渠道={platforms}")

    overrides = await build_query_overrides(platforms, phrasings, eval_llm, eval_model_id)
    sources = await search_all(
        platforms,
        phrasings,
        args.max_count,
        args.max_concurrent,
        query_overrides=overrides,
    )
    print(f"🔎 去重后 {len(sources)} 帖")
    if not sources:
        print("❌ 搜索无结果")
        return 1

    # Timestamp conversion is best-effort: a failure must not abort the run,
    # but it is reported on stderr instead of being silently swallowed.
    try:
        from examples.process_pipeline.script.extract_sources import _convert_timestamps

        _convert_timestamps(sources)
    except Exception as exc:
        print(f"⚠️ 时间戳转换失败,已跳过: {exc!r}", file=sys.stderr)

    if not args.no_transcribe:
        transcribed = await transcribe_video_posts(sources, concurrency=args.max_concurrent)
        if transcribed:
            print(f"🎙️ 视频转写 {transcribed} 条")

    cost = 0.0
    if not args.no_eval:
        sources, cost = await evaluate_posts(
            sources,
            "",
            eval_llm,
            eval_model_id,
            args.max_concurrent,
            include_images=not args.no_images,
            max_images=args.max_images,
            image_mode=args.image_mode,
            query=args.query,
        )

    # Strip the transient `_image_data_urls` key so it is neither stored in
    # the DB nor written to the JSON snapshot.
    for post in sources:
        post.pop("_image_data_urls", None)

    table = "search_tools" if args.mode_type == "工具" else "search_process"
    n = db.upsert_search_posts(args.query_id, args.query, sources, table=table)
    print(f"🗄️ {table} 入库 {n} 行 · 方向 {args.mode_type or '工序'} · 评估成本 ${cost:.4f}")

    out_dir = MW / "runs" / table
    out_dir.mkdir(parents=True, exist_ok=True)
    snapshot = {
        "query_id": args.query_id,
        "query": args.query,
        "phrasings": phrasings,
        "platforms": platforms,
        "total": len(sources),
        "results": sources,
    }
    (out_dir / f"{args.query_id}.json").write_text(
        json.dumps(snapshot, ensure_ascii=False, indent=2),
        encoding="utf-8",
    )
    return 0


def main() -> None:
    """Parse CLI arguments, execute ``run`` and exit with its return code."""
    p = argparse.ArgumentParser(description="搜索+评估 → search_process/search_tools")
    p.add_argument("--query-id", required=True, help="如 q0004(server 自动分配)")
    p.add_argument("--query", required=True, help="基准 query(评估锚点)")
    p.add_argument("--synonyms", default="", help="逗号分隔的同义措辞(可选)")
    p.add_argument(
        "--mode-type",
        default="",
        choices=["", "工序", "工具"],
        help="解构方向,决定写哪张表(工具 → search_tools;其余 → search_process)",
    )
    p.add_argument("--platforms", default="xhs,gzh")
    p.add_argument("--max-count", type=int, default=10)
    p.add_argument("--eval-model", default=DEFAULT_EVAL_MODEL, choices=list(EVAL_MODELS))
    p.add_argument("--max-concurrent", type=int, default=3)
    p.add_argument("--max-images", type=int, default=4)
    p.add_argument("--image-mode", choices=["url", "base64"], default="url")
    p.add_argument("--no-transcribe", action="store_true")
    p.add_argument("--no-eval", action="store_true")
    p.add_argument("--no-images", action="store_true")
    args = p.parse_args()
    raise SystemExit(asyncio.run(run(args)))


if __name__ == "__main__":
    main()