| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486 |
- """
- 数据查询工具 — 投放数据仓库查询
- 对接 ODPS(MaxCompute)数据仓库,支持多维度查询和聚合分析。
- ODPS 客户端位于 examples/autoput/odps_module.py。
- 环境变量(可选,覆盖 odps_module.py 中的默认配置):
- ODPS_ACCESS_ID 阿里云 AccessKey ID
- ODPS_ACCESS_SECRET 阿里云 AccessKey Secret
- ODPS_PROJECT MaxCompute 项目名,默认 loghubods
- """
- import logging
- import os
- from datetime import datetime, timedelta
- from typing import Any, Dict, List, Optional
- from agent.tools import tool
- from agent.tools.models import ToolContext, ToolResult
logger = logging.getLogger(__name__)

# ===== ODPS client (created lazily so importing this module never fails) =====
# Module-level cache of the shared client. It stays None until an
# initialization attempt succeeds, so a failed attempt is retried next call.
_odps_client = None


def _get_odps_client():
    """Return the shared ODPS client, creating it on the first successful call.

    ``odps_module`` is imported from the directory two levels above this file,
    which is put on ``sys.path`` if it is not there yet. The project name comes
    from the ``ODPS_PROJECT`` environment variable (default ``"loghubods"``).

    Returns:
        The cached ``ODPSClient`` instance, or ``None`` when importing or
        constructing it failed (the failure is logged as a warning).
    """
    global _odps_client
    if _odps_client is not None:
        return _odps_client

    try:
        import sys
        from pathlib import Path

        # Make ``odps_module`` importable by adding its directory to sys.path.
        module_dir = str(Path(__file__).parent.parent)
        if module_dir not in sys.path:
            sys.path.insert(0, module_dir)

        from odps_module import ODPSClient

        project_name = os.getenv("ODPS_PROJECT", "loghubods")
        _odps_client = ODPSClient(project=project_name)
        logger.info("ODPS 客户端已初始化,项目: %s", project_name)
    except ImportError as exc:
        logger.warning("odps_module 导入失败(可能缺少 pyodps 依赖): %s", exc)
        _odps_client = None
    except Exception as exc:
        logger.warning("ODPS 客户端初始化失败: %s", exc)
        _odps_client = None
    return _odps_client
