123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166 |
- import configparser
- from datetime import datetime, timedelta
- from client import YarnClient
- from util import dateutil, feishu_inform_util
- import pandas as pd
# Address handed to the YarnClient constructor.
_YARN_HOST = "121.40.173.140"

# Module-level client, created at import time; _main() queries it via get_apps().
yarn_client = YarnClient.YarnClient(_YARN_HOST)
# Names of the tables whose sync tasks are checked by _main(). handle_table()
# matches each name as a substring of a task's 'name' field.
table_list = [
    # cid features
    "alg_cid_feature_basic_info",
    "alg_cid_feature_adver_action",
    "alg_cid_feature_cid_action",
    "alg_cid_feature_region_action",
    "alg_cid_feature_app_action",
    "alg_cid_feature_week_action",
    "alg_cid_feature_hour_action",
    "alg_cid_feature_brand_action",
    "alg_cid_feature_weChatVersion_action",
    "alg_cid_feature_vid_cf",
    "alg_cid_feature_vid_cf_rank",
    "alg_mid_feature_ad_action",
    # vid features
    "alg_vid_feature_all_exp",
    "alg_vid_feature_all_share",
    "alg_vid_feature_all_return",
    "alg_vid_feature_exp2share",
    "alg_vid_feature_share2return",
    "alg_vid_feature_feed_noflow_exp",
    "alg_vid_feature_feed_noflow_root_share",
    "alg_vid_feature_feed_noflow_root_return",
    "alg_vid_feature_feed_flow_exp",
    "alg_vid_feature_feed_flow_root_share",
    "alg_vid_feature_feed_flow_root_return",
    "alg_vid_feature_feed_province_exp",
    "alg_vid_feature_feed_province_root_share",
    "alg_vid_feature_feed_province_root_return",
    "alg_vid_feature_basic_info",
    "alg_recsys_feature_cf_i2i_new",
    # vid features, v2 variants
    "alg_vid_feature_all_exp_v2",
    "alg_vid_feature_exp2share_v2",
    "alg_vid_feature_feed_noflow_exp_v2",
    "alg_vid_feature_feed_noflow_root_share_v2",
    "alg_vid_feature_feed_noflow_root_return_v2",
    "alg_vid_feature_feed_flow_exp_v2",
    "alg_vid_feature_feed_flow_root_share_v2",
    "alg_vid_feature_feed_flow_root_return_v2",
    "alg_vid_feature_feed_province_exp_v2",
    "alg_vid_feature_feed_province_root_share_v2",
    "alg_vid_feature_feed_province_root_return_v2",
    "alg_recsys_feature_cf_i2i_new_v2",
    # mid features
    "alg_mid_feature_play",
    "alg_mid_feature_share_and_return",
    "alg_mid_feature_play_tags",
    "alg_mid_feature_return_tags",
    "alg_mid_feature_share_tags",
    "alg_mid_feature_feed_exp_share_tags",
    "alg_mid_feature_feed_exp_return_tags",
    "alg_mid_feature_sharecf",
    "alg_mid_feature_returncf",
    "alg_mid_feature_feed_exp_return_tags_v2",
    "alg_mid_feature_feed_exp_share_tags_v2",
]

# handle_table() only considers tasks whose start time is strictly after this instant.
filter_date = datetime(year=2024, month=1, day=1)
def df_print(result):
    """Print the tasks whose name contains 'cid' as a pipe-separated table.

    The rows are ordered by their ``startedTime`` column and the
    ``'odps sync to redis : '`` prefix is stripped from each name before
    printing. Output is a header line, a dashed rule of the same width, and
    one line per matching task.

    Args:
        result: Anything ``pandas.DataFrame`` accepts (e.g. a list of dicts).
            Non-empty input must provide ``name`` and ``startedTime`` columns.

    Returns:
        None. The table is written to stdout. Nothing is printed when
        ``result`` is empty; only the header and rule are printed when no
        task name contains 'cid'.
    """
    df = pd.DataFrame(result)
    if df.empty:
        # An empty frame has no 'name' column to filter on.
        return

    # Keep only the rows whose name contains 'cid'. na=False treats a missing
    # name as "no match" instead of producing an unusable NaN mask.
    is_cid = df['name'].str.contains('cid', regex=False, na=False)
    filtered_df = df[is_cid].copy()  # copy so the assignment below does not touch df
    filtered_df['name'] = filtered_df['name'].str.replace('odps sync to redis : ', '', regex=False)
    sorted_df = filtered_df.sort_values(by="startedTime")

    # Header line
    header = ' | '.join(str(col) for col in sorted_df.columns)

    # Data lines, rendered from the *sorted* frame. itertuples also copes with
    # zero matching rows, where DataFrame.apply(axis=1) would not yield a list.
    rows = [
        ' | '.join(str(value) for value in record)
        for record in sorted_df.itertuples(index=False, name=None)
    ]

    print(header)
    print('-' * len(header))
    if rows:
        print('\n'.join(rows))
