# -*- coding: utf-8 -*-
"""Build auto-reply video rankings for third-party official accounts.

Three strategy groups are produced per account (base, explore1, explore2).
In ``base`` mode the full ranking is recomputed and written to ODPS and
MySQL; in ``delta`` mode only accounts missing from the latest strategy
partition receive rows.
"""
import pandas as pd
import traceback
import odps
from odps import ODPS
import json
import time
from pymysql.cursors import DictCursor
from datetime import datetime, timedelta
from db_helper import MysqlHelper
from my_utils import check_table_partition_exits_v2, get_dataframe_from_odps, \
    get_odps_df_of_max_partition, get_odps_instance, get_odps_df_of_recent_partitions
from my_utils import request_post, send_msg_to_feishu
from my_config import set_config
import numpy as np
from log import Log
import os
from argparse import ArgumentParser
from constants import AutoReplyAccountType
from alg_growth_common import check_unsafe_video, filter_unsafe_video, filter_audit_failed_video

CONFIG, _ = set_config()
LOGGER = Log()
BASE_GROUP_NAME = '3rd-party-base'
EXPLORE1_GROUP_NAME = '3rd-party-explore1'
EXPLORE2_GROUP_NAME = '3rd-party-explore2'
# GH_IDS will be updated by get_and_update_gh_ids
GH_IDS = ('default',)
account_map = {}

pd.set_option('display.max_rows', None)

CDN_IMG_OPERATOR = "?x-oss-process=image/resize,m_fill,w_600,h_480,limit_0/format,jpg/watermark,image_eXNoL3BpYy93YXRlcm1hcmtlci9pY29uX3BsYXlfd2hpdGUucG5nP3gtb3NzLXByb2Nlc3M9aW1hZ2UvcmVzaXplLHdfMTQ0,g_center"

ODS_PROJECT = "loghubods"
EXPLORE_POOL_TABLE = 'alg_growth_video_return_stats_history'
GH_REPLY_STATS_TABLE = 'alg_growth_3rd_gh_reply_video_stats'
ODPS_RANK_RESULT_TABLE = 'alg_3rd_gh_autoreply_video_rank_data'
GH_DETAIL = 'gh_detail'
RDS_RANK_RESULT_TABLE = 'alg_gh_autoreply_video_rank_data'
STATS_PERIOD_DAYS = 5
DEFAULT_SEND_N = 1
MAX_SEND_N = 3

# Backup video ids per category1, used when an account lacks enough candidates.
default_video = {
    '泛生活': [20463342, 14095344, 13737337],
    '泛历史': [13586800, 12794884, 12117356],
}


def _format_partition_spec(part_spec_dict):
    """Render a partition dict as the ``k1=v1,k2=v2`` string used by the ODPS writer."""
    return ','.join('{}={}'.format(k, v) for k, v in part_spec_dict.items())


def _parse_configured_video_ids(raw):
    """Parse an account's configured video ids into a list of ints.

    Accepts a JSON list (the format ``postprocess_override_by_config`` reads)
    as well as a plain comma-separated string.

    Raises:
        TypeError: if ``raw`` is not a string (e.g. None / NaN).
        ValueError: if the content cannot be parsed into integers.
    """
    if not isinstance(raw, str):
        raise TypeError(f"video_ids must be str, got {type(raw).__name__}")
    text = raw.strip()
    if text.startswith('['):
        parsed = json.loads(text)
        if not isinstance(parsed, list):
            raise ValueError("video_ids is not list")
    else:
        parsed = text.split(',')
    return [int(x) for x in parsed]


