run_top_categories.py 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158
"""Pick demands by category and run CFA on them.

Take the N categories with the most rows in one day's demand pool, pick one
demand per category, and start one run on Douyin and one on Kuaishou for each
pick (default N=3 -> 3 demands x 2 platforms = 6 runs).

Run by hand for now; it can be scheduled later (--date defaults to today, so a
cron job only needs to pass the current date).

Safety: **dry-run** by default. It only queries the database and prints which
categories were selected, which demand was picked in each, and which runs would
be started. **No run is started** until `--live` is added after the plan has
been checked.

Usage:
    Show the plan (no runs):      python -m scripts.run_top_categories
    Show the plan for a date:     python -m scripts.run_top_categories --date 20260625
    Actually start the runs:      python -m scripts.run_top_categories --date 20260625 --live

Selection rules (deterministic and reproducible, which keeps scheduling safe):
    - Category order: number of demands of that category on the day, descending;
      ties are broken by the highest score inside the category (descending),
      then by category name (ascending).
    - One demand per category: order by score descending, id ascending, take the
      first row (highest score; the smallest id among equal scores).
    - The demand date is --date and is matched against demand_content.dt
      (formatted like 20260625).
"""
  15. from __future__ import annotations
  16. import argparse
  17. import json
  18. import time
  19. import traceback
  20. from datetime import datetime, timezone
  21. from pathlib import Path
  22. from content_agent.integrations.database_runtime import ContentSupplyDbConfig
  23. from content_agent.run_service import RunService
  24. from content_agent.schemas import RunStartRequest
# Platforms every picked demand is run on; one job per (demand, platform) pair.
PLATFORMS = ["douyin", "kuaishou"]
# Default for --top-n: how many of the most populated categories to select.
DEFAULT_TOP_N = 3
# Sent as RunStartRequest.strategy_version for every run.
STRATEGY_VERSION = "V4"
# Sent as RunStartRequest.platform_mode for every run.
PLATFORM_MODE = "real"

# Category selection: count one day's demands per category (merge_leve2), most
# first; ties are broken by the highest score in the category, then by category
# name, so the result is reproducible. Parameters: (dt, limit).
# NOTE(review): rows whose merge_leve2 is NULL form their own group here, but
# `merge_leve2 = %s` in _PICK_DEMAND_SQL never matches NULL in standard SQL, so
# such a category would produce no pick — confirm whether the column is nullable.
_TOP_CATEGORIES_SQL = """
SELECT merge_leve2 AS category, COUNT(*) AS cnt, MAX(score) AS top_score
FROM demand_content
WHERE dt = %s
GROUP BY merge_leve2
ORDER BY cnt DESC, top_score DESC, merge_leve2 ASC
LIMIT %s
"""

