feature_spark_monitor.py 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206
import argparse
import configparser
from datetime import datetime, timedelta
from typing import Dict, List, Tuple

from client import YarnClient
from util import date_util, feishu_inform_util
# Shared YARN client used to query recently finished Spark applications.
yarn_client = YarnClient.YarnClient("192.168.203.16")

# Feature tables whose hourly Spark sync jobs are monitored for delay.
table_list = [
    "alg_mid_feature_sharecf",
    "alg_vid_feature_all_share",
    "alg_vid_feature_all_return",
    "alg_vid_feature_share2return",
    "alg_vid_feature_basic_info",
    "alg_recsys_feature_cf_i2i_new",
    "alg_vid_feature_all_exp_v2",
    "alg_vid_feature_exp2share_v2",
    "alg_vid_feature_feed_noflow_exp_v2",
    "alg_vid_feature_feed_noflow_root_share_v2",
    "alg_vid_feature_feed_noflow_root_return_v2",
    "alg_vid_feature_feed_flow_exp_v2",
    "alg_vid_feature_feed_flow_root_share_v2",
    "alg_vid_feature_feed_flow_root_return_v2",
    "alg_vid_feature_feed_province_exp_v2",
    "alg_vid_feature_feed_province_root_share_v2",
    "alg_vid_feature_feed_province_root_return_v2",
    "alg_cid_feature_basic_info",
    "alg_cid_feature_adver_action",
    "alg_cid_feature_cid_action",
    "alg_cid_feature_region_action",
    "alg_cid_feature_app_action",
    "alg_cid_feature_week_action",
    "alg_cid_feature_hour_action",
    "alg_cid_feature_brand_action",
    "alg_cid_feature_weChatVersion_action",
    "alg_cid_feature_vid_cf",
    "alg_cid_feature_vid_cf_rank",
    "alg_mid_feature_ad_action",
    "alg_mid_feature_play",
    "alg_mid_feature_share_and_return",
    "alg_mid_feature_play_tags",
    "alg_mid_feature_return_tags",
    "alg_mid_feature_share_tags",
    "alg_mid_feature_returncf",
    "alg_mid_feature_feed_exp_return_tags_v2",
    "alg_mid_feature_feed_exp_share_tags_v2",
    "alg_vid_global_feature_20250212",
    "alg_vid_recommend_exp_feature_20250212",
    "alg_vid_recommend_flowpool_exp_feature_20250212",
    "alg_vid_apptype_recommend_exp_feature_20250212",
    "alg_vid_province_recommend_exp_feature_20250212",
    "alg_vid_brand_recommend_exp_feature_20250212",
    "alg_vid_hotsencetype_recommend_exp_feature_20250212",
    "scene_type_vid_cf_feature_20250212",
    "vid_click_cf_feature_20250212",
    "alg_recsys_feature_cf_i2i_v2",
    "alg_channel_recommend_exp_feature_20250212",
    "alg_merge_cate1_recommend_exp_feature_20250212",
    "alg_merge_cate2_recommend_exp_feature_20250212",
    "alg_video_unionid_recommend_exp_feature_20250212",
    "mid_merge_cate1_feature_20250212",
    "mid_merge_cate2_feature_20250212",
    "mid_global_feature_20250212",
]

# NOTE(review): not referenced anywhere in this file — possibly dead, or
# used by code outside this view; confirm before removing.
filter_date = datetime(2024, 1, 1)

