123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578 |
- # -*- coding: utf-8 -*-
- import pandas as pd
- import traceback
- import odps
- from odps import ODPS
- import json
- import time
- from pymysql.cursors import DictCursor
- from datetime import datetime, timedelta
- from db_helper import MysqlHelper
- from my_utils import check_table_partition_exits_v2, get_dataframe_from_odps, \
- get_odps_df_of_max_partition, get_odps_instance, get_odps_df_of_recent_partitions
- from my_utils import request_post, send_msg_to_feishu
- from my_config import set_config
- import numpy as np
- from log import Log
- import os
- from argparse import ArgumentParser
- from constants import AutoReplyAccountType
- from alg_growth_common import check_unsafe_video, filter_unsafe_video, filter_audit_failed_video
# Runtime configuration and the shared logger used by every function below.
CONFIG, _ = set_config()
LOGGER = Log()

# Print DataFrames without row truncation (the dry-run path prints results).
pd.set_option('display.max_rows', None)

# Strategy group names written to the `strategy_key` column.
BASE_GROUP_NAME = '3rd-party-base'
EXPLORE1_GROUP_NAME = '3rd-party-explore1'
EXPLORE2_GROUP_NAME = '3rd-party-explore2'

# Account caches; both are replaced by get_and_update_gh_ids.
GH_IDS = ('default',)
account_map = {}

# Query string appended to a video's cover image path to build `cover_url`.
CDN_IMG_OPERATOR = "?x-oss-process=image/resize,m_fill,w_600,h_480,limit_0/format,jpg/watermark,image_eXNoL3BpYy93YXRlcm1hcmtlci9pY29uX3BsYXlfd2hpdGUucG5nP3gtb3NzLXByb2Nlc3M9aW1hZ2UvcmVzaXplLHdfMTQ0,g_center"

# ODPS project and table names.
ODS_PROJECT = "loghubods"
EXPLORE_POOL_TABLE = 'alg_growth_video_return_stats_history'
GH_REPLY_STATS_TABLE = 'alg_growth_3rd_gh_reply_video_stats'
ODPS_RANK_RESULT_TABLE = 'alg_3rd_gh_autoreply_video_rank_data'

# MySQL table names.
GH_DETAIL = 'gh_detail'
RDS_RANK_RESULT_TABLE = 'alg_gh_autoreply_video_rank_data'

# Number of recent partitions aggregated by process_reply_stats.
STATS_PERIOD_DAYS = 5
# Number of videos sampled for the 'default' account in layer 2.
DEFAULT_SEND_N = 1
# Upper bound on videos ranked per category / per account.
MAX_SEND_N = 3

# Fallback video IDs per `category1`, used when an account lacks candidates.
default_video = {
    '泛生活': [20463342, 14095344, 13737337],
    '泛历史': [13586800, 12794884, 12117356],
}
def get_and_update_gh_ids(run_dt):
    """Load the auto-reply accounts from MySQL and refresh the module caches.

    Reads the non-deleted accounts of type ``EXTERNAL_GZH`` from ``GH_DETAIL``,
    makes sure a synthetic ``'default'`` account is present, and then replaces
    the module-level ``GH_IDS`` tuple and ``account_map`` dict
    (``gh_id`` -> account record) with the fresh data.

    Args:
        run_dt: Run date; currently unused, kept for interface compatibility.

    Returns:
        A DataFrame with one row per ``gh_id`` (including ``'default'``).
    """
    global GH_IDS, account_map

    db = MysqlHelper(CONFIG.MYSQL_GROWTH_INFO)
    gh_type = AutoReplyAccountType.EXTERNAL_GZH.value
    sqlstr = f"""
        SELECT gh_id, gh_name, category1, category2, channel,
            video_ids, strategy_status,
            autoreply_send_minigram_num as send_n
        FROM {GH_DETAIL}
        WHERE is_delete = 0 AND `type` = {gh_type}
    """
    account_data = db.get_data(sqlstr, DictCursor)
    account_df = pd.DataFrame(account_data)

    # The 'default' account is handled separately. An empty query result has no
    # 'gh_id' column at all, so check for emptiness before touching the column.
    if account_df.empty or 'default' not in account_df['gh_id'].values:
        new_row = pd.DataFrame({'gh_id': ['default'], 'gh_name': ['默认'],
                                'type': [2], 'category1': ['泛生活'], 'send_n': 1},
                               index=[0])
        account_df = pd.concat([account_df, new_row], ignore_index=True)
    account_df = account_df.drop_duplicates(subset=['gh_id'])

    GH_IDS = tuple(account_df['gh_id'])
    account_map = {x['gh_id']: x for x in account_df.to_dict(orient='records')}
    for account_info in account_map.values():
        send_n = account_info.get('send_n')
        # A NULL in MySQL surfaces here as None/NaN; int() would raise on both.
        if pd.isna(send_n):
            send_n = DEFAULT_SEND_N
        account_info['send_n'] = int(send_n)
    return account_df
