"""
Data query tools -- ad delivery data-warehouse queries.

Talks to the ODPS (MaxCompute) data warehouse and offers templated
multi-dimension queries plus a simple aggregation helper.
The ODPS client lives in examples/autoput/odps_module.py.

Environment variables (optional; per the original header they override the
defaults configured in odps_module.py -- only ODPS_PROJECT is read here):
    ODPS_ACCESS_ID      Alibaba Cloud AccessKey ID
    ODPS_ACCESS_SECRET  Alibaba Cloud AccessKey Secret
    ODPS_PROJECT        MaxCompute project name, defaults to loghubods
"""

import logging
import os
import re
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional

from agent.tools import tool
from agent.tools.models import ToolContext, ToolResult

logger = logging.getLogger(__name__)


# ===== ODPS client (lazy, so importing this module never fails) =====

_odps_client = None


def _get_odps_client():
    """Return the shared ODPS client, creating it on first use.

    Returns:
        The cached ``ODPSClient`` instance, or ``None`` when ``odps_module``
        cannot be imported or the client constructor raises. A failed
        initialisation is not cached, so the next call retries.
    """
    global _odps_client
    if _odps_client is None:
        try:
            # odps_module is imported by bare name, so its directory has to
            # be on sys.path first.
            import sys
            from pathlib import Path

            autoput_dir = Path(__file__).parent.parent
            if str(autoput_dir) not in sys.path:
                sys.path.insert(0, str(autoput_dir))

            from odps_module import ODPSClient

            project = os.getenv("ODPS_PROJECT", "loghubods")
            _odps_client = ODPSClient(project=project)
            logger.info("ODPS 客户端已初始化,项目: %s", project)
        except ImportError as e:
            logger.warning("odps_module 导入失败(可能缺少 pyodps 依赖): %s", e)
            _odps_client = None
        except Exception as e:
            logger.warning("ODPS 客户端初始化失败: %s", e)
            _odps_client = None
    return _odps_client


# ===== SQL templates =====

# creative_detail: day-level performance per creative for mini-program
# delivery (originally sourced from xcx_touliu_creative_detail).
# {bizdate} is YYYYMMDD, {bizdate_dash} is the same day as YYYY-MM-DD.
_SQL_CREATIVE_DETAIL = """
SELECT
    a.account_id,
    a.ad_id,
    c.ad_name,
    SPLIT(t.rootsourceid,'_')[3] AS videoid,
    t.creative_name,
    t.rootsourceid,
    t.总回流人数,
    t.总收入,
    b.cost/100 AS cost,
    b.valid_click_count,
    b.cost/100 / t.首层小程序打开数 AS cpa_open,
    b.cost/100 / b.valid_click_count AS cpc,
    t.首层小程序打开数,
    t.首层小程序打开数 / b.valid_click_count AS open_rate,
    t.裂变层回流数,
    t.裂变0层回流数,
    t.裂变0层回流数 / t.首层小程序打开数 AS fission0_rate,
    t.裂变1层回流数,
    t.裂变1层回流数 / t.首层小程序打开数 AS fission1_rate,
    d.agent_name,
    t1.package_name,
    CASE WHEN t2.event_name IS NULL THEN '无' ELSE t2.event_name END AS 广告优化目标
FROM (
    SELECT
        IF(c.creative_name IS NOT NULL, c.creative_name, t.rootsourceid) AS creative_name,
        t.*
    FROM loghubods.touliu_data t
    LEFT JOIN (
        SELECT DISTINCT
            creative_name,
            SPLIT(GET_JSON_OBJECT(page_spec,'$.wechat_mini_program_spec.mini_program_path'),'rootSourceId%3D')[1] AS rootsourceid,
            SPLIT(SPLIT(GET_JSON_OBJECT(page_spec,'$.wechat_mini_program_spec.mini_program_path'),'rootSourceId%3D')[1],'_')[3] AS videoid
        FROM loghubods.ad_put_tencent_creative_components a
        LEFT JOIN loghubods.ad_put_tencent_creative_day b
            ON a.creative_id = b.creative_id
        WHERE page_type = 'PAGE_TYPE_WECHAT_MINI_PROGRAM'
    ) c ON c.rootsourceid = t.rootsourceid
    WHERE t.dt = '{bizdate}'
) t
LEFT JOIN loghubods.ad_put_tencent_creative_day a
    ON t.creative_name = a.creative_name
LEFT JOIN loghubods.ad_put_tencent_ad c
    ON a.ad_id = c.ad_id
LEFT JOIN (
    SELECT creative_id, valid_click_count, view_count, cost, conversions_count
    FROM (
        SELECT
            creative_id, valid_click_count, view_count, cost, conversions_count,
            ROW_NUMBER() OVER (PARTITION BY creative_id ORDER BY update_time DESC) AS rank
        FROM loghubods.ad_put_tencent_creative_data_day
        WHERE dt = '{bizdate_dash}'
    ) t
    WHERE rank = 1
) b ON a.creative_id = b.creative_id
LEFT JOIN (
    SELECT account_id, MAX(agent_name) AS agent_name
    FROM loghubods.ad_put_tencent_account
    GROUP BY account_id
) d ON a.account_id = d.account_id
LEFT JOIN (
    SELECT t1.ad_id, t1.package_id, t1.package_name, t1.min_people
    FROM (
        SELECT
            a.ad_id, a.package_id, b.package_name, b.min_people,
            ROW_NUMBER() OVER (PARTITION BY a.ad_id ORDER BY CAST(b.min_people AS BIGINT) ASC) AS rank
        FROM loghubods.ad_put_tencent_ad_package_mapping a
        LEFT JOIN loghubods.ad_put_tencent_package b
            ON a.package_id = b.tencent_audience_id
        WHERE a.is_delete = 0
    ) t1
    WHERE t1.rank = 1
) t1 ON a.ad_id = t1.ad_id
LEFT JOIN loghubods.dim_ad_event_enum t2
    ON c.optimization_goal = t2.event_id
WHERE t.dt = '{bizdate}'
  AND (t.总回流人数 >= 30 OR a.account_id IS NOT NULL)
"""

