feature_spark_monitor.py

import configparser
from datetime import datetime, timedelta
from typing import Dict, List, Tuple

import pandas as pd

from client import YarnClient
from util import dateutil, feishu_inform_util

# Client for the YARN ResourceManager that runs the feature sync jobs
yarn_client = YarnClient.YarnClient("192.168.203.16")
table_list = [
    "alg_cid_feature_basic_info",
    "alg_cid_feature_adver_action",
    "alg_cid_feature_cid_action",
    "alg_cid_feature_region_action",
    "alg_cid_feature_app_action",
    "alg_cid_feature_week_action",
    "alg_cid_feature_hour_action",
    "alg_cid_feature_brand_action",
    "alg_cid_feature_weChatVersion_action",
    "alg_cid_feature_vid_cf",
    "alg_cid_feature_vid_cf_rank",
    "alg_mid_feature_ad_action",
    "alg_vid_feature_all_exp",
    "alg_vid_feature_all_share",
    "alg_vid_feature_all_return",
    "alg_vid_feature_exp2share",
    "alg_vid_feature_share2return",
    "alg_vid_feature_feed_noflow_exp",
    "alg_vid_feature_feed_noflow_root_share",
    "alg_vid_feature_feed_noflow_root_return",
    "alg_vid_feature_feed_flow_exp",
    "alg_vid_feature_feed_flow_root_share",
    "alg_vid_feature_feed_flow_root_return",
    "alg_vid_feature_feed_province_exp",
    "alg_vid_feature_feed_province_root_share",
    "alg_vid_feature_feed_province_root_return",
    "alg_vid_feature_basic_info",
    "alg_recsys_feature_cf_i2i_new",
    "alg_vid_feature_all_exp_v2",
    "alg_vid_feature_exp2share_v2",
    "alg_vid_feature_feed_noflow_exp_v2",
    "alg_vid_feature_feed_noflow_root_share_v2",
    "alg_vid_feature_feed_noflow_root_return_v2",
    "alg_vid_feature_feed_flow_exp_v2",
    "alg_vid_feature_feed_flow_root_share_v2",
    "alg_vid_feature_feed_flow_root_return_v2",
    "alg_vid_feature_feed_province_exp_v2",
    "alg_vid_feature_feed_province_root_share_v2",
    "alg_vid_feature_feed_province_root_return_v2",
    "alg_recsys_feature_cf_i2i_new_v2",
    "alg_mid_feature_play",
    "alg_mid_feature_share_and_return",
    "alg_mid_feature_play_tags",
    "alg_mid_feature_return_tags",
    "alg_mid_feature_share_tags",
    "alg_mid_feature_feed_exp_share_tags",
    "alg_mid_feature_feed_exp_return_tags",
    "alg_mid_feature_sharecf",
    "alg_mid_feature_returncf",
    "alg_mid_feature_feed_exp_return_tags_v2",
    "alg_mid_feature_feed_exp_share_tags_v2"
]
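
# Each entry above is matched as a substring of the YARN/Spark application name; the sync
# jobs are assumed (from the prefix stripped in df_print below) to be named
# "odps sync to redis : <table_name>".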
filter_date = datetime(2024, 1, 1)


def df_print(result):
    df = pd.DataFrame(result)
    # Keep only the rows whose name contains 'cid'
    filtered_df = df[df['name'].str.contains('cid')].copy()  # .copy() avoids SettingWithCopyWarning
    filtered_df.loc[:, 'name'] = filtered_df['name'].str.replace('odps sync to redis : ', '', regex=False)
    sorted_df = filtered_df.sort_values(by="startedTime")
    # Build the header row
    header = ' | '.join(sorted_df.columns)

    def format_row(row):
        return ' | '.join([str(row[col]) for col in sorted_df.columns])

    # Build the data rows from the sorted frame so the output follows startedTime order
    rows = sorted_df.apply(format_row, axis=1).tolist()
    # Print the table
    print(header)
    print('-' * len(header))
    print('\n'.join(rows))
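
# Usage sketch (an assumption, not part of the original script): `result` is the
# application list returned by yarn_client.get_apps(), i.e. dicts carrying at least a
# "name" such as "odps sync to redis : alg_cid_feature_basic_info" and a "startedTime"
# field, so df_print(result) prints a pipe-separated table of the cid feature sync tasks.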


def handle_table(table_name: str, spark_task_list: List[Dict]) -> Tuple[bool, str]:
    filtered_data = [
        item for item in spark_task_list
        if table_name in item['name'] and dateutil.str_cover_date(item['startedTime']) > filter_date
    ]
    if filtered_data:
        latest_started_time = max(
            [dateutil.str_cover_date(item['startedTime']) for item in filtered_data])
        print(f"表: {table_name}, 最后一次同步时间为: {latest_started_time}")
        now = datetime.now()
        time_difference = now - latest_started_time
        if time_difference > timedelta(minutes=140):
            return True, latest_started_time
        return False, latest_started_time
    # Nothing matched, i.e. the table has not synced at all within the last seven hours
    return True, "七小时之前"
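
# Assumption: dateutil.str_cover_date (from the project's util package) parses the
# "startedTime" string of a YARN application into a datetime, roughly along the lines of
#     def str_cover_date(s: str) -> datetime:
#         return datetime.strptime(s, "%Y-%m-%d %H:%M:%S")
# The actual time format is defined in util/dateutil, not in this file.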


def send_error_info(table_name: str, latest_started_time: str, webhook: str):
    msg_text = f"\n- 大数据表名: {table_name}" \
               f"\n- 最后一次同步时间: {latest_started_time}" \
               f"\n- 超过两个小时没有同步,请关注"
    card_json = {
        "config": {},
        "i18n_elements": {
            "zh_cn": [
                {
                    "tag": "markdown",
                    "content": "",
                    "text_align": "left",
                    "text_size": "normal"
                },
                {
                    "tag": "markdown",
                    "content": msg_text,
                    "text_align": "left",
                    "text_size": "normal"
                }
            ]
        },
        "i18n_header": {
            "zh_cn": {
                "title": {
                    "tag": "plain_text",
                    "content": "【测试】特征同步延迟告警"
                },
                "subtitle": {
                    "tag": "plain_text",
                    "content": ""
                },
                "template": "red"
            }
        }
    }
    feishu_inform_util.send_card_msg_to_feishu(webhook, card_json)
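
# Illustrative call with a placeholder Feishu bot webhook (format only; the real URL
# comes from config.ini, and the token below is hypothetical):
#     send_error_info("alg_cid_feature_basic_info", "2024-01-01 00:00:00",
#                     "https://open.feishu.cn/open-apis/bot/v2/hook/<token>")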


def _main():
    # Read the config file
    config = configparser.ConfigParser()
    config.read("/home/monitor/model_monitor/config/config.ini")
    webhook_url = config.get("feishu", "model.webhook")

    # Fetch the Spark applications started within the last seven hours (epoch milliseconds)
    hours_7_early = int((datetime.now() - timedelta(hours=7)).timestamp()) * 1000
    result = yarn_client.get_apps(started_time_begin=hours_7_early)

    for table_name in table_list:
        # Alert when the latest sync is older than the 140-minute threshold
        b, latest_started_time = handle_table(table_name, result)
        if b:
            print(f"表: {table_name}, 最后一次同步时间距当前时间超过140分钟")
            send_error_info(table_name, latest_started_time, webhook_url)


if __name__ == '__main__':
    _main()
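
# Expected config layout, inferred from the config.get("feishu", "model.webhook") call
# above (the path and key names are taken from this file; the value is a placeholder):
#     /home/monitor/model_monitor/config/config.ini
#         [feishu]
#         model.webhook = https://open.feishu.cn/open-apis/bot/v2/hook/<token>
# The script performs a single pass over table_list; running it periodically (e.g. from
# cron) is a deployment detail assumed here, not specified in this file.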