def get_and_update_gh_ids(run_dt):
    """Load account configuration and refresh the GH_IDS / account_map globals.

    Args:
        run_dt: run date; accepted for interface compatibility, not used here.

    Returns:
        DataFrame with one row per gh_id, always including a 'default' row.
    """
    db = MysqlHelper(CONFIG.MYSQL_GROWTH_INFO)
    gh_type = AutoReplyAccountType.EXTERNAL_GZH.value
    sqlstr = f"""
        SELECT gh_id, gh_name, category1, category2, channel,
               video_ids, strategy_status,
               autoreply_send_minigram_num as send_n
        FROM {GH_DETAIL}
        WHERE is_delete = 0 AND `type` = {gh_type}
        """
    account_data = db.get_data(sqlstr, DictCursor)
    account_df = pd.DataFrame(account_data)

    # The 'default' account is handled separately: add it when the table does
    # not provide one. The column check keeps an empty query result from
    # raising KeyError.
    if 'gh_id' not in account_df.columns or 'default' not in account_df['gh_id'].values:
        new_row = pd.DataFrame({'gh_id': ['default'], 'gh_name': ['默认'], 'type': [2],
                                'category1': ['泛生活'], 'send_n': 1},
                               index=[0])
        account_df = pd.concat([account_df, new_row], ignore_index=True)

    account_df = account_df.drop_duplicates(subset=['gh_id'])
    global GH_IDS
    GH_IDS = tuple(account_df['gh_id'])
    global account_map
    account_map = {x['gh_id']: x for x in account_df.to_dict(orient='records')}
    for account_info in account_map.values():
        # to_dict always emits the key, so a missing value shows up as
        # None/NaN rather than an absent key; fall back to the default then.
        send_n = account_info.get('send_n')
        account_info['send_n'] = DEFAULT_SEND_N if pd.isna(send_n) else int(send_n)
    return account_df


def check_data_partition(project, table, data_dt, data_hr=None):
    """Check whether the upstream partition is ready.

    Returns:
        The row count reported by check_table_partition_exits_v2, or 0 when
        the check itself fails.
    """
    try:
        partition_spec = {'dt': data_dt}
        if data_hr:
            partition_spec['hour'] = data_hr
        part_exist, data_count = check_table_partition_exits_v2(
            project, table, partition_spec)
    except Exception as e:
        # Best effort: the caller polls, so report "not ready" but keep a trace.
        LOGGER.warning(f"check partition failed: {project}.{table} {data_dt} {data_hr}, exception: {e}")
        data_count = 0
    return data_count


def get_last_strategy_result(project, rank_table, dt_version, key):
    """Read the latest strategy partition and return the rows of one strategy.

    Args:
        project: ODPS project name.
        rank_table: ODPS rank result table.
        dt_version: value passed as the 'ctime' partition bound.
        key: strategy_key to select.

    Returns:
        (sub_df, last_dt_version): de-duplicated gh_id / video_id /
        strategy_key / sort rows, and the dt_version found in the partition.

    Raises:
        ValueError: if the partition holds no rows.
    """
    strategy_df = get_odps_df_of_max_partition(
        project, rank_table, {'ctime': dt_version}
    ).to_pandas()
    if strategy_df.empty:
        raise ValueError(
            f"no strategy data found in {project}.{rank_table} for ctime {dt_version}")
    last_dt_version = strategy_df.iloc[0]['dt_version']
    sub_df = strategy_df.query(f'strategy_key == "{key}"')
    sub_df = sub_df[['gh_id', 'video_id', 'strategy_key', 'sort']].drop_duplicates()
    return sub_df, last_dt_version


def process_reply_stats(project, table, period, run_dt):
    """Aggregate recent reply statistics per account and across all accounts.

    Returns:
        DataFrame of per-(video_id, gh_id) sums plus an all-account aggregate
        stored under gh_id 'default', with a smoothed 'score' column.
    """
    # Fetch several days of stats for aggregation.
    df = get_odps_df_of_recent_partitions(project, table, period, {'dt': run_dt})
    df = df.to_pandas()

    df['video_id'] = df['video_id'].astype('int64')
    df = df[['gh_id', 'video_id', 'send_count', 'first_visit_uv', 'day0_return']]
    # Unsafe videos are removed as soon as the stats are loaded.
    df = filter_unsafe_video(df)

    sum_spec = {
        'send_count': 'sum',
        'first_visit_uv': 'sum',
        'day0_return': 'sum'
    }
    # Aggregate within each account.
    df = df.groupby(['video_id', 'gh_id']).agg(sum_spec).reset_index()

    # Aggregate over all accounts to serve as 'default'.
    default_stats_df = df.groupby('video_id').agg(sum_spec).reset_index()
    default_stats_df['gh_id'] = 'default'

    merged_df = pd.concat([df, default_stats_df]).reset_index(drop=True)

    # The +100 in the denominator smooths the ratio for low-traffic videos.
    merged_df['score'] = merged_df['day0_return'] / (merged_df['first_visit_uv'] + 100)
    return merged_df