# Placeholder templates for the remaining query types (real SQL is to be
# filled in as needed). Placeholders: {start_date}, {end_date}, {limit}.
_SQL_TEMPLATES: Dict[str, str] = {
    "account_summary": """
SELECT
    stat_date AS date,
    SUM(cost) AS cost,
    SUM(impression) AS impression,
    SUM(click) AS click,
    SUM(conversion) AS conversion,
    SUM(click) / NULLIF(SUM(impression), 0) AS ctr,
    SUM(conversion) / NULLIF(SUM(click), 0) AS cvr,
    SUM(cost) / NULLIF(SUM(conversion), 0) AS cpa
FROM ad_report_daily
WHERE stat_date >= '{start_date}' AND stat_date <= '{end_date}'
GROUP BY stat_date
ORDER BY stat_date DESC
LIMIT {limit}
""",
    "ad_detail": """
SELECT
    stat_date AS date,
    adgroup_id,
    SUM(cost) AS cost,
    SUM(impression) AS impression,
    SUM(click) AS click,
    SUM(conversion) AS conversion,
    SUM(click) / NULLIF(SUM(impression), 0) AS ctr,
    SUM(cost) / NULLIF(SUM(click), 0) AS cpc,
    SUM(cost) / NULLIF(SUM(conversion), 0) AS cpa
FROM ad_report_daily
WHERE stat_date >= '{start_date}' AND stat_date <= '{end_date}'
GROUP BY stat_date, adgroup_id
ORDER BY cost DESC
LIMIT {limit}
""",
    "audience_analysis": """
SELECT
    audience_id,
    audience_name,
    SUM(cost) AS cost,
    SUM(conversion) AS conversion,
    SUM(cost) / NULLIF(SUM(conversion), 0) AS cpa,
    SUM(conversion) / NULLIF(SUM(click), 0) AS cvr
FROM ad_audience_report
WHERE stat_date >= '{start_date}' AND stat_date <= '{end_date}'
GROUP BY audience_id, audience_name
ORDER BY cpa ASC
LIMIT {limit}
""",
    "creative_performance": """
SELECT
    stat_date AS date,
    dynamic_creative_id AS creative_id,
    SUM(cost) AS cost,
    SUM(impression) AS impression,
    SUM(click) AS click,
    SUM(click) / NULLIF(SUM(impression), 0) AS ctr,
    SUM(conversion) / NULLIF(SUM(click), 0) AS cvr,
    SUM(cost) / NULLIF(SUM(conversion), 0) AS cpa
FROM ad_creative_report_daily
WHERE stat_date >= '{start_date}' AND stat_date <= '{end_date}'
GROUP BY stat_date, dynamic_creative_id
ORDER BY cost DESC
LIMIT {limit}
""",
    "cost_trend": """
SELECT
    stat_date AS date,
    SUM(cost) AS cost,
    SUM(conversion) AS conversion,
    SUM(cost) / NULLIF(SUM(conversion), 0) AS cpa
FROM ad_report_daily
WHERE stat_date >= '{start_date}' AND stat_date <= '{end_date}'
GROUP BY stat_date
ORDER BY stat_date ASC
LIMIT {limit}
""",
    "hourly_distribution": """
SELECT
    stat_hour AS hour,
    SUM(cost) AS cost,
    SUM(click) AS click,
    SUM(conversion) AS conversion
FROM ad_report_hourly
WHERE stat_date = '{start_date}'
GROUP BY stat_hour
ORDER BY stat_hour ASC
LIMIT 24
""",
}

