im_approval.py 36 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819
  1. """
  2. IM 审批流 — auto_put_ad_mini(飞书直连版)
  3. 职责:
  4. - 通过飞书 API 发送审批消息给运营(文本摘要 + Excel 附件)
  5. - 轮询飞书聊天获取审批回复
  6. - 支持阻塞式等待(工具内部 sleep+poll,不消耗 Agent iteration)
  7. 飞书能力(通过框架 FeishuClient):
  8. - 发文本消息:send_message(to, text)
  9. - 发文件附件:send_file(to, file, file_name)
  10. - 读消息列表:get_message_list(chat_id, start_time, page_size)
  11. """
  12. import asyncio
  13. import json
  14. import logging
  15. import sys
  16. import time
  17. import uuid
  18. from datetime import datetime, timedelta
  19. from pathlib import Path
  20. from typing import Dict, List, Optional
  21. import pandas as pd
  22. from agent.tools import tool
  23. from agent.tools.models import ToolContext, ToolResult
  24. from agent.tools.builtin.feishu.feishu_client import FeishuClient
  25. _MINI_DIR = Path(__file__).resolve().parent.parent
  26. _TOOLS_DIR = Path(__file__).resolve().parent
  27. if str(_MINI_DIR) not in sys.path:
  28. sys.path.insert(0, str(_MINI_DIR))
  29. if str(_TOOLS_DIR) not in sys.path:
  30. sys.path.insert(0, str(_TOOLS_DIR))
  31. from config import (
  32. IM_ENABLED,
  33. IM_APPROVAL_TIMEOUT_MINUTES,
  34. IM_APPROVAL_POLL_INTERVAL_SECONDS,
  35. FEISHU_APP_ID,
  36. FEISHU_APP_SECRET,
  37. FEISHU_OPERATOR_OPEN_ID,
  38. FEISHU_OPERATOR_CHAT_ID,
  39. FEISHU_AD_PROJECT_CHAT_ID, # 新增:投放项目群聊
  40. )
  41. logger = logging.getLogger(__name__)
  42. # 审批请求状态
  43. _approval_requests: Dict[str, Dict] = {}
  44. # 全局客户端实例(框架 FeishuClient,自动管理 token)
  45. _feishu = FeishuClient(app_id=FEISHU_APP_ID, app_secret=FEISHU_APP_SECRET)
  46. # ═══════════════════════════════════════════
  47. # 审批 Excel 生成
  48. # ═══════════════════════════════════════════
  49. # 审批表精选列(运营审阅所需的关键指标)
  50. # 列顺序:日期 → 账户ID → 广告ID → 广告消耗 → 决策动作 → 其他关键信息(简洁版)
  51. APPROVAL_COLUMNS = [
  52. # 核心标识(前5列,含决策动作)
  53. "approval_date", "account_id", "ad_id", "cost_7d_avg", "action",
  54. # 基础信息
  55. "ad_name", "audience_tier", "ad_age_days", "bid_amount",
  56. # 关键指标(使用实际列名)
  57. "动态ROI_7日均值", "cost_7d_total", "revenue_7d_total",
  58. # 决策详情
  59. "dimension", "reason",
  60. "recommended_change_pct",
  61. ]
  62. def _generate_approval_xlsx(df_tier2_3: pd.DataFrame, request_id: str) -> Path:
  63. """生成审批专用 Excel(仅需审批的 Tier 2/3 广告)
  64. 复用 report_generator._write_xlsx_with_format 生成带颜色标记的 Excel。
  65. 文件保存到 outputs/approvals/{request_id}.xlsx。
  66. """
  67. from report_generator import _write_xlsx_with_format
  68. approval_dir = _MINI_DIR / "outputs" / "approvals"
  69. approval_dir.mkdir(parents=True, exist_ok=True)
  70. xlsx_path = approval_dir / f"{request_id}.xlsx"
  71. # 添加审批日期列(当前日期)
  72. df_tier2_3 = df_tier2_3.copy()
  73. df_tier2_3["approval_date"] = datetime.now().strftime("%Y-%m-%d")
  74. # 精选列(仅保留 df 中存在的列)
  75. cols = [c for c in APPROVAL_COLUMNS if c in df_tier2_3.columns]
  76. df_out = df_tier2_3[cols].copy()
  77. # 排序:7日消耗<10元的放最后,有消耗的在前,同组内按消耗降序
  78. if "cost_7d_avg" in df_out.columns:
  79. df_out["_has_spend"] = (df_out["cost_7d_avg"] >= 10.0).astype(int) # >=10元算有消耗
  80. df_out = df_out.sort_values(
  81. ["_has_spend", "cost_7d_avg"],
  82. ascending=[False, False] # 有消耗在前(1在前),消耗高的在前
  83. )
  84. df_out.drop(columns=["_has_spend"], inplace=True)
  85. _write_xlsx_with_format(df_out, xlsx_path)
  86. return xlsx_path
  87. # ═══════════════════════════════════════════
  88. # 格式化审批消息
  89. # ═══════════════════════════════════════════
  90. def _format_project_notification_message(df_tier2: pd.DataFrame, df_tier1: pd.DataFrame, request_id: str) -> str:
  91. """格式化投放项目群聊通知消息(仅通知,不需要审批)"""
  92. lines = [
  93. "📊 广告调控决策通知",
  94. f"请求ID: {request_id}",
  95. f"时间: {datetime.now().strftime('%Y-%m-%d %H:%M')}",
  96. "",
  97. ]
  98. # 需审批操作统计
  99. if not df_tier2.empty:
  100. total_count = len(df_tier2)
  101. lines.append(f"🔶 待审批操作({total_count} 个):")
  102. lines.append("-" * 40)
  103. # 统计各操作类型
  104. action_counts = df_tier2.get("final_action", df_tier2.get("action", "")).value_counts().to_dict()
  105. for action, count in action_counts.items():
  106. if action == "pause":
  107. lines.append(f" ⏸️ 暂停: {count} 个")
  108. elif action == "bid_down":
  109. lines.append(f" ⬇️ 降价: {count} 个")
  110. elif action == "bid_up":
  111. lines.append(f" ⬆️ 提价: {count} 个")
  112. else:
  113. lines.append(f" {action}: {count} 个")
  114. lines.append("")
  115. # 简化展示(只显示前3个)
  116. lines.append("前 3 个示例:")
  117. for i, (_, row) in enumerate(df_tier2.head(3).iterrows()):
  118. ad_id = row.get("ad_id", "")
  119. action = row.get("final_action", row.get("action", ""))
  120. ad_name = str(row.get("ad_name", ""))[:20]
  121. cost_avg = row.get("cost_7d_avg", 0)
  122. roi = row.get("动态ROI_7日均值", 0)
  123. if action == "pause":
  124. action_label = "⏸️ 暂停"
  125. elif action == "bid_down":
  126. pct = row.get("recommended_change_pct", 0)
  127. if isinstance(pct, str):
  128. try:
  129. pct = float(pct)
  130. except ValueError:
  131. pct = 0
  132. action_label = f"⬇️ 降价{abs(pct)*100:.0f}%"
  133. elif action == "bid_up":
  134. pct = row.get("recommended_change_pct", 0)
  135. if isinstance(pct, str):
  136. try:
  137. pct = float(pct)
  138. except ValueError:
  139. pct = 0
  140. action_label = f"⬆️ 提价{pct*100:.0f}%"
  141. else:
  142. action_label = action
  143. lines.append(f" [{ad_id}] {ad_name}")
  144. lines.append(f" 操作: {action_label} | 日均消耗: {cost_avg:.0f}元 | ROI: {roi:.2f}")
  145. lines.append("")
  146. if total_count > 3:
  147. lines.append(f" ...还有 {total_count - 3} 个(查看在线表格)")
  148. lines.append("")
  149. # 无需操作 + 自动执行
  150. tier0_count = len(df_tier0) if not df_tier0.empty else 0
  151. tier1_count = len(df_tier1) if not df_tier1.empty else 0
  152. if tier0_count > 0 or tier1_count > 0:
  153. status_parts = []
  154. if tier0_count > 0:
  155. status_parts.append(f"{tier0_count}个无需操作(observe/hold)")
  156. if tier1_count > 0:
  157. status_parts.append(f"{tier1_count}个自动执行(小幅调价)")
  158. lines.append(f"ℹ️ {' + '.join(status_parts)}")
  159. lines.append("")
  160. lines.extend([
  161. "-" * 40,
  162. "ℹ️ 说明:",
  163. " • 此消息为智能决策结果通知",
  164. " • 运营审批通过后才会实际执行",
  165. " • 详情请查看在线表格(自动发送)",
  166. ])
  167. return "\n".join(lines)
  168. def _format_approval_message(df_tier2: pd.DataFrame, df_tier1: pd.DataFrame, df_tier0: pd.DataFrame, request_id: str) -> str:
  169. lines = [
  170. "📊 广告调控审批请求",
  171. f"请求ID: {request_id}",
  172. f"时间: {datetime.now().strftime('%Y-%m-%d %H:%M')}",
  173. "",
  174. ]
  175. # Tier 2/3: 需审批
  176. if not df_tier2.empty:
  177. total_count = len(df_tier2)
  178. lines.append(f"🔶 需审批操作({total_count} 个):")
  179. lines.append("-" * 40)
  180. # 统计各操作类型
  181. action_counts = df_tier2.get("final_action", df_tier2.get("action", "")).value_counts().to_dict()
  182. for action, count in action_counts.items():
  183. if action == "pause":
  184. lines.append(f" ⏸️ 暂停: {count} 个")
  185. elif action == "bid_down":
  186. lines.append(f" ⬇️ 降价: {count} 个")
  187. elif action == "bid_up":
  188. lines.append(f" ⬆️ 提价: {count} 个")
  189. else:
  190. lines.append(f" {action}: {count} 个")
  191. lines.append("")
  192. # 如果数量过多(>20),只显示摘要统计,不逐条列出
  193. if total_count > 20:
  194. lines.append(f"⚠️ 广告数量较多({total_count} 个),详情请查看在线表格")
  195. # 只展示前 5 个示例
  196. lines.append("")
  197. lines.append("前 5 个示例:")
  198. for i, (_, row) in enumerate(df_tier2.head(5).iterrows()):
  199. ad_id = row.get("ad_id", "")
  200. action = row.get("final_action", row.get("action", ""))
  201. ad_name = str(row.get("ad_name", ""))[:20]
  202. lines.append(f" [{ad_id}] {ad_name} → {action}")
  203. lines.append(" ...")
  204. else:
  205. # 数量较少,逐条列出
  206. for _, row in df_tier2.iterrows():
  207. ad_id = row.get("ad_id", "")
  208. action = row.get("final_action", row.get("action", ""))
  209. ad_name = str(row.get("ad_name", ""))[:20]
  210. reason = str(row.get("reason", ""))[:60]
  211. cost_avg = row.get("cost_7d_avg", 0)
  212. if action == "pause":
  213. action_label = "⏸️ 暂停"
  214. elif action == "bid_down":
  215. pct = row.get("recommended_change_pct", 0)
  216. if isinstance(pct, str):
  217. try:
  218. pct = float(pct)
  219. except ValueError:
  220. pct = 0
  221. action_label = f"⬇️ 降价{abs(pct)*100:.0f}%"
  222. elif action == "bid_up":
  223. pct = row.get("recommended_change_pct", 0)
  224. if isinstance(pct, str):
  225. try:
  226. pct = float(pct)
  227. except ValueError:
  228. pct = 0
  229. action_label = f"⬆️ 提价{pct*100:.0f}%"
  230. else:
  231. action_label = action
  232. lines.append(f" [{ad_id}] {ad_name}")
  233. lines.append(f" 操作: {action_label} | 日均消耗: {cost_avg:.0f}元")
  234. lines.append(f" 原因: {reason}")
  235. lines.append("")
  236. # Tier 0: 无需操作(observe/hold/creative_adjust)
  237. if not df_tier0.empty:
  238. lines.append(f"ℹ️ 无需操作({len(df_tier0)} 个,仅通知):")
  239. for _, row in df_tier0.iterrows():
  240. ad_id = row.get("ad_id", "")
  241. action = row.get("final_action", row.get("action", ""))
  242. action_label = {
  243. "observe": "观察等待",
  244. "hold": "保持不变",
  245. "creative_adjust": "需人工调整素材",
  246. "scale_up": "建议扩量(新增广告/创意)"
  247. }.get(action, action)
  248. lines.append(f" [{ad_id}] {action_label}")
  249. lines.append("")
  250. # Tier 1: 小幅调价(自动执行)
  251. if not df_tier1.empty:
  252. lines.append(f"✅ 自动执行({len(df_tier1)} 个小幅调价):")
  253. for _, row in df_tier1.iterrows():
  254. ad_id = row.get("ad_id", "")
  255. action = row.get("final_action", row.get("action", ""))
  256. change_pct = row.get("recommended_change_pct", 0)
  257. lines.append(f" [{ad_id}] {action} {change_pct:+.1%}")
  258. lines.append("")
  259. # 回复指令
  260. lines.extend([
  261. "-" * 40,
  262. "📝 直接回复即可,示例:",
  263. " \"批准\" / \"通过\" — 全部批准",
  264. " \"拒绝\" / \"不行\" — 全部拒绝",
  265. " \"广告 12345 不要暂停\" — 修改指定广告",
  266. " \"只批准降价的\" — 部分批准",
  267. " \"降幅改小一点\" — 调整后重新审批",
  268. f" ⏰ 超时时间: {IM_APPROVAL_TIMEOUT_MINUTES} 分钟",
  269. "",
  270. "📎 决策详情请查看在线表格(自动发送链接)",
  271. ])
  272. return "\n".join(lines)
  273. # ═══════════════════════════════════════════
  274. # 解析审批回复
  275. # ═══════════════════════════════════════════
  276. def _parse_approval_reply(content: str, all_ad_ids: List[int]) -> Dict:
  277. """将运营的自然语言回复原样返回给 Agent 解读。
  278. 不做硬解析 — 运营可能用任何自然语言表达审批意见(中文、英文、混合),
  279. 由 Agent(LLM)负责理解语义并决定后续动作。
  280. """
  281. content = content.strip()
  282. if not content:
  283. return {"status": "unknown", "raw_reply": ""}
  284. return {
  285. "status": "replied",
  286. "raw_reply": content,
  287. "ad_ids": all_ad_ids,
  288. }
  289. # ═══════════════════════════════════════════
  290. # 工具:发送审批请求(飞书版)
  291. # ═══════════════════════════════════════════
  292. @tool(description="通过飞书发送决策摘要给运营,收集审批结果(支持阻塞等待)")
  293. async def send_approval_request(
  294. ctx: ToolContext,
  295. validated_csv: str = "",
  296. timeout_minutes: int = IM_APPROVAL_TIMEOUT_MINUTES,
  297. wait_for_reply: bool = True,
  298. poll_interval_seconds: int = IM_APPROVAL_POLL_INTERVAL_SECONDS,
  299. ) -> ToolResult:
  300. """
  301. 发送结构化审批请求到飞书。
  302. 流程:
  303. 1. 读取验证后的决策 CSV,分级(Tier 1/2/3)
  304. 2. 生成审批 Excel(仅 Tier 2/3 广告 + 关键指标列)
  305. 3. 发送飞书消息 x2:文本摘要 + Excel 文件附件
  306. 4. 阻塞轮询等待回复(30s 间隔 x 最长 30 分钟)
  307. 5. 解析 approve/reject -> 返回结果
  308. Args:
  309. validated_csv: 护栏验证后的 CSV 路径
  310. timeout_minutes: 审批超时时间(分钟)
  311. wait_for_reply: 是否阻塞等待回复
  312. poll_interval_seconds: 轮询间隔(秒)
  313. """
  314. try:
  315. if not IM_ENABLED:
  316. return ToolResult(
  317. title="IM 审批未启用",
  318. output="IM_ENABLED=False,跳过飞书审批。需审批的操作已记录到执行日志。",
  319. )
  320. if not FEISHU_OPERATOR_CHAT_ID:
  321. return ToolResult(
  322. title="飞书配置缺失",
  323. output="FEISHU_OPERATOR_CHAT_ID 未配置,无法发送审批请求。",
  324. )
  325. # 查找验证 CSV
  326. if not validated_csv:
  327. reports_dir = _MINI_DIR / "outputs" / "reports"
  328. candidates = sorted(reports_dir.glob("validated_decisions_*.csv"), reverse=True)
  329. if not candidates:
  330. return ToolResult(title="send_approval_request", output="未找到验证后的决策 CSV")
  331. validated_csv = str(candidates[0])
  332. df = pd.read_csv(validated_csv)
  333. if df.empty:
  334. return ToolResult(title="send_approval_request", output="决策数据为空")
  335. # ⚠️ 关键:补充 metrics 数据(通过 ad_id 关联)
  336. metrics_path = _MINI_DIR / "outputs" / "metrics_temp.csv"
  337. if not metrics_path.exists():
  338. # 尝试查找最新的 metrics 文件
  339. reports_dir = _MINI_DIR / "outputs"
  340. candidates = sorted(reports_dir.glob("metrics_*.csv"), reverse=True)
  341. if candidates:
  342. metrics_path = candidates[0]
  343. if metrics_path.exists():
  344. df_metrics = pd.read_csv(metrics_path)
  345. # 选择需要的列(避免重复列,使用实际列名)
  346. metrics_cols = [
  347. "ad_id", "account_id", "ad_name",
  348. "cost_7d_avg", "cost_7d_total", "revenue_7d_total",
  349. "动态ROI_7日均值", "bid_amount"
  350. ]
  351. # 只保留存在的列
  352. metrics_cols = [c for c in metrics_cols if c in df_metrics.columns]
  353. df_metrics_sub = df_metrics[metrics_cols].copy()
  354. # 从 ad_name 中提取 audience_tier(如 "R500_xxx" → "R500")
  355. if "ad_name" in df_metrics_sub.columns:
  356. df_metrics_sub["audience_tier"] = df_metrics_sub["ad_name"].str.extract(r"^(R\d+)")[0]
  357. # 左连接:保留 df 的所有行,补充 metrics 数据
  358. df = df.merge(df_metrics_sub, on="ad_id", how="left", suffixes=("", "_metrics"))
  359. logger.info(f"已从 metrics 补充 {len(metrics_cols)} 列数据")
  360. else:
  361. logger.warning("未找到 metrics 文件,审批表格将缺少关键字段")
  362. # 过滤已暂停的广告(不应出现在审批表中)
  363. if "configured_status" in df.columns:
  364. before_count = len(df)
  365. df = df[df["configured_status"] != "AD_STATUS_SUSPEND"].copy()
  366. filtered_count = before_count - len(df)
  367. if filtered_count > 0:
  368. logger.info(f"审批请求过滤掉 {filtered_count} 个已暂停广告")
  369. if df.empty:
  370. return ToolResult(title="send_approval_request", output="过滤后无数据")
  371. # 分级(包含hold记录,用于参考)
  372. from execution_engine import _classify_tier
  373. df["tier"] = df.apply(_classify_tier, axis=1)
  374. # 按tier分类
  375. df_tier0 = df[df["tier"] == 0].copy() # observe, hold, creative_adjust(无需操作)
  376. df_tier1 = df[df["tier"] == 1].copy() # 小幅调价(自动执行)
  377. df_tier2_3 = df[df["tier"] >= 2].copy() # 暂停、大幅调价(需审批)
  378. if df.empty:
  379. return ToolResult(title="send_approval_request", output="无决策数据")
  380. # 合并需审批的和无需操作的(供运营参考)
  381. df_for_review = pd.concat([df_tier2_3, df_tier0], ignore_index=True) if not df_tier0.empty else df_tier2_3
  382. if df_tier2_3.empty:
  383. total_no_op = len(df_tier0) + len(df_tier1)
  384. return ToolResult(
  385. title="无需审批",
  386. output=f"共 {total_no_op} 个决策:{len(df_tier0)}个无需操作(observe/hold)+ {len(df_tier1)}个自动执行(小幅调价),无需审批",
  387. )
  388. # 生成审批请求
  389. request_id = f"req_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{uuid.uuid4().hex[:6]}"
  390. message = _format_approval_message(df_tier2_3, df_tier1, df_tier0, request_id)
  391. # 保存请求状态
  392. tier2_ad_ids = df_tier2_3["ad_id"].astype(int).tolist()
  393. _approval_requests[request_id] = {
  394. "status": "pending",
  395. "created_at": datetime.now().isoformat(),
  396. "timeout_at": (datetime.now() + timedelta(minutes=timeout_minutes)).isoformat(),
  397. "ad_ids": tier2_ad_ids,
  398. "validated_csv": validated_csv,
  399. }
  400. # ─── 通过飞书 API 发送审批消息(文本 + Excel) ───
  401. feishu_sent = False
  402. feishu_sent_to_project_chat = False
  403. sent_time_sec = str(int(time.time())) # 飞书 API start_time 单位:秒
  404. try:
  405. # 消息 1a:发送到个人(FEISHU_OPERATOR_OPEN_ID)
  406. if FEISHU_OPERATOR_OPEN_ID:
  407. try:
  408. result_personal = _feishu.send_message(to=FEISHU_OPERATOR_OPEN_ID, text=message)
  409. logger.info("飞书审批消息发送成功(个人): message_id=%s", result_personal.message_id)
  410. except Exception as e:
  411. logger.warning("发送到个人失败: %s", e)
  412. # 消息 1b:发送到投放项目群聊(如果配置了)— 临时禁用
  413. # if FEISHU_AD_PROJECT_CHAT_ID:
  414. # try:
  415. # result_project = _feishu.send_message(to=FEISHU_AD_PROJECT_CHAT_ID, text=message)
  416. # feishu_sent_to_project_chat = True
  417. # feishu_sent = True
  418. # logger.info("飞书审批消息发送成功(项目群): message_id=%s", result_project.message_id)
  419. # except Exception as e:
  420. # logger.warning("发送到项目群聊失败: %s", e)
  421. # 消息 2:导入为飞书在线表格(决策详情,含hold参考)
  422. try:
  423. xlsx_path = _generate_approval_xlsx(df_for_review, request_id)
  424. # 导入飞书在线表格并发送链接(项目群)— 临时禁用
  425. from feishu_doc import import_to_feishu
  426. # 发送到项目群 — 临时禁用
  427. # if FEISHU_AD_PROJECT_CHAT_ID:
  428. # import_result = await import_to_feishu(
  429. # ctx=ctx,
  430. # xlsx_path=str(xlsx_path),
  431. # send_im=True,
  432. # chat_id=FEISHU_AD_PROJECT_CHAT_ID
  433. # )
  434. #
  435. # if import_result.metadata and import_result.metadata.get("url"):
  436. # sheet_url = import_result.metadata["url"]
  437. # logger.info("飞书审批表格导入成功(项目群): %s", sheet_url)
  438. # else:
  439. # logger.warning("飞书在线表格导入失败(项目群),回退到文件附件模式")
  440. # # 回退:发送文件附件(项目群)
  441. # file_result = _feishu.send_file(
  442. # to=FEISHU_AD_PROJECT_CHAT_ID,
  443. # file=str(xlsx_path),
  444. # file_name=f"审批决策表_{request_id}.xlsx",
  445. # )
  446. # logger.info("飞书审批 Excel(文件)发送成功(项目群): message_id=%s", file_result.message_id)
  447. # 发送到个人
  448. if FEISHU_OPERATOR_OPEN_ID:
  449. try:
  450. personal_import_result = await import_to_feishu(
  451. ctx=ctx,
  452. xlsx_path=str(xlsx_path),
  453. send_im=True,
  454. chat_id=FEISHU_OPERATOR_OPEN_ID
  455. )
  456. if personal_import_result.metadata and personal_import_result.metadata.get("url"):
  457. logger.info("飞书审批表格发送成功(个人): %s", personal_import_result.metadata["url"])
  458. else:
  459. # 回退:发送文件附件(个人)
  460. file_result_personal = _feishu.send_file(
  461. to=FEISHU_OPERATOR_OPEN_ID,
  462. file=str(xlsx_path),
  463. file_name=f"决策表_{request_id}.xlsx",
  464. )
  465. logger.info("飞书决策 Excel(文件)发送成功(个人): message_id=%s", file_result_personal.message_id)
  466. except Exception as e:
  467. logger.warning("发送表格到个人失败: %s", e)
  468. except Exception as e:
  469. logger.warning("飞书在线表格导入失败(不影响审批流程): %s", e)
  470. except Exception as e:
  471. logger.warning("飞书发消息失败: %s", e)
  472. # 保存审批消息到文件(备份)
  473. approval_dir = _MINI_DIR / "outputs" / "approvals"
  474. approval_dir.mkdir(parents=True, exist_ok=True)
  475. msg_path = approval_dir / f"{request_id}.txt"
  476. msg_path.write_text(message, encoding="utf-8")
  477. # ─── 非阻塞模式 ───
  478. if not wait_for_reply:
  479. return ToolResult(
  480. title=f"审批请求已发送({len(tier2_ad_ids)}个待审批)",
  481. output=(
  482. f"审批请求 {request_id} 已{'通过飞书发送' if feishu_sent else '保存到文件(飞书发送失败)'}\n"
  483. f" 待审批: {len(tier2_ad_ids)} 个广告\n"
  484. f" 无需操作: {len(df_tier0)} 个广告(observe/hold)\n"
  485. f" 自动执行: {len(df_tier1)} 个广告(小幅调价)\n"
  486. f" 超时时间: {timeout_minutes} 分钟\n"
  487. f" 消息备份: {msg_path}\n\n"
  488. f"使用 check_approval_status(request_id='{request_id}') 检查审批结果"
  489. ),
  490. metadata={
  491. "request_id": request_id,
  492. "pending_count": len(tier2_ad_ids),
  493. "tier0_count": len(df_tier0),
  494. "tier1_count": len(df_tier1),
  495. "feishu_sent": feishu_sent,
  496. "msg_path": str(msg_path),
  497. },
  498. )
  499. # ═══ 阻塞轮询等待飞书回复 ═══
  500. timeout_at = datetime.now() + timedelta(minutes=timeout_minutes)
  501. logger.info(
  502. "阻塞等待飞书审批回复(超时 %d 分钟,间隔 %d 秒)...",
  503. timeout_minutes,
  504. poll_interval_seconds,
  505. )
  506. poll_count = 0
  507. while datetime.now() < timeout_at:
  508. await asyncio.sleep(poll_interval_seconds)
  509. poll_count += 1
  510. remaining = (timeout_at - datetime.now()).total_seconds() / 60
  511. if poll_count % 10 == 0:
  512. logger.info("飞书审批轮询 #%d,剩余 %.1f 分钟", poll_count, remaining)
  513. # 读取个人和项目群的审批回复
  514. try:
  515. # ✅ 修改:监听个人私聊和项目群聊的消息 — 临时只监听个人
  516. chat_ids_to_check = []
  517. if FEISHU_OPERATOR_OPEN_ID:
  518. chat_ids_to_check.append(FEISHU_OPERATOR_OPEN_ID)
  519. # 临时禁用项目群聊监听
  520. # if FEISHU_AD_PROJECT_CHAT_ID:
  521. # chat_ids_to_check.append(FEISHU_AD_PROJECT_CHAT_ID)
  522. for chat_id in chat_ids_to_check:
  523. result = _feishu.get_message_list(
  524. chat_id=chat_id,
  525. start_time=sent_time_sec,
  526. page_size=10,
  527. )
  528. if result and result.get("items"):
  529. for msg in result["items"]:
  530. sender_id = msg.get("sender_id", "")
  531. sender_type = msg.get("sender_type", "")
  532. logger.debug(
  533. "飞书消息 [%s]: sender_type=%s, sender_id=%s, content=%s",
  534. chat_id, sender_type, sender_id, str(msg.get("content", ""))[:100],
  535. )
  536. # ✅ 修改:接受任何用户的回复(不限制特定个人)
  537. if sender_type != "user":
  538. continue
  539. # 框架已自动解析 text 消息的 JSON -> 纯文本
  540. text = msg.get("content", "")
  541. if not text.strip():
  542. continue
  543. # 检测到运营回复,返回原文给 Agent 理解
  544. parsed = _parse_approval_reply(text, tier2_ad_ids)
  545. if parsed["status"] != "unknown":
  546. _approval_requests[request_id].update({
  547. "status": "replied",
  548. "reply_content": text,
  549. "reply_at": datetime.now().isoformat(),
  550. "ad_ids": tier2_ad_ids,
  551. })
  552. logger.info("飞书审批收到运营回复: %s", text[:200])
  553. ad_ids_str = ", ".join(str(x) for x in tier2_ad_ids[:10])
  554. if len(tier2_ad_ids) > 10:
  555. ad_ids_str += f"...共{len(tier2_ad_ids)}个"
  556. return ToolResult(
  557. title="运营已回复",
  558. output=(
  559. f"运营飞书回复原文: {text}\n"
  560. f"等待审批的广告ID: {ad_ids_str}\n"
  561. f"等待时间: {poll_count * poll_interval_seconds} 秒\n\n"
  562. f"请根据运营的自然语言回复判断后续操作:\n"
  563. f"- 运营同意/批准/通过 → 调用 execute_decisions 执行\n"
  564. f"- 运营拒绝/驳回 → 停止执行,告知原因\n"
  565. f"- 运营要求修改(如\"广告X不要暂停\")→ 进入 Mode 3: modify_decisions → validate → 重新审批\n"
  566. f"- 运营部分批准(如\"只批准降价的\")→ 相应过滤后 execute_decisions"
  567. ),
  568. metadata={
  569. "request_id": request_id,
  570. "feishu_sent": feishu_sent,
  571. "msg_path": str(msg_path),
  572. "poll_count": poll_count,
  573. "raw_reply": text,
  574. "ad_ids": tier2_ad_ids,
  575. },
  576. )
  577. except Exception as e:
  578. logger.debug("飞书读消息失败(将重试): %s", e)
  579. # ─── 超时 ───
  580. _approval_requests[request_id]["status"] = "timeout"
  581. logger.warning("审批请求 %s 超时(%d 分钟)", request_id, timeout_minutes)
  582. return ToolResult(
  583. title="审批超时",
  584. output=(
  585. f"请求 {request_id} 超时({timeout_minutes}分钟),需审批的 {len(tier2_ad_ids)} 个操作未执行\n"
  586. f"轮询次数: {poll_count}\n"
  587. f"消息备份: {msg_path}"
  588. ),
  589. metadata={
  590. "request_id": request_id,
  591. "status": "timeout",
  592. "pending_ad_ids": tier2_ad_ids,
  593. "feishu_sent": feishu_sent,
  594. "msg_path": str(msg_path),
  595. "poll_count": poll_count,
  596. },
  597. )
  598. except Exception as e:
  599. logger.error("send_approval_request 失败: %s", e, exc_info=True)
  600. return ToolResult(title="send_approval_request 失败", output=str(e))
  601. # ═══════════════════════════════════════════
  602. # 工具:检查审批状态
  603. # ═══════════════════════════════════════════
  604. @tool(description="检查运营飞书审批结果")
  605. async def check_approval_status(
  606. ctx: ToolContext,
  607. request_id: str,
  608. ) -> ToolResult:
  609. """
  610. 检查审批请求的状态。通过飞书 API 读取最新消息。
  611. Args:
  612. request_id: 审批请求 ID(send_approval_request 返回)
  613. """
  614. try:
  615. request = _approval_requests.get(request_id)
  616. if not request:
  617. return ToolResult(
  618. title="审批请求不存在",
  619. output=f"未找到请求 {request_id}。可能已过期或重启后丢失。",
  620. )
  621. # 检查超时
  622. timeout_at = datetime.fromisoformat(request["timeout_at"])
  623. if datetime.now() > timeout_at:
  624. request["status"] = "timeout"
  625. return ToolResult(
  626. title="审批已超时",
  627. output=f"请求 {request_id} 已超时({IM_APPROVAL_TIMEOUT_MINUTES}分钟)",
  628. metadata={"status": "timeout", "request_id": request_id},
  629. )
  630. if request["status"] != "pending":
  631. return ToolResult(
  632. title=f"审批状态: {request['status']}",
  633. output=json.dumps(request, ensure_ascii=False, indent=2),
  634. metadata=request,
  635. )
  636. # 通过框架 FeishuClient 读取审批消息之后的回复
  637. try:
  638. created_at_sec = str(int(
  639. datetime.fromisoformat(request["created_at"]).timestamp()
  640. ))
  641. # ✅ 修改:监听个人私聊和项目群聊的消息 — 临时只监听个人
  642. chat_ids_to_check = []
  643. if FEISHU_OPERATOR_OPEN_ID:
  644. chat_ids_to_check.append(FEISHU_OPERATOR_OPEN_ID)
  645. # 临时禁用项目群聊监听
  646. # if FEISHU_AD_PROJECT_CHAT_ID:
  647. # chat_ids_to_check.append(FEISHU_AD_PROJECT_CHAT_ID)
  648. for chat_id in chat_ids_to_check:
  649. result = _feishu.get_message_list(
  650. chat_id=chat_id,
  651. start_time=created_at_sec,
  652. page_size=10,
  653. )
  654. if result and result.get("items"):
  655. for msg in result["items"]:
  656. sender_id = msg.get("sender_id", "")
  657. sender_type = msg.get("sender_type", "")
  658. logger.debug(
  659. "飞书消息(check) [%s]: sender_type=%s, sender_id=%s, content=%s",
  660. chat_id, sender_type, sender_id, str(msg.get("content", ""))[:100],
  661. )
  662. # ✅ 修改:接受任何用户的回复(不限制特定个人)
  663. if sender_type != "user":
  664. continue
  665. text = msg.get("content", "")
  666. if not text.strip():
  667. continue
  668. # 检测到运营回复,返回原文给 Agent 理解
  669. parsed = _parse_approval_reply(text, request["ad_ids"])
  670. if parsed["status"] != "unknown":
  671. request.update({
  672. "status": "replied",
  673. "reply_content": text,
  674. "reply_at": datetime.now().isoformat(),
  675. })
  676. ad_ids = request["ad_ids"]
  677. ad_ids_str = ", ".join(str(x) for x in ad_ids[:10])
  678. if len(ad_ids) > 10:
  679. ad_ids_str += f"...共{len(ad_ids)}个"
  680. return ToolResult(
  681. title="运营已回复",
  682. output=(
  683. f"运营飞书回复原文: {text}\n"
  684. f"等待审批的广告ID: {ad_ids_str}\n\n"
  685. f"请根据运营的自然语言回复判断后续操作:\n"
  686. f"- 运营同意/批准/通过 → 调用 execute_decisions 执行\n"
  687. f"- 运营拒绝/驳回 → 停止执行,告知原因\n"
  688. f"- 运营要求修改 → 进入 Mode 3: modify_decisions → validate → 重新审批\n"
  689. f"- 运营部分批准 → 相应过滤后 execute_decisions"
  690. ),
  691. metadata={"request_id": request_id, "raw_reply": text, "ad_ids": ad_ids},
  692. )
  693. except Exception as e:
  694. logger.debug("飞书读消息失败: %s", e)
  695. remaining = (timeout_at - datetime.now()).total_seconds() / 60
  696. return ToolResult(
  697. title="等待审批中",
  698. output=f"请求 {request_id} 等待飞书审批中(剩余 {remaining:.0f} 分钟)",
  699. metadata={"status": "pending", "request_id": request_id, "remaining_minutes": round(remaining, 1)},
  700. )
  701. except Exception as e:
  702. logger.error("check_approval_status 失败: %s", e, exc_info=True)
  703. return ToolResult(title="check_approval_status 失败", output=str(e))