def rank_for_layer1(run_dt, run_hour, project, table, gh):
    """Build the explore1 ranking by random sampling from the exploration pool.

    Args:
        gh: account DataFrame carrying at least gh_id and category1.

    Returns:
        DataFrame with strategy_key, dt_version, gh_id, sort, video_id, score.
    """
    # TODO: add audit & retirement handling
    df = get_odps_df_of_max_partition(project, table, {'dt': run_dt})
    df = df.to_pandas()
    df = filter_unsafe_video(df)
    # Seed from the version so a rerun produces the same result.
    dt_version = f'{run_dt}{run_hour}'
    np.random.seed(int(dt_version) + 1)

    # TODO: revise the weighting strategy
    df['score'] = 1.0
    # Weighted random sampling within each category1. The sample size is capped
    # at the group size so a small category no longer raises in sample().
    sampled_df = df.groupby('category1').apply(
        lambda x: x.sample(n=min(MAX_SEND_N, len(x)), weights=x['score'], replace=False)
    ).reset_index(drop=True)
    sampled_df['sort'] = sampled_df.groupby('category1')['score'].rank(
        method='first', ascending=False).astype(int)
    # Order by score within each category.
    sampled_df = sampled_df.sort_values(
        by=['category1', 'score'], ascending=[True, False]).reset_index(drop=True)
    sampled_df['strategy_key'] = EXPLORE1_GROUP_NAME
    sampled_df['dt_version'] = dt_version

    merged_dfs = []
    for gh_id in GH_IDS:
        sub_gh_df = gh.query(f'gh_id == "{gh_id}"')
        send_n = account_map[gh_id]['send_n']
        sub_video_df = sampled_df.query(f'sort <= {send_n}').copy()
        merged_df = sub_video_df.merge(sub_gh_df, on='category1')
        # Reverse the order: rank 1 becomes sort == send_n.
        merged_df['sort'] = send_n + 1 - merged_df['sort']
        merged_dfs.append(merged_df)
    extend_df = pd.concat(merged_dfs)
    result_df = extend_df[['strategy_key', 'dt_version', 'gh_id', 'sort', 'video_id', 'score']]
    return result_df


