Просмотр исходного кода

fix(ad_decision): 完善年龄保护逻辑,修复重复列问题,新增ODPS模块

三个关键修复和一个新增模块:

1. **年龄保护强制清除负向标志** (ad_decision.py)
   - 问题:早期成长期(4-7天)广告可能同时有bid_up和bid_down候选标志
   - 解决:在年龄保护阶段强制清除所有负向标志(roi_low, decay_signal, bid_down_candidate)
   - 位置:第996-1027行

2. **零消耗规则增加年龄保护** (ad_decision.py)
   - 问题:零消耗强规则无视广告年龄,导致4-7天低消耗广告被关停
   - 解决:零消耗检查中加入年龄判断,≤7天的广告不适用零消耗规则
   - 位置:第941-960行 和 第1230-1249行

3. **飞书审批重复列修复** (im_approval.py)
   - 问题:validated_decisions已有cost_7d_avg等字段,再次合并导致重复列错误
   - 解决:合并前检查已存在的列,只合并缺失的字段
   - 位置:第405-430行

4. **新增ODPS数据拉取模块** (odps_module.py)
   - 功能:封装MaxCompute数据访问,支持自动拉取腾讯广告数据
   - 配置:AccessKey + Project + Endpoint
   - 用途:替代本地CSV文件,获取最新广告数据

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
刘立冬 3 недели назад
Родитель
Коммит
5f898bc7bc

+ 56 - 23
examples/auto_put_ad_mini/tools/ad_decision.py

