fetch_daily.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509
  1. #!/usr/bin/env python
  2. # coding=utf-8
  3. """
  4. 按天增量获取数据 - 通用版本
  5. 支持并发获取,自动跳过已有数据
  6. 用法:
  7. python fetch_daily.py tasks/xxx/query.sql # 获取最近7天
  8. python fetch_daily.py tasks/xxx/query.sql --days 30 # 获取最近30天
  9. python fetch_daily.py tasks/xxx/query.sql --start 20260101 --end 20260107
  10. python fetch_daily.py tasks/xxx/query.sql --date 20260105 # 单天
  11. python fetch_daily.py tasks/xxx/query.sql --force # 强制重新获取
  12. python fetch_daily.py tasks/xxx/query.sql --workers 10 # 设置天级并发数
  13. python fetch_daily.py tasks/xxx/query.sql --parallel 50 # 单天多线程下载(默认50,大数据量推荐)
  14. python fetch_daily.py tasks/xxx/query.sql --parallel 0 # 关闭多线程,使用单线程下载
  15. python fetch_daily.py tasks/xxx/query.sql --feishu # 获取后上传到飞书表格
  16. python fetch_daily.py tasks/xxx/query.sql --feishu TOKEN # 指定飞书表格token
  17. python fetch_daily.py tasks/xxx/query.sql --merge --feishu # 仅合并并上传飞书
  18. """
import argparse
import sys
from datetime import datetime, timedelta
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading

# Make the sibling "lib" directory importable so odps_module resolves
# regardless of the current working directory.
sys.path.insert(0, str(Path(__file__).parent / "lib"))
from odps_module import ODPSClient
import csv