# ===== SQL templates =====
# creative_detail: daily per-creative performance of mini-program ad placements
# (derived from xcx_touliu_creative_detail).
# Placeholders: {bizdate} is YYYYMMDD (compared with touliu_data.dt) and
# {bizdate_dash} is YYYY-MM-DD (compared with ad_put_tencent_creative_data_day.dt).
# Every ratio column divides through NULLIF(denominator, 0), the same convention
# as _SQL_TEMPLATES, so a zero click/open count yields NULL for that cell
# instead of depending on the engine's division-by-zero behavior.
_SQL_CREATIVE_DETAIL = """
SELECT
a.account_id,
a.ad_id,
c.ad_name,
SPLIT(t.rootsourceid,'_')[3] AS videoid,
t.creative_name,
t.rootsourceid,
t.总回流人数,
t.总收入,
b.cost/100 AS cost,
b.valid_click_count,
b.cost/100 / NULLIF(t.首层小程序打开数, 0) AS cpa_open,
b.cost/100 / NULLIF(b.valid_click_count, 0) AS cpc,
t.首层小程序打开数,
t.首层小程序打开数 / NULLIF(b.valid_click_count, 0) AS open_rate,
t.裂变层回流数,
t.裂变0层回流数,
t.裂变0层回流数 / NULLIF(t.首层小程序打开数, 0) AS fission0_rate,
t.裂变1层回流数,
t.裂变1层回流数 / NULLIF(t.首层小程序打开数, 0) AS fission1_rate,
d.agent_name,
t1.package_name,
CASE WHEN t2.event_name IS NULL THEN '无' ELSE t2.event_name END AS 广告优化目标
FROM (
SELECT
IF(c.creative_name IS NOT NULL, c.creative_name, t.rootsourceid) AS creative_name,
t.*
FROM loghubods.touliu_data t
LEFT JOIN (
SELECT DISTINCT
creative_name,
SPLIT(GET_JSON_OBJECT(page_spec,'$.wechat_mini_program_spec.mini_program_path'),'rootSourceId%3D')[1] AS rootsourceid,
SPLIT(SPLIT(GET_JSON_OBJECT(page_spec,'$.wechat_mini_program_spec.mini_program_path'),'rootSourceId%3D')[1],'_')[3] AS videoid
FROM loghubods.ad_put_tencent_creative_components a
LEFT JOIN loghubods.ad_put_tencent_creative_day b ON a.creative_id = b.creative_id
WHERE page_type = 'PAGE_TYPE_WECHAT_MINI_PROGRAM'
) c ON c.rootsourceid = t.rootsourceid
WHERE t.dt = '{bizdate}'
) t
LEFT JOIN loghubods.ad_put_tencent_creative_day a ON t.creative_name = a.creative_name
LEFT JOIN loghubods.ad_put_tencent_ad c ON a.ad_id = c.ad_id
LEFT JOIN (
SELECT creative_id, valid_click_count, view_count, cost, conversions_count
FROM (
SELECT
creative_id, valid_click_count, view_count, cost, conversions_count,
ROW_NUMBER() OVER (PARTITION BY creative_id ORDER BY update_time DESC) AS rank
FROM loghubods.ad_put_tencent_creative_data_day
WHERE dt = '{bizdate_dash}'
) t WHERE rank = 1
) b ON a.creative_id = b.creative_id
LEFT JOIN (
SELECT account_id, MAX(agent_name) AS agent_name
FROM loghubods.ad_put_tencent_account
GROUP BY account_id
) d ON a.account_id = d.account_id
LEFT JOIN (
SELECT t1.ad_id, t1.package_id, t1.package_name, t1.min_people
FROM (
SELECT
a.ad_id, a.package_id, b.package_name, b.min_people,
ROW_NUMBER() OVER (PARTITION BY a.ad_id ORDER BY CAST(b.min_people AS BIGINT) ASC) AS rank
FROM loghubods.ad_put_tencent_ad_package_mapping a
LEFT JOIN loghubods.ad_put_tencent_package b ON a.package_id = b.tencent_audience_id
WHERE a.is_delete = 0
) t1 WHERE t1.rank = 1
) t1 ON a.ad_id = t1.ad_id
LEFT JOIN loghubods.dim_ad_event_enum t2 ON c.optimization_goal = t2.event_id
WHERE t.dt = '{bizdate}'
AND (t.总回流人数 >= 30 OR a.account_id IS NOT NULL)
"""
# Placeholder SQL for the remaining query types, keyed by query_type; to be
# replaced with the real warehouse SQL as needed. Templates are rendered with
# str.format using {start_date}, {end_date} and {limit}; hourly_distribution
# uses only {start_date} and has a fixed LIMIT 24.
_SQL_TEMPLATES: Dict[str, str] = dict(
    account_summary="""
SELECT
stat_date AS date,
SUM(cost) AS cost,
SUM(impression) AS impression,
SUM(click) AS click,
SUM(conversion) AS conversion,
SUM(click) / NULLIF(SUM(impression), 0) AS ctr,
SUM(conversion) / NULLIF(SUM(click), 0) AS cvr,
SUM(cost) / NULLIF(SUM(conversion), 0) AS cpa
FROM ad_report_daily
WHERE stat_date >= '{start_date}' AND stat_date <= '{end_date}'
GROUP BY stat_date
ORDER BY stat_date DESC
LIMIT {limit}
""",
    ad_detail="""
SELECT
stat_date AS date,
adgroup_id,
SUM(cost) AS cost,
SUM(impression) AS impression,
SUM(click) AS click,
SUM(conversion) AS conversion,
SUM(click) / NULLIF(SUM(impression), 0) AS ctr,
SUM(cost) / NULLIF(SUM(click), 0) AS cpc,
SUM(cost) / NULLIF(SUM(conversion), 0) AS cpa
FROM ad_report_daily
WHERE stat_date >= '{start_date}' AND stat_date <= '{end_date}'
GROUP BY stat_date, adgroup_id
ORDER BY cost DESC
LIMIT {limit}
""",
    audience_analysis="""
SELECT
audience_id,
audience_name,
SUM(cost) AS cost,
SUM(conversion) AS conversion,
SUM(cost) / NULLIF(SUM(conversion), 0) AS cpa,
SUM(conversion) / NULLIF(SUM(click), 0) AS cvr
FROM ad_audience_report
WHERE stat_date >= '{start_date}' AND stat_date <= '{end_date}'
GROUP BY audience_id, audience_name
ORDER BY cpa ASC
LIMIT {limit}
""",
    creative_performance="""
SELECT
stat_date AS date,
dynamic_creative_id AS creative_id,
SUM(cost) AS cost,
SUM(impression) AS impression,
SUM(click) AS click,
SUM(click) / NULLIF(SUM(impression), 0) AS ctr,
SUM(conversion) / NULLIF(SUM(click), 0) AS cvr,
SUM(cost) / NULLIF(SUM(conversion), 0) AS cpa
FROM ad_creative_report_daily
WHERE stat_date >= '{start_date}' AND stat_date <= '{end_date}'
GROUP BY stat_date, dynamic_creative_id
ORDER BY cost DESC
LIMIT {limit}
""",
    cost_trend="""
SELECT
stat_date AS date,
SUM(cost) AS cost,
SUM(conversion) AS conversion,
SUM(cost) / NULLIF(SUM(conversion), 0) AS cpa
FROM ad_report_daily
WHERE stat_date >= '{start_date}' AND stat_date <= '{end_date}'
GROUP BY stat_date
ORDER BY stat_date ASC
LIMIT {limit}
""",
    hourly_distribution="""
SELECT
stat_hour AS hour,
SUM(cost) AS cost,
SUM(click) AS click,
SUM(conversion) AS conversion
FROM ad_report_hourly
WHERE stat_date = '{start_date}'
GROUP BY stat_hour
ORDER BY stat_hour ASC
LIMIT 24
""",
)
def _bizdate_from_params(params: Dict[str, Any]) -> str:
    """Resolve the business date (``YYYYMMDD``) used by the creative_detail query.

    The value is read from ``params["date_range"]["bizdate"]``, falling back to
    ``params["date_range"]["start_date"]``. An empty/missing value or the keyword
    ``"yesterday"`` resolves to yesterday's local date, and ``"today"`` to today's.
    ``YYYY-MM-DD`` input is accepted and normalized to ``YYYYMMDD``.

    Args:
        params: Query parameters; ``date_range`` may be missing or ``None``.

    Returns:
        The business date as an 8-digit ASCII ``YYYYMMDD`` string.

    Raises:
        ValueError: If the value is not a valid calendar date. The result is
            substituted verbatim into SQL, so anything else is rejected.
    """
    date_range = params.get("date_range") or {}
    raw = date_range.get("bizdate") or date_range.get("start_date") or ""
    value = str(raw).strip()

    now = datetime.now()
    if value in ("", "yesterday"):
        return (now - timedelta(days=1)).strftime("%Y%m%d")
    if value == "today":
        return now.strftime("%Y%m%d")

    # Accept YYYY-MM-DD by dropping the dashes, then require exactly 8 digits so
    # strptime cannot match ambiguous short inputs such as "202611".
    compact = value.replace("-", "")
    error_message = f"无效的 bizdate: {raw!r},应为 YYYYMMDD 或 YYYY-MM-DD"
    if len(compact) != 8 or not compact.isdigit():
        raise ValueError(error_message)
    try:
        parsed = datetime.strptime(compact, "%Y%m%d")
    except ValueError as exc:
        raise ValueError(error_message) from exc
    # Re-format so the returned string always consists of plain ASCII digits.
    return parsed.strftime("%Y%m%d")
