Pārlūkot izejas kodu

feat:修改同步任务监控

zhaohaipeng 3 mēneši atpakaļ
vecāks
revīzija
7cc4181a0b
3 mainītis faili ar 19 papildinājumiem un 77 dzēšanām
  1. 6 0
      client/ODPSClient.py
  2. 13 3
      script/feature_spark_monitor.py
  3. 0 74
      script/t.py

+ 6 - 0
client/ODPSClient.py

@@ -102,6 +102,12 @@ class ODPSClient(object):
                 result.append(batch)
         return result
 
+    def get_all_partition(self, table: str, project="loghubods") -> list[str]:
+        result = []
+        for partition in self.odps.get_table(project=project, name=table).partitions:
+            result.append(partition)
+        return result
+
     @classmethod
     def get_all_partition_info(cls, table_name: str, page_num=1, page_size=30, **kwargs) -> list[dict]:
 

+ 13 - 3
script/feature_spark_monitor.py

@@ -66,6 +66,7 @@ table_list = [
 ]
 
 filter_date = datetime(2024, 1, 1)
+current_hour = datetime.now().hour
 
 
 def handle_table(table_name: str, spark_task_list: List[Dict]) -> (bool, str):
@@ -82,18 +83,27 @@ def handle_table(table_name: str, spark_task_list: List[Dict]) -> (bool, str):
     last_finished_item = filtered_data[0]
     print(f"表: {table_name}, 最后一次完成时间为: {last_finished_item['finishedTime']}")
 
-    time_difference = datetime.now() - date_util.str_cover_date(last_finished_item['finishedTime'])
+    finished_time = date_util.str_cover_date(last_finished_item['finishedTime'])
+    started_time = date_util.str_cover_date(last_finished_item['startedTime'])
+
+    time_difference = datetime.now() - finished_time
     if time_difference > timedelta(minutes=120):
         return True, f"最近两个小时没有同步完成数据,最近一次完成时间为: {last_finished_item['finishedTime']}"
 
     # 判断持续时间是否超过一个小时
-    elapse = (date_util.str_cover_date(last_finished_item['finishedTime']) -
-              date_util.str_cover_date(last_finished_item['startedTime']))
+    elapse = (finished_time - started_time)
     print(f"表: {table_name}, 最后一次任务持续时间为: {date_util.seconds_convert(elapse.seconds)}")
 
     if elapse > timedelta(minutes=50):
         return True, f"最近一次同步任务持续时间超过50分钟, 持续时间为: {date_util.seconds_convert(elapse.seconds)}"
 
+    # 判断任务的完成时间是否是当前小时
+    finished_hour = finished_time.hour
+
+    print(f"表: {table_name}, 最后一次完成是: {finished_hour} 小时, 当前小时为: {current_hour}")
+    if finished_hour != current_hour:
+        return True, f"当前小时的任务未完成,请关注!!!"
+
     return False, ""
 
 

+ 0 - 74
script/t.py

@@ -1,74 +0,0 @@
-weight_v567 = {
-    "xgbNorScaleType": 1,
-    "xgbNorBias": -1.5147,
-    "xgbNorWeight": 2.277,
-    "xgbNorPowerWeight": 1.2216,
-    "xgbNorPowerExp": 1.32
-}
-weight_v564 = {
-    "fmRovBias": -0.0017,
-    "fmRovWeight": 1.331,
-    "fmRovSquareWeight": -6.4597,
-    "fmRovCubeWeight": 14.393,
-    "xgbNorScaleType": 1,
-    "xgbNorBias": -1.5147,
-    "xgbNorWeight": 2.277,
-    "xgbNorPowerWeight": 1.2216,
-    "xgbNorPowerExp": 1.32
-}
-score_map = {"fmRovOrigin": 0.27195945382118225, "NorXGBScore": 1.5618711709976196, "fmRov": 0.03600984021032825, "RovFMScore": 0.27195945382118225, "hasReturnRovScore": 2.442478, "vor": 8.230978}
-
-
-def rov_calibration(bias: float, weight: float, square_weight: float, cube_weight: float, score: float) -> float:
-    new_score = bias + weight * score
-    if abs(square_weight) > 1E-8:
-        new_score += square_weight * (score ** 2)
-    if abs(cube_weight) > 1E-8:
-        new_score += cube_weight * (score ** 3)
-    if new_score < 1E-8:
-        new_score = score
-    elif new_score > 0.9:
-        new_score = 0.9
-    return new_score
-
-
-def nor_calibration(scale_type: float, poly_bias: float, poly_weight: float,
-                    power_weight: float, power_exp: float, score: float) -> float:
-    if scale_type < 1:
-        return nor_poly_calibration(poly_bias, poly_weight, score)
-    else:
-        return nor_power_calibration(power_weight, power_exp, score)
-
-
-def nor_poly_calibration(bias: float, weight: float, score: float) -> float:
-    new_score = bias + weight * score
-    return max(new_score, 0)
-
-
-def nor_power_calibration(weight: float, exp: float, score: float) -> float:
-    new_score = weight * (score ** exp)
-    return min(new_score, 100)
-
-
-def _main():
-    fmRovBias = float(weight_v564["fmRovBias"])
-    fmRovWeight = float(weight_v564["fmRovWeight"])
-    fmRovSquareWeight = float(weight_v564["fmRovSquareWeight"])
-    fmRovCubeWeight = float(weight_v564["fmRovCubeWeight"])
-    xgbNorScaleType = float(weight_v564["xgbNorScaleType"])
-    xgbNorBias = float(weight_v564["xgbNorBias"])
-    xgbNorWeight = float(weight_v564["xgbNorWeight"])
-    xgbNorPowerWeight = float(weight_v564["xgbNorPowerWeight"])
-    xgbNorPowerExp = float(weight_v564["xgbNorPowerExp"])
-    fm_rov = float(score_map['fmRov'])
-    nor = float(score_map['NorXGBScore'])
-    vor = float(score_map['vor'])
-
-    # new_fm_rov = rov_calibration(fmRovBias, fmRovWeight, fmRovSquareWeight, fmRovCubeWeight, fm_rov)
-    new_fm_rov = fm_rov
-    new_nor = nor_calibration(xgbNorScaleType, xgbNorBias, xgbNorWeight, xgbNorPowerWeight, xgbNorPowerExp, nor)
-    print(new_fm_rov * (0.1 + new_nor) * (0.1 + vor))
-
-
-if __name__ == '__main__':
-    _main()