瀏覽代碼

feat:添加分析任务

zhaohaipeng 10 月之前
父節點
當前提交
484147b45b
共有 7 個文件被更改,包括 184 次插入33 次删除
  1. 89 0
      client/ODPSClient.py
  2. 5 5
      client/YarnClient.py
  3. 44 13
      feature_spark_monitor.py
  4. 13 0
      util/convert_util.py
  5. 33 0
      util/date_util.py
  6. 0 9
      util/dateutil.py
  7. 0 6
      util/feishu_inform_util.py

+ 89 - 0
client/ODPSClient.py

@@ -0,0 +1,89 @@
+import json
+import re
+
+import requests
+from odps import ODPS
+from odps.tunnel import TableTunnel
+from util import convert_util, date_util
+
+
+class ODPSClient:
+    def __init__(self):
+        self.accessId = "LTAIWYUujJAm7CbH"
+        self.accessSecret = "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P"
+        self.endpoint = "http://service.odps.aliyun.com/api"
+        self.tunnelUrl = "http://dt.cn-hangzhou.maxcompute.aliyun-inc.com"
+        self.odps = ODPS(
+            self.accessId,
+            self.accessSecret,
+            "",
+            self.endpoint
+        )
+
+    def get_all_record(self, project: str, table: str, dt: str) -> list:
+        tunnel = TableTunnel(self.odps)
+        download_session = tunnel.create_download_session(f"{project}.{table}", partition_spec=f"dt={dt}")
+        with download_session.open_record_reader(0, download_session.count) as reader:
+            for record in reader:
+                print(record)
+
+    @classmethod
+    def get_all_partition_info(cls, table_name: str, page_num=1, page_size=30, project="loghubods") -> list[dict]:
+        header = {
+            "cookie": 'yunpk=1894469520484605; t=35f3266f9f98d4a391e691a92b49fae6; cna=8+1iHgu92GsCAT0whRrtEI4u; '
+                      'login_aliyunid_pk=1894469520484605; sca=2176fa73; bd=s0ouCmI%3D; '
+                      'copilot-session-id-dw-copilot=6e694b9e-aa2e-46fd-9ef5-77d4680755f1; '
+                      'help_csrf=doehZiDyB3oB1Z%2Fn3cPDTOjfQgZgfK3SmTSveK6mkcuU30ul8euzz4E'
+                      '%2BkHqlvnYgQ6A2GgsUAGTPFblrZ8s7xJmv7zwWDzSffe4ceCSGnoEo0tIfCoPvPcutqc2iScScUmvCqxLY9dxJnl9Dag0a'
+                      'dw%3D%3D; cr_token=83b9653a-28d7-4981-af91-45f5828cd63b; _samesite_flag_=true; activeRegionId='
+                      'cn-hangzhou; cookie2=1e2ab9af438ed18a85b4e68fbf4956d5; _tb_token_=e3f046f7b7bfe; aliyun_lang='
+                      'zh; ch-m-extra-1=true; mini_login_aliyunid_ticket=3R5H3e3HY2c8gwLZuY5GmS7J.111N6WK3pzaiprUFY6'
+                      'PArbC7KmiFxsfbwTDpNuDrALFYtkxoUoWD8bewoZ3BSqdnW8NRpptWx92BTNY5KfrWXrmzTBsxm5gSoiHkqzBDpxeiQ5J'
+                      'tAYwpT7Y73ZXA4j2BMT4XSuGyzKEsrETyffAXgLhF4TPX7mn5fHWu7jsocNdMsS7xH6gZrvBu1tcAAbw1Pwqeg4xe1ekC'
+                      'HcHog5tx1MytitCbXyyZ4QvMvxWjB47tTyFvbxLxY4VQHLjpW6XQyTGFKPNJnGWJ1Per9Pa6RNsdJwDHfjup.2mWNaj2h'
+                      'xzDgGG43QBHWDbN8fJYD21nYmoFyrTT2dwkuVG9mUptkQewi3bhwPMdyBg; login_current_pk=2088112089146396'
+                      '46; bs_n_lang=zh_CN; cnaui=%2522zhaohaipeng%2520%2540%25201894469520484605%2522; aui=%2522zha'
+                      'ohaipeng%2520%2540%25201894469520484605%2522; login_aliyunid_csrf=_csrf_tk_1929015259855532;'
+                      ' login_aliyunid="zhaohaipeng @ 1894469520484605"; login_aliyunid_ticket=3R5H3e3HY2c8gwLZuY5Gm'
+                      'S7J.1118P2HCue5f1vZ1yf4bXhrmHGZLNDGj5MZW8SinPps2tPh8RCYRkxDq391yUE2enUjtzR1neLcAmV7FgXbdr7CZy'
+                      'e7gX1D8viERV6QmBGoZACsLuJDnjE3useRX6vRio8yTkkEfGxrWSSmUkaCCesJMKZW3zW282np5TgxvNMBuL35MEgv.2m'
+                      'WNaj2G4W2ax92ZQn9kmFDwDKXYYc9PumdViGC6kXWkLqzeJe4gGK7HEzt81jXGke; aliyun_site=CN; login_aliyu'
+                      'nid_sc=3R5H3e3HY2c8gwLZuY5GmS7K.1113bVmMKv851SdF2aYNXdbgTb2UWJHsApX4rU6pbztYdef5Cc1SWjFqwueUU'
+                      'NUgbfpAPb.1QDta7oX8HMFGXX7k6ic77ZGWhChZRfmsKSScgXqZFvWcwKGmzwhpHRzs9YpPy37M; atpsida=bdb8dbc3'
+                      '436ae0f6eb6706f9_1724123715_1; c_token=a0f8c3bfb3c211f02349dd40eea30c46; ck2=8918a1a152bf75fa'
+                      'e4570ce4ab7cb750; an=zhaohaipeng; lg=true; sg=g26; dw_bff=208811208914639646.%E8%B5%B5%E6%B5%'
+                      'B7%E9%B9%8F.42901.1724120895027.1894469520484605.1185874277.26842.OFFICIAL.2.1724304596731.zh'
+                      'aohaipeng.5.0dd716a2473fc98b00111c110d1a7a9d6bc696297; currentRegionId=cn-hangzhou; csrf_toke'
+                      'n=7a9647ecb28a44b31724204973v33fc17f87; tfstk=f81S02cNPMCq3AQXxLU2hp81SGRB2gNa92TdSwhrJQdJRB_'
+                      'GA0-yLg-dhGjITzbLEyNCzN9epuAPOisd0LIUEwoQpgjmU3EhzHsfDGKEaaKeOpscf3KzxBulnG7ta_ykTXOktBEab5PZ'
+                      'raAH9wgIK-ooMFLUTbdpJ1p_LTZab5P4eqdQVlWUkTXylexvyepKer9vSFTJ9pK-MrTp80HKwMQYlexx9ehpeqdvSUH-p'
+                      'UVj5ewWqaaY8dPfHym1rnGK1VYye5_SLXchGU959aKXuE5XPLt93hqs51_Oy1YCtPHBAwWR4EjQ1oOANg1JhBNIiQ71Hg'
+                      '9fFkGXntQG9K60fAbGyg69B_FboMsASOARwkcyq9QA9n588XtOQNAXo6rmwH6Ap_Jkt0EWl__1O9srtf-6paMQ6IlBlhz'
+                      'blvDFvz6vmVqtYHpJoU94lriRKccKT05UlYXHeEYWPraj29f..; isg=BJubgTlFeefQqoYNgvt9L9EEKvkFcK9y0QISL'
+                      '43Y1hqxbLhOFkIDw4uqBsxizAdq'
+        }
+
+        url = f"https://bff-cn-hangzhou.data.aliyun.com/dma/listPartitions_2?pageSize={page_size}&pageNum={page_num}&entityGuid=odps.{project}.{table_name}&entityType=odps-table"
+        print(f"请求的阿里云接口为: {url}")
+        response = requests.get(url, headers=header)
+        resp_json = json.loads(response.text)
+        result = []
+        dt_hh = re.compile(r'dt=(\d{8})/hh=(\d{2})')
+        dt_hh_mm = re.compile(r'dt=(\d{8})/hh=(\d{2})/mm=(\d{2})')
+        for datum in resp_json["data"]["data"]:
+            s1 = dt_hh.search(datum['name'])
+            # s2 = dt_hh_mm.search(datum['name'])
+            partition = datum['name']
+            if s1:
+                partition = f"{s1.group(1)}{s1.group(2)}"
+            item = {
+                "表名": table_name,
+                "name": datum["name"],
+                "分区": partition,
+                "数据量": datum["recordCount"],
+                "数据大小": convert_util.byte_convert(datum['dataSize']),
+                "创建时间": date_util.ts_cover_str(datum["gmtCreate"]),
+                "更新时间": date_util.ts_cover_str(datum['gmtModified'])
+            }
+            result.append(item)
+        return result

