| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063 |
- """
- IM 审批流 — auto_put_ad_mini(飞书直连版)
- 职责:
- - 通过飞书 API 发送审批消息给运营(文本摘要 + Excel 附件)
- - 轮询飞书聊天获取审批回复
- - 支持阻塞式等待(工具内部 sleep+poll,不消耗 Agent iteration)
- 飞书能力(通过框架 FeishuClient):
- - 发文本消息:send_message(to, text)
- - 发文件附件:send_file(to, file, file_name)
- - 读消息列表:get_message_list(chat_id, start_time, page_size)
- """
- import asyncio
- import json
- import logging
- import sys
- import time
- import uuid
- from datetime import datetime, timedelta
- from pathlib import Path
- from typing import Dict, List, Optional
- import pandas as pd
- from agent.tools import tool
- from agent.tools.models import ToolContext, ToolResult
- from agent.tools.builtin.feishu.feishu_client import FeishuClient
- _MINI_DIR = Path(__file__).resolve().parent.parent
- _TOOLS_DIR = Path(__file__).resolve().parent
- if str(_MINI_DIR) not in sys.path:
- sys.path.insert(0, str(_MINI_DIR))
- if str(_TOOLS_DIR) not in sys.path:
- sys.path.insert(0, str(_TOOLS_DIR))
- from config import (
- IM_ENABLED,
- IM_APPROVAL_TIMEOUT_MINUTES,
- IM_APPROVAL_POLL_INTERVAL_SECONDS,
- FEISHU_APP_ID,
- FEISHU_APP_SECRET,
- FEISHU_OPERATOR_OPEN_ID,
- FEISHU_OPERATOR_CHAT_ID,
- FEISHU_AD_PROJECT_CHAT_ID, # 新增:投放项目群聊
- )
- logger = logging.getLogger(__name__)
- # chat_history 路径(websocket_event.py 写入的位置)
- _CHAT_HISTORY_DIR = (
- Path(__file__).resolve().parent.parent.parent.parent
- / "agent" / "tools" / "builtin" / "feishu" / "chat_history"
- )
- def _get_contact_name(open_id: str) -> Optional[str]:
- """根据 open_id 从 feishu_contacts.json 查联系人名"""
- try:
- from agent.tools.builtin.feishu.chat import get_contact_by_id
- contact = get_contact_by_id(open_id)
- return contact.get("name") if contact else None
- except Exception:
- return None
- def _snapshot_message_ids(contact_name: str) -> set:
- """快照当前 chat_history 里已有的 message_id"""
- chat_file = _CHAT_HISTORY_DIR / f"chat_{contact_name}.json"
- if not chat_file.exists():
- return set()
- try:
- msgs = json.loads(chat_file.read_text(encoding="utf-8"))
- return {m["message_id"] for m in msgs if "message_id" in m}
- except Exception:
- return set()
- def _poll_new_messages(contact_name: str, known_ids: set) -> list:
- """读取新消息文本(不在 known_ids 里的 message_id)"""
- chat_file = _CHAT_HISTORY_DIR / f"chat_{contact_name}.json"
- if not chat_file.exists():
- return []
- try:
- msgs = json.loads(chat_file.read_text(encoding="utf-8"))
- new_texts = []
- for m in msgs:
- if m.get("message_id") in known_ids:
- continue
- for block in m.get("content", []):
- if block.get("type") == "text" and block.get("text", "").strip():
- new_texts.append(block["text"])
- return new_texts
- except Exception:
- return []
- # 审批请求状态
- _approval_requests: Dict[str, Dict] = {}
- # 全局客户端实例(框架 FeishuClient,自动管理 token)
- _feishu = FeishuClient(app_id=FEISHU_APP_ID, app_secret=FEISHU_APP_SECRET)
- # ═══════════════════════════════════════════
- # 审批 Excel 生成
- # ═══════════════════════════════════════════
- # 审批表精选列(运营审阅所需的关键指标)
- # 列顺序:日期 → 账户ID → 广告ID → 广告消耗 → 决策动作 → 其他关键信息(简洁版)
- APPROVAL_COLUMNS = [
- # 核心标识(前5列,含决策动作)
- "approval_date", "account_id", "ad_id", "cost_7d_avg", "action",
- # 基础信息
- "ad_name", "audience_tier", "ad_age_days", "bid_amount",
- # 关键指标(使用实际列名)
- "动态ROI_7日均值", "cost_7d_total", "revenue_7d_total",
- # 决策详情
- "dimension", "reason",
- "recommended_change_pct",
- ]
- def _generate_approval_xlsx(df_tier2_3: pd.DataFrame, request_id: str) -> Path:
- """生成审批专用 Excel(仅需审批的 Tier 2/3 广告)
- 复用 report_generator._write_xlsx_with_format 生成带颜色标记的 Excel。
- 文件保存到 outputs/approvals/{request_id}.xlsx。
- """
- from report_generator import _write_xlsx_with_format
- approval_dir = _MINI_DIR / "outputs" / "approvals"
- approval_dir.mkdir(parents=True, exist_ok=True)
- xlsx_path = approval_dir / f"{request_id}.xlsx"
- # 添加审批日期列(当前日期)
- df_tier2_3 = df_tier2_3.copy()
- df_tier2_3["approval_date"] = datetime.now().strftime("%Y-%m-%d")
- # 精选列(仅保留 df 中存在的列)
- cols = [c for c in APPROVAL_COLUMNS if c in df_tier2_3.columns]
- df_out = df_tier2_3[cols].copy()
- # 排序:7日消耗<10元的放最后,有消耗的在前,同组内按消耗降序
- if "cost_7d_avg" in df_out.columns:
- df_out["_has_spend"] = (df_out["cost_7d_avg"] >= 10.0).astype(int) # >=10元算有消耗
- df_out = df_out.sort_values(
- ["_has_spend", "cost_7d_avg"],
- ascending=[False, False] # 有消耗在前(1在前),消耗高的在前
- )
- df_out.drop(columns=["_has_spend"], inplace=True)
- _write_xlsx_with_format(df_out, xlsx_path)
- return xlsx_path
- # ═══════════════════════════════════════════
- # 格式化审批消息
- # ═══════════════════════════════════════════
- def _format_project_notification_message(df_tier2: pd.DataFrame, df_tier1: pd.DataFrame, request_id: str) -> str:
- """格式化投放项目群聊通知消息(仅通知,不需要审批)"""
- lines = [
- "📊 广告调控决策通知",
- f"请求ID: {request_id}",
- f"时间: {datetime.now().strftime('%Y-%m-%d %H:%M')}",
- "",
- ]
- # 需审批操作统计
- if not df_tier2.empty:
- total_count = len(df_tier2)
- lines.append(f"🔶 待审批操作({total_count} 个):")
- lines.append("-" * 40)
- # 统计各操作类型
- action_counts = df_tier2.get("final_action", df_tier2.get("action", "")).value_counts().to_dict()
- for action, count in action_counts.items():
- if action == "pause":
- lines.append(f" ⏸️ 暂停: {count} 个")
- elif action == "bid_down":
- lines.append(f" ⬇️ 降价: {count} 个")
- elif action == "bid_up":
- lines.append(f" ⬆️ 提价: {count} 个")
- else:
- lines.append(f" {action}: {count} 个")
- lines.append("")
- # 简化展示(只显示前3个)
- lines.append("前 3 个示例:")
- for i, (_, row) in enumerate(df_tier2.head(3).iterrows()):
- ad_id = row.get("ad_id", "")
- action = row.get("final_action", row.get("action", ""))
- ad_name = str(row.get("ad_name", ""))[:20]
- cost_avg = row.get("cost_7d_avg", 0)
- roi = row.get("动态ROI_7日均值", 0)
- if action == "pause":
- action_label = "⏸️ 暂停"
- elif action == "bid_down":
- pct = row.get("recommended_change_pct", 0)
- if isinstance(pct, str):
- try:
- pct = float(pct)
- except ValueError:
- pct = 0
- action_label = f"⬇️ 降价{abs(pct)*100:.0f}%"
- elif action == "bid_up":
- pct = row.get("recommended_change_pct", 0)
- if isinstance(pct, str):
- try:
- pct = float(pct)
- except ValueError:
- pct = 0
- action_label = f"⬆️ 提价{pct*100:.0f}%"
- else:
- action_label = action
- lines.append(f" [{ad_id}] {ad_name}")
- lines.append(f" 操作: {action_label} | 日均消耗: {cost_avg:.0f}元 | ROI: {roi:.2f}")
- lines.append("")
- if total_count > 3:
- lines.append(f" ...还有 {total_count - 3} 个(查看在线表格)")
- lines.append("")
- # 无需操作 + 自动执行
- tier0_count = len(df_tier0) if not df_tier0.empty else 0
- tier1_count = len(df_tier1) if not df_tier1.empty else 0
- if tier0_count > 0 or tier1_count > 0:
- status_parts = []
- if tier0_count > 0:
- status_parts.append(f"{tier0_count}个无需操作(observe/hold)")
- if tier1_count > 0:
- status_parts.append(f"{tier1_count}个自动执行(小幅调价)")
- lines.append(f"ℹ️ {' + '.join(status_parts)}")
- lines.append("")
- lines.extend([
- "-" * 40,
- "ℹ️ 说明:",
- " • 此消息为智能决策结果通知",
- " • 运营审批通过后才会实际执行",
- " • 详情请查看在线表格(自动发送)",
- ])
- return "\n".join(lines)
- def _score_top_decisions(df_tier2: pd.DataFrame, top_n: int = 5) -> pd.DataFrame:
- """对 Tier 2/3 决策按"影响力"评分,选出 top N 展示给运营。
- 评分公式:
- score = normalize(cost_7d_avg) * 0.6
- + action_weight * 0.3
- + confidence_weight * 0.1
- 权重理由:
- - 消耗权重最高(0.6):决策影响的金额越大越该优先看
- - 动作权重其次(0.3):pause/bid_down 是风险型操作,优先展示
- - 置信度权重最低(0.1):仅作为 tiebreaker
- """
- if df_tier2.empty:
- return df_tier2
- df = df_tier2.copy()
- # 消耗归一化(0-1)
- cost = pd.to_numeric(df.get("cost_7d_avg", 0), errors="coerce").fillna(0)
- cost_max = cost.max()
- cost_norm = cost / cost_max if cost_max > 0 else cost * 0
- # 动作权重:pause/bid_down = 1.0(风险型),bid_up/scale_up = 0.7(机会型),其他 = 0.5
- def _action_weight(a: str) -> float:
- a = str(a).strip()
- if a in ("pause", "bid_down"):
- return 1.0
- if a in ("bid_up", "scale_up"):
- return 0.7
- return 0.5
- action_col = df.get("final_action", df.get("action", ""))
- action_weight = action_col.apply(_action_weight)
- # 置信度权重
- def _conf_weight(c: str) -> float:
- c = str(c).strip().lower()
- if c == "high":
- return 1.0
- if c == "medium":
- return 0.6
- return 0.3
- conf_weight = df.get("confidence", "medium").apply(_conf_weight) if "confidence" in df.columns else 0.6
- df["_top_score"] = cost_norm * 0.6 + action_weight * 0.3 + conf_weight * 0.1
- df = df.sort_values("_top_score", ascending=False)
- # 如果总量 <= top_n,全部返回
- if len(df) <= top_n:
- return df.drop(columns=["_top_score"])
- return df.head(top_n).drop(columns=["_top_score"])
- def _format_approval_message(df_tier2: pd.DataFrame, df_tier1: pd.DataFrame, df_tier0: pd.DataFrame, request_id: str) -> str:
- """格式化审批消息(极简版,目标:手机单屏可读)。
- 设计原则:
- - 只保留三块信息:总量 + 各动作数量及昨日消耗 + 追溯码
- - 决策明细、审批指引由调用方拼接的表格链接消息承载
- - hold / observe / creative_adjust 合并为"观察"一行
- - 只打印非零桶,固定顺序避免抖动
- """
- total = len(df_tier2)
- now_label = datetime.now().strftime("%m-%d %H:%M")
- lines = [
- f"📊 广告调控 · {now_label} · 请您复核",
- f"📌 共 {total} 条决策需您审批",
- ]
- if total > 0:
- # 取 action 列(双 fallback,保持和现状一致)
- action_col = df_tier2.get("final_action", df_tier2.get("action", pd.Series(dtype=str)))
- df = df_tier2.assign(_action=action_col.fillna(""))
- # hold / observe / creative_adjust 合并为 observe 桶
- def _bucket(a: str) -> str:
- if a in ("hold", "observe", "creative_adjust"):
- return "observe"
- return a
- df["_bucket"] = df["_action"].map(_bucket)
- # 昨日消耗(列缺失或非数值自动兜底为 0)
- cost_col = pd.to_numeric(df.get("yesterday_cost", 0), errors="coerce").fillna(0)
- df["_ycost"] = cost_col
- agg = df.groupby("_bucket").agg(
- n=("ad_id", "count"),
- cost=("_ycost", "sum"),
- ).to_dict("index")
- # 显式顺序;无数据的桶不打印
- display_order = [
- ("pause", "⏸ 暂停"),
- ("bid_down", "⬇ 降价"),
- ("bid_up", "⬆ 提价"),
- ("scale_up", "🚀 扩量"),
- ("observe", "👀 观察"),
- ]
- lines.extend(["", "各动作明细:"])
- for bucket, label in display_order:
- info = agg.get(bucket)
- if not info or info["n"] == 0:
- continue
- lines.append(f" {label} {info['n']} 条|昨日消耗 {info['cost']:,.0f} 元")
- lines.extend(["", f"(追溯码: {request_id})"])
- return "\n".join(lines)
- # ═══════════════════════════════════════════
- # 解析审批回复
- # ═══════════════════════════════════════════
- def _parse_approval_reply(content: str, all_ad_ids: List[int]) -> Dict:
- """将运营的自然语言回复原样返回给 Agent 解读。
- 不做硬解析 — 运营可能用任何自然语言表达审批意见(中文、英文、混合),
- 由 Agent(LLM)负责理解语义并决定后续动作。
- """
- content = content.strip()
- if not content:
- return {"status": "unknown", "raw_reply": ""}
- return {
- "status": "replied",
- "raw_reply": content,
- "ad_ids": all_ad_ids,
- }
- # ═══════════════════════════════════════════
- # 工具:发送审批请求(飞书版)
- # ═══════════════════════════════════════════
- @tool(description="通过飞书发送决策摘要给运营,收集审批结果(支持阻塞等待)")
- async def send_approval_request(
- ctx: ToolContext = None,
- validated_csv: str = "",
- timeout_minutes: int = IM_APPROVAL_TIMEOUT_MINUTES,
- wait_for_reply: bool = True,
- poll_interval_seconds: int = IM_APPROVAL_POLL_INTERVAL_SECONDS,
- ) -> ToolResult:
- """
- 发送结构化审批请求到飞书。
- 流程:
- 1. 读取验证后的决策 CSV,分级(Tier 1/2/3)
- 2. 生成审批 Excel(仅 Tier 2/3 广告 + 关键指标列)
- 3. 发送飞书消息 x2:文本摘要 + Excel 文件附件
- 4. 阻塞轮询等待回复(30s 间隔 x 最长 30 分钟)
- 5. 解析 approve/reject -> 返回结果
- Args:
- validated_csv: 护栏验证后的 CSV 路径
- timeout_minutes: 审批超时时间(分钟)
- wait_for_reply: 是否阻塞等待回复
- poll_interval_seconds: 轮询间隔(秒)
- """
- try:
- if not IM_ENABLED:
- return ToolResult(
- title="IM 审批未启用",
- output="IM_ENABLED=False,跳过飞书审批。需审批的操作已记录到执行日志。",
- )
- if not FEISHU_OPERATOR_CHAT_ID:
- return ToolResult(
- title="飞书配置缺失",
- output="FEISHU_OPERATOR_CHAT_ID 未配置,无法发送审批请求。",
- )
- # 查找验证 CSV
- if not validated_csv:
- reports_dir = _MINI_DIR / "outputs" / "reports"
- candidates = sorted(reports_dir.glob("validated_decisions_*.csv"), reverse=True)
- if not candidates:
- return ToolResult(title="send_approval_request", output="未找到验证后的决策 CSV")
- validated_csv = str(candidates[0])
- df = pd.read_csv(validated_csv)
- if df.empty:
- return ToolResult(title="send_approval_request", output="决策数据为空")
- # ⚠️ 关键:补充 metrics 数据(通过 ad_id 关联)
- metrics_path = _MINI_DIR / "outputs" / "metrics_temp.csv"
- if not metrics_path.exists():
- # 尝试查找最新的 metrics 文件
- reports_dir = _MINI_DIR / "outputs"
- candidates = sorted(reports_dir.glob("metrics_*.csv"), reverse=True)
- if candidates:
- metrics_path = candidates[0]
- if metrics_path.exists():
- df_metrics = pd.read_csv(metrics_path)
- # 选择需要的列(避免重复列,使用实际列名)
- metrics_cols_needed = [
- "ad_id", "account_id", "ad_name",
- "cost_7d_avg", "cost_7d_total", "revenue_7d_total",
- "动态ROI_7日均值", "bid_amount"
- ]
- # ===== 关键修复:只合并不存在的列,避免重复 =====
- # 检查哪些列已经存在于 df 中(从 ad_decision.py 已合并)
- existing_cols = set(df.columns)
- cols_to_merge = ["ad_id"] # ad_id 必须保留用于连接
- for col in metrics_cols_needed:
- if col not in existing_cols and col in df_metrics.columns:
- cols_to_merge.append(col)
- if len(cols_to_merge) > 1: # 除了ad_id还有其他列需要合并
- df_metrics_sub = df_metrics[cols_to_merge].copy()
- # 从 ad_name 中提取 audience_tier(如 "R500_xxx" → "R500")
- if "ad_name" in cols_to_merge and "audience_tier" not in existing_cols:
- df_metrics_sub["audience_tier"] = df_metrics_sub["ad_name"].str.extract(r"^(R\d+)")[0]
- cols_to_merge.append("audience_tier")
- # 左连接:只合并缺失的列
- df = df.merge(df_metrics_sub, on="ad_id", how="left")
- logger.info(f"已从 metrics 补充 {len(cols_to_merge)-1} 列数据(跳过已存在列)")
- else:
- logger.info("metrics 字段已在 validated_decisions 中存在,无需重复合并")
- else:
- logger.warning("未找到 metrics 文件,审批表格将缺少关键字段")
- # 过滤已暂停/已删除的广告(不应出现在审批表中)
- if "configured_status" in df.columns:
- before_count = len(df)
- excluded_status = {"AD_STATUS_SUSPEND", "AD_STATUS_DELETED"}
- df = df[~df["configured_status"].isin(excluded_status)].copy()
- filtered_count = before_count - len(df)
- if filtered_count > 0:
- logger.info(f"审批请求过滤掉 {filtered_count} 个已暂停/已删除广告(configured_status)")
- # 第二道防线:腾讯侧已删除(由 sync_ad_status.py 每日同步回写 is_deleted 列)
- # 即使上游 apply_decisions 的过滤改坏,也不让僵尸广告进入飞书审批表
- try:
- from config import AD_STATUS_DIR
- bizdate = (datetime.now() - timedelta(days=1)).strftime("%Y%m%d")
- ad_status_path = AD_STATUS_DIR / f"ad_status_{bizdate}.csv"
- if ad_status_path.exists():
- df_status = pd.read_csv(ad_status_path)
- if "is_deleted" in df_status.columns:
- deleted_ids = set(
- df_status[df_status["is_deleted"].fillna(False).astype(bool)]["ad_id"].tolist()
- )
- if deleted_ids:
- before_count = len(df)
- df = df[~df["ad_id"].isin(deleted_ids)].copy()
- dropped = before_count - len(df)
- if dropped > 0:
- logger.info(f"审批请求过滤掉 {dropped} 个腾讯侧已删除广告")
- except Exception as e:
- logger.warning(f"读取 ad_status CSV 做 is_deleted 过滤失败(不影响主流程): {e}")
- if df.empty:
- return ToolResult(title="send_approval_request", output="过滤后无数据")
- # 分级(包含hold记录,用于参考)
- from execution_engine import _classify_tier
- df["tier"] = df.apply(_classify_tier, axis=1)
- # 按tier分类
- df_tier0 = df[df["tier"] == 0].copy() # observe, hold, creative_adjust(无需操作)
- df_tier1 = df[df["tier"] == 1].copy() # 小幅调价(自动执行)
- df_tier2_3 = df[df["tier"] >= 2].copy() # 暂停、大幅调价(需审批)
- if df.empty:
- return ToolResult(title="send_approval_request", output="无决策数据")
- # 合并需审批的和无需操作的(供运营参考)
- # ⚠️ 排除不需运营立刻干预的 action,不写飞书表降低噪声
- # - hold/observe:无需操作
- # - scale_up:扩量逻辑尚在调试,暂不进审批表
- # - bid_up:提价决策由系统自动执行,不进审批表(仅保留 pause 和 bid_down)
- FEISHU_EXCLUDE_ACTIONS = {"hold", "observe", "scale_up", "bid_up"}
- # 对所有 tier 应用过滤(不仅仅是 tier0)
- action_col = "final_action" if "final_action" in df.columns else "action"
- before_filter = len(df)
- df_filtered = df[~df[action_col].isin(FEISHU_EXCLUDE_ACTIONS)].copy()
- dropped_count = before_filter - len(df_filtered)
- if dropped_count > 0:
- dropped_breakdown = (
- df[df[action_col].isin(FEISHU_EXCLUDE_ACTIONS)][action_col]
- .value_counts().to_dict()
- )
- logger.info(
- f"飞书表过滤掉 {dropped_count} 个 {sorted(FEISHU_EXCLUDE_ACTIONS)} 决策"
- f"(明细: {dropped_breakdown},减少表格噪声)"
- )
- # 使用过滤后的数据作为审批表
- df_for_review = df_filtered
- if df_tier2_3.empty:
- total_no_op = len(df_tier0) + len(df_tier1)
- return ToolResult(
- title="无需审批",
- output=f"共 {total_no_op} 个决策:{len(df_tier0)}个无需操作(observe/hold)+ {len(df_tier1)}个自动执行(小幅调价),无需审批",
- )
- # 生成审批请求
- request_id = f"req_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{uuid.uuid4().hex[:6]}"
- message = _format_approval_message(df_tier2_3, df_tier1, df_tier0, request_id)
- # 保存请求状态
- tier2_ad_ids = df_tier2_3["ad_id"].astype(int).tolist()
- _approval_requests[request_id] = {
- "status": "pending",
- "created_at": datetime.now().isoformat(),
- "timeout_at": (datetime.now() + timedelta(minutes=timeout_minutes)).isoformat(),
- "ad_ids": tier2_ad_ids,
- "validated_csv": validated_csv,
- }
- # ─── 通过飞书 API 发送审批消息(单条合并:概述 + 表格链接 + 审批指引) ───
- # 设计:不再分两次发(纯文本 + 表格链接),改为一次发合并消息。
- # message 作为 preamble 传给 import_to_feishu,内部 _send_link_message 负责合并。
- feishu_sent = False
- feishu_sent_to_project_chat = False
- sent_time_sec = str(int(time.time())) # 飞书 API start_time 单位:秒
- poll_chat_ids = [] # 用于轮询的真正 chat_id(从 send_message 返回值中提取)
- try:
- xlsx_path = _generate_approval_xlsx(df_for_review, request_id)
- from feishu_doc import import_to_feishu
- # 发送到项目群
- if FEISHU_AD_PROJECT_CHAT_ID:
- try:
- import_result = await import_to_feishu(
- ctx=ctx,
- xlsx_path=str(xlsx_path),
- send_im=True,
- chat_id=FEISHU_AD_PROJECT_CHAT_ID,
- preamble=message,
- )
- meta = import_result.metadata or {}
- if meta.get("url"):
- logger.info("飞书审批合并消息发送成功(项目群): %s", meta["url"])
- if meta.get("im_sent"):
- feishu_sent = True
- feishu_sent_to_project_chat = True
- # 群聊 chat_id 本身就是 oc_xxx,可直接轮询
- if FEISHU_AD_PROJECT_CHAT_ID not in poll_chat_ids:
- poll_chat_ids.append(FEISHU_AD_PROJECT_CHAT_ID)
- else:
- logger.warning("飞书在线表格导入失败(项目群),回退到文件附件 + 文本")
- # 回退:发 preamble + 文件附件
- try:
- _feishu.send_message(to=FEISHU_AD_PROJECT_CHAT_ID, text=message)
- except Exception:
- pass
- file_result = _feishu.send_file(
- to=FEISHU_AD_PROJECT_CHAT_ID,
- file=str(xlsx_path),
- file_name=f"广告审批_{datetime.now().strftime('%Y%m%d_%H%M')}.xlsx",
- )
- logger.info("飞书审批 Excel(文件)发送成功(项目群): message_id=%s", file_result.message_id)
- feishu_sent = True
- feishu_sent_to_project_chat = True
- if FEISHU_AD_PROJECT_CHAT_ID not in poll_chat_ids:
- poll_chat_ids.append(FEISHU_AD_PROJECT_CHAT_ID)
- except Exception as e:
- logger.warning("发送合并消息到项目群失败: %s", e)
- # 发送到个人
- if FEISHU_OPERATOR_OPEN_ID:
- try:
- personal_import_result = await import_to_feishu(
- ctx=ctx,
- xlsx_path=str(xlsx_path),
- send_im=True,
- chat_id=FEISHU_OPERATOR_OPEN_ID,
- preamble=message,
- )
- meta_p = personal_import_result.metadata or {}
- if meta_p.get("url"):
- logger.info("飞书审批合并消息发送成功(个人): %s", meta_p["url"])
- if meta_p.get("im_sent"):
- feishu_sent = True
- # 从 send_message 返回值提取 P2P chat_id(用于后续轮询)
- p2p_chat = meta_p.get("im_chat_id")
- if p2p_chat and p2p_chat not in poll_chat_ids:
- poll_chat_ids.append(p2p_chat)
- logger.info("提取到 P2P chat_id: %s(用于轮询回复)", p2p_chat)
- else:
- # 回退:发 preamble + 文件附件
- logger.warning("飞书在线表格导入失败(个人),回退到文件附件 + 文本")
- try:
- res_txt = _feishu.send_message(to=FEISHU_OPERATOR_OPEN_ID, text=message)
- if hasattr(res_txt, 'chat_id') and res_txt.chat_id and res_txt.chat_id not in poll_chat_ids:
- poll_chat_ids.append(res_txt.chat_id)
- except Exception:
- pass
- file_result_personal = _feishu.send_file(
- to=FEISHU_OPERATOR_OPEN_ID,
- file=str(xlsx_path),
- file_name=f"广告审批_{datetime.now().strftime('%Y%m%d_%H%M')}.xlsx",
- )
- logger.info("飞书决策 Excel(文件)发送成功(个人): message_id=%s", file_result_personal.message_id)
- feishu_sent = True
- except Exception as e:
- logger.warning("发送合并消息到个人失败: %s", e)
- except Exception as e:
- logger.warning("飞书发消息失败: %s", e)
- # ✅ 兜底:如果 send_message 未返回 P2P chat_id,用 config 中的 FEISHU_OPERATOR_CHAT_ID
- if not poll_chat_ids and FEISHU_OPERATOR_CHAT_ID:
- poll_chat_ids.append(FEISHU_OPERATOR_CHAT_ID)
- logger.info("使用配置中的 FEISHU_OPERATOR_CHAT_ID 兜底: %s", FEISHU_OPERATOR_CHAT_ID)
- # 将 poll_chat_ids 存入请求状态(供 check_approval_status 使用)
- _approval_requests[request_id]["poll_chat_ids"] = poll_chat_ids
- logger.info("轮询目标 chat_ids: %s", poll_chat_ids)
- # 保存审批消息到文件(备份)
- approval_dir = _MINI_DIR / "outputs" / "approvals"
- approval_dir.mkdir(parents=True, exist_ok=True)
- msg_path = approval_dir / f"{request_id}.txt"
- msg_path.write_text(message, encoding="utf-8")
- # 快照当前 chat_history 里已有的 message_id(用于识别新回复)
- _operator_contact_name = _get_contact_name(FEISHU_OPERATOR_OPEN_ID)
- _known_message_ids: set = set()
- if _operator_contact_name:
- _known_message_ids = _snapshot_message_ids(_operator_contact_name)
- logger.info(
- "chat_history 快照:联系人=%s,已有消息=%d条",
- _operator_contact_name, len(_known_message_ids)
- )
- else:
- logger.warning(
- "无法识别运营联系人(open_id=%s),将回退到 HTTP 轮询",
- FEISHU_OPERATOR_OPEN_ID
- )
- # ─── 非阻塞模式 ───
- if not wait_for_reply:
- return ToolResult(
- title=f"审批请求已发送({len(tier2_ad_ids)}个待审批)",
- output=(
- f"审批请求 {request_id} 已{'通过飞书发送' if feishu_sent else '保存到文件(飞书发送失败)'}\n"
- f" 待审批: {len(tier2_ad_ids)} 个广告\n"
- f" 无需操作: {len(df_tier0)} 个广告(observe/hold)\n"
- f" 自动执行: {len(df_tier1)} 个广告(小幅调价)\n"
- f" 超时时间: {timeout_minutes} 分钟\n"
- f" 消息备份: {msg_path}\n\n"
- f"使用 check_approval_status(request_id='{request_id}') 检查审批结果"
- ),
- metadata={
- "request_id": request_id,
- "pending_count": len(tier2_ad_ids),
- "tier0_count": len(df_tier0),
- "tier1_count": len(df_tier1),
- "feishu_sent": feishu_sent,
- "msg_path": str(msg_path),
- },
- )
- # ═══ 等待飞书回复(本地 chat_history + HTTP 轮询双通道)═══
- timeout_at = datetime.now() + timedelta(minutes=timeout_minutes)
- use_local_poll = bool(_operator_contact_name)
- logger.info(
- "等待审批回复:mode=%s,超时 %d 分钟",
- "local+http" if use_local_poll else "http_poll",
- timeout_minutes,
- )
- poll_count = 0
- while datetime.now() < timeout_at:
- await asyncio.sleep(1 if use_local_poll else poll_interval_seconds)
- poll_count += 1
- remaining = (timeout_at - datetime.now()).total_seconds() / 60
- if poll_count % 30 == 0:
- logger.info("等待审批回复 #%d,剩余 %.1f 分钟", poll_count, remaining)
- # ─── 优先:读本地 chat_history(WebSocket 推送写入,1秒延迟)───
- if use_local_poll:
- try:
- new_texts = _poll_new_messages(_operator_contact_name, _known_message_ids)
- if new_texts:
- text = new_texts[0]
- _known_message_ids = _snapshot_message_ids(_operator_contact_name)
- _approval_requests[request_id].update({
- "status": "replied",
- "reply_content": text,
- "reply_at": datetime.now().isoformat(),
- "ad_ids": tier2_ad_ids,
- })
- logger.info("chat_history 收到运营回复(%d 秒): %s", poll_count, text[:200])
- ad_ids_str = ", ".join(str(x) for x in tier2_ad_ids[:10])
- if len(tier2_ad_ids) > 10:
- ad_ids_str += f"...共{len(tier2_ad_ids)}个"
- return ToolResult(
- title="运营已回复",
- output=(
- f"运营飞书回复原文: {text}\n"
- f"等待审批的广告ID: {ad_ids_str}\n"
- f"等待时间: {poll_count} 秒\n\n"
- f"请根据运营的自然语言回复判断后续操作:\n"
- f"- 运营同意/批准/通过 → 调用 execute_decisions 执行\n"
- f"- 运营拒绝/驳回 → 停止执行,告知原因\n"
- f"- 运营要求修改(如\"广告X不要暂停\"/\"降价的去掉\")→ modify_decisions → validate → 重新审批(等待再次明确'同意')"
- ),
- metadata={
- "request_id": request_id,
- "feishu_sent": feishu_sent,
- "msg_path": str(msg_path),
- "poll_count": poll_count,
- "raw_reply": text,
- "ad_ids": tier2_ad_ids,
- "poll_mode": "local_chat_history",
- },
- )
- except Exception as e:
- logger.debug("读取 chat_history 失败(将重试): %s", e)
- # 每 30 秒同时尝试 HTTP 轮询(兜底:WebSocket 断连时仍可检测)
- if poll_count % 30 != 0:
- continue
- # ─── HTTP 轮询(主通道 / 每 30 秒兜底)───
- try:
- for chat_id in poll_chat_ids:
- result = _feishu.get_message_list(
- chat_id=chat_id,
- start_time=sent_time_sec,
- page_size=10,
- )
- if result and result.get("items"):
- for msg in result["items"]:
- sender_type = msg.get("sender_type", "")
- if sender_type != "user":
- continue
- text = msg.get("content", "")
- if not text.strip():
- continue
- parsed = _parse_approval_reply(text, tier2_ad_ids)
- if parsed["status"] != "unknown":
- _approval_requests[request_id].update({
- "status": "replied",
- "reply_content": text,
- "reply_at": datetime.now().isoformat(),
- "ad_ids": tier2_ad_ids,
- })
- logger.info("HTTP 轮询收到运营回复: %s", text[:200])
- ad_ids_str = ", ".join(str(x) for x in tier2_ad_ids[:10])
- if len(tier2_ad_ids) > 10:
- ad_ids_str += f"...共{len(tier2_ad_ids)}个"
- return ToolResult(
- title="运营已回复",
- output=(
- f"运营飞书回复原文: {text}\n"
- f"等待审批的广告ID: {ad_ids_str}\n"
- f"等待时间: {poll_count * poll_interval_seconds} 秒\n\n"
- f"请根据运营的自然语言回复判断后续操作:\n"
- f"- 运营同意/批准/通过 → 调用 execute_decisions 执行\n"
- f"- 运营拒绝/驳回 → 停止执行,告知原因\n"
- f"- 运营要求修改(如\"广告X不要暂停\"/\"降价的去掉\")→ modify_decisions → validate → 重新审批(等待再次明确'同意')"
- ),
- metadata={
- "request_id": request_id,
- "feishu_sent": feishu_sent,
- "msg_path": str(msg_path),
- "poll_count": poll_count,
- "raw_reply": text,
- "ad_ids": tier2_ad_ids,
- "poll_mode": "http",
- },
- )
- except Exception as e:
- logger.debug("飞书读消息失败(将重试): %s", e)
- # ─── 超时 ───
- _approval_requests[request_id]["status"] = "timeout"
- logger.warning("审批请求 %s 超时(%d 分钟)", request_id, timeout_minutes)
- return ToolResult(
- title="审批超时",
- output=(
- f"请求 {request_id} 超时({timeout_minutes}分钟),需审批的 {len(tier2_ad_ids)} 个操作未执行\n"
- f"轮询次数: {poll_count}\n"
- f"消息备份: {msg_path}"
- ),
- metadata={
- "request_id": request_id,
- "status": "timeout",
- "pending_ad_ids": tier2_ad_ids,
- "feishu_sent": feishu_sent,
- "msg_path": str(msg_path),
- "poll_count": poll_count,
- },
- )
- except Exception as e:
- logger.error("send_approval_request 失败: %s", e, exc_info=True)
- return ToolResult(title="send_approval_request 失败", output=str(e))
- # ═══════════════════════════════════════════
- # 工具:检查审批状态
- # ═══════════════════════════════════════════
- @tool(description="检查运营飞书审批结果")
- async def check_approval_status(
- ctx: ToolContext = None,
- request_id: str = "",
- ) -> ToolResult:
- """
- 检查审批请求的状态。通过飞书 API 读取最新消息。
- Args:
- request_id: 审批请求 ID(send_approval_request 返回)
- """
- try:
- request = _approval_requests.get(request_id)
- if not request:
- return ToolResult(
- title="审批请求不存在",
- output=f"未找到请求 {request_id}。可能已过期或重启后丢失。",
- )
- # 检查超时
- timeout_at = datetime.fromisoformat(request["timeout_at"])
- if datetime.now() > timeout_at:
- request["status"] = "timeout"
- return ToolResult(
- title="审批已超时",
- output=f"请求 {request_id} 已超时({IM_APPROVAL_TIMEOUT_MINUTES}分钟)",
- metadata={"status": "timeout", "request_id": request_id},
- )
- if request["status"] != "pending":
- return ToolResult(
- title=f"审批状态: {request['status']}",
- output=json.dumps(request, ensure_ascii=False, indent=2),
- metadata=request,
- )
- # 通过框架 FeishuClient 读取审批消息之后的回复
- try:
- created_at_sec = str(int(
- datetime.fromisoformat(request["created_at"]).timestamp()
- ))
- # ✅ 修复:使用请求中存储的真实 chat_id(从 send_message 返回值提取)
- chat_ids_to_check = request.get("poll_chat_ids", [])
- if not chat_ids_to_check:
- logger.warning("请求 %s 没有 poll_chat_ids,可能是旧版请求", request_id)
- for chat_id in chat_ids_to_check:
- result = _feishu.get_message_list(
- chat_id=chat_id,
- start_time=created_at_sec,
- page_size=10,
- )
- if result and result.get("items"):
- for msg in result["items"]:
- sender_id = msg.get("sender_id", "")
- sender_type = msg.get("sender_type", "")
- logger.debug(
- "飞书消息(check) [%s]: sender_type=%s, sender_id=%s, content=%s",
- chat_id, sender_type, sender_id, str(msg.get("content", ""))[:100],
- )
- # ✅ 修改:接受任何用户的回复(不限制特定个人)
- if sender_type != "user":
- continue
- text = msg.get("content", "")
- if not text.strip():
- continue
- # 检测到运营回复,返回原文给 Agent 理解
- parsed = _parse_approval_reply(text, request["ad_ids"])
- if parsed["status"] != "unknown":
- request.update({
- "status": "replied",
- "reply_content": text,
- "reply_at": datetime.now().isoformat(),
- })
- ad_ids = request["ad_ids"]
- ad_ids_str = ", ".join(str(x) for x in ad_ids[:10])
- if len(ad_ids) > 10:
- ad_ids_str += f"...共{len(ad_ids)}个"
- return ToolResult(
- title="运营已回复",
- output=(
- f"运营飞书回复原文: {text}\n"
- f"等待审批的广告ID: {ad_ids_str}\n\n"
- f"请根据运营的自然语言回复判断后续操作:\n"
- f"- 运营同意/批准/通过 → 调用 execute_decisions 执行\n"
- f"- 运营拒绝/驳回 → 停止执行,告知原因\n"
- f"- 运营要求修改 → modify_decisions → validate → 重新审批(等待再次明确'同意')"
- ),
- metadata={"request_id": request_id, "raw_reply": text, "ad_ids": ad_ids},
- )
- except Exception as e:
- logger.debug("飞书读消息失败: %s", e)
- remaining = (timeout_at - datetime.now()).total_seconds() / 60
- return ToolResult(
- title="等待审批中",
- output=f"请求 {request_id} 等待飞书审批中(剩余 {remaining:.0f} 分钟)",
- metadata={"status": "pending", "request_id": request_id, "remaining_minutes": round(remaining, 1)},
- )
- except Exception as e:
- logger.error("check_approval_status 失败: %s", e, exc_info=True)
- return ToolResult(title="check_approval_status 失败", output=str(e))
- # ═══════════════════════════════════════════
- # 飞书文字消息(用于执行后向运营汇报)
- # ═══════════════════════════════════════════
- @tool(description="向运营飞书发送纯文本消息(用于执行后汇报 diff、确认、质疑回应等非审批场景)")
- async def send_feishu_text_message(
- ctx: ToolContext = None,
- text: str = "",
- to_operator: bool = True,
- to_project_chat: bool = True,
- ) -> ToolResult:
- """
- 向运营个人 IM 和/或投放项目群发送一条纯文本消息。
- 使用场景(必须显式调用):
- - 收到"部分批准"反馈后,发送执行 diff 表(本轮执行 X 条 / 保留 Y 条 / 不变 Z 条)
- - 质疑型反馈的详细回应(对比、历史、ROI 置信度)
- - 连续 2 轮未达成一致时主动提议"本轮暂停"
- - 任何"告知但不需要审批"的汇报
- ⚠️ 不要用于:首次审批请求(用 send_approval_request);表格链接发送(用 import_to_feishu)。
- Args:
- text: 消息正文(支持换行,建议 < 2 KB 单屏可读)。
- to_operator: 发送到运营个人 IM(FEISHU_OPERATOR_OPEN_ID)。默认 True。
- to_project_chat: 抄送到投放项目群(FEISHU_AD_PROJECT_CHAT_ID)。默认 True;未配置时自动跳过。
- Returns:
- ToolResult,包含发送目标清单和状态。
- """
- if not IM_ENABLED:
- return ToolResult(title="IM 未启用", output="IM_ENABLED=False,消息未发送")
- if not text or not text.strip():
- return ToolResult(title="消息为空", output="text 参数为空,拒绝发送")
- sent_targets: List[str] = []
- failed_targets: List[str] = []
- if to_operator and FEISHU_OPERATOR_OPEN_ID:
- try:
- _feishu.send_message(to=FEISHU_OPERATOR_OPEN_ID, text=text)
- sent_targets.append(f"个人 IM ({FEISHU_OPERATOR_OPEN_ID[:12]}...)")
- logger.info("飞书文字消息发送成功(个人): len=%d", len(text))
- except Exception as e:
- failed_targets.append(f"个人 IM: {e}")
- logger.error("飞书文字消息发送失败(个人): %s", e)
- if to_project_chat and FEISHU_AD_PROJECT_CHAT_ID:
- try:
- _feishu.send_message(to=FEISHU_AD_PROJECT_CHAT_ID, text=text)
- sent_targets.append(f"项目群 ({FEISHU_AD_PROJECT_CHAT_ID[:12]}...)")
- logger.info("飞书文字消息发送成功(群聊): len=%d", len(text))
- except Exception as e:
- failed_targets.append(f"项目群: {e}")
- logger.error("飞书文字消息发送失败(群聊): %s", e)
- if not sent_targets and not failed_targets:
- return ToolResult(
- title="无可用发送目标",
- output="to_operator/to_project_chat 均未启用或对应 ID 未配置",
- )
- summary_lines = []
- if sent_targets:
- summary_lines.append(f"✅ 已发送 {len(sent_targets)} 个目标:")
- for t in sent_targets:
- summary_lines.append(f" - {t}")
- if failed_targets:
- summary_lines.append(f"❌ 失败 {len(failed_targets)} 个目标:")
- for t in failed_targets:
- summary_lines.append(f" - {t}")
- return ToolResult(
- title=f"飞书文字消息已发送({len(sent_targets)} 个目标)",
- output="\n".join(summary_lines),
- metadata={
- "sent_count": len(sent_targets),
- "failed_count": len(failed_targets),
- "text_length": len(text),
- },
- )
|