| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158 |
- """按品类挑需求跑 CFA:取某天需求池里「条数最多的前 N 个品类」,每个品类挑 1 条需求,
- 各在抖音 + 快手各跑一个 run(默认 N=3 → 3 条需求 × 2 平台 = 6 个 run)。
- 今天先手动跑,以后可挂定时(--date 默认今天,改成 cron 传当天日期即可)。
- 安全:默认 **dry-run**,只查库 + 打印「选了哪 3 个品类、各挑哪条需求、要跑哪 6 个 run」,
- **不跑任何 run**;核对无误后加 `--live` 才真跑。
- 用法:
- 看计划(不跑): python -m scripts.run_top_categories
- 指定日期看计划: python -m scripts.run_top_categories --date 20260625
- 真跑 6 个 run: python -m scripts.run_top_categories --date 20260625 --live
- 选取规则(确定性、可复现,便于以后定时):
- - 品类排序:按当天该品类需求条数 desc;并列时按「品类内最高 score」desc,再按品类名 asc。
- - 每品类挑 1 条:按 score desc、id asc 取第 1 条(最高分那条;同分取 id 最小)。
- - 取词的需求日 = --date,对应 demand_content.dt(形如 20260625)。
- """
- from __future__ import annotations
- import argparse
- import json
- import time
- import traceback
- from datetime import datetime, timezone
- from pathlib import Path
- from content_agent.integrations.database_runtime import ContentSupplyDbConfig
- from content_agent.run_service import RunService
- from content_agent.schemas import RunStartRequest
# Platforms each picked demand is run on: one run per (demand, platform) pair.
PLATFORMS = ["douyin", "kuaishou"]
# Default number of top categories to take; overridable with --top-n.
DEFAULT_TOP_N = 3
# Passed unchanged to RunStartRequest as strategy_version / platform_mode.
STRATEGY_VERSION = "V4"
PLATFORM_MODE = "real"
- # 选品类:某天需求按品类(merge_leve2)计数,多→少;并列按品类内最高分→品类名,保证可复现。
- _TOP_CATEGORIES_SQL = """
- SELECT merge_leve2 AS category, COUNT(*) AS cnt, MAX(score) AS top_score
- FROM demand_content
- WHERE dt = %s
- GROUP BY merge_leve2
- ORDER BY cnt DESC, top_score DESC, merge_leve2 ASC
- LIMIT %s
- """
# Pick one demand for a category: highest score first, smallest id on ties
# (deterministic and reproducible). Parameters: (dt, category).
_PICK_DEMAND_SQL = """
SELECT id, merge_leve2 AS category, name, score
FROM demand_content
WHERE dt = %s AND merge_leve2 = %s
ORDER BY score DESC, id ASC
LIMIT 1
"""
- def _today() -> str:
- return datetime.now().strftime("%Y%m%d")
def _select_picks_and_jobs(date: str, top_n: int) -> tuple[list[dict], list[dict]]:
    """Query the database (read-only) for the day's picks and the jobs they expand to.

    Args:
        date: Value matched against ``demand_content.dt``.
        top_n: Row limit passed to the category-ranking query.

    Returns:
        ``(picks, jobs)``. Each pick is the selected demand row plus a
        ``category_cnt`` key holding that category's demand count; ``jobs``
        has one entry per pick per platform in ``PLATFORMS``, in pick order.
    """
    selected: list[dict] = []
    connection = ContentSupplyDbConfig.from_env().connect()
    with connection, connection.cursor() as cursor:
        cursor.execute(_TOP_CATEGORIES_SQL, (date, top_n))
        # Materialize the ranking first: the same cursor is reused below.
        ranked_categories = cursor.fetchall()
        for ranked in ranked_categories:
            cursor.execute(_PICK_DEMAND_SQL, (date, ranked["category"]))
            row = cursor.fetchone()
            if not row:
                # No demand came back for this category; leave it out.
                continue
            selected.append({**row, "category_cnt": ranked["cnt"]})

    # Expand every pick into one job per platform.
    expanded: list[dict] = []
    for pick in selected:
        for platform in PLATFORMS:
            expanded.append(
                {
                    "demand_content_id": pick["id"],
                    "category": pick["category"],
                    "platform": platform,
                }
            )
    return selected, expanded
- def _emit(handle, obj: dict) -> None:
- obj["ts"] = datetime.now(timezone.utc).isoformat()
- line = json.dumps(obj, ensure_ascii=False)
- print(line, flush=True)
- if handle is not None:
- handle.write(line + "\n")
- handle.flush()
def _date_arg(value: str) -> str:
    """argparse ``type``: accept only a real calendar date written as YYYYMMDD."""
    # Check the shape first: strptime alone also accepts non-zero-padded fields.
    if len(value) != 8 or not (value.isascii() and value.isdigit()):
        raise argparse.ArgumentTypeError(f"--date 需为 YYYYMMDD 形式的日期,收到 {value!r}")
    try:
        datetime.strptime(value, "%Y%m%d")
    except ValueError:
        raise argparse.ArgumentTypeError(f"--date 不是有效日期:{value!r}") from None
    return value


