data_query.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554
"""
Data-query tools — auto_put_ad_mini V3.

V3 responsibilities:
- collect creative-level detail data (not ad-level aggregates)
- 30-day incremental pull (dates that already have a CSV are skipped)
- daily snapshot of ad status

Data sources:
- creative efficiency: ODPS touliu_data + creative_data_day (creative-level detail)
- ad status: ODPS ad_put_tencent_ad (bid / budget / status)

Environment variables:
    ODPS_ACCESS_ID / ODPS_ACCESS_SECRET / ODPS_PROJECT
"""
import logging
import os
from datetime import datetime, timedelta
from pathlib import Path
from typing import List, Optional
import pandas as pd
from agent.tools import tool
from agent.tools.models import ToolContext, ToolResult
logger = logging.getLogger(__name__)
# Directory layout: this module sits one level below the mini-project root,
# hence the two .parent hops.
_MINI_DIR = Path(__file__).resolve().parent.parent
_RAW_DIR = _MINI_DIR / "outputs" / "raw"  # daily creative CSVs (creative_{date}.csv)
_AD_STATUS_DIR = _MINI_DIR / "outputs" / "ad_status"  # daily ad-status snapshots (ad_status_{date}.csv)
_MERGED_DIR = _MINI_DIR / "outputs" / "merged"  # merged outputs (merged_{date}.csv)
# ===== ODPS client (lazy-loaded) =====
# Module-level singleton; created on first call to _get_odps_client().
_odps_client = None
  28. def _get_odps_client():
  29. """懒加载 ODPS 客户端,首次调用时初始化。"""
  30. global _odps_client
  31. if _odps_client is None:
  32. try:
  33. import sys
  34. autoput_ad_dir = _MINI_DIR.parent / "auto_put_ad"
  35. if str(autoput_ad_dir) not in sys.path:
  36. sys.path.insert(0, str(autoput_ad_dir))
  37. from odps_module import ODPSClient
  38. project = os.getenv("ODPS_PROJECT", "loghubods")
  39. _odps_client = ODPSClient(project=project)
  40. logger.info("ODPS 客户端已初始化,项目: %s", project)
  41. except ImportError as e:
  42. logger.warning("odps_module 导入失败: %s", e)
  43. _odps_client = None
  44. except Exception as e:
  45. logger.warning("ODPS 客户端初始化失败: %s", e)
  46. _odps_client = None
  47. return _odps_client
  48. # ===== 日期解析 =====
  49. def _parse_bizdate(bizdate: str) -> tuple:
  50. """解析业务日期,返回 (YYYYMMDD, YYYY-MM-DD)。"""
  51. if bizdate in ("yesterday", ""):
  52. biz = (datetime.now() - timedelta(days=1)).strftime("%Y%m%d")
  53. else:
  54. biz = bizdate.replace("-", "")
  55. biz_dash = f"{biz[:4]}-{biz[4:6]}-{biz[6:]}"
  56. return biz, biz_dash
  57. # ===== V3 创意级别 SQL =====
  58. def _build_creative_sql(biz: str, biz_dash: str) -> str:
  59. """
  60. 构建创意级别明细 SQL(V3)。
  61. 基于用户提供的原始 SQL,保留创意级明细(不做广告级聚合)。
  62. 返回字段:
  63. - 广告维度:account_id, ad_id, ad_name, create_time
  64. - 创意维度:creative_id, creative_name, rootsourceid, videoid, title, image_url
  65. - 业务维度:agent_name, package_name, 广告优化目标, 人群包人数
  66. - 效率指标:cost(元), valid_click_count, conversions_count,
  67. 首层小程序打开数, 裂变0层回流数, 裂变1层回流数, 总回流人数, 总收入,
  68. view_count, key_page_view_count, key_page_uv, thousand_display_price
  69. """
  70. return f"""
  71. SELECT '{biz}' AS bizdate
  72. ,a.account_id
  73. ,d.agent_name
  74. ,a.ad_id
  75. ,c.ad_name
  76. ,c.create_time
  77. ,CASE WHEN t2.event_name IS NULL THEN '无'
  78. ELSE t2.event_name
  79. END AS 广告优化目标
  80. ,t1.package_name
  81. ,t4.人群包人数
  82. ,a.creative_id
  83. ,t.creative_name
  84. ,SPLIT(t.rootsourceid,'_')[3] AS videoid
  85. ,t.rootsourceid
  86. ,t3.title
  87. ,t3.image_url
  88. ,b.view_count
  89. ,b.valid_click_count
  90. ,b.key_page_view_count
  91. ,b.key_page_uv
  92. ,b.thousand_display_price
  93. ,b.cost / 100 AS cost
  94. ,b.cost / 100 / b.valid_click_count AS 单点击成本
  95. ,b.conversions_count
  96. ,t.首层小程序打开数
  97. ,t.首层小程序打开数 / b.valid_click_count AS 点击转化率
  98. ,b.cost / 100 / t.首层小程序打开数 AS 单UV成本
  99. ,t.裂变层回流数
  100. ,t.裂变0层回流数
  101. ,t.裂变0层回流数 / t.首层小程序打开数 AS T0裂变系数
  102. ,t.裂变1层回流数
  103. ,t.裂变1层回流数 / t.首层小程序打开数 AS T1裂变系数
  104. ,t.总回流人数
  105. ,t.总收入
  106. FROM (
  107. SELECT IF(c.creative_name IS NOT NULL,c.creative_name,t.rootsourceid) AS creative_name
  108. ,t.*
  109. FROM loghubods.touliu_data t
  110. LEFT JOIN (
  111. SELECT DISTINCT creative_name
  112. ,SPLIT(GET_JSON_OBJECT(page_spec,'$.wechat_mini_program_spec.mini_program_path'),'rootSourceId%3D')[1] AS rootsourceid
  113. ,SPLIT(SPLIT(GET_JSON_OBJECT(page_spec,'$.wechat_mini_program_spec.mini_program_path'),'rootSourceId%3D')[1],'_')[3] AS videoid
  114. ,b.creative_id
  115. FROM loghubods.ad_put_tencent_creative_components a
  116. LEFT JOIN loghubods.ad_put_tencent_creative_day b
  117. ON a.creative_id = b.creative_id
  118. WHERE page_type = 'PAGE_TYPE_WECHAT_MINI_PROGRAM'
  119. ) c
  120. ON c.rootsourceid = t.rootsourceid
  121. WHERE t.dt = '{biz}'
  122. ) t
  123. LEFT JOIN loghubods.ad_put_tencent_creative_day a
  124. ON t.creative_name = a.creative_name
  125. LEFT JOIN loghubods.ad_put_tencent_ad c
  126. ON a.ad_id = c.ad_id
  127. LEFT JOIN (
  128. SELECT creative_id
  129. ,valid_click_count
  130. ,view_count
  131. ,cost
  132. ,conversions_count
  133. ,key_page_view_count
  134. ,key_page_uv
  135. ,thousand_display_price
  136. FROM (
  137. SELECT creative_id
  138. ,valid_click_count
  139. ,view_count
  140. ,cost
  141. ,conversions_count
  142. ,key_page_view_count
  143. ,key_page_uv
  144. ,thousand_display_price
  145. ,ROW_NUMBER() OVER (PARTITION BY creative_id ORDER BY update_time DESC ) AS rank
  146. FROM loghubods.ad_put_tencent_creative_data_day
  147. WHERE dt = REGEXP_REPLACE('{biz}','^(\\\\d{{4}})(\\\\d{{2}})(\\\\d{{2}})$','\\\\1-\\\\2-\\\\3')
  148. ) t
  149. WHERE rank = 1
  150. ) b
  151. ON a.creative_id = b.creative_id
  152. LEFT JOIN (
  153. SELECT account_id
  154. ,MAX(agent_name) AS agent_name
  155. FROM loghubods.ad_put_tencent_account
  156. GROUP BY account_id
  157. ) d
  158. ON a.account_id = d.account_id
  159. LEFT JOIN (
  160. SELECT t1.ad_id
  161. ,t1.package_id
  162. ,t1.package_name
  163. ,t1.min_people
  164. FROM (
  165. SELECT a.ad_id
  166. ,a.package_id
  167. ,b.package_name
  168. ,b.min_people
  169. ,ROW_NUMBER() OVER (PARTITION BY a.ad_id ORDER BY CAST(b.min_people AS BIGINT) ASC ) AS rank
  170. FROM loghubods.ad_put_tencent_ad_package_mapping a
  171. LEFT JOIN loghubods.ad_put_tencent_package b
  172. ON a.package_id = b.tencent_audience_id
  173. WHERE a.is_delete = 0
  174. ) t1
  175. WHERE t1.rank = 1
  176. ) t1
  177. ON a.ad_id = t1.ad_id
  178. LEFT JOIN loghubods.dim_ad_event_enum t2
  179. ON c.optimization_goal = t2.event_id
  180. LEFT JOIN loghubods.ad_put_tencent_creative_analysis t3
  181. ON a.creative_id = t3.creative_id
  182. LEFT JOIN (
  183. SELECT type
  184. ,COUNT(DISTINCT union_id) AS 人群包人数
  185. FROM loghubods.mid_share_return_people_1year
  186. WHERE dt = MAX_PT('loghubods.mid_share_return_people_1year')
  187. GROUP BY type
  188. ) t4
  189. ON t1.package_name = t4.type
  190. WHERE t.dt = '{biz}'
  191. AND (
  192. 总回流人数 >= 30
  193. OR a.account_id IS NOT NULL
  194. )
  195. """
  196. def _fetch_creative_data(bizdate: str) -> Optional[pd.DataFrame]:
  197. """拉取单日创意级别数据。"""
  198. client = _get_odps_client()
  199. if client is None:
  200. logger.error("ODPS 客户端未初始化")
  201. return None
  202. biz, biz_dash = _parse_bizdate(bizdate)
  203. sql = _build_creative_sql(biz, biz_dash)
  204. try:
  205. logger.info("开始拉取创意数据: %s", biz)
  206. df = client.execute_sql(sql)
  207. if df.empty:
  208. logger.warning("创意数据为空: %s", biz)
  209. return pd.DataFrame()
  210. # 类型转换(cost 已经是元,不需要再除以100)
  211. for col in ["cost", "单UV成本", "单点击成本", "总收入"]:
  212. if col in df.columns:
  213. df[col] = pd.to_numeric(df[col], errors="coerce")
  214. for col in ["首层小程序打开数", "裂变层回流数", "裂变0层回流数", "裂变1层回流数",
  215. "总回流人数", "valid_click_count", "view_count",
  216. "key_page_view_count", "key_page_uv", "conversions_count"]:
  217. if col in df.columns:
  218. df[col] = pd.to_numeric(df[col], errors="coerce").fillna(0).astype(int)
  219. logger.info("创意数据拉取成功: %s, %d 行", biz, len(df))
  220. return df
  221. except Exception as e:
  222. logger.error("创意数据拉取失败 (%s): %s", biz, e, exc_info=True)
  223. return None
  224. # ===== 广告状态拉取 =====
  225. def _fetch_ad_status(bizdate: str) -> Optional[pd.DataFrame]:
  226. """拉取单日广告状态快照。"""
  227. client = _get_odps_client()
  228. if client is None:
  229. return None
  230. biz, biz_dash = _parse_bizdate(bizdate)
  231. sql = f"""
  232. SELECT
  233. '{biz}' AS bizdate,
  234. ad_id,
  235. account_id,
  236. ad_name,
  237. create_time,
  238. ad_status,
  239. bid_amount,
  240. day_amount,
  241. optimization_goal,
  242. targeting
  243. FROM loghubods.ad_put_tencent_ad
  244. """
  245. try:
  246. logger.info("开始拉取广告状态: %s", biz)
  247. df = client.execute_sql(sql)
  248. if not df.empty:
  249. df["bid_amount"] = pd.to_numeric(df["bid_amount"], errors="coerce") / 100
  250. logger.info("广告状态拉取成功: %s, %d 行", biz, len(df))
  251. return df
  252. except Exception as e:
  253. logger.error("广告状态拉取失败 (%s): %s", biz, e, exc_info=True)
  254. return None
  255. # ===== V3 工具:30 天增量采集 =====
  256. @tool(description="拉取 30 天创意级别数据(增量,已有 CSV 的日期跳过)")
  257. async def fetch_creative_data(
  258. ctx: ToolContext,
  259. days: int = 7,
  260. end_date: str = "yesterday"
  261. ) -> ToolResult:
  262. """
  263. 拉取创意级别明细数据(V3)。
  264. Args:
  265. days: 回溯天数(默认 30)
  266. end_date: 结束日期(默认 yesterday,格式 YYYYMMDD 或 YYYY-MM-DD)
  267. Returns:
  268. ToolResult 包含采集摘要
  269. """
  270. _RAW_DIR.mkdir(parents=True, exist_ok=True)
  271. _AD_STATUS_DIR.mkdir(parents=True, exist_ok=True)
  272. # 解析日期范围
  273. _, end_dash = _parse_bizdate(end_date)
  274. end_dt = datetime.strptime(end_dash, "%Y-%m-%d")
  275. dates_to_fetch = []
  276. for i in range(days):
  277. dt = end_dt - timedelta(days=i)
  278. biz = dt.strftime("%Y%m%d")
  279. creative_csv = _RAW_DIR / f"creative_{biz}.csv"
  280. status_csv = _AD_STATUS_DIR / f"ad_status_{biz}.csv"
  281. if creative_csv.exists() and status_csv.exists():
  282. logger.info("跳过已有数据: %s", biz)
  283. continue
  284. dates_to_fetch.append(biz)
  285. if not dates_to_fetch:
  286. return ToolResult(
  287. title="数据已完整",
  288. output=f"最近 {days} 天数据已全部存在,无需拉取"
  289. )
  290. # 拉取缺失日期
  291. success_count = 0
  292. failed_dates = []
  293. for biz in sorted(dates_to_fetch):
  294. # 创意数据
  295. creative_csv = _RAW_DIR / f"creative_{biz}.csv"
  296. if not creative_csv.exists():
  297. df_creative = _fetch_creative_data(biz)
  298. if df_creative is not None:
  299. df_creative.to_csv(creative_csv, index=False, encoding="utf-8-sig")
  300. logger.info("已保存: %s (%d 行)", creative_csv.name, len(df_creative))
  301. else:
  302. failed_dates.append(biz)
  303. continue
  304. # 广告状态
  305. status_csv = _AD_STATUS_DIR / f"ad_status_{biz}.csv"
  306. if not status_csv.exists():
  307. df_status = _fetch_ad_status(biz)
  308. if df_status is not None:
  309. df_status.to_csv(status_csv, index=False, encoding="utf-8-sig")
  310. logger.info("已保存: %s (%d 行)", status_csv.name, len(df_status))
  311. else:
  312. failed_dates.append(biz)
  313. continue
  314. success_count += 1
  315. # 汇总
  316. lines = [
  317. f"数据采集完成(最近 {days} 天)",
  318. f"需拉取: {len(dates_to_fetch)} 天",
  319. f"成功: {success_count} 天",
  320. f"失败: {len(failed_dates)} 天",
  321. "",
  322. f"数据目录:",
  323. f" 创意数据: {_RAW_DIR}",
  324. f" 广告状态: {_AD_STATUS_DIR}",
  325. ]
  326. if failed_dates:
  327. lines.append("")
  328. lines.append("失败日期:")
  329. for d in failed_dates:
  330. lines.append(f" - {d}")
  331. return ToolResult(
  332. title=f"数据采集完成({success_count}/{len(dates_to_fetch)})",
  333. output="\n".join(lines),
  334. metadata={
  335. "total_days": days,
  336. "to_fetch": len(dates_to_fetch),
  337. "success": success_count,
  338. "failed": len(failed_dates),
  339. "failed_dates": failed_dates,
  340. }
  341. )
