|
@@ -0,0 +1,435 @@
|
|
|
|
+# -*- coding: utf-8 -*-
|
|
|
|
+
|
|
|
|
+import pandas as pd
|
|
|
|
+import traceback
|
|
|
|
+import odps
|
|
|
|
+from odps import ODPS
|
|
|
|
+from threading import Timer
|
|
|
|
+from datetime import datetime, timedelta
|
|
|
|
+from db_helper import MysqlHelper
|
|
|
|
+from my_utils import check_table_partition_exits_v2, get_dataframe_from_odps, \
|
|
|
|
+ get_odps_df_of_max_partition, get_odps_instance, get_odps_df_of_recent_partitions
|
|
|
|
+from my_utils import request_post, send_msg_to_feishu
|
|
|
|
+from my_config import set_config
|
|
|
|
+import numpy as np
|
|
|
|
+from log import Log
|
|
|
|
+import os
|
|
|
|
+from argparse import ArgumentParser
|
|
|
|
+
|
|
|
|
+CONFIG, _ = set_config()
|
|
|
|
+LOGGER = Log()
|
|
|
|
+
|
|
|
|
+BASE_GROUP_NAME = '3rd-party-base'
|
|
|
|
+EXPLORE1_GROUP_NAME = '3rd-party-explore1'
|
|
|
|
+EXPLORE2_GROUP_NAME = '3rd-party-explore2'
|
|
|
|
+#TODO: fetch gh_id from external data source
|
|
|
|
+GH_IDS = ('gh_2863155cedcb',
|
|
|
|
+'gh_c1acd6bac0f8',
|
|
|
|
+'gh_da993c5f7f64',
|
|
|
|
+'gh_495d71abbda2',
|
|
|
|
+'gh_e2318164f869',
|
|
|
|
+'gh_fc4ec610756e',
|
|
|
|
+'gh_2450ad774945',
|
|
|
|
+'gh_175925e40318',
|
|
|
|
+'gh_994adaf7a539',
|
|
|
|
+'gh_250c51d5ce69',
|
|
|
|
+'gh_37adcd637351',
|
|
|
|
+'gh_a36405f4e5d3',
|
|
|
|
+'gh_ee5b4b07ed8b',
|
|
|
|
+'gh_11debb2c8392',
|
|
|
|
+'gh_d645c1ef7fb0',
|
|
|
|
+'gh_1899b728af86',
|
|
|
|
+'gh_059a27ea86b2',
|
|
|
|
+'gh_6454c103be14',
|
|
|
|
+'gh_63745bad4f21',
|
|
|
|
+'gh_8a29eebc2012',
|
|
|
|
+'gh_57bc9846c86a',
|
|
|
|
+'gh_570967881eae',
|
|
|
|
+'gh_197a0d8caa31',
|
|
|
|
+'gh_93af434e3f47',
|
|
|
|
+'gh_184f2d765f55',
|
|
|
|
+'gh_8157d8fd284e',
|
|
|
|
+'gh_8e1d1f19d44f',
|
|
|
|
+'gh_1da8f62f4a0d',
|
|
|
|
+'gh_fd4df7c45bb9',
|
|
|
|
+'gh_dcfcf74b0846',
|
|
|
|
+'gh_3afc3a8b8a3d',
|
|
|
|
+'gh_ef699270bf64',
|
|
|
|
+'gh_ba870f8b178b',
|
|
|
|
+'gh_58cdb2f1f0d0',
|
|
|
|
+'gh_3dce33de6994',
|
|
|
|
+'gh_543e6d7e15f3',
|
|
|
|
+'gh_0d55fee6b78d',
|
|
|
|
+'gh_1c11651e0df4',
|
|
|
|
+'gh_7f4741dd5fea',
|
|
|
|
+'gh_33e9e4cbed84',
|
|
|
|
+'gh_23fa67ebb016',
|
|
|
|
+'gh_33c26df4eab0',
|
|
|
|
+'gh_01c93b07605f',
|
|
|
|
+'gh_c655e3c8a121',
|
|
|
|
+'gh_83adb78f4ede',
|
|
|
|
+'gh_0cc7c6d712eb',
|
|
|
|
+'gh_1b8bfb5c4ffd',
|
|
|
|
+'gh_e4fb77b1023b',
|
|
|
|
+'gh_f994f5d9a4b6',
|
|
|
|
+'gh_e0ca8ba4ed91',
|
|
|
|
+'gh_b2b4d5aa6b49',
|
|
|
|
+'gh_53759f90b0c5',
|
|
|
|
+'gh_d219a0cc8a35',
|
|
|
|
+'gh_930b5ef5a185',
|
|
|
|
+'gh_22cad14dc4ec',
|
|
|
|
+'gh_8734c02f2983',
|
|
|
|
+'gh_8d68e68f2d08',
|
|
|
|
+'gh_c603033bf881',
|
|
|
|
+'gh_55ac4e447179',
|
|
|
|
+'gh_8b5c838ac19a',
|
|
|
|
+'gh_aed71f26e7e6',
|
|
|
|
+'gh_330ef0db846d',
|
|
|
|
+'gh_87eca527c626',
|
|
|
|
+'gh_7a14b4c15090',
|
|
|
|
+'gh_b74693fed783',
|
|
|
|
+'gh_e1594e6db64b',
|
|
|
|
+'gh_d32daba8ccf8',
|
|
|
|
+'gh_23e084923def',
|
|
|
|
+'gh_148aa4a776ce',
|
|
|
|
+'gh_0df4d6b647ea',
|
|
|
|
+'gh_041d1c819c30',
|
|
|
|
+'gh_7e33fbba4398',
|
|
|
|
+'gh_354ab82cf9b3',
|
|
|
|
+'gh_b1f1d7a1f351',
|
|
|
|
+'gh_793647539ef5',
|
|
|
|
+'gh_1ff0c29f8408',
|
|
|
|
+'gh_ecef1c08bcf4',
|
|
|
|
+'gh_22f53f6a2b5d',
|
|
|
|
+'gh_34820675d0fc',
|
|
|
|
+'gh_4175a8f745f6',
|
|
|
|
+'gh_81145598368a',
|
|
|
|
+'gh_5f0bb5822e10',
|
|
|
|
+'gh_65d8db4e97ca',
|
|
|
|
+'gh_a09594d52fda',
|
|
|
|
+'gh_4411cf1e5f4e',
|
|
|
|
+'gh_9ee5f5e8425f',
|
|
|
|
+'gh_df24adad2521',
|
|
|
|
+'gh_30b472377707',
|
|
|
|
+'gh_bb6775b47656',
|
|
|
|
+'gh_69808935bba0',
|
|
|
|
+'gh_fb77872bf907',
|
|
|
|
+'gh_830c4aa1b262',
|
|
|
|
+'gh_b5393e35caa4',
|
|
|
|
+'gh_fa7dceae7c9d',
|
|
|
|
+'gh_449fb0c2d817',
|
|
|
|
+'gh_d6e75ad9094f',
|
|
|
|
+'gh_1cbb453a800e',
|
|
|
|
+'gh_1b243f162cbd',
|
|
|
|
+'gh_50db6881c86e',
|
|
|
|
+'gh_9d94775e8137',
|
|
|
|
+'gh_d37101fb9b98',
|
|
|
|
+'gh_ed86d05703eb',
|
|
|
|
+'gh_ac4072121e24',
|
|
|
|
+'gh_620af8e24fb9',
|
|
|
|
+'gh_ee4783ded544',
|
|
|
|
+'gh_d2bb5f1b9498',
|
|
|
|
+'gh_5044de6e1597',
|
|
|
|
+'gh_d94de77a8d08',
|
|
|
|
+'gh_98624814f69a',
|
|
|
|
+'gh_4c38b9d4474a',
|
|
|
|
+'gh_f2a6c90c56cb',
|
|
|
|
+'gh_26f1353fda5a',
|
|
|
|
+'gh_143743361496',
|
|
|
|
+'gh_126c99b39cea',
|
|
|
|
+'gh_53e6e0a1b1bd',
|
|
|
|
+'gh_859aafbcda3d',
|
|
|
|
+'gh_cfce2617bd82',
|
|
|
|
+'gh_db8ea2bc6687',
|
|
|
|
+'gh_c4708b8cfe39',
|
|
|
|
+'gh_57d2388bd01d',
|
|
|
|
+'gh_5fffe35cc12a',
|
|
|
|
+'gh_45980a6448f3',
|
|
|
|
+'gh_f5120c12ee23',
|
|
|
|
+'gh_bf79e3645d7a',
|
|
|
|
+'gh_6c6d81dd642d',
|
|
|
|
+'gh_57ee6a6ef204',
|
|
|
|
+'gh_45be25d1c06b',
|
|
|
|
+'gh_3ee85ba7c3ae',
|
|
|
|
+'gh_7c89d5a3e745',
|
|
|
|
+'gh_c46be9ea4eef',
|
|
|
|
+'gh_cedc3c4eb48b',
|
|
|
|
+'gh_8a91fa7f32aa',
|
|
|
|
+'gh_5207b355776f',
|
|
|
|
+'gh_6c7f73de400b',
|
|
|
|
+'gh_d2f3805f8fa3',
|
|
|
|
+'gh_7dd47f8aca4e',
|
|
|
|
+'gh_967f9abb9ccd',
|
|
|
|
+'gh_f46c6c9b53fa',
|
|
|
|
+'gh_086abf2a536b',
|
|
|
|
+'gh_6e11282216f3',
|
|
|
|
+'gh_f5332b8dfb63',
|
|
|
|
+'gh_f78610e292ba',
|
|
|
|
+'gh_06699758fa4b',
|
|
|
|
+'gh_92323d0bea11',
|
|
|
|
+'gh_517aed4e8197',
|
|
|
|
+'gh_c80462b5a330',
|
|
|
|
+'gh_1b1c3ced734e',
|
|
|
|
+'gh_dd54e30b03ad',
|
|
|
|
+'gh_cadd0ea4fab3',
|
|
|
|
+'gh_ef07a709127e',
|
|
|
|
+'gh_ab6ca922e605',
|
|
|
|
+'gh_8b69b67ea723',
|
|
|
|
+'gh_363c54315788',
|
|
|
|
+'gh_a363987c60bf',
|
|
|
|
+'gh_86ca35774fcf',
|
|
|
|
+'gh_518694803ae7',
|
|
|
|
+'gh_f98d5f17e9ea',
|
|
|
|
+'gh_5e0cd3f7b457',
|
|
|
|
+'gh_9e0d149e2c0a',
|
|
|
|
+'gh_7e77b09bb4f5',
|
|
|
|
+'gh_261bbd99a906',
|
|
|
|
+'gh_2dc8e3a7b6c9',
|
|
|
|
+'gh_1ec8dae66c97',
|
|
|
|
+'gh_7f062810b4e7',
|
|
|
|
+'gh_3c112c0c9c8b',
|
|
|
|
+'gh_01cd19465b39',
|
|
|
|
+'gh_8cc8ae6eb9a5',
|
|
|
|
+'gh_210f7ce6f418',
|
|
|
|
+'gh_04804a94e325',
|
|
|
|
+'gh_4685665647f0',
|
|
|
|
+'gh_d7fa96aeb839',
|
|
|
|
+'gh_210cb680d83d',
|
|
|
|
+'gh_862b00a394e3',
|
|
|
|
+'gh_3cf7b310906a',
|
|
|
|
+'gh_669555ebea28',
|
|
|
|
+'gh_aaac62205137',
|
|
|
|
+'gh_0a03f8fa63ba',
|
|
|
|
+'gh_b8b2d4184832',
|
|
|
|
+'gh_819a632d4bb1',
|
|
|
|
+'gh_db09b87a0fc9',
|
|
|
|
+'gh_b673c01e7bd8',
|
|
|
|
+'gh_6da61a15044a',
|
|
|
|
+'gh_2f1fab4efaef',
|
|
|
|
+'gh_da22f64152d5',
|
|
|
|
+'gh_ff9fe99f2097',
|
|
|
|
+'gh_33731afddbdb',
|
|
|
|
+'gh_4d2f75c3c3fe',
|
|
|
|
+'gh_40ff43c50773',
|
|
|
|
+'gh_56b65b7d4520',
|
|
|
|
+'gh_ff16c412ab97',
|
|
|
|
+'gh_8bf689ae15cc',
|
|
|
|
+'gh_650b17dbba8f',
|
|
|
|
+'gh_b63b9dde3f4b',
|
|
|
|
+'gh_36e74017026e',
|
|
|
|
+'gh_a8851bfa953b',
|
|
|
|
+'gh_ec5beb465640',
|
|
|
|
+'gh_133c36b99b14',
|
|
|
|
+'gh_b144210318e5',
|
|
|
|
+'gh_3bffce62dbb4',
|
|
|
|
+'gh_2fbff340c683',
|
|
|
|
+'gh_3ceae370dcf5',
|
|
|
|
+'gh_530b634707b0',
|
|
|
|
+'gh_b7cdece20099',
|
|
|
|
+'gh_9e0c7a370aaf',
|
|
|
|
+'gh_96412c0393e3',
|
|
|
|
+'gh_c8060587e6d1',
|
|
|
|
+'gh_0d3c97cc30cc',
|
|
|
|
+'gh_491189a534f2',
|
|
|
|
+'gh_fe9620386c2c',
|
|
|
|
+'gh_9d50b7067f07',
|
|
|
|
+'gh_e1331141406a',
|
|
|
|
+'gh_d6db13fcf14d',
|
|
|
|
+'gh_5522900b6a67',
|
|
|
|
+'gh_a7c21403c493',
|
|
|
|
+'gh_eeec7c2e28a5',
|
|
|
|
+'gh_c783350a9660',)
|
|
|
|
+CDN_IMG_OPERATOR = "?x-oss-process=image/resize,m_fill,w_600,h_480,limit_0/format,jpg/watermark,image_eXNoL3BpYy93YXRlcm1hcmtlci9pY29uX3BsYXlfd2hpdGUucG5nP3gtb3NzLXByb2Nlc3M9aW1hZ2UvcmVzaXplLHdfMTQ0,g_center"
|
|
|
|
+
|
|
|
|
+ODS_PROJECT = "loghubods"
|
|
|
|
+GH_REPLY_STATS_TABLE = 'alg_growth_3rd_gh_reply_video_stats'
|
|
|
|
+ODPS_RANK_RESULT_TABLE = 'alg_gh_autoreply_video_rank_data'
|
|
|
|
+RDS_RANK_RESULT_TABLE = 'alg_gh_autoreply_video_rank_data'
|
|
|
|
+STATS_PERIOD_DAYS = 5
|
|
|
|
+SEND_N = 1
|
|
|
|
+
|
|
|
|
+def check_data_partition(project, table, data_dt, data_hr=None):
|
|
|
|
+ """检查数据是否准备好"""
|
|
|
|
+ try:
|
|
|
|
+ partition_spec = {'dt': data_dt}
|
|
|
|
+ if data_hr:
|
|
|
|
+ partition_spec['hour'] = data_hr
|
|
|
|
+ part_exist, data_count = check_table_partition_exits_v2(
|
|
|
|
+ project, table, partition_spec)
|
|
|
|
+ except Exception as e:
|
|
|
|
+ data_count = 0
|
|
|
|
+ return data_count
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+def get_last_strategy_result(project, rank_table, dt_version, key):
|
|
|
|
+ strategy_df = get_odps_df_of_max_partition(
|
|
|
|
+ project, rank_table, { 'ctime': dt_version }
|
|
|
|
+ ).to_pandas()
|
|
|
|
+ sub_df = strategy_df.query(f'strategy_key == "{key}"')
|
|
|
|
+ sub_df = sub_df[['gh_id', 'video_id', 'strategy_key', 'sort']].drop_duplicates()
|
|
|
|
+ return sub_df
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+def process_reply_stats(project, table, period, run_dt):
|
|
|
|
+ # 获取多天即转统计数据用于聚合
|
|
|
|
+ df = get_odps_df_of_recent_partitions(project, table, period, {'dt': run_dt})
|
|
|
|
+ df = df.to_pandas()
|
|
|
|
+
|
|
|
|
+ df['video_id'] = df['video_id'].astype('int64')
|
|
|
|
+ df = df[['gh_id', 'video_id', 'send_count', 'first_visit_uv', 'day0_return']]
|
|
|
|
+
|
|
|
|
+ # 账号内聚合
|
|
|
|
+ df = df.groupby(['video_id', 'gh_id']).agg({
|
|
|
|
+ 'send_count': 'sum',
|
|
|
|
+ 'first_visit_uv': 'sum',
|
|
|
|
+ 'day0_return': 'sum'
|
|
|
|
+ }).reset_index()
|
|
|
|
+
|
|
|
|
+ # 聚合所有数据作为default
|
|
|
|
+ default_stats_df = df.groupby('video_id').agg({
|
|
|
|
+ 'send_count': 'sum',
|
|
|
|
+ 'first_visit_uv': 'sum',
|
|
|
|
+ 'day0_return': 'sum'
|
|
|
|
+ }).reset_index()
|
|
|
|
+ default_stats_df['gh_id'] = 'default'
|
|
|
|
+
|
|
|
|
+ merged_df = pd.concat([df, default_stats_df]).reset_index(drop=True)
|
|
|
|
+
|
|
|
|
+ merged_df['score'] = merged_df['day0_return'] / (merged_df['send_count'] + 500)
|
|
|
|
+ return merged_df
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+def rank_for_base(run_dt, run_hour, project, stats_table, rank_table, stg_key):
|
|
|
|
+ stats_df = process_reply_stats(project, stats_table, STATS_PERIOD_DAYS, run_dt)
|
|
|
|
+
|
|
|
|
+ #TODO: support to set base manually
|
|
|
|
+ dt_version = f'{run_dt}{run_hour}'
|
|
|
|
+
|
|
|
|
+ # 获取当前base信息, 策略表dt_version(ctime partition)采用当前时间
|
|
|
|
+ base_strategy_df = get_last_strategy_result(
|
|
|
|
+ project, rank_table, dt_version, stg_key)
|
|
|
|
+
|
|
|
|
+ default_stats_df = stats_df.query('gh_id == "default"')
|
|
|
|
+
|
|
|
|
+ # 在账号内排序,决定该账号(包括default)的base利用内容
|
|
|
|
+ # 排序过程中,确保当前base策略参与排序,因此先关联再过滤
|
|
|
|
+ gh_ids_str = ','.join(f'"{x}"' for x in GH_IDS)
|
|
|
|
+ stats_df = stats_df.query(f'gh_id in ({gh_ids_str})')
|
|
|
|
+
|
|
|
|
+ stats_with_strategy_df = stats_df \
|
|
|
|
+ .merge(
|
|
|
|
+ base_strategy_df,
|
|
|
|
+ on=['gh_id', 'video_id'],
|
|
|
|
+ how='left') \
|
|
|
|
+ .query('strategy_key.notna() or score > 0.1')
|
|
|
|
+
|
|
|
|
+ # 合并default和分账号数据
|
|
|
|
+ grouped_stats_df = pd.concat([default_stats_df, stats_with_strategy_df]).reset_index()
|
|
|
|
+
|
|
|
|
+ def set_top_n(group, n=2):
|
|
|
|
+ group_sorted = group.sort_values(by='score', ascending=False)
|
|
|
|
+ top_n = group_sorted.head(n)
|
|
|
|
+ top_n['sort'] = range(1, len(top_n) + 1)
|
|
|
|
+ return top_n
|
|
|
|
+ ranked_df = grouped_stats_df.groupby('gh_id').apply(set_top_n, SEND_N)
|
|
|
|
+ ranked_df = ranked_df.reset_index(drop=True)
|
|
|
|
+ #ranked_df['sort'] = grouped_stats_df.groupby('gh_id')['score'].rank(ascending=False)
|
|
|
|
+ ranked_df['strategy_key'] = stg_key
|
|
|
|
+ ranked_df['dt_version'] = dt_version
|
|
|
|
+ ranked_df = ranked_df[['strategy_key', 'dt_version', 'gh_id', 'sort', 'video_id', 'score']]
|
|
|
|
+ return ranked_df
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+def check_result_data(df):
|
|
|
|
+ for gh_id in GH_IDS + ('default', ):
|
|
|
|
+ for key in (EXPLORE1_GROUP_NAME, EXPLORE2_GROUP_NAME, BASE_GROUP_NAME):
|
|
|
|
+ sub_df = df.query(f'gh_id == "{gh_id}" and strategy_key == "{key}"')
|
|
|
|
+ # if len(sub_df) != SEND_N:
|
|
|
|
+ # raise Exception(f"Result not enough for gh_id[{gh_id}]")
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+def build_and_transfer_data(run_dt, run_hour, project, **kwargs):
|
|
|
|
+ dt_version = f'{run_dt}{run_hour}'
|
|
|
|
+ dry_run = kwargs.get('dry_run', False)
|
|
|
|
+
|
|
|
|
+ layer1_rank = rank_for_base(run_dt, run_hour, ODS_PROJECT, GH_REPLY_STATS_TABLE, ODPS_RANK_RESULT_TABLE, EXPLORE1_GROUP_NAME)
|
|
|
|
+ layer2_rank = rank_for_base(run_dt, run_hour, ODS_PROJECT, GH_REPLY_STATS_TABLE, ODPS_RANK_RESULT_TABLE, EXPLORE2_GROUP_NAME)
|
|
|
|
+ base_rank = rank_for_base(run_dt, run_hour, ODS_PROJECT, GH_REPLY_STATS_TABLE, ODPS_RANK_RESULT_TABLE, BASE_GROUP_NAME)
|
|
|
|
+ final_rank_df = pd.concat([layer1_rank, layer2_rank, base_rank]).reset_index(drop=True)
|
|
|
|
+ check_result_data(final_rank_df)
|
|
|
|
+
|
|
|
|
+ odps_instance = get_odps_instance(project)
|
|
|
|
+ odps_ranked_df = odps.DataFrame(final_rank_df)
|
|
|
|
+
|
|
|
|
+ video_df = get_dataframe_from_odps('videoods', 'wx_video')
|
|
|
|
+ video_df['cover_url'] = video_df['cover_img_path'] + CDN_IMG_OPERATOR
|
|
|
|
+ video_df = video_df['id', 'title', 'cover_url']
|
|
|
|
+ final_df = odps_ranked_df.join(video_df, on=('video_id', 'id'))
|
|
|
|
+
|
|
|
|
+ final_df = final_df.to_pandas()
|
|
|
|
+ final_df = final_df[['strategy_key', 'dt_version', 'gh_id', 'sort', 'video_id', 'title', 'cover_url', 'score']]
|
|
|
|
+
|
|
|
|
+ # reverse sending order
|
|
|
|
+ final_df['sort'] = SEND_N + 1 - final_df['sort']
|
|
|
|
+
|
|
|
|
+ if dry_run:
|
|
|
|
+ print(final_df[['strategy_key', 'gh_id', 'sort', 'video_id', 'score', 'title']])
|
|
|
|
+ return
|
|
|
|
+
|
|
|
|
+ # save to ODPS
|
|
|
|
+ # t = odps_instance.get_table(ODPS_RANK_RESULT_TABLE)
|
|
|
|
+ # part_spec_dict = {'dt': run_dt, 'hour': run_hour, 'ctime': dt_version}
|
|
|
|
+ # part_spec =','.join(['{}={}'.format(k, part_spec_dict[k]) for k in part_spec_dict.keys()])
|
|
|
|
+ # with t.open_writer(partition=part_spec, create_partition=True, overwrite=True) as writer:
|
|
|
|
+ # writer.write(list(final_df.itertuples(index=False)))
|
|
|
|
+
|
|
|
|
+ # sync to MySQL
|
|
|
|
+ data_to_insert = [tuple(row) for row in final_df.itertuples(index=False)]
|
|
|
|
+ data_columns = list(final_df.columns)
|
|
|
|
+ mysql = MysqlHelper(CONFIG.MYSQL_CRAWLER_INFO)
|
|
|
|
+ mysql.batch_insert(RDS_RANK_RESULT_TABLE, data_to_insert, data_columns)
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+def main_loop():
|
|
|
|
+ argparser = ArgumentParser()
|
|
|
|
+ argparser.add_argument('-n', '--dry-run', action='store_true')
|
|
|
|
+ args = argparser.parse_args()
|
|
|
|
+
|
|
|
|
+ try:
|
|
|
|
+ now_date = datetime.today()
|
|
|
|
+ LOGGER.info(f"开始执行: {datetime.strftime(now_date, '%Y-%m-%d %H:%M')}")
|
|
|
|
+ now_hour = now_date.strftime("%H")
|
|
|
|
+
|
|
|
|
+ last_date = now_date - timedelta(1)
|
|
|
|
+ last_dt = last_date.strftime("%Y%m%d")
|
|
|
|
+ # 查看当前天级更新的数据是否已准备好
|
|
|
|
+ # 当前上游统计表为天级更新,但字段设计为兼容小时级
|
|
|
|
+ h_data_count = check_data_partition(ODS_PROJECT, GH_REPLY_STATS_TABLE, last_dt, '00')
|
|
|
|
+ if h_data_count > 0:
|
|
|
|
+ LOGGER.info('上游数据表查询数据条数={},开始计算'.format(h_data_count))
|
|
|
|
+ run_dt = now_date.strftime("%Y%m%d")
|
|
|
|
+ LOGGER.info(f'run_dt: {run_dt}, run_hour: {now_hour}')
|
|
|
|
+ build_and_transfer_data(run_dt, now_hour, ODS_PROJECT,
|
|
|
|
+ dry_run=args.dry_run)
|
|
|
|
+ LOGGER.info('数据更新完成')
|
|
|
|
+ else:
|
|
|
|
+ LOGGER.info("上游数据未就绪,等待60s")
|
|
|
|
+ Timer(60, main_loop).start()
|
|
|
|
+ return
|
|
|
|
+ except Exception as e:
|
|
|
|
+ LOGGER.error(f"数据更新失败, exception: {e}, traceback: {traceback.format_exc()}")
|
|
|
|
+ if CONFIG.ENV_TEXT == '开发环境':
|
|
|
|
+ return
|
|
|
|
+ send_msg_to_feishu(
|
|
|
|
+ webhook=CONFIG.FEISHU_ROBOT['server_robot'].get('webhook'),
|
|
|
|
+ key_word=CONFIG.FEISHU_ROBOT['server_robot'].get('key_word'),
|
|
|
|
+ msg_text=f"rov-offline{CONFIG.ENV_TEXT} - 数据更新失败\n"
|
|
|
|
+ f"exception: {e}\n"
|
|
|
|
+ f"traceback: {traceback.format_exc()}"
|
|
|
|
+ )
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+if __name__ == '__main__':
|
|
|
|
+ LOGGER.info("%s 开始执行" % os.path.basename(__file__))
|
|
|
|
+ LOGGER.info(f"environment: {CONFIG.ENV_TEXT}")
|
|
|
|
+ main_loop()
|