def check_data_partition(project, table, data_dt, data_hr=None):
    """Return the row count of an ODPS partition, or 0 if it cannot be checked.

    Args:
        project: ODPS project name.
        table: Table name.
        data_dt: Value of the ``dt`` partition column.
        data_hr: Optional value of the ``hour`` partition column.

    Returns:
        The count reported by ``check_table_partition_exits_v2``; 0 when the
        check raises, so callers can keep polling.
    """
    partition_spec = {'dt': data_dt}
    if data_hr:
        partition_spec['hour'] = data_hr
    try:
        _, data_count = check_table_partition_exits_v2(
            project, table, partition_spec)
    except Exception as e:
        # Deliberately best-effort: treat a failed check as "not ready", but
        # log it so a permanently failing check is visible to operators.
        LOGGER.warning(
            f"check partition failed, table: {table}, "
            f"partition: {partition_spec}, exception: {e}")
        data_count = 0
    return data_count
def get_last_strategy_result(project, rank_table, dt_version, key):
    """Fetch the rows of one strategy from the newest rank-table partition.

    The partition is chosen by ``get_odps_df_of_max_partition`` using
    ``{'ctime': dt_version}`` (presumably the latest partition not newer than
    ``dt_version`` -- confirm against the helper).

    Args:
        project: ODPS project name.
        rank_table: Rank result table name.
        dt_version: Version passed as the ``ctime`` partition bound.
        key: Strategy key to keep.

    Returns:
        ``(rows, version)``: the de-duplicated ``gh_id``/``video_id``/
        ``strategy_key``/``sort`` rows for ``key``, and the ``dt_version``
        value of the first row of the loaded partition.
    """
    wanted_columns = ['gh_id', 'video_id', 'strategy_key', 'sort']
    latest_df = get_odps_df_of_max_partition(
        project, rank_table, {'ctime': dt_version}
    ).to_pandas()
    latest_version = latest_df.iloc[0]['dt_version']
    key_rows = latest_df.query(f'strategy_key == "{key}"')
    key_rows = key_rows[wanted_columns].drop_duplicates()
    return key_rows, latest_version
def process_reply_stats(project, table, period, run_dt):
    """Aggregate recent auto-reply statistics per account and overall.

    Args:
        project: ODPS project name.
        table: Statistics table name.
        period: Number of recent partitions to load.
        run_dt: Run date used as the ``dt`` partition bound.

    Returns:
        A DataFrame with per-(video, account) sums plus one extra set of rows
        with ``gh_id == 'default'`` holding the sums over all accounts, and a
        ``score`` column equal to ``day0_return / (first_visit_uv + 100)``.
    """
    sum_spec = {
        'send_count': 'sum',
        'first_visit_uv': 'sum',
        'day0_return': 'sum'
    }

    # Load several days of statistics so they can be aggregated.
    raw_df = get_odps_df_of_recent_partitions(
        project, table, period, {'dt': run_dt}).to_pandas()
    raw_df['video_id'] = raw_df['video_id'].astype('int64')
    raw_df = raw_df[['gh_id', 'video_id', 'send_count', 'first_visit_uv', 'day0_return']]
    # Unsafe videos are removed once, right where the statistics are loaded.
    raw_df = filter_unsafe_video(raw_df)

    # Aggregate within each account.
    per_account_df = raw_df.groupby(['video_id', 'gh_id']).agg(sum_spec).reset_index()

    # Aggregate across all accounts; this becomes the 'default' account.
    overall_df = per_account_df.groupby('video_id').agg(sum_spec).reset_index()
    overall_df['gh_id'] = 'default'

    combined_df = pd.concat([per_account_df, overall_df]).reset_index(drop=True)
    combined_df['score'] = combined_df['day0_return'] / (combined_df['first_visit_uv'] + 100)
    return combined_df
