feature_spark_monitor.py

import configparser
from datetime import datetime, timedelta
from typing import Dict, List, Tuple

from client import YarnClient
from util import date_util, feishu_inform_util

# YARN ResourceManager host queried for finished Spark applications
yarn_client = YarnClient.YarnClient("192.168.203.16")

# Feature tables whose Spark sync jobs are monitored
table_list = [
    "alg_mid_feature_sharecf",
    "alg_vid_feature_all_share",
    "alg_vid_feature_all_return",
    "alg_vid_feature_share2return",
    "alg_vid_feature_basic_info",
    "alg_recsys_feature_cf_i2i_new",
    "alg_vid_feature_all_exp_v2",
    "alg_vid_feature_exp2share_v2",
    "alg_vid_feature_feed_noflow_exp_v2",
    "alg_vid_feature_feed_noflow_root_share_v2",
    "alg_vid_feature_feed_noflow_root_return_v2",
    "alg_vid_feature_feed_flow_exp_v2",
    "alg_vid_feature_feed_flow_root_share_v2",
    "alg_vid_feature_feed_flow_root_return_v2",
    "alg_vid_feature_feed_province_exp_v2",
    "alg_vid_feature_feed_province_root_share_v2",
    "alg_vid_feature_feed_province_root_return_v2",
    "alg_recsys_feature_cf_i2i_new_v2",
    "alg_cid_feature_basic_info",
    "alg_cid_feature_adver_action",
    "alg_cid_feature_cid_action",
    "alg_cid_feature_region_action",
    "alg_cid_feature_app_action",
    "alg_cid_feature_week_action",
    "alg_cid_feature_hour_action",
    "alg_cid_feature_brand_action",
    "alg_cid_feature_weChatVersion_action",
    "alg_cid_feature_vid_cf",
    "alg_cid_feature_vid_cf_rank",
    "alg_mid_feature_ad_action",
    "alg_mid_feature_play",
    "alg_mid_feature_share_and_return",
    "alg_mid_feature_play_tags",
    "alg_mid_feature_return_tags",
    "alg_mid_feature_share_tags",
    "alg_mid_feature_returncf",
    "alg_mid_feature_feed_exp_return_tags_v2",
    "alg_mid_feature_feed_exp_share_tags_v2"
]

# Ignore applications that finished before this date
filter_date = datetime(2024, 1, 1)


def handle_table(table_name: str, spark_task_list: List[Dict]) -> Tuple[bool, str]:
    # Keep only runs of this table's sync job that finished after filter_date
    filtered_data = [
        item for item in spark_task_list
        if table_name in item['name'] and date_util.str_cover_date(item['finishedTime']) > filter_date
    ]
    print(f"{table_name} ==> {filtered_data}")
    if filtered_data:
        latest_started_time = max(
            [date_util.str_cover_date(item['finishedTime']) for item in filtered_data])
        print(f"Table: {table_name}, last sync finished at: {latest_started_time}")
        now = datetime.now()
        time_difference = now - latest_started_time
        if time_difference > timedelta(minutes=140):
            return True, latest_started_time
        else:
            return False, filtered_data
    else:
        # No matching run found: the table has not been synced in the last seven hours
        return True, "more than seven hours ago"
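
# Example of the YARN application dict that handle_table consumes. This is a
# sketch only: the keys 'name', 'finishedTime' and 'finalStatus' are the ones
# read in this file, while the example values are assumptions, since their
# exact format depends on YarnClient.get_apps and date_util.str_cover_date:
#
#   {
#       "name": "alg_vid_feature_basic_info_sync",
#       "finishedTime": "2024-01-01 12:30:00",
#       "finalStatus": "SUCCEEDED",
#   }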


def send_error_info(table_name: str, latest_started_time: str, webhook: str):
    msg_text = f"\n- Big data table: {table_name}" \
               f"\n- Last sync time: {latest_started_time}" \
               f"\n- No sync for more than two hours, please check"
    # Feishu interactive card: two markdown blocks and a red alert header
    card_json = {
        "config": {},
        "i18n_elements": {
            "zh_cn": [
                {
                    "tag": "markdown",
                    "content": "",
                    "text_align": "left",
                    "text_size": "normal"
                },
                {
                    "tag": "markdown",
                    "content": msg_text,
                    "text_align": "left",
                    "text_size": "normal"
                }
            ]
        },
        "i18n_header": {
            "zh_cn": {
                "title": {
                    "tag": "plain_text",
                    "content": "Feature sync delay alert"
                },
                "subtitle": {
                    "tag": "plain_text",
                    "content": ""
                },
                "template": "red"
            }
        }
    }
    feishu_inform_util.send_card_msg_to_feishu(webhook, card_json)
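
# A minimal sketch of the config file read in _main below. The [feishu]
# section and the model.webhook key are taken from the config.get() call;
# the webhook URL itself is only a placeholder:
#
#   [feishu]
#   model.webhook = https://open.feishu.cn/open-apis/bot/v2/hook/<token>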


def _main():
    # Read the Feishu webhook URL from the config file
    config = configparser.ConfigParser()
    config.read("/home/monitor/model_monitor/config/config.ini")
    webhook_url = config.get("feishu", "model.webhook")

    # Fetch Spark applications that finished within the last seven hours
    # (the timestamp is converted to epoch milliseconds for the YARN API)
    hours_7_early = int((datetime.now() - timedelta(hours=7)).timestamp()) * 1000
    result = yarn_client.get_apps(finished_time_begin=hours_7_early)
    # Materialize the successful apps into a list: a bare filter() iterator
    # would be exhausted after the first table, making every later table
    # appear stale
    result = list(filter(lambda item: item['finalStatus'] == 'SUCCEEDED', result))

    for table_name in table_list:
        # Check whether the most recent sync finished within the last 140 minutes
        delayed, latest_started_time = handle_table(table_name, result)
        if delayed:
            print(f"Table: {table_name}, last sync finished more than 140 minutes ago")
            # send_error_info(table_name, latest_started_time, webhook_url)


if __name__ == '__main__':
    _main()
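
# Scheduling is not defined in this file. Given the seven-hour lookback and the
# 140-minute staleness threshold, a periodic run is assumed; an illustrative
# cron entry (script path based on the config path used above) could look like:
#
#   */30 * * * * python3 /home/monitor/model_monitor/feature_spark_monitor.py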