| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554 |
- """
- 数据查询工具 — auto_put_ad_mini V3
- V3 职责:
- - 创意级别明细数据采集(非广告级聚合)
- - 30 天增量拉取(已有 CSV 的日期跳过)
- - 广告状态按日快照
- 数据来源:
- - 创意效率:ODPS touliu_data + creative_data_day(创意级明细)
- - 广告状态:ODPS ad_put_tencent_ad(出价/预算/状态)
- 环境变量:
- ODPS_ACCESS_ID / ODPS_ACCESS_SECRET / ODPS_PROJECT
- """
- import logging
- import os
- from datetime import datetime, timedelta
- from pathlib import Path
- from typing import List, Optional
- import pandas as pd
- from agent.tools import tool
- from agent.tools.models import ToolContext, ToolResult
logger = logging.getLogger(__name__)

# Directory layout: this file lives one level below the mini-project root.
_MINI_DIR = Path(__file__).resolve().parent.parent
# Per-day creative detail CSVs: creative_{YYYYMMDD}.csv
_RAW_DIR = _MINI_DIR / "outputs" / "raw"
# Per-day ad status snapshots: ad_status_{YYYYMMDD}.csv
_AD_STATUS_DIR = _MINI_DIR / "outputs" / "ad_status"
# Merged creative + status output: merged_{YYYYMMDD}.csv
_MERGED_DIR = _MINI_DIR / "outputs" / "merged"

# ===== ODPS client (lazy-loaded singleton) =====
_odps_client = None
def _get_odps_client():
    """Lazily create and cache the shared ODPS client.

    On first call this adds the sibling ``auto_put_ad`` directory to
    ``sys.path`` so ``odps_module`` can be imported, then constructs an
    ``ODPSClient`` for the project named by ``ODPS_PROJECT`` (default
    ``loghubods``).

    Returns:
        The cached client instance, or None when the import or the client
        construction fails (failures are logged, never raised).
    """
    global _odps_client
    if _odps_client is not None:
        return _odps_client
    try:
        import sys

        module_dir = str(_MINI_DIR.parent / "auto_put_ad")
        if module_dir not in sys.path:
            sys.path.insert(0, module_dir)
        from odps_module import ODPSClient

        project_name = os.getenv("ODPS_PROJECT", "loghubods")
        _odps_client = ODPSClient(project=project_name)
        logger.info("ODPS 客户端已初始化,项目: %s", project_name)
    except ImportError as e:
        logger.warning("odps_module 导入失败: %s", e)
        _odps_client = None
    except Exception as e:
        logger.warning("ODPS 客户端初始化失败: %s", e)
        _odps_client = None
    return _odps_client
