"""按品类挑需求跑 CFA:取某天需求池里「条数最多的前 N 个品类」,每个品类挑 1 条需求, 各在抖音 + 快手各跑一个 run(默认 N=3 → 3 条需求 × 2 平台 = 6 个 run)。 今天先手动跑,以后可挂定时(--date 默认今天,改成 cron 传当天日期即可)。 安全:默认 **dry-run**,只查库 + 打印「选了哪 3 个品类、各挑哪条需求、要跑哪 6 个 run」, **不跑任何 run**;核对无误后加 `--live` 才真跑。 用法: 看计划(不跑): python -m scripts.run_top_categories 指定日期看计划: python -m scripts.run_top_categories --date 20260625 真跑 6 个 run: python -m scripts.run_top_categories --date 20260625 --live 选取规则(确定性、可复现,便于以后定时): - 品类排序:按当天该品类需求条数 desc;并列时按「品类内最高 score」desc,再按品类名 asc。 - 每品类挑 1 条:按 score desc、id asc 取第 1 条(最高分那条;同分取 id 最小)。 - 取词的需求日 = --date,对应 demand_content.dt(形如 20260625)。 """ from __future__ import annotations import argparse import json import time import traceback from datetime import datetime, timezone from pathlib import Path from content_agent.integrations.database_runtime import ContentSupplyDbConfig from content_agent.run_service import RunService from content_agent.schemas import RunStartRequest PLATFORMS = ["douyin", "kuaishou"] DEFAULT_TOP_N = 3 STRATEGY_VERSION = "V4" PLATFORM_MODE = "real" # 选品类:某天需求按品类(merge_leve2)计数,多→少;并列按品类内最高分→品类名,保证可复现。 _TOP_CATEGORIES_SQL = """ SELECT merge_leve2 AS category, COUNT(*) AS cnt, MAX(score) AS top_score FROM demand_content WHERE dt = %s GROUP BY merge_leve2 ORDER BY cnt DESC, top_score DESC, merge_leve2 ASC LIMIT %s """ # 每个品类挑 1 条:分高→id 小(确定、可复现)。 _PICK_DEMAND_SQL = """ SELECT id, merge_leve2 AS category, name, score FROM demand_content WHERE dt = %s AND merge_leve2 = %s ORDER BY score DESC, id ASC LIMIT 1 """ def _today() -> str: return datetime.now().strftime("%Y%m%d") def _select_picks_and_jobs(date: str, top_n: int) -> tuple[list[dict], list[dict]]: """只读查库:返回 (每品类选中的需求, 展开后的 job 列表)。""" conn = ContentSupplyDbConfig.from_env().connect() with conn: with conn.cursor() as cur: cur.execute(_TOP_CATEGORIES_SQL, (date, top_n)) categories = cur.fetchall() picks: list[dict] = [] for cat in categories: cur.execute(_PICK_DEMAND_SQL, (date, cat["category"])) demand = cur.fetchone() if demand: picks.append({**demand, "category_cnt": cat["cnt"]}) jobs = [ {"demand_content_id": p["id"], "category": p["category"], "platform": platform} for p in picks for platform in PLATFORMS ] return picks, jobs def _emit(handle, obj: dict) -> None: obj["ts"] = datetime.now(timezone.utc).isoformat() line = json.dumps(obj, ensure_ascii=False) print(line, flush=True) if handle is not None: handle.write(line + "\n") handle.flush() def main() -> None: parser = argparse.ArgumentParser(description="按品类挑需求跑 CFA(前 N 品类 × 抖音/快手)") parser.add_argument("--date", default=_today(), help="需求生成日 dt(YYYYMMDD,默认今天)") parser.add_argument("--top-n", type=int, default=DEFAULT_TOP_N, help="取条数最多的前 N 个品类(默认 3)") parser.add_argument("--live", action="store_true", help="真跑(否则 dry-run 只打印计划)") args = parser.parse_args() picks, jobs = _select_picks_and_jobs(args.date, args.top_n) if not picks: # 当天没有新生成的需求(dt=当天 查不到)→ 正常跳过,不跑、干净退出(退出码 0,不触发定时告警)。 print(f"[skip] {args.date} 当天没有新需求(demand_content.dt={args.date} 无记录),不跑。") return print(f"== {args.date} 取前 {args.top_n} 品类,每品类挑 1 条,各跑 {PLATFORMS} ==") for idx, p in enumerate(picks, 1): print( f" {idx}. 品类「{p['category']}」(当天共 {p['category_cnt']} 条)" f" → demand {p['id']}(score={p['score']}) {p['name']}" ) print(f" 共 {len(jobs)} 个 run = {len(picks)} 条需求 × {len(PLATFORMS)} 平台") if not args.live: print("\n[dry-run] 没跑任何 run。核对上面 3 条需求无误后,加 --live 真跑。") return ts = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S") log_path = Path("e2e_logs") / f"run_top_categories_{args.date}_{ts}.jsonl" log_path.parent.mkdir(parents=True, exist_ok=True) service = RunService.from_env() with log_path.open("a", encoding="utf-8") as handle: _emit(handle, {"event": "batch_start", "date": args.date, "jobs": jobs}) for job in jobs: # 串行:一个跑完再跑下一个 demand_id, platform = job["demand_content_id"], job["platform"] run_id = f"v1_run_{demand_id}_{platform}_{int(time.time())}" _emit(handle, { "event": "run_start", "demand": demand_id, "category": job["category"], "platform": platform, "run_id": run_id, }) start = time.time() try: state = service.start_run(RunStartRequest( run_id=run_id, demand_content_id=demand_id, platform=platform, platform_mode=PLATFORM_MODE, strategy_version=STRATEGY_VERSION, )) _emit(handle, { "event": "run_done", "demand": demand_id, "platform": platform, "run_id": state["run_id"], "status": state.get("status"), "current_step": state.get("current_step"), "elapsed_s": round(time.time() - start, 1), }) except Exception as exc: # noqa: BLE001 — 单个 run 失败不连累后面,记账后继续 _emit(handle, { "event": "run_error", "demand": demand_id, "platform": platform, "run_id": run_id, "error": str(exc), "traceback": traceback.format_exc()[-2000:], "elapsed_s": round(time.time() - start, 1), }) _emit(handle, {"event": "batch_done"}) print(f"\n完成。日志:{log_path}") if __name__ == "__main__": main()