- import pandas as pd
- import traceback
- import odps
- from odps import ODPS
- from threading import Timer
- from datetime import datetime, timedelta
- from db_helper import MysqlHelper
- from my_utils import check_table_partition_exits_v2, get_dataframe_from_odps, \
- get_odps_df_of_max_partition, get_odps_instance, get_odps_df_of_recent_partitions
- from my_utils import request_post, send_msg_to_feishu
- from my_config import set_config
- import numpy as np
- from log import Log
- import os
- from argparse import ArgumentParser
- CONFIG, _ = set_config()
- LOGGER = Log()
- BASE_GROUP_NAME = '3rd-party-base'
- EXPLORE1_GROUP_NAME = '3rd-party-explore1'
- EXPLORE2_GROUP_NAME = '3rd-party-explore2'
- GH_IDS = (
- 'gh_01c93b07605f',
- 'gh_041d1c819c30',
- 'gh_04804a94e325',
- 'gh_059a27ea86b2',
- 'gh_06699758fa4b',
- 'gh_0cc7c6d712eb',
- 'gh_0d3c97cc30cc',
- 'gh_11debb2c8392',
- 'gh_126c99b39cea',
- 'gh_133c36b99b14',
- 'gh_143743361496',
- 'gh_148aa4a776ce',
- 'gh_184f2d765f55',
- 'gh_197a0d8caa31',
- 'gh_1b243f162cbd',
- 'gh_1b8bfb5c4ffd',
- 'gh_1c11651e0df4',
- 'gh_1cbb453a800e',
- 'gh_1da8f62f4a0d',
- 'gh_1ff0c29f8408',
- 'gh_210f7ce6f418',
- 'gh_22cad14dc4ec',
- 'gh_22f53f6a2b5d',
- 'gh_23e084923def',
- 'gh_23fa67ebb016',
- 'gh_2450ad774945',
- 'gh_250c51d5ce69',
- 'gh_261bbd99a906',
- 'gh_26f1353fda5a',
- 'gh_2863155cedcb',
- 'gh_2fbff340c683',
- 'gh_30b472377707',
- 'gh_330ef0db846d',
- 'gh_33731afddbdb',
- 'gh_33c26df4eab0',
- 'gh_33e9e4cbed84',
- 'gh_34820675d0fc',
- 'gh_354ab82cf9b3',
- 'gh_36e74017026e',
- 'gh_37adcd637351',
- 'gh_3afc3a8b8a3d',
- 'gh_3dce33de6994',
- 'gh_3ee85ba7c3ae',
- 'gh_40ff43c50773',
- 'gh_4175a8f745f6',
- 'gh_449fb0c2d817',
- 'gh_45be25d1c06b',
- 'gh_491189a534f2',
- 'gh_495d71abbda2',
- 'gh_4c38b9d4474a',
- 'gh_4d2f75c3c3fe',
- 'gh_5044de6e1597',
- 'gh_53759f90b0c5',
- 'gh_543e6d7e15f3',
- 'gh_5522900b6a67',
- 'gh_55ac4e447179',
- 'gh_56b65b7d4520',
- 'gh_570967881eae',
- 'gh_57bc9846c86a',
- 'gh_57d2388bd01d',
- 'gh_58cdb2f1f0d0',
- 'gh_5e0cd3f7b457',
- 'gh_5f0bb5822e10',
- 'gh_5fffe35cc12a',
- 'gh_63745bad4f21',
- 'gh_6454c103be14',
- 'gh_69808935bba0',
- 'gh_6c7f73de400b',
- 'gh_7a14b4c15090',
- 'gh_7c89d5a3e745',
- 'gh_7e33fbba4398',
- 'gh_7e77b09bb4f5',
- 'gh_7f062810b4e7',
- 'gh_8157d8fd284e',
- 'gh_83adb78f4ede',
- 'gh_859aafbcda3d',
- 'gh_86ca35774fcf',
- 'gh_87eca527c626',
- 'gh_8a29eebc2012',
- 'gh_8b5c838ac19a',
- 'gh_8b69b67ea723',
- 'gh_8bf689ae15cc',
- 'gh_8cc8ae6eb9a5',
- 'gh_8d68e68f2d08',
- 'gh_930b5ef5a185',
- 'gh_93af434e3f47',
- 'gh_98624814f69a',
- 'gh_9d94775e8137',
- 'gh_9e0c7a370aaf',
- 'gh_9e0d149e2c0a',
- 'gh_9ee5f5e8425f',
- 'gh_a09594d52fda',
- 'gh_a36405f4e5d3',
- 'gh_a8851bfa953b',
- 'gh_ac4072121e24',
- 'gh_aed71f26e7e6',
- 'gh_b63b9dde3f4b',
- 'gh_b7cdece20099',
- 'gh_ba870f8b178b',
- 'gh_bb6775b47656',
- 'gh_bf79e3645d7a',
- 'gh_c1acd6bac0f8',
- 'gh_c4708b8cfe39',
- 'gh_c603033bf881',
- 'gh_c8060587e6d1',
- 'gh_cedc3c4eb48b',
- 'gh_d219a0cc8a35',
- 'gh_d2bb5f1b9498',
- 'gh_d32daba8ccf8',
- 'gh_d6e75ad9094f',
- 'gh_dd54e30b03ad',
- 'gh_e0ca8ba4ed91',
- 'gh_e2318164f869',
- 'gh_e4fb77b1023b',
- 'gh_ecef1c08bcf4',
- 'gh_ee5b4b07ed8b',
- 'gh_ef699270bf64',
- 'gh_f2a6c90c56cb',
- 'gh_f46c6c9b53fa',
- 'gh_f5120c12ee23',
- 'gh_fb77872bf907',
- 'gh_fc4ec610756e',
- 'gh_ff16c412ab97',
- 'gh_ff9fe99f2097',
- )
- CDN_IMG_OPERATOR = "?x-oss-process=image/resize,m_fill,w_600,h_480,limit_0/format,jpg/watermark,image_eXNoL3BpYy93YXRlcm1hcmtlci9pY29uX3BsYXlfd2hpdGUucG5nP3gtb3NzLXByb2Nlc3M9aW1hZ2UvcmVzaXplLHdfMTQ0,g_center"
- ODS_PROJECT = "loghubods"
- EXPLORE_POOL_TABLE = 'alg_growth_video_return_stats_history'
- GH_REPLY_STATS_TABLE = 'alg_growth_3rd_gh_reply_video_stats'
- ODPS_3RD_RANK_RESULT_TABLE = 'alg_3rd_gh_autoreply_video_rank_data'
- RDS_RANK_RESULT_TABLE = 'alg_gh_autoreply_video_rank_data'
- STATS_PERIOD_DAYS = 5  # lookback window (days) of reply stats used for ranking
- SEND_N = 1  # number of videos selected per gh_id for each strategy
- def check_data_partition(project, table, data_dt, data_hr=None):
- """Check whether the upstream partition exists and return its row count."""
- try:
- partition_spec = {'dt': data_dt}
- if data_hr:
- partition_spec['hour'] = data_hr
- part_exist, data_count = check_table_partition_exits_v2(
- project, table, partition_spec)
- except Exception as e:
- LOGGER.error(f"check partition error, exception: {e}")
- data_count = 0
- return data_count
- def get_last_strategy_result(project, rank_table, dt_version, key):
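- """Fetch the most recent published rank rows for the given strategy key from the rank table."""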
- strategy_df = get_odps_df_of_max_partition(
- project, rank_table, { 'ctime': dt_version }
- ).to_pandas()
- sub_df = strategy_df.query(f'strategy_key == "{key}"')
- sub_df = sub_df[['gh_id', 'video_id', 'strategy_key', 'sort']].drop_duplicates()
- return sub_df
- def process_reply_stats(project, table, period, run_dt):
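- """Aggregate reply stats per (gh_id, video_id) over recent partitions, add an all-account 'default' row per video, and compute a smoothed score."""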
-
- df = get_odps_df_of_recent_partitions(project, table, period, {'dt': run_dt})
- df = df.to_pandas()
- df['video_id'] = df['video_id'].astype('int64')
- df = df[['gh_id', 'video_id', 'send_count', 'first_visit_uv', 'day0_return']]
-
- df = df.groupby(['video_id', 'gh_id']).agg({
- 'send_count': 'sum',
- 'first_visit_uv': 'sum',
- 'day0_return': 'sum'
- }).reset_index()
-
- default_stats_df = df.groupby('video_id').agg({
- 'send_count': 'sum',
- 'first_visit_uv': 'sum',
- 'day0_return': 'sum'
- }).reset_index()
- default_stats_df['gh_id'] = 'default'
- merged_df = pd.concat([df, default_stats_df]).reset_index(drop=True)
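- # smoothed conversion score: day-0 returns per send, with +500 added to the send count to damp low-volume items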
- merged_df['score'] = merged_df['day0_return'] / (merged_df['send_count'] + 500)
- return merged_df
- def rank_for_layer1(run_dt, run_hour, project, table):
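- """Explore layer 1: weighted-sample SEND_N videos from the explore pool (weight = rov) and assign the same picks to every gh_id plus 'default'."""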
-
- df = get_odps_df_of_max_partition(project, table, {'dt': run_dt})
- df = df.to_pandas()
-
- dt_version = f'{run_dt}{run_hour}'
- np.random.seed(int(dt_version)+1)
-
- df['score'] = df['rov']
- sampled_df = df.sample(n=SEND_N, weights=df['score'])
- sampled_df['sort'] = range(1, len(sampled_df) + 1)
- sampled_df['strategy_key'] = EXPLORE1_GROUP_NAME
- sampled_df['dt_version'] = dt_version
- gh_name_df = pd.DataFrame({'gh_id': GH_IDS + ('default', )})
- sampled_df['_tmpkey'] = 1
- gh_name_df['_tmpkey'] = 1
- extend_df = sampled_df.merge(gh_name_df, on='_tmpkey').drop('_tmpkey', axis=1)
- result_df = extend_df[['strategy_key', 'dt_version', 'gh_id', 'sort', 'video_id', 'score']]
- return result_df
- def rank_for_layer2(run_dt, run_hour, project, stats_table, rank_table):
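- """Explore layer 2: per gh_id, weighted-sample SEND_N videos from its own reply stats, falling back to the last base-strategy rank when candidates are insufficient."""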
- stats_df = process_reply_stats(project, stats_table, STATS_PERIOD_DAYS, run_dt)
-
- dt_version = f'{run_dt}{run_hour}'
- np.random.seed(int(dt_version)+1)
-
- sampled_dfs = []
-
- default_stats_df = stats_df.query('gh_id == "default"')
- sampled_df = default_stats_df.sample(n=SEND_N, weights=default_stats_df['score'])
- sampled_df['sort'] = range(1, len(sampled_df) + 1)
- sampled_dfs.append(sampled_df)
-
- df = stats_df.query('day0_return > 100')
-
- base_strategy_df = get_last_strategy_result(
- project, rank_table, dt_version, BASE_GROUP_NAME)
- for gh_id in GH_IDS:
- sub_df = df.query(f'gh_id == "{gh_id}"')
- if len(sub_df) < SEND_N:
- LOGGER.warning(
- "gh_id[{}] rows[{}] not enough for layer2, fallback to base"
- .format(gh_id, len(sub_df)))
- sub_df = base_strategy_df.query(f'gh_id == "{gh_id}"').copy()
- # reuse the previous sort position as the sampling weight for the fallback rows
- sub_df['score'] = sub_df['sort']
- sampled_df = sub_df.sample(n=SEND_N, weights=sub_df['score'])
- sampled_df['sort'] = range(1, len(sampled_df) + 1)
- sampled_dfs.append(sampled_df)
- extend_df = pd.concat(sampled_dfs)
- extend_df['strategy_key'] = EXPLORE2_GROUP_NAME
- extend_df['dt_version'] = dt_version
- result_df = extend_df[['strategy_key', 'dt_version', 'gh_id', 'sort', 'video_id', 'score']]
- return result_df
- def rank_for_base(run_dt, run_hour, project, stats_table, rank_table, stg_key):
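- """Base strategy: keep videos already in the previous rank or with score > 0.1, then take the top SEND_N by score for each gh_id."""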
- stats_df = process_reply_stats(project, stats_table, STATS_PERIOD_DAYS, run_dt)
-
- dt_version = f'{run_dt}{run_hour}'
-
- base_strategy_df = get_last_strategy_result(
- project, rank_table, dt_version, stg_key)
- default_stats_df = stats_df.query('gh_id == "default"')
-
-
- gh_ids_str = ','.join(f'"{x}"' for x in GH_IDS)
- stats_df = stats_df.query(f'gh_id in ({gh_ids_str})')
- stats_with_strategy_df = stats_df \
- .merge(
- base_strategy_df,
- on=['gh_id', 'video_id'],
- how='left') \
- .query('strategy_key.notna() or score > 0.1')
-
- grouped_stats_df = pd.concat([default_stats_df, stats_with_strategy_df]).reset_index()
- def set_top_n(group, n=2):
- group_sorted = group.sort_values(by='score', ascending=False)
- top_n = group_sorted.head(n)
- top_n['sort'] = range(1, len(top_n) + 1)
- return top_n
- ranked_df = grouped_stats_df.groupby('gh_id').apply(set_top_n, SEND_N)
- ranked_df = ranked_df.reset_index(drop=True)
-
- ranked_df['strategy_key'] = stg_key
- ranked_df['dt_version'] = dt_version
- ranked_df = ranked_df[['strategy_key', 'dt_version', 'gh_id', 'sort', 'video_id', 'score']]
- return ranked_df
- def check_result_data(df):
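- """Sanity check: every (gh_id, strategy_key) combination must yield exactly SEND_N rows."""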
- for gh_id in GH_IDS + ('default', ):
- for key in (EXPLORE1_GROUP_NAME, EXPLORE2_GROUP_NAME, BASE_GROUP_NAME):
- sub_df = df.query(f'gh_id == "{gh_id}" and strategy_key == "{key}"')
- if len(sub_df) != SEND_N:
- raise Exception(f"Result not enough for gh_id[{gh_id}], strategy[{key}]")
- def build_and_transfer_data(run_dt, run_hour, project, **kwargs):
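- """Build all three strategy ranks, join video title/cover info, and write the result to the ODPS and MySQL result tables (print only in dry-run mode)."""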
- dt_version = f'{run_dt}{run_hour}'
- dry_run = kwargs.get('dry_run', False)
- layer1_rank = rank_for_layer1(run_dt, run_hour, ODS_PROJECT, EXPLORE_POOL_TABLE)
- layer2_rank = rank_for_layer2(run_dt, run_hour, ODS_PROJECT, GH_REPLY_STATS_TABLE, ODPS_3RD_RANK_RESULT_TABLE)
- base_rank = rank_for_base(run_dt, run_hour, ODS_PROJECT, GH_REPLY_STATS_TABLE, ODPS_3RD_RANK_RESULT_TABLE, BASE_GROUP_NAME)
- final_rank_df = pd.concat([layer1_rank, layer2_rank, base_rank]).reset_index(drop=True)
- check_result_data(final_rank_df)
- odps_instance = get_odps_instance(project)
- odps_ranked_df = odps.DataFrame(final_rank_df)
- video_df = get_dataframe_from_odps('videoods', 'wx_video')
- video_df['cover_url'] = video_df['cover_img_path'] + CDN_IMG_OPERATOR
- video_df = video_df['id', 'title', 'cover_url']
- final_df = odps_ranked_df.join(video_df, on=('video_id', 'id'))
- final_df = final_df.to_pandas()
- final_df = final_df[['strategy_key', 'dt_version', 'gh_id', 'sort', 'video_id', 'title', 'cover_url', 'score']]
-
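- # invert the sort value (1..SEND_N -> SEND_N..1) before writing out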
- final_df['sort'] = SEND_N + 1 - final_df['sort']
- if dry_run:
- print(final_df[['strategy_key', 'gh_id', 'sort', 'video_id', 'score', 'title']])
- return
-
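- # write the ranking into the ODPS result table, creating the dt/hour/ctime partition if needed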
- t = odps_instance.get_table(ODPS_3RD_RANK_RESULT_TABLE)
- part_spec_dict = {'dt': run_dt, 'hour': run_hour, 'ctime': dt_version}
- part_spec = ','.join('{}={}'.format(k, v) for k, v in part_spec_dict.items())
- with t.open_writer(partition=part_spec, create_partition=True, overwrite=True) as writer:
- writer.write(list(final_df.itertuples(index=False)))
-
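- # also insert the same rows into the RDS (MySQL) result table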
- data_to_insert = [tuple(row) for row in final_df.itertuples(index=False)]
- data_columns = list(final_df.columns)
- mysql = MysqlHelper(CONFIG.MYSQL_CRAWLER_INFO)
- mysql.batch_insert(RDS_RANK_RESULT_TABLE, data_to_insert, data_columns)
- def main_loop():
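- """Wait for the previous day's upstream stats partition; once ready, build and publish the ranking, otherwise retry every 60 seconds."""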
- argparser = ArgumentParser()
- argparser.add_argument('-n', '--dry-run', action='store_true')
- args = argparser.parse_args()
- try:
- now_date = datetime.today()
- LOGGER.info(f"start running: {datetime.strftime(now_date, '%Y-%m-%d %H:%M')}")
- now_hour = now_date.strftime("%H")
- last_date = now_date - timedelta(1)
- last_dt = last_date.strftime("%Y%m%d")
-
-
- h_data_count = check_data_partition(ODS_PROJECT, GH_REPLY_STATS_TABLE, last_dt, '00')
- if h_data_count > 0:
- LOGGER.info('upstream table row count={}, start computing'.format(h_data_count))
- run_dt = now_date.strftime("%Y%m%d")
- LOGGER.info(f'run_dt: {run_dt}, run_hour: {now_hour}')
- build_and_transfer_data(run_dt, now_hour, ODS_PROJECT,
- dry_run=args.dry_run)
- LOGGER.info('data update finished')
- else:
- LOGGER.info("upstream data not ready, waiting 60s")
- Timer(60, main_loop).start()
- return
- except Exception as e:
- LOGGER.error(f"data update failed, exception: {e}, traceback: {traceback.format_exc()}")
- if CONFIG.ENV_TEXT == '开发环境':  # development environment: skip the Feishu alert
- return
- send_msg_to_feishu(
- webhook=CONFIG.FEISHU_ROBOT['server_robot'].get('webhook'),
- key_word=CONFIG.FEISHU_ROBOT['server_robot'].get('key_word'),
- msg_text=f"rov-offline{CONFIG.ENV_TEXT} - data update failed\n"
- f"exception: {e}\n"
- f"traceback: {traceback.format_exc()}"
- )
- if __name__ == '__main__':
- LOGGER.info("%s start running" % os.path.basename(__file__))
- LOGGER.info(f"environment: {CONFIG.ENV_TEXT}")
- main_loop()