# The templates are filled with str.format, so every interpolated value is
# validated against an allow-list before it reaches the SQL text.
_BIZDATE_RE = re.compile(r"\d{8}")
_DATE_LITERAL_RE = re.compile(r"[0-9T:/. \-]*")


class _UnsupportedQueryTypeError(ValueError):
    """Raised by ``_build_sql`` when ``query_type`` has no SQL template.

    Subclasses ``ValueError`` so code catching ``ValueError`` keeps working.
    """


def _as_number(value: Any) -> Any:
    """Return *value* unchanged, or 0 when it is ``None`` or NaN."""
    # NaN is the only value that compares unequal to itself.
    if value is None or value != value:
        return 0
    return value


def _bizdate_from_params(params: Dict[str, Any]) -> str:
    """Extract the business date (YYYYMMDD) from ``params["date_range"]``.

    ``date_range["bizdate"]`` is preferred, falling back to
    ``date_range["start_date"]``. A missing/empty value or the keyword
    ``"yesterday"`` resolves to yesterday (local time); ``"today"`` resolves
    to today. ``YYYY-MM-DD`` input is accepted and its dashes are removed.

    Raises:
        ValueError: if the resolved value is not exactly eight digits. The
            value is interpolated into SQL, so anything else is rejected.
    """
    date_range = params.get("date_range") or {}
    bizdate = date_range.get("bizdate") or date_range.get("start_date") or ""
    bizdate = str(bizdate).strip()

    now = datetime.now()
    if bizdate in ("yesterday", ""):
        bizdate = (now - timedelta(days=1)).strftime("%Y%m%d")
    elif bizdate == "today":
        bizdate = now.strftime("%Y%m%d")

    # Accept YYYY-MM-DD by normalising it to YYYYMMDD.
    bizdate = bizdate.replace("-", "")
    if not _BIZDATE_RE.fullmatch(bizdate):
        raise ValueError(f"无效的 bizdate: {bizdate!r},应为 YYYYMMDD 或 YYYY-MM-DD")
    return bizdate


def _resolve_date_literal(value: Any, today: str, yesterday: str) -> str:
    """Expand the today/yesterday keywords and reject SQL-unsafe dates."""
    text = "" if value is None else str(value)
    text = text.replace("today", today).replace("yesterday", yesterday)
    # Only digits and common date/time separators may be placed inside the
    # quoted SQL literal; quotes, semicolons, comments etc. are refused.
    if not _DATE_LITERAL_RE.fullmatch(text):
        raise ValueError(f"无效的日期参数: {text!r}")
    return text