def _build_sql(query_type: str, params: Dict[str, Any]) -> str:
    """Render the SQL statement for ``query_type``.

    ``creative_detail`` is rendered from ``_SQL_CREATIVE_DETAIL`` with the
    business date returned by ``_bizdate_from_params``. Every other type is
    looked up in ``_SQL_TEMPLATES`` and filled with:

    * ``start_date`` / ``end_date`` from ``params["date_range"]``. A missing or
      empty ``end_date`` falls back to ``start_date``. The exact keywords
      ``"today"`` and ``"yesterday"`` become local ``YYYY-MM-DD`` dates.
    * ``limit`` from ``params["limit"]`` (100 when missing or ``None``).

    No other entry of ``params`` affects the SQL.

    Args:
        query_type: ``"creative_detail"`` or a key of ``_SQL_TEMPLATES``.
        params: Query parameters as assembled by the tool functions.

    Returns:
        The SQL text with surrounding whitespace stripped.

    Raises:
        ValueError: Unknown ``query_type``; a date containing characters other
            than digits and ``-``; a negative ``limit`` (``int()`` also raises
            ValueError for non-numeric strings). These values are pasted into
            the SQL text, so they are checked first.
    """
    # creative_detail has its own template keyed by bizdate (YYYYMMDD).
    if query_type == "creative_detail":
        bizdate = _bizdate_from_params(params)
        bizdate_dash = f"{bizdate[:4]}-{bizdate[4:6]}-{bizdate[6:]}"
        return _SQL_CREATIVE_DETAIL.format(bizdate=bizdate, bizdate_dash=bizdate_dash).strip()

    template = _SQL_TEMPLATES.get(query_type)
    if not template:
        raise ValueError(f"不支持的 query_type: {query_type},可选: creative_detail, " + ", ".join(_SQL_TEMPLATES.keys()))

    now = datetime.now()
    keywords = {
        "today": now.strftime("%Y-%m-%d"),
        "yesterday": (now - timedelta(days=1)).strftime("%Y-%m-%d"),
    }

    def _resolve_date(raw: Any) -> str:
        # Expand relative keywords, then allow only digits and '-' so the value
        # cannot break out of the quoted SQL literal it is inserted into.
        value = "" if raw is None else str(raw).strip()
        value = keywords.get(value, value)
        if any(ch not in "0123456789-" for ch in value):
            raise ValueError(f"无效的日期: {raw!r},应为 YYYY-MM-DD")
        return value

    date_range = params.get("date_range") or {}
    start_date = _resolve_date(date_range.get("start_date"))
    end_date = _resolve_date(date_range.get("end_date") or start_date)

    limit = params.get("limit")
    limit = 100 if limit is None else int(limit)
    if limit < 0:
        raise ValueError(f"limit 不能为负数: {limit}")

    return template.format(
        start_date=start_date,
        end_date=end_date,
        limit=limit,
    ).strip()
