# -*- coding: utf-8 -*- import pandas as pd import traceback import odps from odps import ODPS from threading import Timer from datetime import datetime, timedelta from db_helper import MysqlHelper from my_utils import check_table_partition_exits_v2, get_dataframe_from_odps, \ get_odps_df_of_max_partition, get_odps_instance, get_odps_df_of_recent_partitions from my_utils import request_post, send_msg_to_feishu from my_config import set_config import numpy as np from log import Log import os from argparse import ArgumentParser CONFIG, _ = set_config() LOGGER = Log() BASE_GROUP_NAME = '3rd-party-base' EXPLORE1_GROUP_NAME = '3rd-party-explore1' EXPLORE2_GROUP_NAME = '3rd-party-explore2' # TODO: fetch gh_id from external data source GH_IDS = ( 'gh_2863155cedcb', 'gh_c1acd6bac0f8', 'gh_da993c5f7f64', 'gh_495d71abbda2', 'gh_e2318164f869', 'gh_fc4ec610756e', 'gh_2450ad774945', 'gh_175925e40318', 'gh_994adaf7a539', 'gh_250c51d5ce69', 'gh_37adcd637351', 'gh_a36405f4e5d3', 'gh_ee5b4b07ed8b', 'gh_11debb2c8392', 'gh_d645c1ef7fb0', 'gh_1899b728af86', 'gh_059a27ea86b2', 'gh_6454c103be14', 'gh_63745bad4f21', 'gh_8a29eebc2012', 'gh_57bc9846c86a', 'gh_570967881eae', 'gh_197a0d8caa31', 'gh_93af434e3f47', 'gh_184f2d765f55', 'gh_8157d8fd284e', 'gh_8e1d1f19d44f', 'gh_1da8f62f4a0d', 'gh_fd4df7c45bb9', 'gh_dcfcf74b0846', 'gh_3afc3a8b8a3d', 'gh_ef699270bf64', 'gh_ba870f8b178b', 'gh_58cdb2f1f0d0', 'gh_3dce33de6994', 'gh_543e6d7e15f3', 'gh_0d55fee6b78d', 'gh_1c11651e0df4', 'gh_7f4741dd5fea', 'gh_33e9e4cbed84', 'gh_23fa67ebb016', 'gh_33c26df4eab0', 'gh_01c93b07605f', 'gh_c655e3c8a121', 'gh_83adb78f4ede', 'gh_0cc7c6d712eb', 'gh_1b8bfb5c4ffd', 'gh_e4fb77b1023b', 'gh_f994f5d9a4b6', 'gh_e0ca8ba4ed91', 'gh_b2b4d5aa6b49', 'gh_53759f90b0c5', 'gh_d219a0cc8a35', 'gh_930b5ef5a185', 'gh_22cad14dc4ec', 'gh_8734c02f2983', 'gh_8d68e68f2d08', 'gh_c603033bf881', 'gh_55ac4e447179', 'gh_8b5c838ac19a', 'gh_aed71f26e7e6', 'gh_330ef0db846d', 'gh_87eca527c626', 'gh_7a14b4c15090', 'gh_b74693fed783', 'gh_e1594e6db64b', 'gh_d32daba8ccf8', 'gh_23e084923def', 'gh_148aa4a776ce', 'gh_0df4d6b647ea', 'gh_041d1c819c30', 'gh_7e33fbba4398', 'gh_354ab82cf9b3', 'gh_b1f1d7a1f351', 'gh_793647539ef5', 'gh_1ff0c29f8408', 'gh_ecef1c08bcf4', 'gh_22f53f6a2b5d', 'gh_34820675d0fc', 'gh_4175a8f745f6', 'gh_81145598368a', 'gh_5f0bb5822e10', 'gh_65d8db4e97ca', 'gh_a09594d52fda', 'gh_4411cf1e5f4e', 'gh_9ee5f5e8425f', 'gh_df24adad2521', 'gh_30b472377707', 'gh_bb6775b47656', 'gh_69808935bba0', 'gh_fb77872bf907', 'gh_830c4aa1b262', 'gh_b5393e35caa4', 'gh_fa7dceae7c9d', 'gh_449fb0c2d817', 'gh_d6e75ad9094f', 'gh_1cbb453a800e', 'gh_1b243f162cbd', 'gh_50db6881c86e', 'gh_9d94775e8137', 'gh_d37101fb9b98', 'gh_ed86d05703eb', 'gh_ac4072121e24', 'gh_620af8e24fb9', 'gh_ee4783ded544', 'gh_d2bb5f1b9498', 'gh_5044de6e1597', 'gh_d94de77a8d08', 'gh_98624814f69a', 'gh_4c38b9d4474a', 'gh_f2a6c90c56cb', 'gh_26f1353fda5a', 'gh_143743361496', 'gh_126c99b39cea', 'gh_53e6e0a1b1bd', 'gh_859aafbcda3d', 'gh_cfce2617bd82', 'gh_db8ea2bc6687', 'gh_c4708b8cfe39', 'gh_57d2388bd01d', 'gh_5fffe35cc12a', 'gh_45980a6448f3', 'gh_f5120c12ee23', 'gh_bf79e3645d7a', 'gh_6c6d81dd642d', 'gh_57ee6a6ef204', 'gh_45be25d1c06b', 'gh_3ee85ba7c3ae', 'gh_7c89d5a3e745', 'gh_c46be9ea4eef', 'gh_cedc3c4eb48b', 'gh_8a91fa7f32aa', 'gh_5207b355776f', 'gh_6c7f73de400b', 'gh_d2f3805f8fa3', 'gh_7dd47f8aca4e', 'gh_967f9abb9ccd', 'gh_f46c6c9b53fa', 'gh_086abf2a536b', 'gh_6e11282216f3', 'gh_f5332b8dfb63', 'gh_f78610e292ba', 'gh_06699758fa4b', 'gh_92323d0bea11', 'gh_517aed4e8197', 'gh_c80462b5a330', 'gh_1b1c3ced734e', 'gh_dd54e30b03ad', 'gh_cadd0ea4fab3', 'gh_ef07a709127e', 'gh_ab6ca922e605', 'gh_8b69b67ea723', 'gh_363c54315788', 'gh_a363987c60bf', 'gh_86ca35774fcf', 'gh_518694803ae7', 'gh_f98d5f17e9ea', 'gh_5e0cd3f7b457', 'gh_9e0d149e2c0a', 'gh_7e77b09bb4f5', 'gh_261bbd99a906', 'gh_2dc8e3a7b6c9', 'gh_1ec8dae66c97', 'gh_7f062810b4e7', 'gh_3c112c0c9c8b', 'gh_01cd19465b39', 'gh_8cc8ae6eb9a5', 'gh_210f7ce6f418', 'gh_04804a94e325', 'gh_4685665647f0', 'gh_d7fa96aeb839', 'gh_210cb680d83d', 'gh_862b00a394e3', 'gh_3cf7b310906a', 'gh_669555ebea28', 'gh_aaac62205137', 'gh_0a03f8fa63ba', 'gh_b8b2d4184832', 'gh_819a632d4bb1', 'gh_db09b87a0fc9', 'gh_b673c01e7bd8', 'gh_6da61a15044a', 'gh_2f1fab4efaef', 'gh_da22f64152d5', 'gh_ff9fe99f2097', 'gh_33731afddbdb', 'gh_4d2f75c3c3fe', 'gh_40ff43c50773', 'gh_56b65b7d4520', 'gh_ff16c412ab97', 'gh_8bf689ae15cc', 'gh_650b17dbba8f', 'gh_b63b9dde3f4b', 'gh_36e74017026e', 'gh_a8851bfa953b', 'gh_ec5beb465640', 'gh_133c36b99b14', 'gh_b144210318e5', 'gh_3bffce62dbb4', 'gh_2fbff340c683', 'gh_3ceae370dcf5', 'gh_530b634707b0', 'gh_b7cdece20099', 'gh_9e0c7a370aaf', 'gh_96412c0393e3', 'gh_c8060587e6d1', 'gh_0d3c97cc30cc', 'gh_491189a534f2', 'gh_fe9620386c2c', 'gh_9d50b7067f07', 'gh_e1331141406a', 'gh_d6db13fcf14d', 'gh_5522900b6a67', 'gh_a7c21403c493', 'gh_eeec7c2e28a5', 'gh_c783350a9660', ) TARGET_GH_IDS = ( 'gh_250c51d5ce69', 'gh_8a29eebc2012', 'gh_ff16c412ab97', 'gh_1014734791e0', 'gh_570967881eae', 'gh_a7c21403c493', 'gh_7f062810b4e7', 'gh_c8060587e6d1', 'gh_1da8f62f4a0d', 'gh_56b65b7d4520', 'gh_eeec7c2e28a5', 'gh_7c89d5a3e745', 'gh_ee5b4b07ed8b', 'gh_0d3c97cc30cc', 'gh_c783350a9660', ) CDN_IMG_OPERATOR = "?x-oss-process=image/resize,m_fill,w_600,h_480,limit_0/format,jpg/watermark,image_eXNoL3BpYy93YXRlcm1hcmtlci9pY29uX3BsYXlfd2hpdGUucG5nP3gtb3NzLXByb2Nlc3M9aW1hZ2UvcmVzaXplLHdfMTQ0,g_center" ODS_PROJECT = "loghubods" EXPLORE_POOL_TABLE = 'alg_growth_video_return_stats_history' GH_REPLY_STATS_TABLE = 'alg_growth_3rd_gh_reply_video_stats' # ODPS_RANK_RESULT_TABLE = 'alg_gh_autoreply_video_rank_data' ODPS_3RD_RANK_RESULT_TABLE = 'alg_3rd_gh_autoreply_video_rank_data' GH_DETAIL = 'gh_detail' RDS_RANK_RESULT_TABLE = 'alg_gh_autoreply_video_rank_data' STATS_PERIOD_DAYS = 5 SEND_N = 1 def check_data_partition(project, table, data_dt, data_hr=None): """检查数据是否准备好""" try: partition_spec = {'dt': data_dt} if data_hr: partition_spec['hour'] = data_hr part_exist, data_count = check_table_partition_exits_v2( project, table, partition_spec) except Exception as e: data_count = 0 return data_count def get_last_strategy_result(project, rank_table, dt_version, key): strategy_df = get_odps_df_of_max_partition( project, rank_table, {'ctime': dt_version} ).to_pandas() sub_df = strategy_df.query(f'strategy_key == "{key}"') sub_df = sub_df[['gh_id', 'video_id', 'strategy_key', 'sort']].drop_duplicates() return sub_df def process_reply_stats(project, table, period, run_dt): # 获取多天即转统计数据用于聚合 df = get_odps_df_of_recent_partitions(project, table, period, {'dt': run_dt}) df = df.to_pandas() df['video_id'] = df['video_id'].astype('int64') df = df[['gh_id', 'video_id', 'send_count', 'first_visit_uv', 'day0_return']] # 账号内聚合 df = df.groupby(['video_id', 'gh_id']).agg({ 'send_count': 'sum', 'first_visit_uv': 'sum', 'day0_return': 'sum' }).reset_index() # 聚合所有数据作为default default_stats_df = df.groupby('video_id').agg({ 'send_count': 'sum', 'first_visit_uv': 'sum', 'day0_return': 'sum' }).reset_index() default_stats_df['gh_id'] = 'default' merged_df = pd.concat([df, default_stats_df]).reset_index(drop=True) merged_df['score'] = merged_df['day0_return'] / (merged_df['send_count'] + 500) return merged_df def rank_for_layer1(run_dt, run_hour, gh): # TODO: 加审核&退场 gh = gh[gh['type'] == 2] df = get_odps_df_of_max_partition(ODS_PROJECT, EXPLORE_POOL_TABLE, {'dt': run_dt}) df = df.to_pandas() # 确保重跑时可获得一致结果 dt_version = f'{run_dt}{run_hour}' np.random.seed(int(dt_version) + 1) # TODO: 修改权重计算策略 df['score'] = df['ros'] # 按照 category1 分类后进行加权随机抽样 sampled_df = df.groupby('category1').apply( lambda x: x.sample(n=SEND_N, weights=x['score'], replace=False)).reset_index(drop=True) # 添加 'sort' 列 sampled_df['sort'] = sampled_df.groupby('category1')['score'].rank(method='first', ascending=False).astype(int) # 按得分排序 sampled_df = sampled_df.sort_values(by=['category1', 'score'], ascending=[True, False]).reset_index(drop=True) sampled_df['strategy_key'] = EXPLORE1_GROUP_NAME sampled_df['dt_version'] = dt_version extend_df = sampled_df.merge(gh, on='category1') result_df = extend_df[['strategy_key', 'dt_version', 'gh_id', 'sort', 'video_id', 'score']] return result_df def rank_for_layer2(run_dt, run_hour, project, stats_table, rank_table): stats_df = process_reply_stats(project, stats_table, STATS_PERIOD_DAYS, run_dt) # 确保重跑时可获得一致结果 dt_version = f'{run_dt}{run_hour}' np.random.seed(int(dt_version) + 1) # TODO: 计算账号间相关性 ## 账号两两组合,取有RoVn数值视频的交集,单个账号内的RoVn(平滑后)组成向量 ## 求向量相关系数或cosine相似度 ## 单个视频的RoVn加权求和 # 当前实现基础版本:只在账号内求二级探索排序分 sampled_dfs = [] # 处理default逻辑(default-explore2) default_stats_df = stats_df.query('gh_id == "default"') sampled_df = default_stats_df.sample(n=SEND_N, weights=default_stats_df['score']) sampled_df['sort'] = range(1, len(sampled_df) + 1) sampled_dfs.append(sampled_df) # 基础过滤for账号 df = stats_df.query('day0_return > 100') # fallback to base if necessary base_strategy_df = get_last_strategy_result( project, rank_table, dt_version, BASE_GROUP_NAME) for gh_id in GH_IDS: sub_df = df.query(f'gh_id == "{gh_id}"') if len(sub_df) < SEND_N: LOGGER.warning( "gh_id[{}] rows[{}] not enough for layer2, fallback to base" .format(gh_id, len(sub_df))) sub_df = base_strategy_df.query(f'gh_id == "{gh_id}"') sub_df['score'] = sub_df['sort'] sampled_df = sub_df.sample(n=SEND_N, weights=sub_df['score']) sampled_df['sort'] = range(1, len(sampled_df) + 1) sampled_dfs.append(sampled_df) extend_df = pd.concat(sampled_dfs) extend_df['strategy_key'] = EXPLORE2_GROUP_NAME extend_df['dt_version'] = dt_version result_df = extend_df[['strategy_key', 'dt_version', 'gh_id', 'sort', 'video_id', 'score']] return result_df def rank_for_base(run_dt, run_hour, project, stats_table, rank_table, stg_key): stats_df = process_reply_stats(project, stats_table, STATS_PERIOD_DAYS, run_dt) # TODO: support to set base manually dt_version = f'{run_dt}{run_hour}' # 获取当前base信息, 策略表dt_version(ctime partition)采用当前时间 base_strategy_df = get_last_strategy_result( project, rank_table, dt_version, stg_key) default_stats_df = stats_df.query('gh_id == "default"') # 在账号内排序,决定该账号(包括default)的base利用内容 # 排序过程中,确保当前base策略参与排序,因此先关联再过滤 gh_ids_str = ','.join(f'"{x}"' for x in GH_IDS) stats_df = stats_df.query(f'gh_id in ({gh_ids_str})') stats_with_strategy_df = stats_df \ .merge( base_strategy_df, on=['gh_id', 'video_id'], how='left') \ .query('strategy_key.notna() or score > 0.1') # 合并default和分账号数据 grouped_stats_df = pd.concat([default_stats_df, stats_with_strategy_df]).reset_index() def set_top_n(group, n=2): group_sorted = group.sort_values(by='score', ascending=False) top_n = group_sorted.head(n) top_n['sort'] = range(1, len(top_n) + 1) return top_n ranked_df = grouped_stats_df.groupby('gh_id').apply(set_top_n, SEND_N) ranked_df = ranked_df.reset_index(drop=True) ranked_df['strategy_key'] = stg_key ranked_df['dt_version'] = dt_version ranked_df = ranked_df[['strategy_key', 'dt_version', 'gh_id', 'sort', 'video_id', 'score']] return ranked_df def check_result_data(df): for gh_id in GH_IDS: for key in (EXPLORE1_GROUP_NAME, EXPLORE2_GROUP_NAME, BASE_GROUP_NAME): sub_df = df.query(f'gh_id == "{gh_id}" and strategy_key == "{key}"') if len(sub_df) != SEND_N: raise Exception(f"Result not enough for gh_id[{gh_id}]") def rank_for_base_designate(run_dt, run_hour, stg_key): dt_version = f'{run_dt}{run_hour}' ranked_df = pd.DataFrame() # 初始化一个空的 DataFrame for gh_id in GH_IDS: if gh_id in TARGET_GH_IDS: temp_df = pd.DataFrame({ 'strategy_key': [stg_key], 'dt_version': [dt_version], 'gh_id': [gh_id], 'sort': [1], 'video_id': [13586800], 'score': [0.5] }) else: temp_df = pd.DataFrame({ 'strategy_key': [stg_key], 'dt_version': [dt_version], 'gh_id': [gh_id], 'sort': [1], 'video_id': [20463342], 'score': [0.5] }) ranked_df = pd.concat([ranked_df, temp_df], ignore_index=True) return ranked_df def build_and_transfer_data(run_dt, run_hour, project, **kwargs): gh = get_odps_df_of_max_partition(ODS_PROJECT, GH_DETAIL, {'dt': run_dt}) gh = gh.to_pandas() gh = gh[gh['type'] == 2] if 'default' not in gh['gh_id'].values: # 如果没有,添加一行 new_row = pd.DataFrame({'gh_id': ['default'], 'gh_name': ['默认'], 'type': [2], 'category1': ['泛生活']}, index=[0]) # 使用pd.concat添加新行 gh = pd.concat([gh, new_row], ignore_index=True) gh = gh.drop_duplicates(subset=['gh_id']) gh_ids = tuple(gh['gh_id']) global GH_IDS GH_IDS = gh_ids dry_run = kwargs.get('dry_run', False) layer1_rank = rank_for_layer1(run_dt, run_hour, gh) # layer2_rank = rank_for_layer2(run_dt, run_hour, ODS_PROJECT, GH_REPLY_STATS_TABLE, ODPS_3RD_RANK_RESULT_TABLE) # base_rank = rank_for_base(run_dt, run_hour, ODS_PROJECT, GH_REPLY_STATS_TABLE, ODPS_3RD_RANK_RESULT_TABLE,BASE_GROUP_NAME) # layer1_rank = rank_for_base_designate(run_dt, run_hour, EXPLORE1_GROUP_NAME) layer2_rank = rank_for_base_designate(run_dt, run_hour, EXPLORE2_GROUP_NAME) base_rank = rank_for_base_designate(run_dt, run_hour, BASE_GROUP_NAME) final_rank_df = pd.concat([layer1_rank, layer2_rank, base_rank]).reset_index(drop=True) check_result_data(final_rank_df) odps_instance = get_odps_instance(project) odps_ranked_df = odps.DataFrame(final_rank_df) video_df = get_dataframe_from_odps('videoods', 'wx_video') video_df['cover_url'] = video_df['cover_img_path'] + CDN_IMG_OPERATOR video_df = video_df['id', 'title', 'cover_url'] final_df = odps_ranked_df.join(video_df, on=('video_id', 'id')) final_df = final_df.to_pandas() final_df = final_df[['strategy_key', 'dt_version', 'gh_id', 'sort', 'video_id', 'title', 'cover_url', 'score']] # reverse sending order final_df['sort'] = SEND_N + 1 - final_df['sort'] if dry_run: print(final_df[['strategy_key', 'gh_id', 'sort', 'video_id', 'score', 'title']]) return # save to ODPS # t = odps_instance.get_table(ODPS_3RD_RANK_RESULT_TABLE) # part_spec_dict = {'dt': run_dt, 'hour': run_hour, 'ctime': dt_version} # part_spec = ','.join(['{}={}'.format(k, part_spec_dict[k]) for k in part_spec_dict.keys()]) # with t.open_writer(partition=part_spec, create_partition=True, overwrite=True) as writer: # writer.write(list(final_df.itertuples(index=False))) # sync to MySQL data_to_insert = [tuple(row) for row in final_df.itertuples(index=False)] data_columns = list(final_df.columns) mysql = MysqlHelper(CONFIG.MYSQL_CRAWLER_INFO) mysql.batch_insert(RDS_RANK_RESULT_TABLE, data_to_insert, data_columns) def main_loop(): argparser = ArgumentParser() argparser.add_argument('-n', '--dry-run', action='store_true') args = argparser.parse_args() try: now_date = datetime.today() LOGGER.info(f"开始执行: {datetime.strftime(now_date, '%Y-%m-%d %H:%M')}") now_hour = now_date.strftime("%H") last_date = now_date - timedelta(1) last_dt = last_date.strftime("%Y%m%d") # 查看当前天级更新的数据是否已准备好 # 当前上游统计表为天级更新,但字段设计为兼容小时级 h_data_count = check_data_partition(ODS_PROJECT, GH_REPLY_STATS_TABLE, last_dt, '00') if h_data_count > 0: LOGGER.info('上游数据表查询数据条数={},开始计算'.format(h_data_count)) run_dt = now_date.strftime("%Y%m%d") LOGGER.info(f'run_dt: {run_dt}, run_hour: {now_hour}') build_and_transfer_data(run_dt, now_hour, ODS_PROJECT, dry_run=args.dry_run) LOGGER.info('数据更新完成') else: LOGGER.info("上游数据未就绪,等待60s") Timer(60, main_loop).start() return except Exception as e: LOGGER.error(f"数据更新失败, exception: {e}, traceback: {traceback.format_exc()}") if CONFIG.ENV_TEXT == '开发环境': return send_msg_to_feishu( webhook=CONFIG.FEISHU_ROBOT['server_robot'].get('webhook'), key_word=CONFIG.FEISHU_ROBOT['server_robot'].get('key_word'), msg_text=f"rov-offline{CONFIG.ENV_TEXT} - 数据更新失败\n" f"exception: {e}\n" f"traceback: {traceback.format_exc()}" ) if __name__ == '__main__': LOGGER.info("%s 开始执行" % os.path.basename(__file__)) LOGGER.info(f"environment: {CONFIG.ENV_TEXT}") main_loop()