- # ===== 日期解析 =====
- def _parse_bizdate(bizdate: str) -> tuple:
- """解析业务日期,返回 (YYYYMMDD, YYYY-MM-DD)。"""
- if bizdate in ("yesterday", ""):
- biz = (datetime.now() - timedelta(days=1)).strftime("%Y%m%d")
- else:
- biz = bizdate.replace("-", "")
- biz_dash = f"{biz[:4]}-{biz[4:6]}-{biz[6:]}"
- return biz, biz_dash
- # ===== V3 创意级别 SQL =====
- def _build_creative_sql(biz: str, biz_dash: str) -> str:
- """
- 构建创意级别明细 SQL(V3)。
- 基于用户提供的原始 SQL,保留创意级明细(不做广告级聚合)。
- 返回字段:
- - 广告维度:account_id, ad_id, ad_name, create_time
- - 创意维度:creative_id, creative_name, rootsourceid, videoid, title, image_url
- - 业务维度:agent_name, package_name, 广告优化目标, 人群包人数
- - 效率指标:cost(元), valid_click_count, conversions_count,
- 首层小程序打开数, 裂变0层回流数, 裂变1层回流数, 总回流人数, 总收入,
- view_count, key_page_view_count, key_page_uv, thousand_display_price
- """
- return f"""
- SELECT '{biz}' AS bizdate
- ,a.account_id
- ,d.agent_name
- ,a.ad_id
- ,c.ad_name
- ,c.create_time
- ,CASE WHEN t2.event_name IS NULL THEN '无'
- ELSE t2.event_name
- END AS 广告优化目标
- ,t1.package_name
- ,t4.人群包人数
- ,a.creative_id
- ,t.creative_name
- ,SPLIT(t.rootsourceid,'_')[3] AS videoid
- ,t.rootsourceid
- ,t3.title
- ,t3.image_url
- ,b.view_count
- ,b.valid_click_count
- ,b.key_page_view_count
- ,b.key_page_uv
- ,b.thousand_display_price
- ,b.cost / 100 AS cost
- ,b.cost / 100 / b.valid_click_count AS 单点击成本
- ,b.conversions_count
- ,t.首层小程序打开数
- ,t.首层小程序打开数 / b.valid_click_count AS 点击转化率
- ,b.cost / 100 / t.首层小程序打开数 AS 单UV成本
- ,t.裂变层回流数
- ,t.裂变0层回流数
- ,t.裂变0层回流数 / t.首层小程序打开数 AS T0裂变系数
- ,t.裂变1层回流数
- ,t.裂变1层回流数 / t.首层小程序打开数 AS T1裂变系数
- ,t.总回流人数
- ,t.总收入
- FROM (
- SELECT IF(c.creative_name IS NOT NULL,c.creative_name,t.rootsourceid) AS creative_name
- ,t.*
- FROM loghubods.touliu_data t
- LEFT JOIN (
- SELECT DISTINCT creative_name
- ,SPLIT(GET_JSON_OBJECT(page_spec,'$.wechat_mini_program_spec.mini_program_path'),'rootSourceId%3D')[1] AS rootsourceid
- ,SPLIT(SPLIT(GET_JSON_OBJECT(page_spec,'$.wechat_mini_program_spec.mini_program_path'),'rootSourceId%3D')[1],'_')[3] AS videoid
- ,b.creative_id
- FROM loghubods.ad_put_tencent_creative_components a
- LEFT JOIN loghubods.ad_put_tencent_creative_day b
- ON a.creative_id = b.creative_id
- WHERE page_type = 'PAGE_TYPE_WECHAT_MINI_PROGRAM'
- ) c
- ON c.rootsourceid = t.rootsourceid
- WHERE t.dt = '{biz}'
- ) t
- LEFT JOIN loghubods.ad_put_tencent_creative_day a
- ON t.creative_name = a.creative_name
- LEFT JOIN loghubods.ad_put_tencent_ad c
- ON a.ad_id = c.ad_id
- LEFT JOIN (
- SELECT creative_id
- ,valid_click_count
- ,view_count
- ,cost
- ,conversions_count
- ,key_page_view_count
- ,key_page_uv
- ,thousand_display_price
- FROM (
- SELECT creative_id
- ,valid_click_count
- ,view_count
- ,cost
- ,conversions_count
- ,key_page_view_count
- ,key_page_uv
- ,thousand_display_price
- ,ROW_NUMBER() OVER (PARTITION BY creative_id ORDER BY update_time DESC ) AS rank
- FROM loghubods.ad_put_tencent_creative_data_day
- WHERE dt = REGEXP_REPLACE('{biz}','^(\\\\d{{4}})(\\\\d{{2}})(\\\\d{{2}})$','\\\\1-\\\\2-\\\\3')
- ) t
- WHERE rank = 1
- ) b
- ON a.creative_id = b.creative_id
- LEFT JOIN (
- SELECT account_id
- ,MAX(agent_name) AS agent_name
- FROM loghubods.ad_put_tencent_account
- GROUP BY account_id
- ) d
- ON a.account_id = d.account_id
- LEFT JOIN (
- SELECT t1.ad_id
- ,t1.package_id
- ,t1.package_name
- ,t1.min_people
- FROM (
- SELECT a.ad_id
- ,a.package_id
- ,b.package_name
- ,b.min_people
- ,ROW_NUMBER() OVER (PARTITION BY a.ad_id ORDER BY CAST(b.min_people AS BIGINT) ASC ) AS rank
- FROM loghubods.ad_put_tencent_ad_package_mapping a
- LEFT JOIN loghubods.ad_put_tencent_package b
- ON a.package_id = b.tencent_audience_id
- WHERE a.is_delete = 0
- ) t1
- WHERE t1.rank = 1
- ) t1
- ON a.ad_id = t1.ad_id
- LEFT JOIN loghubods.dim_ad_event_enum t2
- ON c.optimization_goal = t2.event_id
- LEFT JOIN loghubods.ad_put_tencent_creative_analysis t3
- ON a.creative_id = t3.creative_id
- LEFT JOIN (
- SELECT type
- ,COUNT(DISTINCT union_id) AS 人群包人数
- FROM loghubods.mid_share_return_people_1year
- WHERE dt = MAX_PT('loghubods.mid_share_return_people_1year')
- GROUP BY type
- ) t4
- ON t1.package_name = t4.type
- WHERE t.dt = '{biz}'
- AND (
- 总回流人数 >= 30
- OR a.account_id IS NOT NULL
- )
- """
def _fetch_creative_data(bizdate: str) -> Optional[pd.DataFrame]:
    """Fetch one day of creative-level detail rows from ODPS.

    Returns:
        The typed DataFrame; an empty DataFrame when the query yields no
        rows; or None when the client is unavailable or the query fails.
    """
    client = _get_odps_client()
    if client is None:
        logger.error("ODPS 客户端未初始化")
        return None

    biz, biz_dash = _parse_bizdate(bizdate)
    sql = _build_creative_sql(biz, biz_dash)

    float_cols = ("cost", "单UV成本", "单点击成本", "总收入")
    int_cols = (
        "首层小程序打开数", "裂变层回流数", "裂变0层回流数", "裂变1层回流数",
        "总回流人数", "valid_click_count", "view_count",
        "key_page_view_count", "key_page_uv", "conversions_count",
    )
    try:
        logger.info("开始拉取创意数据: %s", biz)
        df = client.execute_sql(sql)
        if df.empty:
            logger.warning("创意数据为空: %s", biz)
            return pd.DataFrame()
        # Coerce numeric columns; cost is already in yuan (divided by 100
        # inside the SQL), so no further scaling here.
        for name in float_cols:
            if name in df.columns:
                df[name] = pd.to_numeric(df[name], errors="coerce")
        for name in int_cols:
            if name in df.columns:
                df[name] = pd.to_numeric(df[name], errors="coerce").fillna(0).astype(int)
        logger.info("创意数据拉取成功: %s, %d 行", biz, len(df))
        return df
    except Exception as e:
        logger.error("创意数据拉取失败 (%s): %s", biz, e, exc_info=True)
        return None