# Counters shared by the day-level worker threads; every mutation is
# guarded by counter_lock (see fetch_single_day).
counter_lock = threading.Lock()
success_count = 0
fail_count = 0
  32. def get_existing_dates(daily_dir):
  33. """获取已下载的日期列表"""
  34. existing = set()
  35. if not daily_dir.exists():
  36. return existing
  37. for f in daily_dir.glob("*.csv"):
  38. try:
  39. dt = f.stem
  40. if len(dt) == 8 and dt.isdigit():
  41. existing.add(dt)
  42. except:
  43. pass
  44. return existing
  45. def merge_csv_files(daily_dir, output_file=None):
  46. """合并目录下所有日期 CSV 文件,只保留一个表头"""
  47. csv_files = sorted(daily_dir.glob("*.csv"))
  48. if not csv_files:
  49. print("没有找到 CSV 文件")
  50. return None
  51. if output_file is None:
  52. output_file = daily_dir.parent / f"{daily_dir.name}_merged.csv"
  53. with open(output_file, "w", encoding="utf-8") as out:
  54. header_written = False
  55. total_rows = 0
  56. for csv_file in csv_files:
  57. with open(csv_file, "r", encoding="utf-8") as f:
  58. lines = f.readlines()
  59. if not lines:
  60. continue
  61. if not header_written:
  62. out.write(lines[0])
  63. header_written = True
  64. for line in lines[1:]:
  65. out.write(line)
  66. total_rows += 1
  67. print(f"合并完成: {len(csv_files)} 个文件, {total_rows} 行数据")
  68. print(f"输出文件: {output_file}")
  69. return output_file
  70. def infer_column_types(rows):
  71. """推断每列的类型:int, float, 或 str"""
  72. if not rows:
  73. return []
  74. num_cols = len(rows[0])
  75. col_types = []
  76. for col_idx in range(num_cols):
  77. has_float = False
  78. all_numeric = True
  79. for row in rows:
  80. if col_idx >= len(row):
  81. continue
  82. v = row[col_idx].strip() if row[col_idx] else ""
  83. if not v: # 空值不影响类型判断
  84. continue
  85. try:
  86. if '.' in v or 'e' in v.lower():
  87. float(v)
  88. has_float = True
  89. else:
  90. int(v)
  91. except ValueError:
  92. all_numeric = False
  93. break
  94. if all_numeric:
  95. col_types.append('float' if has_float else 'int')
  96. else:
  97. col_types.append('str')
  98. return col_types
  99. def convert_row_by_types(row, col_types):
  100. """按列类型转换一行数据"""
  101. result = []
  102. for i, cell in enumerate(row):
  103. if i >= len(col_types):
  104. result.append(cell)
  105. continue
  106. v = cell.strip() if cell else ""
  107. if not v:
  108. result.append("")
  109. continue
  110. col_type = col_types[i]
  111. if col_type == 'int':
  112. result.append(int(v))
  113. elif col_type == 'float':
  114. result.append(float(v))
  115. else:
  116. result.append(cell)
  117. return result
  118. def load_feishu_config(sql_file):
  119. """加载飞书配置,优先级: {sql名}.json > sql目录/default.json > 根目录/default.json > 默认值"""
  120. import json
  121. defaults = {
  122. "token": "ONZqsxB9BhGH8tt90EScSJT5nHh",
  123. "sheet_id": None,
  124. "sort": "dt:desc",
  125. "cols": None,
  126. }
  127. root_dir = Path(__file__).parent
  128. sql_dir = sql_file.parent
  129. sql_name = sql_file.stem
  130. def load_json(path, name):
  131. if path.exists():
  132. try:
  133. with open(path, "r", encoding="utf-8") as f:
  134. defaults.update(json.load(f))
  135. except Exception as e:
  136. print(f"警告: 读取 {name} 失败: {e}")
  137. # 按优先级从低到高加载(后加载的覆盖先加载的)
  138. load_json(root_dir / "default.json", "根目录/default.json")
  139. load_json(sql_dir / "default.json", "sql目录/default.json")
  140. load_json(sql_dir / f"{sql_name}.json", f"{sql_name}.json")
  141. return defaults
  142. def parse_sort_spec(sort_spec):
  143. """解析排序规格,如 'dt:desc,name:asc' -> [('dt', True), ('name', False)]"""
  144. if not sort_spec:
  145. return []
  146. result = []
  147. for part in sort_spec.split(","):
  148. part = part.strip()
  149. if not part:
  150. continue
  151. if ":" in part:
  152. field, order = part.rsplit(":", 1)
  153. desc = order.lower() != "asc"
  154. else:
  155. field, desc = part, True # 默认逆序
  156. result.append((field.strip(), desc))
  157. return result
  158. def parse_cols_spec(cols_spec):
  159. """解析列映射规格,如 'dt:日期,name,value:数值' -> [('dt', '日期'), ('name', 'name'), ('value', '数值')]"""
  160. if not cols_spec:
  161. return []
  162. result = []
  163. for part in cols_spec.split(","):
  164. part = part.strip()
  165. if not part:
  166. continue
  167. if ":" in part:
  168. old_name, new_name = part.split(":", 1)
  169. result.append((old_name.strip(), new_name.strip()))
  170. else:
  171. result.append((part, part))
  172. return result
  173. def apply_cols_mapping(header, data_rows, cols_spec):
  174. """应用列映射:筛选、排序、重命名"""
  175. col_mapping = parse_cols_spec(cols_spec)
  176. if not col_mapping:
  177. return header, data_rows
  178. # 构建索引映射
  179. header_index = {name: i for i, name in enumerate(header)}
  180. new_header = []
  181. col_indices = []
  182. for old_name, new_name in col_mapping:
  183. if old_name in header_index:
  184. col_indices.append(header_index[old_name])
  185. new_header.append(new_name)
  186. else:
  187. print(f"警告: 字段 '{old_name}' 不存在,已跳过")
  188. if not col_indices:
  189. print("警告: 没有有效的列映射,保持原样")
  190. return header, data_rows
  191. # 应用映射
  192. new_rows = []
  193. for row in data_rows:
  194. new_row = [row[i] if i < len(row) else "" for i in col_indices]
  195. new_rows.append(new_row)
  196. print(f"列映射: {len(col_indices)} 列")
  197. return new_header, new_rows