# ===== Merge creative data with ad status (standalone tool) =====
# Output column order for merged CSVs (38 columns total).
# NOTE(review): three entries below ("单uv成本", "t0裂变系数", "t1裂变系数")
# differ in case from the SQL aliases in _build_creative_sql (单UV成本,
# T0裂变系数, T1裂变系数) and from the column names used in
# _fetch_creative_data's numeric coercion. Only one of the two spellings can
# match the actual CSV headers; verify against real data which casing ODPS
# returns.
_MERGED_COLUMNS = [
    # time
    "bizdate",
    # account
    "account_id", "agent_name",
    # ad basics
    "ad_id", "ad_name", "create_time", "广告优化目标", "package_name", "人群包人数",
    # ad status (pulled from the status snapshot)
    "ad_status", "bid_amount", "day_amount", "optimization_goal_raw", "targeting",
    # creative dimension
    "creative_id", "creative_name", "videoid", "rootsourceid", "title", "image_url",
    # impressions & clicks
    "view_count", "valid_click_count", "key_page_view_count", "key_page_uv", "thousand_display_price",
    # cost
    "cost", "单点击成本", "conversions_count",
    # funnel & revenue
    "首层小程序打开数", "点击转化率", "单uv成本",
    "裂变层回流数", "裂变0层回流数", "t0裂变系数",
    "裂变1层回流数", "t1裂变系数",
    "总回流人数", "总收入",
]
  365. def _merge_single_day(biz: str) -> Optional[pd.DataFrame]:
  366. """
  367. 合并单日创意数据与广告状态数据(内部函数)。
  368. - 以创意表为主表(left join)
  369. - 统一 ad_id 为 Int64(创意表是 float,状态表是 int)
  370. - 状态表的 optimization_goal 重命名为 optimization_goal_raw
  371. - 按 _MERGED_COLUMNS 顺序输出,保存到 outputs/merged/merged_{biz}.csv
  372. Returns:
  373. 合并后的 DataFrame,或 None(文件不存在时)
  374. """
  375. creative_csv = _RAW_DIR / f"creative_{biz}.csv"
  376. status_csv = _AD_STATUS_DIR / f"ad_status_{biz}.csv"
  377. if not creative_csv.exists():
  378. logger.warning("创意数据不存在,跳过合并: %s", creative_csv)
  379. return None
  380. if not status_csv.exists():
  381. logger.warning("广告状态不存在,跳过合并: %s", status_csv)
  382. return None
  383. df_creative = pd.read_csv(creative_csv)
  384. df_status = pd.read_csv(status_csv)
  385. # 统一 ad_id 类型为 Int64(可空整数,避免 float 精度问题)
  386. df_creative["ad_id"] = pd.to_numeric(df_creative["ad_id"], errors="coerce").astype("Int64")
  387. df_status["ad_id"] = pd.to_numeric(df_status["ad_id"], errors="coerce").astype("Int64")
  388. # 状态表只保留需要引入的列,重命名 optimization_goal 避免与创意表冲突
  389. status_cols = ["ad_id", "ad_status", "bid_amount", "day_amount", "optimization_goal", "targeting"]
  390. df_status = df_status[[c for c in status_cols if c in df_status.columns]].copy()
  391. if "optimization_goal" in df_status.columns:
  392. df_status = df_status.rename(columns={"optimization_goal": "optimization_goal_raw"})
  393. # Left join(保留所有广告,包括 SUSPEND 状态)
  394. df_merged = df_creative.merge(df_status, on="ad_id", how="left")
  395. logger.info("合并后总行数: %d", len(df_merged))
  396. # 按指定列顺序输出(只保留存在的列,保持顺序)
  397. final_cols = [c for c in _MERGED_COLUMNS if c in df_merged.columns]
  398. df_merged = df_merged[final_cols]
  399. # 保存
  400. _MERGED_DIR.mkdir(parents=True, exist_ok=True)
  401. out_path = _MERGED_DIR / f"merged_{biz}.csv"
  402. df_merged.to_csv(out_path, index=False, encoding="utf-8-sig")
  403. logger.info("合并完成: %s (%d 行, %d 列)", out_path.name, len(df_merged), len(df_merged.columns))
  404. return df_merged
  405. @tool(description="合并创意数据与广告状态(批量)")
  406. async def merge_creative_data(
  407. ctx: ToolContext,
  408. days: int = 7,
  409. force: bool = False,
  410. ) -> ToolResult:
  411. """
  412. 合并创意级别数据与广告状态数据。
  413. 职责:
  414. - 读取 outputs/raw/creative_{date}.csv
  415. - 读取 outputs/ad_status/ad_status_{date}.csv
  416. - Left join on ad_id,输出到 outputs/merged/merged_{date}.csv
  417. - 支持批量合并(最近 N 天)
  418. - 幂等操作:已存在的合并文件默认跳过(force=True 强制重新合并)
  419. Args:
  420. days: 合并最近 N 天的数据(默认 30 天)
  421. force: 是否强制重新合并已存在的文件(默认 False)
  422. Returns:
  423. 合并结果摘要
  424. """
  425. try:
  426. # 确定日期范围
  427. end_dt = datetime.now() - timedelta(days=1)
  428. dates_to_merge = []
  429. for i in range(days):
  430. date_dt = end_dt - timedelta(days=i)
  431. biz = date_dt.strftime("%Y%m%d")
  432. dates_to_merge.append(biz)
  433. success_count = 0
  434. skip_count = 0
  435. fail_count = 0
  436. for biz in dates_to_merge:
  437. merged_csv = _MERGED_DIR / f"merged_{biz}.csv"
  438. # 检查是否已存在
  439. if merged_csv.exists() and not force:
  440. skip_count += 1
  441. continue
  442. # 执行合并
  443. df = _merge_single_day(biz)
  444. if df is not None:
  445. success_count += 1
  446. else:
  447. fail_count += 1
  448. # 汇总
  449. lines = [
  450. f"数据合并完成(最近 {days} 天)",
  451. f"成功: {success_count} 天",
  452. f"跳过: {skip_count} 天(已存在)",
  453. f"失败: {fail_count} 天(源文件缺失)",
  454. "",
  455. f"输出目录: {_MERGED_DIR}",
  456. f"列数: 38 列",
  457. ]
  458. return ToolResult(
  459. title=f"合并完成({success_count}/{days})",
  460. output="\n".join(lines),
  461. metadata={
  462. "success": success_count,
  463. "skip": skip_count,
  464. "fail": fail_count,
  465. "output_dir": str(_MERGED_DIR),
  466. }
  467. )
  468. except Exception as e:
  469. logger.error("merge_creative_data 失败: %s", e, exc_info=True)
  470. return ToolResult(title="merge_creative_data 失败", output=str(e))