# -*- coding: utf-8 -*-
import json
import os
import time
import traceback
from argparse import ArgumentParser
from datetime import datetime, timedelta

import numpy as np
import pandas as pd
import odps
from odps import ODPS
from pymysql.cursors import DictCursor

from alg_growth_common import check_unsafe_video, filter_unsafe_video
from constants import AutoReplyAccountType
from db_helper import MysqlHelper
from log import Log
from my_config import set_config
from my_utils import check_table_partition_exits_v2, get_dataframe_from_odps, \
    get_odps_df_of_max_partition, get_odps_instance, get_odps_df_of_recent_partitions
from my_utils import request_post, send_msg_to_feishu
 
CONFIG, _ = set_config()
LOGGER = Log()

BASE_GROUP_NAME = '3rd-party-base'
EXPLORE1_GROUP_NAME = '3rd-party-explore1'
EXPLORE2_GROUP_NAME = '3rd-party-explore2'
# GH_IDS will be updated by get_and_update_gh_ids
GH_IDS = ('default',)
pd.set_option('display.max_rows', None)

CDN_IMG_OPERATOR = "?x-oss-process=image/resize,m_fill,w_600,h_480,limit_0/format,jpg/watermark,image_eXNoL3BpYy93YXRlcm1hcmtlci9pY29uX3BsYXlfd2hpdGUucG5nP3gtb3NzLXByb2Nlc3M9aW1hZ2UvcmVzaXplLHdfMTQ0,g_center"

ODS_PROJECT = "loghubods"
EXPLORE_POOL_TABLE = 'alg_growth_video_return_stats_history'
GH_REPLY_STATS_TABLE = 'alg_growth_3rd_gh_reply_video_stats'
ODPS_RANK_RESULT_TABLE = 'alg_3rd_gh_autoreply_video_rank_data'
GH_DETAIL = 'gh_detail'
RDS_RANK_RESULT_TABLE = 'alg_gh_autoreply_video_rank_data'

STATS_PERIOD_DAYS = 5
SEND_N = 1


def get_and_update_gh_ids(run_dt):
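    """Load the 3rd-party auto-reply account configs from MySQL.

    Ensures a synthetic 'default' account exists, de-duplicates by gh_id,
    refreshes the global GH_IDS tuple, and returns the account DataFrame.
    """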
 
    db = MysqlHelper(CONFIG.MYSQL_GROWTH_INFO)
    gh_type = AutoReplyAccountType.EXTERNAL_GZH.value
    sqlstr = f"""
        SELECT gh_id, gh_name, category1, category2, channel,
               video_ids, strategy_status
        FROM {GH_DETAIL}
        WHERE is_delete = 0 AND `type` = {gh_type}
        """
    account_data = db.get_data(sqlstr, DictCursor)
    account_df = pd.DataFrame(account_data)

    # the 'default' account is handled separately
    if 'default' not in account_df['gh_id'].values:
        new_row = pd.DataFrame({'gh_id': ['default'], 'gh_name': ['默认'], 'type': [2], 'category1': ['泛生活']},
                               index=[0])
        account_df = pd.concat([account_df, new_row], ignore_index=True)

    account_df = account_df.drop_duplicates(subset=['gh_id'])
    global GH_IDS
    GH_IDS = tuple(account_df['gh_id'])
    return account_df


def check_data_partition(project, table, data_dt, data_hr=None):
    """Check whether the data is ready; returns the partition's row count (0 on failure)."""
    try:
        partition_spec = {'dt': data_dt}
        if data_hr:
            partition_spec['hour'] = data_hr
        part_exist, data_count = check_table_partition_exits_v2(
            project, table, partition_spec)
    except Exception as e:
        data_count = 0
    return data_count
 
def get_last_strategy_result(project, rank_table, dt_version, key):
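    """Load the last strategy rank result from the rank table (looked up by
    ctime partition against dt_version) and return the de-duplicated rows for
    the given strategy_key, together with the dt_version recorded there."""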
 
    strategy_df = get_odps_df_of_max_partition(
        project, rank_table, {'ctime': dt_version}
    ).to_pandas()
    dt_version = strategy_df.iloc[0]['dt_version']
    sub_df = strategy_df.query(f'strategy_key == "{key}"')
    sub_df = sub_df[['gh_id', 'video_id', 'strategy_key', 'sort']].drop_duplicates()
    return sub_df, dt_version
 
def process_reply_stats(project, table, period, run_dt):
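    """Aggregate reply stats of the recent `period` days per (video_id, gh_id),
    build an extra pooled 'default' account by aggregating over all accounts,
    and compute a smoothed score = day0_return / (first_visit_uv + 100)."""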
 
    # fetch reply/return stats of multiple days for aggregation
    df = get_odps_df_of_recent_partitions(project, table, period, {'dt': run_dt})
    df = df.to_pandas()
    df['video_id'] = df['video_id'].astype('int64')
    df = df[['gh_id', 'video_id', 'send_count', 'first_visit_uv', 'day0_return']]
    # always drop unsafe videos when collecting stats
    df = filter_unsafe_video(df)

    # aggregate within each account
    df = df.groupby(['video_id', 'gh_id']).agg({
        'send_count': 'sum',
        'first_visit_uv': 'sum',
        'day0_return': 'sum'
    }).reset_index()

    # aggregate over all accounts as the 'default' account
    default_stats_df = df.groupby('video_id').agg({
        'send_count': 'sum',
        'first_visit_uv': 'sum',
        'day0_return': 'sum'
    }).reset_index()
    default_stats_df['gh_id'] = 'default'

    merged_df = pd.concat([df, default_stats_df]).reset_index(drop=True)
    merged_df['score'] = merged_df['day0_return'] / (merged_df['first_visit_uv'] + 100)
    return merged_df
 
def rank_for_layer1(run_dt, run_hour, project, table, gh):
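    """Layer-1 exploration: weighted random sampling of SEND_N videos per
    category1 from the explore pool, then fan the picks out to the accounts
    of that category via the `gh` account DataFrame."""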
 
    # TODO: add review & retirement logic
    df = get_odps_df_of_max_partition(project, table, {'dt': run_dt})
    df = df.to_pandas()
    df = filter_unsafe_video(df)

    # seed the RNG so that reruns produce identical results
    dt_version = f'{run_dt}{run_hour}'
    np.random.seed(int(dt_version) + 1)

    # TODO: revise the weighting strategy
    df['score'] = 1.0
    # weighted random sampling within each category1
    sampled_df = df.groupby('category1').apply(
        lambda x: x.sample(n=SEND_N, weights=x['score'], replace=False)).reset_index(drop=True)
    sampled_df['sort'] = sampled_df.groupby('category1')['score'].rank(method='first', ascending=False).astype(int)
    # order by score within each category
    sampled_df = sampled_df.sort_values(by=['category1', 'score'], ascending=[True, False]).reset_index(drop=True)
    sampled_df['strategy_key'] = EXPLORE1_GROUP_NAME
    sampled_df['dt_version'] = dt_version
    extend_df = sampled_df.merge(gh, on='category1')
    result_df = extend_df[['strategy_key', 'dt_version', 'gh_id', 'sort', 'video_id', 'score']]
    return result_df
 
def rank_for_layer2(run_dt, run_hour, project, stats_table, rank_table):
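    """Layer-2 exploration: within each account (including the pooled
    'default'), sample SEND_N videos weighted by the smoothed return score;
    accounts with too few qualified rows fall back to the current base
    strategy."""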
 
    stats_df = process_reply_stats(project, stats_table, STATS_PERIOD_DAYS, run_dt)

    # seed the RNG so that reruns produce identical results
    dt_version = f'{run_dt}{run_hour}'
    np.random.seed(int(dt_version) + 1)

    # TODO: compute inter-account correlation
    ## pair accounts two by two and take the intersection of videos that have RoVn values;
    ## the (smoothed) RoVn values within one account form a vector
    ## compute the correlation coefficient or cosine similarity between vectors
    ## then take a weighted sum of RoVn per video
    # current basic version: the layer-2 exploration score is computed within each account only
    sampled_dfs = []

    # handle the default account (default-explore2)
    default_stats_df = stats_df.query('gh_id == "default"')
    sampled_df = default_stats_df.sample(n=SEND_N, weights=default_stats_df['score'])
    sampled_df['sort'] = range(1, len(sampled_df) + 1)
    sampled_dfs.append(sampled_df)

    # basic filtering of per-account candidates
    df = stats_df.query('send_count > 200 and score > 0')

    # fall back to base if necessary
    base_strategy_df, _ = get_last_strategy_result(
        project, rank_table, dt_version, BASE_GROUP_NAME)

    for gh_id in GH_IDS:
        if gh_id == 'default':
            continue
        sub_df = df.query(f'gh_id == "{gh_id}"')
        if len(sub_df) < SEND_N:
            LOGGER.warning(
                "gh_id[{}] rows[{}] not enough for layer2, fallback to base"
                .format(gh_id, len(sub_df)))
            sub_df = base_strategy_df.query(f'gh_id == "{gh_id}"').copy()
            sub_df['score'] = sub_df['sort']
        sampled_df = sub_df.sample(n=SEND_N, weights=sub_df['score'])
        sampled_df['sort'] = range(1, len(sampled_df) + 1)
        sampled_dfs.append(sampled_df)

    extend_df = pd.concat(sampled_dfs)
    extend_df['strategy_key'] = EXPLORE2_GROUP_NAME
    extend_df['dt_version'] = dt_version
    result_df = extend_df[['strategy_key', 'dt_version', 'gh_id', 'sort', 'video_id', 'score']]
    return result_df
 
def rank_for_base(run_dt, run_hour, project, stats_table, rank_table, stg_key):
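    """Base (exploitation) strategy: rank candidates by the smoothed return
    score within each account and keep the top SEND_N. For non-default
    accounts the candidate set is the current base videos plus videos with
    send_count > 500 and score > 0.05."""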
 
    stats_df = process_reply_stats(project, stats_table, STATS_PERIOD_DAYS, run_dt)

    # TODO: support setting the base manually
    dt_version = f'{run_dt}{run_hour}'

    # fetch the current base strategy; the strategy table dt_version (ctime partition) uses the current time
    base_strategy_df, _ = get_last_strategy_result(
        project, rank_table, dt_version, stg_key)

    default_stats_df = stats_df.query('gh_id == "default"')

    # rank within each account (including 'default') to decide its base content;
    # the current base strategy must take part in the ranking, so join first and filter afterwards
    non_default_ids = list(filter(lambda x: x != 'default', GH_IDS))
    gh_ids_str = ','.join(f'"{x}"' for x in non_default_ids)
    stats_df = stats_df.query(f'gh_id in ({gh_ids_str})')

    stats_with_strategy_df = stats_df \
        .merge(
            base_strategy_df,
            on=['gh_id', 'video_id'],
            how='outer') \
        .query('strategy_key.notna() or (send_count > 500 and score > 0.05)') \
        .fillna({'score': 0.0})

    # merge default and per-account data
    grouped_stats_df = pd.concat([default_stats_df, stats_with_strategy_df]).reset_index()

    def set_top_n(group, n=2):
        group_sorted = group.sort_values(by='score', ascending=False)
        top_n = group_sorted.head(n)
        top_n['sort'] = range(1, len(top_n) + 1)
        return top_n

    ranked_df = grouped_stats_df.groupby('gh_id').apply(set_top_n, SEND_N)
    ranked_df = ranked_df.reset_index(drop=True)
    ranked_df['strategy_key'] = stg_key
    ranked_df['dt_version'] = dt_version
    ranked_df = ranked_df[['strategy_key', 'dt_version', 'gh_id', 'sort', 'video_id', 'score']]
    return ranked_df
 
def check_result_data(df):
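    """Sanity check: no unsafe videos, and every (gh_id, strategy_key) pair
    has exactly SEND_N records."""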
 
    check_unsafe_video(df)
    for gh_id in GH_IDS:
        for key in (EXPLORE1_GROUP_NAME, EXPLORE2_GROUP_NAME, BASE_GROUP_NAME):
            sub_df = df.query(f'gh_id == "{gh_id}" and strategy_key == "{key}"')
            n_records = len(sub_df)
            if n_records != SEND_N:
                raise Exception(f"Unexpected record count: {gh_id},{key},{n_records}")
 
def postprocess_override_by_config(df, gh_df, dt_version):
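    """For accounts with strategy_status == 0, replace the ranked rows of all
    three strategies (by sort position) with the manually configured video_ids
    at score 0.0; also force-replace one known unsafe video id."""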
 
    override_config = gh_df.query('strategy_status == 0').to_dict(orient='records')
    override_data = {
        'strategy_key': [],
        'gh_id': [],
        'sort': [],
        'video_id': []
    }
    for row in override_config:
        gh_id = row['gh_id']
        try:
            video_ids = json.loads(row['video_ids'])
            if not isinstance(video_ids, list):
                raise Exception("video_ids is not a list")
            video_ids = video_ids[:SEND_N]
        except Exception as e:
            LOGGER.error(f"json parse error: {e}. content: {row['video_ids']}")
            continue
        for idx, video_id in enumerate(video_ids):
            for key in (BASE_GROUP_NAME, EXPLORE1_GROUP_NAME, EXPLORE2_GROUP_NAME):
                df = df.drop(df.query(
                    f'gh_id == "{gh_id}" and strategy_key == "{key}" and sort == {idx + 1}'
                ).index)
                override_data['strategy_key'].append(key)
                override_data['gh_id'].append(gh_id)
                override_data['sort'].append(idx + 1)
                override_data['video_id'].append(video_id)

    n_records = len(override_data['strategy_key'])
    override_data['dt_version'] = [dt_version] * n_records
    override_data['score'] = [0.0] * n_records
    df_to_append = pd.DataFrame(override_data)
    df = pd.concat([df, df_to_append], ignore_index=True)

    # force-replace a known unsafe video
    idx = df[df['video_id'] == 14403867].index
    df.loc[idx, 'video_id'] = 20463342
    return df
 
def build_and_transfer_base_mode(gh_df, run_dt, run_hour, dt_version, dry_run):
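    """Full (base-mode) run: compute layer1/layer2/base ranks, apply manual
    overrides, validate, join video meta info, then write the result to ODPS
    and MySQL. In dry-run mode, print the result and dump a diff against the
    last ODPS partition instead of writing."""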
 
    layer1_rank = rank_for_layer1(run_dt, run_hour, ODS_PROJECT, EXPLORE_POOL_TABLE, gh_df)
    layer2_rank = rank_for_layer2(run_dt, run_hour, ODS_PROJECT, GH_REPLY_STATS_TABLE, ODPS_RANK_RESULT_TABLE)
    base_rank = rank_for_base(run_dt, run_hour, ODS_PROJECT, GH_REPLY_STATS_TABLE, ODPS_RANK_RESULT_TABLE, BASE_GROUP_NAME)
    final_rank_df = pd.concat([layer1_rank, layer2_rank, base_rank]).reset_index(drop=True)
    final_rank_df = postprocess_override_by_config(final_rank_df, gh_df, dt_version)
    check_result_data(final_rank_df)

    final_df = join_video_info(final_rank_df)

    # reverse sending order
    final_df['sort'] = SEND_N + 1 - final_df['sort']

    if dry_run:
        print("==== ALL DATA ====")
        print(final_df[['strategy_key', 'gh_id', 'sort', 'video_id', 'score', 'title']]
              .sort_values(by=['strategy_key', 'gh_id', 'sort']))

        last_odps_df = get_odps_df_of_max_partition(
            ODS_PROJECT, ODPS_RANK_RESULT_TABLE, {'ctime': dt_version}
        ).to_pandas()
        merged_df = last_odps_df.merge(
            final_df, on=['strategy_key', 'gh_id', 'sort'], how='outer')
        delta_df = merged_df.query('title_x != title_y')
        delta_df = delta_df[['strategy_key', 'gh_id', 'sort',
                             'title_x', 'score_x', 'title_y', 'score_y']]
        delta_df.to_csv('tmp_delta_data.csv')
        return

    # save to ODPS
    odps_instance = get_odps_instance(ODS_PROJECT)
    t = odps_instance.get_table(ODPS_RANK_RESULT_TABLE)
    part_spec_dict = {'dt': run_dt, 'hour': run_hour, 'ctime': dt_version}
    part_spec = ','.join(['{}={}'.format(k, part_spec_dict[k]) for k in part_spec_dict.keys()])
    with t.open_writer(partition=part_spec, create_partition=True, overwrite=True) as writer:
        writer.write(list(final_df.itertuples(index=False)))

    # sync to MySQL
    data_to_insert = [tuple(row) for row in final_df.itertuples(index=False)]
    data_columns = list(final_df.columns)
    max_time_to_delete = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    mysql = MysqlHelper(CONFIG.MYSQL_GROWTH_INFO)
    mysql.batch_insert(RDS_RANK_RESULT_TABLE, data_to_insert, data_columns)

    # soft-delete older rows of the same version
    for key in final_df['strategy_key'].unique():
        sql = f"""
            update {RDS_RANK_RESULT_TABLE}
            set is_delete = 1
            where
                dt_version = '{dt_version}'
                and strategy_key = '{key}'
                and create_time < '{max_time_to_delete}'
                and is_delete = 0
        """
        rows = mysql.execute(sql)
 
def build_and_transfer_delta_mode(account_df, dt_version, dry_run):
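    """Delta-mode run: only handle accounts missing from the latest strategy
    result, assign their configured or category-default videos, append the new
    rows to MySQL, and rewrite the latest ODPS partition to include them."""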
 
    # fetch the latest strategy; the strategy table dt_version (ctime partition) uses the current time
    last_strategy, last_dt_version = get_last_strategy_result(
        ODS_PROJECT, ODPS_RANK_RESULT_TABLE, dt_version, BASE_GROUP_NAME)
    account_map = {x['gh_id']: x for x in account_df.to_dict(orient='records')}
    all_accounts = account_df['gh_id'].unique()
    accounts_in_strategy = last_strategy['gh_id'].unique()
    delta_accounts = [x for x in set(all_accounts) - set(accounts_in_strategy)]
    if len(delta_accounts) > 0:
        LOGGER.info('Found {} new accounts: {}'.format(
            len(delta_accounts), ','.join(delta_accounts)))
    else:
        LOGGER.info('Found 0 new accounts, do nothing.')
        return

    default_video = {
        '泛生活': [20463342],
        '泛历史': [13586800],
    }

    # newly added accounts have no history, so the strategy_status field can be ignored
    # TODO: set default by history stats
    groups = (BASE_GROUP_NAME, EXPLORE1_GROUP_NAME, EXPLORE2_GROUP_NAME)
    rows = []
    for gh_id in delta_accounts:
        account_info = account_map[gh_id]
        configured_videos = account_info['video_ids']
        if configured_videos:
            LOGGER.info(f'{gh_id} has configured video IDs: {configured_videos}')
            video_ids = [int(x) for x in configured_videos.split(',')]
        else:
            video_ids = default_video[account_info['category1']]
        for group_key in groups:
            for idx in range(SEND_N):
                row = {
                    'strategy_key': group_key,
                    'gh_id': gh_id,
                    'sort': idx + 1,
                    'video_id': video_ids[idx],
                    'dt_version': last_dt_version,
                    'score': 0.0
                }
                rows.append(row)

    df = pd.DataFrame(rows)
    final_df = join_video_info(df)

    if dry_run:
        print(final_df)
        return

    # write the incremental records to MySQL
    data_to_insert = [tuple(row) for row in final_df.itertuples(index=False)]
    data_columns = list(final_df.columns)
    mysql = MysqlHelper(CONFIG.MYSQL_GROWTH_INFO)
    mysql.batch_insert(RDS_RANK_RESULT_TABLE, data_to_insert, data_columns)

    # write the full records back to ODPS
    last_odps_df = get_odps_df_of_max_partition(
        ODS_PROJECT, ODPS_RANK_RESULT_TABLE, {'ctime': dt_version}
    ).to_pandas()
    updated_odps_df = pd.concat([final_df, last_odps_df], ignore_index=True)
    odps_instance = get_odps_instance(ODS_PROJECT)
    t = odps_instance.get_table(ODPS_RANK_RESULT_TABLE)
    target_dt = last_dt_version[0:8]
    target_hour = last_dt_version[8:10]
    part_spec_dict = {'dt': target_dt, 'hour': target_hour, 'ctime': last_dt_version}
    part_spec = ','.join(['{}={}'.format(k, part_spec_dict[k]) for k in part_spec_dict.keys()])
    with t.open_writer(partition=part_spec, create_partition=True, overwrite=True) as writer:
        writer.write(list(updated_odps_df.itertuples(index=False)))
 
def join_video_info(df):
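    """Join video title and cover URL (with the CDN resize/watermark operator)
    from the wx_video table and keep only the output columns."""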
 
    db = MysqlHelper(CONFIG.MYSQL_INFO)
    video_ids = df['video_id'].unique().tolist()
    video_ids_str = ','.join([str(x) for x in video_ids])
    sql = f"""
        SELECT id as video_id, title, cover_img_path FROM wx_video
        WHERE id in ({video_ids_str})
    """
    rows = db.get_data(sql, DictCursor)
    video_df = pd.DataFrame(rows)
    video_df['cover_url'] = video_df['cover_img_path'] + CDN_IMG_OPERATOR
    final_df = df.merge(video_df, on='video_id')

    # alternative implementation via an ODPS join, kept for reference:
    # odps_instance = get_odps_instance(ODS_PROJECT)
    # odps_df = odps.DataFrame(df)
    # video_df = get_dataframe_from_odps('videoods', 'wx_video')
    # video_df['cover_url'] = video_df['cover_img_path'] + CDN_IMG_OPERATOR
    # video_df = video_df['id', 'title', 'cover_url']
    # final_df = odps_df.join(video_df, on=('video_id', 'id'))
    # final_df = final_df.to_pandas()

    final_df = final_df[['strategy_key', 'dt_version', 'gh_id', 'sort', 'video_id', 'title', 'cover_url', 'score']]
    return final_df
 
def build_and_transfer_data(run_dt, run_hour, **kwargs):
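    """Entry point for one run: refresh the account list, then dispatch to
    delta mode (new accounts only) or base mode (full rebuild) by `mode`."""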
 
    dt_version = f'{run_dt}{run_hour}'
    dry_run = kwargs.get('dry_run', False)
    mode = kwargs.get('mode')
    gh_df = get_and_update_gh_ids(run_dt)

    if mode == 'delta':
        return build_and_transfer_delta_mode(gh_df, dt_version, dry_run)
    else:
        return build_and_transfer_base_mode(gh_df, run_dt, run_hour, dt_version, dry_run)
 
def main():
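    """Parse CLI arguments, wait until the upstream stats partition of the
    previous day is ready, then build and transfer the rank data; on failure,
    alert via Feishu unless in the dev environment or dry-run mode."""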
 
    LOGGER.info("%s started" % os.path.basename(__file__))
    LOGGER.info(f"environment: {CONFIG.ENV_TEXT}")
    argparser = ArgumentParser()
    argparser.add_argument('-n', '--dry-run', action='store_true')
    argparser.add_argument('--run-at', help='assume to run at date and hour, yyyyMMddHH')
    argparser.add_argument('--mode', default='base', choices=['base', 'delta'], help='run mode')
    args = argparser.parse_args()

    run_date = datetime.today()
    if args.run_at:
        run_date = datetime.strptime(args.run_at, "%Y%m%d%H")
        LOGGER.info(f"Assume to run at {run_date.strftime('%Y-%m-%d %H:00')}")

    try:
        now_date = datetime.today()
        LOGGER.info(f"Start executing at: {datetime.strftime(now_date, '%Y-%m-%d %H:%M')}")

        last_date = run_date - timedelta(1)
        last_dt = last_date.strftime("%Y%m%d")
        # check whether the daily upstream data is ready;
        # the upstream stats table is updated daily, but its fields are designed to be hour-level compatible
        while True:
            h_data_count = check_data_partition(ODS_PROJECT, GH_REPLY_STATS_TABLE, last_dt, '00')
            if h_data_count > 0:
                LOGGER.info('Upstream table row count = {}, start computing'.format(h_data_count))
                run_dt = run_date.strftime("%Y%m%d")
                run_hour = run_date.strftime("%H")
                LOGGER.info(f'run_dt: {run_dt}, run_hour: {run_hour}')
                build_and_transfer_data(run_dt, run_hour, dry_run=args.dry_run, mode=args.mode)
                LOGGER.info('Data update finished')
                return
            else:
                LOGGER.info("Upstream data not ready, waiting 60s")
                time.sleep(60)
    except Exception as e:
        LOGGER.error(f"Data update failed, exception: {e}, traceback: {traceback.format_exc()}")
        if CONFIG.ENV_TEXT == '开发环境' or args.dry_run:
            return
        send_msg_to_feishu(
            webhook=CONFIG.FEISHU_ROBOT['growth_task_robot'].get('webhook'),
            key_word=CONFIG.FEISHU_ROBOT['growth_task_robot'].get('key_word'),
            msg_text=f"rov-offline{CONFIG.ENV_TEXT} - data update failed\n"
                     f"exception: {e}\n"
                     f"traceback: {traceback.format_exc()}"
        )


if __name__ == '__main__':
    main()
 
 