def rank_for_layer1(run_dt, run_hour, project, table, gh):
    """Build the first exploration layer by random sampling per category.

    Samples ``MAX_SEND_N`` videos per ``category1`` from the explore pool and
    assigns each account the first ``send_n`` of the videos sampled for its
    category.

    Args:
        run_dt: Run date (``yyyyMMdd``).
        run_hour: Run hour (``HH``).
        project: ODPS project name.
        table: Explore pool table name.
        gh: Account DataFrame; joined to the videos on ``category1``.

    Returns:
        A DataFrame with columns ``strategy_key``, ``dt_version``, ``gh_id``,
        ``sort``, ``video_id`` and ``score``.
    """
    # TODO: add audit & retirement handling
    pool_df = get_odps_df_of_max_partition(project, table, {'dt': run_dt}).to_pandas()
    pool_df = filter_unsafe_video(pool_df)

    # Seed from the version so that a re-run yields the same result.
    dt_version = f'{run_dt}{run_hour}'
    np.random.seed(int(dt_version) + 1)

    # TODO: change the weighting strategy
    pool_df['score'] = 1.0

    def _sample_category(group):
        # Weighted sampling without replacement inside one category.
        return group.sample(n=MAX_SEND_N, weights=group['score'], replace=False)

    picked_df = pool_df.groupby('category1').apply(_sample_category).reset_index(drop=True)
    picked_df['sort'] = picked_df.groupby('category1')['score'] \
        .rank(method='first', ascending=False).astype(int)
    # Order by score within each category.
    picked_df = picked_df.sort_values(
        by=['category1', 'score'], ascending=[True, False]).reset_index(drop=True)
    picked_df['strategy_key'] = EXPLORE1_GROUP_NAME
    picked_df['dt_version'] = dt_version

    per_account = []
    for gh_id in GH_IDS:
        account_rows = gh.query(f'gh_id == "{gh_id}"')
        send_n = account_map[gh_id]['send_n']
        candidates = picked_df.query(f'sort <= {send_n}').copy()
        joined = candidates.merge(account_rows, on='category1')
        # Stored sort is reversed: rank 1 gets the value send_n.
        joined['sort'] = send_n + 1 - joined['sort']
        per_account.append(joined)

    combined = pd.concat(per_account)
    return combined[['strategy_key', 'dt_version', 'gh_id', 'sort', 'video_id', 'score']]
def rank_for_layer2(run_dt, run_hour, project, stats_table, rank_table):
    """Build the second exploration layer by score-weighted sampling.

    For ``'default'`` one video is sampled from the overall statistics. Every
    other account samples ``send_n`` videos from its own filtered statistics;
    if there are too few, it falls back to the account's last base strategy,
    and then to the category's ``default_video`` list.

    Args:
        run_dt: Run date (``yyyyMMdd``).
        run_hour: Run hour (``HH``).
        project: ODPS project name.
        stats_table: Reply statistics table name.
        rank_table: Rank result table used for the base fallback.

    Returns:
        A DataFrame with columns ``strategy_key``, ``dt_version``, ``gh_id``,
        ``sort``, ``video_id`` and ``score``.
    """
    stats_df = process_reply_stats(project, stats_table, STATS_PERIOD_DAYS, run_dt)

    # Seed from the version so that a re-run yields the same result.
    dt_version = f'{run_dt}{run_hour}'
    np.random.seed(int(dt_version) + 1)

    # TODO: compute correlation between accounts
    ## pair accounts, intersect videos having RoVn values, and build a vector
    ## of (smoothed) RoVn per account
    ## compute the correlation coefficient or cosine similarity of the vectors
    ## take the weighted sum of each video's RoVn
    # Current basic version: layer-2 scores are computed within one account only.
    picks = []

    # 'default' account (default-explore2).
    default_pool = stats_df.query('gh_id == "default"')
    default_pick = default_pool.sample(n=DEFAULT_SEND_N, weights=default_pool['score'])
    default_pick['sort'] = range(1, len(default_pick) + 1)
    picks.append(default_pick)

    # Basic filtering for the real accounts.
    eligible_df = stats_df.query('send_count > 200 and score > 0')
    eligible_df = filter_audit_failed_video(eligible_df)

    # Last base strategy, used as a fallback when needed.
    base_strategy_df, _ = get_last_strategy_result(
        project, rank_table, dt_version, BASE_GROUP_NAME)
    base_strategy_df = filter_audit_failed_video(base_strategy_df)

    for gh_id in GH_IDS:
        if gh_id == 'default':
            continue
        account_info = account_map[gh_id]
        send_n = account_info['send_n']
        candidates = eligible_df.query(f'gh_id == "{gh_id}"')
        if len(candidates) < send_n:
            LOGGER.warning(
                "gh_id[{}] rows[{}] not enough for layer2, fallback to base"
                .format(gh_id, len(candidates)))
            candidates = base_strategy_df.query(f'gh_id == "{gh_id}"').copy()
            if len(candidates) < send_n:
                LOGGER.warning(
                    "gh_id[{}] rows[{}] still not enough for layer2, add backup"
                    .format(gh_id, len(candidates)))
                backup_sort = len(candidates) + 1
                known_ids = candidates['video_id'].unique()
                backup_rows = [
                    {
                        'gh_id': gh_id,
                        'sort': backup_sort,
                        'video_id': video_id,
                        'strategy_key': ''  # this is not important
                    }
                    for video_id in default_video[account_info['category1']]
                    if video_id not in known_ids
                ]
                candidates = pd.concat([candidates, pd.DataFrame(backup_rows)])
            # Fallback rows carry no score; their sort is used as the weight.
            candidates['score'] = candidates['sort']
        pick = candidates.sample(n=send_n, weights=candidates['score'])
        pick['sort'] = range(send_n, 0, -1)
        picks.append(pick)

    combined = pd.concat(picks)
    combined['strategy_key'] = EXPLORE2_GROUP_NAME
    combined['dt_version'] = dt_version
    return combined[['strategy_key', 'dt_version', 'gh_id', 'sort', 'video_id', 'score']]
