| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291 |
- """模块 B Phase 3 执行入口(P0-A 配套,2026-06-09)。
- 数据流:
- Phase 1 已准备好的 pending records(含 _request_body)+ 运营决策(action)
- → 对 action=approve 的 → POST 腾讯 /dynamic_creatives/add
- → 写 creation_run_{date}.json + 发"执行汇报"飞书消息
- 可独立运行(从 outputs/data/creation_pending_*.json 读 → 审批已写 → apply):
- python execute_creation_apply.py <pending_records_json>
- 也可被 execute_creation_once.py import 调用:
- apply_pending_records(records) -> summary
- """
- import asyncio
- import json
- import logging
- import sys
- from datetime import datetime, timezone
- from pathlib import Path
- _HERE = Path(__file__).parent
- sys.path.insert(0, str(_HERE.parent.parent))
- sys.path.insert(0, str(_HERE))
- from dotenv import load_dotenv # noqa: E402
- load_dotenv(_HERE / ".env")
- from config import FEISHU_OPERATOR_CHAT_ID # noqa: E402
- # 强占 sys.path[0],绕过 config import 副作用(im-client/tools.py 同名冲突)
- while str(_HERE) in sys.path:
- sys.path.remove(str(_HERE))
- sys.path.insert(0, str(_HERE))
- from tools.creative_creation import post_creative_with_prepared_body # noqa: E402
- logger = logging.getLogger("execute_creation_apply")
- def _fmt_quality_summary(r: dict) -> str:
- """Task 29:把 record 里的素材质量字段(prepare 阶段写入)格式化成一行。
- 返回类似 ' · 素材质量: ctr=12.5% imp=8,500 cost=420元',字段缺失时省略对应项。
- """
- ctr = r.get("material_ctr")
- imp = r.get("material_impressions")
- cost = r.get("material_cost")
- parts = []
- if ctr is not None:
- # ctr 取决于服务端格式,有的返回 0.125(比例),有的 12.5(百分号字面值)
- try:
- ctr_f = float(ctr)
- parts.append(f"ctr={ctr_f * 100:.1f}%" if ctr_f <= 1 else f"ctr={ctr_f:.1f}%")
- except (TypeError, ValueError):
- pass
- if imp is not None:
- try:
- parts.append(f"imp={int(imp):,}")
- except (TypeError, ValueError):
- pass
- if cost is not None:
- try:
- parts.append(f"cost={float(cost):.0f}元")
- except (TypeError, ValueError):
- pass
- return f" · 素材质量: {' '.join(parts)}" if parts else ""
def _coerce_quality_number(value, as_int=False):
    """Return *value* as a finite float (or int), or None if missing/malformed."""
    import math  # local import: this module's header does not import math

    if value is None:
        return None
    if as_int:
        try:
            return int(value)
        except (TypeError, ValueError, OverflowError):
            pass  # may still be a decimal string such as "8500.0"
    try:
        number = float(value)
    except (TypeError, ValueError, OverflowError):
        return None
    if not math.isfinite(number):
        return None
    return int(number) if as_int else number


def _build_apply_summary_text(summary: dict) -> str:
    """Render the plain-text body of the Phase 3 execution report."""
    totals = summary["total"]
    records = summary["records"]
    lines = [
        "【创意搭建·执行汇报】",
        f"运营 approve 共 {totals['approved']} 条,实际挂上 {totals['posted_ok']} 条,失败 {totals['posted_failed']} 条",
    ]

    approved_list = [r for r in records if r.get("action") == "approve"]
    if approved_list:
        # Task 29: average material quality of the creatives that were
        # actually attached, shown in the report header.
        ok_records = [r for r in approved_list if r.get("dynamic_creative_id")]
        ctr_pcts, imps, costs = [], [], []
        for r in ok_records:
            ctr = _coerce_quality_number(r.get("material_ctr"))
            if ctr is not None:
                # Normalise each value to percent before averaging so that
                # ratio-style (0.125) and percent-style (12.5) inputs can
                # be mixed without distorting the mean.
                ctr_pcts.append(ctr * 100 if ctr <= 1 else ctr)
            imp = _coerce_quality_number(r.get("material_impressions"), as_int=True)
            if imp is not None:
                imps.append(imp)
            cost = _coerce_quality_number(r.get("material_cost"))
            if cost is not None:
                costs.append(cost)

        stat_parts = []
        if ctr_pcts:
            stat_parts.append(f"平均 ctr={sum(ctr_pcts) / len(ctr_pcts):.1f}%")
        if imps:
            stat_parts.append(f"平均 imp={int(sum(imps) / len(imps)):,}")
        if costs:
            stat_parts.append(f"平均 cost={sum(costs) / len(costs):.0f}元")
        if stat_parts:
            lines.append(f"素材质量(n={len(ok_records)}): {' · '.join(stat_parts)}")

        lines.append("")
        lines.append("【已挂创意】")
        for r in approved_list:
            cid = r.get("dynamic_creative_id") or "(挂失败)"
            quality = _fmt_quality_summary(r)
            lines.append(
                f" · adgroup {r['adgroup_id']}({r['adgroup_name']}) "
                f"→ creative_id={cid} · name={r['creative_name']}{quality}"
            )

    not_applied = {"reject": 0, "hold": 0, "skip": 0}
    for r in records:
        action = r.get("action")
        if action in not_applied:
            not_applied[action] += 1
    if any(not_applied.values()):
        lines.append("")
        lines.append(
            f"【未挂】reject {not_applied['reject']} · hold {not_applied['hold']} · skip {not_applied['skip']}"
        )
    return "\n".join(lines)