def handle_table(table_name: str, spark_task_list: list[dict]) -> "tuple[bool, datetime | str]":
    """Report whether the most recent sync task for a table is overdue.

    A task belongs to the table when ``table_name`` occurs in the task's
    ``'name'`` and its ``'startedTime'`` parses (via
    ``dateutil.str_cover_date``) to a moment strictly after ``filter_date``.

    Args:
        table_name: Table name to look for inside each task name.
        spark_task_list: Task records; each must have ``'name'`` and
            ``'startedTime'`` keys.

    Returns:
        ``(is_overdue, latest_started_time)``. ``is_overdue`` is True when the
        newest matching task started more than 140 minutes before now. When no
        task matches, ``(False, "")`` is returned.

    NOTE(review): matching is by substring, so e.g. "alg_vid_feature_all_exp"
    also matches tasks named after "alg_vid_feature_all_exp_v2"; a fresh v2
    sync could hide a stale base table. Confirm the task-name format before
    tightening this.
    NOTE(review): a table with no matching task at all is reported as *not*
    overdue. Verify this is intended, since the caller only fetches a limited
    time window.
    """
    # Parse each start time once; it is needed for both the filter and the max.
    started_times = []
    for task in spark_task_list:
        if table_name not in task['name']:
            continue
        started = dateutil.str_cover_date(task['startedTime'])
        if started > filter_date:
            started_times.append(started)

    if not started_times:
        return False, ""

    latest_started_time = max(started_times)
    print(f"表: {table_name}, 最后一次同步时间为: {latest_started_time}")

    # The parsed time is subtracted from a naive datetime.now(), so it has to
    # be naive as well. Presumably both are in the same local zone; confirm.
    is_overdue = datetime.now() - latest_started_time > timedelta(minutes=140)

    # Always hand back the timestamp; the previous code returned the whole
    # filtered task list on the "fresh" path, contradicting the declared shape.
    return is_overdue, latest_started_time
def send_error_info(table_name: str, latest_started_time: str) -> dict:
    """Build the alert card for a table whose sync is overdue and return it.

    Args:
        table_name: Name of the table to mention in the alert.
        latest_started_time: Last sync time; it is interpolated into the
            message with an f-string, so any printable value works.

    Returns:
        The card payload as a dict with ``config``, ``i18n_elements`` and
        ``i18n_header`` keys. Earlier versions built this dict and discarded
        it (returning None), so the alert could never leave the function.

    TODO(review): nothing is actually delivered here yet. Pass the returned
    card to the project's Feishu sender (``feishu_inform_util`` is imported by
    this module, but its API is not visible from this file).
    """
    mgs_text = (
        f"\n- 大数据表名: {table_name}"
        f"\n- 最后一次同步时间: {latest_started_time}"
        f"\n- 超过两个小时没有同步,请关注"
    )

    def markdown_element(content: str) -> dict:
        # Both body elements share the same layout; only the text differs.
        return {
            "tag": "markdown",
            "content": content,
            "text_align": "left",
            "text_size": "normal",
        }

    card_json = {
        "config": {},
        "i18n_elements": {
            # The first element is an intentionally empty markdown block.
            "zh_cn": [markdown_element(""), markdown_element(mgs_text)],
        },
        "i18n_header": {
            "zh_cn": {
                "title": {
                    "tag": "plain_text",
                    "content": "【测试】大数据表同步延迟告警",
                },
                "subtitle": {
                    "tag": "plain_text",
                    "content": "",
                },
                "template": "red",
            },
        },
    }
    return card_json
def _main():
    """Check every monitored table and report those whose sync is overdue.

    Reads ``config/config.ini``, prints the ``model.webhook`` option of its
    ``feishu`` section, fetches the tasks started during the last seven hours
    from ``yarn_client`` and runs ``handle_table`` for each entry of
    ``table_list``. Overdue tables are logged and passed to
    ``send_error_info``.

    NOTE(review): the webhook value is only printed here, never used, and
    printing it exposes it in the job output. Confirm whether that is wanted.
    """
    # Load the configuration file
    parser = configparser.ConfigParser()
    parser.read("config/config.ini")
    webhook = parser.get("feishu", 'model.webhook')
    print(webhook)

    # Fetch the Spark tasks of the last seven hours; the start bound is
    # whole epoch seconds scaled to milliseconds.
    window_start = datetime.now() - timedelta(hours=7)
    hours_7_early = int(window_start.timestamp()) * 1000
    result = yarn_client.get_apps(started_time_begin=hours_7_early)

    for table_name in table_list:
        # Skip tables that handle_table() does not report as overdue
        overdue, latest_started_time = handle_table(table_name, result)
        if not overdue:
            continue
        print(f"表: {table_name}, 最后一次同步时间距当前时间超过140分钟")
        send_error_info(table_name, latest_started_time)


if __name__ == "__main__":
    _main()
|