| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819 |
- """
- IM 审批流 — auto_put_ad_mini(飞书直连版)
- 职责:
- - 通过飞书 API 发送审批消息给运营(文本摘要 + Excel 附件)
- - 轮询飞书聊天获取审批回复
- - 支持阻塞式等待(工具内部 sleep+poll,不消耗 Agent iteration)
- 飞书能力(通过框架 FeishuClient):
- - 发文本消息:send_message(to, text)
- - 发文件附件:send_file(to, file, file_name)
- - 读消息列表:get_message_list(chat_id, start_time, page_size)
- """
- import asyncio
- import json
- import logging
- import sys
- import time
- import uuid
- from datetime import datetime, timedelta
- from pathlib import Path
- from typing import Dict, List, Optional
- import pandas as pd
- from agent.tools import tool
- from agent.tools.models import ToolContext, ToolResult
- from agent.tools.builtin.feishu.feishu_client import FeishuClient
- _MINI_DIR = Path(__file__).resolve().parent.parent
- _TOOLS_DIR = Path(__file__).resolve().parent
- if str(_MINI_DIR) not in sys.path:
- sys.path.insert(0, str(_MINI_DIR))
- if str(_TOOLS_DIR) not in sys.path:
- sys.path.insert(0, str(_TOOLS_DIR))
- from config import (
- IM_ENABLED,
- IM_APPROVAL_TIMEOUT_MINUTES,
- IM_APPROVAL_POLL_INTERVAL_SECONDS,
- FEISHU_APP_ID,
- FEISHU_APP_SECRET,
- FEISHU_OPERATOR_OPEN_ID,
- FEISHU_OPERATOR_CHAT_ID,
- FEISHU_AD_PROJECT_CHAT_ID, # 新增:投放项目群聊
- )
- logger = logging.getLogger(__name__)
- # 审批请求状态
- _approval_requests: Dict[str, Dict] = {}
- # 全局客户端实例(框架 FeishuClient,自动管理 token)
- _feishu = FeishuClient(app_id=FEISHU_APP_ID, app_secret=FEISHU_APP_SECRET)
- # ═══════════════════════════════════════════
- # 审批 Excel 生成
- # ═══════════════════════════════════════════
- # 审批表精选列(运营审阅所需的关键指标)
- # 列顺序:日期 → 账户ID → 广告ID → 广告消耗 → 决策动作 → 其他关键信息(简洁版)
- APPROVAL_COLUMNS = [
- # 核心标识(前5列,含决策动作)
- "approval_date", "account_id", "ad_id", "cost_7d_avg", "action",
- # 基础信息
- "ad_name", "audience_tier", "ad_age_days", "bid_amount",
- # 关键指标(使用实际列名)
- "动态ROI_7日均值", "cost_7d_total", "revenue_7d_total",
- # 决策详情
- "dimension", "reason",
- "recommended_change_pct",
- ]
- def _generate_approval_xlsx(df_tier2_3: pd.DataFrame, request_id: str) -> Path:
- """生成审批专用 Excel(仅需审批的 Tier 2/3 广告)
- 复用 report_generator._write_xlsx_with_format 生成带颜色标记的 Excel。
- 文件保存到 outputs/approvals/{request_id}.xlsx。
- """
- from report_generator import _write_xlsx_with_format
- approval_dir = _MINI_DIR / "outputs" / "approvals"
- approval_dir.mkdir(parents=True, exist_ok=True)
- xlsx_path = approval_dir / f"{request_id}.xlsx"
- # 添加审批日期列(当前日期)
- df_tier2_3 = df_tier2_3.copy()
- df_tier2_3["approval_date"] = datetime.now().strftime("%Y-%m-%d")
- # 精选列(仅保留 df 中存在的列)
- cols = [c for c in APPROVAL_COLUMNS if c in df_tier2_3.columns]
- df_out = df_tier2_3[cols].copy()
- # 排序:7日消耗<10元的放最后,有消耗的在前,同组内按消耗降序
- if "cost_7d_avg" in df_out.columns:
- df_out["_has_spend"] = (df_out["cost_7d_avg"] >= 10.0).astype(int) # >=10元算有消耗
- df_out = df_out.sort_values(
- ["_has_spend", "cost_7d_avg"],
- ascending=[False, False] # 有消耗在前(1在前),消耗高的在前
- )
- df_out.drop(columns=["_has_spend"], inplace=True)
- _write_xlsx_with_format(df_out, xlsx_path)
- return xlsx_path
- # ═══════════════════════════════════════════
- # 格式化审批消息
- # ═══════════════════════════════════════════
- def _format_project_notification_message(df_tier2: pd.DataFrame, df_tier1: pd.DataFrame, request_id: str) -> str:
- """格式化投放项目群聊通知消息(仅通知,不需要审批)"""
- lines = [
- "📊 广告调控决策通知",
- f"请求ID: {request_id}",
- f"时间: {datetime.now().strftime('%Y-%m-%d %H:%M')}",
- "",
- ]
- # 需审批操作统计
- if not df_tier2.empty:
- total_count = len(df_tier2)
- lines.append(f"🔶 待审批操作({total_count} 个):")
- lines.append("-" * 40)
- # 统计各操作类型
- action_counts = df_tier2.get("final_action", df_tier2.get("action", "")).value_counts().to_dict()
- for action, count in action_counts.items():
- if action == "pause":
- lines.append(f" ⏸️ 暂停: {count} 个")
- elif action == "bid_down":
- lines.append(f" ⬇️ 降价: {count} 个")
- elif action == "bid_up":
- lines.append(f" ⬆️ 提价: {count} 个")
- else:
- lines.append(f" {action}: {count} 个")
- lines.append("")
- # 简化展示(只显示前3个)
- lines.append("前 3 个示例:")
- for i, (_, row) in enumerate(df_tier2.head(3).iterrows()):
- ad_id = row.get("ad_id", "")
- action = row.get("final_action", row.get("action", ""))
- ad_name = str(row.get("ad_name", ""))[:20]
- cost_avg = row.get("cost_7d_avg", 0)
- roi = row.get("动态ROI_7日均值", 0)
- if action == "pause":
- action_label = "⏸️ 暂停"
- elif action == "bid_down":
- pct = row.get("recommended_change_pct", 0)
- if isinstance(pct, str):
- try:
- pct = float(pct)
- except ValueError:
- pct = 0
- action_label = f"⬇️ 降价{abs(pct)*100:.0f}%"
- elif action == "bid_up":
- pct = row.get("recommended_change_pct", 0)
- if isinstance(pct, str):
- try:
- pct = float(pct)
- except ValueError:
- pct = 0
- action_label = f"⬆️ 提价{pct*100:.0f}%"
- else:
- action_label = action
- lines.append(f" [{ad_id}] {ad_name}")
- lines.append(f" 操作: {action_label} | 日均消耗: {cost_avg:.0f}元 | ROI: {roi:.2f}")
- lines.append("")
- if total_count > 3:
- lines.append(f" ...还有 {total_count - 3} 个(查看在线表格)")
- lines.append("")
- # 无需操作 + 自动执行
- tier0_count = len(df_tier0) if not df_tier0.empty else 0
- tier1_count = len(df_tier1) if not df_tier1.empty else 0
- if tier0_count > 0 or tier1_count > 0:
- status_parts = []
- if tier0_count > 0:
- status_parts.append(f"{tier0_count}个无需操作(observe/hold)")
- if tier1_count > 0:
- status_parts.append(f"{tier1_count}个自动执行(小幅调价)")
- lines.append(f"ℹ️ {' + '.join(status_parts)}")
- lines.append("")
- lines.extend([
- "-" * 40,
- "ℹ️ 说明:",
- " • 此消息为智能决策结果通知",
- " • 运营审批通过后才会实际执行",
- " • 详情请查看在线表格(自动发送)",
- ])
- return "\n".join(lines)
- def _format_approval_message(df_tier2: pd.DataFrame, df_tier1: pd.DataFrame, df_tier0: pd.DataFrame, request_id: str) -> str:
- lines = [
- "📊 广告调控审批请求",
- f"请求ID: {request_id}",
- f"时间: {datetime.now().strftime('%Y-%m-%d %H:%M')}",
- "",
- ]
- # Tier 2/3: 需审批
- if not df_tier2.empty:
- total_count = len(df_tier2)
- lines.append(f"🔶 需审批操作({total_count} 个):")
- lines.append("-" * 40)
- # 统计各操作类型
- action_counts = df_tier2.get("final_action", df_tier2.get("action", "")).value_counts().to_dict()
- for action, count in action_counts.items():
- if action == "pause":
- lines.append(f" ⏸️ 暂停: {count} 个")
- elif action == "bid_down":
- lines.append(f" ⬇️ 降价: {count} 个")
- elif action == "bid_up":
- lines.append(f" ⬆️ 提价: {count} 个")
- else:
- lines.append(f" {action}: {count} 个")
- lines.append("")
- # 如果数量过多(>20),只显示摘要统计,不逐条列出
- if total_count > 20:
- lines.append(f"⚠️ 广告数量较多({total_count} 个),详情请查看在线表格")
- # 只展示前 5 个示例
- lines.append("")
- lines.append("前 5 个示例:")
- for i, (_, row) in enumerate(df_tier2.head(5).iterrows()):
- ad_id = row.get("ad_id", "")
- action = row.get("final_action", row.get("action", ""))
- ad_name = str(row.get("ad_name", ""))[:20]
- lines.append(f" [{ad_id}] {ad_name} → {action}")
- lines.append(" ...")
- else:
- # 数量较少,逐条列出
- for _, row in df_tier2.iterrows():
- ad_id = row.get("ad_id", "")
- action = row.get("final_action", row.get("action", ""))
- ad_name = str(row.get("ad_name", ""))[:20]
- reason = str(row.get("reason", ""))[:60]
- cost_avg = row.get("cost_7d_avg", 0)
- if action == "pause":
- action_label = "⏸️ 暂停"
- elif action == "bid_down":
- pct = row.get("recommended_change_pct", 0)
- if isinstance(pct, str):
- try:
- pct = float(pct)
- except ValueError:
- pct = 0
- action_label = f"⬇️ 降价{abs(pct)*100:.0f}%"
- elif action == "bid_up":
- pct = row.get("recommended_change_pct", 0)
- if isinstance(pct, str):
- try:
- pct = float(pct)
- except ValueError:
- pct = 0
- action_label = f"⬆️ 提价{pct*100:.0f}%"
- else:
- action_label = action
- lines.append(f" [{ad_id}] {ad_name}")
- lines.append(f" 操作: {action_label} | 日均消耗: {cost_avg:.0f}元")
- lines.append(f" 原因: {reason}")
- lines.append("")
- # Tier 0: 无需操作(observe/hold/creative_adjust)
- if not df_tier0.empty:
- lines.append(f"ℹ️ 无需操作({len(df_tier0)} 个,仅通知):")
- for _, row in df_tier0.iterrows():
- ad_id = row.get("ad_id", "")
- action = row.get("final_action", row.get("action", ""))
- action_label = {
- "observe": "观察等待",
- "hold": "保持不变",
- "creative_adjust": "需人工调整素材",
- "scale_up": "建议扩量(新增广告/创意)"
- }.get(action, action)
- lines.append(f" [{ad_id}] {action_label}")
- lines.append("")
- # Tier 1: 小幅调价(自动执行)
- if not df_tier1.empty:
- lines.append(f"✅ 自动执行({len(df_tier1)} 个小幅调价):")
- for _, row in df_tier1.iterrows():
- ad_id = row.get("ad_id", "")
- action = row.get("final_action", row.get("action", ""))
- change_pct = row.get("recommended_change_pct", 0)
- lines.append(f" [{ad_id}] {action} {change_pct:+.1%}")
- lines.append("")
- # 回复指令
- lines.extend([
- "-" * 40,
- "📝 直接回复即可,示例:",
- " \"批准\" / \"通过\" — 全部批准",
- " \"拒绝\" / \"不行\" — 全部拒绝",
- " \"广告 12345 不要暂停\" — 修改指定广告",
- " \"只批准降价的\" — 部分批准",
- " \"降幅改小一点\" — 调整后重新审批",
- f" ⏰ 超时时间: {IM_APPROVAL_TIMEOUT_MINUTES} 分钟",
- "",
- "📎 决策详情请查看在线表格(自动发送链接)",
- ])
- return "\n".join(lines)
- # ═══════════════════════════════════════════
- # 解析审批回复
- # ═══════════════════════════════════════════
- def _parse_approval_reply(content: str, all_ad_ids: List[int]) -> Dict:
- """将运营的自然语言回复原样返回给 Agent 解读。
- 不做硬解析 — 运营可能用任何自然语言表达审批意见(中文、英文、混合),
- 由 Agent(LLM)负责理解语义并决定后续动作。
- """
- content = content.strip()
- if not content:
- return {"status": "unknown", "raw_reply": ""}
- return {
- "status": "replied",
- "raw_reply": content,
- "ad_ids": all_ad_ids,
- }
- # ═══════════════════════════════════════════
- # 工具:发送审批请求(飞书版)
- # ═══════════════════════════════════════════
- @tool(description="通过飞书发送决策摘要给运营,收集审批结果(支持阻塞等待)")
- async def send_approval_request(
- ctx: ToolContext,
- validated_csv: str = "",
- timeout_minutes: int = IM_APPROVAL_TIMEOUT_MINUTES,
- wait_for_reply: bool = True,
- poll_interval_seconds: int = IM_APPROVAL_POLL_INTERVAL_SECONDS,
- ) -> ToolResult:
- """
- 发送结构化审批请求到飞书。
- 流程:
- 1. 读取验证后的决策 CSV,分级(Tier 1/2/3)
- 2. 生成审批 Excel(仅 Tier 2/3 广告 + 关键指标列)
- 3. 发送飞书消息 x2:文本摘要 + Excel 文件附件
- 4. 阻塞轮询等待回复(30s 间隔 x 最长 30 分钟)
- 5. 解析 approve/reject -> 返回结果
- Args:
- validated_csv: 护栏验证后的 CSV 路径
- timeout_minutes: 审批超时时间(分钟)
- wait_for_reply: 是否阻塞等待回复
- poll_interval_seconds: 轮询间隔(秒)
- """
- try:
- if not IM_ENABLED:
- return ToolResult(
- title="IM 审批未启用",
- output="IM_ENABLED=False,跳过飞书审批。需审批的操作已记录到执行日志。",
- )
- if not FEISHU_OPERATOR_CHAT_ID:
- return ToolResult(
- title="飞书配置缺失",
- output="FEISHU_OPERATOR_CHAT_ID 未配置,无法发送审批请求。",
- )
- # 查找验证 CSV
- if not validated_csv:
- reports_dir = _MINI_DIR / "outputs" / "reports"
- candidates = sorted(reports_dir.glob("validated_decisions_*.csv"), reverse=True)
- if not candidates:
- return ToolResult(title="send_approval_request", output="未找到验证后的决策 CSV")
- validated_csv = str(candidates[0])
- df = pd.read_csv(validated_csv)
- if df.empty:
- return ToolResult(title="send_approval_request", output="决策数据为空")
- # ⚠️ 关键:补充 metrics 数据(通过 ad_id 关联)
- metrics_path = _MINI_DIR / "outputs" / "metrics_temp.csv"
- if not metrics_path.exists():
- # 尝试查找最新的 metrics 文件
- reports_dir = _MINI_DIR / "outputs"
- candidates = sorted(reports_dir.glob("metrics_*.csv"), reverse=True)
- if candidates:
- metrics_path = candidates[0]
- if metrics_path.exists():
- df_metrics = pd.read_csv(metrics_path)
- # 选择需要的列(避免重复列,使用实际列名)
- metrics_cols = [
- "ad_id", "account_id", "ad_name",
- "cost_7d_avg", "cost_7d_total", "revenue_7d_total",
- "动态ROI_7日均值", "bid_amount"
- ]
- # 只保留存在的列
- metrics_cols = [c for c in metrics_cols if c in df_metrics.columns]
- df_metrics_sub = df_metrics[metrics_cols].copy()
- # 从 ad_name 中提取 audience_tier(如 "R500_xxx" → "R500")
- if "ad_name" in df_metrics_sub.columns:
- df_metrics_sub["audience_tier"] = df_metrics_sub["ad_name"].str.extract(r"^(R\d+)")[0]
- # 左连接:保留 df 的所有行,补充 metrics 数据
- df = df.merge(df_metrics_sub, on="ad_id", how="left", suffixes=("", "_metrics"))
- logger.info(f"已从 metrics 补充 {len(metrics_cols)} 列数据")
- else:
- logger.warning("未找到 metrics 文件,审批表格将缺少关键字段")
- # 过滤已暂停的广告(不应出现在审批表中)
- if "configured_status" in df.columns:
- before_count = len(df)
- df = df[df["configured_status"] != "AD_STATUS_SUSPEND"].copy()
- filtered_count = before_count - len(df)
- if filtered_count > 0:
- logger.info(f"审批请求过滤掉 {filtered_count} 个已暂停广告")
- if df.empty:
- return ToolResult(title="send_approval_request", output="过滤后无数据")
- # 分级(包含hold记录,用于参考)
- from execution_engine import _classify_tier
- df["tier"] = df.apply(_classify_tier, axis=1)
- # 按tier分类
- df_tier0 = df[df["tier"] == 0].copy() # observe, hold, creative_adjust(无需操作)
- df_tier1 = df[df["tier"] == 1].copy() # 小幅调价(自动执行)
- df_tier2_3 = df[df["tier"] >= 2].copy() # 暂停、大幅调价(需审批)
- if df.empty:
- return ToolResult(title="send_approval_request", output="无决策数据")
- # 合并需审批的和无需操作的(供运营参考)
- df_for_review = pd.concat([df_tier2_3, df_tier0], ignore_index=True) if not df_tier0.empty else df_tier2_3
- if df_tier2_3.empty:
- total_no_op = len(df_tier0) + len(df_tier1)
- return ToolResult(
- title="无需审批",
- output=f"共 {total_no_op} 个决策:{len(df_tier0)}个无需操作(observe/hold)+ {len(df_tier1)}个自动执行(小幅调价),无需审批",
- )
- # 生成审批请求
- request_id = f"req_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{uuid.uuid4().hex[:6]}"
- message = _format_approval_message(df_tier2_3, df_tier1, df_tier0, request_id)
- # 保存请求状态
- tier2_ad_ids = df_tier2_3["ad_id"].astype(int).tolist()
- _approval_requests[request_id] = {
- "status": "pending",
- "created_at": datetime.now().isoformat(),
- "timeout_at": (datetime.now() + timedelta(minutes=timeout_minutes)).isoformat(),
- "ad_ids": tier2_ad_ids,
- "validated_csv": validated_csv,
- }
- # ─── 通过飞书 API 发送审批消息(文本 + Excel) ───
- feishu_sent = False
- feishu_sent_to_project_chat = False
- sent_time_sec = str(int(time.time())) # 飞书 API start_time 单位:秒
- try:
- # 消息 1a:发送到个人(FEISHU_OPERATOR_OPEN_ID)
- if FEISHU_OPERATOR_OPEN_ID:
- try:
- result_personal = _feishu.send_message(to=FEISHU_OPERATOR_OPEN_ID, text=message)
- logger.info("飞书审批消息发送成功(个人): message_id=%s", result_personal.message_id)
- except Exception as e:
- logger.warning("发送到个人失败: %s", e)
- # 消息 1b:发送到投放项目群聊(如果配置了)— 临时禁用
- # if FEISHU_AD_PROJECT_CHAT_ID:
- # try:
- # result_project = _feishu.send_message(to=FEISHU_AD_PROJECT_CHAT_ID, text=message)
- # feishu_sent_to_project_chat = True
- # feishu_sent = True
- # logger.info("飞书审批消息发送成功(项目群): message_id=%s", result_project.message_id)
- # except Exception as e:
- # logger.warning("发送到项目群聊失败: %s", e)
- # 消息 2:导入为飞书在线表格(决策详情,含hold参考)
- try:
- xlsx_path = _generate_approval_xlsx(df_for_review, request_id)
- # 导入飞书在线表格并发送链接(项目群)— 临时禁用
- from feishu_doc import import_to_feishu
- # 发送到项目群 — 临时禁用
- # if FEISHU_AD_PROJECT_CHAT_ID:
- # import_result = await import_to_feishu(
- # ctx=ctx,
- # xlsx_path=str(xlsx_path),
- # send_im=True,
- # chat_id=FEISHU_AD_PROJECT_CHAT_ID
- # )
- #
- # if import_result.metadata and import_result.metadata.get("url"):
- # sheet_url = import_result.metadata["url"]
- # logger.info("飞书审批表格导入成功(项目群): %s", sheet_url)
- # else:
- # logger.warning("飞书在线表格导入失败(项目群),回退到文件附件模式")
- # # 回退:发送文件附件(项目群)
- # file_result = _feishu.send_file(
- # to=FEISHU_AD_PROJECT_CHAT_ID,
- # file=str(xlsx_path),
- # file_name=f"审批决策表_{request_id}.xlsx",
- # )
- # logger.info("飞书审批 Excel(文件)发送成功(项目群): message_id=%s", file_result.message_id)
- # 发送到个人
- if FEISHU_OPERATOR_OPEN_ID:
- try:
- personal_import_result = await import_to_feishu(
- ctx=ctx,
- xlsx_path=str(xlsx_path),
- send_im=True,
- chat_id=FEISHU_OPERATOR_OPEN_ID
- )
- if personal_import_result.metadata and personal_import_result.metadata.get("url"):
- logger.info("飞书审批表格发送成功(个人): %s", personal_import_result.metadata["url"])
- else:
- # 回退:发送文件附件(个人)
- file_result_personal = _feishu.send_file(
- to=FEISHU_OPERATOR_OPEN_ID,
- file=str(xlsx_path),
- file_name=f"决策表_{request_id}.xlsx",
- )
- logger.info("飞书决策 Excel(文件)发送成功(个人): message_id=%s", file_result_personal.message_id)
- except Exception as e:
- logger.warning("发送表格到个人失败: %s", e)
- except Exception as e:
- logger.warning("飞书在线表格导入失败(不影响审批流程): %s", e)
- except Exception as e:
- logger.warning("飞书发消息失败: %s", e)
- # 保存审批消息到文件(备份)
- approval_dir = _MINI_DIR / "outputs" / "approvals"
- approval_dir.mkdir(parents=True, exist_ok=True)
- msg_path = approval_dir / f"{request_id}.txt"
- msg_path.write_text(message, encoding="utf-8")
- # ─── 非阻塞模式 ───
- if not wait_for_reply:
- return ToolResult(
- title=f"审批请求已发送({len(tier2_ad_ids)}个待审批)",
- output=(
- f"审批请求 {request_id} 已{'通过飞书发送' if feishu_sent else '保存到文件(飞书发送失败)'}\n"
- f" 待审批: {len(tier2_ad_ids)} 个广告\n"
- f" 无需操作: {len(df_tier0)} 个广告(observe/hold)\n"
- f" 自动执行: {len(df_tier1)} 个广告(小幅调价)\n"
- f" 超时时间: {timeout_minutes} 分钟\n"
- f" 消息备份: {msg_path}\n\n"
- f"使用 check_approval_status(request_id='{request_id}') 检查审批结果"
- ),
- metadata={
- "request_id": request_id,
- "pending_count": len(tier2_ad_ids),
- "tier0_count": len(df_tier0),
- "tier1_count": len(df_tier1),
- "feishu_sent": feishu_sent,
- "msg_path": str(msg_path),
- },
- )
- # ═══ 阻塞轮询等待飞书回复 ═══
- timeout_at = datetime.now() + timedelta(minutes=timeout_minutes)
- logger.info(
- "阻塞等待飞书审批回复(超时 %d 分钟,间隔 %d 秒)...",
- timeout_minutes,
- poll_interval_seconds,
- )
- poll_count = 0
- while datetime.now() < timeout_at:
- await asyncio.sleep(poll_interval_seconds)
- poll_count += 1
- remaining = (timeout_at - datetime.now()).total_seconds() / 60
- if poll_count % 10 == 0:
- logger.info("飞书审批轮询 #%d,剩余 %.1f 分钟", poll_count, remaining)
- # 读取个人和项目群的审批回复
- try:
- # ✅ 修改:监听个人私聊和项目群聊的消息 — 临时只监听个人
- chat_ids_to_check = []
- if FEISHU_OPERATOR_OPEN_ID:
- chat_ids_to_check.append(FEISHU_OPERATOR_OPEN_ID)
- # 临时禁用项目群聊监听
- # if FEISHU_AD_PROJECT_CHAT_ID:
- # chat_ids_to_check.append(FEISHU_AD_PROJECT_CHAT_ID)
- for chat_id in chat_ids_to_check:
- result = _feishu.get_message_list(
- chat_id=chat_id,
- start_time=sent_time_sec,
- page_size=10,
- )
- if result and result.get("items"):
- for msg in result["items"]:
- sender_id = msg.get("sender_id", "")
- sender_type = msg.get("sender_type", "")
- logger.debug(
- "飞书消息 [%s]: sender_type=%s, sender_id=%s, content=%s",
- chat_id, sender_type, sender_id, str(msg.get("content", ""))[:100],
- )
- # ✅ 修改:接受任何用户的回复(不限制特定个人)
- if sender_type != "user":
- continue
- # 框架已自动解析 text 消息的 JSON -> 纯文本
- text = msg.get("content", "")
- if not text.strip():
- continue
- # 检测到运营回复,返回原文给 Agent 理解
- parsed = _parse_approval_reply(text, tier2_ad_ids)
- if parsed["status"] != "unknown":
- _approval_requests[request_id].update({
- "status": "replied",
- "reply_content": text,
- "reply_at": datetime.now().isoformat(),
- "ad_ids": tier2_ad_ids,
- })
- logger.info("飞书审批收到运营回复: %s", text[:200])
- ad_ids_str = ", ".join(str(x) for x in tier2_ad_ids[:10])
- if len(tier2_ad_ids) > 10:
- ad_ids_str += f"...共{len(tier2_ad_ids)}个"
- return ToolResult(
- title="运营已回复",
- output=(
- f"运营飞书回复原文: {text}\n"
- f"等待审批的广告ID: {ad_ids_str}\n"
- f"等待时间: {poll_count * poll_interval_seconds} 秒\n\n"
- f"请根据运营的自然语言回复判断后续操作:\n"
- f"- 运营同意/批准/通过 → 调用 execute_decisions 执行\n"
- f"- 运营拒绝/驳回 → 停止执行,告知原因\n"
- f"- 运营要求修改(如\"广告X不要暂停\")→ 进入 Mode 3: modify_decisions → validate → 重新审批\n"
- f"- 运营部分批准(如\"只批准降价的\")→ 相应过滤后 execute_decisions"
- ),
- metadata={
- "request_id": request_id,
- "feishu_sent": feishu_sent,
- "msg_path": str(msg_path),
- "poll_count": poll_count,
- "raw_reply": text,
- "ad_ids": tier2_ad_ids,
- },
- )
- except Exception as e:
- logger.debug("飞书读消息失败(将重试): %s", e)
- # ─── 超时 ───
- _approval_requests[request_id]["status"] = "timeout"
- logger.warning("审批请求 %s 超时(%d 分钟)", request_id, timeout_minutes)
- return ToolResult(
- title="审批超时",
- output=(
- f"请求 {request_id} 超时({timeout_minutes}分钟),需审批的 {len(tier2_ad_ids)} 个操作未执行\n"
- f"轮询次数: {poll_count}\n"
- f"消息备份: {msg_path}"
- ),
- metadata={
- "request_id": request_id,
- "status": "timeout",
- "pending_ad_ids": tier2_ad_ids,
- "feishu_sent": feishu_sent,
- "msg_path": str(msg_path),
- "poll_count": poll_count,
- },
- )
- except Exception as e:
- logger.error("send_approval_request 失败: %s", e, exc_info=True)
- return ToolResult(title="send_approval_request 失败", output=str(e))
- # ═══════════════════════════════════════════
- # 工具:检查审批状态
- # ═══════════════════════════════════════════
- @tool(description="检查运营飞书审批结果")
- async def check_approval_status(
- ctx: ToolContext,
- request_id: str,
- ) -> ToolResult:
- """
- 检查审批请求的状态。通过飞书 API 读取最新消息。
- Args:
- request_id: 审批请求 ID(send_approval_request 返回)
- """
- try:
- request = _approval_requests.get(request_id)
- if not request:
- return ToolResult(
- title="审批请求不存在",
- output=f"未找到请求 {request_id}。可能已过期或重启后丢失。",
- )
- # 检查超时
- timeout_at = datetime.fromisoformat(request["timeout_at"])
- if datetime.now() > timeout_at:
- request["status"] = "timeout"
- return ToolResult(
- title="审批已超时",
- output=f"请求 {request_id} 已超时({IM_APPROVAL_TIMEOUT_MINUTES}分钟)",
- metadata={"status": "timeout", "request_id": request_id},
- )
- if request["status"] != "pending":
- return ToolResult(
- title=f"审批状态: {request['status']}",
- output=json.dumps(request, ensure_ascii=False, indent=2),
- metadata=request,
- )
- # 通过框架 FeishuClient 读取审批消息之后的回复
- try:
- created_at_sec = str(int(
- datetime.fromisoformat(request["created_at"]).timestamp()
- ))
- # ✅ 修改:监听个人私聊和项目群聊的消息 — 临时只监听个人
- chat_ids_to_check = []
- if FEISHU_OPERATOR_OPEN_ID:
- chat_ids_to_check.append(FEISHU_OPERATOR_OPEN_ID)
- # 临时禁用项目群聊监听
- # if FEISHU_AD_PROJECT_CHAT_ID:
- # chat_ids_to_check.append(FEISHU_AD_PROJECT_CHAT_ID)
- for chat_id in chat_ids_to_check:
- result = _feishu.get_message_list(
- chat_id=chat_id,
- start_time=created_at_sec,
- page_size=10,
- )
- if result and result.get("items"):
- for msg in result["items"]:
- sender_id = msg.get("sender_id", "")
- sender_type = msg.get("sender_type", "")
- logger.debug(
- "飞书消息(check) [%s]: sender_type=%s, sender_id=%s, content=%s",
- chat_id, sender_type, sender_id, str(msg.get("content", ""))[:100],
- )
- # ✅ 修改:接受任何用户的回复(不限制特定个人)
- if sender_type != "user":
- continue
- text = msg.get("content", "")
- if not text.strip():
- continue
- # 检测到运营回复,返回原文给 Agent 理解
- parsed = _parse_approval_reply(text, request["ad_ids"])
- if parsed["status"] != "unknown":
- request.update({
- "status": "replied",
- "reply_content": text,
- "reply_at": datetime.now().isoformat(),
- })
- ad_ids = request["ad_ids"]
- ad_ids_str = ", ".join(str(x) for x in ad_ids[:10])
- if len(ad_ids) > 10:
- ad_ids_str += f"...共{len(ad_ids)}个"
- return ToolResult(
- title="运营已回复",
- output=(
- f"运营飞书回复原文: {text}\n"
- f"等待审批的广告ID: {ad_ids_str}\n\n"
- f"请根据运营的自然语言回复判断后续操作:\n"
- f"- 运营同意/批准/通过 → 调用 execute_decisions 执行\n"
- f"- 运营拒绝/驳回 → 停止执行,告知原因\n"
- f"- 运营要求修改 → 进入 Mode 3: modify_decisions → validate → 重新审批\n"
- f"- 运营部分批准 → 相应过滤后 execute_decisions"
- ),
- metadata={"request_id": request_id, "raw_reply": text, "ad_ids": ad_ids},
- )
- except Exception as e:
- logger.debug("飞书读消息失败: %s", e)
- remaining = (timeout_at - datetime.now()).total_seconds() / 60
- return ToolResult(
- title="等待审批中",
- output=f"请求 {request_id} 等待飞书审批中(剩余 {remaining:.0f} 分钟)",
- metadata={"status": "pending", "request_id": request_id, "remaining_minutes": round(remaining, 1)},
- )
- except Exception as e:
- logger.error("check_approval_status 失败: %s", e, exc_info=True)
- return ToolResult(title="check_approval_status 失败", output=str(e))
|