def rank_for_base(run_dt, run_hour, project, stats_table, rank_table, stg_key):
    """Rank the base (exploit) videos for every account, including 'default'.

    Each account keeps its top ``send_n`` videos by score. Videos of the
    current base strategy always take part in the ranking; other videos must
    pass ``send_count > 500 and score > 0.05``. Accounts with too few videos
    are topped up from ``default_video`` for their ``category1``.

    Args:
        run_dt: Run date (``yyyyMMdd``).
        run_hour: Run hour (``HH``).
        project: ODPS project name.
        stats_table: Reply statistics table name.
        rank_table: Rank result table holding the current base strategy.
        stg_key: Strategy key to read and to write.

    Returns:
        A DataFrame with columns ``strategy_key``, ``dt_version``, ``gh_id``,
        ``sort``, ``video_id`` and ``score``. ``sort`` is reversed, i.e. the
        best video of an account gets the value ``send_n``.
    """
    stats_df = process_reply_stats(project, stats_table, STATS_PERIOD_DAYS, run_dt)

    # TODO: support to set base manually
    dt_version = f'{run_dt}{run_hour}'

    # Current base strategy; the strategy table's dt_version (ctime partition)
    # is looked up with the current time.
    base_strategy_df, _ = get_last_strategy_result(
        project, rank_table, dt_version, stg_key)

    default_stats_df = stats_df.query('gh_id == "default"')

    # Rank inside each account to pick its base content. The current base
    # strategy must take part in the ranking, so join first and filter after.
    non_default_ids = list(filter(lambda x: x != 'default', GH_IDS))
    gh_ids_str = ','.join(f'"{x}"' for x in non_default_ids)
    stats_df = stats_df.query(f'gh_id in ({gh_ids_str})')
    stats_with_strategy_df = stats_df \
        .merge(
            base_strategy_df,
            on=['gh_id', 'video_id'],
            how='outer') \
        .query('strategy_key.notna() or (send_count > 500 and score > 0.05)') \
        .fillna({'score': 0.0})

    # Combine the 'default' rows with the per-account rows.
    grouped_stats_df = pd.concat([default_stats_df, stats_with_strategy_df]).reset_index()
    grouped_stats_df = filter_audit_failed_video(grouped_stats_df)

    def set_top_n(group, n=2):
        group_sorted = group.sort_values(by='score', ascending=False)
        # Copy so that adding a column does not write into a slice of `group`.
        top_n = group_sorted.head(n).copy()
        top_n['sort'] = range(1, len(top_n) + 1)
        return top_n

    ranked_df = grouped_stats_df.groupby('gh_id').apply(set_top_n, MAX_SEND_N)

    sampled_dfs = []
    for gh_id in GH_IDS:
        account_info = account_map[gh_id]
        send_n = account_info['send_n']
        sub_df = ranked_df.query(f'gh_id == "{gh_id}" and sort <= {send_n}').copy()
        if len(sub_df) < send_n:
            LOGGER.warning(
                "gh_id[{}] rows[{}] still not enough for base, add backup"
                .format(gh_id, len(sub_df)))
            rows = []
            idx = len(sub_df)
            exist_video_ids = sub_df['video_id'].unique()
            for video_id in default_video[account_info['category1']]:
                if video_id in exist_video_ids:
                    continue
                row = {
                    'gh_id': gh_id,
                    # Each backup row takes the next free position; a constant
                    # here would give several rows the same sort.
                    'sort': idx + len(rows) + 1,
                    'video_id': video_id,
                    'score': 0.0,
                    'strategy_key': ''  # this is not important
                }
                rows.append(row)
                if len(sub_df) + len(rows) >= send_n:
                    break
            appx_df = pd.DataFrame(rows)
            sub_df = pd.concat([sub_df, appx_df])
        # Reverse the positions: rank 1 is stored as send_n.
        sub_df['sort'] = send_n + 1 - sub_df['sort']
        sampled_dfs.append(sub_df)

    ranked_df = pd.concat(sampled_dfs)
    ranked_df = ranked_df.reset_index(drop=True)
    ranked_df['strategy_key'] = stg_key
    ranked_df['dt_version'] = dt_version
    ranked_df = ranked_df[['strategy_key', 'dt_version', 'gh_id', 'sort', 'video_id', 'score']]
    return ranked_df
