# execute_creation_apply.py
"""Module B, Phase 3 execution entry point (companion to P0-A, 2026-06-09).

Data flow:
    pending records prepared by Phase 1 (including ``_request_body``)
    + operator decisions (``action``)
    -> for records with action=approve -> POST Tencent /dynamic_creatives/add
    -> write creation_run_{date}_{time}.json + send an "execution report"
       Feishu message

Can run standalone (read outputs/data/creation_pending_*.json -> approvals
already written into it -> apply):
    python execute_creation_apply.py <pending_records_json>

Can also be imported and called from execute_creation_once.py:
    apply_pending_records(records) -> summary
"""
import asyncio
import json
import logging
import sys
from datetime import datetime, timezone
from pathlib import Path

# Directory containing this file; used for sys.path entries, the .env file
# and the default output directory.
_HERE = Path(__file__).parent
# Put the grandparent directory and this directory at the front of sys.path
# so the imports below resolve (statement order matters here).
sys.path.insert(0, str(_HERE.parent.parent))
sys.path.insert(0, str(_HERE))

from dotenv import load_dotenv  # noqa: E402

# Load the .env next to this file before importing config.
load_dotenv(_HERE / ".env")

from config import FEISHU_OPERATOR_CHAT_ID  # noqa: E402

# Force this directory back to sys.path[0] to work around a side effect of
# importing config (name clash with im-client/tools.py).
while str(_HERE) in sys.path:
    sys.path.remove(str(_HERE))
sys.path.insert(0, str(_HERE))

from tools.creative_creation import post_creative_with_prepared_body  # noqa: E402