+ 5 - 5
client/YarnClient.py

@@ -3,7 +3,7 @@ from typing import List, Dict
 
 import requests
 
-from util import dateutil
+from util import date_util
 
 
 class YarnClient(object):
@@ -38,12 +38,12 @@ class YarnClient(object):
                 "id": app['id'],
                 "name": app['name'],
                 "finalStatus": app['finalStatus'],
-                "finishedTime": dateutil.ts_cover_str(app['finishedTime']),
-                "startedTime": dateutil.ts_cover_str(app['startedTime']),
-                "launchTime": dateutil.ts_cover_str(app['launchTime']),
+                "finishedTime": date_util.ts_cover_str(app['finishedTime']),
+                "startedTime": date_util.ts_cover_str(app['startedTime']),
+                "launchTime": date_util.ts_cover_str(app['launchTime']),
                 "queue": app['queue'],
                 "state": app['state'],
-                "elapsedTime": app['elapsedTime'],
+                "elapsedTime": date_util.timestamp_convert(app['elapsedTime']),
             }
             result.append(item)
 

+ 44 - 13
feature_spark_monitor.py

@@ -4,10 +4,11 @@ from typing import Dict, List
 
 import pandas as pd
 
-from client import YarnClient
-from util import dateutil, feishu_inform_util
+from client import YarnClient, ODPSClient
+from util import date_util, feishu_inform_util
 