def check_result_data(df):
    """Validate the final ranking before it is published.

    Runs ``check_unsafe_video`` on the data and verifies that every account
    has exactly ``send_n`` rows in each of the three strategy groups.

    Args:
        df: Combined ranking DataFrame with ``gh_id`` and ``strategy_key``.

    Raises:
        Exception: If any (account, strategy) pair has an unexpected row count.
    """
    check_unsafe_video(df)
    strategy_keys = (EXPLORE1_GROUP_NAME, EXPLORE2_GROUP_NAME, BASE_GROUP_NAME)
    for gh_id in GH_IDS:
        expected = account_map[gh_id]['send_n']
        for key in strategy_keys:
            matched = df.query(f'gh_id == "{gh_id}" and strategy_key == "{key}"')
            n_records = len(matched)
            if n_records == expected:
                continue
            raise Exception(f"Unexpected record count: {gh_id},{key},{n_records}")
def postprocess_override_by_config(df, gh_df, dt_version):
    """Replace ranked videos with manually configured ones where requested.

    Accounts whose ``strategy_status`` is 0 have their configured ``video_ids``
    (a JSON list, cut to ``send_n`` entries) written into positions 1..k of all
    three strategy groups. Accounts with unparsable ``video_ids`` are logged
    and skipped.

    Args:
        df: Combined ranking DataFrame.
        gh_df: Account DataFrame carrying ``strategy_status`` and ``video_ids``.
        dt_version: Version written into the overriding rows.

    Returns:
        The ranking DataFrame with overrides applied and a fresh index.
    """
    strategy_keys = (BASE_GROUP_NAME, EXPLORE1_GROUP_NAME, EXPLORE2_GROUP_NAME)
    manual_accounts = gh_df.query('strategy_status == 0').to_dict(orient='records')
    override_data = {
        'strategy_key': [],
        'gh_id': [],
        'sort': [],
        'video_id': []
    }

    for account_row in manual_accounts:
        gh_id = account_row['gh_id']
        account_info = account_map[gh_id]
        try:
            video_ids = json.loads(account_row['video_ids'])
            if not isinstance(video_ids, list):
                raise Exception("video_ids is not list")
            video_ids = video_ids[:account_info['send_n']]
        except Exception as e:
            LOGGER.error(f"json parse error: {e}. content: {account_row['video_ids']}")
            continue

        for position, video_id in enumerate(video_ids, start=1):
            for key in strategy_keys:
                # Remove the ranked row at this position, then queue its replacement.
                replaced = df.query(
                    f'gh_id == "{gh_id}" and strategy_key == "{key}" and sort == {position}'
                )
                df = df.drop(replaced.index)
                override_data['strategy_key'].append(key)
                override_data['gh_id'].append(gh_id)
                override_data['sort'].append(position)
                override_data['video_id'].append(video_id)

    n_records = len(override_data['strategy_key'])
    override_data['dt_version'] = [dt_version] * n_records
    override_data['score'] = [0.0] * n_records
    df = pd.concat([df, pd.DataFrame(override_data)], ignore_index=True)

    # Force-replace a known unsafe video.
    df.loc[df['video_id'] == 14403867, 'video_id'] = 20463342
    return df