def _send_apply_summary_to_feishu(
    summary: dict, chat_id: str = ""
) -> None:
    """Send the Phase 3 "execution report" as a plain-text Feishu message.

    The message lists the totals, the average material quality of the
    creatives that were attached, one line per approved record (creative id
    plus ctr / impressions / cost, Task 29), and a count of records that
    were not applied.

    Sending is best-effort: every failure (missing project imports,
    malformed summary, HTTP or JSON errors) is logged and swallowed so the
    report never breaks the apply run. A malformed ``material_*`` value only
    drops that value from the averages rather than suppressing the report.

    Args:
        summary: The dict returned by ``apply_pending_records``.
        chat_id: Target chat id; falls back to ``FEISHU_OPERATOR_CHAT_ID``
            when empty.
    """
    chat_id = chat_id or FEISHU_OPERATOR_CHAT_ID
    if not chat_id:
        logger.warning("[apply] FEISHU_OPERATOR_CHAT_ID 未配置,不发执行汇报")
        return
    try:
        # Imported lazily and inside the try so that an import failure is
        # logged like any other delivery failure.
        from tools.feishu_doc import _auth_headers, _get_tenant_token
        import httpx

        text = _build_apply_summary_text(summary)
        token = _get_tenant_token()
        url = "https://open.feishu.cn/open-apis/im/v1/messages?receive_id_type=chat_id"
        body = {
            "receive_id": chat_id,
            "msg_type": "text",
            # The message API takes `content` as a JSON-encoded string.
            "content": json.dumps({"text": text}, ensure_ascii=False),
        }
        resp = httpx.post(url, headers=_auth_headers(token), json=body, timeout=30)
        if resp.json().get("code") == 0:
            logger.info("[apply] 执行汇报已发送 chat=%s", chat_id)
        else:
            logger.warning("[apply] 执行汇报发送失败: %s", resp.text[:200])
    except Exception as e:
        logger.exception("[apply] 发送执行汇报异常: %s", e)
def _post_approved_record(rec: dict):
    """POST one approved record; return ``(creative_id, error)``, one of them None."""
    body = rec.get("_request_body")
    if not body:
        return None, "missing _request_body"

    raw_account_id = rec.get("account_id")
    try:
        account_id = int(raw_account_id)
    except (TypeError, ValueError):
        return None, f"invalid account_id: {raw_account_id!r}"

    try:
        cid = post_creative_with_prepared_body(
            account_id=account_id,
            body=body,
            skip_if_exists=True,
        )
    except Exception as exc:
        # One failing request must not abort the batch: creatives created
        # earlier in this run still need to appear in the summary.
        logger.exception(
            "[apply] POST creative raised for adgroup=%s", rec.get("adgroup_id")
        )
        return None, f"post_exception: {exc}"

    if not cid:
        return None, "post_failed"
    return str(cid), None


def apply_pending_records(records: list[dict]) -> dict:
    """Phase 3: POST a creative for every record whose action is ``approve``.

    Each input record is shallow-copied, so the caller's dicts are not
    modified. The ``action`` field is lower-cased; a missing or empty
    action is treated as ``skip``. Records with any other action are passed
    through untouched. A failure on one approved record (missing request
    body, unusable ``account_id``, falsy result or exception from the POST
    helper) is recorded on that record and counted as failed; processing
    continues with the next record.

    Args:
        records: Pending records, each carrying an ``action`` field
            (approve / reject / hold / skip) plus ``_request_body``,
            ``account_id``, ``adgroup_id`` and related fields.

    Returns:
        A summary dict::

            {
                "run_started", "run_finished",   # ISO-8601 UTC timestamps
                "total": {records, approved, posted_ok, posted_failed},
                "records": [...],  # action + dynamic_creative_id on
                                   # success, or error on failure
            }
    """
    run_started = datetime.now(timezone.utc).isoformat()
    posted_ok = 0
    posted_failed = 0
    approved_total = 0
    out_records = []

    for r in records:
        rec = dict(r)
        # str() keeps a non-string action (e.g. a number from a hand-edited
        # file) from raising; it simply will not match "approve".
        action = str(rec.get("action") or "skip").lower()
        rec["action"] = action
        if action != "approve":
            out_records.append(rec)
            continue

        approved_total += 1
        cid, error = _post_approved_record(rec)
        if error is None:
            rec["dynamic_creative_id"] = cid
            posted_ok += 1
        else:
            rec["error"] = error
            posted_failed += 1
        out_records.append(rec)

    run_finished = datetime.now(timezone.utc).isoformat()
    return {
        "run_started": run_started,
        "run_finished": run_finished,
        "total": {
            "records": len(records),
            "approved": approved_total,
            "posted_ok": posted_ok,
            "posted_failed": posted_failed,
        },
        "records": out_records,
    }