def _build_sql(query_type: str, params: Dict[str, Any]) -> str:
    """Render the SQL statement for *query_type*.

    Args:
        query_type: ``"creative_detail"`` or a key of ``_SQL_TEMPLATES``.
        params: query parameters; ``date_range`` and ``limit`` are read.

    Returns:
        The SQL text with surrounding whitespace stripped.

    Raises:
        ValueError: for an unsupported ``query_type`` (raised as the
            ``_UnsupportedQueryTypeError`` subclass), an invalid date value,
            or a ``limit`` that is not a non-negative integer.
    """
    # creative_detail has its own template, keyed by a single bizdate.
    if query_type == "creative_detail":
        bizdate = _bizdate_from_params(params)
        bizdate_dash = f"{bizdate[:4]}-{bizdate[4:6]}-{bizdate[6:]}"
        return _SQL_CREATIVE_DETAIL.format(bizdate=bizdate, bizdate_dash=bizdate_dash).strip()

    template = _SQL_TEMPLATES.get(query_type)
    if not template:
        raise _UnsupportedQueryTypeError(
            f"不支持的 query_type: {query_type},可选: creative_detail, "
            + ", ".join(_SQL_TEMPLATES.keys())
        )

    date_range = params.get("date_range") or {}
    raw_start = date_range.get("start_date", "")
    raw_end = date_range.get("end_date", raw_start)

    now = datetime.now()
    today = now.strftime("%Y-%m-%d")
    yesterday = (now - timedelta(days=1)).strftime("%Y-%m-%d")
    start_date = _resolve_date_literal(raw_start, today, yesterday)
    end_date = _resolve_date_literal(raw_end, today, yesterday)

    raw_limit = params.get("limit", 100)
    try:
        limit = int(raw_limit)
    except (TypeError, ValueError):
        raise ValueError(f"无效的 limit: {raw_limit!r}") from None
    if limit < 0:
        raise ValueError(f"无效的 limit: {raw_limit!r}")

    return template.format(
        start_date=start_date,
        end_date=end_date,
        limit=limit,
    ).strip()