def _query_warehouse(sql_or_desc: str, params: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Run a templated query against the ODPS (MaxCompute) warehouse.

    Args:
        sql_or_desc: Query type used to select the SQL template (see ``_build_sql``).
        params: Query parameters forwarded to ``_build_sql``.

    Returns:
        The result rows as a list of ``{column: value}`` dicts.

    Raises:
        NotImplementedError: If the ODPS client is unavailable (a warning is
            logged first), or if ``_build_sql`` rejects the query type or its
            parameters. Its ``ValueError`` is re-raised as
            ``NotImplementedError``, which the tool functions report back.
        Exception: Any error raised while executing the query or converting
            the result is logged with its traceback and re-raised unchanged.
    """
    client = _get_odps_client()
    if client is None:
        logger.warning(
            "ODPS 客户端不可用,data_query 无法执行查询。"
            "请确认已安装 pyodps 并配置正确的访问凭证。"
        )
        raise NotImplementedError(
            "ODPS 客户端未初始化。请确认:\n"
            "1. 已安装 pyodps:pip install pyodps\n"
            "2. examples/autoput/odps_module.py 中的凭证配置正确\n"
            f"查询类型: {sql_or_desc}\n参数: {params}"
        )

    # Only SQL construction maps ValueError -> NotImplementedError; a ValueError
    # raised while executing the query is a real failure and must not be relabelled.
    try:
        sql = _build_sql(sql_or_desc, params)
    except ValueError as e:
        raise NotImplementedError(str(e)) from e

    logger.debug("执行 ODPS SQL:\n%s", sql)
    try:
        df = client.execute_sql(sql)
        return df.to_dict(orient="records")
    except Exception as e:
        logger.error("ODPS 查询失败: %s", e, exc_info=True)
        raise
def _format_query_cell(column: str, value: Any, query_type: str) -> str:
    """Render one result cell of ``data_query`` as display text.

    Amount columns (cost, cpa, cpc, cpa_open) are shown in yuan. Values from the
    generic templates are divided by 100. creative_detail values are used as-is
    because its SQL already selects ``b.cost/100``. Ratio columns are shown as
    percentages. Missing, NULL and NaN values are shown as ``-``.
    """
    # NaN is the only value that is not equal to itself.
    if value is None or (isinstance(value, float) and value != value):
        return "-"
    if isinstance(value, bool) or not isinstance(value, (int, float)):
        return str(value)
    if column in ("cost", "cpa", "cpc", "cpa_open"):
        divisor = 1 if query_type == "creative_detail" else 100
        return f"{value / divisor:.2f}元"
    if column in ("ctr", "cvr", "open_rate", "fission0_rate", "fission1_rate"):
        return f"{value:.2%}"
    return str(value)


@tool(
    description="查询投放数据仓库,支持小程序创意明细/账户汇总/广告明细/人群分析/素材效果/成本趋势等多种查询类型",
    hidden_params=["context"],
)
async def data_query(
    query_type: str,
    date_range: Dict[str, str],
    dimensions: Optional[List[str]] = None,
    metrics: Optional[List[str]] = None,
    filters: Optional[Dict[str, Any]] = None,
    order_by: Optional[str] = None,
    limit: int = 100,
    context: Optional[ToolContext] = None,
) -> ToolResult:
    """Query ad-delivery data from the warehouse and render it as a text table.

    query_type options:
      - creative_detail: per-creative daily mini-program delivery data for one
        business date (date_range["bizdate"], else date_range["start_date"];
        YYYYMMDD or YYYY-MM-DD; defaults to yesterday)
      - account_summary: account-level daily spend summary
      - ad_detail: per-ad details (spend/clicks/conversions by adgroup_id)
      - audience_analysis: audience performance (conversion rate/cost per audience)
      - creative_performance: creative performance (CTR/CVR/spend by creative ID)
      - cost_trend: daily CPA/spend trend
      - hourly_distribution: spend per hour of date_range["start_date"]

    Args:
        query_type: Query type (see above).
        date_range: {"start_date": "2026-04-01", "end_date": "2026-04-07"}.
        dimensions: Columns shown before the metrics, e.g. ["date", "adgroup_id"].
            Defaults to the identifying columns of the chosen query type.
        metrics: Metric columns shown, e.g. ["cost", "click", "ctr", "cpa"].
        filters: Filter conditions, e.g. {"adgroup_id": [123, 456]}. They are
            recorded in the returned metadata; the current SQL templates do not
            apply them.
        order_by: Sort expression, e.g. "cost DESC". Recorded in the metadata
            only, like filters.
        limit: Maximum number of rows the SQL returns (at most 20 are displayed).
    """
    # Default columns per query type, matching the column aliases each SQL
    # template selects.
    default_dimensions = {
        "account_summary": ["date"],
        "ad_detail": ["date", "adgroup_id"],
        "audience_analysis": ["audience_id", "audience_name"],
        "creative_performance": ["date", "creative_id"],
        "cost_trend": ["date"],
        "hourly_distribution": ["hour"],
        "creative_detail": ["account_id", "ad_id", "creative_name"],
    }
    default_metrics = {
        "account_summary": ["cost", "impression", "click", "conversion", "ctr", "cvr", "cpa"],
        "ad_detail": ["cost", "impression", "click", "conversion", "ctr", "cpc", "cpa", "roi"],
        "audience_analysis": ["cost", "conversion", "cpa", "cvr", "reach"],
        "creative_performance": ["cost", "impression", "click", "ctr", "cvr", "cpa"],
        "cost_trend": ["cost", "cpa", "conversion"],
        "hourly_distribution": ["cost", "click", "conversion"],
        "creative_detail": ["cost", "valid_click_count", "cpc", "cpa_open", "open_rate", "fission0_rate", "fission1_rate"],
    }
    actual_metrics = metrics or default_metrics.get(query_type, ["cost", "click", "conversion"])
    actual_dimensions = dimensions or default_dimensions.get(query_type, ["date"])
    params = {
        "query_type": query_type,
        "date_range": date_range,
        "dimensions": actual_dimensions,
        "metrics": actual_metrics,
        "filters": filters or {},
        "order_by": order_by or f"{actual_metrics[0]} DESC",
        "limit": limit,
    }

    try:
        rows = _query_warehouse(query_type, params)
        if not rows:
            return ToolResult(title=f"数据查询({query_type})", output="该条件下无数据")

        # Render the first 20 rows as a pipe-separated text table.
        columns = actual_dimensions + actual_metrics
        header = " | ".join(columns)
        lines = [header, "-" * len(header)]
        for row in rows[:20]:
            lines.append(" | ".join(_format_query_cell(col, row.get(col), query_type) for col in columns))
        if len(rows) > 20:
            lines.append(f"...共 {len(rows)} 条,仅显示前20条")

        return ToolResult(
            title=f"数据查询({query_type},{len(rows)}条)",
            output="\n".join(lines),
            metadata={"rows": rows, "query_params": params},
        )
    except NotImplementedError as e:
        return ToolResult(title="data_query 未实现", output=str(e))
    except Exception as e:
        logger.error("data_query 失败: %s", e, exc_info=True)
        return ToolResult(title="data_query 失败", output=str(e))
@tool(description="数据聚合分析:对查询结果进行趋势分析、环比/同比对比")
async def data_aggregate(
    query_type: str,
    date_range: Dict[str, str],
    aggregation: str = "trend",
    compare_type: Optional[str] = None,
    filters: Optional[Dict[str, Any]] = None,
) -> ToolResult:
    """Summarize delivery data over a date range.

    Runs the same warehouse query as ``data_query`` for ``query_type`` (limit 90
    rows) and reports total cost, total conversions and average CPA
    (total cost / total conversions).

    Args:
        query_type: Base query type; same values as data_query's query_type.
        date_range: Analysis window, e.g. {"start_date": "2026-04-01", "end_date": "2026-04-07"}.
        aggregation: "trend" / "sum" / "compare"; currently only used as the
            heading label of the output.
        compare_type: "day_over_day" / "week_over_week". The previous-period
            query is not implemented yet, so only a note is added to the output.
        filters: Filter conditions, passed along in the query params.
    """

    def _number(value: Any) -> Any:
        # Missing/NULL, NaN (the only value unequal to itself) and non-numeric
        # cells count as 0 so a single bad row cannot break the totals.
        if isinstance(value, bool) or not isinstance(value, (int, float)) or value != value:
            return 0
        return value

    try:
        window = date_range or {}
        current_params = {
            "query_type": query_type,
            "date_range": window,
            "dimensions": ["date"],
            "metrics": ["cost", "conversion", "cpa", "ctr"],
            "filters": filters or {},
            "order_by": "date ASC",
            "limit": 90,
        }
        # Query with the plain query type: a "<type>_aggregate" key matches no
        # SQL template, so the aggregation could never run.
        current_rows = _query_warehouse(query_type, current_params)

        # creative_detail already selects cost in yuan (b.cost/100); the other
        # templates' cost column is divided by 100 to get yuan.
        divisor = 1 if query_type == "creative_detail" else 100
        total_cost = sum(_number(r.get("cost")) for r in current_rows) / divisor
        total_conv = sum(_number(r.get("conversion")) for r in current_rows)
        avg_cpa = total_cost / total_conv if total_conv > 0 else 0

        start = window.get("start_date") or window.get("bizdate") or "-"
        end = window.get("end_date") or start
        output_lines = [
            f"【{aggregation} 分析】{start} ~ {end}",
            f"总消耗: {total_cost:.2f} 元 | 总转化: {total_conv} | 平均CPA: {avg_cpa:.2f} 元",
        ]
        if compare_type and current_rows:
            output_lines.append(f"({compare_type} 对比数据需实现历史周期查询)")
        return ToolResult(title="数据聚合分析", output="\n".join(output_lines), metadata={"rows": current_rows})
    except NotImplementedError as e:
        return ToolResult(title="data_aggregate 未实现", output=str(e))
    except Exception as e:
        logger.error("data_aggregate 失败: %s", e, exc_info=True)
        return ToolResult(title="data_aggregate 失败", output=str(e))
@tool(description="查询广告当前状态(出价、预算、定向等)")
async def get_ad_current_status(
    account_id: int,
    ad_ids: Optional[List[int]] = None,
    context: Optional[ToolContext] = None,
) -> ToolResult:
    """Fetch the current settings of normal-status ads from ``loghubods.ad_put_tencent_ad``.

    Selects ad_id, ad_name, account_id, bid_amount, day_amount, ad_status,
    optimization_goal, targeting and create_time for ads of ``account_id`` whose
    ``ad_status`` is ``'AD_STATUS_NORMAL'``.

    Args:
        account_id: Account ID; converted with ``int()`` before it is placed in the SQL.
        ad_ids: Optional ad IDs to restrict the result to. ``None`` and NaN
            entries are skipped. NOTE: if no valid ID remains, no ad_id filter is
            applied, so all normal ads of the account are returned (same as
            omitting ``ad_ids``).

    Returns:
        ToolResult whose output states how many ads were found and whose
        ``metadata["rows"]`` holds one dict per ad. Any failure (including a
        non-numeric ID) is returned as a ToolResult titled
        "get_ad_current_status 失败" instead of being raised.
    """
    try:
        client = _get_odps_client()
        if not client:
            return ToolResult(title="get_ad_current_status 失败", output="ODPS 客户端未初始化")

        # IDs are interpolated into the SQL text, so coerce them to int first; a
        # non-numeric value raises here and never reaches the query.
        account_id_value = int(account_id)
        sql = f"""
        SELECT
            ad_id,
            ad_name,
            account_id,
            bid_amount,
            day_amount,
            ad_status,
            optimization_goal,
            targeting,
            create_time
        FROM loghubods.ad_put_tencent_ad
        WHERE account_id = {account_id_value}
        AND ad_status = 'AD_STATUS_NORMAL'
        """

        if ad_ids:
            # Skip None and NaN entries (str(float('nan')) == 'nan').
            valid_ad_ids = [int(ad_id) for ad_id in ad_ids if ad_id is not None and str(ad_id) != "nan"]
            if valid_ad_ids:
                ad_ids_str = ",".join(map(str, valid_ad_ids))
                sql += f" AND ad_id IN ({ad_ids_str})"

        logger.info("执行 SQL: %s", sql)
        df = client.execute_sql(sql)
        data = df.to_dict("records")
        output = f"查询到 {len(data)} 个广告的当前状态"
        return ToolResult(title="广告当前状态", output=output, metadata={"rows": data})
    except Exception as e:
        logger.error("get_ad_current_status 失败: %s", e, exc_info=True)
        return ToolResult(title="get_ad_current_status 失败", output=str(e))
|