Просмотр исходного кода

feat(auto_put_ad_mini): 审批消息增强 — 框架 FeishuClient + Excel 附件

- 删除自定义 _FeishuClient 类,改用框架 FeishuClient(零框架修改)
- 审批消息新增 Excel 文件附件(复用 report_generator._write_xlsx_with_format)
- 轮询改用 get_message_list(start_time=秒) 替代手动 HTTP + JSON 解析
- 修复飞书 API start_time 时间戳单位(秒 vs 毫秒)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
刘立冬 3 недель назад
Родитель
Сommit
ec47455e5c
1 измененных файлов с 539 добавлено и 0 удалено
  1. 539 0
      examples/auto_put_ad_mini/tools/im_approval.py

+ 539 - 0
examples/auto_put_ad_mini/tools/im_approval.py

@@ -0,0 +1,539 @@
+"""
+IM 审批流 — auto_put_ad_mini(飞书直连版)
+
+职责:
+  - 通过飞书 API 发送审批消息给运营(文本摘要 + Excel 附件)
+  - 轮询飞书聊天获取审批回复
+  - 支持阻塞式等待(工具内部 sleep+poll,不消耗 Agent iteration)
+
+飞书能力(通过框架 FeishuClient):
+  - 发文本消息:send_message(to, text)
+  - 发文件附件:send_file(to, file, file_name)
+  - 读消息列表:get_message_list(chat_id, start_time, page_size)
+"""
+
+import asyncio
+import json
+import logging
+import sys
+import time
+import uuid
+from datetime import datetime, timedelta
+from pathlib import Path
+from typing import Dict, List, Optional
+
+import pandas as pd
+
+from agent.tools import tool
+from agent.tools.models import ToolContext, ToolResult
+from agent.tools.builtin.feishu.feishu_client import FeishuClient
+
+_MINI_DIR = Path(__file__).resolve().parent.parent
+_TOOLS_DIR = Path(__file__).resolve().parent
+if str(_MINI_DIR) not in sys.path:
+    sys.path.insert(0, str(_MINI_DIR))
+if str(_TOOLS_DIR) not in sys.path:
+    sys.path.insert(0, str(_TOOLS_DIR))
+
+from config import (
+    IM_ENABLED,
+    IM_APPROVAL_TIMEOUT_MINUTES,
+    IM_APPROVAL_POLL_INTERVAL_SECONDS,
+    FEISHU_APP_ID,
+    FEISHU_APP_SECRET,
+    FEISHU_OPERATOR_OPEN_ID,
+    FEISHU_OPERATOR_CHAT_ID,
+)
+
+logger = logging.getLogger(__name__)
+
+# 审批请求状态
+_approval_requests: Dict[str, Dict] = {}
+
+# 全局客户端实例(框架 FeishuClient,自动管理 token)
+_feishu = FeishuClient(app_id=FEISHU_APP_ID, app_secret=FEISHU_APP_SECRET)
+
+
+# ═══════════════════════════════════════════
+# 审批 Excel 生成
+# ═══════════════════════════════════════════
+
+# 审批表精选列(运营审阅所需的关键指标)
+APPROVAL_COLUMNS = [
+    "ad_id", "ad_name", "audience_tier",
+    "ad_age_days", "bid_amount",
+    "cost_7d_avg", "f_7日动态ROI", "f_7日动态ROI_mean_all",
+    "action", "reason",
+    "recommended_change_pct", "current_bid", "recommended_bid",
+    "guardrail_status", "final_action", "final_bid",
+]
+
+
+def _generate_approval_xlsx(df_tier2_3: pd.DataFrame, request_id: str) -> Path:
+    """生成审批专用 Excel(仅需审批的 Tier 2/3 广告)
+
+    复用 report_generator._write_xlsx_with_format 生成带颜色标记的 Excel。
+    文件保存到 outputs/approvals/{request_id}.xlsx。
+    """
+    from report_generator import _write_xlsx_with_format
+
+    approval_dir = _MINI_DIR / "outputs" / "approvals"
+    approval_dir.mkdir(parents=True, exist_ok=True)
+    xlsx_path = approval_dir / f"{request_id}.xlsx"
+
+    # 精选列(仅保留 df 中存在的列)
+    cols = [c for c in APPROVAL_COLUMNS if c in df_tier2_3.columns]
+    df_out = df_tier2_3[cols].copy()
+
+    _write_xlsx_with_format(df_out, xlsx_path)
+    return xlsx_path
+
+
+# ═══════════════════════════════════════════
+# 格式化审批消息
+# ═══════════════════════════════════════════
+
+
+def _format_approval_message(df_tier2: pd.DataFrame, df_tier1: pd.DataFrame, request_id: str) -> str:
+    lines = [
+        "📊 广告调控审批请求",
+        f"请求ID: {request_id}",
+        f"时间: {datetime.now().strftime('%Y-%m-%d %H:%M')}",
+        "",
+    ]
+
+    # Tier 2/3: 需审批
+    if not df_tier2.empty:
+        lines.append(f"🔶 需审批操作({len(df_tier2)} 个):")
+        lines.append("-" * 40)
+
+        for _, row in df_tier2.iterrows():
+            ad_id = row.get("ad_id", "")
+            action = row.get("final_action", row.get("action", ""))
+            ad_name = str(row.get("ad_name", ""))[:20]
+            reason = str(row.get("reason", ""))[:60]
+            cost_avg = row.get("cost_7d_avg", 0)
+
+            if action == "pause":
+                action_label = "⏸️ 暂停"
+            elif action == "bid_down":
+                pct = row.get("recommended_change_pct", 0)
+                if isinstance(pct, str):
+                    try:
+                        pct = float(pct)
+                    except ValueError:
+                        pct = 0
+                action_label = f"⬇️ 降价{abs(pct)*100:.0f}%"
+            elif action == "bid_up":
+                pct = row.get("recommended_change_pct", 0)
+                if isinstance(pct, str):
+                    try:
+                        pct = float(pct)
+                    except ValueError:
+                        pct = 0
+                action_label = f"⬆️ 提价{pct*100:.0f}%"
+            else:
+                action_label = action
+
+            lines.append(f"  [{ad_id}] {ad_name}")
+            lines.append(f"    操作: {action_label} | 日均消耗: {cost_avg:.0f}元")
+            lines.append(f"    原因: {reason}")
+            lines.append("")
+
+    # Tier 1: 已自动执行(通知)
+    if not df_tier1.empty:
+        lines.append(f"✅ 已自动执行({len(df_tier1)} 个,仅通知):")
+        for _, row in df_tier1.iterrows():
+            ad_id = row.get("ad_id", "")
+            action = row.get("final_action", row.get("action", ""))
+            lines.append(f"  [{ad_id}] {action}")
+        lines.append("")
+
+    # 回复指令
+    lines.extend([
+        "-" * 40,
+        "📝 回复指令:",
+        "  approve        — 全部批准",
+        "  approve 12345,23456  — 仅批准指定广告",
+        "  reject         — 全部拒绝",
+        "  reject 12345   — 拒绝指定广告",
+        f"  ⏰ 超时时间: {IM_APPROVAL_TIMEOUT_MINUTES} 分钟",
+        "",
+        "📎 决策详情请查看附件 Excel 表格",
+    ])
+
+    return "\n".join(lines)
+
+
+# ═══════════════════════════════════════════
+# 解析审批回复
+# ═══════════════════════════════════════════
+
+
+def _parse_approval_reply(content: str, all_ad_ids: List[int]) -> Dict:
+    content = content.strip().lower()
+
+    if content.startswith("approve"):
+        parts = content.replace("approve", "").strip()
+        if not parts:
+            return {"status": "approved", "approved_ids": all_ad_ids, "rejected_ids": []}
+        else:
+            ids = [int(x.strip()) for x in parts.split(",") if x.strip().isdigit()]
+            rejected = [aid for aid in all_ad_ids if aid not in ids]
+            return {
+                "status": "partially_approved" if rejected else "approved",
+                "approved_ids": ids,
+                "rejected_ids": rejected,
+            }
+
+    elif content.startswith("reject"):
+        parts = content.replace("reject", "").strip()
+        if not parts:
+            return {"status": "rejected", "approved_ids": [], "rejected_ids": all_ad_ids}
+        else:
+            rejected_ids = [int(x.strip()) for x in parts.split(",") if x.strip().isdigit()]
+            approved = [aid for aid in all_ad_ids if aid not in rejected_ids]
+            return {
+                "status": "partially_approved" if approved else "rejected",
+                "approved_ids": approved,
+                "rejected_ids": rejected_ids,
+            }
+
+    return {"status": "unknown", "approved_ids": [], "rejected_ids": all_ad_ids, "raw_reply": content}
+
+
+# ═══════════════════════════════════════════
+# 工具:发送审批请求(飞书版)
+# ═══════════════════════════════════════════
+
+
+@tool(description="通过飞书发送决策摘要给运营,收集审批结果(支持阻塞等待)")
+async def send_approval_request(
+    ctx: ToolContext,
+    validated_csv: str = "",
+    timeout_minutes: int = IM_APPROVAL_TIMEOUT_MINUTES,
+    wait_for_reply: bool = True,
+    poll_interval_seconds: int = IM_APPROVAL_POLL_INTERVAL_SECONDS,
+) -> ToolResult:
+    """
+    发送结构化审批请求到飞书。
+
+    流程:
+    1. 读取验证后的决策 CSV,分级(Tier 1/2/3)
+    2. 生成审批 Excel(仅 Tier 2/3 广告 + 关键指标列)
+    3. 发送飞书消息 x2:文本摘要 + Excel 文件附件
+    4. 阻塞轮询等待回复(30s 间隔 x 最长 30 分钟)
+    5. 解析 approve/reject -> 返回结果
+
+    Args:
+        validated_csv: 护栏验证后的 CSV 路径
+        timeout_minutes: 审批超时时间(分钟)
+        wait_for_reply: 是否阻塞等待回复
+        poll_interval_seconds: 轮询间隔(秒)
+    """
+    try:
+        if not IM_ENABLED:
+            return ToolResult(
+                title="IM 审批未启用",
+                output="IM_ENABLED=False,跳过飞书审批。需审批的操作已记录到执行日志。",
+            )
+
+        if not FEISHU_OPERATOR_CHAT_ID:
+            return ToolResult(
+                title="飞书配置缺失",
+                output="FEISHU_OPERATOR_CHAT_ID 未配置,无法发送审批请求。",
+            )
+
+        # 查找验证 CSV
+        if not validated_csv:
+            reports_dir = _MINI_DIR / "outputs" / "reports"
+            candidates = sorted(reports_dir.glob("validated_decisions_*.csv"), reverse=True)
+            if not candidates:
+                return ToolResult(title="send_approval_request", output="未找到验证后的决策 CSV")
+            validated_csv = str(candidates[0])
+
+        df = pd.read_csv(validated_csv)
+        if df.empty:
+            return ToolResult(title="send_approval_request", output="决策数据为空")
+
+        # 筛选有操作的决策
+        df_active = df[df["final_action"] != "hold"].copy()
+        if df_active.empty:
+            return ToolResult(title="send_approval_request", output="无需审批的操作")
+
+        # 分级
+        from execution_engine import _classify_tier
+        df_active["tier"] = df_active.apply(_classify_tier, axis=1)
+
+        df_tier1 = df_active[df_active["tier"] == 1]
+        df_tier2_3 = df_active[df_active["tier"] >= 2]
+
+        if df_tier2_3.empty:
+            return ToolResult(
+                title="无需审批",
+                output=f"所有 {len(df_tier1)} 个操作均为 Tier 1(自动执行),无需审批",
+            )
+
+        # 生成审批请求
+        request_id = f"req_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{uuid.uuid4().hex[:6]}"
+        message = _format_approval_message(df_tier2_3, df_tier1, request_id)
+
+        # 保存请求状态
+        tier2_ad_ids = df_tier2_3["ad_id"].astype(int).tolist()
+        _approval_requests[request_id] = {
+            "status": "pending",
+            "created_at": datetime.now().isoformat(),
+            "timeout_at": (datetime.now() + timedelta(minutes=timeout_minutes)).isoformat(),
+            "ad_ids": tier2_ad_ids,
+            "validated_csv": validated_csv,
+        }
+
+        # ─── 通过飞书 API 发送审批消息(文本 + Excel) ───
+        feishu_sent = False
+        sent_time_sec = str(int(time.time()))  # 飞书 API start_time 单位:秒
+        try:
+            # 消息 1:文本摘要
+            result = _feishu.send_message(to=FEISHU_OPERATOR_CHAT_ID, text=message)
+            feishu_sent = True
+            logger.info("飞书审批消息发送成功: message_id=%s", result.message_id)
+
+            # 消息 2:Excel 文件附件(决策详情)
+            try:
+                xlsx_path = _generate_approval_xlsx(df_tier2_3, request_id)
+                file_result = _feishu.send_file(
+                    to=FEISHU_OPERATOR_CHAT_ID,
+                    file=str(xlsx_path),
+                    file_name=f"审批决策表_{request_id}.xlsx",
+                )
+                logger.info("飞书审批 Excel 发送成功: message_id=%s", file_result.message_id)
+            except Exception as e:
+                logger.warning("飞书审批 Excel 发送失败(不影响审批流程): %s", e)
+        except Exception as e:
+            logger.warning("飞书发消息失败: %s", e)
+
+        # 保存审批消息到文件(备份)
+        approval_dir = _MINI_DIR / "outputs" / "approvals"
+        approval_dir.mkdir(parents=True, exist_ok=True)
+        msg_path = approval_dir / f"{request_id}.txt"
+        msg_path.write_text(message, encoding="utf-8")
+
+        # ─── 非阻塞模式 ───
+        if not wait_for_reply:
+            return ToolResult(
+                title=f"审批请求已发送({len(tier2_ad_ids)}个待审批)",
+                output=(
+                    f"审批请求 {request_id} 已{'通过飞书发送' if feishu_sent else '保存到文件(飞书发送失败)'}\n"
+                    f"  待审批: {len(tier2_ad_ids)} 个广告\n"
+                    f"  已自动执行: {len(df_tier1)} 个广告\n"
+                    f"  超时时间: {timeout_minutes} 分钟\n"
+                    f"  消息备份: {msg_path}\n\n"
+                    f"使用 check_approval_status(request_id='{request_id}') 检查审批结果"
+                ),
+                metadata={
+                    "request_id": request_id,
+                    "pending_count": len(tier2_ad_ids),
+                    "auto_count": len(df_tier1),
+                    "feishu_sent": feishu_sent,
+                    "msg_path": str(msg_path),
+                },
+            )
+
+        # ═══ 阻塞轮询等待飞书回复 ═══
+        timeout_at = datetime.now() + timedelta(minutes=timeout_minutes)
+        logger.info(
+            "阻塞等待飞书审批回复(超时 %d 分钟,间隔 %d 秒)...",
+            timeout_minutes,
+            poll_interval_seconds,
+        )
+
+        poll_count = 0
+        while datetime.now() < timeout_at:
+            await asyncio.sleep(poll_interval_seconds)
+            poll_count += 1
+
+            remaining = (timeout_at - datetime.now()).total_seconds() / 60
+            if poll_count % 10 == 0:
+                logger.info("飞书审批轮询 #%d,剩余 %.1f 分钟", poll_count, remaining)
+
+            # 读取飞书聊天中审批消息之后的回复
+            try:
+                result = _feishu.get_message_list(
+                    chat_id=FEISHU_OPERATOR_CHAT_ID,
+                    start_time=sent_time_sec,
+                    page_size=10,
+                )
+                if result and result.get("items"):
+                    for msg in result["items"]:
+                        sender_id = msg.get("sender_id", "")
+                        sender_type = msg.get("sender_type", "")
+
+                        # 只看指定运营的用户消息(非机器人)
+                        if sender_type != "user" or sender_id != FEISHU_OPERATOR_OPEN_ID:
+                            continue
+
+                        # 框架已自动解析 text 消息的 JSON -> 纯文本
+                        text = msg.get("content", "")
+                        if not text.strip():
+                            continue
+
+                        # 解析审批回复
+                        parsed = _parse_approval_reply(text, tier2_ad_ids)
+                        if parsed["status"] != "unknown":
+                            _approval_requests[request_id].update({
+                                "status": parsed["status"],
+                                "approved_ids": parsed.get("approved_ids", []),
+                                "rejected_ids": parsed.get("rejected_ids", []),
+                                "reply_content": text,
+                                "reply_at": datetime.now().isoformat(),
+                            })
+
+                            logger.info(
+                                "飞书审批回复: %s(批准 %d / 拒绝 %d)",
+                                parsed["status"],
+                                len(parsed.get("approved_ids", [])),
+                                len(parsed.get("rejected_ids", [])),
+                            )
+
+                            return ToolResult(
+                                title=f"审批结果: {parsed['status']}",
+                                output=(
+                                    f"运营飞书回复: {text}\n"
+                                    f"状态: {parsed['status']}\n"
+                                    f"批准: {len(parsed.get('approved_ids', []))} 个广告\n"
+                                    f"拒绝: {len(parsed.get('rejected_ids', []))} 个广告\n"
+                                    f"等待时间: {poll_count * poll_interval_seconds} 秒"
+                                ),
+                                metadata={
+                                    "request_id": request_id,
+                                    "feishu_sent": feishu_sent,
+                                    "msg_path": str(msg_path),
+                                    "poll_count": poll_count,
+                                    **parsed,
+                                },
+                            )
+            except Exception as e:
+                logger.debug("飞书读消息失败(将重试): %s", e)
+
+        # ─── 超时 ───
+        _approval_requests[request_id]["status"] = "timeout"
+        logger.warning("审批请求 %s 超时(%d 分钟)", request_id, timeout_minutes)
+
+        return ToolResult(
+            title="审批超时",
+            output=(
+                f"请求 {request_id} 超时({timeout_minutes}分钟),需审批的 {len(tier2_ad_ids)} 个操作未执行\n"
+                f"轮询次数: {poll_count}\n"
+                f"消息备份: {msg_path}"
+            ),
+            metadata={
+                "request_id": request_id,
+                "status": "timeout",
+                "pending_ad_ids": tier2_ad_ids,
+                "feishu_sent": feishu_sent,
+                "msg_path": str(msg_path),
+                "poll_count": poll_count,
+            },
+        )
+
+    except Exception as e:
+        logger.error("send_approval_request 失败: %s", e, exc_info=True)
+        return ToolResult(title="send_approval_request 失败", output=str(e))
+
+
+# ═══════════════════════════════════════════
+# 工具:检查审批状态
+# ═══════════════════════════════════════════
+
+
+@tool(description="检查运营飞书审批结果")
+async def check_approval_status(
+    ctx: ToolContext,
+    request_id: str,
+) -> ToolResult:
+    """
+    检查审批请求的状态。通过飞书 API 读取最新消息。
+
+    Args:
+        request_id: 审批请求 ID(send_approval_request 返回)
+    """
+    try:
+        request = _approval_requests.get(request_id)
+        if not request:
+            return ToolResult(
+                title="审批请求不存在",
+                output=f"未找到请求 {request_id}。可能已过期或重启后丢失。",
+            )
+
+        # 检查超时
+        timeout_at = datetime.fromisoformat(request["timeout_at"])
+        if datetime.now() > timeout_at:
+            request["status"] = "timeout"
+            return ToolResult(
+                title="审批已超时",
+                output=f"请求 {request_id} 已超时({IM_APPROVAL_TIMEOUT_MINUTES}分钟)",
+                metadata={"status": "timeout", "request_id": request_id},
+            )
+
+        if request["status"] != "pending":
+            return ToolResult(
+                title=f"审批状态: {request['status']}",
+                output=json.dumps(request, ensure_ascii=False, indent=2),
+                metadata=request,
+            )
+
+        # 通过框架 FeishuClient 读取审批消息之后的回复
+        try:
+            created_at_sec = str(int(
+                datetime.fromisoformat(request["created_at"]).timestamp()
+            ))
+            result = _feishu.get_message_list(
+                chat_id=FEISHU_OPERATOR_CHAT_ID,
+                start_time=created_at_sec,
+                page_size=10,
+            )
+
+            if result and result.get("items"):
+                for msg in result["items"]:
+                    sender_id = msg.get("sender_id", "")
+                    sender_type = msg.get("sender_type", "")
+
+                    if sender_type != "user" or sender_id != FEISHU_OPERATOR_OPEN_ID:
+                        continue
+
+                    # 框架已自动解析 text 消息的 JSON -> 纯文本
+                    text = msg.get("content", "")
+                    if not text.strip():
+                        continue
+
+                    parsed = _parse_approval_reply(text, request["ad_ids"])
+                    if parsed["status"] != "unknown":
+                        request.update({
+                            "status": parsed["status"],
+                            "approved_ids": parsed.get("approved_ids", []),
+                            "rejected_ids": parsed.get("rejected_ids", []),
+                            "reply_content": text,
+                            "reply_at": datetime.now().isoformat(),
+                        })
+                        return ToolResult(
+                            title=f"审批结果: {parsed['status']}",
+                            output=(
+                                f"运营飞书回复: {text}\n"
+                                f"状态: {parsed['status']}\n"
+                                f"批准: {len(parsed.get('approved_ids', []))} 个\n"
+                                f"拒绝: {len(parsed.get('rejected_ids', []))} 个"
+                            ),
+                            metadata={"request_id": request_id, **parsed},
+                        )
+        except Exception as e:
+            logger.debug("飞书读消息失败: %s", e)
+
+        remaining = (timeout_at - datetime.now()).total_seconds() / 60
+        return ToolResult(
+            title="等待审批中",
+            output=f"请求 {request_id} 等待飞书审批中(剩余 {remaining:.0f} 分钟)",
+            metadata={"status": "pending", "request_id": request_id, "remaining_minutes": round(remaining, 1)},
+        )
+
+    except Exception as e:
+        logger.error("check_approval_status 失败: %s", e, exc_info=True)
+        return ToolResult(title="check_approval_status 失败", output=str(e))