def _query_warehouse(sql_or_desc: str, params: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Run the templated query *sql_or_desc* against ODPS.

    Args:
        sql_or_desc: the query type, passed to ``_build_sql``.
        params: query parameters, passed to ``_build_sql``.

    Returns:
        The result rows as a list of dicts (``df.to_dict(orient="records")``).

    Raises:
        NotImplementedError: if the ODPS client is unavailable or the query
            type has no template.
        ValueError: if a date or limit parameter is invalid.
        Exception: any error raised while executing the query is logged and
            re-raised unchanged.
    """
    client = _get_odps_client()
    if client is None:
        logger.warning(
            "ODPS 客户端不可用,data_query 将返回空结果。"
            "请确认已安装 pyodps 并配置正确的访问凭证。"
        )
        raise NotImplementedError(
            "ODPS 客户端未初始化。请确认:\n"
            "1. 已安装 pyodps:pip install pyodps\n"
            "2. examples/autoput/odps_module.py 中的凭证配置正确\n"
            f"查询类型: {sql_or_desc}\n参数: {params}"
        )

    # Only "no such template" means "not implemented"; other ValueErrors
    # (bad dates, or ones raised during execution) are genuine failures.
    try:
        sql = _build_sql(sql_or_desc, params)
    except _UnsupportedQueryTypeError as e:
        raise NotImplementedError(str(e)) from e

    logger.debug("执行 ODPS SQL:\n%s", sql)
    try:
        df = client.execute_sql(sql)
        return df.to_dict(orient="records")
    except Exception as e:
        logger.error("ODPS 查询失败: %s", e)
        raise


# NOTE: the docstring below is kept verbatim because the @tool decorator may
# expose it at runtime (TODO confirm). Besides the listed query types,
# "creative_detail" is also accepted; it reads date_range["bizdate"] (or
# "start_date") and returns cost already converted to yuan by its SQL.
@tool(
    description="查询投放数据仓库,支持账户汇总/广告明细/人群分析/素材效果/成本趋势等多种查询类型",
    hidden_params=["context"],
)
async def data_query(
    query_type: str,
    date_range: Dict[str, str],
    dimensions: Optional[List[str]] = None,
    metrics: Optional[List[str]] = None,
    filters: Optional[Dict[str, Any]] = None,
    order_by: Optional[str] = None,
    limit: int = 100,
    context: Optional[ToolContext] = None,
) -> ToolResult:
    """从数据仓库查询投放数据,支持多维度聚合分析。

    query_type 可选值:
    - account_summary: 账户整体消耗汇总(按日)
    - ad_detail: 广告明细(消耗/点击/转化,按广告ID)
    - audience_analysis: 人群效果分析(各人群包的转化率/成本)
    - creative_performance: 素材效果(CTR/CVR/消耗,按创意ID)
    - cost_trend: 成本趋势(按日/小时的CPA趋势)
    - hourly_distribution: 小时分布(各时段消耗占比)

    Args:
        query_type: 查询类型(见上方说明)
        date_range: {"start_date": "2026-04-01", "end_date": "2026-04-07"}
        dimensions: 分组维度,如 ["date", "adgroup_id", "creative_id"]
        metrics: 查询指标,如 ["cost", "impression", "click", "conversion", "ctr", "cvr", "cpa"]
        filters: 过滤条件,如 {"adgroup_id": [123, 456], "status": "ACTIVE"}
        order_by: 排序字段,如 "cost DESC"
        limit: 返回条数上限
    """
    default_metrics = {
        "account_summary": ["cost", "impression", "click", "conversion", "ctr", "cvr", "cpa"],
        "ad_detail": ["cost", "impression", "click", "conversion", "ctr", "cpc", "cpa", "roi"],
        "audience_analysis": ["cost", "conversion", "cpa", "cvr", "reach"],
        "creative_performance": ["cost", "impression", "click", "ctr", "cvr", "cpa"],
        "cost_trend": ["cost", "cpa", "conversion"],
        "hourly_distribution": ["cost", "click", "conversion"],
    }

    actual_metrics = metrics or default_metrics.get(query_type, ["cost", "click", "conversion"])
    actual_dimensions = dimensions or ["date"]

    # dimensions/metrics/filters/order_by are carried for reference only; the
    # fixed SQL templates read just date_range and limit.
    params = {
        "query_type": query_type,
        "date_range": date_range,
        "dimensions": actual_dimensions,
        "metrics": actual_metrics,
        "filters": filters or {},
        "order_by": order_by or f"{actual_metrics[0]} DESC",
        "limit": limit,
    }

    # The creative_detail SQL already divides cost by 100, so dividing again
    # here would under-report it by a factor of 100. The other templates are
    # treated as returning cents, as before.
    money_divisor = 1 if query_type == "creative_detail" else 100

    try:
        rows = _query_warehouse(query_type, params)

        if not rows:
            return ToolResult(title=f"数据查询({query_type})", output="该条件下无数据")

        # Render the first rows as a plain-text table.
        columns = actual_dimensions + actual_metrics
        header = " | ".join(columns)
        lines = [header, "-" * len(header)]
        for row in rows[:20]:
            values = []
            for col in columns:
                v = row.get(col, "-")
                if col in ("cost", "cpa") and isinstance(v, (int, float)):
                    v = f"{v/money_divisor:.2f}元"
                elif col in ("ctr", "cvr") and isinstance(v, (int, float)):
                    v = f"{v:.2%}"
                values.append(str(v))
            lines.append(" | ".join(values))

        if len(rows) > 20:
            lines.append(f"...共 {len(rows)} 条,仅显示前20条")

        output = "\n".join(lines)
        return ToolResult(
            title=f"数据查询({query_type},{len(rows)}条)",
            output=output,
            metadata={"rows": rows, "query_params": params},
        )

    except NotImplementedError as e:
        return ToolResult(title="data_query 未实现", output=str(e))
    except Exception as e:
        logger.error("data_query 失败: %s", e)
        return ToolResult(title="data_query 失败", output=str(e))


# NOTE: docstring kept verbatim because the @tool decorator may expose it at
# runtime (TODO confirm).
@tool(description="数据聚合分析:对查询结果进行趋势分析、环比/同比对比")
async def data_aggregate(
    query_type: str,
    date_range: Dict[str, str],
    aggregation: str = "trend",
    compare_type: Optional[str] = None,
    filters: Optional[Dict[str, Any]] = None,
) -> ToolResult:
    """对投放数据进行聚合分析。

    Args:
        query_type: 基础查询类型(同 data_query 的 query_type)
        date_range: 分析时间范围
        aggregation: 聚合方式 "trend"(趋势)/ "sum"(汇总)/ "compare"(对比)
        compare_type: 对比方式 "day_over_day"(日环比)/ "week_over_week"(周同比)
        filters: 过滤条件
    """
    try:
        # Fetch the current-period rows.
        current_params = {
            "query_type": query_type,
            "date_range": date_range,
            "dimensions": ["date"],
            "metrics": ["cost", "conversion", "cpa", "ctr"],
            "filters": filters or {},
            "order_by": "date ASC",
            "limit": 90,
        }
        # Query the base type itself: no template is registered under
        # "<query_type>_aggregate", so that name could never succeed.
        current_rows = _query_warehouse(query_type, current_params)

        # creative_detail cost is already in yuan; other templates are
        # treated as cents.
        money_divisor = 1 if query_type == "creative_detail" else 100

        # NULL / NaN aggregates count as 0 instead of breaking the sums.
        total_cost = sum(_as_number(r.get("cost")) for r in current_rows) / money_divisor
        total_conv = sum(_as_number(r.get("conversion")) for r in current_rows)
        avg_cpa = (total_cost / total_conv) if total_conv > 0 else 0

        period = date_range or {}
        start_label = period.get("start_date", "")
        end_label = period.get("end_date", start_label)

        output_lines = [
            f"【{aggregation} 分析】{start_label} ~ {end_label}",
            f"总消耗: {total_cost:.2f} 元 | 总转化: {total_conv} | 平均CPA: {avg_cpa:.2f} 元",
        ]

        if compare_type and current_rows:
            output_lines.append(f"({compare_type} 对比数据需实现历史周期查询)")

        return ToolResult(title="数据聚合分析", output="\n".join(output_lines), metadata={"rows": current_rows})

    except NotImplementedError as e:
        return ToolResult(title="data_aggregate 未实现", output=str(e))
    except Exception as e:
        return ToolResult(title="data_aggregate 失败", output=str(e))


# NOTE: docstring kept verbatim because the @tool decorator may expose it at
# runtime (TODO confirm).
@tool(description="查询广告当前状态(出价、预算、定向等)")
async def get_ad_current_status(
    account_id: int,
    ad_ids: Optional[List[int]] = None,
    context: Optional[ToolContext] = None,
) -> ToolResult:
    """
    从 ODPS ad_put_tencent_ad 表查询广告当前状态

    Args:
        account_id: 账户ID
        ad_ids: 广告ID列表(可选,不传则查询账户下所有正常广告)

    Returns:
        ToolResult: 包含广告状态列表
    """
    try:
        client = _get_odps_client()
        if not client:
            return ToolResult(title="get_ad_current_status 失败", output="ODPS 客户端未初始化")

        # The annotation is not enforced at runtime; coerce to int so only a
        # number can ever be interpolated into the SQL text.
        safe_account_id = int(account_id)

        sql = f"""
        SELECT
            ad_id,
            ad_name,
            account_id,
            bid_amount,
            day_amount,
            ad_status,
            optimization_goal,
            targeting,
            create_time
        FROM loghubods.ad_put_tencent_ad
        WHERE account_id = {safe_account_id}
          AND ad_status = 'AD_STATUS_NORMAL'
        """

        if ad_ids:
            # Drop None and NaN entries before casting to int.
            valid_ad_ids = [int(ad_id) for ad_id in ad_ids if ad_id is not None and str(ad_id) != 'nan']
            if valid_ad_ids:
                ad_ids_str = ",".join(map(str, valid_ad_ids))
                sql += f" AND ad_id IN ({ad_ids_str})"

        logger.info("执行 SQL: %s", sql)
        df = client.execute_sql(sql)

        # Convert the DataFrame into a list of row dicts.
        data = df.to_dict('records')

        output = f"查询到 {len(data)} 个广告的当前状态"
        return ToolResult(title="广告当前状态", output=output, metadata={"rows": data})

    except Exception as e:
        logger.error("get_ad_current_status 失败: %s", e, exc_info=True)
        return ToolResult(title="get_ad_current_status 失败", output=str(e))