fetch_daily.py 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680
  1. #!/usr/bin/env python
  2. # coding=utf-8
  3. """
  4. 按天增量获取数据 - 通用版本
  5. 支持并发获取,自动跳过已有数据
  6. 用法:
  7. python fetch_daily.py tasks/xxx/query.sql # 获取最近7天
  8. python fetch_daily.py tasks/xxx/query.sql --days 30 # 获取最近30天
  9. python fetch_daily.py tasks/xxx/query.sql --start 20260101 --end 20260107
  10. python fetch_daily.py tasks/xxx/query.sql --date 20260105 # 单天
  11. python fetch_daily.py tasks/xxx/query.sql --force # 强制重新获取
  12. python fetch_daily.py tasks/xxx/query.sql --workers 10 # 设置天级并发数
  13. python fetch_daily.py tasks/xxx/query.sql --parallel 50 # 单天多线程下载(默认50,大数据量推荐)
  14. python fetch_daily.py tasks/xxx/query.sql --parallel 0 # 关闭多线程,使用单线程下载
  15. python fetch_daily.py tasks/xxx/query.sql --feishu # 获取后上传到飞书表格
  16. python fetch_daily.py tasks/xxx/query.sql --feishu TOKEN # 指定飞书表格token
  17. python fetch_daily.py tasks/xxx/query.sql --merge --feishu # 仅合并并上传飞书
  18. python fetch_daily.py tasks/xxx/query.sql --config piaoquan_api # 切换 ODPS 配置
  19. """
  20. import argparse
  21. import sys
  22. from datetime import datetime, timedelta
  23. from pathlib import Path
  24. from concurrent.futures import ThreadPoolExecutor, as_completed
  25. import threading
# Make the bundled ./lib directory importable without installation.
sys.path.insert(0, str(Path(__file__).parent / "lib"))
from odps_module import ODPSClient
import csv