def upload_to_feishu(csv_file, sheet_token, sheet_id=None, sort_spec="dt:desc", cols_spec=None):
    """Upload a CSV file to a Feishu spreadsheet.

    Pipeline: read the CSV, sort rows (on the original column names),
    apply the column mapping, convert cells to inferred numeric types,
    then push header + rows via feishu.write_data_to_sheet.

    Args:
        csv_file: CSV file path.
        sheet_token: Feishu sheet token.
        sheet_id: worksheet ID (None lets the client pick the first sheet).
        sort_spec: sort spec such as "dt:desc,name:asc".
        cols_spec: column mapping such as "dt:日期,name,value:数值".
    """
    from feishu import write_data_to_sheet
    # Read the whole CSV into memory.
    with open(csv_file, "r", encoding="utf-8") as f:
        reader = csv.reader(f)
        rows = list(reader)
    if len(rows) < 2:
        # Header only (or nothing at all): nothing worth uploading.
        print("CSV 文件为空,跳过上传")
        return
    header = rows[0]
    data_rows = rows[1:]
    # Sort BEFORE the column mapping so sort fields use original names.
    sort_fields = parse_sort_spec(sort_spec)
    if sort_fields:
        applied = []
        # Multi-key sort via repeated stable sorts: apply keys in reverse
        # priority order so the first spec field ends up most significant.
        for field, desc in reversed(sort_fields):
            if field in header:
                idx = header.index(field)
                # Cells compare as strings here; short rows sort as "".
                data_rows.sort(key=lambda row: row[idx] if idx < len(row) else "", reverse=desc)
                applied.append(f"{field}:{'desc' if desc else 'asc'}")
        if applied:
            print(f"排序: {', '.join(reversed(applied))}")
    # Column mapping (after sorting).
    header, data_rows = apply_cols_mapping(header, data_rows, cols_spec)
    # Infer per-column types and convert cells accordingly.
    col_types = infer_column_types(data_rows)
    converted_rows = [convert_row_by_types(row, col_types) for row in data_rows]
    data = [header] + converted_rows
    print(f"上传到飞书: {len(data_rows)} 行数据")
    write_data_to_sheet(
        data=data,
        sheet_token=sheet_token,
        sheetid=sheet_id,  # None: the client auto-selects the first worksheet
    )
    print(f"飞书上传完成: {sheet_token}")
  241. def get_date_range(start_str, end_str):
  242. """生成日期范围列表"""
  243. start = datetime.strptime(start_str, "%Y%m%d")
  244. end = datetime.strptime(end_str, "%Y%m%d")
  245. dates = []
  246. current = start
  247. while current <= end:
  248. dates.append(current.strftime("%Y%m%d"))
  249. current += timedelta(days=1)
  250. return dates
  251. def fetch_single_day(dt, sql_template, daily_dir, parallel_threads=0):
  252. """获取单天数据"""
  253. global success_count, fail_count
  254. try:
  255. client = ODPSClient()
  256. sql = sql_template.replace("${dt}", dt)
  257. output_file = daily_dir / f"{dt}.csv"
  258. # 下载到文件
  259. if parallel_threads > 0:
  260. # 多线程并行下载(适合大数据量)
  261. client.execute_sql_result_save_file_parallel(sql, str(output_file), workers=parallel_threads)
  262. else:
  263. # 单线程下载
  264. client.execute_sql_result_save_file(sql, str(output_file))
  265. # 检查结果
  266. if output_file.exists():
  267. row_count = sum(1 for _ in open(output_file)) - 1 # 减去表头
  268. with counter_lock:
  269. success_count += 1
  270. if row_count > 0:
  271. return (dt, "success", row_count)
  272. else:
  273. return (dt, "empty", 0)
  274. else:
  275. with counter_lock:
  276. fail_count += 1
  277. return (dt, "fail", 0)
  278. except Exception as e:
  279. with counter_lock:
  280. fail_count += 1
  281. return (dt, "error", str(e))