- def _strip_body_for_json(records: list[dict]) -> list[dict]:
- """删 _request_body 中可能含 jump_spec 的长字段,JSON 落盘体积更小。
- 保留 body 顶层字段(adgroup_id/name/account_id)作为追溯。"""
- out = []
- for r in records:
- rec = dict(r)
- body = rec.pop("_request_body", None)
- if body:
- rec["_body_summary"] = {
- "account_id": body.get("account_id"),
- "adgroup_id": body.get("adgroup_id"),
- "dynamic_creative_name": body.get("dynamic_creative_name"),
- }
- out.append(rec)
- return out
def write_summary(summary: dict, output_dir: Path) -> Path:
    """Persist *summary* as ``creation_run_<YYYYMMDD>_<HHMMSS>.json`` (UTC).

    The records are passed through ``_strip_body_for_json`` before being
    written; *summary* itself is left unmodified.

    Args:
        summary: The dict returned by ``apply_pending_records``.
        output_dir: Target directory; created (with parents) if missing.

    Returns:
        The path of the file that was written.
    """
    output_dir.mkdir(parents=True, exist_ok=True)

    # Take one timestamp so the date and time parts of the filename cannot
    # disagree when the call straddles midnight.
    now = datetime.now(timezone.utc)
    out_path = output_dir / f"creation_run_{now.strftime('%Y%m%d')}_{now.strftime('%H%M%S')}.json"

    persisted = dict(summary)
    persisted["records"] = _strip_body_for_json(summary["records"])

    # Serialise before opening the file: a non-serialisable value then
    # raises without leaving a truncated JSON file behind.
    payload = json.dumps(persisted, ensure_ascii=False, indent=2)
    with open(out_path, "w", encoding="utf-8") as f:
        f.write(payload)
    return out_path
def main() -> int:
    """Standalone mode: read pending records from JSON, apply, then report.

    Reads the file named by the first command-line argument, runs
    ``apply_pending_records`` on it, writes the summary under
    ``outputs/data`` next to this script and sends the Feishu report.

    Returns:
        ``0`` when no approved record failed to post; ``1`` on a usage
        error, an unreadable or malformed input file, or at least one
        failed post.
    """
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s | %(levelname)s | %(name)s | %(message)s",
        datefmt="%H:%M:%S",
    )
    for noisy in ("httpx", "httpcore", "config", "db.config"):
        logging.getLogger(noisy).setLevel(logging.WARNING)

    # SLS log shipping (added 2026-06-11): any failure to attach the
    # handler falls back to local logging only.
    try:
        from tools.sls_setup import attach_sls_handler
        attach_sls_handler()
    except Exception as e:
        logger.warning("[sls] 挂载异常(降级为本地 only):%s", e)

    if len(sys.argv) < 2:
        logger.error("用法: python execute_creation_apply.py <pending_records.json>")
        return 1
    pending_path = Path(sys.argv[1])
    if not pending_path.exists():
        logger.error("文件不存在: %s", pending_path)
        return 1

    # Fail with a logged error and exit code 1, not a traceback, when the
    # input cannot be read or is not valid JSON (JSONDecodeError and
    # UnicodeDecodeError are both ValueError subclasses).
    try:
        with open(pending_path, encoding="utf-8") as f:
            records = json.load(f)
    except (OSError, ValueError) as e:
        logger.error("读取 pending records 失败: %s (%s)", pending_path, e)
        return 1
    if not isinstance(records, list):
        logger.error(
            "pending records 必须是 JSON 数组,实际为 %s", type(records).__name__
        )
        return 1

    logger.info("读到 %d 条 pending records,开始 Phase 3 执行", len(records))
    summary = apply_pending_records(records)
    out_path = write_summary(summary, _HERE / "outputs" / "data")
    _send_apply_summary_to_feishu(summary)

    t = summary["total"]
    logger.info(
        "Phase 3 完成: approve=%s ok=%s fail=%s",
        t["approved"], t["posted_ok"], t["posted_failed"],
    )
    logger.info("summary: %s", out_path)
    return 0 if t["posted_failed"] == 0 else 1
# Script entry point: run main() and use its return value as the process
# exit status.
if __name__ == "__main__":
    sys.exit(main())
|