# data_query.py
  1. """
  2. 数据查询工具 — 投放数据仓库查询
  3. 对接 ODPS(MaxCompute)数据仓库,支持多维度查询和聚合分析。
  4. ODPS 客户端位于 examples/autoput/odps_module.py。
  5. 环境变量(可选,覆盖 odps_module.py 中的默认配置):
  6. ODPS_ACCESS_ID 阿里云 AccessKey ID
  7. ODPS_ACCESS_SECRET 阿里云 AccessKey Secret
  8. ODPS_PROJECT MaxCompute 项目名,默认 loghubods
  9. """
  10. import logging
  11. import os
  12. from datetime import datetime, timedelta
  13. from typing import Any, Dict, List, Optional
  14. from agent.tools import tool
  15. from agent.tools.models import ToolContext, ToolResult
  16. logger = logging.getLogger(__name__)
  17. # ===== ODPS 客户端(懒加载,避免导入时失败) =====
  18. _odps_client = None
  19. def _get_odps_client():
  20. """懒加载 ODPS 客户端,首次调用时初始化。"""
  21. global _odps_client
  22. if _odps_client is None:
  23. try:
  24. # 使用相对于项目根目录的 autoput 目录下的 odps_module
  25. import sys
  26. from pathlib import Path
  27. # 将 autoput 目录加入 sys.path,以便直接 import odps_module
  28. autoput_dir = Path(__file__).parent.parent
  29. if str(autoput_dir) not in sys.path:
  30. sys.path.insert(0, str(autoput_dir))
  31. from odps_module import ODPSClient
  32. project = os.getenv("ODPS_PROJECT", "loghubods")
  33. _odps_client = ODPSClient(project=project)
  34. logger.info("ODPS 客户端已初始化,项目: %s", project)
  35. except ImportError as e:
  36. logger.warning("odps_module 导入失败(可能缺少 pyodps 依赖): %s", e)
  37. _odps_client = None
  38. except Exception as e:
  39. logger.warning("ODPS 客户端初始化失败: %s", e)
  40. _odps_client = None
  41. return _odps_client