def build_and_transfer_base_mode(gh_df, run_dt, run_hour, dt_version, dry_run):
    """Compute the full ranking for all accounts and publish it.

    Builds the three strategy groups, applies manual overrides, validates the
    result and joins video metadata. In dry-run mode the result is printed and
    the difference to the latest ODPS partition is written to
    ``tmp_delta_data.csv``; otherwise the result is written to ODPS and MySQL,
    and older MySQL rows of the same version are flagged as deleted.

    Args:
        gh_df: Account DataFrame.
        run_dt: Run date (``yyyyMMdd``).
        run_hour: Run hour (``HH``).
        dt_version: Version string of this run.
        dry_run: When true, nothing is written to ODPS or MySQL.
    """
    ranks = [
        rank_for_layer1(run_dt, run_hour, ODS_PROJECT, EXPLORE_POOL_TABLE, gh_df),
        rank_for_layer2(run_dt, run_hour, ODS_PROJECT, GH_REPLY_STATS_TABLE,
                        ODPS_RANK_RESULT_TABLE),
        rank_for_base(run_dt, run_hour, ODS_PROJECT, GH_REPLY_STATS_TABLE,
                      ODPS_RANK_RESULT_TABLE, BASE_GROUP_NAME),
    ]
    final_rank_df = pd.concat(ranks).reset_index(drop=True)
    final_rank_df = postprocess_override_by_config(final_rank_df, gh_df, dt_version)
    check_result_data(final_rank_df)
    final_df = join_video_info(final_rank_df)

    if dry_run:
        print("==== ALL DATA ====")
        preview = final_df[['strategy_key', 'gh_id', 'sort', 'video_id', 'score', 'title']]
        print(preview.sort_values(by=['strategy_key', 'gh_id', 'sort']))
        last_odps_df = get_odps_df_of_max_partition(
            ODS_PROJECT, ODPS_RANK_RESULT_TABLE, {'ctime': dt_version}
        ).to_pandas()
        merged_df = last_odps_df.merge(
            final_df, on=['strategy_key', 'gh_id', 'sort'], how='outer')
        delta_df = merged_df.query('title_x != title_y')
        delta_df = delta_df[['strategy_key', 'gh_id', 'sort',
                             'title_x', 'score_x', 'title_y', 'score_y']]
        delta_df.to_csv('tmp_delta_data.csv')
        return

    # Save to ODPS.
    odps_table = get_odps_instance(ODS_PROJECT).get_table(ODPS_RANK_RESULT_TABLE)
    part_spec_dict = {'dt': run_dt, 'hour': run_hour, 'ctime': dt_version}
    part_spec = ','.join('{}={}'.format(k, v) for k, v in part_spec_dict.items())
    with odps_table.open_writer(partition=part_spec, create_partition=True,
                                overwrite=True) as writer:
        writer.write(list(final_df.itertuples(index=False)))

    # Sync to MySQL. The cut-off time is taken before inserting so that only
    # rows older than this run are flagged below.
    data_to_insert = list(map(tuple, final_df.itertuples(index=False)))
    data_columns = list(final_df.columns)
    max_time_to_delete = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    mysql = MysqlHelper(CONFIG.MYSQL_GROWTH_INFO)
    mysql.batch_insert(RDS_RANK_RESULT_TABLE, data_to_insert, data_columns)

    # Remove old data of the same version.
    for key in final_df['strategy_key'].unique():
        sql = f"""
            update {RDS_RANK_RESULT_TABLE}
            set is_delete = 1
            where
                dt_version = '{dt_version}'
                and strategy_key = '{key}'
                and create_time < '{max_time_to_delete}'
                and is_delete = 0
        """
        mysql.execute(sql)
def _parse_configured_video_ids(raw):
    """Parse an account's configured video IDs into a list of ints.

    Accepts a list, a JSON list string (the format used by
    ``postprocess_override_by_config``) or a comma-separated string. Returns an
    empty list for missing values (None, NaN, blank string).

    Raises:
        TypeError, ValueError: If an element cannot be converted to int.
    """
    if isinstance(raw, (list, tuple)):
        return [int(x) for x in raw]
    if not isinstance(raw, str) or not raw.strip():
        return []
    try:
        parsed = json.loads(raw)
    except ValueError:
        parsed = None
    if not isinstance(parsed, list):
        # Not a JSON list: keep supporting the comma-separated format.
        parsed = raw.split(',')
    return [int(x) for x in parsed]


