feature_spark_monitor.py

import configparser
from datetime import datetime, timedelta

import pandas as pd

from client import YarnClient
from util import dateutil, feishu_inform_util

yarn_client = YarnClient.YarnClient("121.40.173.140")
# Feature tables whose sync Spark jobs are monitored
table_list = [
    "alg_cid_feature_basic_info",
    "alg_cid_feature_adver_action",
    "alg_cid_feature_cid_action",
    "alg_cid_feature_region_action",
    "alg_cid_feature_app_action",
    "alg_cid_feature_week_action",
    "alg_cid_feature_hour_action",
    "alg_cid_feature_brand_action",
    "alg_cid_feature_weChatVersion_action",
    "alg_cid_feature_vid_cf",
    "alg_cid_feature_vid_cf_rank",
    "alg_mid_feature_ad_action",
    "alg_vid_feature_all_exp",
    "alg_vid_feature_all_share",
    "alg_vid_feature_all_return",
    "alg_vid_feature_exp2share",
    "alg_vid_feature_share2return",
    "alg_vid_feature_feed_noflow_exp",
    "alg_vid_feature_feed_noflow_root_share",
    "alg_vid_feature_feed_noflow_root_return",
    "alg_vid_feature_feed_flow_exp",
    "alg_vid_feature_feed_flow_root_share",
    "alg_vid_feature_feed_flow_root_return",
    "alg_vid_feature_feed_province_exp",
    "alg_vid_feature_feed_province_root_share",
    "alg_vid_feature_feed_province_root_return",
    "alg_vid_feature_basic_info",
    "alg_recsys_feature_cf_i2i_new",
    "alg_vid_feature_all_exp_v2",
    "alg_vid_feature_exp2share_v2",
    "alg_vid_feature_feed_noflow_exp_v2",
    "alg_vid_feature_feed_noflow_root_share_v2",
    "alg_vid_feature_feed_noflow_root_return_v2",
    "alg_vid_feature_feed_flow_exp_v2",
    "alg_vid_feature_feed_flow_root_share_v2",
    "alg_vid_feature_feed_flow_root_return_v2",
    "alg_vid_feature_feed_province_exp_v2",
    "alg_vid_feature_feed_province_root_share_v2",
    "alg_vid_feature_feed_province_root_return_v2",
    "alg_recsys_feature_cf_i2i_new_v2",
    "alg_mid_feature_play",
    "alg_mid_feature_share_and_return",
    "alg_mid_feature_play_tags",
    "alg_mid_feature_return_tags",
    "alg_mid_feature_share_tags",
    "alg_mid_feature_feed_exp_share_tags",
    "alg_mid_feature_feed_exp_return_tags",
    "alg_mid_feature_sharecf",
    "alg_mid_feature_returncf",
    "alg_mid_feature_feed_exp_return_tags_v2",
    "alg_mid_feature_feed_exp_share_tags_v2"
]

filter_date = datetime(2024, 1, 1)

def df_print(result):
    # Debug helper: print the 'cid' sync tasks as a simple text table (not called from _main()).
    df = pd.DataFrame(result)
    # Keep only rows whose name contains 'cid'
    filtered_df = df[df['name'].str.contains('cid')].copy()  # .copy() avoids SettingWithCopyWarning
    filtered_df.loc[:, 'name'] = filtered_df['name'].str.replace('odps sync to redis : ', '', regex=False)
    sorted_df = filtered_df.sort_values(by="startedTime")
    # Build the header row
    header = ' | '.join(sorted_df.columns)

    def format_row(row):
        return ' | '.join([str(row[col]) for col in sorted_df.columns])

    # Build the data rows from the sorted frame so the output follows startedTime order
    rows = sorted_df.apply(format_row, axis=1).tolist()
    # Print the output
    print(header)
    print('-' * len(header))
    print('\n'.join(rows))

def handle_table(table_name: str, spark_task_list: list[dict]) -> tuple[bool, str]:
    # Return (True, latest_started_time) when the table's most recent sync job
    # started more than 140 minutes ago; otherwise (False, "").
    filtered_data = [
        item for item in spark_task_list
        if table_name in item['name'] and dateutil.str_cover_date(item['startedTime']) > filter_date
    ]
    if filtered_data:
        latest_started_time = max(
            [dateutil.str_cover_date(item['startedTime']) for item in filtered_data])
        print(f"表: {table_name}, 最后一次同步时间为: {latest_started_time}")
        now = datetime.now()
        time_difference = now - latest_started_time
        if time_difference > timedelta(minutes=140):
            return True, latest_started_time
        return False, ""
    return False, ""

def send_error_info(table_name: str, latest_started_time: str):
    # Build a Feishu alert card for a table whose sync is overdue.
    msg_text = f"\n- 大数据表名: {table_name}" \
               f"\n- 最后一次同步时间: {latest_started_time}" \
               f"\n- 超过两个小时没有同步,请关注"
    card_json = {
        "config": {},
        "i18n_elements": {
            "zh_cn": [
                {
                    "tag": "markdown",
                    "content": "",
                    "text_align": "left",
                    "text_size": "normal"
                },
                {
                    "tag": "markdown",
                    "content": msg_text,
                    "text_align": "left",
                    "text_size": "normal"
                }
            ]
        },
        "i18n_header": {
            "zh_cn": {
                "title": {
                    "tag": "plain_text",
                    "content": "【测试】大数据表同步延迟告警"
                },
                "subtitle": {
                    "tag": "plain_text",
                    "content": ""
                },
                "template": "red"
            }
        }
    }
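    # The original function stops after building card_json and never dispatches it;
    # feishu_inform_util is imported above, but its API is not shown in this file.
    # A minimal sketch, assuming the "feishu" / "model.webhook" value read in _main()
    # is a Feishu custom-bot webhook URL (both that assumption and the direct
    # requests call below are illustrative, not the project's confirmed method):
    import requests  # local import keeps the sketch self-contained

    config = configparser.ConfigParser()
    config.read("config/config.ini")
    webhook_url = config.get("feishu", "model.webhook")
    # Feishu custom bots accept interactive cards posted as {"msg_type": "interactive", "card": ...}
    requests.post(webhook_url, json={"msg_type": "interactive", "card": card_json}, timeout=10)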

def _main():
    # Read the config file
    config = configparser.ConfigParser()
    config.read("config/config.ini")
    print(config.get("feishu", 'model.webhook'))

    # Fetch Spark applications started within the last seven hours (epoch milliseconds)
    hours_7_early = int((datetime.now() - timedelta(hours=7)).timestamp()) * 1000
    result = yarn_client.get_apps(started_time_begin=hours_7_early)

    for table_name in table_list:
        # Check whether the table's last sync happened within the last two hours (140-minute threshold)
        b, latest_started_time = handle_table(table_name, result)
        if b:
            print(f"表: {table_name}, 最后一次同步时间距当前时间超过140分钟")
            send_error_info(table_name, latest_started_time)


if __name__ == '__main__':
    _main()