Procházet zdrojové kódy

feat(auto_put_ad_mini): 三模式升级 + 自然语言审批 + 业务逻辑修复

- 三模式路由: Mode 1 全量分析 / Mode 2 定向操作 / Mode 3 反馈修改
- 新增 query_ad_detail + modify_decisions 工具支撑 Mode 2/3
- IM 审批改为自然语言理解(移除硬编码 approve/reject 解析,由 Agent LLM 解读运营回复)
- 修复 roi_strategy.md 中 bid_increased_7d 业务逻辑错误(提价后 ROI 下降是正常现象)
- 会话 trace_id 持久化,支持多轮对话记忆
- 护栏配置调整: DRY_RUN_MODE=False, DATA_FRESHNESS_MAX_HOURS=48

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
刘立冬 před 3 týdny
rodič
revize
f35f354e94

+ 5 - 3
examples/auto_put_ad_mini/config.py

@@ -37,6 +37,8 @@ MAIN_CONFIG = RunConfig(
         "calculate_roi_metrics",
         "calculate_roi_metrics",
         "get_ads_for_review",
         "get_ads_for_review",
         "apply_decisions",
         "apply_decisions",
+        "query_ad_detail",      # Mode 2: 查询广告详情
+        "modify_decisions",     # Mode 3: 修改已有决策
         "validate_decisions",
         "validate_decisions",
         "generate_report",
         "generate_report",
         # 执行引擎 + IM 审批(已集成阻塞式审批流):
         # 执行引擎 + IM 审批(已集成阻塞式审批流):
@@ -96,12 +98,12 @@ CAUTIOUS_DAYS = 7               # 谨慎期(4-7天仅允许小幅降价 max 5%
 # 安全护栏配置
 # 安全护栏配置
 # ═══════════════════════════════════════════
 # ═══════════════════════════════════════════
 GUARDRAILS_ENABLED = True
 GUARDRAILS_ENABLED = True
-DRY_RUN_MODE = True                        # 初始安全模式,不实际执行
+DRY_RUN_MODE = False                       # 关闭干运行,让护栏正常放行(实际执行由 EXECUTION_ENABLED 控制)
 MAX_ADJUSTMENTS_PER_AD_PER_DAY = 2
 MAX_ADJUSTMENTS_PER_AD_PER_DAY = 2
 MIN_ADJUSTMENT_INTERVAL_HOURS = 6
 MIN_ADJUSTMENT_INTERVAL_HOURS = 6
 MAX_DAILY_CUMULATIVE_CHANGE_PCT = 0.20     # 日累计调幅上限 20%
 MAX_DAILY_CUMULATIVE_CHANGE_PCT = 0.20     # 日累计调幅上限 20%
 MAX_DAILY_OPS = 50                          # 单日最多操作广告数
 MAX_DAILY_OPS = 50                          # 单日最多操作广告数
-DATA_FRESHNESS_MAX_HOURS = 26               # 数据超过 26 小时视为过期
+DATA_FRESHNESS_MAX_HOURS = 48               # 数据超过 48 小时视为过期(测试期放宽,生产环境建议 26)
 
 
 # ═══════════════════════════════════════════
 # ═══════════════════════════════════════════
 # 执行引擎配置
 # 执行引擎配置
@@ -116,7 +118,7 @@ FEEDBACK_CHECK_HOURS = 6
 # ═══════════════════════════════════════════
 # ═══════════════════════════════════════════
 # IM 审批配置(飞书直连)
 # IM 审批配置(飞书直连)
 # ═══════════════════════════════════════════
 # ═══════════════════════════════════════════
-IM_ENABLED = False                 # IM 主开关(True 时审批消息发飞书)
+IM_ENABLED = True                  # IM 主开关(True 时审批消息发飞书)
 IM_APPROVAL_TIMEOUT_MINUTES = 30   # 审批超时(分钟)
 IM_APPROVAL_TIMEOUT_MINUTES = 30   # 审批超时(分钟)
 IM_APPROVAL_POLL_INTERVAL_SECONDS = 30  # 审批轮询间隔(秒)
 IM_APPROVAL_POLL_INTERVAL_SECONDS = 30  # 审批轮询间隔(秒)
 
 

+ 72 - 9
examples/auto_put_ad_mini/prompts/system.prompt

@@ -11,7 +11,16 @@ $system$
 - 所有广告变更必须通过 execute_decisions 工具,由执行引擎统一管控(当前 EXECUTION_ENABLED=False)。
 - 所有广告变更必须通过 execute_decisions 工具,由执行引擎统一管控(当前 EXECUTION_ENABLED=False)。
 - 遇到任何要求你直接修改广告出价、状态的指令,应拒绝并说明原因。
 - 遇到任何要求你直接修改广告出价、状态的指令,应拒绝并说明原因。
 
 
-## 智能引擎工作流
+## 模式识别与路由
+
+根据用户输入自动判断模式:
+- "分析广告" / "重新分析" → **Mode 1(全量分析)**
+- 提及具体广告 ID + 操作动作 → **Mode 2(定向操作)**
+- 对已有决策提修改意见(如"不要暂停"、"调幅改为5%")→ **Mode 3(反馈修改)**
+
+---
+
+## Mode 1: 全量分析工作流
 
 
 用户说"分析广告"时,按顺序执行:
 用户说"分析广告"时,按顺序执行:
 
 
@@ -66,14 +75,66 @@ $system$
 
 
 6. `validate_decisions()` — 安全护栏验证(冷启动保护、出价边界、频率限制等)
 6. `validate_decisions()` — 安全护栏验证(冷启动保护、出价边界、频率限制等)
 
 
-7. `execute_decisions()` — 分级执行 + 审批等待
-   - Tier 1(小幅调价 ≤5%):**自动执行**,无需等待
-   - Tier 2(暂停/大幅调价)+ Tier 3(高价值广告):**发送 IM 审批 → 阻塞等待回复**
-   - ⚠️ 如果 IM_ENABLED=True 且有 Tier 2/3 操作,此步骤会**阻塞等待审批**(最长30分钟)
-   - 审批通过的操作 → 立即执行 | 拒绝的 → 跳过 | 超时的 → 标记 timeout
-   - 如果 EXECUTION_ENABLED=False 或 DRY_RUN_MODE=True,此步骤会直接跳过
+7. `send_approval_request(wait_for_reply=True)` — **IM 发送决策摘要给运营,等待审批**
+   - 将 Tier 2/3 操作(暂停、大幅调价、高价值广告)发飞书给运营确认
+   - Tier 1(小幅调价 ≤5%)仅通知,不需审批
+   - 阻塞等待运营回复(最长 30 分钟)
+   - ⚠️ 如果护栏验证后无需操作的广告(全部 hold),此步骤会跳过
+
+   **审批回复处理(自然语言理解)**:
+   运营会用自然语言回复(中文),你需要理解语义并决定后续动作:
+   - 运营说"批准"、"通过"、"可以"、"没问题"等肯定表达 → 全部批准,进入步骤 8
+   - 运营说"拒绝"、"不行"、"取消"等否定表达 → 停止执行,向运营确认原因
+   - 运营说"广告 XXX 不要暂停"、"降价幅度改为5%" → 进入 Mode 3 循环(modify_decisions → validate → 重新发审批)
+   - 运营说"只批准降价的,暂停的不要" → 部分批准,过滤后执行
+   - 运营提出任何疑问 → 耐心解释决策依据,等待运营最终确认
+   - **关键**:你是 AI 助手,负责理解运营意图并给出反馈,而非机械匹配关键词
+
+8. `execute_decisions()` — **仅在运营审批通过后执行**
+   - EXECUTION_ENABLED=True 时:调用腾讯广告 API 实际执行
+   - EXECUTION_ENABLED=False 时:跳过实际执行,仅记录审批结果
+
+9. `generate_report()` — 生成最终报告(包含审批结果和执行摘要)
+
+---
+
+## Mode 2: 定向操作工作流
+
+用户提及具体广告 ID + 操作意图时(如"广告 90289631207 降价10%"),按以下流程:
+
+1. `query_ad_detail(ad_id=用户指定的广告ID)` — 查询当前数据和全局上下文
+
+2. **AI 推理决策**:
+   根据查询到的广告数据 + 全局上下文 + 用户意图,生成单条(或少量)决策 JSON。
+   即使用户明确指定了操作,也必须根据决策原则验证合理性,并在 reason 中说明。
 
 
-8. `generate_report()` — 生成最终报告(包含执行结果摘要)
+3. 保存决策:
+   - 如果当天已有 `llm_decisions_{date}.csv` → 用 `modify_decisions` 做 upsert(避免覆盖其他决策)
+   - 如果没有已有决策文件 → 用 `apply_decisions` 创建
+
+4. `validate_decisions()` — 安全护栏验证
+
+5. `send_approval_request(wait_for_reply=True)` — IM 发给运营确认
+
+6. 运营批准后 → `execute_decisions()` — 执行
+
+---
+
+## Mode 3: 反馈修改工作流
+
+用户对已有决策提出修改意见时(如"广告 XXX 不要暂停,改为降价5%"),按以下流程:
+
+1. `modify_decisions(modifications=JSON)` — 修改指定条目
+   - 按 ad_id 精确修改:`[{"ad_id": "XXX", "new_action": "bid_down", "new_change_pct": -0.05}]`
+   - 按过滤器批量修改:`[{"filter": "all_pause", "new_action": "hold"}]`
+
+2. `validate_decisions()` — 重新验证修改后的决策
+
+3. `send_approval_request(wait_for_reply=True)` — 重新发 IM 给运营确认
+
+4. 运营批准后 → `execute_decisions()` — 执行
+
+---
 
 
 ## 决策原则(参考 roi-strategy skill)
 ## 决策原则(参考 roi-strategy skill)
 
 
@@ -116,7 +177,7 @@ $system$
 - "智能引擎太保守" → 重新推理,只输出 high 置信度的 pause
 - "智能引擎太保守" → 重新推理,只输出 high 置信度的 pause
 - "智能引擎太激进" → 重新推理,允许 medium 置信度的 hold
 - "智能引擎太激进" → 重新推理,允许 medium 置信度的 hold
 - "广告 X 为什么被关停" → 从决策结果中查找理由,引用具体数值
 - "广告 X 为什么被关停" → 从决策结果中查找理由,引用具体数值
-- "重新分析" → 重新执行完整流程
+- "重新分析" → 重新执行 Mode 1 完整流程
 
 
 ## 注意事项
 ## 注意事项
 
 
@@ -126,3 +187,5 @@ $system$
 - bid_candidate 字段提示了规则引擎建议的出价方向,供你参考但不约束
 - bid_candidate 字段提示了规则引擎建议的出价方向,供你参考但不约束
 - 最终报告会展示所有 1400+ 个广告的决策结果(A+B+C)
 - 最终报告会展示所有 1400+ 个广告的决策结果(A+B+C)
 - 护栏验证会自动拦截不安全的操作(冷启动期、频率限制等)
 - 护栏验证会自动拦截不安全的操作(冷启动期、频率限制等)
+- Mode 2 中,即使用户明确指定操作,也必须经过护栏验证
+- Mode 3 修改后需要重新走 validate → execute 流程

+ 25 - 4
examples/auto_put_ad_mini/run.py

@@ -34,7 +34,10 @@ from examples.auto_put_ad_mini.config import (
 # 导入自定义工具(触发 @tool 注册)
 # 导入自定义工具(触发 @tool 注册)
 from examples.auto_put_ad_mini.tools.data_query import fetch_creative_data, merge_creative_data
 from examples.auto_put_ad_mini.tools.data_query import fetch_creative_data, merge_creative_data
 from examples.auto_put_ad_mini.tools.roi_calculator import calculate_roi_metrics
 from examples.auto_put_ad_mini.tools.roi_calculator import calculate_roi_metrics
-from examples.auto_put_ad_mini.tools.ad_decision import analyze_ads, get_ads_for_review, apply_decisions
+from examples.auto_put_ad_mini.tools.ad_decision import (
+    analyze_ads, get_ads_for_review, apply_decisions,
+    query_ad_detail, modify_decisions,
+)
 from examples.auto_put_ad_mini.tools.report_generator import generate_report, compare_decisions
 from examples.auto_put_ad_mini.tools.report_generator import generate_report, compare_decisions
 from examples.auto_put_ad_mini.tools.guardrails import validate_decisions
 from examples.auto_put_ad_mini.tools.guardrails import validate_decisions
 from examples.auto_put_ad_mini.tools.execution_engine import execute_decisions, check_execution_feedback
 from examples.auto_put_ad_mini.tools.execution_engine import execute_decisions, check_execution_feedback
@@ -109,11 +112,14 @@ async def main():
     print("  广告智能调控助手已启动")
     print("  广告智能调控助手已启动")
     print("=" * 50)
     print("=" * 50)
     print("请输入指令(输入 'exit' 退出):")
     print("请输入指令(输入 'exit' 退出):")
-    print("示例:")
-    print("  - 分析广告")
+    print("指令示例:")
+    print("  - 分析广告              → 全量分析")
+    print("  - 广告 XXXXX 降价10%    → 定向操作")
+    print("  - 广告 XXXXX 不要暂停   → 修改决策")
     print()
     print()
 
 
     step_count = 0
     step_count = 0
+    session_trace_id = None  # 会话级 trace_id,保持多轮对话记忆
 
 
     while True:
     while True:
         try:
         try:
@@ -125,7 +131,7 @@ async def main():
                 break
                 break
 
 
             messages = [{"role": "user", "content": user_input}]
             messages = [{"role": "user", "content": user_input}]
-            config.trace_id = None
+            config.trace_id = session_trace_id
 
 
             print(f"\n🚀 执行: {user_input}")
             print(f"\n🚀 执行: {user_input}")
             print("=" * 70)
             print("=" * 70)
@@ -137,10 +143,13 @@ async def main():
 
 
             async for item in runner.run(messages=messages, config=config):
             async for item in runner.run(messages=messages, config=config):
                 if isinstance(item, Trace):
                 if isinstance(item, Trace):
+                    if session_trace_id is None:
+                        session_trace_id = item.trace_id
                     if item.status == "completed":
                     if item.status == "completed":
                         print(f"\n✅ [Trace] 完成")
                         print(f"\n✅ [Trace] 完成")
                     elif item.status == "failed":
                     elif item.status == "failed":
                         print(f"\n❌ [Trace] 失败")
                         print(f"\n❌ [Trace] 失败")
+                        session_trace_id = None  # 失败后重置,下次开新会话
 
 
                 elif isinstance(item, Message):
                 elif isinstance(item, Message):
                     if item.role == "assistant" and item.content:
                     if item.role == "assistant" and item.content:
@@ -174,12 +183,24 @@ async def main():
                                 print(f"📌 步骤 {step_count}: 广告分类(A/B/C)")
                                 print(f"📌 步骤 {step_count}: 广告分类(A/B/C)")
                                 print(f"{'='*70}")
                                 print(f"{'='*70}")
 
 
+                            elif tool_name == "query_ad_detail":
+                                step_count += 1
+                                print(f"\n{'='*70}")
+                                print(f"📌 步骤 {step_count}: 查询广告详情")
+                                print(f"{'='*70}")
+
                             elif tool_name == "apply_decisions":
                             elif tool_name == "apply_decisions":
                                 step_count += 1
                                 step_count += 1
                                 print(f"\n{'='*70}")
                                 print(f"\n{'='*70}")
                                 print(f"📌 步骤 {step_count}: 保存智能引擎决策")
                                 print(f"📌 步骤 {step_count}: 保存智能引擎决策")
                                 print(f"{'='*70}")
                                 print(f"{'='*70}")
 
 
+                            elif tool_name == "modify_decisions":
+                                step_count += 1
+                                print(f"\n{'='*70}")
+                                print(f"📌 步骤 {step_count}: 修改已有决策")
+                                print(f"{'='*70}")
+
                             elif tool_name == "validate_decisions":
                             elif tool_name == "validate_decisions":
                                 step_count += 1
                                 step_count += 1
                                 print(f"\n{'='*70}")
                                 print(f"\n{'='*70}")

+ 11 - 2
examples/auto_put_ad_mini/skills/roi_strategy.md

@@ -16,10 +16,19 @@ category: ad_optimization
 - **动态ROI_7日均值**:相对指标,和全体均值比(工具会提供 distribution 分布)
 - **动态ROI_7日均值**:相对指标,和全体均值比(工具会提供 distribution 分布)
 - **cost_7d_avg**:消耗越高,数据越可信,决策越有把握
 - **cost_7d_avg**:消耗越高,数据越可信,决策越有把握
 - **ad_age_days**:< 4天绝对保护,4-7天谨慎操作,> 7天正常决策
 - **ad_age_days**:< 4天绝对保护,4-7天谨慎操作,> 7天正常决策
-- **bid_increased_7d / creative_changed_7d**:已干预但没好转 → 衰退信号
+- **bid_increased_7d / creative_changed_7d**:干预信号。提价后 ROI 短期下降是正常现象(消耗↑ → ROI↓),不应作为负面信号。只有 **已干预 + 消耗仍低**(广告不跑量)才是衰退信号
 - **bid_amount**:当前出价(元),出价调整的基准
 - **bid_amount**:当前出价(元),出价调整的基准
 - **bid_candidate**:规则引擎标注的出价调整方向(bid_up / bid_down / null),供参考
 - **bid_candidate**:规则引擎标注的出价调整方向(bid_up / bid_down / null),供参考
 
 
+### 干预信号 vs 衰退信号(重要区别)
+
+- **提价本身不是坏事**:提价 → 竞价力↑ → 消耗↑ → ROI = 收入/消耗 → 分母变大 → ROI 短期下降是正常的
+- **衰退信号的完整条件**(三个缺一不可):
+  1. 历史曾稳定消耗 ≥ 7 天
+  2. 近 7 天日均消耗 < 100 元
+  3. 已尝试提价或换创意(干预无效)
+- **提价后的正确评估**:看绝对收入是否增长,而非 ROI 是否下降
+
 ### 四动作判断逻辑
 ### 四动作判断逻辑
 
 
 | 场景 | ROI 范围 | 消耗水平 | 动作 | 调幅 | 置信度 |
 | 场景 | ROI 范围 | 消耗水平 | 动作 | 调幅 | 置信度 |
@@ -32,7 +41,7 @@ category: ad_optimization
 | 极低ROI | < 均值×0.5 | ≥ 100/天,≥ 7天 | pause | — | high |
 | 极低ROI | < 均值×0.5 | ≥ 100/天,≥ 7天 | pause | — | high |
 | 极低ROI | < 均值×0.5 | 200-500/天,7-14天 | pause | — | medium |
 | 极低ROI | < 均值×0.5 | 200-500/天,7-14天 | pause | — | medium |
 | 极低ROI | < 均值×0.5 | < 200/天 | hold | — | low |
 | 极低ROI | < 均值×0.5 | < 200/天 | hold | — | low |
-| 已提价+换创意,仍低迷 | any | 曾稳定现低 | pause(衰退) | — | high |
+| 已干预+消耗仍低 | any | 曾稳定消耗≥7天,现消耗<100元/天 | pause(衰退) | — | high |
 | 冷启动期 | any | any | hold | — | high |
 | 冷启动期 | any | any | hold | — | high |
 
 
 ### 出价调整决策矩阵
 ### 出价调整决策矩阵

+ 56 - 9
examples/auto_put_ad_mini/tools/ad_api.py

@@ -36,18 +36,63 @@ BASE_URL = os.getenv("TENCENT_AD_BASE_URL", "https://api.e.qq.com/v3.0")
 DEFAULT_ACCOUNT_ID = int(os.getenv("TENCENT_AD_ACCOUNT_ID", "0") or 0)
 DEFAULT_ACCOUNT_ID = int(os.getenv("TENCENT_AD_ACCOUNT_ID", "0") or 0)
 TIMEOUT = 30  # 秒
 TIMEOUT = 30  # 秒
 
 
+# Token 获取 API(内部服务,根据 accountId 返回最新 access_token)
+TOKEN_API_URL = os.getenv(
+    "TENCENT_AD_TOKEN_API",
+    "https://api.piaoquantv.com/ad/put/tencent/getAccessToken",
+)
 
 
-def _get_access_token() -> str:
-    token = os.getenv("TENCENT_AD_ACCESS_TOKEN", "")
-    if not token:
-        raise ValueError("未配置 TENCENT_AD_ACCESS_TOKEN 环境变量")
-    return token
+# Token 缓存:避免每次 API 调用都重新获取
+_token_cache: Dict[int, Dict[str, Any]] = {}
+_TOKEN_CACHE_TTL = 1800  # 缓存有效期 30 分钟
 
 
 
 
-def _common_params() -> Dict[str, str]:
+def _get_access_token(account_id: int = 0) -> str:
+    """动态获取 access_token。
+
+    优先通过内部 token API 获取(自动刷新),缓存 30 分钟。
+    若 token API 不可用,降级使用环境变量中的静态 token。
+    """
+    acct = account_id or DEFAULT_ACCOUNT_ID
+    if not acct:
+        raise ValueError("未配置 TENCENT_AD_ACCOUNT_ID 环境变量")
+
+    # 检查缓存是否有效
+    cached = _token_cache.get(acct)
+    if cached and time.time() - cached["ts"] < _TOKEN_CACHE_TTL:
+        return cached["token"]
+
+    # 尝试从 token API 动态获取
+    try:
+        resp = httpx.get(
+            TOKEN_API_URL,
+            params={"accountId": acct},
+            timeout=10,
+        )
+        resp.raise_for_status()
+        token = resp.text.strip()
+        if token and len(token) > 10:
+            _token_cache[acct] = {"token": token, "ts": time.time()}
+            logger.info("[TokenAPI] 已获取 account=%s 的 access_token (缓存30分钟)", acct)
+            return token
+        else:
+            logger.warning("[TokenAPI] 返回内容异常: %s,降级使用环境变量", token[:50])
+    except Exception as e:
+        logger.warning("[TokenAPI] 请求失败: %s,降级使用环境变量", e)
+
+    # 降级:使用环境变量中的静态 token
+    static_token = os.getenv("TENCENT_AD_ACCESS_TOKEN", "")
+    if not static_token:
+        raise ValueError(
+            f"Token API 请求失败且未配置 TENCENT_AD_ACCESS_TOKEN 环境变量 (account={acct})"
+        )
+    return static_token
+
+
+def _common_params(account_id: int = 0) -> Dict[str, str]:
     """公共查询参数:access_token / timestamp / nonce"""
     """公共查询参数:access_token / timestamp / nonce"""
     return {
     return {
-        "access_token": _get_access_token(),
+        "access_token": _get_access_token(account_id),
         "timestamp": str(int(time.time())),
         "timestamp": str(int(time.time())),
         "nonce": uuid.uuid4().hex,
         "nonce": uuid.uuid4().hex,
     }
     }
@@ -58,7 +103,8 @@ def _get(path: str, params: Dict[str, Any]) -> Dict[str, Any]:
     发送 GET 请求。
     发送 GET 请求。
     复杂对象(list/dict)自动 JSON 序列化后作为 query string 参数传递。
     复杂对象(list/dict)自动 JSON 序列化后作为 query string 参数传递。
     """
     """
-    query = dict(_common_params())
+    account_id = params.get("account_id", 0)
+    query = dict(_common_params(account_id))
     for k, v in params.items():
     for k, v in params.items():
         if v is None:
         if v is None:
             continue
             continue
@@ -80,7 +126,8 @@ def _post(path: str, body: Dict[str, Any]) -> Dict[str, Any]:
     发送 POST 请求。
     发送 POST 请求。
     公共参数在 URL query,业务参数在 JSON body。
     公共参数在 URL query,业务参数在 JSON body。
     """
     """
-    query = urlencode(_common_params())
+    account_id = body.get("account_id", 0)
+    query = urlencode(_common_params(account_id))
     url = f"{BASE_URL}{path}?{query}"
     url = f"{BASE_URL}{path}?{query}"
     logger.debug("[TencentAPI] POST %s body=%s", url, json.dumps(body, ensure_ascii=False)[:200])
     logger.debug("[TencentAPI] POST %s body=%s", url, json.dumps(body, ensure_ascii=False)[:200])
 
 

+ 357 - 0
examples/auto_put_ad_mini/tools/ad_decision.py

@@ -1101,3 +1101,360 @@ async def apply_decisions(
     except Exception as e:
     except Exception as e:
         logger.error("apply_decisions 失败: %s", e, exc_info=True)
         logger.error("apply_decisions 失败: %s", e, exc_info=True)
         return ToolResult(title="apply_decisions 失败", output=str(e))
         return ToolResult(title="apply_decisions 失败", output=str(e))
+
+
+# ═══════════════════════════════════════════
+# 智能引擎工具 3:查询单个广告详情(Mode 2 支撑)
+# ═══════════════════════════════════════════
+
+
+@tool(description="查询单个广告的当前指标和历史数据")
+async def query_ad_detail(
+    ctx: ToolContext,
+    ad_id: str,
+    metrics_csv: str = "",
+) -> ToolResult:
+    """
+    查询单个广告的当前指标 + 全局分布上下文(Mode 2 定向操作用)。
+
+    Args:
+        ctx: 工具上下文
+        ad_id: 广告 ID(字符串或数字均可)
+        metrics_csv: ROI 指标 CSV 路径(默认 outputs/metrics_temp.csv)
+
+    Returns:
+        ToolResult,包含该广告的详细指标和全局上下文
+    """
+    import json
+    import os
+
+    try:
+        if not metrics_csv:
+            metrics_csv = str(_MINI_DIR / "outputs" / "metrics_temp.csv")
+
+        metrics_path = Path(metrics_csv)
+        if not metrics_path.exists():
+            return ToolResult(
+                title="query_ad_detail 失败",
+                output=f"指标文件不存在: {metrics_csv},请先执行 calculate_roi_metrics",
+            )
+
+        # 检查数据新鲜度
+        file_mtime = os.path.getmtime(metrics_path)
+        age_hours = (datetime.now().timestamp() - file_mtime) / 3600
+        freshness_warning = ""
+        if age_hours > 24:
+            freshness_warning = f"⚠️ 数据已过期({age_hours:.1f}小时前更新),建议先执行 fetch_creative_data + calculate_roi_metrics 刷新数据。\n\n"
+
+        df = pd.read_csv(metrics_csv)
+
+        # 查找目标广告
+        ad_id_int = int(ad_id)
+        ad_row = df[df["ad_id"] == ad_id_int]
+
+        if ad_row.empty:
+            return ToolResult(
+                title="query_ad_detail",
+                output=f"{freshness_warning}未找到广告 {ad_id},共有 {len(df)} 个广告",
+            )
+
+        row = ad_row.iloc[0]
+
+        # 计算广告年龄
+        ad_age_days = _calculate_ad_age_days(row.get("create_time"))
+
+        # 全局 ROI 分布
+        roi_series = df["动态ROI_7日均值"].dropna()
+        roi_mean = float(roi_series.mean()) if len(roi_series) > 0 else 0.0
+        cost_series = df["cost_7d_avg"].dropna()
+        cost_median = float(cost_series.median()) if len(cost_series) > 0 else 0.0
+
+        roi_low_line = roi_mean * ROI_LOW_FACTOR if "ROI_LOW_FACTOR" in dir() else roi_mean * 0.5
+        bid_down_line = roi_mean * BID_DOWN_ROI_FACTOR
+        bid_up_line = roi_mean * BID_UP_ROI_FACTOR
+
+        # 构建广告详情
+        f_roi = row.get("动态ROI_7日均值")
+        ad_detail = {
+            "ad_id": ad_id_int,
+            "ad_name": str(row.get("ad_name", "")),
+            "bid_amount": round(float(row.get("bid_amount", 0) or 0), 2),
+            "动态ROI_7日均值": round(float(f_roi), 4) if not pd.isna(f_roi) else None,
+            "cost_7d_avg": round(float(row.get("cost_7d_avg", 0) or 0), 2),
+            "cost_7d_total": round(float(row.get("cost_7d_total", 0) or 0), 2),
+            "ad_age_days": ad_age_days,
+            "configured_status": str(row.get("configured_status", "")),
+        }
+
+        # 检测干预信号
+        try:
+            end_date = (datetime.now() - timedelta(days=1)).strftime("%Y%m%d")
+            raw_dir = _MINI_DIR / "outputs" / "raw"
+            ad_status_dir = _MINI_DIR / "outputs" / "ad_status"
+            decay_signals = _detect_decay_signals(
+                ad_ids=[ad_id_int],
+                raw_dir=raw_dir,
+                ad_status_dir=ad_status_dir,
+                end_date=end_date,
+            )
+            if not decay_signals.empty:
+                ds_row = decay_signals.iloc[0]
+                ad_detail["bid_increased_7d"] = bool(ds_row.get("bid_increased_7d", False))
+                ad_detail["creative_changed_7d"] = bool(ds_row.get("creative_changed_7d", False))
+        except Exception as e:
+            logger.warning("检测干预信号失败: %s", e)
+
+        # 全局上下文
+        global_context = {
+            "全体动态ROI均值": round(roi_mean, 4),
+            "ROI关停线": round(roi_mean * 0.5, 4),
+            "ROI降价线": round(bid_down_line, 4),
+            "ROI提价线": round(bid_up_line, 4),
+            "全体消耗中位数": round(cost_median, 2),
+        }
+
+        result = {
+            "ad_detail": ad_detail,
+            "global_context": global_context,
+        }
+
+        output = freshness_warning + json.dumps(result, ensure_ascii=False, indent=2)
+
+        return ToolResult(
+            title=f"广告 {ad_id} 详情",
+            output=output,
+            metadata=result,
+        )
+
+    except Exception as e:
+        logger.error("query_ad_detail 失败: %s", e, exc_info=True)
+        return ToolResult(title="query_ad_detail 失败", output=str(e))
+
+
+# ═══════════════════════════════════════════
+# 智能引擎工具 4:修改已有决策(Mode 3 支撑)
+# ═══════════════════════════════════════════
+
+
+@tool(description="修改已有决策:修改指定广告的操作或调幅,也可新增决策")
+async def modify_decisions(
+    ctx: ToolContext,
+    modifications: str,
+    decisions_csv: str = "",
+    end_date: str = "yesterday",
+) -> ToolResult:
+    """
+    修改已有 llm_decisions_{date}.csv 中的决策(Mode 3 反馈修改用)。
+
+    支持两种修改方式:
+    1. 按 ad_id 精确修改/新增(upsert):
+       [{"ad_id": "90289631207", "new_action": "bid_down", "new_change_pct": -0.05}]
+    2. 按过滤器批量修改:
+       [{"filter": "all_bid_down", "new_change_pct": -0.03}]
+       支持: all_pause / all_bid_down / all_bid_up / all_llm
+
+    Args:
+        ctx: 工具上下文
+        modifications: JSON 字符串,修改列表
+        decisions_csv: 决策 CSV 路径(默认自动查找最新)
+        end_date: 结束日期(用于查找默认 CSV)
+
+    Returns:
+        ToolResult,包含修改日志和新的 action 分布
+    """
+    import json
+    import glob as glob_mod
+
+    try:
+        if end_date == "yesterday":
+            end_date = (datetime.now() - timedelta(days=1)).strftime("%Y%m%d")
+
+        # 解析修改列表
+        try:
+            mod_list = json.loads(modifications)
+        except json.JSONDecodeError as e:
+            return ToolResult(title="modify_decisions 失败", output=f"modifications 不是合法 JSON: {e}")
+
+        if not isinstance(mod_list, list):
+            return ToolResult(title="modify_decisions 失败", output="modifications 必须是 JSON 数组")
+
+        # 定位决策 CSV
+        if not decisions_csv:
+            reports_dir = _MINI_DIR / "outputs" / "reports"
+            # 先找当天的,再找最新的
+            target_path = reports_dir / f"llm_decisions_{end_date}.csv"
+            if target_path.exists():
+                decisions_csv = str(target_path)
+            else:
+                # 查找最新的 llm_decisions_*.csv
+                pattern = str(reports_dir / "llm_decisions_*.csv")
+                files = sorted(glob_mod.glob(pattern), reverse=True)
+                if files:
+                    decisions_csv = files[0]
+                else:
+                    return ToolResult(
+                        title="modify_decisions 失败",
+                        output="未找到任何已有决策文件(llm_decisions_*.csv),请先执行全量分析",
+                    )
+
+        decisions_path = Path(decisions_csv)
+        if not decisions_path.exists():
+            return ToolResult(title="modify_decisions 失败", output=f"决策文件不存在: {decisions_csv}")
+
+        df = pd.read_csv(decisions_csv)
+        if df.empty:
+            return ToolResult(title="modify_decisions 失败", output="决策文件为空")
+
+        # 加载 metrics 获取 bid_amount
+        metrics_csv_path = str(_MINI_DIR / "outputs" / "metrics_temp.csv")
+        bid_map = {}
+        try:
+            df_metrics = pd.read_csv(metrics_csv_path)
+            bid_map = dict(zip(df_metrics["ad_id"].astype(int), df_metrics["bid_amount"].fillna(0)))
+        except Exception as e:
+            logger.warning("加载 metrics 获取 bid_amount 失败: %s", e)
+
+        change_log = []
+        new_rows = []
+
+        for mod in mod_list:
+            if "filter" in mod:
+                # 批量修改
+                filter_type = mod["filter"]
+                filter_map = {
+                    "all_pause": "pause",
+                    "all_bid_down": "bid_down",
+                    "all_bid_up": "bid_up",
+                    "all_llm": None,  # 所有 LLM 决策
+                }
+                if filter_type not in filter_map:
+                    change_log.append(f"⚠️ 未知 filter: {filter_type},跳过")
+                    continue
+
+                target_action = filter_map[filter_type]
+                if target_action:
+                    mask = df["action"] == target_action
+                else:
+                    mask = df["source"] == "llm"
+
+                matched = mask.sum()
+                if matched == 0:
+                    change_log.append(f"filter={filter_type}: 无匹配行")
+                    continue
+
+                # 应用修改
+                if "new_action" in mod:
+                    df.loc[mask, "action"] = mod["new_action"]
+                if "new_change_pct" in mod:
+                    df.loc[mask, "recommended_change_pct"] = mod["new_change_pct"]
+                    # 重算 recommended_bid
+                    for idx in df[mask].index:
+                        ad_id_val = int(df.at[idx, "ad_id"])
+                        bid = bid_map.get(ad_id_val, 0)
+                        if bid > 0:
+                            new_bid = round(bid * (1 + mod["new_change_pct"]), 2)
+                            new_bid = max(new_bid, BID_FLOOR_YUAN)
+                            new_bid = min(new_bid, BID_CEILING_YUAN)
+                            df.at[idx, "recommended_bid"] = new_bid
+                            df.at[idx, "current_bid"] = round(bid, 2)
+                if "new_dimension" in mod:
+                    df.loc[mask, "dimension"] = mod["new_dimension"]
+                if "new_reason" in mod:
+                    df.loc[mask, "reason"] = mod["new_reason"]
+
+                df.loc[mask, "source"] = "llm_modified"
+                change_log.append(f"filter={filter_type}: 修改 {matched} 行")
+
+            elif "ad_id" in mod:
+                # 精确修改/新增(upsert)
+                target_id = int(mod["ad_id"])
+                mask = df["ad_id"] == target_id
+
+                if mask.any():
+                    # 修改已有行
+                    if "new_action" in mod:
+                        old_action = df.loc[mask, "action"].iloc[0]
+                        df.loc[mask, "action"] = mod["new_action"]
+                        change_log.append(f"ad_id={target_id}: action {old_action} → {mod['new_action']}")
+                    if "new_change_pct" in mod:
+                        df.loc[mask, "recommended_change_pct"] = mod["new_change_pct"]
+                        bid = bid_map.get(target_id, 0)
+                        if bid > 0:
+                            new_bid = round(bid * (1 + mod["new_change_pct"]), 2)
+                            new_bid = max(new_bid, BID_FLOOR_YUAN)
+                            new_bid = min(new_bid, BID_CEILING_YUAN)
+                            df.loc[mask, "recommended_bid"] = new_bid
+                            df.loc[mask, "current_bid"] = round(bid, 2)
+                        change_log.append(f"ad_id={target_id}: change_pct → {mod['new_change_pct']}")
+                    if "new_dimension" in mod:
+                        df.loc[mask, "dimension"] = mod["new_dimension"]
+                    if "new_reason" in mod:
+                        df.loc[mask, "reason"] = mod["new_reason"]
+                    df.loc[mask, "source"] = "llm_modified"
+                else:
+                    # 新增行
+                    new_action = mod.get("new_action", "hold")
+                    change_pct = mod.get("new_change_pct")
+                    bid = bid_map.get(target_id, 0)
+                    new_bid = None
+                    if change_pct is not None and bid > 0:
+                        new_bid = round(bid * (1 + change_pct), 2)
+                        new_bid = max(new_bid, BID_FLOOR_YUAN)
+                        new_bid = min(new_bid, BID_CEILING_YUAN)
+
+                    new_row = {
+                        "ad_id": target_id,
+                        "action": new_action,
+                        "dimension": mod.get("new_dimension", "用户指定"),
+                        "reason": mod.get("new_reason", "用户定向操作"),
+                        "confidence": mod.get("confidence", "high"),
+                        "source": "llm_modified",
+                        "recommended_change_pct": change_pct,
+                        "current_bid": round(bid, 2) if bid > 0 else None,
+                        "recommended_bid": new_bid,
+                    }
+                    new_rows.append(new_row)
+                    change_log.append(f"ad_id={target_id}: 新增 action={new_action}")
+            else:
+                change_log.append(f"⚠️ 修改项缺少 ad_id 或 filter,跳过: {mod}")
+
+        # 合并新增行
+        if new_rows:
+            df = pd.concat([df, pd.DataFrame(new_rows)], ignore_index=True)
+
+        # 保存(覆盖原文件)
+        df.to_csv(decisions_csv, index=False, encoding="utf-8-sig")
+
+        # 统计新的 action 分布
+        action_dist = df["action"].value_counts().to_dict()
+
+        output_parts = [
+            f"决策已修改并保存: {decisions_csv}",
+            "",
+            "修改日志:",
+        ]
+        for log in change_log:
+            output_parts.append(f"  {log}")
+
+        output_parts.extend([
+            "",
+            "当前 action 分布:",
+        ])
+        for action, count in action_dist.items():
+            output_parts.append(f"  {action}: {count} 个")
+        output_parts.append(f"  总计: {len(df)} 个")
+
+        return ToolResult(
+            title=f"决策修改完成({len(change_log)}项变更)",
+            output="\n".join(output_parts),
+            metadata={
+                "csv_path": str(decisions_csv),
+                "changes": len(change_log),
+                "action_distribution": action_dist,
+                "total": len(df),
+            },
+        )
+
+    except Exception as e:
+        logger.error("modify_decisions 失败: %s", e, exc_info=True)
+        return ToolResult(title="modify_decisions 失败", output=str(e))

+ 66 - 61
examples/auto_put_ad_mini/tools/im_approval.py

@@ -152,11 +152,12 @@ def _format_approval_message(df_tier2: pd.DataFrame, df_tier1: pd.DataFrame, req
     # 回复指令
     # 回复指令
     lines.extend([
     lines.extend([
         "-" * 40,
         "-" * 40,
-        "📝 回复指令:",
-        "  approve        — 全部批准",
-        "  approve 12345,23456  — 仅批准指定广告",
-        "  reject         — 全部拒绝",
-        "  reject 12345   — 拒绝指定广告",
+        "📝 直接回复即可,示例:",
+        "  \"批准\" / \"通过\"          — 全部批准",
+        "  \"拒绝\" / \"不行\"          — 全部拒绝",
+        "  \"广告 12345 不要暂停\"     — 修改指定广告",
+        "  \"只批准降价的\"            — 部分批准",
+        "  \"降幅改小一点\"            — 调整后重新审批",
         f"  ⏰ 超时时间: {IM_APPROVAL_TIMEOUT_MINUTES} 分钟",
         f"  ⏰ 超时时间: {IM_APPROVAL_TIMEOUT_MINUTES} 分钟",
         "",
         "",
         "📎 决策详情请查看附件 Excel 表格",
         "📎 决策详情请查看附件 Excel 表格",
@@ -171,35 +172,20 @@ def _format_approval_message(df_tier2: pd.DataFrame, df_tier1: pd.DataFrame, req
 
 
 
 
 def _parse_approval_reply(content: str, all_ad_ids: List[int]) -> Dict:
 def _parse_approval_reply(content: str, all_ad_ids: List[int]) -> Dict:
-    content = content.strip().lower()
-
-    if content.startswith("approve"):
-        parts = content.replace("approve", "").strip()
-        if not parts:
-            return {"status": "approved", "approved_ids": all_ad_ids, "rejected_ids": []}
-        else:
-            ids = [int(x.strip()) for x in parts.split(",") if x.strip().isdigit()]
-            rejected = [aid for aid in all_ad_ids if aid not in ids]
-            return {
-                "status": "partially_approved" if rejected else "approved",
-                "approved_ids": ids,
-                "rejected_ids": rejected,
-            }
-
-    elif content.startswith("reject"):
-        parts = content.replace("reject", "").strip()
-        if not parts:
-            return {"status": "rejected", "approved_ids": [], "rejected_ids": all_ad_ids}
-        else:
-            rejected_ids = [int(x.strip()) for x in parts.split(",") if x.strip().isdigit()]
-            approved = [aid for aid in all_ad_ids if aid not in rejected_ids]
-            return {
-                "status": "partially_approved" if approved else "rejected",
-                "approved_ids": approved,
-                "rejected_ids": rejected_ids,
-            }
-
-    return {"status": "unknown", "approved_ids": [], "rejected_ids": all_ad_ids, "raw_reply": content}
+    """将运营的自然语言回复原样返回给 Agent 解读。
+
+    不做硬解析 — 运营可能用任何自然语言表达审批意见(中文、英文、混合),
+    由 Agent(LLM)负责理解语义并决定后续动作。
+    """
+    content = content.strip()
+    if not content:
+        return {"status": "unknown", "raw_reply": ""}
+
+    return {
+        "status": "replied",
+        "raw_reply": content,
+        "ad_ids": all_ad_ids,
+    }
 
 
 
 
 # ═══════════════════════════════════════════
 # ═══════════════════════════════════════════
@@ -367,6 +353,11 @@ async def send_approval_request(
                         sender_id = msg.get("sender_id", "")
                         sender_id = msg.get("sender_id", "")
                         sender_type = msg.get("sender_type", "")
                         sender_type = msg.get("sender_type", "")
 
 
+                        logger.debug(
+                            "飞书消息: sender_type=%s, sender_id=%s, content=%s",
+                            sender_type, sender_id, str(msg.get("content", ""))[:100],
+                        )
+
                         # 只看指定运营的用户消息(非机器人)
                         # 只看指定运营的用户消息(非机器人)
                         if sender_type != "user" or sender_id != FEISHU_OPERATOR_OPEN_ID:
                         if sender_type != "user" or sender_id != FEISHU_OPERATOR_OPEN_ID:
                             continue
                             continue
@@ -376,39 +367,41 @@ async def send_approval_request(
                         if not text.strip():
                         if not text.strip():
                             continue
                             continue
 
 
-                        # 解析审批回复
+                        # 检测到运营回复,返回原文给 Agent 理解
                         parsed = _parse_approval_reply(text, tier2_ad_ids)
                         parsed = _parse_approval_reply(text, tier2_ad_ids)
                         if parsed["status"] != "unknown":
                         if parsed["status"] != "unknown":
                             _approval_requests[request_id].update({
                             _approval_requests[request_id].update({
-                                "status": parsed["status"],
-                                "approved_ids": parsed.get("approved_ids", []),
-                                "rejected_ids": parsed.get("rejected_ids", []),
+                                "status": "replied",
                                 "reply_content": text,
                                 "reply_content": text,
                                 "reply_at": datetime.now().isoformat(),
                                 "reply_at": datetime.now().isoformat(),
+                                "ad_ids": tier2_ad_ids,
                             })
                             })
 
 
-                            logger.info(
-                                "飞书审批回复: %s(批准 %d / 拒绝 %d)",
-                                parsed["status"],
-                                len(parsed.get("approved_ids", [])),
-                                len(parsed.get("rejected_ids", [])),
-                            )
+                            logger.info("飞书审批收到运营回复: %s", text[:200])
+
+                            ad_ids_str = ", ".join(str(x) for x in tier2_ad_ids[:10])
+                            if len(tier2_ad_ids) > 10:
+                                ad_ids_str += f"...共{len(tier2_ad_ids)}个"
 
 
                             return ToolResult(
                             return ToolResult(
-                                title=f"审批结果: {parsed['status']}",
+                                title="运营已回复",
                                 output=(
                                 output=(
-                                    f"运营飞书回复: {text}\n"
-                                    f"状态: {parsed['status']}\n"
-                                    f"批准: {len(parsed.get('approved_ids', []))} 个广告\n"
-                                    f"拒绝: {len(parsed.get('rejected_ids', []))} 个广告\n"
-                                    f"等待时间: {poll_count * poll_interval_seconds} 秒"
+                                    f"运营飞书回复原文: {text}\n"
+                                    f"等待审批的广告ID: {ad_ids_str}\n"
+                                    f"等待时间: {poll_count * poll_interval_seconds} 秒\n\n"
+                                    f"请根据运营的自然语言回复判断后续操作:\n"
+                                    f"- 运营同意/批准/通过 → 调用 execute_decisions 执行\n"
+                                    f"- 运营拒绝/驳回 → 停止执行,告知原因\n"
+                                    f"- 运营要求修改(如\"广告X不要暂停\")→ 进入 Mode 3: modify_decisions → validate → 重新审批\n"
+                                    f"- 运营部分批准(如\"只批准降价的\")→ 相应过滤后 execute_decisions"
                                 ),
                                 ),
                                 metadata={
                                 metadata={
                                     "request_id": request_id,
                                     "request_id": request_id,
                                     "feishu_sent": feishu_sent,
                                     "feishu_sent": feishu_sent,
                                     "msg_path": str(msg_path),
                                     "msg_path": str(msg_path),
                                     "poll_count": poll_count,
                                     "poll_count": poll_count,
-                                    **parsed,
+                                    "raw_reply": text,
+                                    "ad_ids": tier2_ad_ids,
                                 },
                                 },
                             )
                             )
             except Exception as e:
             except Exception as e:
@@ -497,32 +490,44 @@ async def check_approval_status(
                     sender_id = msg.get("sender_id", "")
                     sender_id = msg.get("sender_id", "")
                     sender_type = msg.get("sender_type", "")
                     sender_type = msg.get("sender_type", "")
 
 
+                    logger.debug(
+                        "飞书消息(check): sender_type=%s, sender_id=%s, content=%s",
+                        sender_type, sender_id, str(msg.get("content", ""))[:100],
+                    )
+
                     if sender_type != "user" or sender_id != FEISHU_OPERATOR_OPEN_ID:
                     if sender_type != "user" or sender_id != FEISHU_OPERATOR_OPEN_ID:
                         continue
                         continue
 
 
-                    # 框架已自动解析 text 消息的 JSON -> 纯文本
                     text = msg.get("content", "")
                     text = msg.get("content", "")
                     if not text.strip():
                     if not text.strip():
                         continue
                         continue
 
 
+                    # 检测到运营回复,返回原文给 Agent 理解
                     parsed = _parse_approval_reply(text, request["ad_ids"])
                     parsed = _parse_approval_reply(text, request["ad_ids"])
                     if parsed["status"] != "unknown":
                     if parsed["status"] != "unknown":
                         request.update({
                         request.update({
-                            "status": parsed["status"],
-                            "approved_ids": parsed.get("approved_ids", []),
-                            "rejected_ids": parsed.get("rejected_ids", []),
+                            "status": "replied",
                             "reply_content": text,
                             "reply_content": text,
                             "reply_at": datetime.now().isoformat(),
                             "reply_at": datetime.now().isoformat(),
                         })
                         })
+
+                        ad_ids = request["ad_ids"]
+                        ad_ids_str = ", ".join(str(x) for x in ad_ids[:10])
+                        if len(ad_ids) > 10:
+                            ad_ids_str += f"...共{len(ad_ids)}个"
+
                         return ToolResult(
                         return ToolResult(
-                            title=f"审批结果: {parsed['status']}",
+                            title="运营已回复",
                             output=(
                             output=(
-                                f"运营飞书回复: {text}\n"
-                                f"状态: {parsed['status']}\n"
-                                f"批准: {len(parsed.get('approved_ids', []))} 个\n"
-                                f"拒绝: {len(parsed.get('rejected_ids', []))} 个"
+                                f"运营飞书回复原文: {text}\n"
+                                f"等待审批的广告ID: {ad_ids_str}\n\n"
+                                f"请根据运营的自然语言回复判断后续操作:\n"
+                                f"- 运营同意/批准/通过 → 调用 execute_decisions 执行\n"
+                                f"- 运营拒绝/驳回 → 停止执行,告知原因\n"
+                                f"- 运营要求修改 → 进入 Mode 3: modify_decisions → validate → 重新审批\n"
+                                f"- 运营部分批准 → 相应过滤后 execute_decisions"
                             ),
                             ),
-                            metadata={"request_id": request_id, **parsed},
+                            metadata={"request_id": request_id, "raw_reply": text, "ad_ids": ad_ids},
                         )
                         )
         except Exception as e:
         except Exception as e:
             logger.debug("飞书读消息失败: %s", e)
             logger.debug("飞书读消息失败: %s", e)