def rank_for_layer2(run_dt, run_hour, project, stats_table, rank_table):
    """Build the explore2 ranking by score-weighted sampling inside each account.

    Accounts without enough qualified rows fall back to the current base
    strategy, then to the category's backup videos.

    Returns:
        DataFrame with strategy_key, dt_version, gh_id, sort, video_id, score.
    """
    stats_df = process_reply_stats(project, stats_table, STATS_PERIOD_DAYS, run_dt)

    # Seed from the version so a rerun produces the same result.
    dt_version = f'{run_dt}{run_hour}'
    np.random.seed(int(dt_version) + 1)
    # TODO: compute correlation between accounts
    ## pair accounts, intersect the videos that have RoVn values, and build a
    ## vector from each account's (smoothed) RoVn
    ## compute the correlation coefficient or cosine similarity of the vectors
    ## take a weighted sum of each video's RoVn
    # Current basic version: the layer-2 score is computed within each account only.

    sampled_dfs = []
    # 'default' account logic (default-explore2).
    default_stats_df = stats_df.query('gh_id == "default"')
    sampled_df = default_stats_df.sample(n=DEFAULT_SEND_N, weights=default_stats_df['score'])
    sampled_df['sort'] = range(1, len(sampled_df) + 1)
    sampled_dfs.append(sampled_df)

    # Basic filtering for real accounts.
    df = stats_df.query('send_count > 200 and score > 0')
    df = filter_audit_failed_video(df)

    # fallback to base if necessary
    base_strategy_df, _ = get_last_strategy_result(
        project, rank_table, dt_version, BASE_GROUP_NAME)
    base_strategy_df = filter_audit_failed_video(base_strategy_df)

    for gh_id in GH_IDS:
        if gh_id == 'default':
            continue
        account_info = account_map[gh_id]
        send_n = account_info['send_n']
        sub_df = df.query(f'gh_id == "{gh_id}"')
        if len(sub_df) < send_n:
            LOGGER.warning(
                "gh_id[{}] rows[{}] not enough for layer2, fallback to base"
                .format(gh_id, len(sub_df)))
            sub_df = base_strategy_df.query(f'gh_id == "{gh_id}"').copy()
            if len(sub_df) < send_n:
                LOGGER.warning(
                    "gh_id[{}] rows[{}] still not enough for layer2, add backup"
                    .format(gh_id, len(sub_df)))
                rows = []
                idx = len(sub_df)
                exist_video_ids = sub_df['video_id'].unique()
                for video_id in default_video[account_info['category1']]:
                    if video_id in exist_video_ids:
                        continue
                    # 'sort' is only a sampling weight here (copied into
                    # 'score' below) and is reassigned after sampling.
                    row = {
                        'gh_id': gh_id,
                        'sort': idx + 1,
                        'video_id': video_id,
                        'strategy_key': ''  # this is not important
                    }
                    rows.append(row)
                appx_df = pd.DataFrame(rows)
                sub_df = pd.concat([sub_df, appx_df])
            # Rows taken from the base strategy carry no 'score' column, so
            # their sort value is used as the sampling weight.
            sub_df['score'] = sub_df['sort']

        sampled_df = sub_df.sample(n=send_n, weights=sub_df['score'])
        sampled_df['sort'] = range(send_n, 0, -1)
        sampled_dfs.append(sampled_df)

    extend_df = pd.concat(sampled_dfs)
    extend_df['strategy_key'] = EXPLORE2_GROUP_NAME
    extend_df['dt_version'] = dt_version
    result_df = extend_df[['strategy_key', 'dt_version', 'gh_id', 'sort', 'video_id', 'score']]
    return result_df


def rank_for_base(run_dt, run_hour, project, stats_table, rank_table, stg_key):
    """Build the base ranking: the top-scored videos of each account.

    Videos of the current base strategy always take part in the ranking;
    accounts that still lack rows are padded with the category's backup videos.

    Returns:
        DataFrame with strategy_key, dt_version, gh_id, sort, video_id, score.
    """
    stats_df = process_reply_stats(project, stats_table, STATS_PERIOD_DAYS, run_dt)

    # TODO: support to set base manually
    dt_version = f'{run_dt}{run_hour}'

    # Fetch the current base; the strategy table's dt_version (ctime partition)
    # uses the current time.
    base_strategy_df, _ = get_last_strategy_result(
        project, rank_table, dt_version, stg_key)

    default_stats_df = stats_df.query('gh_id == "default"')

    # Rank within each account to decide the base content of that account
    # (including default). The current base strategy must take part in the
    # ranking, hence join first and filter afterwards.
    non_default_ids = [x for x in GH_IDS if x != 'default']
    stats_df = stats_df[stats_df['gh_id'].isin(non_default_ids)]
    stats_with_strategy_df = stats_df \
        .merge(
            base_strategy_df,
            on=['gh_id', 'video_id'],
            how='outer') \
        .query('strategy_key.notna() or (send_count > 500 and score > 0.05)') \
        .fillna({'score': 0.0})

    # Combine the default stats with the per-account stats.
    grouped_stats_df = pd.concat([default_stats_df, stats_with_strategy_df]).reset_index()
    grouped_stats_df = filter_audit_failed_video(grouped_stats_df)

    def set_top_n(group, n=2):
        """Keep the n best-scored rows of a group and number them from 1."""
        group_sorted = group.sort_values(by='score', ascending=False)
        # Copy so the assignment below does not write into a slice of `group`.
        top_n = group_sorted.head(n).copy()
        top_n['sort'] = range(1, len(top_n) + 1)
        return top_n

    ranked_df = grouped_stats_df.groupby('gh_id').apply(set_top_n, MAX_SEND_N)

    sampled_dfs = []
    for gh_id in GH_IDS:
        account_info = account_map[gh_id]
        send_n = account_info['send_n']
        sub_df = ranked_df.query(f'gh_id == "{gh_id}" and sort <= {send_n}').copy()
        if len(sub_df) < send_n:
            LOGGER.warning(
                "gh_id[{}] rows[{}] still not enough for base, add backup"
                .format(gh_id, len(sub_df)))
            rows = []
            idx = len(sub_df)
            exist_video_ids = sub_df['video_id'].unique()
            for video_id in default_video[account_info['category1']]:
                if video_id in exist_video_ids:
                    continue
                row = {
                    'gh_id': gh_id,
                    'sort': idx + 1,
                    'video_id': video_id,
                    'score': 0.0,
                    'strategy_key': ''  # this is not important
                }
                rows.append(row)
                # Advance the position so each backup row gets its own sort
                # value instead of all sharing len(sub_df) + 1.
                idx += 1
                if len(sub_df) + len(rows) >= send_n:
                    break
            appx_df = pd.DataFrame(rows)
            sub_df = pd.concat([sub_df, appx_df])
        # Reverse the order: rank 1 becomes sort == send_n.
        sub_df['sort'] = send_n + 1 - sub_df['sort']
        sampled_dfs.append(sub_df)

    ranked_df = pd.concat(sampled_dfs)
    ranked_df = ranked_df.reset_index(drop=True)
    ranked_df['strategy_key'] = stg_key
    ranked_df['dt_version'] = dt_version
    ranked_df = ranked_df[['strategy_key', 'dt_version', 'gh_id', 'sort', 'video_id', 'score']]
    return ranked_df


