feature_spark_monitor.py

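# Monitors Spark feature-sync jobs on YARN: for each monitored feature table it checks
# how recently a sync task started and pushes a Feishu alert card when the table falls
# behind. A separate analysis mode joins currently running Spark tasks with their ODPS
# partition info and prints the result as a table.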
import configparser
from datetime import datetime, timedelta
from typing import Dict, List, Tuple

import pandas as pd

from client import YarnClient, ODPSClient
from util import date_util, feishu_inform_util

yarn_client = YarnClient.YarnClient("121.40.173.140")
odps_client = ODPSClient.ODPSClient()
table_list = [
    "alg_mid_feature_sharecf",
    "alg_vid_feature_all_share",
    "alg_vid_feature_all_return",
    "alg_vid_feature_share2return",
    "alg_vid_feature_basic_info",
    "alg_recsys_feature_cf_i2i_new",
    "alg_vid_feature_all_exp_v2",
    "alg_vid_feature_exp2share_v2",
    "alg_vid_feature_feed_noflow_exp_v2",
    "alg_vid_feature_feed_noflow_root_share_v2",
    "alg_vid_feature_feed_noflow_root_return_v2",
    "alg_vid_feature_feed_flow_exp_v2",
    "alg_vid_feature_feed_flow_root_share_v2",
    "alg_vid_feature_feed_flow_root_return_v2",
    "alg_vid_feature_feed_province_exp_v2",
    "alg_vid_feature_feed_province_root_share_v2",
    "alg_vid_feature_feed_province_root_return_v2",
    "alg_recsys_feature_cf_i2i_new_v2",
    "alg_cid_feature_basic_info",
    "alg_cid_feature_adver_action",
    "alg_cid_feature_cid_action",
    "alg_cid_feature_region_action",
    "alg_cid_feature_app_action",
    "alg_cid_feature_week_action",
    "alg_cid_feature_hour_action",
    "alg_cid_feature_brand_action",
    "alg_cid_feature_weChatVersion_action",
    "alg_cid_feature_vid_cf",
    "alg_cid_feature_vid_cf_rank",
    "alg_mid_feature_ad_action",
    "alg_mid_feature_play",
    "alg_mid_feature_share_and_return",
    "alg_mid_feature_play_tags",
    "alg_mid_feature_return_tags",
    "alg_mid_feature_share_tags",
    "alg_mid_feature_returncf",
    "alg_mid_feature_feed_exp_return_tags_v2",
    "alg_mid_feature_feed_exp_share_tags_v2"
]
filter_date = datetime(2024, 1, 1)

columns = ["id", "表名", "startedTime", "launchTime", "finishedTime", "state", "elapsedTime", "分区", "数据量",
           "数据大小", "创建时间", "更新时间"]
def df_print(result):
    df = pd.DataFrame(result)
    sorted_df = df.sort_values(by="startedTime")
    sorted_df = sorted_df[columns]
    # Build the header row
    header = ' | '.join(sorted_df.columns)

    # Build the data rows
    def format_row(row):
        return ' | '.join([str(row[col]) for col in sorted_df.columns])

    rows = sorted_df.apply(format_row, axis=1).tolist()
    # Print header and rows
    print(header)
    print('\n'.join(rows))
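# Decide whether a table is overdue: returns (True, latest_sync_time) when the most recent
# matching task started more than 140 minutes ago, (True, "七小时之前") when no matching task
# was found in the given list, and (False, matching_tasks) otherwise.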
def handle_table(table_name: str, spark_task_list: List[Dict]) -> Tuple[bool, str]:
    filtered_data = [
        item for item in spark_task_list
        if table_name in item['name'] and date_util.str_cover_date(item['startedTime']) > filter_date
    ]
    if filtered_data:
        latest_started_time = max(
            [date_util.str_cover_date(item['startedTime']) for item in filtered_data])
        print(f"表: {table_name}, 最后一次同步时间为: {latest_started_time}")
        now = datetime.now()
        time_difference = now - latest_started_time
        if time_difference > timedelta(minutes=140):
            return True, latest_started_time
        return False, filtered_data
    # Nothing matched, so the table has not been synced in the last seven hours
    return True, "七小时之前"
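# Build a Feishu interactive card describing the delayed table and send it to the given
# webhook via feishu_inform_util.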
def send_error_info(table_name: str, latest_started_time: str, webhook: str):
    msg_text = f"\n- 大数据表名: {table_name}" \
               f"\n- 最后一次同步时间: {latest_started_time}" \
               f"\n- 超过两个小时没有同步,请关注"
    card_json = {
        "config": {},
        "i18n_elements": {
            "zh_cn": [
                {
                    "tag": "markdown",
                    "content": "",
                    "text_align": "left",
                    "text_size": "normal"
                },
                {
                    "tag": "markdown",
                    "content": msg_text,
                    "text_align": "left",
                    "text_size": "normal"
                }
            ]
        },
        "i18n_header": {
            "zh_cn": {
                "title": {
                    "tag": "plain_text",
                    "content": "【测试】特征同步延迟告警"
                },
                "subtitle": {
                    "tag": "plain_text",
                    "content": ""
                },
                "template": "red"
            }
        }
    }
    feishu_inform_util.send_card_msg_to_feishu(webhook, card_json)
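# Alert mode: pull Spark applications started within the last seven hours from YARN and
# alert on every monitored table whose latest sync exceeds the threshold.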
def _main():
    # Read the configuration file
    config = configparser.ConfigParser()
    config.read("/home/monitor/model_monitor/config/config.ini")
    webhook_url = config.get("feishu", "model.webhook")

    # Fetch Spark applications started within the last seven hours
    hours_7_early = int((datetime.now() - timedelta(hours=7)).timestamp()) * 1000
    result = yarn_client.get_apps(started_time_begin=hours_7_early)

    for table_name in table_list:
        # Check whether the latest sync falls within the alert window (140 minutes)
        b, latest_started_time = handle_table(table_name, result)
        if b:
            print(f"表: {table_name}, 最后一次同步时间距当前时间超过140分钟")
            send_error_info(table_name, latest_started_time, webhook_url)
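# Analysis mode: take RUNNING "alg" tasks from the last 14 hours, enrich each one with the
# ODPS partition whose name matches the task's start hour (dt/hh), and print the result.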
def _analyse():
    hours_14_early = int((datetime.now() - timedelta(hours=14)).timestamp()) * 1000
    result = yarn_client.get_apps(started_time_begin=hours_14_early)
    result = [
        {
            **{k: v for k, v in item.items() if k != 'name'},
            'table_name': item['name'].split(":")[1].strip()
        }
        for item in result
        if "alg" in item['name'] and item['state'] == 'RUNNING'
    ]
    partition_info = {}
    for table_name in list({item['table_name'] for item in result}):
        resp = odps_client.get_all_partition_info(table_name=table_name)
        partition_info[table_name] = {item['分区']: item for item in resp}
    spark_task_list = []
    for item in result:
        dt_hh = date_util.date_convert_dt_hh(item['startedTime'])
        if item['table_name'] in partition_info and dt_hh in partition_info[item['table_name']]:
            item = {
                **item,
                **partition_info[item['table_name']][dt_hh]
            }
            spark_task_list.append(item)
    df_print(spark_task_list)
if __name__ == '__main__':
    # _main()
    _analyse()