# Thread-safe counters shared by the day-level worker threads
# (updated under counter_lock in fetch_single_day, reset in main).
counter_lock = threading.Lock()
success_count = 0
fail_count = 0
  33. def get_existing_dates(daily_dir):
  34. """获取已下载的日期列表"""
  35. existing = set()
  36. if not daily_dir.exists():
  37. return existing
  38. for f in daily_dir.glob("*.csv"):
  39. try:
  40. dt = f.stem
  41. if len(dt) == 8 and dt.isdigit():
  42. existing.add(dt)
  43. except:
  44. pass
  45. return existing
  46. def merge_csv_files(daily_dir, output_file=None):
  47. """合并目录下所有日期 CSV 文件,只保留一个表头"""
  48. csv_files = sorted(daily_dir.glob("*.csv"))
  49. if not csv_files:
  50. print("没有找到 CSV 文件")
  51. return None
  52. if output_file is None:
  53. output_file = daily_dir.parent / f"{daily_dir.name}_merged.csv"
  54. with open(output_file, "w", encoding="utf-8") as out:
  55. header_written = False
  56. total_rows = 0
  57. for csv_file in csv_files:
  58. with open(csv_file, "r", encoding="utf-8") as f:
  59. lines = f.readlines()
  60. if not lines:
  61. continue
  62. if not header_written:
  63. out.write(lines[0])
  64. header_written = True
  65. for line in lines[1:]:
  66. out.write(line)
  67. total_rows += 1
  68. print(f"合并完成: {len(csv_files)} 个文件, {total_rows} 行数据")
  69. print(f"输出文件: {output_file}")
  70. return output_file
  71. def infer_column_types(rows):
  72. """推断每列的类型:int, float, 或 str"""
  73. if not rows:
  74. return []
  75. num_cols = len(rows[0])
  76. col_types = []
  77. for col_idx in range(num_cols):
  78. has_float = False
  79. all_numeric = True
  80. for row in rows:
  81. if col_idx >= len(row):
  82. continue
  83. v = row[col_idx].strip() if row[col_idx] else ""
  84. if not v: # 空值不影响类型判断
  85. continue
  86. try:
  87. if '.' in v or 'e' in v.lower():
  88. float(v)
  89. has_float = True
  90. else:
  91. int(v)
  92. except ValueError:
  93. all_numeric = False
  94. break
  95. if all_numeric:
  96. col_types.append('float' if has_float else 'int')
  97. else:
  98. col_types.append('str')
  99. return col_types
  100. def convert_row_by_types(row, col_types):
  101. """按列类型转换一行数据"""
  102. result = []
  103. for i, cell in enumerate(row):
  104. if i >= len(col_types):
  105. result.append(cell)
  106. continue
  107. v = cell.strip() if cell else ""
  108. if not v:
  109. result.append("")
  110. continue
  111. col_type = col_types[i]
  112. if col_type == 'int':
  113. result.append(int(v))
  114. elif col_type == 'float':
  115. result.append(float(v))
  116. else:
  117. result.append(cell)
  118. return result
  119. def load_feishu_config(sql_file):
  120. """加载飞书配置,优先级: {sql名}.json > sql目录/default.json > 根目录/default.json > 默认值"""
  121. import json
  122. defaults = {
  123. "token": "ONZqsxB9BhGH8tt90EScSJT5nHh",
  124. "sheet_id": None,
  125. "sort": "dt:desc",
  126. "cols": None,
  127. }
  128. root_dir = Path(__file__).parent
  129. sql_dir = sql_file.parent
  130. sql_name = sql_file.stem
  131. def load_json(path, name):
  132. if path.exists():
  133. try:
  134. with open(path, "r", encoding="utf-8") as f:
  135. defaults.update(json.load(f))
  136. except Exception as e:
  137. print(f"警告: 读取 {name} 失败: {e}")
  138. # 按优先级从低到高加载(后加载的覆盖先加载的)
  139. load_json(root_dir / "default.json", "根目录/default.json")
  140. load_json(sql_dir / "default.json", "sql目录/default.json")
  141. load_json(sql_dir / f"{sql_name}.json", f"{sql_name}.json")
  142. return defaults
  143. def parse_sort_spec(sort_spec):
  144. """解析排序规格,如 'dt:desc,name:asc' -> [('dt', True), ('name', False)]"""
  145. if not sort_spec:
  146. return []
  147. result = []
  148. for part in sort_spec.split(","):
  149. part = part.strip()
  150. if not part:
  151. continue
  152. if ":" in part:
  153. field, order = part.rsplit(":", 1)
  154. desc = order.lower() != "asc"
  155. else:
  156. field, desc = part, True # 默认逆序
  157. result.append((field.strip(), desc))
  158. return result
  159. def parse_cols_spec(cols_spec):
  160. """解析列映射规格,如 'dt:日期,name,value:数值' -> [('dt', '日期'), ('name', 'name'), ('value', '数值')]"""
  161. if not cols_spec:
  162. return []
  163. result = []
  164. for part in cols_spec.split(","):
  165. part = part.strip()
  166. if not part:
  167. continue
  168. if ":" in part:
  169. old_name, new_name = part.split(":", 1)
  170. result.append((old_name.strip(), new_name.strip()))
  171. else:
  172. result.append((part, part))
  173. return result
  174. def apply_cols_mapping(header, data_rows, cols_spec):
  175. """应用列映射:筛选、排序、重命名"""
  176. col_mapping = parse_cols_spec(cols_spec)
  177. if not col_mapping:
  178. return header, data_rows
  179. # 构建索引映射
  180. header_index = {name: i for i, name in enumerate(header)}
  181. new_header = []
  182. col_indices = []
  183. for old_name, new_name in col_mapping:
  184. if old_name in header_index:
  185. col_indices.append(header_index[old_name])
  186. new_header.append(new_name)
  187. else:
  188. print(f"警告: 字段 '{old_name}' 不存在,已跳过")
  189. if not col_indices:
  190. print("警告: 没有有效的列映射,保持原样")
  191. return header, data_rows
  192. # 应用映射
  193. new_rows = []
  194. for row in data_rows:
  195. new_row = [row[i] if i < len(row) else "" for i in col_indices]
  196. new_rows.append(new_row)
  197. print(f"列映射: {len(col_indices)} 列")
  198. return new_header, new_rows
  199. def column_index_to_letter(col_idx):
  200. """列索引转字母,如 1->A, 26->Z, 27->AA"""
  201. result = ""
  202. while col_idx > 0:
  203. col_idx, remainder = divmod(col_idx - 1, 26)
  204. result = chr(65 + remainder) + result
  205. return result