- # ===== 广告状态拉取 =====
def _fetch_ad_status(bizdate: str) -> Optional[pd.DataFrame]:
    """Fetch one day's ad status snapshot (bid / budget / status).

    NOTE(review): the query has no dt filter — it appears to read the
    current full contents of ad_put_tencent_ad and stamp every row with
    ``bizdate``; confirm this is intended when backfilling past dates.

    Returns:
        The snapshot DataFrame, or None when the ODPS client is unavailable
        or the query fails.
    """
    client = _get_odps_client()
    if client is None:
        return None

    biz, _ = _parse_bizdate(bizdate)
    sql = f"""
SELECT
'{biz}' AS bizdate,
ad_id,
account_id,
ad_name,
create_time,
ad_status,
bid_amount,
day_amount,
optimization_goal,
targeting
FROM loghubods.ad_put_tencent_ad
"""
    try:
        logger.info("开始拉取广告状态: %s", biz)
        df = client.execute_sql(sql)
        if not df.empty:
            # Scale bid_amount by 1/100 to match the yuan unit used for cost.
            df["bid_amount"] = pd.to_numeric(df["bid_amount"], errors="coerce") / 100
        logger.info("广告状态拉取成功: %s, %d 行", biz, len(df))
        return df
    except Exception as e:
        logger.error("广告状态拉取失败 (%s): %s", biz, e, exc_info=True)
        return None
