""" IM 审批流 — auto_put_ad_mini(飞书直连版) 职责: - 通过飞书 API 发送审批消息给运营(文本摘要 + Excel 附件) - 轮询飞书聊天获取审批回复 - 支持阻塞式等待(工具内部 sleep+poll,不消耗 Agent iteration) 飞书能力(通过框架 FeishuClient): - 发文本消息:send_message(to, text) - 发文件附件:send_file(to, file, file_name) - 读消息列表:get_message_list(chat_id, start_time, page_size) """ import asyncio import json import logging import sys import time import uuid from datetime import datetime, timedelta from pathlib import Path from typing import Dict, List, Optional import pandas as pd from agent.tools import tool from agent.tools.models import ToolContext, ToolResult from agent.tools.builtin.feishu.feishu_client import FeishuClient _MINI_DIR = Path(__file__).resolve().parent.parent _TOOLS_DIR = Path(__file__).resolve().parent if str(_MINI_DIR) not in sys.path: sys.path.insert(0, str(_MINI_DIR)) if str(_TOOLS_DIR) not in sys.path: sys.path.insert(0, str(_TOOLS_DIR)) from config import ( IM_ENABLED, IM_APPROVAL_TIMEOUT_MINUTES, IM_APPROVAL_POLL_INTERVAL_SECONDS, FEISHU_APP_ID, FEISHU_APP_SECRET, FEISHU_OPERATOR_OPEN_ID, FEISHU_OPERATOR_CHAT_ID, FEISHU_AD_PROJECT_CHAT_ID, # 新增:投放项目群聊 ) logger = logging.getLogger(__name__) # chat_history 路径(websocket_event.py 写入的位置) _CHAT_HISTORY_DIR = ( Path(__file__).resolve().parent.parent.parent.parent / "agent" / "tools" / "builtin" / "feishu" / "chat_history" ) def _get_contact_name(open_id: str) -> Optional[str]: """根据 open_id 从 feishu_contacts.json 查联系人名""" try: from agent.tools.builtin.feishu.chat import get_contact_by_id contact = get_contact_by_id(open_id) return contact.get("name") if contact else None except Exception: return None def _snapshot_message_ids(contact_name: str) -> set: """快照当前 chat_history 里已有的 message_id""" chat_file = _CHAT_HISTORY_DIR / f"chat_{contact_name}.json" if not chat_file.exists(): return set() try: msgs = json.loads(chat_file.read_text(encoding="utf-8")) return {m["message_id"] for m in msgs if "message_id" in m} except Exception: return set() def _poll_new_messages(contact_name: str, known_ids: set) -> list: """读取新消息文本(不在 known_ids 里的 message_id)""" chat_file = _CHAT_HISTORY_DIR / f"chat_{contact_name}.json" if not chat_file.exists(): return [] try: msgs = json.loads(chat_file.read_text(encoding="utf-8")) new_texts = [] for m in msgs: if m.get("message_id") in known_ids: continue for block in m.get("content", []): if block.get("type") == "text" and block.get("text", "").strip(): new_texts.append(block["text"]) return new_texts except Exception: return [] # 审批请求状态 _approval_requests: Dict[str, Dict] = {} # 全局客户端实例(框架 FeishuClient,自动管理 token) _feishu = FeishuClient(app_id=FEISHU_APP_ID, app_secret=FEISHU_APP_SECRET) # ═══════════════════════════════════════════ # 审批 Excel 生成 # ═══════════════════════════════════════════ # 审批表精选列(运营审阅所需的关键指标) # 列顺序:日期 → 账户ID → 广告ID → 广告消耗 → 决策动作 → 其他关键信息(简洁版) APPROVAL_COLUMNS = [ # 核心标识(前5列,含决策动作) "approval_date", "account_id", "ad_id", "cost_7d_avg", "action", # 基础信息 "ad_name", "audience_tier", "ad_age_days", "bid_amount", # 关键指标(使用实际列名) "动态ROI_7日均值", "cost_7d_total", "revenue_7d_total", # 决策详情 "dimension", "reason", "recommended_change_pct", ] def _generate_approval_xlsx(df_tier2_3: pd.DataFrame, request_id: str) -> Path: """生成审批专用 Excel(仅需审批的 Tier 2/3 广告) 复用 report_generator._write_xlsx_with_format 生成带颜色标记的 Excel。 文件保存到 outputs/approvals/{request_id}.xlsx。 """ from report_generator import _write_xlsx_with_format approval_dir = _MINI_DIR / "outputs" / "approvals" approval_dir.mkdir(parents=True, exist_ok=True) xlsx_path = approval_dir / f"{request_id}.xlsx" # 添加审批日期列(当前日期) df_tier2_3 = df_tier2_3.copy() df_tier2_3["approval_date"] = datetime.now().strftime("%Y-%m-%d") # 精选列(仅保留 df 中存在的列) cols = [c for c in APPROVAL_COLUMNS if c in df_tier2_3.columns] df_out = df_tier2_3[cols].copy() # 排序:7日消耗<10元的放最后,有消耗的在前,同组内按消耗降序 if "cost_7d_avg" in df_out.columns: df_out["_has_spend"] = (df_out["cost_7d_avg"] >= 10.0).astype(int) # >=10元算有消耗 df_out = df_out.sort_values( ["_has_spend", "cost_7d_avg"], ascending=[False, False] # 有消耗在前(1在前),消耗高的在前 ) df_out.drop(columns=["_has_spend"], inplace=True) _write_xlsx_with_format(df_out, xlsx_path) return xlsx_path # ═══════════════════════════════════════════ # 格式化审批消息 # ═══════════════════════════════════════════ def _format_project_notification_message(df_tier2: pd.DataFrame, df_tier1: pd.DataFrame, request_id: str) -> str: """格式化投放项目群聊通知消息(仅通知,不需要审批)""" lines = [ "📊 广告调控决策通知", f"请求ID: {request_id}", f"时间: {datetime.now().strftime('%Y-%m-%d %H:%M')}", "", ] # 需审批操作统计 if not df_tier2.empty: total_count = len(df_tier2) lines.append(f"🔶 待审批操作({total_count} 个):") lines.append("-" * 40) # 统计各操作类型 action_counts = df_tier2.get("final_action", df_tier2.get("action", "")).value_counts().to_dict() for action, count in action_counts.items(): if action == "pause": lines.append(f" ⏸️ 暂停: {count} 个") elif action == "bid_down": lines.append(f" ⬇️ 降价: {count} 个") elif action == "bid_up": lines.append(f" ⬆️ 提价: {count} 个") else: lines.append(f" {action}: {count} 个") lines.append("") # 简化展示(只显示前3个) lines.append("前 3 个示例:") for i, (_, row) in enumerate(df_tier2.head(3).iterrows()): ad_id = row.get("ad_id", "") action = row.get("final_action", row.get("action", "")) ad_name = str(row.get("ad_name", ""))[:20] cost_avg = row.get("cost_7d_avg", 0) roi = row.get("动态ROI_7日均值", 0) if action == "pause": action_label = "⏸️ 暂停" elif action == "bid_down": pct = row.get("recommended_change_pct", 0) if isinstance(pct, str): try: pct = float(pct) except ValueError: pct = 0 action_label = f"⬇️ 降价{abs(pct)*100:.0f}%" elif action == "bid_up": pct = row.get("recommended_change_pct", 0) if isinstance(pct, str): try: pct = float(pct) except ValueError: pct = 0 action_label = f"⬆️ 提价{pct*100:.0f}%" else: action_label = action lines.append(f" [{ad_id}] {ad_name}") lines.append(f" 操作: {action_label} | 日均消耗: {cost_avg:.0f}元 | ROI: {roi:.2f}") lines.append("") if total_count > 3: lines.append(f" ...还有 {total_count - 3} 个(查看在线表格)") lines.append("") # 无需操作 + 自动执行 tier0_count = len(df_tier0) if not df_tier0.empty else 0 tier1_count = len(df_tier1) if not df_tier1.empty else 0 if tier0_count > 0 or tier1_count > 0: status_parts = [] if tier0_count > 0: status_parts.append(f"{tier0_count}个无需操作(observe/hold)") if tier1_count > 0: status_parts.append(f"{tier1_count}个自动执行(小幅调价)") lines.append(f"ℹ️ {' + '.join(status_parts)}") lines.append("") lines.extend([ "-" * 40, "ℹ️ 说明:", " • 此消息为智能决策结果通知", " • 运营审批通过后才会实际执行", " • 详情请查看在线表格(自动发送)", ]) return "\n".join(lines) def _score_top_decisions(df_tier2: pd.DataFrame, top_n: int = 5) -> pd.DataFrame: """对 Tier 2/3 决策按"影响力"评分,选出 top N 展示给运营。 评分公式: score = normalize(cost_7d_avg) * 0.6 + action_weight * 0.3 + confidence_weight * 0.1 权重理由: - 消耗权重最高(0.6):决策影响的金额越大越该优先看 - 动作权重其次(0.3):pause/bid_down 是风险型操作,优先展示 - 置信度权重最低(0.1):仅作为 tiebreaker """ if df_tier2.empty: return df_tier2 df = df_tier2.copy() # 消耗归一化(0-1) cost = pd.to_numeric(df.get("cost_7d_avg", 0), errors="coerce").fillna(0) cost_max = cost.max() cost_norm = cost / cost_max if cost_max > 0 else cost * 0 # 动作权重:pause/bid_down = 1.0(风险型),bid_up/scale_up = 0.7(机会型),其他 = 0.5 def _action_weight(a: str) -> float: a = str(a).strip() if a in ("pause", "bid_down"): return 1.0 if a in ("bid_up", "scale_up"): return 0.7 return 0.5 action_col = df.get("final_action", df.get("action", "")) action_weight = action_col.apply(_action_weight) # 置信度权重 def _conf_weight(c: str) -> float: c = str(c).strip().lower() if c == "high": return 1.0 if c == "medium": return 0.6 return 0.3 conf_weight = df.get("confidence", "medium").apply(_conf_weight) if "confidence" in df.columns else 0.6 df["_top_score"] = cost_norm * 0.6 + action_weight * 0.3 + conf_weight * 0.1 df = df.sort_values("_top_score", ascending=False) # 如果总量 <= top_n,全部返回 if len(df) <= top_n: return df.drop(columns=["_top_score"]) return df.head(top_n).drop(columns=["_top_score"]) def _format_approval_message(df_tier2: pd.DataFrame, df_tier1: pd.DataFrame, df_tier0: pd.DataFrame, request_id: str) -> str: """格式化审批消息(极简版,目标:手机单屏可读)。 设计原则: - 只保留三块信息:总量 + 各动作数量及昨日消耗 + 追溯码 - 决策明细、审批指引由调用方拼接的表格链接消息承载 - hold / observe / creative_adjust 合并为"观察"一行 - 只打印非零桶,固定顺序避免抖动 """ total = len(df_tier2) now_label = datetime.now().strftime("%m-%d %H:%M") lines = [ f"📊 广告调控 · {now_label} · 请您复核", f"📌 共 {total} 条决策需您审批", ] if total > 0: # 取 action 列(双 fallback,保持和现状一致) action_col = df_tier2.get("final_action", df_tier2.get("action", pd.Series(dtype=str))) df = df_tier2.assign(_action=action_col.fillna("")) # hold / observe / creative_adjust 合并为 observe 桶 def _bucket(a: str) -> str: if a in ("hold", "observe", "creative_adjust"): return "observe" return a df["_bucket"] = df["_action"].map(_bucket) # 昨日消耗(列缺失或非数值自动兜底为 0) cost_col = pd.to_numeric(df.get("yesterday_cost", 0), errors="coerce").fillna(0) df["_ycost"] = cost_col agg = df.groupby("_bucket").agg( n=("ad_id", "count"), cost=("_ycost", "sum"), ).to_dict("index") # 显式顺序;无数据的桶不打印 display_order = [ ("pause", "⏸ 暂停"), ("bid_down", "⬇ 降价"), ("bid_up", "⬆ 提价"), ("scale_up", "🚀 扩量"), ("observe", "👀 观察"), ] lines.extend(["", "各动作明细:"]) for bucket, label in display_order: info = agg.get(bucket) if not info or info["n"] == 0: continue lines.append(f" {label} {info['n']} 条|昨日消耗 {info['cost']:,.0f} 元") lines.extend(["", f"(追溯码: {request_id})"]) return "\n".join(lines) # ═══════════════════════════════════════════ # 解析审批回复 # ═══════════════════════════════════════════ def _parse_approval_reply(content: str, all_ad_ids: List[int]) -> Dict: """将运营的自然语言回复原样返回给 Agent 解读。 不做硬解析 — 运营可能用任何自然语言表达审批意见(中文、英文、混合), 由 Agent(LLM)负责理解语义并决定后续动作。 """ content = content.strip() if not content: return {"status": "unknown", "raw_reply": ""} return { "status": "replied", "raw_reply": content, "ad_ids": all_ad_ids, } # ═══════════════════════════════════════════ # 工具:发送审批请求(飞书版) # ═══════════════════════════════════════════ @tool(description="通过飞书发送决策摘要给运营,收集审批结果(支持阻塞等待)") async def send_approval_request( ctx: ToolContext = None, validated_csv: str = "", timeout_minutes: int = IM_APPROVAL_TIMEOUT_MINUTES, wait_for_reply: bool = True, poll_interval_seconds: int = IM_APPROVAL_POLL_INTERVAL_SECONDS, ) -> ToolResult: """ 发送结构化审批请求到飞书。 流程: 1. 读取验证后的决策 CSV,分级(Tier 1/2/3) 2. 生成审批 Excel(仅 Tier 2/3 广告 + 关键指标列) 3. 发送飞书消息 x2:文本摘要 + Excel 文件附件 4. 阻塞轮询等待回复(30s 间隔 x 最长 30 分钟) 5. 解析 approve/reject -> 返回结果 Args: validated_csv: 护栏验证后的 CSV 路径 timeout_minutes: 审批超时时间(分钟) wait_for_reply: 是否阻塞等待回复 poll_interval_seconds: 轮询间隔(秒) """ try: if not IM_ENABLED: return ToolResult( title="IM 审批未启用", output="IM_ENABLED=False,跳过飞书审批。需审批的操作已记录到执行日志。", ) if not FEISHU_OPERATOR_CHAT_ID: return ToolResult( title="飞书配置缺失", output="FEISHU_OPERATOR_CHAT_ID 未配置,无法发送审批请求。", ) # 查找验证 CSV if not validated_csv: reports_dir = _MINI_DIR / "outputs" / "reports" candidates = sorted(reports_dir.glob("validated_decisions_*.csv"), reverse=True) if not candidates: return ToolResult(title="send_approval_request", output="未找到验证后的决策 CSV") validated_csv = str(candidates[0]) df = pd.read_csv(validated_csv) if df.empty: return ToolResult(title="send_approval_request", output="决策数据为空") # ⚠️ 关键:补充 metrics 数据(通过 ad_id 关联) metrics_path = _MINI_DIR / "outputs" / "metrics_temp.csv" if not metrics_path.exists(): # 尝试查找最新的 metrics 文件 reports_dir = _MINI_DIR / "outputs" candidates = sorted(reports_dir.glob("metrics_*.csv"), reverse=True) if candidates: metrics_path = candidates[0] if metrics_path.exists(): df_metrics = pd.read_csv(metrics_path) # 选择需要的列(避免重复列,使用实际列名) metrics_cols_needed = [ "ad_id", "account_id", "ad_name", "cost_7d_avg", "cost_7d_total", "revenue_7d_total", "动态ROI_7日均值", "bid_amount" ] # ===== 关键修复:只合并不存在的列,避免重复 ===== # 检查哪些列已经存在于 df 中(从 ad_decision.py 已合并) existing_cols = set(df.columns) cols_to_merge = ["ad_id"] # ad_id 必须保留用于连接 for col in metrics_cols_needed: if col not in existing_cols and col in df_metrics.columns: cols_to_merge.append(col) if len(cols_to_merge) > 1: # 除了ad_id还有其他列需要合并 df_metrics_sub = df_metrics[cols_to_merge].copy() # 从 ad_name 中提取 audience_tier(如 "R500_xxx" → "R500") if "ad_name" in cols_to_merge and "audience_tier" not in existing_cols: df_metrics_sub["audience_tier"] = df_metrics_sub["ad_name"].str.extract(r"^(R\d+)")[0] cols_to_merge.append("audience_tier") # 左连接:只合并缺失的列 df = df.merge(df_metrics_sub, on="ad_id", how="left") logger.info(f"已从 metrics 补充 {len(cols_to_merge)-1} 列数据(跳过已存在列)") else: logger.info("metrics 字段已在 validated_decisions 中存在,无需重复合并") else: logger.warning("未找到 metrics 文件,审批表格将缺少关键字段") # 过滤已暂停/已删除的广告(不应出现在审批表中) if "configured_status" in df.columns: before_count = len(df) excluded_status = {"AD_STATUS_SUSPEND", "AD_STATUS_DELETED"} df = df[~df["configured_status"].isin(excluded_status)].copy() filtered_count = before_count - len(df) if filtered_count > 0: logger.info(f"审批请求过滤掉 {filtered_count} 个已暂停/已删除广告(configured_status)") # 第二道防线:腾讯侧已删除(由 sync_ad_status.py 每日同步回写 is_deleted 列) # 即使上游 apply_decisions 的过滤改坏,也不让僵尸广告进入飞书审批表 try: from config import AD_STATUS_DIR bizdate = (datetime.now() - timedelta(days=1)).strftime("%Y%m%d") ad_status_path = AD_STATUS_DIR / f"ad_status_{bizdate}.csv" if ad_status_path.exists(): df_status = pd.read_csv(ad_status_path) if "is_deleted" in df_status.columns: deleted_ids = set( df_status[df_status["is_deleted"].fillna(False).astype(bool)]["ad_id"].tolist() ) if deleted_ids: before_count = len(df) df = df[~df["ad_id"].isin(deleted_ids)].copy() dropped = before_count - len(df) if dropped > 0: logger.info(f"审批请求过滤掉 {dropped} 个腾讯侧已删除广告") except Exception as e: logger.warning(f"读取 ad_status CSV 做 is_deleted 过滤失败(不影响主流程): {e}") if df.empty: return ToolResult(title="send_approval_request", output="过滤后无数据") # 分级(包含hold记录,用于参考) from execution_engine import _classify_tier df["tier"] = df.apply(_classify_tier, axis=1) # 按tier分类 df_tier0 = df[df["tier"] == 0].copy() # observe, hold, creative_adjust(无需操作) df_tier1 = df[df["tier"] == 1].copy() # 小幅调价(自动执行) df_tier2_3 = df[df["tier"] >= 2].copy() # 暂停、大幅调价(需审批) if df.empty: return ToolResult(title="send_approval_request", output="无决策数据") # 合并需审批的和无需操作的(供运营参考) # ⚠️ 排除不需运营立刻干预的 action,不写飞书表降低噪声 # - hold/observe:无需操作 # - scale_up:扩量逻辑尚在调试,暂不进审批表 # - bid_up:提价决策由系统自动执行,不进审批表(仅保留 pause 和 bid_down) FEISHU_EXCLUDE_ACTIONS = {"hold", "observe", "scale_up", "bid_up"} # 对所有 tier 应用过滤(不仅仅是 tier0) action_col = "final_action" if "final_action" in df.columns else "action" before_filter = len(df) df_filtered = df[~df[action_col].isin(FEISHU_EXCLUDE_ACTIONS)].copy() dropped_count = before_filter - len(df_filtered) if dropped_count > 0: dropped_breakdown = ( df[df[action_col].isin(FEISHU_EXCLUDE_ACTIONS)][action_col] .value_counts().to_dict() ) logger.info( f"飞书表过滤掉 {dropped_count} 个 {sorted(FEISHU_EXCLUDE_ACTIONS)} 决策" f"(明细: {dropped_breakdown},减少表格噪声)" ) # 使用过滤后的数据作为审批表 df_for_review = df_filtered if df_tier2_3.empty: total_no_op = len(df_tier0) + len(df_tier1) return ToolResult( title="无需审批", output=f"共 {total_no_op} 个决策:{len(df_tier0)}个无需操作(observe/hold)+ {len(df_tier1)}个自动执行(小幅调价),无需审批", ) # 生成审批请求 request_id = f"req_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{uuid.uuid4().hex[:6]}" message = _format_approval_message(df_tier2_3, df_tier1, df_tier0, request_id) # 保存请求状态 tier2_ad_ids = df_tier2_3["ad_id"].astype(int).tolist() _approval_requests[request_id] = { "status": "pending", "created_at": datetime.now().isoformat(), "timeout_at": (datetime.now() + timedelta(minutes=timeout_minutes)).isoformat(), "ad_ids": tier2_ad_ids, "validated_csv": validated_csv, } # ─── 通过飞书 API 发送审批消息(单条合并:概述 + 表格链接 + 审批指引) ─── # 设计:不再分两次发(纯文本 + 表格链接),改为一次发合并消息。 # message 作为 preamble 传给 import_to_feishu,内部 _send_link_message 负责合并。 feishu_sent = False feishu_sent_to_project_chat = False sent_time_sec = str(int(time.time())) # 飞书 API start_time 单位:秒 poll_chat_ids = [] # 用于轮询的真正 chat_id(从 send_message 返回值中提取) try: xlsx_path = _generate_approval_xlsx(df_for_review, request_id) from feishu_doc import import_to_feishu # 发送到项目群 if FEISHU_AD_PROJECT_CHAT_ID: try: import_result = await import_to_feishu( ctx=ctx, xlsx_path=str(xlsx_path), send_im=True, chat_id=FEISHU_AD_PROJECT_CHAT_ID, preamble=message, ) meta = import_result.metadata or {} if meta.get("url"): logger.info("飞书审批合并消息发送成功(项目群): %s", meta["url"]) if meta.get("im_sent"): feishu_sent = True feishu_sent_to_project_chat = True # 群聊 chat_id 本身就是 oc_xxx,可直接轮询 if FEISHU_AD_PROJECT_CHAT_ID not in poll_chat_ids: poll_chat_ids.append(FEISHU_AD_PROJECT_CHAT_ID) else: logger.warning("飞书在线表格导入失败(项目群),回退到文件附件 + 文本") # 回退:发 preamble + 文件附件 try: _feishu.send_message(to=FEISHU_AD_PROJECT_CHAT_ID, text=message) except Exception: pass file_result = _feishu.send_file( to=FEISHU_AD_PROJECT_CHAT_ID, file=str(xlsx_path), file_name=f"广告审批_{datetime.now().strftime('%Y%m%d_%H%M')}.xlsx", ) logger.info("飞书审批 Excel(文件)发送成功(项目群): message_id=%s", file_result.message_id) feishu_sent = True feishu_sent_to_project_chat = True if FEISHU_AD_PROJECT_CHAT_ID not in poll_chat_ids: poll_chat_ids.append(FEISHU_AD_PROJECT_CHAT_ID) except Exception as e: logger.warning("发送合并消息到项目群失败: %s", e) # 发送到个人 if FEISHU_OPERATOR_OPEN_ID: try: personal_import_result = await import_to_feishu( ctx=ctx, xlsx_path=str(xlsx_path), send_im=True, chat_id=FEISHU_OPERATOR_OPEN_ID, preamble=message, ) meta_p = personal_import_result.metadata or {} if meta_p.get("url"): logger.info("飞书审批合并消息发送成功(个人): %s", meta_p["url"]) if meta_p.get("im_sent"): feishu_sent = True # 从 send_message 返回值提取 P2P chat_id(用于后续轮询) p2p_chat = meta_p.get("im_chat_id") if p2p_chat and p2p_chat not in poll_chat_ids: poll_chat_ids.append(p2p_chat) logger.info("提取到 P2P chat_id: %s(用于轮询回复)", p2p_chat) else: # 回退:发 preamble + 文件附件 logger.warning("飞书在线表格导入失败(个人),回退到文件附件 + 文本") try: res_txt = _feishu.send_message(to=FEISHU_OPERATOR_OPEN_ID, text=message) if hasattr(res_txt, 'chat_id') and res_txt.chat_id and res_txt.chat_id not in poll_chat_ids: poll_chat_ids.append(res_txt.chat_id) except Exception: pass file_result_personal = _feishu.send_file( to=FEISHU_OPERATOR_OPEN_ID, file=str(xlsx_path), file_name=f"广告审批_{datetime.now().strftime('%Y%m%d_%H%M')}.xlsx", ) logger.info("飞书决策 Excel(文件)发送成功(个人): message_id=%s", file_result_personal.message_id) feishu_sent = True except Exception as e: logger.warning("发送合并消息到个人失败: %s", e) except Exception as e: logger.warning("飞书发消息失败: %s", e) # ✅ 兜底:如果 send_message 未返回 P2P chat_id,用 config 中的 FEISHU_OPERATOR_CHAT_ID if not poll_chat_ids and FEISHU_OPERATOR_CHAT_ID: poll_chat_ids.append(FEISHU_OPERATOR_CHAT_ID) logger.info("使用配置中的 FEISHU_OPERATOR_CHAT_ID 兜底: %s", FEISHU_OPERATOR_CHAT_ID) # 将 poll_chat_ids 存入请求状态(供 check_approval_status 使用) _approval_requests[request_id]["poll_chat_ids"] = poll_chat_ids logger.info("轮询目标 chat_ids: %s", poll_chat_ids) # 保存审批消息到文件(备份) approval_dir = _MINI_DIR / "outputs" / "approvals" approval_dir.mkdir(parents=True, exist_ok=True) msg_path = approval_dir / f"{request_id}.txt" msg_path.write_text(message, encoding="utf-8") # 快照当前 chat_history 里已有的 message_id(用于识别新回复) _operator_contact_name = _get_contact_name(FEISHU_OPERATOR_OPEN_ID) _known_message_ids: set = set() if _operator_contact_name: _known_message_ids = _snapshot_message_ids(_operator_contact_name) logger.info( "chat_history 快照:联系人=%s,已有消息=%d条", _operator_contact_name, len(_known_message_ids) ) else: logger.warning( "无法识别运营联系人(open_id=%s),将回退到 HTTP 轮询", FEISHU_OPERATOR_OPEN_ID ) # ─── 非阻塞模式 ─── if not wait_for_reply: return ToolResult( title=f"审批请求已发送({len(tier2_ad_ids)}个待审批)", output=( f"审批请求 {request_id} 已{'通过飞书发送' if feishu_sent else '保存到文件(飞书发送失败)'}\n" f" 待审批: {len(tier2_ad_ids)} 个广告\n" f" 无需操作: {len(df_tier0)} 个广告(observe/hold)\n" f" 自动执行: {len(df_tier1)} 个广告(小幅调价)\n" f" 超时时间: {timeout_minutes} 分钟\n" f" 消息备份: {msg_path}\n\n" f"使用 check_approval_status(request_id='{request_id}') 检查审批结果" ), metadata={ "request_id": request_id, "pending_count": len(tier2_ad_ids), "tier0_count": len(df_tier0), "tier1_count": len(df_tier1), "feishu_sent": feishu_sent, "msg_path": str(msg_path), }, ) # ═══ 等待飞书回复(本地 chat_history + HTTP 轮询双通道)═══ timeout_at = datetime.now() + timedelta(minutes=timeout_minutes) use_local_poll = bool(_operator_contact_name) logger.info( "等待审批回复:mode=%s,超时 %d 分钟", "local+http" if use_local_poll else "http_poll", timeout_minutes, ) poll_count = 0 while datetime.now() < timeout_at: await asyncio.sleep(1 if use_local_poll else poll_interval_seconds) poll_count += 1 remaining = (timeout_at - datetime.now()).total_seconds() / 60 if poll_count % 30 == 0: logger.info("等待审批回复 #%d,剩余 %.1f 分钟", poll_count, remaining) # ─── 优先:读本地 chat_history(WebSocket 推送写入,1秒延迟)─── if use_local_poll: try: new_texts = _poll_new_messages(_operator_contact_name, _known_message_ids) if new_texts: text = new_texts[0] _known_message_ids = _snapshot_message_ids(_operator_contact_name) _approval_requests[request_id].update({ "status": "replied", "reply_content": text, "reply_at": datetime.now().isoformat(), "ad_ids": tier2_ad_ids, }) logger.info("chat_history 收到运营回复(%d 秒): %s", poll_count, text[:200]) ad_ids_str = ", ".join(str(x) for x in tier2_ad_ids[:10]) if len(tier2_ad_ids) > 10: ad_ids_str += f"...共{len(tier2_ad_ids)}个" return ToolResult( title="运营已回复", output=( f"运营飞书回复原文: {text}\n" f"等待审批的广告ID: {ad_ids_str}\n" f"等待时间: {poll_count} 秒\n\n" f"请根据运营的自然语言回复判断后续操作:\n" f"- 运营同意/批准/通过 → 调用 execute_decisions 执行\n" f"- 运营拒绝/驳回 → 停止执行,告知原因\n" f"- 运营要求修改(如\"广告X不要暂停\"/\"降价的去掉\")→ modify_decisions → validate → 重新审批(等待再次明确'同意')" ), metadata={ "request_id": request_id, "feishu_sent": feishu_sent, "msg_path": str(msg_path), "poll_count": poll_count, "raw_reply": text, "ad_ids": tier2_ad_ids, "poll_mode": "local_chat_history", }, ) except Exception as e: logger.debug("读取 chat_history 失败(将重试): %s", e) # 每 30 秒同时尝试 HTTP 轮询(兜底:WebSocket 断连时仍可检测) if poll_count % 30 != 0: continue # ─── HTTP 轮询(主通道 / 每 30 秒兜底)─── try: for chat_id in poll_chat_ids: result = _feishu.get_message_list( chat_id=chat_id, start_time=sent_time_sec, page_size=10, ) if result and result.get("items"): for msg in result["items"]: sender_type = msg.get("sender_type", "") if sender_type != "user": continue text = msg.get("content", "") if not text.strip(): continue parsed = _parse_approval_reply(text, tier2_ad_ids) if parsed["status"] != "unknown": _approval_requests[request_id].update({ "status": "replied", "reply_content": text, "reply_at": datetime.now().isoformat(), "ad_ids": tier2_ad_ids, }) logger.info("HTTP 轮询收到运营回复: %s", text[:200]) ad_ids_str = ", ".join(str(x) for x in tier2_ad_ids[:10]) if len(tier2_ad_ids) > 10: ad_ids_str += f"...共{len(tier2_ad_ids)}个" return ToolResult( title="运营已回复", output=( f"运营飞书回复原文: {text}\n" f"等待审批的广告ID: {ad_ids_str}\n" f"等待时间: {poll_count * poll_interval_seconds} 秒\n\n" f"请根据运营的自然语言回复判断后续操作:\n" f"- 运营同意/批准/通过 → 调用 execute_decisions 执行\n" f"- 运营拒绝/驳回 → 停止执行,告知原因\n" f"- 运营要求修改(如\"广告X不要暂停\"/\"降价的去掉\")→ modify_decisions → validate → 重新审批(等待再次明确'同意')" ), metadata={ "request_id": request_id, "feishu_sent": feishu_sent, "msg_path": str(msg_path), "poll_count": poll_count, "raw_reply": text, "ad_ids": tier2_ad_ids, "poll_mode": "http", }, ) except Exception as e: logger.debug("飞书读消息失败(将重试): %s", e) # ─── 超时 ─── _approval_requests[request_id]["status"] = "timeout" logger.warning("审批请求 %s 超时(%d 分钟)", request_id, timeout_minutes) return ToolResult( title="审批超时", output=( f"请求 {request_id} 超时({timeout_minutes}分钟),需审批的 {len(tier2_ad_ids)} 个操作未执行\n" f"轮询次数: {poll_count}\n" f"消息备份: {msg_path}" ), metadata={ "request_id": request_id, "status": "timeout", "pending_ad_ids": tier2_ad_ids, "feishu_sent": feishu_sent, "msg_path": str(msg_path), "poll_count": poll_count, }, ) except Exception as e: logger.error("send_approval_request 失败: %s", e, exc_info=True) return ToolResult(title="send_approval_request 失败", output=str(e)) # ═══════════════════════════════════════════ # 工具:检查审批状态 # ═══════════════════════════════════════════ @tool(description="检查运营飞书审批结果") async def check_approval_status( ctx: ToolContext = None, request_id: str = "", ) -> ToolResult: """ 检查审批请求的状态。通过飞书 API 读取最新消息。 Args: request_id: 审批请求 ID(send_approval_request 返回) """ try: request = _approval_requests.get(request_id) if not request: return ToolResult( title="审批请求不存在", output=f"未找到请求 {request_id}。可能已过期或重启后丢失。", ) # 检查超时 timeout_at = datetime.fromisoformat(request["timeout_at"]) if datetime.now() > timeout_at: request["status"] = "timeout" return ToolResult( title="审批已超时", output=f"请求 {request_id} 已超时({IM_APPROVAL_TIMEOUT_MINUTES}分钟)", metadata={"status": "timeout", "request_id": request_id}, ) if request["status"] != "pending": return ToolResult( title=f"审批状态: {request['status']}", output=json.dumps(request, ensure_ascii=False, indent=2), metadata=request, ) # 通过框架 FeishuClient 读取审批消息之后的回复 try: created_at_sec = str(int( datetime.fromisoformat(request["created_at"]).timestamp() )) # ✅ 修复:使用请求中存储的真实 chat_id(从 send_message 返回值提取) chat_ids_to_check = request.get("poll_chat_ids", []) if not chat_ids_to_check: logger.warning("请求 %s 没有 poll_chat_ids,可能是旧版请求", request_id) for chat_id in chat_ids_to_check: result = _feishu.get_message_list( chat_id=chat_id, start_time=created_at_sec, page_size=10, ) if result and result.get("items"): for msg in result["items"]: sender_id = msg.get("sender_id", "") sender_type = msg.get("sender_type", "") logger.debug( "飞书消息(check) [%s]: sender_type=%s, sender_id=%s, content=%s", chat_id, sender_type, sender_id, str(msg.get("content", ""))[:100], ) # ✅ 修改:接受任何用户的回复(不限制特定个人) if sender_type != "user": continue text = msg.get("content", "") if not text.strip(): continue # 检测到运营回复,返回原文给 Agent 理解 parsed = _parse_approval_reply(text, request["ad_ids"]) if parsed["status"] != "unknown": request.update({ "status": "replied", "reply_content": text, "reply_at": datetime.now().isoformat(), }) ad_ids = request["ad_ids"] ad_ids_str = ", ".join(str(x) for x in ad_ids[:10]) if len(ad_ids) > 10: ad_ids_str += f"...共{len(ad_ids)}个" return ToolResult( title="运营已回复", output=( f"运营飞书回复原文: {text}\n" f"等待审批的广告ID: {ad_ids_str}\n\n" f"请根据运营的自然语言回复判断后续操作:\n" f"- 运营同意/批准/通过 → 调用 execute_decisions 执行\n" f"- 运营拒绝/驳回 → 停止执行,告知原因\n" f"- 运营要求修改 → modify_decisions → validate → 重新审批(等待再次明确'同意')" ), metadata={"request_id": request_id, "raw_reply": text, "ad_ids": ad_ids}, ) except Exception as e: logger.debug("飞书读消息失败: %s", e) remaining = (timeout_at - datetime.now()).total_seconds() / 60 return ToolResult( title="等待审批中", output=f"请求 {request_id} 等待飞书审批中(剩余 {remaining:.0f} 分钟)", metadata={"status": "pending", "request_id": request_id, "remaining_minutes": round(remaining, 1)}, ) except Exception as e: logger.error("check_approval_status 失败: %s", e, exc_info=True) return ToolResult(title="check_approval_status 失败", output=str(e)) # ═══════════════════════════════════════════ # 飞书文字消息(用于执行后向运营汇报) # ═══════════════════════════════════════════ @tool(description="向运营飞书发送纯文本消息(用于执行后汇报 diff、确认、质疑回应等非审批场景)") async def send_feishu_text_message( ctx: ToolContext = None, text: str = "", to_operator: bool = True, to_project_chat: bool = True, ) -> ToolResult: """ 向运营个人 IM 和/或投放项目群发送一条纯文本消息。 使用场景(必须显式调用): - 收到"部分批准"反馈后,发送执行 diff 表(本轮执行 X 条 / 保留 Y 条 / 不变 Z 条) - 质疑型反馈的详细回应(对比、历史、ROI 置信度) - 连续 2 轮未达成一致时主动提议"本轮暂停" - 任何"告知但不需要审批"的汇报 ⚠️ 不要用于:首次审批请求(用 send_approval_request);表格链接发送(用 import_to_feishu)。 Args: text: 消息正文(支持换行,建议 < 2 KB 单屏可读)。 to_operator: 发送到运营个人 IM(FEISHU_OPERATOR_OPEN_ID)。默认 True。 to_project_chat: 抄送到投放项目群(FEISHU_AD_PROJECT_CHAT_ID)。默认 True;未配置时自动跳过。 Returns: ToolResult,包含发送目标清单和状态。 """ if not IM_ENABLED: return ToolResult(title="IM 未启用", output="IM_ENABLED=False,消息未发送") if not text or not text.strip(): return ToolResult(title="消息为空", output="text 参数为空,拒绝发送") sent_targets: List[str] = [] failed_targets: List[str] = [] if to_operator and FEISHU_OPERATOR_OPEN_ID: try: _feishu.send_message(to=FEISHU_OPERATOR_OPEN_ID, text=text) sent_targets.append(f"个人 IM ({FEISHU_OPERATOR_OPEN_ID[:12]}...)") logger.info("飞书文字消息发送成功(个人): len=%d", len(text)) except Exception as e: failed_targets.append(f"个人 IM: {e}") logger.error("飞书文字消息发送失败(个人): %s", e) if to_project_chat and FEISHU_AD_PROJECT_CHAT_ID: try: _feishu.send_message(to=FEISHU_AD_PROJECT_CHAT_ID, text=text) sent_targets.append(f"项目群 ({FEISHU_AD_PROJECT_CHAT_ID[:12]}...)") logger.info("飞书文字消息发送成功(群聊): len=%d", len(text)) except Exception as e: failed_targets.append(f"项目群: {e}") logger.error("飞书文字消息发送失败(群聊): %s", e) if not sent_targets and not failed_targets: return ToolResult( title="无可用发送目标", output="to_operator/to_project_chat 均未启用或对应 ID 未配置", ) summary_lines = [] if sent_targets: summary_lines.append(f"✅ 已发送 {len(sent_targets)} 个目标:") for t in sent_targets: summary_lines.append(f" - {t}") if failed_targets: summary_lines.append(f"❌ 失败 {len(failed_targets)} 个目标:") for t in failed_targets: summary_lines.append(f" - {t}") return ToolResult( title=f"飞书文字消息已发送({len(sent_targets)} 个目标)", output="\n".join(summary_lines), metadata={ "sent_count": len(sent_targets), "failed_count": len(failed_targets), "text_length": len(text), }, )