# data_query.py
  1. """
  2. 数据查询工具 — auto_put_ad_mini V3
  3. V3 职责:
  4. - 创意级别明细数据采集(非广告级聚合)
  5. - 30 天增量拉取(已有 CSV 的日期跳过)
  6. - 广告状态按日快照
  7. 数据来源:
  8. - 创意效率:ODPS touliu_data + creative_data_day(创意级明细)
  9. - 广告状态:ODPS ad_put_tencent_ad(出价/预算/状态)
  10. 环境变量:
  11. ODPS_ACCESS_ID / ODPS_ACCESS_SECRET / ODPS_PROJECT
  12. """
  13. import logging
  14. import os
  15. from datetime import datetime, timedelta
  16. from pathlib import Path
  17. from typing import List, Optional
  18. import pandas as pd
  19. from agent.tools import tool
  20. from agent.tools.models import ToolContext, ToolResult
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Base directory: the parent of the directory that contains this file.
_MINI_DIR = Path(__file__).resolve().parent.parent
# Per-day creative CSVs (creative_{YYYYMMDD}.csv) live here.
_RAW_DIR = _MINI_DIR / "outputs" / "raw"
# Per-day ad-status CSVs (ad_status_{YYYYMMDD}.csv) live here.
_AD_STATUS_DIR = _MINI_DIR / "outputs" / "ad_status"
# Per-day merged CSVs (merged_{YYYYMMDD}.csv) live here.
_MERGED_DIR = _MINI_DIR / "outputs" / "merged"
# Read the data-window setting from config. _MINI_DIR is put on sys.path
# first (presumably so that `config` resolves to _MINI_DIR/config.py — verify),
# which is why this import has to stay below the sys.path insert.
import sys
if str(_MINI_DIR) not in sys.path:
    sys.path.insert(0, str(_MINI_DIR))
from config import DATA_WINDOW_DAYS
  31. # ===== ODPS 客户端(懒加载) =====
  32. _odps_client = None
  33. def _get_odps_client():
  34. """懒加载 ODPS 客户端,首次调用时初始化。"""
  35. global _odps_client
  36. if _odps_client is None:
  37. try:
  38. import sys
  39. autoput_ad_dir = _MINI_DIR.parent / "auto_put_ad"
  40. if str(autoput_ad_dir) not in sys.path:
  41. sys.path.insert(0, str(autoput_ad_dir))
  42. from odps_module import ODPSClient
  43. project = os.getenv("ODPS_PROJECT", "loghubods")
  44. _odps_client = ODPSClient(project=project)
  45. logger.info("ODPS 客户端已初始化,项目: %s", project)
  46. except ImportError as e:
  47. logger.warning("odps_module 导入失败: %s", e)
  48. _odps_client = None
  49. except Exception as e:
  50. logger.warning("ODPS 客户端初始化失败: %s", e)
  51. _odps_client = None
  52. return _odps_client
  53. # ===== 日期解析 =====
  54. def _parse_bizdate(bizdate: str) -> tuple:
  55. """解析业务日期,返回 (YYYYMMDD, YYYY-MM-DD)。"""
  56. if bizdate in ("yesterday", ""):
  57. biz = (datetime.now() - timedelta(days=1)).strftime("%Y%m%d")
  58. else:
  59. biz = bizdate.replace("-", "")
  60. biz_dash = f"{biz[:4]}-{biz[4:6]}-{biz[6:]}"
  61. return biz, biz_dash