def main():
    """CLI entry point: parse args, fetch missing days concurrently, then
    optionally merge all daily CSVs and upload the result to Feishu."""
    global success_count, fail_count
    parser = argparse.ArgumentParser(description="按天增量获取数据")
    parser.add_argument("sql_file", type=str, help="SQL文件路径")
    parser.add_argument("--days", type=int, default=7, help="获取最近N天 (默认7)")
    parser.add_argument("--start", type=str, help="开始日期 YYYYMMDD")
    parser.add_argument("--end", type=str, help="结束日期 YYYYMMDD")
    parser.add_argument("--date", type=str, help="单天日期 YYYYMMDD")
    parser.add_argument("--force", action="store_true", help="强制重新获取")
    parser.add_argument("--workers", type=int, default=5, help="天级并发数 (默认5)")
    parser.add_argument("--parallel", type=int, default=50, help="单天多线程下载 (默认50, 大数据量推荐)")
    parser.add_argument("--merge", action="store_true", help="合并所有日期数据到一个文件")
    parser.add_argument("--feishu", nargs="?", const="__USE_CONFIG__",
                        help="上传到飞书表格")
    parser.add_argument("--sheet-id", type=str, default=None, help="飞书工作表ID")
    parser.add_argument("--sort", type=str, default=None, help="排序: 字段:asc/desc")
    parser.add_argument("--cols", type=str, default=None, help="列映射: 原名:新名,...")
    args = parser.parse_args()
    # Resolve the SQL file path.
    sql_file = Path(args.sql_file).resolve()
    if not sql_file.exists():
        print(f"错误: 找不到 {sql_file}")
        return
    # Load the Feishu config (priority: command line > {sql}.json >
    # sql-dir/default.json > root/default.json > built-in defaults).
    feishu_config = load_feishu_config(sql_file)
    if args.feishu == "__USE_CONFIG__":
        # --feishu given without a value: take the token from the config.
        args.feishu = feishu_config["token"]
    elif args.feishu is None:
        pass  # Feishu upload not enabled.
    # Command-line arguments override the config file.
    if args.sheet_id is None:
        args.sheet_id = feishu_config["sheet_id"]
    if args.sort is None:
        args.sort = feishu_config["sort"]
    if args.cols is None:
        args.cols = feishu_config["cols"]
    # Echo the effective Feishu settings.
    if args.feishu:
        print(f"飞书配置: token={args.feishu}, sheet_id={args.sheet_id}, sort={args.sort}, cols={args.cols}")
    # Output layout: <sql dir>/output/<sql stem>/ holds the per-day CSVs.
    output_dir = sql_file.parent / "output"
    daily_dir = output_dir / sql_file.stem
    daily_dir.mkdir(parents=True, exist_ok=True)
    print(f"SQL文件: {sql_file}")
    print(f"数据目录: {daily_dir}")
    # Merge-only mode: do not fetch, just merge what is already on disk.
    if args.merge:
        existing_dates = get_existing_dates(daily_dir)
        print(f"已有数据: {len(existing_dates)}天")
        if existing_dates:
            merged_file = merge_csv_files(daily_dir)
            # Upload to Feishu when requested.
            if args.feishu and merged_file:
                upload_to_feishu(merged_file, args.feishu, args.sheet_id, args.sort, args.cols)
        else:
            print("没有可合并的数据")
        return
    # Determine the target date range (--date > --start/--end > --days,
    # where --days ends at yesterday).
    if args.date:
        target_dates = [args.date]
    elif args.start and args.end:
        target_dates = get_date_range(args.start, args.end)
    else:
        today = datetime.now()
        end_date = (today - timedelta(days=1)).strftime("%Y%m%d")
        start_date = (today - timedelta(days=args.days)).strftime("%Y%m%d")
        target_dates = get_date_range(start_date, end_date)
    print(f"目标日期: {target_dates[0]} ~ {target_dates[-1]} ({len(target_dates)}天)")
    # Check which dates already exist on disk.
    existing_dates = get_existing_dates(daily_dir)
    print(f"已有数据: {len(existing_dates)}天")
    # Decide which dates still need fetching.
    if args.force:
        missing_dates = target_dates
        print(f"强制模式: 重新获取所有 {len(missing_dates)} 天")
    else:
        missing_dates = [d for d in target_dates if d not in existing_dates]
        print(f"需要获取: {len(missing_dates)}天")
    if not missing_dates:
        print("没有需要获取的数据,退出")
        return
    # Read the SQL template.
    sql_template = sql_file.read_text(encoding="utf-8")
    # Does the SQL contain the ${dt} placeholder at all?
    has_dt_var = "${dt}" in sql_template
    # Reset the shared counters for this run.
    success_count = 0
    fail_count = 0
    # Without ${dt} the query is date-independent: run it exactly once.
    if not has_dt_var:
        print("\n检测到 SQL 中不含 ${dt} 变量,只执行一次...")
        target_dates = ["20000101"]  # dummy date
        missing_dates = target_dates
        output_file = output_dir / f"{sql_file.stem}.csv"
        output_file.parent.mkdir(parents=True, exist_ok=True)
        try:
            client = ODPSClient()
            if args.parallel > 0:
                client.execute_sql_result_save_file_parallel(sql_template, str(output_file), workers=args.parallel)
            else:
                client.execute_sql_result_save_file(sql_template, str(output_file))
            print(f"数据目录: {output_file}")
            # Upload to Feishu when requested.
            if args.feishu and output_file.exists():
                upload_to_feishu(output_file, args.feishu, args.sheet_id, args.sort, args.cols)
        except Exception as e:
            print(f"✗ 执行失败: {e}")
        return
    # Concurrent per-day fetch.
    print(f"目标日期: {target_dates[0]} ~ {target_dates[-1]} ({len(target_dates)}天)")
    workers = min(args.workers, len(missing_dates))
    if args.parallel > 0:
        print(f"\n开始获取 (天级并发: {workers}, 单天多线程: {args.parallel})...")
    else:
        print(f"\n开始获取 (并发数: {workers})...")
    with ThreadPoolExecutor(max_workers=workers) as executor:
        futures = {
            executor.submit(fetch_single_day, dt, sql_template, daily_dir, args.parallel): dt
            for dt in missing_dates
        }
        completed = 0
        for future in as_completed(futures):
            completed += 1
            dt, status, info = future.result()
            if status == "success":
                print(f" [{completed}/{len(missing_dates)}] ✓ {dt}: {info} 行")
            elif status == "empty":
                print(f" [{completed}/{len(missing_dates)}] ⚠ {dt}: 无数据")
            elif status == "error":
                print(f" [{completed}/{len(missing_dates)}] ✗ {dt}: {info}")
            else:
                print(f" [{completed}/{len(missing_dates)}] ✗ {dt}: 失败")
    print(f"\n完成! 成功: {success_count}, 失败: {fail_count}")
    print(f"数据目录: {daily_dir}")
    # When Feishu upload is requested, merge the daily files first.
    if args.feishu:
        merged_file = merge_csv_files(daily_dir)
        if merged_file:
            upload_to_feishu(merged_file, args.feishu, args.sheet_id, args.sort, args.cols)
if __name__ == "__main__":
    main()