# One demand per category: highest score first, smallest id among equal scores
# (deterministic and reproducible). Parameters: (dt, category).
_PICK_DEMAND_SQL = """
SELECT id, merge_leve2 AS category, name, score
FROM demand_content
WHERE dt = %s AND merge_leve2 = %s
ORDER BY score DESC, id ASC
LIMIT 1
"""
  46. def _today() -> str:
  47. return datetime.now().strftime("%Y%m%d")
  48. def _select_picks_and_jobs(date: str, top_n: int) -> tuple[list[dict], list[dict]]:
  49. """只读查库:返回 (每品类选中的需求, 展开后的 job 列表)。"""
  50. conn = ContentSupplyDbConfig.from_env().connect()
  51. with conn:
  52. with conn.cursor() as cur:
  53. cur.execute(_TOP_CATEGORIES_SQL, (date, top_n))
  54. categories = cur.fetchall()
  55. picks: list[dict] = []
  56. for cat in categories:
  57. cur.execute(_PICK_DEMAND_SQL, (date, cat["category"]))
  58. demand = cur.fetchone()
  59. if demand:
  60. picks.append({**demand, "category_cnt": cat["cnt"]})
  61. jobs = [
  62. {"demand_content_id": p["id"], "category": p["category"], "platform": platform}
  63. for p in picks
  64. for platform in PLATFORMS
  65. ]
  66. return picks, jobs
  67. def _emit(handle, obj: dict) -> None:
  68. obj["ts"] = datetime.now(timezone.utc).isoformat()
  69. line = json.dumps(obj, ensure_ascii=False)
  70. print(line, flush=True)
  71. if handle is not None:
  72. handle.write(line + "\n")
  73. handle.flush()
  74. def main() -> None:
  75. parser = argparse.ArgumentParser(description="按品类挑需求跑 CFA(前 N 品类 × 抖音/快手)")
  76. parser.add_argument("--date", default=_today(), help="需求生成日 dt(YYYYMMDD,默认今天)")
  77. parser.add_argument("--top-n", type=int, default=DEFAULT_TOP_N, help="取条数最多的前 N 个品类(默认 3)")
  78. parser.add_argument("--live", action="store_true", help="真跑(否则 dry-run 只打印计划)")
  79. args = parser.parse_args()
  80. picks, jobs = _select_picks_and_jobs(args.date, args.top_n)
  81. if not picks:
  82. # 当天没有新生成的需求(dt=当天 查不到)→ 正常跳过,不跑、干净退出(退出码 0,不触发定时告警)。
  83. print(f"[skip] {args.date} 当天没有新需求(demand_content.dt={args.date} 无记录),不跑。")
  84. return
  85. print(f"== {args.date} 取前 {args.top_n} 品类,每品类挑 1 条,各跑 {PLATFORMS} ==")
  86. for idx, p in enumerate(picks, 1):
  87. print(
  88. f" {idx}. 品类「{p['category']}」(当天共 {p['category_cnt']} 条)"
  89. f" → demand {p['id']}(score={p['score']}) {p['name']}"
  90. )
  91. print(f" 共 {len(jobs)} 个 run = {len(picks)} 条需求 × {len(PLATFORMS)} 平台")
  92. if not args.live:
  93. print("\n[dry-run] 没跑任何 run。核对上面 3 条需求无误后,加 --live 真跑。")
  94. return
  95. ts = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
  96. log_path = Path("e2e_logs") / f"run_top_categories_{args.date}_{ts}.jsonl"
  97. log_path.parent.mkdir(parents=True, exist_ok=True)
  98. service = RunService.from_env()
  99. with log_path.open("a", encoding="utf-8") as handle:
  100. _emit(handle, {"event": "batch_start", "date": args.date, "jobs": jobs})
  101. for job in jobs: # 串行:一个跑完再跑下一个
  102. demand_id, platform = job["demand_content_id"], job["platform"]
  103. run_id = f"v1_run_{demand_id}_{platform}_{int(time.time())}"
  104. _emit(handle, {
  105. "event": "run_start", "demand": demand_id,
  106. "category": job["category"], "platform": platform, "run_id": run_id,
  107. })
  108. start = time.time()
  109. try:
  110. state = service.start_run(RunStartRequest(
  111. run_id=run_id,
  112. demand_content_id=demand_id,
  113. platform=platform,
  114. platform_mode=PLATFORM_MODE,
  115. strategy_version=STRATEGY_VERSION,
  116. ))
  117. _emit(handle, {
  118. "event": "run_done", "demand": demand_id, "platform": platform,
  119. "run_id": state["run_id"], "status": state.get("status"),
  120. "current_step": state.get("current_step"),
  121. "elapsed_s": round(time.time() - start, 1),
  122. })
  123. except Exception as exc: # noqa: BLE001 — 单个 run 失败不连累后面,记账后继续
  124. _emit(handle, {
  125. "event": "run_error", "demand": demand_id, "platform": platform,
  126. "run_id": run_id, "error": str(exc),
  127. "traceback": traceback.format_exc()[-2000:],
  128. "elapsed_s": round(time.time() - start, 1),
  129. })
  130. _emit(handle, {"event": "batch_done"})
  131. print(f"\n完成。日志:{log_path}")
# Run the CLI only when this file is executed as a script (including
# ``python -m``); importing the module has no side effects.
if __name__ == "__main__":
    main()