# Hour of day captured once at import time; handle_table alerts on any
# table whose last run finished in a different hour than this.
current_hour = datetime.now().hour
  66. def handle_table(table_name: str, spark_task_list: List[Dict]) -> (bool, str):
  67. filtered_data = [
  68. item for item in spark_task_list
  69. if table_name == item['name']
  70. ]
  71. if not filtered_data:
  72. # 如果没有找到,表示近七个小时都没有同步过
  73. return True, "最近没有四小时同步过数据"
  74. # 判断最近一次完成时间是否大于两个小时
  75. filtered_data.sort(key=lambda item: date_util.str_cover_date(item['finishedTime']), reverse=True)
  76. last_finished_item = filtered_data[0]
  77. print(f"表: {table_name}, 最后一次完成时间为: {last_finished_item['finishedTime']}")
  78. finished_time = date_util.str_cover_date(last_finished_item['finishedTime'])
  79. started_time = date_util.str_cover_date(last_finished_item['startedTime'])
  80. time_difference = datetime.now() - finished_time
  81. if time_difference > timedelta(minutes=120):
  82. return True, f"最近两个小时没有同步完成数据,最近一次完成时间为: {last_finished_item['finishedTime']}"
  83. # 判断持续时间是否超过一个小时
  84. elapse = (finished_time - started_time)
  85. print(f"表: {table_name}, 最后一次任务持续时间为: {date_util.seconds_convert(elapse.seconds)}")
  86. if elapse > timedelta(minutes=50):
  87. return True, f"最近一次同步任务持续时间超过50分钟, 持续时间为: {date_util.seconds_convert(elapse.seconds)}"
  88. # 判断任务的完成时间是否是当前小时
  89. finished_hour = finished_time.hour
  90. print(f"表: {table_name}, 最后一次完成是: {finished_hour} 小时, 当前小时为: {current_hour}")
  91. if finished_hour != current_hour:
  92. return True, f"当前小时的任务未完成,请关注!!!"
  93. return False, ""
  94. def invoke_feishu_card_mgs(webhook: str, content: str):
  95. card_json = {
  96. "config": {},
  97. "i18n_elements": {
  98. "zh_cn": [
  99. {
  100. "tag": "markdown",
  101. "content": "",
  102. "text_align": "left",
  103. "text_size": "normal"
  104. },
  105. {
  106. "tag": "markdown",
  107. "content": content,
  108. "text_align": "left",
  109. "text_size": "normal"
  110. }
  111. ]
  112. },
  113. "i18n_header": {
  114. "zh_cn": {
  115. "title": {
  116. "tag": "plain_text",
  117. "content": "特征同步延迟告警"
  118. },
  119. "subtitle": {
  120. "tag": "plain_text",
  121. "content": ""
  122. },
  123. "template": "red"
  124. }
  125. }
  126. }
  127. feishu_inform_util.send_card_msg_to_feishu(webhook, card_json)
  128. def send_error_info(table_name: str, warn_reason: str, webhook: str):
  129. mgs_text = f"\n- 大数据表名: {table_name}" \
  130. f"\n- 告警原因: {warn_reason}" \
  131. f"\n- 请关注"
  132. invoke_feishu_card_mgs(webhook, mgs_text)
  133. def print_config(config_path):
  134. print(f"配置文件路径: {config_path}")
  135. config = configparser.ConfigParser()
  136. config.read(config_path)
  137. for section in config.sections():
  138. print(f"[{section}]")
  139. for key, value in config.items(section):
  140. print(f"{key} = {value}")
  141. def _main():
  142. parser = argparse.ArgumentParser(description="feature_spark_task_monitor")
  143. parser.add_argument("-c", "--config", required=False, help="config file path",
  144. default="/home/monitor/model_script/config/config.ini")
  145. args = parser.parse_args()
  146. print_config(args.config)
  147. # 读取配置文件
  148. config = configparser.ConfigParser()
  149. config.read(args.config)
  150. webhook_url = config.get("feishu", "model.webhook")
  151. # webhook_url = 'https://open.feishu.cn/open-apis/bot/v2/hook/540d4098-367a-4068-9a44-b8109652f07c'
  152. # 获取最近4小时的Spark任务
  153. hours_7_early = int((datetime.now() - timedelta(hours=4)).timestamp()) * 1000
  154. result = yarn_client.get_apps(finished_time_begin=hours_7_early)
  155. result = [
  156. {**item, 'name': item['name'].split(":")[1].strip()}
  157. for item in result
  158. if item['finalStatus'] == "SUCCEEDED"
  159. ]
  160. if len(result) == 0:
  161. print("未获取已完成的任务,跳过")
  162. return
  163. for table_name in table_list:
  164. b, warn_reason = handle_table(table_name, result)
  165. if b:
  166. print(f"表: {table_name}, 触发告警; 告警原因: {warn_reason}")
  167. send_error_info(table_name, warn_reason, webhook_url)
  168. if __name__ == '__main__':
  169. try:
  170. _main()
  171. except Exception as e:
  172. print(f"监控告警发生异常: {e}")
  173. mgs_text = f"\n- 特征同步异常告警" \
  174. f"\n- 告警原因: 监控脚本执行异常" \
  175. f"\n- 请关注"
  176. url = "https://open.feishu.cn/open-apis/bot/v2/hook/540d4098-367a-4068-9a44-b8109652f07c"
  177. invoke_feishu_card_mgs(url, mgs_text)