- # ===== V3 工具:30 天增量采集 =====
@tool(description="拉取 30 天创意级别数据(增量,已有 CSV 的日期跳过)")
async def fetch_creative_data(
    ctx: ToolContext,
    days: int = 7,
    end_date: str = "yesterday"
) -> ToolResult:
    """Incrementally collect creative-level detail and ad-status snapshots.

    For each of the last ``days`` days ending at ``end_date``, dates whose
    creative AND status CSVs both already exist are skipped; the rest are
    fetched from ODPS and written to outputs/raw and outputs/ad_status.

    Args:
        days: Number of days to look back (default 7; the original docstring
            claimed 30, which did not match the signature).
        end_date: Last date of the window — "yesterday" (default), YYYYMMDD,
            or YYYY-MM-DD.

    Returns:
        ToolResult with a collection summary (counts and any failed dates).
    """
    _RAW_DIR.mkdir(parents=True, exist_ok=True)
    _AD_STATUS_DIR.mkdir(parents=True, exist_ok=True)

    # Resolve the window [end - days + 1, end].
    _, end_dash = _parse_bizdate(end_date)
    end_dt = datetime.strptime(end_dash, "%Y-%m-%d")

    dates_to_fetch = []
    for i in range(days):
        dt = end_dt - timedelta(days=i)
        biz = dt.strftime("%Y%m%d")
        creative_csv = _RAW_DIR / f"creative_{biz}.csv"
        status_csv = _AD_STATUS_DIR / f"ad_status_{biz}.csv"
        # Incremental: refetch only when either CSV is missing.
        if creative_csv.exists() and status_csv.exists():
            logger.info("跳过已有数据: %s", biz)
            continue
        dates_to_fetch.append(biz)

    if not dates_to_fetch:
        return ToolResult(
            title="数据已完整",
            output=f"最近 {days} 天数据已全部存在,无需拉取"
        )

    success_count = 0
    failed_dates = []
    for biz in sorted(dates_to_fetch):
        # Creative detail (an empty frame is still saved to mark the day done).
        creative_csv = _RAW_DIR / f"creative_{biz}.csv"
        if not creative_csv.exists():
            df_creative = _fetch_creative_data(biz)
            if df_creative is not None:
                df_creative.to_csv(creative_csv, index=False, encoding="utf-8-sig")
                logger.info("已保存: %s (%d 行)", creative_csv.name, len(df_creative))
            else:
                # Creative fetch failed: record the day once, skip status fetch.
                failed_dates.append(biz)
                continue
        # Ad status snapshot.
        status_csv = _AD_STATUS_DIR / f"ad_status_{biz}.csv"
        if not status_csv.exists():
            df_status = _fetch_ad_status(biz)
            if df_status is not None:
                df_status.to_csv(status_csv, index=False, encoding="utf-8-sig")
                logger.info("已保存: %s (%d 行)", status_csv.name, len(df_status))
            else:
                failed_dates.append(biz)
                continue
        success_count += 1

    # Summary for the tool caller.
    lines = [
        f"数据采集完成(最近 {days} 天)",
        f"需拉取: {len(dates_to_fetch)} 天",
        f"成功: {success_count} 天",
        f"失败: {len(failed_dates)} 天",
        "",
        "数据目录:",
        f" 创意数据: {_RAW_DIR}",
        f" 广告状态: {_AD_STATUS_DIR}",
    ]
    if failed_dates:
        lines.append("")
        lines.append("失败日期:")
        lines.extend(f" - {d}" for d in failed_dates)
    return ToolResult(
        title=f"数据采集完成({success_count}/{len(dates_to_fetch)})",
        output="\n".join(lines),
        metadata={
            "total_days": days,
            "to_fetch": len(dates_to_fetch),
            "success": success_count,
            "failed": len(failed_dates),
            "failed_dates": failed_dates,
        }
    )