def build_and_transfer_delta_mode(account_df, dt_version, dry_run):
    """Add default rankings for accounts missing from the latest strategy.

    Accounts present in ``account_df`` but absent from the latest base strategy
    get ``send_n`` rows in each strategy group, taken from their configured
    ``video_ids`` and topped up from ``default_video`` for their ``category1``.
    The new rows are inserted into MySQL, and the latest ODPS partition is
    rewritten with the new rows added.

    Args:
        account_df: Account DataFrame.
        dt_version: Version string of this run, used to locate the latest
            strategy partition.
        dry_run: When true, the new rows are printed and nothing is written.

    Raises:
        ValueError: If an account has fewer usable videos than ``send_n``.
    """
    # Latest strategy; the strategy table's dt_version (ctime partition) is
    # looked up with the current time.
    last_strategy, last_dt_version = get_last_strategy_result(
        ODS_PROJECT, ODPS_RANK_RESULT_TABLE, dt_version, BASE_GROUP_NAME)
    all_accounts = account_df['gh_id'].unique()
    accounts_in_strategy = last_strategy['gh_id'].unique()
    # Sorted so that the generated rows have a stable order across runs.
    delta_accounts = sorted(set(all_accounts) - set(accounts_in_strategy))
    if not delta_accounts:
        LOGGER.info('Found 0 new account, do nothing.')
        return
    LOGGER.info('Found {} new accounts: {}'.format(
        len(delta_accounts), ','.join(delta_accounts)))

    # New accounts have no history, so strategy_status can be ignored.
    # TODO: set default by history stats
    groups = (BASE_GROUP_NAME, EXPLORE1_GROUP_NAME, EXPLORE2_GROUP_NAME)
    rows = []
    for gh_id in delta_accounts:
        account_info = account_map[gh_id]
        send_n = account_info['send_n']
        fallback_ids = default_video[account_info['category1']]
        configured_videos = account_info.get('video_ids')
        try:
            video_ids = _parse_configured_video_ids(configured_videos)
        except (TypeError, ValueError) as e:
            LOGGER.warning(
                f'{gh_id} has invalid configured video_ids [{configured_videos}]: {e}, '
                f'use default instead')
            video_ids = []
        if video_ids:
            LOGGER.info(f'{gh_id} has configured video IDs: {configured_videos}')
        if len(video_ids) < send_n:
            # Top up with the category defaults instead of failing on a short list.
            video_ids = video_ids + [x for x in fallback_ids if x not in video_ids]
        if len(video_ids) < send_n:
            raise ValueError(
                f'{gh_id} needs {send_n} videos but only {len(video_ids)} are available')
        for group_key in groups:
            for idx in range(send_n):
                rows.append({
                    'strategy_key': group_key,
                    'gh_id': gh_id,
                    'sort': idx + 1,
                    'video_id': video_ids[idx],
                    'dt_version': last_dt_version,
                    'score': 0.0
                })
    df = pd.DataFrame(rows)
    final_df = join_video_info(df)
    if dry_run:
        print(final_df)
        return

    # Insert the new rows into MySQL.
    data_to_insert = [tuple(row) for row in final_df.itertuples(index=False)]
    data_columns = list(final_df.columns)
    mysql = MysqlHelper(CONFIG.MYSQL_GROWTH_INFO)
    mysql.batch_insert(RDS_RANK_RESULT_TABLE, data_to_insert, data_columns)

    # Write the full data set (new + existing rows) back to ODPS.
    last_odps_df = get_odps_df_of_max_partition(
        ODS_PROJECT, ODPS_RANK_RESULT_TABLE, {'ctime': dt_version}
    ).to_pandas()
    updated_odps_df = pd.concat([final_df, last_odps_df], ignore_index=True)

    odps_instance = get_odps_instance(ODS_PROJECT)
    t = odps_instance.get_table(ODPS_RANK_RESULT_TABLE)
    # dt_version is yyyyMMddHH: split it into the dt and hour partition values.
    target_dt = last_dt_version[0:8]
    target_hour = last_dt_version[8:10]
    part_spec_dict = {'dt': target_dt, 'hour': target_hour, 'ctime': last_dt_version}
    part_spec = ','.join('{}={}'.format(k, v) for k, v in part_spec_dict.items())
    with t.open_writer(partition=part_spec, create_partition=True, overwrite=True) as writer:
        writer.write(list(updated_odps_df.itertuples(index=False)))