def check_result_data(df):
    """Validate the final ranking.

    Runs check_unsafe_video on the frame, then requires every
    (account, strategy) pair to hold exactly send_n rows.

    Raises:
        Exception: if any pair has an unexpected number of rows.
    """
    check_unsafe_video(df)
    for gh_id in GH_IDS:
        account_info = account_map[gh_id]
        for key in (EXPLORE1_GROUP_NAME, EXPLORE2_GROUP_NAME, BASE_GROUP_NAME):
            sub_df = df.query(f'gh_id == "{gh_id}" and strategy_key == "{key}"')
            n_records = len(sub_df)
            if n_records != account_info['send_n']:
                raise Exception(f"Unexpected record count: {gh_id},{key},{n_records}")


def postprocess_override_by_config(df, gh_df, dt_version):
    """Replace computed rows with manually configured videos.

    Accounts whose strategy_status is 0 get their JSON-configured video_ids
    written into every strategy group, position by position.

    Returns:
        The ranking DataFrame with overrides applied.
    """
    override_config = gh_df.query('strategy_status == 0').to_dict(orient='records')
    override_data = {
        'strategy_key': [],
        'gh_id': [],
        'sort': [],
        'video_id': []
    }

    for row in override_config:
        gh_id = row['gh_id']
        account_info = account_map[gh_id]
        try:
            video_ids = json.loads(row['video_ids'])
            if not isinstance(video_ids, list):
                raise Exception("video_ids is not list")
            video_ids = video_ids[:account_info['send_n']]
        except Exception as e:
            # Deliberately best effort: a bad config skips only this account.
            LOGGER.error(f"json parse error: {e}. content: {row['video_ids']}")
            continue
        for idx, video_id in enumerate(video_ids):
            for key in (BASE_GROUP_NAME, EXPLORE1_GROUP_NAME, EXPLORE2_GROUP_NAME):
                # Drop the computed row at this position before adding the override.
                df = df.drop(df.query(
                    f'gh_id == "{gh_id}" and strategy_key == "{key}" and sort == {idx + 1}'
                ).index)
                override_data['strategy_key'].append(key)
                override_data['gh_id'].append(gh_id)
                override_data['sort'].append(idx + 1)
                override_data['video_id'].append(video_id)
    n_records = len(override_data['strategy_key'])
    override_data['dt_version'] = [dt_version] * n_records
    override_data['score'] = [0.0] * n_records
    df_to_append = pd.DataFrame(override_data)
    df = pd.concat([df, df_to_append], ignore_index=True)

    # Forcibly replace an unsafe video.
    idx = df[df['video_id'] == 14403867].index
    df.loc[idx, 'video_id'] = 20463342
    return df


