fetch_daily.py

#!/usr/bin/env python
# coding=utf-8
"""
Incremental day-by-day data fetcher - generic version.
Supports concurrent fetching and automatically skips dates that already exist.
Usage:
python fetch_daily.py tasks/xxx/query.sql                          # fetch the last 7 days
python fetch_daily.py tasks/xxx/query.sql --days 30                # fetch the last 30 days
python fetch_daily.py tasks/xxx/query.sql --start 20260101 --end 20260107
python fetch_daily.py tasks/xxx/query.sql --date 20260105          # a single day
python fetch_daily.py tasks/xxx/query.sql --date 20260105 --hh 08  # a single hour of a single day
python fetch_daily.py tasks/xxx/query.sql --force                  # force re-fetch
python fetch_daily.py tasks/xxx/query.sql --workers 10             # day-level concurrency
python fetch_daily.py tasks/xxx/query.sql --parallel 50            # multi-threaded download per day (default 50, recommended for large data)
python fetch_daily.py tasks/xxx/query.sql --parallel 0             # disable multi-threading, download single-threaded
python fetch_daily.py tasks/xxx/query.sql --feishu                 # upload to a Feishu sheet after fetching
python fetch_daily.py tasks/xxx/query.sql --feishu TOKEN           # specify the Feishu sheet token
python fetch_daily.py tasks/xxx/query.sql --merge --feishu         # merge only, then upload to Feishu
python fetch_daily.py tasks/xxx/query.sql --config piaoquan_api    # switch ODPS config
"""
import argparse
import csv
import sys
import threading
from datetime import datetime, timedelta
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed

sys.path.insert(0, str(Path(__file__).parent / "lib"))
from odps_module import ODPSClient

# Thread-safe counters shared across worker threads
counter_lock = threading.Lock()
success_count = 0
fail_count = 0
def get_existing_dates(daily_dir, hh=None):
    """Return the set of dates already downloaded (optionally for a specific hour)."""
    existing = set()
    if not daily_dir.exists():
        return existing
    for f in daily_dir.glob("*.csv"):
        try:
            stem = f.stem
            if hh is not None:
                # Hourly format: 20250101_08
                if len(stem) == 11 and stem[8] == '_':
                    dt = stem[:8]
                    file_hh = stem[9:11]
                    if dt.isdigit() and file_hh == hh:
                        existing.add(dt)
            else:
                # Date-only format: 20250101
                if len(stem) == 8 and stem.isdigit():
                    existing.add(stem)
        except Exception:
            pass
    return existing
def merge_csv_files(daily_dir, output_file=None):
    """Merge all per-date CSV files in a directory, aligning differing headers
    (columns missing from a file are filled with empty values)."""
    csv_files = sorted(daily_dir.glob("*.csv"))
    if not csv_files:
        print("No CSV files found")
        return None
    if output_file is None:
        output_file = daily_dir.parent / f"{daily_dir.name}_merged.csv"
    # First pass: collect all column names, preserving first-seen order
    all_columns = []
    col_set = set()
    for csv_file in csv_files:
        with open(csv_file, "r", encoding="utf-8") as f:
            reader = csv.reader(f)
            try:
                file_header = next(reader)
            except StopIteration:
                continue
            for col in file_header:
                if col not in col_set:
                    all_columns.append(col)
                    col_set.add(col)
    if not all_columns:
        print("No valid header found")
        return None
    # Second pass: write all rows out under the unified header
    with open(output_file, "w", encoding="utf-8", newline="") as out:
        writer = csv.writer(out)
        writer.writerow(all_columns)
        total_rows = 0
        for csv_file in csv_files:
            with open(csv_file, "r", encoding="utf-8", newline="") as f:
                reader = csv.reader(f)
                try:
                    file_header = next(reader)
                except StopIteration:
                    continue
                # Map column name -> index for the current file
                col_index = {col: i for i, col in enumerate(file_header)}
                for row in reader:
                    aligned_row = [row[col_index[col]] if col in col_index and col_index[col] < len(row) else ""
                                   for col in all_columns]
                    writer.writerow(aligned_row)
                    total_rows += 1
    print(f"Merge complete: {len(csv_files)} files, {total_rows} rows, {len(all_columns)} columns")
    print(f"Output file: {output_file}")
    return output_file
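# Example of the header alignment (hypothetical data): if 20250101.csv has the
# header [dt, uid] and 20250102.csv has [dt, uid, cnt], the merged file gets the
# unified header [dt, uid, cnt], and rows from the first file carry "" in cnt.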
def infer_column_types(rows):
    """Infer each column's type: 'int', 'float', or 'str'."""
    if not rows:
        return []
    num_cols = len(rows[0])
    col_types = []
    for col_idx in range(num_cols):
        has_float = False
        all_numeric = True
        for row in rows:
            if col_idx >= len(row):
                continue
            v = row[col_idx].strip() if row[col_idx] else ""
            if not v:  # empty values do not affect type inference
                continue
            try:
                if '.' in v or 'e' in v.lower():
                    float(v)
                    has_float = True
                else:
                    int(v)
            except ValueError:
                all_numeric = False
                break
        if all_numeric:
            col_types.append('float' if has_float else 'int')
        else:
            col_types.append('str')
    return col_types
def convert_row_by_types(row, col_types):
    """Convert one row of strings according to the inferred column types."""
    result = []
    for i, cell in enumerate(row):
        if i >= len(col_types):
            result.append(cell)
            continue
        v = cell.strip() if cell else ""
        if not v:
            result.append("")
            continue
        col_type = col_types[i]
        if col_type == 'int':
            result.append(int(v))
        elif col_type == 'float':
            result.append(float(v))
        else:
            result.append(cell)
    return result
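# Example (hypothetical rows): for [["20250101", "1.5", "a"], ["20250102", "2", "b"]],
# infer_column_types returns ['int', 'float', 'str'], and convert_row_by_types
# then yields [20250101, 1.5, 'a'] for the first row, so Feishu receives real
# numbers instead of strings.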
def load_feishu_config(sql_file):
    """Load the Feishu config. Priority: {sql name}.json > sql dir/default.json > root dir/default.json > built-in defaults."""
    import json
    defaults = {
        "token": "ONZqsxB9BhGH8tt90EScSJT5nHh",
        "sheet_id": None,
        "sort": "dt:desc",
        "cols": None,
        "filter": None,
        "limit": None,
        "append_cols": False,
        "order": None,  # custom value order per column, e.g. {"group": ["5d", "01", "34"]}
    }
    root_dir = Path(__file__).parent
    sql_dir = sql_file.parent
    sql_name = sql_file.stem
    def load_json(path, name):
        if path.exists():
            try:
                with open(path, "r", encoding="utf-8") as f:
                    defaults.update(json.load(f))
            except Exception as e:
                print(f"Warning: failed to read {name}: {e}")
    # Load from lowest to highest priority (later loads override earlier ones)
    load_json(root_dir / "default.json", "root dir/default.json")
    load_json(sql_dir / "default.json", "sql dir/default.json")
    load_json(sql_dir / f"{sql_name}.json", f"{sql_name}.json")
    return defaults
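# A sketch of a per-query config file, e.g. tasks/xxx/query.json (the values
# here are illustrative; the keys mirror the defaults above):
#
#   {
#     "token": "YOUR_SHEET_TOKEN",
#     "sort": "dt:desc,group:asc",
#     "cols": "dt:日期,group,cnt:数量",
#     "filter": {"group": "5d"},
#     "limit": 1000,
#     "append_cols": true,
#     "order": {"group": ["5d", "01", "34"]}
#   }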
def make_custom_order_key(idx, custom_order):
    """Build a key function for custom-order sorting.

    Values on the whitelist are ordered exactly as given in custom_order;
    values off the whitelist all go to the end, sorted lexicographically
    among themselves for stability.
    Args:
        idx: index of the target column in the header
        custom_order: desired value order, e.g. ["5d", "01", "34"]
    Returns:
        a row -> sortable key function for list.sort(key=...)
    """
    order_map = {str(v): i for i, v in enumerate(custom_order)}
    fallback = len(custom_order)
    def key_fn(row):
        v = row[idx] if idx < len(row) else ""
        return (order_map.get(v, fallback), v)
    return key_fn
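# Example (hypothetical rows): with custom_order ["5d", "01"] on column 0,
# rows [["01"], ["zz"], ["5d"], ["aa"]] sort to [["5d"], ["01"], ["aa"], ["zz"]]:
# whitelisted values keep the given order, everything else lands at the end
# in lexicographic order.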
def parse_sort_spec(sort_spec):
    """Parse a sort spec, e.g. 'dt:desc,name:asc' -> [('dt', True), ('name', False)]"""
    if not sort_spec:
        return []
    result = []
    for part in sort_spec.split(","):
        part = part.strip()
        if not part:
            continue
        if ":" in part:
            field, order = part.rsplit(":", 1)
            desc = order.lower() != "asc"
        else:
            field, desc = part, True  # default to descending
        result.append((field.strip(), desc))
    return result
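# Example: parse_sort_spec("dt:desc,name:asc,score") returns
# [('dt', True), ('name', False), ('score', True)] -- a bare field name
# defaults to descending.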
def parse_cols_spec(cols_spec):
    """Parse a column-mapping spec, e.g. 'dt:日期,name,value:数值' -> [('dt', '日期'), ('name', 'name'), ('value', '数值')]"""
    if not cols_spec:
        return []
    result = []
    for part in cols_spec.split(","):
        part = part.strip()
        if not part:
            continue
        if ":" in part:
            old_name, new_name = part.split(":", 1)
            result.append((old_name.strip(), new_name.strip()))
        else:
            result.append((part, part))
    return result
def apply_cols_mapping(header, data_rows, cols_spec):
    """Apply the column mapping: select, reorder, and rename columns."""
    col_mapping = parse_cols_spec(cols_spec)
    if not col_mapping:
        return header, data_rows
    # Build name -> index map
    header_index = {name: i for i, name in enumerate(header)}
    new_header = []
    col_indices = []
    for old_name, new_name in col_mapping:
        if old_name in header_index:
            col_indices.append(header_index[old_name])
            new_header.append(new_name)
        else:
            print(f"Warning: field '{old_name}' does not exist, skipped")
    if not col_indices:
        print("Warning: no valid column mapping, keeping the original")
        return header, data_rows
    # Apply the mapping
    new_rows = []
    for row in data_rows:
        new_row = [row[i] if i < len(row) else "" for i in col_indices]
        new_rows.append(new_row)
    print(f"Column mapping: {len(col_indices)} columns")
    return new_header, new_rows
def column_index_to_letter(col_idx):
    """Convert a 1-based column index to a letter, e.g. 1->A, 26->Z, 27->AA."""
    result = ""
    while col_idx > 0:
        col_idx, remainder = divmod(col_idx - 1, 26)
        result = chr(65 + remainder) + result
    return result
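# Sanity checks for the bijective base-26 conversion:
#   column_index_to_letter(1)   == "A"
#   column_index_to_letter(26)  == "Z"
#   column_index_to_letter(27)  == "AA"
#   column_index_to_letter(52)  == "AZ"
#   column_index_to_letter(703) == "AAA"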
def upload_to_feishu(csv_file, sheet_token, sheet_id=None, sort_spec="dt:desc", cols_spec=None, filter_spec=None, limit=None, append_cols=False, order_spec=None):
    """Upload a CSV file to a Feishu sheet (inheriting styles via a template row).

    Row 1: header
    Row 2: style template (used for inheritance, deleted at the end)
    Row 3 onward: data
    Args:
        csv_file: CSV file path
        sheet_token: Feishu sheet token
        sheet_id: worksheet ID (None to auto-pick the first one)
        sort_spec: sort spec, e.g. "dt:desc,name:asc"
        cols_spec: column-mapping spec, e.g. "dt:日期,name,value:数值"
        filter_spec: filter conditions, dict {"field": "value"} or str "field=value,field=value"
        limit: maximum number of rows to upload
        append_cols: whether to append columns missing from Feishu on the right (default False: ignore them)
        order_spec: custom value order per column, dict {field: [value1, value2, ...]}
    """
    from feishu import Client, LARK_HOST, APP_ID, APP_SECRET, request
    # Read the CSV
    with open(csv_file, "r", encoding="utf-8") as f:
        reader = csv.reader(f)
        rows = list(reader)
    if len(rows) < 2:
        print("CSV file is empty, skipping upload")
        return
    header = rows[0]
    data_rows = rows[1:]
    # Sort (before column mapping, using the original column names)
    sort_fields = parse_sort_spec(sort_spec)
    if sort_fields:
        applied = []
        for field, desc in reversed(sort_fields):
            if field in header:
                idx = header.index(field)
                if order_spec and field in order_spec:
                    # Custom-order sort (asc/desc is ignored)
                    custom_order = order_spec[field]
                    data_rows.sort(key=make_custom_order_key(idx, custom_order))
                    applied.append(f"{field}:custom({len(custom_order)})")
                else:
                    data_rows.sort(key=lambda row: row[idx] if idx < len(row) else "", reverse=desc)
                    applied.append(f"{field}:{'desc' if desc else 'asc'}")
        if applied:
            print(f"Sort: {', '.join(reversed(applied))}")
    # Filter (after sorting)
    if filter_spec:
        # Accept a dict (from a JSON config) or a str (from the CLI: "field=value,field!=value")
        filters = []
        if isinstance(filter_spec, str):
            for part in filter_spec.split(","):
                if "!=" in part:
                    k, v = part.split("!=", 1)
                    filters.append((k.strip(), v.strip(), "!="))
                elif "=" in part:
                    k, v = part.split("=", 1)
                    filters.append((k.strip(), v.strip(), "=="))
        elif isinstance(filter_spec, dict):
            filters = [(k, v, "==") for k, v in filter_spec.items()]
        before_count = len(data_rows)
        for field, value, op in filters:
            if field in header:
                idx = header.index(field)
                if op == "!=":
                    data_rows = [row for row in data_rows if idx < len(row) and row[idx] != str(value)]
                else:
                    data_rows = [row for row in data_rows if idx < len(row) and row[idx] == str(value)]
        print(f"Filter: {filters} → {before_count} → {len(data_rows)} rows")
    # limit (after filtering)
    if limit and len(data_rows) > limit:
        print(f"Row limit: {len(data_rows)} → {limit}")
        data_rows = data_rows[:limit]
    # Column mapping (after sorting)
    header, data_rows = apply_cols_mapping(header, data_rows, cols_spec)
    # Infer column types and convert values
    col_types = infer_column_types(data_rows)
    converted_rows = [convert_row_by_types(row, col_types) for row in data_rows]
    # Initialize the Feishu client
    client = Client(LARK_HOST)
    access_token = client.get_tenant_access_token(APP_ID, APP_SECRET)
    # Resolve sheet_id
    if sheet_id is None:
        sheet_id = client.get_sheetid(access_token, sheet_token)
        print(f"Sheet ID: {sheet_id}")
    # Fetch sheet properties
    sheet_props = client.get_sheet_properties(access_token, sheet_token, sheet_id)
    current_cols = sheet_props['column_count'] if sheet_props else 26
    header_end_col = column_index_to_letter(current_cols)
    # Expand the column count (when the CSV has more columns than the sheet)
    num_csv_cols = len(header)
    if num_csv_cols > current_cols:
        add_cols = num_csv_cols - current_cols
        expand_headers = {
            'Content-Type': 'application/json; charset=utf-8',
            'Authorization': f'Bearer {access_token}'
        }
        expand_payload = {
            "dimension": {
                "sheetId": sheet_id,
                "majorDimension": "COLUMNS",
                "length": add_cols
            }
        }
        try:
            request("POST", f"{LARK_HOST}/open-apis/sheets/v2/spreadsheets/{sheet_token}/dimension_range",
                    expand_headers, expand_payload)
            print(f"Expanded columns: {current_cols} -> {num_csv_cols} (+{add_cols})")
            current_cols = num_csv_cols
            header_end_col = column_index_to_letter(current_cols)
        except Exception as e:
            print(f"  Failed to expand columns: {e}")
    # Read the Feishu header row (all columns)
    feishu_header = client.read_range_values(access_token, sheet_token, f"{sheet_id}!A1:{header_end_col}1")
    feishu_cols = []
    if feishu_header and feishu_header[0]:
        feishu_cols = [c for c in feishu_header[0] if c]  # drop None and empty strings

    # Convert rich-text cells to plain text (a Feishu header cell may be a list structure carrying links)
    def _col_to_str(col):
        if isinstance(col, list):
            return "".join(item.get("text", "") for item in col if isinstance(item, dict))
        return col

    if feishu_cols:
        feishu_cols_str = [_col_to_str(c) for c in feishu_cols]
        print(f"Feishu header: {feishu_cols_str}")
        print(f"CSV header: {header}")
        # Check field consistency
        feishu_set = set(feishu_cols_str)
        csv_set = set(header)
        missing_in_csv = feishu_set - csv_set
        missing_in_feishu = csv_set - feishu_set
        if missing_in_csv:
            print(f"Warning: fields missing from the CSV (will be filled with blanks): {missing_in_csv}")
        if missing_in_feishu:
            if append_cols:
                print(f"New columns (will be appended on the right): {missing_in_feishu}")
            else:
                print(f"Warning: fields missing from Feishu (will be ignored): {missing_in_feishu}")
        # Final column order: existing Feishu columns + (optionally) new CSV columns
        final_col_names = list(feishu_cols_str)
        append_col_names = []
        if append_cols and missing_in_feishu:
            # Append new columns in their original CSV order
            append_col_names = [c for c in header if c in missing_in_feishu]
            final_col_names.extend(append_col_names)
        # Rearrange the data to match the final column order
        csv_col_index = {name: i for i, name in enumerate(header)}
        new_converted_rows = []
        for row in converted_rows:
            new_row = []
            for col_name in final_col_names:
                if col_name in csv_col_index:
                    new_row.append(row[csv_col_index[col_name]])
                else:
                    new_row.append("")  # fill blanks for fields missing from the CSV
            new_converted_rows.append(new_row)
        converted_rows = new_converted_rows
        # Write the headers of the appended columns to Feishu
        if append_col_names:
            # First expand the column count
            add_cols = len(append_col_names)
            expand_headers = {
                'Content-Type': 'application/json; charset=utf-8',
                'Authorization': f'Bearer {access_token}'
            }
            expand_payload = {
                "dimension": {
                    "sheetId": sheet_id,
                    "majorDimension": "COLUMNS",
                    "length": add_cols
                }
            }
            try:
                request("POST", f"{LARK_HOST}/open-apis/sheets/v2/spreadsheets/{sheet_token}/dimension_range",
                        expand_headers, expand_payload)
                current_cols += add_cols
                print(f"Expanded columns: +{add_cols} (appended new fields)")
            except Exception as e:
                print(f"  Failed to expand columns: {e}")
            # Write the new column headers
            start_col_idx = len(feishu_cols_str) + 1
            start_col = column_index_to_letter(start_col_idx)
            end_col = column_index_to_letter(start_col_idx + add_cols - 1)
            append_range = f"{sheet_id}!{start_col}1:{end_col}1"
            client.batch_update_values(access_token, sheet_token, {
                "valueRanges": [{"range": append_range, "values": [append_col_names]}]
            })
            print(f"Wrote new column headers: {append_col_names}")
            # header becomes the original Feishu header (as plain text) plus the appended names
            header = feishu_cols_str + append_col_names
        else:
            header = feishu_cols_str
        print("Rearranged data to match the Feishu header order")
    else:
        # The Feishu header is empty: write the CSV header (Feishu writes at most 100 columns per call, so batch)
        print("Feishu header is empty, writing the CSV header")
        col_batch = 100
        for start in range(0, len(header), col_batch):
            end = min(start + col_batch, len(header))
            start_col = column_index_to_letter(start + 1)
            end_col = column_index_to_letter(end)
            batch_range = f"{sheet_id}!{start_col}1:{end_col}1"
            client.batch_update_values(access_token, sheet_token, {
                "valueRanges": [{"range": batch_range, "values": [header[start:end]]}]
            })
    total_rows = len(converted_rows)
    num_cols = len(header)
    end_col = column_index_to_letter(num_cols)
    # Feishu caps a single sheet at 5,000,000 cells; reserve the header + template rows
    CELL_LIMIT = 5_000_000
    max_data_rows = (CELL_LIMIT // num_cols) - 2
    if total_rows > max_data_rows:
        print(f"⚠ Feishu cell limit {CELL_LIMIT:,} ({num_cols} cols × {max_data_rows} rows), truncating {total_rows} → {max_data_rows} rows")
        converted_rows = converted_rows[:max_data_rows]
        total_rows = max_data_rows
    print(f"Uploading to Feishu: {total_rows} rows")
    batch_size = 500
    # Current row count (reusing the sheet_props fetched earlier)
    current_rows = sheet_props['row_count'] if sheet_props else 2
    print(f"Current rows: {current_rows}, data rows needed: {total_rows}")
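    # Worked example of the cap: with 40 columns, max_data_rows
    # = 5_000_000 // 40 - 2 = 124_998, so a 200k-row CSV would be truncated
    # to its first 124,998 rows after sorting/filtering.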
    headers = {
        'Content-Type': 'application/json; charset=utf-8',
        'Authorization': f'Bearer {access_token}'
    }
    # Determine whether a template row (row 2) exists
    has_template = current_rows >= 2
    data_start = 3 if has_template else 2
    keep_rows = 2 if has_template else 1
    # Step 1: delete old data rows (keep the header + template row, if any), in batches
    if current_rows > keep_rows:
        rows_to_delete = current_rows - keep_rows
        print(f"Clearing old data ({rows_to_delete} rows)...")
        delete_batch = 5000
        while rows_to_delete > 0:
            batch = min(rows_to_delete, delete_batch)
            try:
                client.delete_rows(access_token, sheet_token, sheet_id, data_start, data_start - 1 + batch)
                rows_to_delete -= batch
                if rows_to_delete > 0:
                    print(f"  Deleted {current_rows - keep_rows - rows_to_delete}/{current_rows - keep_rows}")
            except Exception as e:
                print(f"  Failed to clear: {e}")
                break
    # Step 2: prepare empty rows
    if has_template:
        # With a template row: first append placeholder rows (so endIndex stays within
        # sheetMaxRowCount), then insert rows that inherit the template style
        insert_batch = 5000
        remaining = total_rows
        inserted = 0
        while remaining > 0:
            chunk = min(remaining, insert_batch)
            try:
                # First append placeholder rows (dimension_range POST has no endIndex limit)
                client.append_empty_rows(access_token, sheet_token, sheet_id, chunk)
                # Then insert styled rows (the sheet row count is now large enough)
                client.insert_rows_before(access_token, sheet_token, sheet_id,
                                          data_start + inserted, chunk,
                                          inherit_style="BEFORE")
                inserted += chunk
                remaining -= chunk
            except Exception as e:
                print(f"  Failed to insert rows (inserted {inserted}): {e}")
                break
        if inserted > 0:
            print(f"Inserted rows (inheriting the template style): +{inserted}")
    else:
        # No template row: expand via dimension_range POST (no style inheritance)
        add_url = f"{LARK_HOST}/open-apis/sheets/v2/spreadsheets/{sheet_token}/dimension_range"
        expand_batch = 5000
        remaining = total_rows
        expanded = 0
        while remaining > 0:
            chunk = min(remaining, expand_batch)
            add_payload = {
                "dimension": {
                    "sheetId": sheet_id,
                    "majorDimension": "ROWS",
                    "length": chunk
                }
            }
            try:
                request("POST", add_url, headers, add_payload)
                expanded += chunk
                remaining -= chunk
            except Exception as e:
                print(f"  Failed to expand capacity (expanded {expanded}): {e}")
                break
        if expanded > 0:
            print(f"Expanded capacity: +{expanded} rows")
    # Step 3: write the data in batches
    print(f"Writing {total_rows} rows...")
    batches = [converted_rows[i:i + batch_size] for i in range(0, total_rows, batch_size)]
    processed = 0
    for i, batch in enumerate(batches):
        batch_count = len(batch)
        start_row = data_start + i * batch_size
        # Write the data (Feishu accepts at most 100 columns per call, so batch by column)
        col_batch = 100
        value_ranges = []
        for col_start in range(0, num_cols, col_batch):
            col_end = min(col_start + col_batch, num_cols)
            sc = column_index_to_letter(col_start + 1)
            ec = column_index_to_letter(col_end)
            col_range = f"{sheet_id}!{sc}{start_row}:{ec}{start_row + batch_count - 1}"
            col_values = [row[col_start:col_end] for row in batch]
            value_ranges.append({"range": col_range, "values": col_values})
        client.batch_update_values(access_token, sheet_token, {
            "valueRanges": value_ranges
        })
        processed += batch_count
        print(f"  Progress: {processed}/{total_rows}")
    # Step 4: delete the template row (row 2), only if one existed
    if has_template:
        print("Deleting the template row...")
        try:
            client.delete_rows(access_token, sheet_token, sheet_id, 2, 2)
        except Exception as e:
            print(f"  Failed to delete the template row: {e}")
    # Step 5: delete the placeholder rows (extra empty rows after the data), in batches of ≤5000
    if has_template and total_rows > 0:
        try:
            sheet_props_final = client.get_sheet_properties(access_token, sheet_token, sheet_id)
            if sheet_props_final and sheet_props_final['row_count'] > 1 + total_rows:
                rows_to_clean = sheet_props_final['row_count'] - (1 + total_rows)
                clean_start = 1 + total_rows + 1  # header (1) + data (total_rows) + first placeholder row
                print(f"Clearing placeholder rows ({rows_to_clean})...")
                delete_batch = 5000
                while rows_to_clean > 0:
                    batch = min(rows_to_clean, delete_batch)
                    client.delete_rows(access_token, sheet_token, sheet_id,
                                       clean_start, clean_start - 1 + batch)
                    rows_to_clean -= batch
        except Exception as e:
            print(f"  Failed to clear placeholder rows: {e}")
    print(f"Feishu upload complete: {sheet_token}")
def get_date_range(start_str, end_str):
    """Generate the list of dates between start and end (inclusive)."""
    start = datetime.strptime(start_str, "%Y%m%d")
    end = datetime.strptime(end_str, "%Y%m%d")
    dates = []
    current = start
    while current <= end:
        dates.append(current.strftime("%Y%m%d"))
        current += timedelta(days=1)
    return dates
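# Example: get_date_range("20260101", "20260103") returns
# ["20260101", "20260102", "20260103"] -- both endpoints inclusive.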
def fetch_single_day(dt, sql_template, daily_dir, parallel_threads=0, config="default", hh=None):
    """Fetch one day of data (optionally a specific hour)."""
    global success_count, fail_count
    try:
        client = ODPSClient(config=config)
        sql = sql_template.replace("${dt}", dt)
        if hh is not None:
            sql = sql.replace("${hh}", hh)
            output_file = daily_dir / f"{dt}_{hh}.csv"
        else:
            output_file = daily_dir / f"{dt}.csv"
        # Download to file
        if parallel_threads > 0:
            # Multi-threaded parallel download (for large data volumes)
            client.execute_sql_result_save_file_parallel(sql, str(output_file), workers=parallel_threads)
        else:
            # Single-threaded download
            client.execute_sql_result_save_file(sql, str(output_file))
        # Check the result
        if output_file.exists():
            with open(output_file) as f:
                row_count = sum(1 for _ in f) - 1  # minus the header row
            with counter_lock:
                success_count += 1
            if row_count > 0:
                return (dt, "success", row_count)
            else:
                return (dt, "empty", 0)
        else:
            with counter_lock:
                fail_count += 1
            return (dt, "fail", 0)
    except Exception as e:
        with counter_lock:
            fail_count += 1
        return (dt, "error", str(e))
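# The returned status tuples drive the progress output in main(), e.g.
# ("20260105", "success", 1234), ("20260106", "empty", 0), or
# ("20260107", "error", "<exception message>").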
def main():
    global success_count, fail_count
    parser = argparse.ArgumentParser(description="Fetch data incrementally by day")
    parser.add_argument("sql_file", type=str, help="path to the SQL file")
    parser.add_argument("--days", type=int, default=7, help="fetch the last N days (default 7)")
    parser.add_argument("--start", type=str, help="start date YYYYMMDD")
    parser.add_argument("--end", type=str, help="end date YYYYMMDD")
    parser.add_argument("--date", type=str, help="single date YYYYMMDD")
    parser.add_argument("--hh", type=str, default=None, help="hour HH (00-23), requires --date")
    parser.add_argument("--force", action="store_true", help="force re-fetch")
    parser.add_argument("--workers", type=int, default=5, help="day-level concurrency (default 5)")
    parser.add_argument("--parallel", type=int, default=50, help="threads for per-day download (default 50, recommended for large data)")
    parser.add_argument("--merge", action="store_true", help="merge all daily data into one file")
    parser.add_argument("--feishu", nargs="?", const="__USE_CONFIG__",
                        help="upload to a Feishu sheet")
    parser.add_argument("--sheet-id", type=str, default=None, help="Feishu worksheet ID")
    parser.add_argument("--sort", type=str, default=None, help="sort: field:asc/desc")
    parser.add_argument("--cols", type=str, default=None, help="column mapping: old:new,...")
    parser.add_argument("--filter", type=str, default=None, help="filter: field=value,field=value")
    parser.add_argument("--limit", type=int, default=None, help="maximum rows to upload")
    parser.add_argument("--config", type=str, default="default", help="ODPS config: default or piaoquan_api")
    args = parser.parse_args()
    # Resolve the SQL file path
    sql_file = Path(args.sql_file).resolve()
    if not sql_file.exists():
        print(f"Error: {sql_file} not found")
        return
    # Load the Feishu config (priority: CLI > {sql name}.json > sql dir/default.json > root dir/default.json > defaults)
    feishu_config = load_feishu_config(sql_file)
    if args.feishu == "__USE_CONFIG__":
        args.feishu = feishu_config["token"]
    elif args.feishu is None:
        pass  # Feishu upload not enabled
    # CLI arguments override the config files
    if args.sheet_id is None:
        args.sheet_id = feishu_config["sheet_id"]
    if args.sort is None:
        args.sort = feishu_config["sort"]
    if args.cols is None:
        args.cols = feishu_config["cols"]
    if args.filter is None:
        args.filter = feishu_config["filter"]
    if args.limit is None:
        args.limit = feishu_config["limit"]
    append_cols = feishu_config.get("append_cols", False)
    order_spec = feishu_config.get("order")
    # Print the Feishu config
    if args.feishu:
        print(f"Feishu config: token={args.feishu}, sheet_id={args.sheet_id}, sort={args.sort}, cols={args.cols}, order={order_spec}")
    # Output directory: output/<SQL file name>/ next to the SQL file
    output_dir = sql_file.parent / "output"
    daily_dir = output_dir / sql_file.stem
    daily_dir.mkdir(parents=True, exist_ok=True)
    print(f"SQL file: {sql_file}")
    print(f"Data directory: {daily_dir}")
    # Merge-only mode: skip fetching, just merge the existing files
    if args.merge:
        existing_dates = get_existing_dates(daily_dir)
        print(f"Existing data: {len(existing_dates)} days")
        if existing_dates:
            merged_file = merge_csv_files(daily_dir)
            # Upload to Feishu if requested
            if args.feishu and merged_file:
                upload_to_feishu(merged_file, args.feishu, args.sheet_id, args.sort, args.cols, args.filter, args.limit, append_cols, order_spec)
        else:
            print("Nothing to merge")
        return
    # Determine the date range
    if args.date:
        target_dates = [args.date]
    elif args.start and args.end:
        target_dates = get_date_range(args.start, args.end)
    else:
        today = datetime.now()
        end_date = (today - timedelta(days=1)).strftime("%Y%m%d")
        start_date = (today - timedelta(days=args.days)).strftime("%Y%m%d")
        target_dates = get_date_range(start_date, end_date)
    print(f"Target dates: {target_dates[0]} ~ {target_dates[-1]} ({len(target_dates)} days)")
    # Check which dates already exist
    existing_dates = get_existing_dates(daily_dir, args.hh)
    if args.hh:
        print(f"Existing data: {len(existing_dates)} days (hh={args.hh})")
    else:
        print(f"Existing data: {len(existing_dates)} days")
    # Determine the dates to fetch
    if args.force:
        missing_dates = target_dates
        print(f"Force mode: re-fetching all {len(missing_dates)} days")
    else:
        missing_dates = [d for d in target_dates if d not in existing_dates]
        print(f"To fetch: {len(missing_dates)} days")
    if not missing_dates:
        print("Nothing to fetch, exiting")
        return
    # Read the SQL template
    sql_template = sql_file.read_text(encoding="utf-8")
    # Detect whether the SQL contains the ${dt} variable
    has_dt_var = "${dt}" in sql_template
    # Reset the counters
    success_count = 0
    fail_count = 0
    # If the SQL has no ${dt}, execute it just once
    if not has_dt_var:
        print("\nSQL contains no ${dt} variable, executing once...")
        output_file = output_dir / f"{sql_file.stem}.csv"
        output_file.parent.mkdir(parents=True, exist_ok=True)
        try:
            client = ODPSClient(config=args.config)
            if args.parallel > 0:
                client.execute_sql_result_save_file_parallel(sql_template, str(output_file), workers=args.parallel)
            else:
                client.execute_sql_result_save_file(sql_template, str(output_file))
            print(f"Output file: {output_file}")
            # Upload to Feishu if requested
            if args.feishu and output_file.exists():
                upload_to_feishu(output_file, args.feishu, args.sheet_id, args.sort, args.cols, args.filter, args.limit, append_cols, order_spec)
        except Exception as e:
            print(f"✗ Execution failed: {e}")
        return
    # Fetch concurrently
    workers = min(args.workers, len(missing_dates))
    if args.parallel > 0:
        print(f"\nStarting fetch (day-level concurrency: {workers}, threads per day: {args.parallel})...")
    else:
        print(f"\nStarting fetch (concurrency: {workers})...")
    with ThreadPoolExecutor(max_workers=workers) as executor:
        futures = {
            executor.submit(fetch_single_day, dt, sql_template, daily_dir, args.parallel, args.config, args.hh): dt
            for dt in missing_dates
        }
        completed = 0
        for future in as_completed(futures):
            completed += 1
            dt, status, info = future.result()
            if status == "success":
                print(f"  [{completed}/{len(missing_dates)}] ✓ {dt}: {info} rows")
            elif status == "empty":
                print(f"  [{completed}/{len(missing_dates)}] ⚠ {dt}: no data")
            elif status == "error":
                print(f"  [{completed}/{len(missing_dates)}] ✗ {dt}: {info}")
            else:
                print(f"  [{completed}/{len(missing_dates)}] ✗ {dt}: failed")
    print(f"\nDone! Success: {success_count}, failed: {fail_count}")
    print(f"Data directory: {daily_dir}")
    # If Feishu upload was requested, merge first and then upload
    if args.feishu:
        merged_file = merge_csv_files(daily_dir)
        if merged_file:
            upload_to_feishu(merged_file, args.feishu, args.sheet_id, args.sort, args.cols, args.filter, args.limit, append_cols, order_spec)

if __name__ == "__main__":
    main()