Przeglądaj źródła

feat: 支持多 ODPS 配置切换

fetch_daily.py 新增 --config 参数,lib/odps_module.py 重构为
多配置模式,支持 default 和 piaoquan_api 两套 ODPS 环境。

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
yangxiaohui 1 miesiąc temu
rodzic
commit
28f965767c
2 zmienione pliki z 36 dodaniami i 8 usunięciami
  1. 6 4
      fetch_daily.py
  2. 30 4
      lib/odps_module.py

+ 6 - 4
fetch_daily.py

@@ -16,6 +16,7 @@
     python fetch_daily.py tasks/xxx/query.sql --feishu           # 获取后上传到飞书表格
     python fetch_daily.py tasks/xxx/query.sql --feishu TOKEN     # 指定飞书表格token
     python fetch_daily.py tasks/xxx/query.sql --merge --feishu   # 仅合并并上传飞书
+    python fetch_daily.py tasks/xxx/query.sql --config piaoquan_api  # 切换 ODPS 配置
 """
 import argparse
 import sys
@@ -463,12 +464,12 @@ def get_date_range(start_str, end_str):
     return dates
 
 
-def fetch_single_day(dt, sql_template, daily_dir, parallel_threads=0):
+def fetch_single_day(dt, sql_template, daily_dir, parallel_threads=0, config="default"):
     """获取单天数据"""
     global success_count, fail_count
 
     try:
-        client = ODPSClient()
+        client = ODPSClient(config=config)
         sql = sql_template.replace("${dt}", dt)
         output_file = daily_dir / f"{dt}.csv"
 
@@ -518,6 +519,7 @@ def main():
     parser.add_argument("--sheet-id", type=str, default=None, help="飞书工作表ID")
     parser.add_argument("--sort", type=str, default=None, help="排序: 字段:asc/desc")
     parser.add_argument("--cols", type=str, default=None, help="列映射: 原名:新名,...")
+    parser.add_argument("--config", type=str, default="default", help="ODPS配置: default 或 piaoquan_api")
     args = parser.parse_args()
 
     # 解析 SQL 文件路径
@@ -613,7 +615,7 @@ def main():
         output_file.parent.mkdir(parents=True, exist_ok=True)
 
         try:
-            client = ODPSClient()
+            client = ODPSClient(config=args.config)
             if args.parallel > 0:
                 client.execute_sql_result_save_file_parallel(sql_template, str(output_file), workers=args.parallel)
             else:
@@ -636,7 +638,7 @@ def main():
 
     with ThreadPoolExecutor(max_workers=workers) as executor:
         futures = {
-            executor.submit(fetch_single_day, dt, sql_template, daily_dir, args.parallel): dt
+            executor.submit(fetch_single_day, dt, sql_template, daily_dir, args.parallel, args.config): dt
             for dt in missing_dates
         }
 

+ 30 - 4
lib/odps_module.py

@@ -16,18 +16,44 @@ from pyarrow import csv as pa_csv
 options.tunnel.use_instance_tunnel = True
 options.tunnel.limit_instance_tunnel = False
 
+# ODPS 配置
+ODPS_CONFIGS = {
+    "default": {
+        "access_id": "LTAIWYUujJAm7CbH",
+        "access_secret": "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P",
+        "project": "loghubods",
+    },
+    "piaoquan_api": {
+        "access_id": "LTAI5tKyXxh7C6349c1wbwUX",
+        "access_secret": "H8doQDC20KugToRA3giERgRyRD1KR9",
+        "project": "piaoquan_api",
+    },
+}
+
 
 class ODPSClient(object):
-    def __init__(self, project="loghubods"):
-        self.accessId = "LTAIWYUujJAm7CbH"
-        self.accessSecret = "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P"
+    def __init__(self, project="loghubods", config="default"):
+        """
+        初始化 ODPS 客户端
+
+        Args:
+            project: 项目名(可覆盖配置中的默认项目)
+            config: 配置名,可选 "default" 或 "piaoquan_api"
+        """
+        cfg = ODPS_CONFIGS.get(config, ODPS_CONFIGS["default"])
+
+        self.accessId = cfg["access_id"]
+        self.accessSecret = cfg["access_secret"]
         self.endpoint = "http://service.odps.aliyun.com/api"
         self.tunnelUrl = "http://dt.cn-hangzhou.maxcompute.aliyun-inc.com"
 
+        # 如果指定了 project 且不是默认值 "loghubods",使用指定的;否则用配置中的(注意:显式传入 "loghubods" 也会被视为未指定,仍使用配置中的项目)
+        actual_project = project if project != "loghubods" else cfg["project"]
+
         self.odps = ODPS(
             self.accessId,
             self.accessSecret,
-            project,
+            actual_project,
             self.endpoint
         )