# ===== Merge creative data with ad status (standalone tool) =====
# Column order of the merged output (38 columns in total).
_MERGED_COLUMNS = [
    # Date
    "bizdate",
    # Account
    "account_id", "agent_name",
    # Ad basics
    "ad_id", "ad_name", "create_time", "广告优化目标", "package_name", "人群包人数",
    # Ad status (carried over from the status snapshot)
    "ad_status", "bid_amount", "day_amount", "optimization_goal_raw", "targeting",
    # Creative dimension
    "creative_id", "creative_name", "videoid", "rootsourceid", "title", "image_url",
    # Impressions & clicks
    "view_count", "valid_click_count", "key_page_view_count", "key_page_uv", "thousand_display_price",
    # Cost
    "cost", "单点击成本", "conversions_count",
    # Returning traffic & revenue
    # NOTE(review): "单uv成本"/"t0裂变系数"/"t1裂变系数" are lower-case here while the
    # SQL aliases are "单UV成本"/"T0裂变系数"/"T1裂变系数" — presumably ODPS lower-cases
    # result column names so these match; confirm against an actual CSV,
    # otherwise _merge_single_day's column filter silently drops them.
    "首层小程序打开数", "点击转化率", "单uv成本",
    "裂变层回流数", "裂变0层回流数", "t0裂变系数",
    "裂变1层回流数", "t1裂变系数",
    "总回流人数", "总收入",
]
def _merge_single_day(biz: str) -> Optional[pd.DataFrame]:
    """Merge one day's creative CSV with its ad-status CSV (internal).

    Left-joins selected status columns onto the creative rows on ``ad_id``,
    reorders columns to ``_MERGED_COLUMNS``, and writes the result to
    outputs/merged/merged_{biz}.csv.

    Returns:
        The merged DataFrame, or None when either input CSV is missing.
    """
    creative_path = _RAW_DIR / f"creative_{biz}.csv"
    status_path = _AD_STATUS_DIR / f"ad_status_{biz}.csv"
    if not creative_path.exists():
        logger.warning("创意数据不存在,跳过合并: %s", creative_path)
        return None
    if not status_path.exists():
        logger.warning("广告状态不存在,跳过合并: %s", status_path)
        return None

    creative = pd.read_csv(creative_path)
    status = pd.read_csv(status_path)

    # Normalize the join key to nullable Int64 on both sides — the CSV
    # round-trip leaves the creative side as float, the status side as int.
    for frame in (creative, status):
        frame["ad_id"] = pd.to_numeric(frame["ad_id"], errors="coerce").astype("Int64")

    # Keep only the status columns we carry over; rename optimization_goal
    # so it cannot collide with the creative-side column of the same name.
    wanted = ["ad_id", "ad_status", "bid_amount", "day_amount", "optimization_goal", "targeting"]
    status = status[[c for c in wanted if c in status.columns]].copy()
    if "optimization_goal" in status.columns:
        status = status.rename(columns={"optimization_goal": "optimization_goal_raw"})

    # Left join keeps every creative row (including SUSPEND-status ads).
    merged = creative.merge(status, on="ad_id", how="left")
    logger.info("合并后总行数: %d", len(merged))

    # Reorder to the canonical column list, dropping names that are absent.
    merged = merged[[c for c in _MERGED_COLUMNS if c in merged.columns]]

    _MERGED_DIR.mkdir(parents=True, exist_ok=True)
    out_path = _MERGED_DIR / f"merged_{biz}.csv"
    merged.to_csv(out_path, index=False, encoding="utf-8-sig")
    logger.info("合并完成: %s (%d 行, %d 列)", out_path.name, len(merged), len(merged.columns))
    return merged
@tool(description="合并创意数据与广告状态(批量)")
async def merge_creative_data(
    ctx: ToolContext,
    days: int = 7,
    force: bool = False,
) -> ToolResult:
    """Merge creative-level data with ad-status data for the last N days.

    Responsibilities:
        - read outputs/raw/creative_{date}.csv
        - read outputs/ad_status/ad_status_{date}.csv
        - left join on ad_id, write outputs/merged/merged_{date}.csv
        - idempotent: existing merged files are skipped unless force=True

    Args:
        days: Merge the most recent N days (default 7; the original
            docstring claimed 30, which did not match the signature).
        force: Re-merge even when the output file already exists
            (default False).

    Returns:
        ToolResult summarizing success / skip / fail counts.
    """
    try:
        # The window ends at yesterday.
        end_dt = datetime.now() - timedelta(days=1)
        dates_to_merge = [
            (end_dt - timedelta(days=i)).strftime("%Y%m%d") for i in range(days)
        ]

        success_count = 0
        skip_count = 0
        fail_count = 0
        for biz in dates_to_merge:
            merged_csv = _MERGED_DIR / f"merged_{biz}.csv"
            # Idempotency guard: skip existing output unless forced.
            if merged_csv.exists() and not force:
                skip_count += 1
                continue
            df = _merge_single_day(biz)
            if df is not None:
                success_count += 1
            else:
                # _merge_single_day returns None when a source CSV is missing.
                fail_count += 1

        # Summary for the tool caller.
        lines = [
            f"数据合并完成(最近 {days} 天)",
            f"成功: {success_count} 天",
            f"跳过: {skip_count} 天(已存在)",
            f"失败: {fail_count} 天(源文件缺失)",
            "",
            f"输出目录: {_MERGED_DIR}",
            "列数: 38 列",
        ]
        return ToolResult(
            title=f"合并完成({success_count}/{days})",
            output="\n".join(lines),
            metadata={
                "success": success_count,
                "skip": skip_count,
                "fail": fail_count,
                "output_dir": str(_MERGED_DIR),
            }
        )
    except Exception as e:
        logger.error("merge_creative_data 失败: %s", e, exc_info=True)
        return ToolResult(title="merge_creative_data 失败", output=str(e))
|