# ===== SQL templates =====
# creative_detail: day-level performance data per mini-program ad creative
# (derived from xcx_touliu_creative_detail).
# Placeholders: {bizdate} in YYYYMMDD, {bizdate_dash} in YYYY-MM-DD —
# the two partition-date formats used by the joined tables.
# NOTE(review): column aliases mix English and Chinese on purpose — downstream
# consumers appear to expect the Chinese names; do not rename.
_SQL_CREATIVE_DETAIL = """
SELECT
a.account_id,
a.ad_id,
c.ad_name,
SPLIT(t.rootsourceid,'_')[3] AS videoid,
t.creative_name,
t.rootsourceid,
t.总回流人数,
t.总收入,
b.cost/100 AS cost,
b.valid_click_count,
b.cost/100 / t.首层小程序打开数 AS cpa_open,
b.cost/100 / b.valid_click_count AS cpc,
t.首层小程序打开数,
t.首层小程序打开数 / b.valid_click_count AS open_rate,
t.裂变层回流数,
t.裂变0层回流数,
t.裂变0层回流数 / t.首层小程序打开数 AS fission0_rate,
t.裂变1层回流数,
t.裂变1层回流数 / t.首层小程序打开数 AS fission1_rate,
d.agent_name,
t1.package_name,
CASE WHEN t2.event_name IS NULL THEN '无' ELSE t2.event_name END AS 广告优化目标
FROM (
SELECT
IF(c.creative_name IS NOT NULL, c.creative_name, t.rootsourceid) AS creative_name,
t.*
FROM loghubods.touliu_data t
LEFT JOIN (
SELECT DISTINCT
creative_name,
SPLIT(GET_JSON_OBJECT(page_spec,'$.wechat_mini_program_spec.mini_program_path'),'rootSourceId%3D')[1] AS rootsourceid,
SPLIT(SPLIT(GET_JSON_OBJECT(page_spec,'$.wechat_mini_program_spec.mini_program_path'),'rootSourceId%3D')[1],'_')[3] AS videoid
FROM loghubods.ad_put_tencent_creative_components a
LEFT JOIN loghubods.ad_put_tencent_creative_day b ON a.creative_id = b.creative_id
WHERE page_type = 'PAGE_TYPE_WECHAT_MINI_PROGRAM'
) c ON c.rootsourceid = t.rootsourceid
WHERE t.dt = '{bizdate}'
) t
LEFT JOIN loghubods.ad_put_tencent_creative_day a ON t.creative_name = a.creative_name
LEFT JOIN loghubods.ad_put_tencent_ad c ON a.ad_id = c.ad_id
LEFT JOIN (
SELECT creative_id, valid_click_count, view_count, cost, conversions_count
FROM (
SELECT
creative_id, valid_click_count, view_count, cost, conversions_count,
ROW_NUMBER() OVER (PARTITION BY creative_id ORDER BY update_time DESC) AS rank
FROM loghubods.ad_put_tencent_creative_data_day
WHERE dt = '{bizdate_dash}'
) t WHERE rank = 1
) b ON a.creative_id = b.creative_id
LEFT JOIN (
SELECT account_id, MAX(agent_name) AS agent_name
FROM loghubods.ad_put_tencent_account
GROUP BY account_id
) d ON a.account_id = d.account_id
LEFT JOIN (
SELECT t1.ad_id, t1.package_id, t1.package_name, t1.min_people
FROM (
SELECT
a.ad_id, a.package_id, b.package_name, b.min_people,
ROW_NUMBER() OVER (PARTITION BY a.ad_id ORDER BY CAST(b.min_people AS BIGINT) ASC) AS rank
FROM loghubods.ad_put_tencent_ad_package_mapping a
LEFT JOIN loghubods.ad_put_tencent_package b ON a.package_id = b.tencent_audience_id
WHERE a.is_delete = 0
) t1 WHERE t1.rank = 1
) t1 ON a.ad_id = t1.ad_id
LEFT JOIN loghubods.dim_ad_event_enum t2 ON c.optimization_goal = t2.event_id
WHERE t.dt = '{bizdate}'
AND (t.总回流人数 >= 30 OR a.account_id IS NOT NULL)
"""
# Placeholder templates for the other query types (real SQL to be filled in
# as needed). Each template is rendered with {start_date}/{end_date}
# (YYYY-MM-DD) and {limit}; cost values are assumed to be stored in cents.
_SQL_TEMPLATES: Dict[str, str] = {
    # Account-level daily totals with derived CTR/CVR/CPA.
    "account_summary": """
SELECT
stat_date AS date,
SUM(cost) AS cost,
SUM(impression) AS impression,
SUM(click) AS click,
SUM(conversion) AS conversion,
SUM(click) / NULLIF(SUM(impression), 0) AS ctr,
SUM(conversion) / NULLIF(SUM(click), 0) AS cvr,
SUM(cost) / NULLIF(SUM(conversion), 0) AS cpa
FROM ad_report_daily
WHERE stat_date >= '{start_date}' AND stat_date <= '{end_date}'
GROUP BY stat_date
ORDER BY stat_date DESC
LIMIT {limit}
""",
    # Per-adgroup daily detail, highest spend first.
    "ad_detail": """
SELECT
stat_date AS date,
adgroup_id,
SUM(cost) AS cost,
SUM(impression) AS impression,
SUM(click) AS click,
SUM(conversion) AS conversion,
SUM(click) / NULLIF(SUM(impression), 0) AS ctr,
SUM(cost) / NULLIF(SUM(click), 0) AS cpc,
SUM(cost) / NULLIF(SUM(conversion), 0) AS cpa
FROM ad_report_daily
WHERE stat_date >= '{start_date}' AND stat_date <= '{end_date}'
GROUP BY stat_date, adgroup_id
ORDER BY cost DESC
LIMIT {limit}
""",
    # Audience-package effectiveness, cheapest CPA first.
    "audience_analysis": """
SELECT
audience_id,
audience_name,
SUM(cost) AS cost,
SUM(conversion) AS conversion,
SUM(cost) / NULLIF(SUM(conversion), 0) AS cpa,
SUM(conversion) / NULLIF(SUM(click), 0) AS cvr
FROM ad_audience_report
WHERE stat_date >= '{start_date}' AND stat_date <= '{end_date}'
GROUP BY audience_id, audience_name
ORDER BY cpa ASC
LIMIT {limit}
""",
    # Per-creative daily performance, highest spend first.
    "creative_performance": """
SELECT
stat_date AS date,
dynamic_creative_id AS creative_id,
SUM(cost) AS cost,
SUM(impression) AS impression,
SUM(click) AS click,
SUM(click) / NULLIF(SUM(impression), 0) AS ctr,
SUM(conversion) / NULLIF(SUM(click), 0) AS cvr,
SUM(cost) / NULLIF(SUM(conversion), 0) AS cpa
FROM ad_creative_report_daily
WHERE stat_date >= '{start_date}' AND stat_date <= '{end_date}'
GROUP BY stat_date, dynamic_creative_id
ORDER BY cost DESC
LIMIT {limit}
""",
    # Daily cost/CPA trend in chronological order.
    "cost_trend": """
SELECT
stat_date AS date,
SUM(cost) AS cost,
SUM(conversion) AS conversion,
SUM(cost) / NULLIF(SUM(conversion), 0) AS cpa
FROM ad_report_daily
WHERE stat_date >= '{start_date}' AND stat_date <= '{end_date}'
GROUP BY stat_date
ORDER BY stat_date ASC
LIMIT {limit}
""",
    # Hour-of-day breakdown for a single day; only {start_date} is used.
    "hourly_distribution": """
SELECT
stat_hour AS hour,
SUM(cost) AS cost,
SUM(click) AS click,
SUM(conversion) AS conversion
FROM ad_report_hourly
WHERE stat_date = '{start_date}'
GROUP BY stat_hour
ORDER BY stat_hour ASC
LIMIT 24
""",
}
  207. def _bizdate_from_params(params: Dict[str, Any]) -> str:
  208. """从 date_range 提取 bizdate(YYYYMMDD)。支持 'yesterday' 关键字。"""
  209. date_range = params.get("date_range", {})
  210. bizdate = date_range.get("bizdate") or date_range.get("start_date", "")
  211. if bizdate in ("yesterday", ""):
  212. bizdate = (datetime.now() - timedelta(days=1)).strftime("%Y%m%d")
  213. # 如果传入的是 YYYY-MM-DD 格式,转成 YYYYMMDD
  214. bizdate = bizdate.replace("-", "")
  215. return bizdate
  216. def _build_sql(query_type: str, params: Dict[str, Any]) -> str:
  217. """根据查询类型和参数构建 SQL。"""
  218. # creative_detail 走独立模板,使用 bizdate(YYYYMMDD)
  219. if query_type == "creative_detail":
  220. bizdate = _bizdate_from_params(params)
  221. bizdate_dash = f"{bizdate[:4]}-{bizdate[4:6]}-{bizdate[6:]}"
  222. return _SQL_CREATIVE_DETAIL.format(bizdate=bizdate, bizdate_dash=bizdate_dash).strip()
  223. template = _SQL_TEMPLATES.get(query_type)
  224. if not template:
  225. raise ValueError(f"不支持的 query_type: {query_type},可选: creative_detail, " + ", ".join(_SQL_TEMPLATES.keys()))
  226. date_range = params.get("date_range", {})
  227. start_date = date_range.get("start_date", "")
  228. end_date = date_range.get("end_date", start_date)
  229. today = datetime.now().strftime("%Y-%m-%d")
  230. start_date = start_date.replace("today", today)
  231. end_date = end_date.replace("today", today)
  232. return template.format(
  233. start_date=start_date,
  234. end_date=end_date,
  235. limit=params.get("limit", 100),
  236. ).strip()
  237. def _query_warehouse(sql_or_desc: str, params: Dict[str, Any]) -> List[Dict[str, Any]]:
  238. """
  239. 连接 ODPS 数据仓库执行查询。
  240. 如果 ODPS 客户端不可用(依赖未安装/配置缺失),返回空列表并记录警告。
  241. """
  242. client = _get_odps_client()
  243. if client is None:
  244. logger.warning(
  245. "ODPS 客户端不可用,data_query 将返回空结果。"
  246. "请确认已安装 pyodps 并配置正确的访问凭证。"
  247. )
  248. raise NotImplementedError(
  249. "ODPS 客户端未初始化。请确认:\n"
  250. "1. 已安装 pyodps:pip install pyodps\n"
  251. "2. examples/autoput/odps_module.py 中的凭证配置正确\n"
  252. f"查询类型: {sql_or_desc}\n参数: {params}"
  253. )
  254. try:
  255. sql = _build_sql(sql_or_desc, params)
  256. logger.debug("执行 ODPS SQL:\n%s", sql)
  257. df = client.execute_sql(sql)
  258. return df.to_dict(orient="records")
  259. except ValueError as e:
  260. raise NotImplementedError(str(e)) from e
  261. except Exception as e:
  262. logger.error("ODPS 查询失败: %s", e)
  263. raise
  264. @tool(
  265. description="查询投放数据仓库,支持账户汇总/广告明细/人群分析/素材效果/成本趋势等多种查询类型",
  266. hidden_params=["context"],
  267. )
  268. async def data_query(
  269. query_type: str,
  270. date_range: Dict[str, str],
  271. dimensions: Optional[List[str]] = None,
  272. metrics: Optional[List[str]] = None,
  273. filters: Optional[Dict[str, Any]] = None,
  274. order_by: Optional[str] = None,
  275. limit: int = 100,
  276. context: Optional[ToolContext] = None,
  277. ) -> ToolResult:
  278. """从数据仓库查询投放数据,支持多维度聚合分析。
  279. query_type 可选值:
  280. - account_summary: 账户整体消耗汇总(按日)
  281. - ad_detail: 广告明细(消耗/点击/转化,按广告ID)
  282. - audience_analysis: 人群效果分析(各人群包的转化率/成本)
  283. - creative_performance: 素材效果(CTR/CVR/消耗,按创意ID)
  284. - cost_trend: 成本趋势(按日/小时的CPA趋势)
  285. - hourly_distribution: 小时分布(各时段消耗占比)
  286. Args:
  287. query_type: 查询类型(见上方说明)
  288. date_range: {"start_date": "2026-04-01", "end_date": "2026-04-07"}
  289. dimensions: 分组维度,如 ["date", "adgroup_id", "creative_id"]
  290. metrics: 查询指标,如 ["cost", "impression", "click", "conversion", "ctr", "cvr", "cpa"]
  291. filters: 过滤条件,如 {"adgroup_id": [123, 456], "status": "ACTIVE"}
  292. order_by: 排序字段,如 "cost DESC"
  293. limit: 返回条数上限
  294. """
  295. default_metrics = {
  296. "account_summary": ["cost", "impression", "click", "conversion", "ctr", "cvr", "cpa"],
  297. "ad_detail": ["cost", "impression", "click", "conversion", "ctr", "cpc", "cpa", "roi"],
  298. "audience_analysis": ["cost", "conversion", "cpa", "cvr", "reach"],
  299. "creative_performance": ["cost", "impression", "click", "ctr", "cvr", "cpa"],
  300. "cost_trend": ["cost", "cpa", "conversion"],
  301. "hourly_distribution": ["cost", "click", "conversion"],
  302. }
  303. actual_metrics = metrics or default_metrics.get(query_type, ["cost", "click", "conversion"])
  304. actual_dimensions = dimensions or ["date"]
  305. params = {
  306. "query_type": query_type,
  307. "date_range": date_range,
  308. "dimensions": actual_dimensions,
  309. "metrics": actual_metrics,
  310. "filters": filters or {},
  311. "order_by": order_by or f"{actual_metrics[0]} DESC",
  312. "limit": limit,
  313. }
  314. try:
  315. rows = _query_warehouse(query_type, params)
  316. if not rows:
  317. return ToolResult(title=f"数据查询({query_type})", output="该条件下无数据")
  318. # 格式化结果为可读文本
  319. header = " | ".join(actual_dimensions + actual_metrics)
  320. lines = [header, "-" * len(header)]
  321. for row in rows[:20]:
  322. values = []
  323. for col in actual_dimensions + actual_metrics:
  324. v = row.get(col, "-")
  325. if col == "cost" and isinstance(v, (int, float)):
  326. v = f"{v/100:.2f}元"
  327. elif col in ("ctr", "cvr") and isinstance(v, (int, float)):
  328. v = f"{v:.2%}"
  329. elif col == "cpa" and isinstance(v, (int, float)):
  330. v = f"{v/100:.2f}元"
  331. values.append(str(v))
  332. lines.append(" | ".join(values))
  333. if len(rows) > 20:
  334. lines.append(f"...共 {len(rows)} 条,仅显示前20条")
  335. output = "\n".join(lines)
  336. return ToolResult(
  337. title=f"数据查询({query_type},{len(rows)}条)",
  338. output=output,
  339. metadata={"rows": rows, "query_params": params},
  340. )
  341. except NotImplementedError as e:
  342. return ToolResult(title="data_query 未实现", output=str(e))
  343. except Exception as e:
  344. logger.error("data_query 失败: %s", e)
  345. return ToolResult(title="data_query 失败", output=str(e))
  346. @tool(description="数据聚合分析:对查询结果进行趋势分析、环比/同比对比")
  347. async def data_aggregate(
  348. query_type: str,
  349. date_range: Dict[str, str],
  350. aggregation: str = "trend",
  351. compare_type: Optional[str] = None,
  352. filters: Optional[Dict[str, Any]] = None,
  353. ) -> ToolResult:
  354. """对投放数据进行聚合分析。
  355. Args:
  356. query_type: 基础查询类型(同 data_query 的 query_type)
  357. date_range: 分析时间范围
  358. aggregation: 聚合方式 "trend"(趋势)/ "sum"(汇总)/ "compare"(对比)
  359. compare_type: 对比方式 "day_over_day"(日环比)/ "week_over_week"(周同比)
  360. filters: 过滤条件
  361. """
  362. try:
  363. # 查询当前期数据
  364. current_params = {
  365. "query_type": query_type,
  366. "date_range": date_range,
  367. "dimensions": ["date"],
  368. "metrics": ["cost", "conversion", "cpa", "ctr"],
  369. "filters": filters or {},
  370. "order_by": "date ASC",
  371. "limit": 90,
  372. }
  373. current_rows = _query_warehouse(f"{query_type}_aggregate", current_params)
  374. total_cost = sum(r.get("cost", 0) for r in current_rows) / 100
  375. total_conv = sum(r.get("conversion", 0) for r in current_rows)
  376. avg_cpa = (total_cost * 100 / total_conv) if total_conv > 0 else 0
  377. output_lines = [
  378. f"【{aggregation} 分析】{date_range['start_date']} ~ {date_range['end_date']}",
  379. f"总消耗: {total_cost:.2f} 元 | 总转化: {total_conv} | 平均CPA: {avg_cpa/100:.2f} 元",
  380. ]
  381. if compare_type and current_rows:
  382. output_lines.append(f"({compare_type} 对比数据需实现历史周期查询)")
  383. return ToolResult(title="数据聚合分析", output="\n".join(output_lines), metadata={"rows": current_rows})
  384. except NotImplementedError as e:
  385. return ToolResult(title="data_aggregate 未实现", output=str(e))
  386. except Exception as e:
  387. return ToolResult(title="data_aggregate 失败", output=str(e))
  388. @tool(description="查询广告当前状态(出价、预算、定向等)")
  389. async def get_ad_current_status(
  390. account_id: int,
  391. ad_ids: Optional[List[int]] = None,
  392. context: Optional[ToolContext] = None,
  393. ) -> ToolResult:
  394. """
  395. 从 ODPS ad_put_tencent_ad 表查询广告当前状态
  396. Args:
  397. account_id: 账户ID
  398. ad_ids: 广告ID列表(可选,不传则查询账户下所有正常广告)
  399. Returns:
  400. ToolResult: 包含广告状态列表
  401. """
  402. try:
  403. client = _get_odps_client()
  404. if not client:
  405. return ToolResult(title="get_ad_current_status 失败", output="ODPS 客户端未初始化")
  406. sql = f"""
  407. SELECT
  408. ad_id,
  409. ad_name,
  410. account_id,
  411. bid_amount,
  412. day_amount,
  413. ad_status,
  414. optimization_goal,
  415. targeting,
  416. create_time
  417. FROM loghubods.ad_put_tencent_ad
  418. WHERE account_id = {account_id}
  419. AND ad_status = 'AD_STATUS_NORMAL'
  420. """
  421. if ad_ids:
  422. # 过滤掉 None 和 NaN 值
  423. valid_ad_ids = [int(ad_id) for ad_id in ad_ids if ad_id is not None and str(ad_id) != 'nan']
  424. if valid_ad_ids:
  425. ad_ids_str = ",".join(map(str, valid_ad_ids))
  426. sql += f" AND ad_id IN ({ad_ids_str})"
  427. logger.info("执行 SQL: %s", sql)
  428. df = client.execute_sql(sql)
  429. # 转换为字典列表
  430. data = df.to_dict('records')
  431. output = f"查询到 {len(data)} 个广告的当前状态"
  432. return ToolResult(title="广告当前状态", output=output, metadata={"rows": data})
  433. except Exception as e:
  434. logger.error("get_ad_current_status 失败: %s", e, exc_info=True)
  435. return ToolResult(title="get_ad_current_status 失败", output=str(e))