def build_and_transfer_base_mode(gh_df, run_dt, run_hour, dt_version, dry_run):
    """Compute the full ranking and publish it to ODPS and MySQL.

    With dry_run the result is printed and its difference from the latest
    ODPS partition is written to tmp_delta_data.csv; nothing is published.
    """
    layer1_rank = rank_for_layer1(run_dt, run_hour, ODS_PROJECT, EXPLORE_POOL_TABLE, gh_df)
    layer2_rank = rank_for_layer2(run_dt, run_hour, ODS_PROJECT, GH_REPLY_STATS_TABLE,
                                  ODPS_RANK_RESULT_TABLE)
    base_rank = rank_for_base(run_dt, run_hour, ODS_PROJECT, GH_REPLY_STATS_TABLE,
                              ODPS_RANK_RESULT_TABLE, BASE_GROUP_NAME)

    final_rank_df = pd.concat([layer1_rank, layer2_rank, base_rank]).reset_index(drop=True)
    final_rank_df = postprocess_override_by_config(final_rank_df, gh_df, dt_version)
    check_result_data(final_rank_df)

    final_df = join_video_info(final_rank_df)

    if dry_run:
        print("==== ALL DATA ====")
        print(final_df[['strategy_key', 'gh_id', 'sort', 'video_id', 'score', 'title']]
              .sort_values(by=['strategy_key', 'gh_id', 'sort']))

        last_odps_df = get_odps_df_of_max_partition(
            ODS_PROJECT, ODPS_RANK_RESULT_TABLE, {'ctime': dt_version}
        ).to_pandas()
        merged_df = last_odps_df.merge(
            final_df, on=['strategy_key', 'gh_id', 'sort'], how='outer')
        delta_df = merged_df.query('title_x != title_y')
        delta_df = delta_df[['strategy_key', 'gh_id', 'sort',
                             'title_x', 'score_x', 'title_y', 'score_y']]
        delta_df.to_csv('tmp_delta_data.csv')
        return

    # save to ODPS
    odps_instance = get_odps_instance(ODS_PROJECT)
    t = odps_instance.get_table(ODPS_RANK_RESULT_TABLE)
    part_spec = _format_partition_spec(
        {'dt': run_dt, 'hour': run_hour, 'ctime': dt_version})
    with t.open_writer(partition=part_spec, create_partition=True, overwrite=True) as writer:
        writer.write(list(final_df.itertuples(index=False)))

    # sync to MySQL
    data_to_insert = [tuple(row) for row in final_df.itertuples(index=False)]
    data_columns = list(final_df.columns)
    # Taken before the insert so the strict '<' below spares the new rows.
    max_time_to_delete = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    mysql = MysqlHelper(CONFIG.MYSQL_GROWTH_INFO)
    mysql.batch_insert(RDS_RANK_RESULT_TABLE, data_to_insert, data_columns)

    # remove old data of same version
    for key in final_df['strategy_key'].unique():
        sql = f"""
            update {RDS_RANK_RESULT_TABLE}
            set is_delete = 1
            where dt_version = '{dt_version}' and strategy_key = '{key}'
            and create_time < '{max_time_to_delete}' and is_delete = 0
        """
        mysql.execute(sql)