-yarn_client = YarnClient.YarnClient("192.168.203.16")
+yarn_client = YarnClient.YarnClient("121.40.173.140")
+odps_client = ODPSClient.ODPSClient()
 
 table_list = [
     "alg_mid_feature_sharecf",
@@ -52,38 +53,38 @@ table_list = [
 
 filter_date = datetime(2024, 1, 1)
 
+columns = ["id", "表名", "startedTime", "launchTime", "finishedTime", "state", "elapsedTime", "分区", "数据量",
+           "数据大小", "创建时间", "更新时间"]
+
 
 def df_print(result):
     df = pd.DataFrame(result)
-    # 过滤出 name 中包含 'cid' 的行
-    filtered_df = df[df['name'].str.contains('cid')].copy()  # 使用 .copy() 生成副本以避免警告
-    filtered_df.loc[:, 'name'] = filtered_df['name'].str.replace('odps sync to redis : ', '', regex=False)
+    sorted_df = df.sort_values(by="startedTime")
 
-    sorted_df = filtered_df.sort_values(by="startedTime")
+    sorted_df = sorted_df[columns]
 
     # 获取表头
     header = ' | '.join(sorted_df.columns)
 
+    # 获取数据行
     def format_row(row):
         return ' | '.join([str(row[col]) for col in sorted_df.columns])
 
-    # 获取数据行
-    rows = filtered_df.apply(format_row, axis=1).tolist()
+    rows = sorted_df.apply(format_row, axis=1).tolist()
 
     # 打印输出
     print(header)
-    print('-' * len(header))
     print('\n'.join(rows))
 
 
 def handle_table(table_name: str, spark_task_list: List[Dict]) -> (bool, str):
     filtered_data = [
         item for item in spark_task_list
-        if table_name in item['name'] and dateutil.str_cover_date(item['startedTime']) > filter_date
+        if table_name in item['name'] and date_util.str_cover_date(item['startedTime']) > filter_date
     ]
     if filtered_data:
         latest_started_time = max(
-            [dateutil.str_cover_date(item['startedTime']) for item in filtered_data])
+            [date_util.str_cover_date(item['startedTime']) for item in filtered_data])
         print(f"表: {table_name}, 最后一次同步时间为: {latest_started_time}")
         now = datetime.now()
         time_difference = now - latest_started_time
@@ -153,5 +154,35 @@ def _main():
             send_error_info(table_name, latest_started_time, webhook_url)
 
 
+def _analyse():
+    hours_7_early = int((datetime.now() - timedelta(hours=14)).timestamp()) * 1000
+    result = yarn_client.get_apps(started_time_begin=hours_7_early)
+    result = [
+        {
+            **{k: v for k, v in item.items() if k != 'name'},
+            'table_name': item['name'].split(":")[1].strip()
+        }
+        for item in result
+        if "alg" in item['name'] and item['state'] == 'RUNNING'
+    ]
+    partition_info = {}
+    for table_name in list({item['table_name'] for item in result}):
+        resp = odps_client.get_all_partition_info(table_name=table_name)
+        partition_info[table_name] = {item['分区']: item for item in resp}
+
+    spark_task_list = []
+    for item in result:
+        dt_hh = date_util.date_convert_dt_hh(item['startedTime'])
+        if item['table_name'] in partition_info and dt_hh in partition_info[item['table_name']]:
+            item = {
+                **item,
+                **partition_info[item['table_name']][dt_hh]
+            }
+            spark_task_list.append(item)
+
+    df_print(spark_task_list)
+
+
 if __name__ == '__main__':
-    _main()
+    # _main()
+    _analyse()

+ 13 - 0
util/convert_util.py

@@ -0,0 +1,13 @@
+def byte_convert(bytes: int):
+    # 定义单位
+    units = ["Bytes", "KB", "MB", "GB", "TB"]
+
+    unit_index = 0
+
+    # 迭代转换单位,直到值小于1024
+    while bytes >= 1024 and unit_index < len(units) - 1:
+        bytes /= 1024.0
+        unit_index += 1
+
+    # 返回结果
+    return f"{bytes:.2f} {units[unit_index]}"

+ 33 - 0
util/date_util.py

@@ -0,0 +1,33 @@
+from datetime import datetime
+
+
+def ts_cover_str(ts=0) -> str:
+    return datetime.fromtimestamp(ts / 1000).strftime('%Y-%m-%d %H:%M:%S')
+
+
+def str_cover_date(s: str):
+    return datetime.strptime(s, '%Y-%m-%d %H:%M:%S')
+
+
+def timestamp_convert(ts: int) -> str:
+    return seconds_convert(ts // 1000)
+
+
+def seconds_convert(seconds: int) -> str:
+    hours = seconds // 3600
+    minutes = (seconds % 3600) // 60
+    seconds = seconds % 60
+
+    s = ""
+    if hours > 0:
+        s = f"{s} {hours}小时"
+    if minutes > 0:
+        s = f"{s} {minutes}分钟"
+    if seconds > 0:
+        s = f"{s} {seconds}秒"
+    return s
+
+
+def date_convert_dt_hh(date_str: str) -> str:
+    date_obj = datetime.strptime(date_str, '%Y-%m-%d %H:%M:%S')
+    return date_obj.strftime('%Y%m%d%H')

+ 0 - 9
util/dateutil.py

@@ -1,9 +0,0 @@
-import datetime
-
-
-def ts_cover_str(ts=0) -> str:
-    return datetime.datetime.fromtimestamp(ts / 1000).strftime('%Y-%m-%d %H:%M:%S')
-
-
-def str_cover_date(s: str):
-    return datetime.datetime.strptime(s, '%Y-%m-%d %H:%M:%S')

+ 0 - 6
util/feishu_inform_util.py

@@ -28,9 +28,3 @@ def timestamp_format(timestamp: str) -> str:
     except ValueError as e:
         return timestamp
 
-
-def seconds_convert(seconds):
-    hours = seconds // 3600
-    minutes = (seconds % 3600) // 60
-    seconds = seconds % 60
-    return f"{hours}小时 {minutes}分钟 {seconds}秒"