logger = logging.getLogger("execute_creation_apply")
  29. def _fmt_quality_summary(r: dict) -> str:
  30. """Task 29:把 record 里的素材质量字段(prepare 阶段写入)格式化成一行。
  31. 返回类似 ' · 素材质量: ctr=12.5% imp=8,500 cost=420元',字段缺失时省略对应项。
  32. """
  33. ctr = r.get("material_ctr")
  34. imp = r.get("material_impressions")
  35. cost = r.get("material_cost")
  36. parts = []
  37. if ctr is not None:
  38. # ctr 取决于服务端格式,有的返回 0.125(比例),有的 12.5(百分号字面值)
  39. try:
  40. ctr_f = float(ctr)
  41. parts.append(f"ctr={ctr_f * 100:.1f}%" if ctr_f <= 1 else f"ctr={ctr_f:.1f}%")
  42. except (TypeError, ValueError):
  43. pass
  44. if imp is not None:
  45. try:
  46. parts.append(f"imp={int(imp):,}")
  47. except (TypeError, ValueError):
  48. pass
  49. if cost is not None:
  50. try:
  51. parts.append(f"cost={float(cost):.0f}元")
  52. except (TypeError, ValueError):
  53. pass
  54. return f" · 素材质量: {' '.join(parts)}" if parts else ""
  55. def _send_apply_summary_to_feishu(
  56. summary: dict, chat_id: str = ""
  57. ) -> None:
  58. """Phase 3 完成后发"执行汇报"飞书纯文本消息(创意 ID 列表 + 素材质量摘要)。
  59. Task 29(2026-06-11):每行加 ctr / cost / impressions 三个字段,
  60. 让运营在不打开 Excel 的情况下,一眼看出本次挂上的创意素材质量。
  61. """
  62. if not chat_id:
  63. chat_id = FEISHU_OPERATOR_CHAT_ID
  64. if not chat_id:
  65. logger.warning("[apply] FEISHU_OPERATOR_CHAT_ID 未配置,不发执行汇报")
  66. return
  67. try:
  68. from tools.feishu_doc import _auth_headers, _get_tenant_token
  69. import httpx
  70. t = summary["total"]
  71. lines = [
  72. "【创意搭建·执行汇报】",
  73. f"运营 approve 共 {t['approved']} 条,实际挂上 {t['posted_ok']} 条,失败 {t['posted_failed']} 条",
  74. ]
  75. approved_list = [r for r in summary["records"] if r.get("action") == "approve"]
  76. if approved_list:
  77. # Task 29:算挂上创意的素材质量均值,放汇报头(运营快速判断本批整体质量)
  78. ok_records = [r for r in approved_list if r.get("dynamic_creative_id")]
  79. valid_ctr = [float(r["material_ctr"]) for r in ok_records
  80. if r.get("material_ctr") is not None]
  81. valid_imp = [int(r["material_impressions"]) for r in ok_records
  82. if r.get("material_impressions") is not None]
  83. valid_cost = [float(r["material_cost"]) for r in ok_records
  84. if r.get("material_cost") is not None]
  85. if valid_ctr or valid_imp or valid_cost:
  86. stat_parts = []
  87. if valid_ctr:
  88. avg_ctr = sum(valid_ctr) / len(valid_ctr)
  89. stat_parts.append(
  90. f"平均 ctr={avg_ctr * 100:.1f}%" if avg_ctr <= 1
  91. else f"平均 ctr={avg_ctr:.1f}%"
  92. )
  93. if valid_imp:
  94. stat_parts.append(f"平均 imp={int(sum(valid_imp) / len(valid_imp)):,}")
  95. if valid_cost:
  96. stat_parts.append(f"平均 cost={sum(valid_cost) / len(valid_cost):.0f}元")
  97. lines.append(f"素材质量(n={len(ok_records)}): {' · '.join(stat_parts)}")
  98. lines.append("")
  99. lines.append("【已挂创意】")
  100. for r in approved_list:
  101. cid = r.get("dynamic_creative_id") or "(挂失败)"
  102. quality = _fmt_quality_summary(r)
  103. lines.append(
  104. f" · adgroup {r['adgroup_id']}({r['adgroup_name']}) "
  105. f"→ creative_id={cid} · name={r['creative_name']}{quality}"
  106. )
  107. rejected_list = [r for r in summary["records"] if r.get("action") == "reject"]
  108. hold_list = [r for r in summary["records"] if r.get("action") == "hold"]
  109. skip_list = [r for r in summary["records"] if r.get("action") == "skip"]
  110. if rejected_list or hold_list or skip_list:
  111. lines.append("")
  112. lines.append(
  113. f"【未挂】reject {len(rejected_list)} · hold {len(hold_list)} · skip {len(skip_list)}"
  114. )
  115. text = "\n".join(lines)
  116. token = _get_tenant_token()
  117. url = "https://open.feishu.cn/open-apis/im/v1/messages?receive_id_type=chat_id"
  118. body = {
  119. "receive_id": chat_id,
  120. "msg_type": "text",
  121. "content": json.dumps({"text": text}, ensure_ascii=False),
  122. }
  123. resp = httpx.post(url, headers=_auth_headers(token), json=body, timeout=30)
  124. if resp.json().get("code") == 0:
  125. logger.info("[apply] 执行汇报已发送 chat=%s", chat_id)
  126. else:
  127. logger.warning("[apply] 执行汇报发送失败: %s", resp.text[:200])
  128. except Exception as e:
  129. logger.exception("[apply] 发送执行汇报异常: %s", e)
  130. def apply_pending_records(records: list[dict]) -> dict:
  131. """Phase 3:对 records 里 action=approve 的项调腾讯 POST 创意,返回 summary。
  132. Args:
  133. records: pending records,每个含 action 字段(approve/reject/hold/skip)
  134. 以及 _request_body / account_id / adgroup_id / 等
  135. Returns:
  136. summary dict:
  137. {
  138. "run_started", "run_finished",
  139. "total": {records, approved, posted_ok, posted_failed},
  140. "records": [...] # 每条含 action + dynamic_creative_id(成功时) + error(失败时)
  141. }
  142. """
  143. run_started = datetime.now(timezone.utc).isoformat()
  144. posted_ok = 0
  145. posted_failed = 0
  146. approved_total = 0
  147. out_records = []
  148. for r in records:
  149. rec = dict(r)
  150. action = (rec.get("action") or "skip").lower()
  151. rec["action"] = action
  152. if action != "approve":
  153. out_records.append(rec)
  154. continue
  155. approved_total += 1
  156. body = rec.get("_request_body")
  157. if not body:
  158. rec["error"] = "missing _request_body"
  159. posted_failed += 1
  160. out_records.append(rec)
  161. continue
  162. cid = post_creative_with_prepared_body(
  163. account_id=int(rec["account_id"]),
  164. body=body,
  165. skip_if_exists=True,
  166. )
  167. if cid:
  168. rec["dynamic_creative_id"] = str(cid)
  169. posted_ok += 1
  170. else:
  171. rec["error"] = "post_failed"
  172. posted_failed += 1
  173. out_records.append(rec)
  174. run_finished = datetime.now(timezone.utc).isoformat()
  175. return {
  176. "run_started": run_started,
  177. "run_finished": run_finished,
  178. "total": {
  179. "records": len(records),
  180. "approved": approved_total,
  181. "posted_ok": posted_ok,
  182. "posted_failed": posted_failed,
  183. },
  184. "records": out_records,
  185. }
  186. def _strip_body_for_json(records: list[dict]) -> list[dict]:
  187. """删 _request_body 中可能含 jump_spec 的长字段,JSON 落盘体积更小。
  188. 保留 body 顶层字段(adgroup_id/name/account_id)作为追溯。"""
  189. out = []
  190. for r in records:
  191. rec = dict(r)
  192. body = rec.pop("_request_body", None)
  193. if body:
  194. rec["_body_summary"] = {
  195. "account_id": body.get("account_id"),
  196. "adgroup_id": body.get("adgroup_id"),
  197. "dynamic_creative_name": body.get("dynamic_creative_name"),
  198. }
  199. out.append(rec)
  200. return out
  201. def write_summary(summary: dict, output_dir: Path) -> Path:
  202. output_dir.mkdir(parents=True, exist_ok=True)
  203. date_str = datetime.now(timezone.utc).strftime("%Y%m%d")
  204. ts = datetime.now(timezone.utc).strftime("%H%M%S")
  205. out_path = output_dir / f"creation_run_{date_str}_{ts}.json"
  206. persisted = dict(summary)
  207. persisted["records"] = _strip_body_for_json(summary["records"])
  208. with open(out_path, "w", encoding="utf-8") as f:
  209. json.dump(persisted, f, ensure_ascii=False, indent=2)
  210. return out_path
  211. def main() -> int:
  212. """独立模式:从 JSON 文件读 pending records → apply → 写 summary + 发飞书。"""
  213. logging.basicConfig(
  214. level=logging.INFO,
  215. format="%(asctime)s | %(levelname)s | %(name)s | %(message)s",
  216. datefmt="%H:%M:%S",
  217. )
  218. for noisy in ("httpx", "httpcore", "config", "db.config"):
  219. logging.getLogger(noisy).setLevel(logging.WARNING)
  220. if len(sys.argv) < 2:
  221. logger.error("用法: python execute_creation_apply.py <pending_records.json>")
  222. return 1
  223. pending_path = Path(sys.argv[1])
  224. if not pending_path.exists():
  225. logger.error(f"文件不存在: {pending_path}")
  226. return 1
  227. with open(pending_path, encoding="utf-8") as f:
  228. records = json.load(f)
  229. logger.info(f"读到 {len(records)} 条 pending records,开始 Phase 3 执行")
  230. summary = apply_pending_records(records)
  231. out_path = write_summary(summary, _HERE / "outputs" / "data")
  232. _send_apply_summary_to_feishu(summary)
  233. t = summary["total"]
  234. logger.info(
  235. f"Phase 3 完成: approve={t['approved']} ok={t['posted_ok']} fail={t['posted_failed']}"
  236. )
  237. logger.info(f"summary: {out_path}")
  238. return 0 if t["posted_failed"] == 0 else 1
  239. if __name__ == "__main__":
  240. sys.exit(main())