def join_video_info(df):
    """Attach title and cover URL to each ranked video.

    Looks up the videos in the ``wx_video`` MySQL table and inner-joins the
    result on ``video_id``, so rows whose video is not found are dropped.

    Args:
        df: Ranking DataFrame with at least ``strategy_key``, ``dt_version``,
            ``gh_id``, ``sort``, ``video_id`` and ``score``.

    Returns:
        A DataFrame with columns ``strategy_key``, ``dt_version``, ``gh_id``,
        ``sort``, ``video_id``, ``title``, ``cover_url`` and ``score``.
    """
    output_columns = ['strategy_key', 'dt_version', 'gh_id', 'sort',
                      'video_id', 'title', 'cover_url', 'score']
    db = MysqlHelper(CONFIG.MYSQL_INFO)
    unique_ids = df['video_id'].unique().tolist()
    video_ids_str = ','.join(map(str, unique_ids))
    sql = f"""
        SELECT id as video_id, title, cover_img_path FROM wx_video
        WHERE id in ({video_ids_str})
    """
    video_df = pd.DataFrame(db.get_data(sql, DictCursor))
    # The cover URL is the stored image path plus the CDN processing suffix.
    video_df['cover_url'] = video_df['cover_img_path'] + CDN_IMG_OPERATOR
    enriched_df = df.merge(video_df, on='video_id')
    return enriched_df[output_columns]
def build_and_transfer_data(run_dt, run_hour, **kwargs):
    """Refresh the account caches and run the requested build mode.

    Args:
        run_dt: Run date (``yyyyMMdd``).
        run_hour: Run hour (``HH``).
        **kwargs: ``dry_run`` (bool, default False) and ``mode``; the value
            ``'delta'`` selects delta mode, anything else selects base mode.

    Returns:
        Whatever the selected mode function returns.
    """
    mode = kwargs.get('mode')
    dry_run = kwargs.get('dry_run', False)
    dt_version = f'{run_dt}{run_hour}'
    gh_df = get_and_update_gh_ids(run_dt)
    if mode != 'delta':
        return build_and_transfer_base_mode(gh_df, run_dt, run_hour, dt_version, dry_run)
    return build_and_transfer_delta_mode(gh_df, dt_version, dry_run)
def main():
    """Command-line entry point.

    Parses the arguments, waits until yesterday's partition (hour ``'00'``) of
    the reply statistics table has data, then builds and publishes the ranking.
    Any failure is logged; outside the development environment and dry-run
    mode it is also reported to Feishu.
    """
    LOGGER.info("%s 开始执行" % os.path.basename(__file__))
    LOGGER.info(f"environment: {CONFIG.ENV_TEXT}")

    argparser = ArgumentParser()
    argparser.add_argument('-n', '--dry-run', action='store_true')
    argparser.add_argument('--run-at', help='assume to run at date and hour, yyyyMMddHH')
    argparser.add_argument('--mode', default='base', choices=['base', 'delta'], help='run mode')
    args = argparser.parse_args()

    run_date = datetime.today()
    if args.run_at:
        run_date = datetime.strptime(args.run_at, "%Y%m%d%H")
        LOGGER.info(f"Assume to run at {run_date.strftime('%Y-%m-%d %H:00')}")

    try:
        now_date = datetime.today()
        LOGGER.info(f"开始执行: {datetime.strftime(now_date, '%Y-%m-%d %H:%M')}")
        last_dt = (run_date - timedelta(1)).strftime("%Y%m%d")

        # Wait for the daily upstream data. The upstream table is updated
        # daily, but its schema is compatible with hourly partitions.
        while True:
            h_data_count = check_data_partition(
                ODS_PROJECT, GH_REPLY_STATS_TABLE, last_dt, '00')
            if h_data_count > 0:
                break
            LOGGER.info("上游数据未就绪,等待60s")
            time.sleep(60)

        LOGGER.info('上游数据表查询数据条数={},开始计算'.format(h_data_count))
        run_dt = run_date.strftime("%Y%m%d")
        run_hour = run_date.strftime("%H")
        LOGGER.info(f'run_dt: {run_dt}, run_hour: {run_hour}')
        build_and_transfer_data(run_dt, run_hour, dry_run=args.dry_run, mode=args.mode)
        LOGGER.info('数据更新完成')
    except Exception as e:
        LOGGER.error(f"数据更新失败, exception: {e}, traceback: {traceback.format_exc()}")
        # No alert for the development environment or for dry runs.
        if CONFIG.ENV_TEXT == '开发环境' or args.dry_run:
            return
        robot = CONFIG.FEISHU_ROBOT['growth_task_robot']
        send_msg_to_feishu(
            webhook=robot.get('webhook'),
            key_word=robot.get('key_word'),
            msg_text=f"rov-offline{CONFIG.ENV_TEXT} - 数据更新失败\n"
                     f"exception: {e}\n"
                     f"traceback: {traceback.format_exc()}"
        )


if __name__ == '__main__':
    main()
|