def build_and_transfer_delta_mode(account_df, dt_version, dry_run):
    """Add rows for accounts that are missing from the latest strategy.

    New accounts receive their configured videos (or the category backups) in
    every strategy group. The rows are inserted into MySQL and the latest ODPS
    partition is rewritten with the new rows included.
    """
    # Fetch the latest strategy; the strategy table's dt_version (ctime
    # partition) uses the current time.
    last_strategy, last_dt_version = get_last_strategy_result(
        ODS_PROJECT, ODPS_RANK_RESULT_TABLE, dt_version, BASE_GROUP_NAME)
    all_accounts = account_df['gh_id'].unique()
    accounts_in_strategy = last_strategy['gh_id'].unique()
    delta_accounts = list(set(all_accounts) - set(accounts_in_strategy))
    if len(delta_accounts) > 0:
        LOGGER.info('Found {} new accounts: {}'.format(
            len(delta_accounts), ','.join(delta_accounts)))
    else:
        LOGGER.info('Found 0 new account, do nothing.')
        return

    # New accounts have no history, so strategy_status can be ignored.
    # TODO: set default by history stats
    groups = (BASE_GROUP_NAME, EXPLORE1_GROUP_NAME, EXPLORE2_GROUP_NAME)
    rows = []
    for gh_id in delta_accounts:
        account_info = account_map[gh_id]
        send_n = account_info['send_n']
        configured_videos = account_info['video_ids']
        fallback_videos = default_video[account_info['category1']]
        video_ids = fallback_videos
        if configured_videos:
            LOGGER.info(f'{gh_id} has configured video IDs: {configured_videos}')
            try:
                video_ids = _parse_configured_video_ids(configured_videos)
            except (TypeError, ValueError) as e:
                LOGGER.warning(
                    f'invalid configured video_ids, use default instead: {e}')
        if len(video_ids) < send_n:
            # Pad a short configured list with backups rather than failing on
            # the index lookup below.
            LOGGER.warning(
                f'{gh_id} has only {len(video_ids)} video IDs for send_n={send_n}, '
                f'pad with default videos')
            video_ids = list(video_ids) + [
                x for x in fallback_videos if x not in video_ids]
        for group_key in groups:
            for idx in range(send_n):
                row = {
                    'strategy_key': group_key,
                    'gh_id': gh_id,
                    'sort': idx + 1,
                    'video_id': video_ids[idx],
                    'dt_version': last_dt_version,
                    'score': 0.0
                }
                rows.append(row)
    df = pd.DataFrame(rows)
    final_df = join_video_info(df)
    if dry_run:
        print(final_df)
        return

    # Insert the incremental rows into MySQL.
    data_to_insert = [tuple(row) for row in final_df.itertuples(index=False)]
    data_columns = list(final_df.columns)
    mysql = MysqlHelper(CONFIG.MYSQL_GROWTH_INFO)
    mysql.batch_insert(RDS_RANK_RESULT_TABLE, data_to_insert, data_columns)

    # Write the full record set back to ODPS.
    last_odps_df = get_odps_df_of_max_partition(
        ODS_PROJECT, ODPS_RANK_RESULT_TABLE, {'ctime': dt_version}
    ).to_pandas()
    updated_odps_df = pd.concat([final_df, last_odps_df], ignore_index=True)

    odps_instance = get_odps_instance(ODS_PROJECT)
    t = odps_instance.get_table(ODPS_RANK_RESULT_TABLE)
    # Slice a string form: the value read back from the frame may not be a str.
    version_text = str(last_dt_version)
    target_dt = version_text[0:8]
    target_hour = version_text[8:10]
    part_spec = _format_partition_spec(
        {'dt': target_dt, 'hour': target_hour, 'ctime': last_dt_version})
    with t.open_writer(partition=part_spec, create_partition=True, overwrite=True) as writer:
        writer.write(list(updated_odps_df.itertuples(index=False)))


def join_video_info(df):
    """Attach title and cover URL to each ranking row.

    Rows whose video_id has no match in wx_video are dropped by the inner
    join; a warning is logged when that happens.

    Returns:
        DataFrame with strategy_key, dt_version, gh_id, sort, video_id, title,
        cover_url, score.
    """
    db = MysqlHelper(CONFIG.MYSQL_INFO)
    video_ids = df['video_id'].unique().tolist()
    # Cast to int so only integer literals are interpolated into the SQL text.
    video_ids_str = ','.join(str(int(x)) for x in video_ids)
    sql = f"""
        SELECT id as video_id, title, cover_img_path FROM wx_video
        WHERE id in ({video_ids_str})
        """
    rows = db.get_data(sql, DictCursor)
    # Explicit columns keep the frame usable when the query returns no rows.
    video_df = pd.DataFrame(rows, columns=['video_id', 'title', 'cover_img_path'])
    video_df['cover_url'] = video_df['cover_img_path'] + CDN_IMG_OPERATOR
    final_df = df.merge(video_df, on='video_id')
    if len(final_df) < len(df):
        LOGGER.warning(
            f"join_video_info dropped {len(df) - len(final_df)} rows without video info")

    # odps_instance = get_odps_instance(ODS_PROJECT)
    # odps_df = odps.DataFrame(df)
    # video_df = get_dataframe_from_odps('videoods', 'wx_video')
    # video_df['cover_url'] = video_df['cover_img_path'] + CDN_IMG_OPERATOR
    # video_df = video_df['id', 'title', 'cover_url']
    # final_df = odps_df.join(video_df, on=('video_id', 'id'))

    # final_df = final_df.to_pandas()
    final_df = final_df[['strategy_key', 'dt_version', 'gh_id', 'sort', 'video_id',
                         'title', 'cover_url', 'score']]
    return final_df