def upload_to_feishu(csv_file, sheet_token, sheet_id=None, sort_spec="dt:desc", cols_spec=None):
    """Upload a CSV file to a Feishu spreadsheet (styles inherited via a template row).

    Sheet layout while uploading:
        row 1: header
        row 2: style template (inherited by inserted rows, deleted at the end)
        row 3+: data

    Args:
        csv_file: CSV file path
        sheet_token: Feishu spreadsheet token
        sheet_id: worksheet ID (auto-detects the first sheet when None)
        sort_spec: sort spec such as "dt:desc,name:asc"
        cols_spec: column mapping spec such as "dt:日期,name,value:数值"
    """
    from feishu import Client, LARK_HOST, APP_ID, APP_SECRET, request
    # Read the CSV (header + data rows).
    with open(csv_file, "r", encoding="utf-8") as f:
        reader = csv.reader(f)
        rows = list(reader)
    if len(rows) < 2:
        print("CSV 文件为空,跳过上传")
        return
    header = rows[0]
    data_rows = rows[1:]
    # Sort (before column mapping, so specs use the original column names).
    sort_fields = parse_sort_spec(sort_spec)
    if sort_fields:
        applied = []
        # Repeated stable sorts from the least- to most-significant key
        # produce the composite ordering.
        for field, desc in reversed(sort_fields):
            if field in header:
                idx = header.index(field)
                data_rows.sort(key=lambda row: row[idx] if idx < len(row) else "", reverse=desc)
                applied.append(f"{field}:{'desc' if desc else 'asc'}")
        if applied:
            print(f"排序: {', '.join(reversed(applied))}")
    # Column mapping (after sorting).
    header, data_rows = apply_cols_mapping(header, data_rows, cols_spec)
    # Infer per-column types and convert the cell strings accordingly.
    col_types = infer_column_types(data_rows)
    converted_rows = [convert_row_by_types(row, col_types) for row in data_rows]
    # Initialise the Feishu client.
    client = Client(LARK_HOST)
    access_token = client.get_tenant_access_token(APP_ID, APP_SECRET)
    # Resolve sheet_id when not supplied.
    if sheet_id is None:
        sheet_id = client.get_sheetid(access_token, sheet_token)
        print(f"Sheet ID: {sheet_id}")
    # Fetch sheet properties (column/row counts), reused further down.
    sheet_props = client.get_sheet_properties(access_token, sheet_token, sheet_id)
    current_cols = sheet_props['column_count'] if sheet_props else 26
    header_end_col = column_index_to_letter(current_cols)
    # Read the Feishu header row across all columns.
    feishu_header = client.read_range_values(access_token, sheet_token, f"{sheet_id}!A1:{header_end_col}1")
    feishu_cols = []
    if feishu_header and feishu_header[0]:
        feishu_cols = [c for c in feishu_header[0] if c]  # drop None and empty strings
    if feishu_cols:
        print(f"飞书表头: {feishu_cols}")
        print(f"CSV表头: {header}")
        # Validate field consistency (warn but continue; the Feishu header wins).
        feishu_set = set(feishu_cols)
        csv_set = set(header)
        missing_in_csv = feishu_set - csv_set
        missing_in_feishu = csv_set - feishu_set
        if missing_in_csv:
            print(f"警告: CSV缺少字段(将填空值): {missing_in_csv}")
        if missing_in_feishu:
            print(f"警告: 飞书缺少字段(将忽略): {missing_in_feishu}")
        # Re-order the data columns to match the Feishu header order.
        csv_col_index = {name: i for i, name in enumerate(header)}
        new_converted_rows = []
        for row in converted_rows:
            new_row = []
            for col_name in feishu_cols:
                if col_name in csv_col_index:
                    new_row.append(row[csv_col_index[col_name]])
                else:
                    new_row.append("")  # blank for fields the CSV lacks
            new_converted_rows.append(new_row)
        converted_rows = new_converted_rows
        header = feishu_cols
        print(f"已按飞书表头顺序重排数据")
    else:
        # The Feishu header is empty: write the CSV header into row 1.
        print(f"飞书表头为空,使用 CSV 表头写入")
        header_range = f"{sheet_id}!A1:{column_index_to_letter(len(header))}1"
        client.batch_update_values(access_token, sheet_token, {
            "valueRanges": [{"range": header_range, "values": [header]}]
        })
    total_rows = len(converted_rows)
    num_cols = len(header)
    end_col = column_index_to_letter(num_cols)
    print(f"上传到飞书: {total_rows} 行数据")
    batch_size = 500
    # Current row count (reuses the sheet_props fetched above).
    current_rows = sheet_props['row_count'] if sheet_props else 2
    print(f"当前行数: {current_rows}, 需要数据行: {total_rows}")
    headers = {
        'Content-Type': 'application/json; charset=utf-8',
        'Authorization': f'Bearer {access_token}'
    }
    # Step 1: delete old data rows in batches (keep row 1 header + row 2 template).
    if current_rows > 2:
        print(f"清理旧数据({current_rows - 2}行)...")
        rows_to_delete = current_rows - 2
        delete_batch = 5000
        while rows_to_delete > 0:
            # Always delete starting at row 3; remaining rows renumber after each delete.
            batch = min(rows_to_delete, delete_batch)
            try:
                client.delete_rows(access_token, sheet_token, sheet_id, 3, 2 + batch)
                rows_to_delete -= batch
                if rows_to_delete > 0:
                    print(f" 已删除 {current_rows - 2 - rows_to_delete}/{current_rows - 2}")
            except Exception as e:
                print(f" 清理失败: {e}")
                break
    # Step 2: grow the sheet capacity (insert_dimension_range does not auto-extend).
    # After the cleanup only 2 rows remain (header + template); we need 2 + total_rows.
    needed_rows = 2 + total_rows  # NOTE(review): unused — kept as in the original
    add_url = f"{LARK_HOST}/open-apis/sheets/v2/spreadsheets/{sheet_token}/dimension_range"
    add_payload = {
        "dimension": {
            "sheetId": sheet_id,
            "majorDimension": "ROWS",
            "length": total_rows  # number of data rows to append
        }
    }
    try:
        request("POST", add_url, headers, add_payload)
        print(f"扩展容量: +{total_rows} 行")
    except Exception as e:
        print(f" 扩展容量失败: {e}")
    # Step 3: insert blank rows batch by batch (inheriting row 2's style) and write data.
    print(f"插入并写入 {total_rows} 行...")
    insert_url = f"{LARK_HOST}/open-apis/sheets/v2/spreadsheets/{sheet_token}/insert_dimension_range"
    # Batches are processed in reverse (last batch first) because every insert
    # happens before row 3, pushing earlier batches downward.
    batches = [converted_rows[i:i + batch_size] for i in range(0, total_rows, batch_size)]
    processed = 0
    for batch in reversed(batches):
        batch_count = len(batch)
        # Insert blank rows before row 3 (inherit the style of row 2 above).
        insert_payload = {
            "dimension": {
                "sheetId": sheet_id,
                "majorDimension": "ROWS",
                "startIndex": 2,  # 0-indexed: the position of row 3
                "endIndex": 2 + batch_count
            },
            "inheritStyle": "BEFORE"
        }
        try:
            request("POST", insert_url, headers, insert_payload)
        except Exception as e:
            print(f" 插入行失败: {e}")
            break
        # Write this batch into the freshly inserted rows (starting at row 3).
        range_str = f"{sheet_id}!A3:{end_col}{2 + batch_count}"
        client.batch_update_values(access_token, sheet_token, {
            "valueRanges": [{"range": range_str, "values": batch}]
        })
        processed += batch_count
        print(f" 处理: {processed}/{total_rows}")
    # Step 4: delete the trailing blank rows added by the capacity extension.
    final_row_count = 2 + total_rows  # header + template + data
    current_row_count = 2 + total_rows * 2  # extension + inserts
    if current_row_count > final_row_count:
        print(f"清理多余空行...")
        try:
            client.delete_rows(access_token, sheet_token, sheet_id, final_row_count + 1, current_row_count)
        except Exception as e:
            print(f" 清理失败: {e}")
    # Step 5: delete the style-template row (row 2).
    print(f"删除模板行...")
    try:
        client.delete_rows(access_token, sheet_token, sheet_id, 2, 2)
    except Exception as e:
        print(f" 删除模板行失败: {e}")
    print(f"飞书上传完成: {sheet_token}")
  383. def get_date_range(start_str, end_str):
  384. """生成日期范围列表"""
  385. start = datetime.strptime(start_str, "%Y%m%d")
  386. end = datetime.strptime(end_str, "%Y%m%d")
  387. dates = []
  388. current = start
  389. while current <= end:
  390. dates.append(current.strftime("%Y%m%d"))
  391. current += timedelta(days=1)
  392. return dates
  393. def fetch_single_day(dt, sql_template, daily_dir, parallel_threads=0, config="default"):
  394. """获取单天数据"""
  395. global success_count, fail_count
  396. try:
  397. client = ODPSClient(config=config)
  398. sql = sql_template.replace("${dt}", dt)
  399. output_file = daily_dir / f"{dt}.csv"
  400. # 下载到文件
  401. if parallel_threads > 0:
  402. # 多线程并行下载(适合大数据量)
  403. client.execute_sql_result_save_file_parallel(sql, str(output_file), workers=parallel_threads)
  404. else:
  405. # 单线程下载
  406. client.execute_sql_result_save_file(sql, str(output_file))
  407. # 检查结果
  408. if output_file.exists():
  409. row_count = sum(1 for _ in open(output_file)) - 1 # 减去表头
  410. with counter_lock:
  411. success_count += 1
  412. if row_count > 0:
  413. return (dt, "success", row_count)
  414. else:
  415. return (dt, "empty", 0)
  416. else:
  417. with counter_lock:
  418. fail_count += 1
  419. return (dt, "fail", 0)
  420. except Exception as e:
  421. with counter_lock:
  422. fail_count += 1
  423. return (dt, "error", str(e))