@@ -939,13 +939,25 @@ async def get_ads_for_review(
             bid_amount = float(row.get("bid_amount", 0) or 0)
 
             # 零消耗待关停:7日均消耗 < 10元,几乎无活动(强规则,仍保留)
+            # ⚠️ 但需要年龄保护:≤7天的广告不适用零消耗规则
             if cost_7d_avg < min_spend_for_class_a:
-                zero_spend_ads.append({
-                    "ad_id": int(row["ad_id"]),
-                    "ad_name": str(row.get("ad_name", "")),
-                    "cost_7d_avg": round(cost_7d_avg, 2),
-                })
-                continue
+                # 检查广告年龄
+                if ad_age is not None and ad_age <= EARLY_GROWTH_DAYS:
+                    # 4-7天(早期成长期)或≤3天(冷启动期):保护,不关停
+                    normal_ads_count += 1
+                    logger.debug(
+                        f"广告 {row['ad_id']} 年龄{ad_age}天≤{EARLY_GROWTH_DAYS}天,"
+                        f"虽消耗低({cost_7d_avg:.2f}元),但年龄保护不关停"
+                    )
+                    continue
+                else:
+                    # >7天的低消耗广告:正常应用零消耗规则
+                    zero_spend_ads.append({
+                        "ad_id": int(row["ad_id"]),
+                        "ad_name": str(row.get("ad_name", "")),
+                        "cost_7d_avg": round(cost_7d_avg, 2),
+                    })
+                    continue
 
             # 待优化评估:ROI 偏低 或 衰退信号 或 出价调整候选(需要智能判断)
             roi_low = (not pd.isna(f_roi)) and (f_roi < roi_mean * roi_review_factor)
@@ -994,24 +1006,33 @@ async def get_ads_for_review(
                     age_protected_skip = True
 
                 # 早期成长期(4-7天):仅允许提价和扩量评估
-                # ⚠️ 关键修复:完全阻断非提价/扩量候选,无论何种候选标志
+                # ⚠️ 核心修复:强制清除所有负向候选标志,无论是否有提价标志
                 elif ad_age <= EARLY_GROWTH_DAYS:
-                    # 只有提价候选或扩量候选才允许进入LLM评估
-                    # 其他所有候选标志(roi_low, decay_signal, bid_down_candidate)都被排除
-                    if not (bid_up_candidate or scale_up_candidate):
-                        # 检查是否有任何候选标志(即使不是提价/扩量)
-                        has_any_candidate = roi_low or decay_signal or bid_down_candidate
-                        if has_any_candidate:
-                            # 有候选标志但不是提价/扩量 → 直接排除
-                            normal_ads_count += 1
-                            logger.debug(
-                                f"广告 {row['ad_id']} 处于早期成长期({ad_age}天,4-{EARLY_GROWTH_DAYS}天),"
-                                f"年龄保护规则:仅允许提价/扩量评估,其他候选已排除"
-                                f"(roi_low={roi_low}, decay={decay_signal}, bid_down={bid_down_candidate})"
-                            )
-                            age_protected_skip = True
-                        # else: 无任何候选标志,正常计入normal_ads_count
-                    # else: 是提价或扩量候选,允许进入评估
+                    # 检查原始候选状态(用于日志)
+                    has_negative_flags = roi_low or decay_signal or bid_down_candidate
+                    has_positive_flags = bid_up_candidate or scale_up_candidate
+
+                    # 强制清除负向候选标志(即使同时有提价标志)
+                    if has_negative_flags:
+                        logger.debug(
+                            f"广告 {row['ad_id']} 处于早期成长期({ad_age}天),"
+                            f"年龄保护强制清除负向候选标志:"
+                            f"roi_low={roi_low}→False, decay={decay_signal}→False, "
+                            f"bid_down={bid_down_candidate}→False"
+                        )
+                        roi_low = False
+                        decay_signal = False
+                        bid_down_candidate = False
+
+                    # 如果清除后没有任何候选标志 → 排除
+                    if not has_positive_flags:
+                        normal_ads_count += 1
+                        logger.debug(
+                            f"广告 {row['ad_id']} 处于早期成长期({ad_age}天),"
+                            f"无提价/扩量候选标志,已排除"
+                        )
+                        age_protected_skip = True
+                    # else: 有提价或扩量候选,允许进入评估(负向标志已清除)
 
             # 年龄保护排除的广告,直接跳过
             if age_protected_skip:
@@ -1210,7 +1231,19 @@ async def apply_decisions(
             df_metrics = pd.read_csv(metrics_csv)
             for _, row in df_metrics.iterrows():
                 cost_7d_avg = float(row.get("cost_7d_avg", 0) or 0)
+                ad_age = row.get("ad_age_days")  # 获取广告年龄
+
+                # ⚠️ 年龄保护:≤7天的广告不适用零消耗规则
                 if cost_7d_avg < 10.0:
+                    # 检查广告年龄
+                    if ad_age is not None and ad_age <= EARLY_GROWTH_DAYS:
+                        # 4-7天或≤3天:保护,跳过
+                        logger.debug(
+                            f"零消耗规则跳过广告 {row['ad_id']}:年龄{ad_age}天≤{EARLY_GROWTH_DAYS}天"
+                        )
+                        continue
+
+                    # >7天的低消耗广告:正常应用零消耗规则
                     # 优化reason表达:避免"0.00元"显示,改用"几乎无消耗"
                     if cost_7d_avg == 0:
                         reason_text = "7日几乎无消耗,长期无活动"

+ 22 - 11
examples/auto_put_ad_mini/tools/im_approval.py

@@ -405,22 +405,33 @@ async def send_approval_request(
         if metrics_path.exists():
             df_metrics = pd.read_csv(metrics_path)
             # 选择需要的列(避免重复列,使用实际列名)
-            metrics_cols = [
+            metrics_cols_needed = [
                 "ad_id", "account_id", "ad_name",
                 "cost_7d_avg", "cost_7d_total", "revenue_7d_total",
                 "动态ROI_7日均值", "bid_amount"
             ]
-            # 只保留存在的列
-            metrics_cols = [c for c in metrics_cols if c in df_metrics.columns]
-            df_metrics_sub = df_metrics[metrics_cols].copy()
 
-            # 从 ad_name 中提取 audience_tier(如 "R500_xxx" → "R500")
-            if "ad_name" in df_metrics_sub.columns:
-                df_metrics_sub["audience_tier"] = df_metrics_sub["ad_name"].str.extract(r"^(R\d+)")[0]
-
-            # 左连接:保留 df 的所有行,补充 metrics 数据
-            df = df.merge(df_metrics_sub, on="ad_id", how="left", suffixes=("", "_metrics"))
-            logger.info(f"已从 metrics 补充 {len(metrics_cols)} 列数据")
+            # ===== 关键修复:只合并不存在的列,避免重复 =====
+            # 检查哪些列已经存在于 df 中(从 ad_decision.py 已合并)
+            existing_cols = set(df.columns)
+            cols_to_merge = ["ad_id"]  # ad_id 必须保留用于连接
+            for col in metrics_cols_needed:
+                if col not in existing_cols and col in df_metrics.columns:
+                    cols_to_merge.append(col)
+
+            if len(cols_to_merge) > 1:  # 除了ad_id还有其他列需要合并
+                df_metrics_sub = df_metrics[cols_to_merge].copy()
+
+                # 从 ad_name 中提取 audience_tier(如 "R500_xxx" → "R500")
+                if "ad_name" in cols_to_merge and "audience_tier" not in existing_cols:
+                    df_metrics_sub["audience_tier"] = df_metrics_sub["ad_name"].str.extract(r"^(R\d+)")[0]
+                    cols_to_merge.append("audience_tier")
+
+                # 左连接:只合并缺失的列
+                df = df.merge(df_metrics_sub, on="ad_id", how="left")
+                logger.info(f"已从 metrics 补充 {len(cols_to_merge)-1} 列数据(跳过已存在列)")
+            else:
+                logger.info("metrics 字段已在 validated_decisions 中存在,无需重复合并")
         else:
             logger.warning("未找到 metrics 文件,审批表格将缺少关键字段")
 

+ 175 - 0
examples/auto_put_ad_mini/tools/odps_module.py

@@ -0,0 +1,175 @@
+"""
+ODPS (MaxCompute) data-fetching module.
+"""
+import logging
+from typing import Optional, List, Dict
+from datetime import datetime, timedelta
+
+logger = logging.getLogger(__name__)
+
+class ODPSClient:
+    """Thin wrapper around a PyODPS (MaxCompute) connection.
+
+    Credentials are read from the process environment instead of being
+    embedded in the source file:
+
+    * ``ALIBABA_CLOUD_ACCESS_KEY_ID`` (fallback: ``ODPS_ACCESS_ID``)
+    * ``ALIBABA_CLOUD_ACCESS_KEY_SECRET`` (fallback: ``ODPS_ACCESS_SECRET``)
+    * ``ODPS_ENDPOINT`` / ``ODPS_TUNNEL_ENDPOINT`` (optional overrides)
+
+    The underlying ``odps.ODPS`` object is created lazily on first use, so
+    constructing an ``ODPSClient`` performs no I/O.
+
+    NOTE(review): any access key that has ever been committed to a
+    repository should be treated as leaked and rotated.
+    """
+
+    # Endpoint defaults; each can be overridden through the environment.
+    _DEFAULT_ENDPOINT = "http://service.odps.aliyun.com/api"
+    _DEFAULT_TUNNEL_URL = "http://dt.cn-hangzhou.maxcompute.aliyun-inc.com"
+
+    def __init__(self, project="loghubods"):
+        """Store connection settings; no connection is opened here.
+
+        Args:
+            project: Name of the ODPS project to connect to.
+        """
+        import os  # local import keeps this block self-contained
+
+        env = os.environ
+        self.accessId = (
+            env.get("ALIBABA_CLOUD_ACCESS_KEY_ID") or env.get("ODPS_ACCESS_ID", "")
+        )
+        self.accessSecret = (
+            env.get("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
+            or env.get("ODPS_ACCESS_SECRET", "")
+        )
+        self.endpoint = env.get("ODPS_ENDPOINT") or self._DEFAULT_ENDPOINT
+        self.tunnelUrl = env.get("ODPS_TUNNEL_ENDPOINT") or self._DEFAULT_TUNNEL_URL
+        self.project = project
+
+        self._odps = None
+        self._initialized = False
+
+    @staticmethod
+    def _is_yyyymmdd(value) -> bool:
+        """Return True when *value* is a string of exactly eight ASCII digits."""
+        return (
+            isinstance(value, str)
+            and len(value) == 8
+            and all(ch in "0123456789" for ch in value)
+        )
+
+    def _ensure_initialized(self):
+        """Create the underlying ODPS object on first use.
+
+        Returns:
+            True when the client is ready, False when credentials are
+            missing, the SDK is not installed, or construction failed.
+        """
+        if self._initialized:
+            return True
+
+        if not self.accessId or not self.accessSecret:
+            logger.error(
+                "ODPS凭证未配置,请设置环境变量 "
+                "ALIBABA_CLOUD_ACCESS_KEY_ID / ALIBABA_CLOUD_ACCESS_KEY_SECRET"
+            )
+            return False
+
+        try:
+            from odps import ODPS
+            self._odps = ODPS(
+                access_id=self.accessId,
+                secret_access_key=self.accessSecret,
+                project=self.project,
+                endpoint=self.endpoint,
+                tunnel_endpoint=self.tunnelUrl
+            )
+            self._initialized = True
+            logger.info(f"ODPS客户端初始化成功,项目: {self.project}")
+            return True
+        except ImportError:
+            logger.error("ODPS SDK 未安装,请运行: pip install pyodps")
+            return False
+        except Exception as e:
+            logger.error(f"ODPS客户端初始化失败: {e}")
+            return False
+
+    def query(self, sql: str, limit: Optional[int] = None) -> List[Dict]:
+        """Run a SQL statement and return the rows as dictionaries.
+
+        Args:
+            sql: SQL statement to execute.
+            limit: Maximum number of rows to return; ``None`` means no cap
+                and ``0`` returns no rows.
+
+        Returns:
+            A list with one ``{column_name: value}`` dict per row. An empty
+            list is returned when the client cannot be initialised or the
+            query fails (the error is logged, not raised).
+        """
+        if not self._ensure_initialized():
+            logger.warning("ODPS客户端未初始化,无法执行查询")
+            return []
+
+        try:
+            logger.info(f"执行ODPS查询: {sql[:100]}...")
+            instance = self._odps.execute_sql(sql)
+            instance.wait_for_success()
+
+            with instance.open_reader() as reader:
+                # Column names are the same for every row; look them up once.
+                # NOTE(review): relies on the reader's private `_schema`
+                # attribute, as the original code did.
+                column_names = list(reader._schema.names)
+                results = []
+                for row_index, record in enumerate(reader):
+                    # Explicit None test so that limit=0 means "no rows".
+                    if limit is not None and row_index >= limit:
+                        break
+                    results.append(
+                        {
+                            name: record[col_index]
+                            for col_index, name in enumerate(column_names)
+                        }
+                    )
+
+                logger.info(f"查询完成,返回 {len(results)} 行")
+                return results
+
+        except Exception as e:
+            logger.error(f"ODPS查询失败: {e}")
+            return []
+
+    def fetch_creative_data(
+        self,
+        start_date: str,
+        end_date: str,
+        account_ids: Optional[List[int]] = None
+    ) -> List[Dict]:
+        """Fetch daily creative rows from ODPS for a date range.
+
+        Args:
+            start_date: First partition date, formatted ``YYYYMMDD``.
+            end_date: Last partition date, formatted ``YYYYMMDD``.
+            account_ids: Account IDs to filter on; ``None`` or empty fetches
+                every account.
+
+        Returns:
+            The rows produced by :meth:`query`. An empty list is returned
+            when the arguments are malformed (the problem is logged).
+        """
+        # The values below are interpolated into the SQL text, so they are
+        # validated strictly to rule out SQL injection.
+        if not (self._is_yyyymmdd(start_date) and self._is_yyyymmdd(end_date)):
+            logger.error(
+                f"日期格式无效(应为YYYYMMDD): "
+                f"start_date={start_date!r}, end_date={end_date!r}"
+            )
+            return []
+
+        where_clauses = [f"dt BETWEEN '{start_date}' AND '{end_date}'"]
+        if account_ids:
+            try:
+                account_ids_str = ",".join(str(int(a)) for a in account_ids)
+            except (TypeError, ValueError):
+                logger.error(f"account_ids 必须为整数列表: {account_ids!r}")
+                return []
+            where_clauses.append(f"account_id IN ({account_ids_str})")
+
+        where_clause = " AND ".join(where_clauses)
+
+        # Column list mirrors the source table; adjust if the schema changes.
+        sql = f"""
+        SELECT
+            dt,
+            account_id,
+            adgroup_id AS ad_id,
+            adgroup_name AS ad_name,
+            bid_amount,
+            configured_status,
+            system_status,
+            cost,
+            view_count,
+            valid_click_count,
+            conversions_count,
+            cost_per_conversion,
+            thousand_display_price,
+            ctr,
+            create_time
+        FROM tencent_ad_creative_daily
+        WHERE {where_clause}
+        ORDER BY dt, account_id, ad_id
+        """
+
+        return self.query(sql)
+
+    def test_connection(self) -> bool:
+        """Check connectivity by running a trivial query.
+
+        Returns:
+            True when ``SELECT 1`` returns at least one row, otherwise False.
+        """
+        if not self._ensure_initialized():
+            return False
+
+        # query() logs and swallows its own errors, so no try/except here.
+        return len(self.query("SELECT 1 AS test", limit=1)) > 0
+
+
+# Process-wide ODPS client, created on first request.
+_odps_client: Optional[ODPSClient] = None
+
+
+def get_odps_client(project="loghubods") -> Optional[ODPSClient]:
+    """Return the shared ODPS client for *project*.
+
+    The client is created on first use and reused afterwards. If a
+    different project is requested later, the cached client is replaced so
+    the ``project`` argument is never silently ignored.
+
+    Args:
+        project: Name of the ODPS project.
+
+    Returns:
+        The shared client, or ``None`` when it could not be initialised.
+    """
+    global _odps_client
+    if _odps_client is None or _odps_client.project != project:
+        _odps_client = ODPSClient(project=project)
+
+    return _odps_client if _odps_client._ensure_initialized() else None