def build_and_transfer_data(run_dt, run_hour, **kwargs):
    """Refresh the account configuration and run the requested mode.

    Keyword Args:
        dry_run: when true, nothing is published (default False).
        mode: 'delta' selects delta mode; any other value runs base mode.
    """
    dt_version = f'{run_dt}{run_hour}'
    dry_run = kwargs.get('dry_run', False)
    mode = kwargs.get('mode')

    gh_df = get_and_update_gh_ids(run_dt)

    if mode == 'delta':
        return build_and_transfer_delta_mode(gh_df, dt_version, dry_run)
    else:
        return build_and_transfer_base_mode(gh_df, run_dt, run_hour, dt_version, dry_run)


def main():
    """Parse the CLI arguments, wait for upstream data, then run the job.

    On failure the error is logged and, outside the development environment
    and dry runs, reported through the Feishu robot.
    """
    LOGGER.info("%s 开始执行" % os.path.basename(__file__))
    LOGGER.info(f"environment: {CONFIG.ENV_TEXT}")

    argparser = ArgumentParser()
    argparser.add_argument('-n', '--dry-run', action='store_true')
    argparser.add_argument('--run-at', help='assume to run at date and hour, yyyyMMddHH')
    argparser.add_argument('--mode', default='base', choices=['base', 'delta'], help='run mode')
    args = argparser.parse_args()

    run_date = datetime.today()
    if args.run_at:
        run_date = datetime.strptime(args.run_at, "%Y%m%d%H")
        LOGGER.info(f"Assume to run at {run_date.strftime('%Y-%m-%d %H:00')}")

    try:
        now_date = datetime.today()
        LOGGER.info(f"开始执行: {datetime.strftime(now_date, '%Y-%m-%d %H:%M')}")

        last_date = run_date - timedelta(1)
        last_dt = last_date.strftime("%Y%m%d")
        # Check whether the daily upstream data is ready.
        # The upstream stats table is refreshed daily, but its partitioning is
        # designed to be hour-compatible, hence the fixed '00' hour.
        while True:
            h_data_count = check_data_partition(ODS_PROJECT, GH_REPLY_STATS_TABLE, last_dt, '00')
            if h_data_count > 0:
                LOGGER.info('上游数据表查询数据条数={},开始计算'.format(h_data_count))
                run_dt = run_date.strftime("%Y%m%d")
                run_hour = run_date.strftime("%H")
                LOGGER.info(f'run_dt: {run_dt}, run_hour: {run_hour}')
                build_and_transfer_data(run_dt, run_hour, dry_run=args.dry_run, mode=args.mode)
                LOGGER.info('数据更新完成')
                return
            else:
                LOGGER.info("上游数据未就绪,等待60s")
                time.sleep(60)
    except Exception as e:
        LOGGER.error(f"数据更新失败, exception: {e}, traceback: {traceback.format_exc()}")
        if CONFIG.ENV_TEXT == '开发环境' or args.dry_run:
            return
        send_msg_to_feishu(
            webhook=CONFIG.FEISHU_ROBOT['growth_task_robot'].get('webhook'),
            key_word=CONFIG.FEISHU_ROBOT['growth_task_robot'].get('key_word'),
            msg_text=f"rov-offline{CONFIG.ENV_TEXT} - 数据更新失败\n"
                     f"exception: {e}\n"
                     f"traceback: {traceback.format_exc()}"
        )


if __name__ == '__main__':
    main()