def main():
    """CLI entry point: fetch per-day data, optionally merge and upload to Feishu."""
    global success_count, fail_count
    parser = argparse.ArgumentParser(description="按天增量获取数据")
    parser.add_argument("sql_file", type=str, help="SQL文件路径")
    parser.add_argument("--days", type=int, default=7, help="获取最近N天 (默认7)")
    parser.add_argument("--start", type=str, help="开始日期 YYYYMMDD")
    parser.add_argument("--end", type=str, help="结束日期 YYYYMMDD")
    parser.add_argument("--date", type=str, help="单天日期 YYYYMMDD")
    parser.add_argument("--force", action="store_true", help="强制重新获取")
    parser.add_argument("--workers", type=int, default=5, help="天级并发数 (默认5)")
    parser.add_argument("--parallel", type=int, default=50, help="单天多线程下载 (默认50, 大数据量推荐)")
    parser.add_argument("--merge", action="store_true", help="合并所有日期数据到一个文件")
    parser.add_argument("--feishu", nargs="?", const="__USE_CONFIG__",
                        help="上传到飞书表格")
    parser.add_argument("--sheet-id", type=str, default=None, help="飞书工作表ID")
    parser.add_argument("--sort", type=str, default=None, help="排序: 字段:asc/desc")
    parser.add_argument("--cols", type=str, default=None, help="列映射: 原名:新名,...")
    parser.add_argument("--config", type=str, default="default", help="ODPS配置: default 或 piaoquan_api")
    args = parser.parse_args()
    # Resolve the SQL file path.
    sql_file = Path(args.sql_file).resolve()
    if not sql_file.exists():
        print(f"错误: 找不到 {sql_file}")
        return
    # Load the Feishu config (priority: CLI > {sql}.json > sql-dir/default.json >
    # root/default.json > built-in defaults).
    feishu_config = load_feishu_config(sql_file)
    if args.feishu == "__USE_CONFIG__":
        # --feishu given without a value: fall back to the configured token.
        args.feishu = feishu_config["token"]
    elif args.feishu is None:
        pass  # Feishu upload not enabled
    # Command-line arguments override the config-file values.
    if args.sheet_id is None:
        args.sheet_id = feishu_config["sheet_id"]
    if args.sort is None:
        args.sort = feishu_config["sort"]
    if args.cols is None:
        args.cols = feishu_config["cols"]
    # Print the effective Feishu config.
    if args.feishu:
        print(f"飞书配置: token={args.feishu}, sheet_id={args.sheet_id}, sort={args.sort}, cols={args.cols}")
    # Output directory: output/<sql-stem>/ next to the SQL file.
    output_dir = sql_file.parent / "output"
    daily_dir = output_dir / sql_file.stem
    daily_dir.mkdir(parents=True, exist_ok=True)
    print(f"SQL文件: {sql_file}")
    print(f"数据目录: {daily_dir}")
    # Merge-only mode: no fetching, just merge the existing files.
    if args.merge:
        existing_dates = get_existing_dates(daily_dir)
        print(f"已有数据: {len(existing_dates)}天")
        if existing_dates:
            merged_file = merge_csv_files(daily_dir)
            # Upload to Feishu when requested.
            if args.feishu and merged_file:
                upload_to_feishu(merged_file, args.feishu, args.sheet_id, args.sort, args.cols)
        else:
            print("没有可合并的数据")
        return
    # Determine the target date range.
    if args.date:
        target_dates = [args.date]
    elif args.start and args.end:
        target_dates = get_date_range(args.start, args.end)
    else:
        # Default window: the last --days days, ending yesterday.
        today = datetime.now()
        end_date = (today - timedelta(days=1)).strftime("%Y%m%d")
        start_date = (today - timedelta(days=args.days)).strftime("%Y%m%d")
        target_dates = get_date_range(start_date, end_date)
    print(f"目标日期: {target_dates[0]} ~ {target_dates[-1]} ({len(target_dates)}天)")
    # Check the already-downloaded data.
    existing_dates = get_existing_dates(daily_dir)
    print(f"已有数据: {len(existing_dates)}天")
    # Decide which dates still need fetching.
    if args.force:
        missing_dates = target_dates
        print(f"强制模式: 重新获取所有 {len(missing_dates)} 天")
    else:
        missing_dates = [d for d in target_dates if d not in existing_dates]
        print(f"需要获取: {len(missing_dates)}天")
    if not missing_dates:
        print("没有需要获取的数据,退出")
        return
    # Read the SQL template.
    sql_template = sql_file.read_text(encoding="utf-8")
    # Detect whether the SQL contains the ${dt} placeholder.
    has_dt_var = "${dt}" in sql_template
    # Reset the shared counters.
    success_count = 0
    fail_count = 0
    # When the SQL has no ${dt}, run it exactly once and return.
    if not has_dt_var:
        print("\n检测到 SQL 中不含 ${dt} 变量,只执行一次...")
        # NOTE(review): these placeholder dates are never used after the
        # `return` that closes this branch — confirm they can be removed.
        target_dates = ["20000101"]
        missing_dates = target_dates
        output_file = output_dir / f"{sql_file.stem}.csv"
        output_file.parent.mkdir(parents=True, exist_ok=True)
        try:
            client = ODPSClient(config=args.config)
            if args.parallel > 0:
                client.execute_sql_result_save_file_parallel(sql_template, str(output_file), workers=args.parallel)
            else:
                client.execute_sql_result_save_file(sql_template, str(output_file))
            print(f"数据目录: {output_file}")
            # Upload to Feishu when requested.
            if args.feishu and output_file.exists():
                upload_to_feishu(output_file, args.feishu, args.sheet_id, args.sort, args.cols)
        except Exception as e:
            print(f"✗ 执行失败: {e}")
        return
    # Concurrent per-day fetching.
    print(f"目标日期: {target_dates[0]} ~ {target_dates[-1]} ({len(target_dates)}天)")
    workers = min(args.workers, len(missing_dates))
    if args.parallel > 0:
        print(f"\n开始获取 (天级并发: {workers}, 单天多线程: {args.parallel})...")
    else:
        print(f"\n开始获取 (并发数: {workers})...")
    with ThreadPoolExecutor(max_workers=workers) as executor:
        futures = {
            executor.submit(fetch_single_day, dt, sql_template, daily_dir, args.parallel, args.config): dt
            for dt in missing_dates
        }
        completed = 0
        for future in as_completed(futures):
            completed += 1
            dt, status, info = future.result()
            if status == "success":
                print(f" [{completed}/{len(missing_dates)}] ✓ {dt}: {info} 行")
            elif status == "empty":
                print(f" [{completed}/{len(missing_dates)}] ⚠ {dt}: 无数据")
            elif status == "error":
                print(f" [{completed}/{len(missing_dates)}] ✗ {dt}: {info}")
            else:
                print(f" [{completed}/{len(missing_dates)}] ✗ {dt}: 失败")
    print(f"\n完成! 成功: {success_count}, 失败: {fail_count}")
    print(f"数据目录: {daily_dir}")
    # When a Feishu upload is requested, merge first, then upload.
    if args.feishu:
        merged_file = merge_csv_files(daily_dir)
        if merged_file:
            upload_to_feishu(merged_file, args.feishu, args.sheet_id, args.sort, args.cols)


if __name__ == "__main__":
    main()