""" 数据查询工具 — auto_put_ad_mini V3 V3 职责: - 创意级别明细数据采集(非广告级聚合) - 30 天增量拉取(已有 CSV 的日期跳过) - 广告状态按日快照 数据来源: - 创意效率:ODPS touliu_data + creative_data_day(创意级明细) - 广告状态:ODPS ad_put_tencent_ad(出价/预算/状态) 环境变量: ODPS_ACCESS_ID / ODPS_ACCESS_SECRET / ODPS_PROJECT """ import logging import os from datetime import datetime, timedelta from pathlib import Path from typing import List, Optional import pandas as pd from agent.tools import tool from agent.tools.models import ToolContext, ToolResult logger = logging.getLogger(__name__) _MINI_DIR = Path(__file__).resolve().parent.parent _RAW_DIR = _MINI_DIR / "outputs" / "raw" _AD_STATUS_DIR = _MINI_DIR / "outputs" / "ad_status" _MERGED_DIR = _MINI_DIR / "outputs" / "merged" # ===== ODPS 客户端(懒加载) ===== _odps_client = None def _get_odps_client(): """懒加载 ODPS 客户端,首次调用时初始化。""" global _odps_client if _odps_client is None: try: import sys autoput_ad_dir = _MINI_DIR.parent / "auto_put_ad" if str(autoput_ad_dir) not in sys.path: sys.path.insert(0, str(autoput_ad_dir)) from odps_module import ODPSClient project = os.getenv("ODPS_PROJECT", "loghubods") _odps_client = ODPSClient(project=project) logger.info("ODPS 客户端已初始化,项目: %s", project) except ImportError as e: logger.warning("odps_module 导入失败: %s", e) _odps_client = None except Exception as e: logger.warning("ODPS 客户端初始化失败: %s", e) _odps_client = None return _odps_client # ===== 日期解析 ===== def _parse_bizdate(bizdate: str) -> tuple: """解析业务日期,返回 (YYYYMMDD, YYYY-MM-DD)。""" if bizdate in ("yesterday", ""): biz = (datetime.now() - timedelta(days=1)).strftime("%Y%m%d") else: biz = bizdate.replace("-", "") biz_dash = f"{biz[:4]}-{biz[4:6]}-{biz[6:]}" return biz, biz_dash # ===== V3 创意级别 SQL ===== def _build_creative_sql(biz: str, biz_dash: str) -> str: """ 构建创意级别明细 SQL(V3)。 基于用户提供的原始 SQL,保留创意级明细(不做广告级聚合)。 返回字段: - 广告维度:account_id, ad_id, ad_name, create_time - 创意维度:creative_id, creative_name, rootsourceid, videoid, title, image_url - 业务维度:agent_name, package_name, 广告优化目标, 人群包人数 - 效率指标:cost(元), valid_click_count, conversions_count, 首层小程序打开数, 裂变0层回流数, 裂变1层回流数, 总回流人数, 总收入, view_count, key_page_view_count, key_page_uv, thousand_display_price """ return f""" SELECT '{biz}' AS bizdate ,a.account_id ,d.agent_name ,a.ad_id ,c.ad_name ,c.create_time ,CASE WHEN t2.event_name IS NULL THEN '无' ELSE t2.event_name END AS 广告优化目标 ,t1.package_name ,t4.人群包人数 ,a.creative_id ,t.creative_name ,SPLIT(t.rootsourceid,'_')[3] AS videoid ,t.rootsourceid ,t3.title ,t3.image_url ,b.view_count ,b.valid_click_count ,b.key_page_view_count ,b.key_page_uv ,b.thousand_display_price ,b.cost / 100 AS cost ,b.cost / 100 / b.valid_click_count AS 单点击成本 ,b.conversions_count ,t.首层小程序打开数 ,t.首层小程序打开数 / b.valid_click_count AS 点击转化率 ,b.cost / 100 / t.首层小程序打开数 AS 单UV成本 ,t.裂变层回流数 ,t.裂变0层回流数 ,t.裂变0层回流数 / t.首层小程序打开数 AS T0裂变系数 ,t.裂变1层回流数 ,t.裂变1层回流数 / t.首层小程序打开数 AS T1裂变系数 ,t.总回流人数 ,t.总收入 FROM ( SELECT IF(c.creative_name IS NOT NULL,c.creative_name,t.rootsourceid) AS creative_name ,t.* FROM loghubods.touliu_data t LEFT JOIN ( SELECT DISTINCT creative_name ,SPLIT(GET_JSON_OBJECT(page_spec,'$.wechat_mini_program_spec.mini_program_path'),'rootSourceId%3D')[1] AS rootsourceid ,SPLIT(SPLIT(GET_JSON_OBJECT(page_spec,'$.wechat_mini_program_spec.mini_program_path'),'rootSourceId%3D')[1],'_')[3] AS videoid ,b.creative_id FROM loghubods.ad_put_tencent_creative_components a LEFT JOIN loghubods.ad_put_tencent_creative_day b ON a.creative_id = b.creative_id WHERE page_type = 'PAGE_TYPE_WECHAT_MINI_PROGRAM' ) c ON c.rootsourceid = t.rootsourceid WHERE t.dt = '{biz}' ) t LEFT JOIN loghubods.ad_put_tencent_creative_day a ON t.creative_name = a.creative_name LEFT JOIN loghubods.ad_put_tencent_ad c ON a.ad_id = c.ad_id LEFT JOIN ( SELECT creative_id ,valid_click_count ,view_count ,cost ,conversions_count ,key_page_view_count ,key_page_uv ,thousand_display_price FROM ( SELECT creative_id ,valid_click_count ,view_count ,cost ,conversions_count ,key_page_view_count ,key_page_uv ,thousand_display_price ,ROW_NUMBER() OVER (PARTITION BY creative_id ORDER BY update_time DESC ) AS rank FROM loghubods.ad_put_tencent_creative_data_day WHERE dt = REGEXP_REPLACE('{biz}','^(\\\\d{{4}})(\\\\d{{2}})(\\\\d{{2}})$','\\\\1-\\\\2-\\\\3') ) t WHERE rank = 1 ) b ON a.creative_id = b.creative_id LEFT JOIN ( SELECT account_id ,MAX(agent_name) AS agent_name FROM loghubods.ad_put_tencent_account GROUP BY account_id ) d ON a.account_id = d.account_id LEFT JOIN ( SELECT t1.ad_id ,t1.package_id ,t1.package_name ,t1.min_people FROM ( SELECT a.ad_id ,a.package_id ,b.package_name ,b.min_people ,ROW_NUMBER() OVER (PARTITION BY a.ad_id ORDER BY CAST(b.min_people AS BIGINT) ASC ) AS rank FROM loghubods.ad_put_tencent_ad_package_mapping a LEFT JOIN loghubods.ad_put_tencent_package b ON a.package_id = b.tencent_audience_id WHERE a.is_delete = 0 ) t1 WHERE t1.rank = 1 ) t1 ON a.ad_id = t1.ad_id LEFT JOIN loghubods.dim_ad_event_enum t2 ON c.optimization_goal = t2.event_id LEFT JOIN loghubods.ad_put_tencent_creative_analysis t3 ON a.creative_id = t3.creative_id LEFT JOIN ( SELECT type ,COUNT(DISTINCT union_id) AS 人群包人数 FROM loghubods.mid_share_return_people_1year WHERE dt = MAX_PT('loghubods.mid_share_return_people_1year') GROUP BY type ) t4 ON t1.package_name = t4.type WHERE t.dt = '{biz}' AND ( 总回流人数 >= 30 OR a.account_id IS NOT NULL ) """ def _fetch_creative_data(bizdate: str) -> Optional[pd.DataFrame]: """拉取单日创意级别数据。""" client = _get_odps_client() if client is None: logger.error("ODPS 客户端未初始化") return None biz, biz_dash = _parse_bizdate(bizdate) sql = _build_creative_sql(biz, biz_dash) try: logger.info("开始拉取创意数据: %s", biz) df = client.execute_sql(sql) if df.empty: logger.warning("创意数据为空: %s", biz) return pd.DataFrame() # 类型转换(cost 已经是元,不需要再除以100) for col in ["cost", "单UV成本", "单点击成本", "总收入"]: if col in df.columns: df[col] = pd.to_numeric(df[col], errors="coerce") for col in ["首层小程序打开数", "裂变层回流数", "裂变0层回流数", "裂变1层回流数", "总回流人数", "valid_click_count", "view_count", "key_page_view_count", "key_page_uv", "conversions_count"]: if col in df.columns: df[col] = pd.to_numeric(df[col], errors="coerce").fillna(0).astype(int) logger.info("创意数据拉取成功: %s, %d 行", biz, len(df)) return df except Exception as e: logger.error("创意数据拉取失败 (%s): %s", biz, e, exc_info=True) return None # ===== 广告状态拉取 ===== def _fetch_ad_status(bizdate: str) -> Optional[pd.DataFrame]: """拉取单日广告状态快照。""" client = _get_odps_client() if client is None: return None biz, biz_dash = _parse_bizdate(bizdate) sql = f""" SELECT '{biz}' AS bizdate, ad_id, account_id, ad_name, create_time, ad_status, bid_amount, day_amount, optimization_goal, targeting FROM loghubods.ad_put_tencent_ad """ try: logger.info("开始拉取广告状态: %s", biz) df = client.execute_sql(sql) if not df.empty: df["bid_amount"] = pd.to_numeric(df["bid_amount"], errors="coerce") / 100 logger.info("广告状态拉取成功: %s, %d 行", biz, len(df)) return df except Exception as e: logger.error("广告状态拉取失败 (%s): %s", biz, e, exc_info=True) return None # ===== V3 工具:30 天增量采集 ===== @tool(description="拉取 30 天创意级别数据(增量,已有 CSV 的日期跳过)") async def fetch_creative_data( ctx: ToolContext, days: int = 7, end_date: str = "yesterday" ) -> ToolResult: """ 拉取创意级别明细数据(V3)。 Args: days: 回溯天数(默认 30) end_date: 结束日期(默认 yesterday,格式 YYYYMMDD 或 YYYY-MM-DD) Returns: ToolResult 包含采集摘要 """ _RAW_DIR.mkdir(parents=True, exist_ok=True) _AD_STATUS_DIR.mkdir(parents=True, exist_ok=True) # 解析日期范围 _, end_dash = _parse_bizdate(end_date) end_dt = datetime.strptime(end_dash, "%Y-%m-%d") dates_to_fetch = [] for i in range(days): dt = end_dt - timedelta(days=i) biz = dt.strftime("%Y%m%d") creative_csv = _RAW_DIR / f"creative_{biz}.csv" status_csv = _AD_STATUS_DIR / f"ad_status_{biz}.csv" if creative_csv.exists() and status_csv.exists(): logger.info("跳过已有数据: %s", biz) continue dates_to_fetch.append(biz) if not dates_to_fetch: return ToolResult( title="数据已完整", output=f"最近 {days} 天数据已全部存在,无需拉取" ) # 拉取缺失日期 success_count = 0 failed_dates = [] for biz in sorted(dates_to_fetch): # 创意数据 creative_csv = _RAW_DIR / f"creative_{biz}.csv" if not creative_csv.exists(): df_creative = _fetch_creative_data(biz) if df_creative is not None: df_creative.to_csv(creative_csv, index=False, encoding="utf-8-sig") logger.info("已保存: %s (%d 行)", creative_csv.name, len(df_creative)) else: failed_dates.append(biz) continue # 广告状态 status_csv = _AD_STATUS_DIR / f"ad_status_{biz}.csv" if not status_csv.exists(): df_status = _fetch_ad_status(biz) if df_status is not None: df_status.to_csv(status_csv, index=False, encoding="utf-8-sig") logger.info("已保存: %s (%d 行)", status_csv.name, len(df_status)) else: failed_dates.append(biz) continue success_count += 1 # 汇总 lines = [ f"数据采集完成(最近 {days} 天)", f"需拉取: {len(dates_to_fetch)} 天", f"成功: {success_count} 天", f"失败: {len(failed_dates)} 天", "", f"数据目录:", f" 创意数据: {_RAW_DIR}", f" 广告状态: {_AD_STATUS_DIR}", ] if failed_dates: lines.append("") lines.append("失败日期:") for d in failed_dates: lines.append(f" - {d}") return ToolResult( title=f"数据采集完成({success_count}/{len(dates_to_fetch)})", output="\n".join(lines), metadata={ "total_days": days, "to_fetch": len(dates_to_fetch), "success": success_count, "failed": len(failed_dates), "failed_dates": failed_dates, } ) # ===== 合并创意数据与广告状态(独立工具)===== # 合并后的列顺序(共 38 列) _MERGED_COLUMNS = [ # 时间 "bizdate", # 账户 "account_id", "agent_name", # 广告基本信息 "ad_id", "ad_name", "create_time", "广告优化目标", "package_name", "人群包人数", # 广告状态(来自状态表) "ad_status", "bid_amount", "day_amount", "optimization_goal_raw", "targeting", # 创意维度 "creative_id", "creative_name", "videoid", "rootsourceid", "title", "image_url", # 曝光 & 点击 "view_count", "valid_click_count", "key_page_view_count", "key_page_uv", "thousand_display_price", # 成本 "cost", "单点击成本", "conversions_count", # 回流 & 收入 "首层小程序打开数", "点击转化率", "单uv成本", "裂变层回流数", "裂变0层回流数", "t0裂变系数", "裂变1层回流数", "t1裂变系数", "总回流人数", "总收入", ] def _merge_single_day(biz: str) -> Optional[pd.DataFrame]: """ 合并单日创意数据与广告状态数据(内部函数)。 - 以创意表为主表(left join) - 统一 ad_id 为 Int64(创意表是 float,状态表是 int) - 状态表的 optimization_goal 重命名为 optimization_goal_raw - 按 _MERGED_COLUMNS 顺序输出,保存到 outputs/merged/merged_{biz}.csv Returns: 合并后的 DataFrame,或 None(文件不存在时) """ creative_csv = _RAW_DIR / f"creative_{biz}.csv" status_csv = _AD_STATUS_DIR / f"ad_status_{biz}.csv" if not creative_csv.exists(): logger.warning("创意数据不存在,跳过合并: %s", creative_csv) return None if not status_csv.exists(): logger.warning("广告状态不存在,跳过合并: %s", status_csv) return None df_creative = pd.read_csv(creative_csv) df_status = pd.read_csv(status_csv) # 统一 ad_id 类型为 Int64(可空整数,避免 float 精度问题) df_creative["ad_id"] = pd.to_numeric(df_creative["ad_id"], errors="coerce").astype("Int64") df_status["ad_id"] = pd.to_numeric(df_status["ad_id"], errors="coerce").astype("Int64") # 状态表只保留需要引入的列,重命名 optimization_goal 避免与创意表冲突 status_cols = ["ad_id", "ad_status", "bid_amount", "day_amount", "optimization_goal", "targeting"] df_status = df_status[[c for c in status_cols if c in df_status.columns]].copy() if "optimization_goal" in df_status.columns: df_status = df_status.rename(columns={"optimization_goal": "optimization_goal_raw"}) # Left join(保留所有广告,包括 SUSPEND 状态) df_merged = df_creative.merge(df_status, on="ad_id", how="left") logger.info("合并后总行数: %d", len(df_merged)) # 按指定列顺序输出(只保留存在的列,保持顺序) final_cols = [c for c in _MERGED_COLUMNS if c in df_merged.columns] df_merged = df_merged[final_cols] # 保存 _MERGED_DIR.mkdir(parents=True, exist_ok=True) out_path = _MERGED_DIR / f"merged_{biz}.csv" df_merged.to_csv(out_path, index=False, encoding="utf-8-sig") logger.info("合并完成: %s (%d 行, %d 列)", out_path.name, len(df_merged), len(df_merged.columns)) return df_merged @tool(description="合并创意数据与广告状态(批量)") async def merge_creative_data( ctx: ToolContext, days: int = 7, force: bool = False, ) -> ToolResult: """ 合并创意级别数据与广告状态数据。 职责: - 读取 outputs/raw/creative_{date}.csv - 读取 outputs/ad_status/ad_status_{date}.csv - Left join on ad_id,输出到 outputs/merged/merged_{date}.csv - 支持批量合并(最近 N 天) - 幂等操作:已存在的合并文件默认跳过(force=True 强制重新合并) Args: days: 合并最近 N 天的数据(默认 30 天) force: 是否强制重新合并已存在的文件(默认 False) Returns: 合并结果摘要 """ try: # 确定日期范围 end_dt = datetime.now() - timedelta(days=1) dates_to_merge = [] for i in range(days): date_dt = end_dt - timedelta(days=i) biz = date_dt.strftime("%Y%m%d") dates_to_merge.append(biz) success_count = 0 skip_count = 0 fail_count = 0 for biz in dates_to_merge: merged_csv = _MERGED_DIR / f"merged_{biz}.csv" # 检查是否已存在 if merged_csv.exists() and not force: skip_count += 1 continue # 执行合并 df = _merge_single_day(biz) if df is not None: success_count += 1 else: fail_count += 1 # 汇总 lines = [ f"数据合并完成(最近 {days} 天)", f"成功: {success_count} 天", f"跳过: {skip_count} 天(已存在)", f"失败: {fail_count} 天(源文件缺失)", "", f"输出目录: {_MERGED_DIR}", f"列数: 38 列", ] return ToolResult( title=f"合并完成({success_count}/{days})", output="\n".join(lines), metadata={ "success": success_count, "skip": skip_count, "fail": fail_count, "output_dir": str(_MERGED_DIR), } ) except Exception as e: logger.error("merge_creative_data 失败: %s", e, exc_info=True) return ToolResult(title="merge_creative_data 失败", output=str(e))