# ===== V3 creative-level SQL =====
def _build_creative_sql(biz: str, biz_dash: str) -> str:
    """
    Build the creative-level detail SQL (V3) for one business date.

    The statement keeps one row per creative (no ad-level aggregation).

    Args:
        biz: business date as YYYYMMDD. It is interpolated as the ``bizdate``
            literal, as the ``dt`` filter on ``touliu_data`` (twice), and as
            the input of the REGEXP_REPLACE that derives the dashed ``dt``
            used for ``ad_put_tencent_creative_data_day``.
        biz_dash: accepted but not referenced anywhere in the SQL text; the
            dashed date is derived inside the SQL from ``biz`` instead.

    Returns:
        The SQL text. Selected columns:
        - ad: account_id, agent_name, ad_id, ad_name, create_time,
          广告优化目标 (event_name, or '无' when missing), package_name,
          人群包人数
        - creative: creative_id, creative_name, videoid, rootsourceid, title,
          image_url
        - delivery: view_count, valid_click_count, key_page_view_count,
          key_page_uv, thousand_display_price, cost (raw cost / 100),
          单点击成本, conversions_count
        - return flow: 首层小程序打开数, 点击转化率, 单UV成本, 裂变层回流数,
          裂变0层回流数, T0裂变系数, 裂变1层回流数, T1裂变系数, 总回流人数,
          总收入

    Query outline (all joins are LEFT JOINs onto the ``touliu_data`` rows):
        - creative_name is resolved through the rootSourceId embedded in the
          mini-program path of ``ad_put_tencent_creative_components``; when no
          match exists the rootsourceid itself is used as creative_name.
        - ``ad_put_tencent_creative_day`` (by creative_name) supplies
          account_id / ad_id / creative_id; ``ad_put_tencent_ad`` supplies ad
          attributes.
        - delivery metrics come from the most recently updated row per
          creative_id in ``ad_put_tencent_creative_data_day`` for the date.
        - agent_name is MAX(agent_name) per account_id.
        - the package is, per ad, the non-deleted mapping with the smallest
          min_people; 人群包人数 is COUNT(DISTINCT union_id) per type in the
          latest partition of ``mid_share_return_people_1year``.
        - rows are kept when 总回流人数 >= 30 or the creative matched an
          account (a.account_id IS NOT NULL).

    NOTE(review): the ratio columns divide by valid_click_count /
    首层小程序打开数 without a zero guard; the result for a zero divisor
    depends on the engine's settings — confirm.
    NOTE(review): the join on creative_name to ad_put_tencent_creative_day has
    no date filter; whether that can duplicate rows depends on that table's
    contents — confirm.
    """
    return f"""
SELECT '{biz}' AS bizdate
,a.account_id
,d.agent_name
,a.ad_id
,c.ad_name
,c.create_time
,CASE WHEN t2.event_name IS NULL THEN '无'
ELSE t2.event_name
END AS 广告优化目标
,t1.package_name
,t4.人群包人数
,a.creative_id
,t.creative_name
,SPLIT(t.rootsourceid,'_')[3] AS videoid
,t.rootsourceid
,t3.title
,t3.image_url
,b.view_count
,b.valid_click_count
,b.key_page_view_count
,b.key_page_uv
,b.thousand_display_price
,b.cost / 100 AS cost
,b.cost / 100 / b.valid_click_count AS 单点击成本
,b.conversions_count
,t.首层小程序打开数
,t.首层小程序打开数 / b.valid_click_count AS 点击转化率
,b.cost / 100 / t.首层小程序打开数 AS 单UV成本
,t.裂变层回流数
,t.裂变0层回流数
,t.裂变0层回流数 / t.首层小程序打开数 AS T0裂变系数
,t.裂变1层回流数
,t.裂变1层回流数 / t.首层小程序打开数 AS T1裂变系数
,t.总回流人数
,t.总收入
FROM (
SELECT IF(c.creative_name IS NOT NULL,c.creative_name,t.rootsourceid) AS creative_name
,t.*
FROM loghubods.touliu_data t
LEFT JOIN (
SELECT DISTINCT creative_name
,SPLIT(GET_JSON_OBJECT(page_spec,'$.wechat_mini_program_spec.mini_program_path'),'rootSourceId%3D')[1] AS rootsourceid
,SPLIT(SPLIT(GET_JSON_OBJECT(page_spec,'$.wechat_mini_program_spec.mini_program_path'),'rootSourceId%3D')[1],'_')[3] AS videoid
,b.creative_id
FROM loghubods.ad_put_tencent_creative_components a
LEFT JOIN loghubods.ad_put_tencent_creative_day b
ON a.creative_id = b.creative_id
WHERE page_type = 'PAGE_TYPE_WECHAT_MINI_PROGRAM'
) c
ON c.rootsourceid = t.rootsourceid
WHERE t.dt = '{biz}'
) t
LEFT JOIN loghubods.ad_put_tencent_creative_day a
ON t.creative_name = a.creative_name
LEFT JOIN loghubods.ad_put_tencent_ad c
ON a.ad_id = c.ad_id
LEFT JOIN (
SELECT creative_id
,valid_click_count
,view_count
,cost
,conversions_count
,key_page_view_count
,key_page_uv
,thousand_display_price
FROM (
SELECT creative_id
,valid_click_count
,view_count
,cost
,conversions_count
,key_page_view_count
,key_page_uv
,thousand_display_price
,ROW_NUMBER() OVER (PARTITION BY creative_id ORDER BY update_time DESC ) AS rank
FROM loghubods.ad_put_tencent_creative_data_day
WHERE dt = REGEXP_REPLACE('{biz}','^(\\\\d{{4}})(\\\\d{{2}})(\\\\d{{2}})$','\\\\1-\\\\2-\\\\3')
) t
WHERE rank = 1
) b
ON a.creative_id = b.creative_id
LEFT JOIN (
SELECT account_id
,MAX(agent_name) AS agent_name
FROM loghubods.ad_put_tencent_account
GROUP BY account_id
) d
ON a.account_id = d.account_id
LEFT JOIN (
SELECT t1.ad_id
,t1.package_id
,t1.package_name
,t1.min_people
FROM (
SELECT a.ad_id
,a.package_id
,b.package_name
,b.min_people
,ROW_NUMBER() OVER (PARTITION BY a.ad_id ORDER BY CAST(b.min_people AS BIGINT) ASC ) AS rank
FROM loghubods.ad_put_tencent_ad_package_mapping a
LEFT JOIN loghubods.ad_put_tencent_package b
ON a.package_id = b.tencent_audience_id
WHERE a.is_delete = 0
) t1
WHERE t1.rank = 1
) t1
ON a.ad_id = t1.ad_id
LEFT JOIN loghubods.dim_ad_event_enum t2
ON c.optimization_goal = t2.event_id
LEFT JOIN loghubods.ad_put_tencent_creative_analysis t3
ON a.creative_id = t3.creative_id
LEFT JOIN (
SELECT type
,COUNT(DISTINCT union_id) AS 人群包人数
FROM loghubods.mid_share_return_people_1year
WHERE dt = MAX_PT('loghubods.mid_share_return_people_1year')
GROUP BY type
) t4
ON t1.package_name = t4.type
WHERE t.dt = '{biz}'
AND (
总回流人数 >= 30
OR a.account_id IS NOT NULL
)
"""
  201. def _fetch_creative_data(bizdate: str) -> Optional[pd.DataFrame]:
  202. """拉取单日创意级别数据。"""
  203. client = _get_odps_client()
  204. if client is None:
  205. logger.error("ODPS 客户端未初始化")
  206. return None
  207. biz, biz_dash = _parse_bizdate(bizdate)
  208. sql = _build_creative_sql(biz, biz_dash)
  209. try:
  210. logger.info("开始拉取创意数据: %s", biz)
  211. df = client.execute_sql(sql)
  212. if df.empty:
  213. logger.warning("创意数据为空: %s", biz)
  214. return pd.DataFrame()
  215. # 类型转换(cost 已经是元,不需要再除以100)
  216. for col in ["cost", "单UV成本", "单点击成本", "总收入"]:
  217. if col in df.columns:
  218. df[col] = pd.to_numeric(df[col], errors="coerce")
  219. for col in ["首层小程序打开数", "裂变层回流数", "裂变0层回流数", "裂变1层回流数",
  220. "总回流人数", "valid_click_count", "view_count",
  221. "key_page_view_count", "key_page_uv", "conversions_count"]:
  222. if col in df.columns:
  223. df[col] = pd.to_numeric(df[col], errors="coerce").fillna(0).astype(int)
  224. logger.info("创意数据拉取成功: %s, %d 行", biz, len(df))
  225. return df
  226. except Exception as e:
  227. logger.error("创意数据拉取失败 (%s): %s", biz, e, exc_info=True)
  228. return None
  229. # ===== 广告状态拉取 =====
  230. def _fetch_ad_status(bizdate: str) -> Optional[pd.DataFrame]:
  231. """拉取单日广告状态快照。"""
  232. client = _get_odps_client()
  233. if client is None:
  234. return None
  235. biz, biz_dash = _parse_bizdate(bizdate)
  236. sql = f"""
  237. SELECT
  238. '{biz}' AS bizdate,
  239. ad_id,
  240. account_id,
  241. ad_name,
  242. create_time,
  243. ad_status,
  244. bid_amount,
  245. day_amount,
  246. optimization_goal,
  247. targeting
  248. FROM loghubods.ad_put_tencent_ad
  249. """
  250. try:
  251. logger.info("开始拉取广告状态: %s", biz)
  252. df = client.execute_sql(sql)
  253. if not df.empty:
  254. df["bid_amount"] = pd.to_numeric(df["bid_amount"], errors="coerce") / 100
  255. logger.info("广告状态拉取成功: %s, %d 行", biz, len(df))
  256. return df
  257. except Exception as e:
  258. logger.error("广告状态拉取失败 (%s): %s", biz, e, exc_info=True)
  259. return None
  260. # ===== V3 工具:30 天增量采集 =====
  261. @tool(description="拉取创意级别数据(增量,已有 CSV 的日期跳过)")
  262. async def fetch_creative_data(
  263. ctx: ToolContext = None,
  264. days: int = None,
  265. end_date: str = "yesterday"
  266. ) -> ToolResult:
  267. """
  268. 拉取创意级别明细数据(V3)。
  269. Args:
  270. days: 回溯天数(默认使用 config.DATA_WINDOW_DAYS)
  271. end_date: 结束日期(默认 yesterday,格式 YYYYMMDD 或 YYYY-MM-DD)
  272. Returns:
  273. ToolResult 包含采集摘要
  274. """
  275. # 使用配置的默认值
  276. if days is None:
  277. days = DATA_WINDOW_DAYS
  278. _RAW_DIR.mkdir(parents=True, exist_ok=True)
  279. _AD_STATUS_DIR.mkdir(parents=True, exist_ok=True)
  280. # 解析日期范围
  281. _, end_dash = _parse_bizdate(end_date)
  282. end_dt = datetime.strptime(end_dash, "%Y-%m-%d")
  283. dates_to_fetch = []
  284. for i in range(days):
  285. dt = end_dt - timedelta(days=i)
  286. biz = dt.strftime("%Y%m%d")
  287. creative_csv = _RAW_DIR / f"creative_{biz}.csv"
  288. status_csv = _AD_STATUS_DIR / f"ad_status_{biz}.csv"
  289. if creative_csv.exists() and status_csv.exists():
  290. logger.info("跳过已有数据: %s", biz)
  291. continue
  292. dates_to_fetch.append(biz)
  293. if not dates_to_fetch:
  294. return ToolResult(
  295. title="数据已完整",
  296. output=f"最近 {days} 天数据已全部存在,无需拉取"
  297. )
  298. # 拉取缺失日期
  299. success_count = 0
  300. failed_dates = []
  301. for biz in sorted(dates_to_fetch):
  302. # 创意数据
  303. creative_csv = _RAW_DIR / f"creative_{biz}.csv"
  304. if not creative_csv.exists():
  305. df_creative = _fetch_creative_data(biz)
  306. if df_creative is not None:
  307. df_creative.to_csv(creative_csv, index=False, encoding="utf-8-sig")
  308. logger.info("已保存: %s (%d 行)", creative_csv.name, len(df_creative))
  309. else:
  310. failed_dates.append(biz)
  311. continue
  312. # 广告状态
  313. status_csv = _AD_STATUS_DIR / f"ad_status_{biz}.csv"
  314. if not status_csv.exists():
  315. df_status = _fetch_ad_status(biz)
  316. if df_status is not None:
  317. df_status.to_csv(status_csv, index=False, encoding="utf-8-sig")
  318. logger.info("已保存: %s (%d 行)", status_csv.name, len(df_status))
  319. else:
  320. failed_dates.append(biz)
  321. continue
  322. success_count += 1
  323. # 汇总
  324. lines = [
  325. f"数据采集完成(最近 {days} 天)",
  326. f"需拉取: {len(dates_to_fetch)} 天",
  327. f"成功: {success_count} 天",
  328. f"失败: {len(failed_dates)} 天",
  329. "",
  330. f"数据目录:",
  331. f" 创意数据: {_RAW_DIR}",
  332. f" 广告状态: {_AD_STATUS_DIR}",
  333. ]
  334. if failed_dates:
  335. lines.append("")
  336. lines.append("失败日期:")
  337. for d in failed_dates:
  338. lines.append(f" - {d}")
  339. return ToolResult(
  340. title=f"数据采集完成({success_count}/{len(dates_to_fetch)})",
  341. output="\n".join(lines),
  342. metadata={
  343. "total_days": days,
  344. "to_fetch": len(dates_to_fetch),
  345. "success": success_count,
  346. "failed": len(failed_dates),
  347. "failed_dates": failed_dates,
  348. }
  349. )
  350. # ===== 合并创意数据与广告状态(独立工具)=====
  351. # 合并后的列顺序(共 38 列)
  352. _MERGED_COLUMNS = [
  353. # 时间
  354. "bizdate",
  355. # 账户
  356. "account_id", "agent_name",
  357. # 广告基本信息
  358. "ad_id", "ad_name", "create_time", "广告优化目标", "package_name", "人群包人数",
  359. # 广告状态(来自状态表)
  360. "ad_status", "bid_amount", "day_amount", "optimization_goal_raw", "targeting",
  361. # 创意维度
  362. "creative_id", "creative_name", "videoid", "rootsourceid", "title", "image_url",
  363. # 曝光 & 点击
  364. "view_count", "valid_click_count", "key_page_view_count", "key_page_uv", "thousand_display_price",
  365. # 成本
  366. "cost", "单点击成本", "conversions_count",
  367. # 回流 & 收入
  368. "首层小程序打开数", "点击转化率", "单uv成本",
  369. "裂变层回流数", "裂变0层回流数", "t0裂变系数",
  370. "裂变1层回流数", "t1裂变系数",
  371. "总回流人数", "总收入",
  372. ]
  373. def _merge_single_day(biz: str) -> Optional[pd.DataFrame]:
  374. """
  375. 合并单日创意数据与广告状态数据(内部函数)。
  376. - 以创意表为主表(left join)
  377. - 统一 ad_id 为 Int64(创意表是 float,状态表是 int)
  378. - 状态表的 optimization_goal 重命名为 optimization_goal_raw
  379. - 按 _MERGED_COLUMNS 顺序输出,保存到 outputs/merged/merged_{biz}.csv
  380. Returns:
  381. 合并后的 DataFrame,或 None(文件不存在时)
  382. """
  383. creative_csv = _RAW_DIR / f"creative_{biz}.csv"
  384. status_csv = _AD_STATUS_DIR / f"ad_status_{biz}.csv"
  385. if not creative_csv.exists():
  386. logger.warning("创意数据不存在,跳过合并: %s", creative_csv)
  387. return None
  388. if not status_csv.exists():
  389. logger.warning("广告状态不存在,跳过合并: %s", status_csv)
  390. return None
  391. df_creative = pd.read_csv(creative_csv)
  392. df_status = pd.read_csv(status_csv)
  393. # 统一 ad_id 类型为 Int64(可空整数,避免 float 精度问题)
  394. df_creative["ad_id"] = pd.to_numeric(df_creative["ad_id"], errors="coerce").astype("Int64")
  395. df_status["ad_id"] = pd.to_numeric(df_status["ad_id"], errors="coerce").astype("Int64")
  396. # 状态表只保留需要引入的列,重命名 optimization_goal 避免与创意表冲突
  397. status_cols = ["ad_id", "ad_status", "bid_amount", "day_amount", "optimization_goal", "targeting"]
  398. df_status = df_status[[c for c in status_cols if c in df_status.columns]].copy()
  399. if "optimization_goal" in df_status.columns:
  400. df_status = df_status.rename(columns={"optimization_goal": "optimization_goal_raw"})
  401. # Left join(保留所有广告,包括 SUSPEND 状态)
  402. df_merged = df_creative.merge(df_status, on="ad_id", how="left")
  403. logger.info("合并后总行数: %d", len(df_merged))
  404. # ===== 历史状态补充:用缓存中的 DELETED/SUSPEND 覆盖当日缺失状态 =====
  405. cache_path = _MINI_DIR / "outputs" / "ad_filter_cache.json"
  406. if cache_path.exists() and "ad_status" in df_merged.columns:
  407. import json
  408. try:
  409. with open(cache_path, encoding="utf-8") as _fc:
  410. _cache = json.load(_fc)
  411. skip_ids = _cache.get("skip_ad_ids", {})
  412. deleted_ids = set(str(x) for x in skip_ids.get("deleted_history", []))
  413. suspend_ids = set(str(x) for x in skip_ids.get("suspend", []))
  414. decided_ids = set(str(x) for x in skip_ids.get("decided_pause", []))
  415. ad_id_str = df_merged["ad_id"].astype(str)
  416. # 当日状态为空(LEFT JOIN 未匹配)→ 用缓存填充
  417. null_mask = df_merged["ad_status"].isna() | (df_merged["ad_status"] == "")
  418. filled_null = 0
  419. if null_mask.any():
  420. for ids, status in [
  421. (deleted_ids, "AD_STATUS_DELETED"),
  422. (suspend_ids, "AD_STATUS_SUSPEND"),
  423. ]:
  424. match = null_mask & ad_id_str.isin(ids)
  425. if match.any():
  426. df_merged.loc[match, "ad_status"] = status
  427. filled_null += match.sum()
  428. # 当日状态为 NORMAL 但历史为 DELETED → 标记为 DELETED
  429. normal_mask = df_merged["ad_status"] == "AD_STATUS_NORMAL"
  430. overridden = 0
  431. if normal_mask.any():
  432. match_deleted = normal_mask & ad_id_str.isin(deleted_ids)
  433. if match_deleted.any():
  434. df_merged.loc[match_deleted, "ad_status"] = "AD_STATUS_DELETED"
  435. overridden += match_deleted.sum()
  436. # 已决策pause的零消耗广告 → 标记为 SUSPEND(视同已暂停)
  437. match_decided = normal_mask & ad_id_str.isin(decided_ids)
  438. if match_decided.any():
  439. df_merged.loc[match_decided, "ad_status"] = "AD_STATUS_SUSPEND"
  440. overridden += match_decided.sum()
  441. if filled_null > 0 or overridden > 0:
  442. logger.info(
  443. "历史状态补充:填充空状态 %d 条,覆盖 NORMAL→DELETED/SUSPEND %d 条",
  444. filled_null, overridden,
  445. )
  446. except Exception as e:
  447. logger.warning("加载历史状态缓存失败(跳过): %s", e)
  448. # ===== 前置过滤:在数据源头直接移除 DELETED/SUSPEND,不干扰后续流程 =====
  449. if "ad_status" in df_merged.columns:
  450. excluded_mask = df_merged["ad_status"].isin(
  451. {"AD_STATUS_DELETED", "AD_STATUS_SUSPEND", "DELETED", "SUSPEND"}
  452. )
  453. n_excluded = excluded_mask.sum()
  454. if n_excluded > 0:
  455. # 按状态分别统计
  456. status_breakdown = (
  457. df_merged.loc[excluded_mask, "ad_status"]
  458. .value_counts().to_dict()
  459. )
  460. df_merged = df_merged[~excluded_mask].copy()
  461. logger.info(
  462. "前置过滤:移除 %d 条 DELETED/SUSPEND 广告(%s),剩余 %d 条 NORMAL",
  463. n_excluded, status_breakdown, len(df_merged),
  464. )
  465. # 按指定列顺序输出(只保留存在的列,保持顺序)
  466. final_cols = [c for c in _MERGED_COLUMNS if c in df_merged.columns]
  467. df_merged = df_merged[final_cols]
  468. # 保存
  469. _MERGED_DIR.mkdir(parents=True, exist_ok=True)
  470. out_path = _MERGED_DIR / f"merged_{biz}.csv"
  471. df_merged.to_csv(out_path, index=False, encoding="utf-8-sig")
  472. logger.info("合并完成: %s (%d 行, %d 列)", out_path.name, len(df_merged), len(df_merged.columns))
  473. return df_merged
  474. @tool(description="合并创意数据与广告状态(批量)")
  475. async def merge_creative_data(
  476. ctx: ToolContext = None,
  477. days: int = None,
  478. force: bool = False,
  479. ) -> ToolResult:
  480. """
  481. 合并创意级别数据与广告状态数据。
  482. 职责:
  483. - 读取 outputs/raw/creative_{date}.csv
  484. - 读取 outputs/ad_status/ad_status_{date}.csv
  485. - Left join on ad_id,输出到 outputs/merged/merged_{date}.csv
  486. - 支持批量合并(最近 N 天)
  487. - 幂等操作:已存在的合并文件默认跳过(force=True 强制重新合并)
  488. Args:
  489. days: 合并最近 N 天的数据(默认 30 天)
  490. force: 是否强制重新合并已存在的文件(默认 False)
  491. Returns:
  492. 合并结果摘要
  493. """
  494. try:
  495. # 使用配置的默认值
  496. if days is None:
  497. days = DATA_WINDOW_DAYS
  498. # 确定日期范围
  499. end_dt = datetime.now() - timedelta(days=1)
  500. dates_to_merge = []
  501. for i in range(days):
  502. date_dt = end_dt - timedelta(days=i)
  503. biz = date_dt.strftime("%Y%m%d")
  504. dates_to_merge.append(biz)
  505. success_count = 0
  506. skip_count = 0
  507. fail_count = 0
  508. for biz in dates_to_merge:
  509. merged_csv = _MERGED_DIR / f"merged_{biz}.csv"
  510. # 检查是否已存在
  511. if merged_csv.exists() and not force:
  512. skip_count += 1
  513. continue
  514. # 执行合并
  515. df = _merge_single_day(biz)
  516. if df is not None:
  517. success_count += 1
  518. else:
  519. fail_count += 1
  520. # 汇总
  521. lines = [
  522. f"数据合并完成(最近 {days} 天)",
  523. f"成功: {success_count} 天",
  524. f"跳过: {skip_count} 天(已存在)",
  525. f"失败: {fail_count} 天(源文件缺失)",
  526. "",
  527. f"输出目录: {_MERGED_DIR}",
  528. f"列数: 38 列",
  529. ]
  530. return ToolResult(
  531. title=f"合并完成({success_count}/{days})",
  532. output="\n".join(lines),
  533. metadata={
  534. "success": success_count,
  535. "skip": skip_count,
  536. "fail": fail_count,
  537. "output_dir": str(_MERGED_DIR),
  538. }
  539. )
  540. except Exception as e:
  541. logger.error("merge_creative_data 失败: %s", e, exc_info=True)
  542. return ToolResult(title="merge_creative_data 失败", output=str(e))