# feature_spark_monitor.py
  1. import argparse
  2. import configparser
  3. import json
  4. from datetime import datetime, timedelta
  5. from typing import Dict, List
  6. from client import YarnClient, ApolloClient
  7. from util import date_util, json_util, feishu_inform_util
# Module-level YARN client, constructed once at import time with a hard-coded
# address. _main() uses it to list recently finished applications.
# NOTE(review): the argument is presumably the ResourceManager host — confirm
# against YarnClient.
yarn_client = YarnClient.YarnClient("192.168.203.16")
# Static list of feature table names.
# NOTE(review): nothing in the visible part of this file reads `table_list`;
# _main() takes the tables to check from the Apollo `dts.config` entry instead.
# Confirm whether other modules import it before removing.
table_list = [
    "alg_mid_feature_sharecf",
    "alg_vid_feature_all_share",
    "alg_vid_feature_all_return",
    "alg_vid_feature_share2return",
    "alg_vid_feature_basic_info",
    "alg_recsys_feature_cf_i2i_new",
    "alg_vid_feature_all_exp_v2",
    "alg_vid_feature_exp2share_v2",
    "alg_vid_feature_feed_noflow_exp_v2",
    "alg_vid_feature_feed_noflow_root_share_v2",
    "alg_vid_feature_feed_noflow_root_return_v2",
    "alg_vid_feature_feed_flow_exp_v2",
    "alg_vid_feature_feed_flow_root_share_v2",
    "alg_vid_feature_feed_flow_root_return_v2",
    "alg_vid_feature_feed_province_exp_v2",
    "alg_vid_feature_feed_province_root_share_v2",
    "alg_vid_feature_feed_province_root_return_v2",
    "alg_cid_feature_basic_info",
    "alg_cid_feature_adver_action",
    "alg_cid_feature_cid_action",
    "alg_cid_feature_region_action",
    "alg_cid_feature_app_action",
    "alg_cid_feature_week_action",
    "alg_cid_feature_hour_action",
    "alg_cid_feature_brand_action",
    "alg_cid_feature_weChatVersion_action",
    "alg_cid_feature_vid_cf",
    "alg_cid_feature_vid_cf_rank",
    "alg_mid_feature_ad_action",
    "alg_mid_feature_play",
    "alg_mid_feature_share_and_return",
    "alg_mid_feature_play_tags",
    "alg_mid_feature_return_tags",
    "alg_mid_feature_share_tags",
    "alg_mid_feature_returncf",
    "alg_mid_feature_feed_exp_return_tags_v2",
    "alg_mid_feature_feed_exp_share_tags_v2",
    "alg_vid_global_feature_20250212",
    "alg_vid_recommend_exp_feature_20250212",
    "alg_vid_recommend_flowpool_exp_feature_20250212",
    "alg_vid_apptype_recommend_exp_feature_20250212",
    "alg_vid_province_recommend_exp_feature_20250212",
    "alg_vid_brand_recommend_exp_feature_20250212",
    "alg_vid_hotsencetype_recommend_exp_feature_20250212",
    "scene_type_vid_cf_feature_20250212",
    "vid_click_cf_feature_20250212",
    "alg_recsys_feature_cf_i2i_v2",
    "alg_channel_recommend_exp_feature_20250212",
    "alg_merge_cate1_recommend_exp_feature_20250212",
    "alg_merge_cate2_recommend_exp_feature_20250212",
    "alg_video_unionid_recommend_exp_feature_20250212",
    "mid_merge_cate1_feature_20250212",
    "mid_merge_cate2_feature_20250212",
    "mid_global_feature_20250212",
]
# NOTE(review): `filter_date` is not referenced in the visible part of this file.
filter_date = datetime(2024, 1, 1)
# Hour of day captured once at import time; handle_table() compares it with the
# hour in which the most recent task finished.
current_hour = datetime.now().hour
  67. def handle_table(table_name: str, spark_task_list: List[Dict]) -> (bool, str, str):
  68. filtered_data = [
  69. item for item in spark_task_list
  70. if table_name == item['name']
  71. ]
  72. if not filtered_data:
  73. # 如果没有找到,表示近七个小时都没有同步过
  74. return True, "最近没有四小时同步过数据", "error"
  75. # 判断最近一次完成时间是否大于两个小时
  76. filtered_data.sort(key=lambda item: date_util.str_cover_date(item['finishedTime']), reverse=True)
  77. last_finished_item = filtered_data[0]
  78. print(f"表: {table_name}, 最后一次完成时间为: {last_finished_item['finishedTime']}")
  79. finished_time = date_util.str_cover_date(last_finished_item['finishedTime'])
  80. started_time = date_util.str_cover_date(last_finished_item['startedTime'])
  81. time_difference = datetime.now() - finished_time
  82. if time_difference > timedelta(minutes=120):
  83. return True, f"最近两个小时没有同步完成数据,最近一次完成时间为: {last_finished_item['finishedTime']}", "error"
  84. # 判断持续时间是否超过一个小时
  85. elapse = (finished_time - started_time)
  86. print(f"表: {table_name}, 最后一次任务持续时间为: {date_util.seconds_convert(elapse.seconds)}")
  87. if elapse > timedelta(minutes=50):
  88. return True, f"最近一次同步任务持续时间超过50分钟, 持续时间为: {date_util.seconds_convert(elapse.seconds)}", "warn"
  89. # 判断任务的完成时间是否是当前小时
  90. finished_hour = finished_time.hour
  91. print(f"表: {table_name}, 最后一次完成是: {finished_hour} 小时, 当前小时为: {current_hour}")
  92. if finished_hour != current_hour:
  93. return True, f"当前小时的任务未完成", "warn"
  94. return False, ""
  95. def invoke_feishu_card_mgs(webhook: str, content: str, alarm_level="error"):
  96. card_json = {
  97. "config": {},
  98. "i18n_elements": {
  99. "zh_cn": [
  100. {
  101. "tag": "markdown",
  102. "content": "",
  103. "text_align": "left",
  104. "text_size": "normal"
  105. },
  106. {
  107. "tag": "markdown",
  108. "content": content,
  109. "text_align": "left",
  110. "text_size": "normal"
  111. }
  112. ]
  113. },
  114. "i18n_header": {
  115. "zh_cn": {
  116. "title": {
  117. "tag": "plain_text",
  118. "content": "特征同步延迟告警"
  119. },
  120. "subtitle": {
  121. "tag": "plain_text",
  122. "content": ""
  123. },
  124. "template": "red" if alarm_level == "error" else "yellow"
  125. }
  126. }
  127. }
  128. feishu_inform_util.send_card_msg_to_feishu(webhook, card_json)
  129. def send_error_info(table_name: str, warn_reason: str, alarm_level: str, webhook: str):
  130. mgs_text = f"\n- 大数据表名: {table_name}" \
  131. f"\n- 告警原因: {warn_reason}" \
  132. f"\n- 请关注"
  133. invoke_feishu_card_mgs(webhook, mgs_text, alarm_level)
  134. def print_config(config_path):
  135. print(f"配置文件路径: {config_path}")
  136. config = configparser.ConfigParser()
  137. config.read(config_path)
  138. for section in config.sections():
  139. print(f"[{section}]")
  140. for key, value in config.items(section):
  141. print(f"{key} = {value}")
  142. def _main():
  143. parser = argparse.ArgumentParser(description="feature_spark_task_monitor")
  144. parser.add_argument("-c", "--config", required=False, help="config file path",
  145. default="/home/monitor/model_script/config/config.ini")
  146. args = parser.parse_args()
  147. print_config(args.config)
  148. # 读取配置文件
  149. config = configparser.ConfigParser()
  150. config.read(args.config)
  151. webhook_url = config.get("feishu", "model.webhook")
  152. # webhook_url = 'https://open.feishu.cn/open-apis/bot/v2/hook/540d4098-367a-4068-9a44-b8109652f07c'
  153. apollo_meta_url = config.get("apollo", "meta")
  154. apollo = ApolloClient.ApolloClient(apollo_meta_url)
  155. # 获取最近4小时的Spark任务
  156. hours_7_early = int((datetime.now() - timedelta(hours=4)).timestamp()) * 1000
  157. result = yarn_client.get_apps(finished_time_begin=hours_7_early)
  158. result = [
  159. {**item, 'name': item['name'].split(":")[1].strip()}
  160. for item in result
  161. if item['finalStatus'] == "SUCCEEDED"
  162. ]
  163. if len(result) == 0:
  164. print("未获取到已完成的任务,跳过")
  165. return
  166. j = apollo.get_config("recommend-feature", "default", "application")
  167. dts_config = json_util.remove_comments(j['configurations']['dts.config'])
  168. table_config_list = json.loads(dts_config)
  169. for table_config in table_config_list:
  170. print("\n\n\n")
  171. table_name = table_config['odps']['table']
  172. print(f"当前处理的表为: {table_name}")
  173. partitions = table_config['odps']['partition']
  174. if len(partitions) != 2:
  175. # 只处理有两个分区键的小时级表
  176. print(f"表: {table_name} 的分区键只有一个,暂不处理")
  177. continue
  178. b, warn_reason, alarm_level = handle_table(table_name, result)
  179. if b:
  180. print(f"表: {table_name}, 触发告警; 告警原因: {warn_reason}")
  181. send_error_info(table_name, warn_reason, alarm_level, webhook_url)
  182. if __name__ == '__main__':
  183. try:
  184. _main()
  185. except Exception as e:
  186. print(f"监控告警发生异常: {e}")
  187. txt = f"\n- 特征同步异常告警" \
  188. f"\n- 告警原因: 监控脚本执行异常" \
  189. f"\n- 请关注"
  190. url = "https://open.feishu.cn/open-apis/bot/v2/hook/540d4098-367a-4068-9a44-b8109652f07c"
  191. invoke_feishu_card_mgs(url, txt, "error")