def _top_n_arg(value: str) -> int:
    """argparse ``type``: accept only an integer >= 1."""
    try:
        number = int(value)
    except ValueError:
        raise argparse.ArgumentTypeError(f"--top-n 需为整数,收到 {value!r}") from None
    if number < 1:
        raise argparse.ArgumentTypeError(f"--top-n 需 >= 1,收到 {number}")
    return number


def main() -> None:
    """CLI entry point: print the plan and, with ``--live``, execute the runs serially.

    Steps:
      1. Parse ``--date`` (YYYYMMDD, default today), ``--top-n`` (>= 1) and ``--live``.
      2. Select the picks and jobs for that date; return quietly when there are none.
      3. Print the plan. Without ``--live`` stop here (dry run).
      4. With ``--live``, start every job one after another and record a
         ``run_start`` / ``run_done`` / ``run_error`` event per job in a JSONL
         file under ``e2e_logs/``. A failing run is logged and does not stop
         the remaining jobs.

    Invalid ``--date`` / ``--top-n`` values are rejected by argparse (usage
    error) instead of silently producing an empty or failing query.
    """
    parser = argparse.ArgumentParser(description="按品类挑需求跑 CFA(前 N 品类 × 抖音/快手)")
    parser.add_argument("--date", type=_date_arg, default=_today(), help="需求生成日 dt(YYYYMMDD,默认今天)")
    parser.add_argument("--top-n", type=_top_n_arg, default=DEFAULT_TOP_N, help="取条数最多的前 N 个品类(默认 3)")
    parser.add_argument("--live", action="store_true", help="真跑(否则 dry-run 只打印计划)")
    args = parser.parse_args()

    picks, jobs = _select_picks_and_jobs(args.date, args.top_n)
    if not picks:
        # No demands were generated for this date: a normal skip. Return
        # without running anything so the exit code stays 0 and a scheduled
        # job does not raise an alert.
        print(f"[skip] {args.date} 当天没有新需求(demand_content.dt={args.date} 无记录),不跑。")
        return

    print(f"== {args.date} 取前 {args.top_n} 品类,每品类挑 1 条,各跑 {PLATFORMS} ==")
    for idx, p in enumerate(picks, 1):
        print(
            f" {idx}. 品类「{p['category']}」(当天共 {p['category_cnt']} 条)"
            f" → demand {p['id']}(score={p['score']}) {p['name']}"
        )
    print(f" 共 {len(jobs)} 个 run = {len(picks)} 条需求 × {len(PLATFORMS)} 平台")

    if not args.live:
        # Report the real number of picks; it differs from 3 whenever --top-n
        # is changed or fewer categories exist that day.
        print(f"\n[dry-run] 没跑任何 run。核对上面 {len(picks)} 条需求无误后,加 --live 真跑。")
        return

    ts = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
    log_path = Path("e2e_logs") / f"run_top_categories_{args.date}_{ts}.jsonl"
    log_path.parent.mkdir(parents=True, exist_ok=True)
    service = RunService.from_env()

    with log_path.open("a", encoding="utf-8") as handle:
        _emit(handle, {"event": "batch_start", "date": args.date, "jobs": jobs})
        for job in jobs:  # serial: finish one run before starting the next
            demand_id, platform = job["demand_content_id"], job["platform"]
            run_id = f"v1_run_{demand_id}_{platform}_{int(time.time())}"
            _emit(handle, {
                "event": "run_start", "demand": demand_id,
                "category": job["category"], "platform": platform, "run_id": run_id,
            })
            start = time.time()
            try:
                state = service.start_run(RunStartRequest(
                    run_id=run_id,
                    demand_content_id=demand_id,
                    platform=platform,
                    platform_mode=PLATFORM_MODE,
                    strategy_version=STRATEGY_VERSION,
                ))
                _emit(handle, {
                    "event": "run_done", "demand": demand_id, "platform": platform,
                    "run_id": state["run_id"], "status": state.get("status"),
                    "current_step": state.get("current_step"),
                    "elapsed_s": round(time.time() - start, 1),
                })
            except Exception as exc:  # noqa: BLE001 - one failed run must not block the rest; record it and continue
                _emit(handle, {
                    "event": "run_error", "demand": demand_id, "platform": platform,
                    "run_id": run_id, "error": str(exc),
                    # Keep only the tail of the traceback to bound the log line size.
                    "traceback": traceback.format_exc()[-2000:],
                    "elapsed_s": round(time.time() - start, 1),
                })
        _emit(handle, {"event": "batch_done"})
    print(f"\n完成。日志:{log_path}")
# Run the CLI only when executed as a script / with `python -m`, not on import.
if __name__ == "__main__":
    main()
|