Переглянути джерело

Update alg_growth_3rd_gh_reply_video_v1: support different send_n

StrayWarrior 8 місяців тому
батько
коміт
0119f18cad
1 змінених файлів з 90 додано та 21 видалено
  1. 90 21
      alg_growth_3rd_gh_reply_video_v1.py

+ 90 - 21
alg_growth_3rd_gh_reply_video_v1.py

@@ -28,6 +28,7 @@ EXPLORE1_GROUP_NAME = '3rd-party-explore1'
 EXPLORE2_GROUP_NAME = '3rd-party-explore2'
 # GH_IDS will be updated by get_and_update_gh_ids
 GH_IDS = ('default',)
+account_map = {}
 
 pd.set_option('display.max_rows', None)
 
@@ -40,14 +41,22 @@ ODPS_RANK_RESULT_TABLE = 'alg_3rd_gh_autoreply_video_rank_data'
 GH_DETAIL = 'gh_detail'
 RDS_RANK_RESULT_TABLE = 'alg_gh_autoreply_video_rank_data'
 STATS_PERIOD_DAYS = 5
-SEND_N = 1
+DEFAULT_SEND_N = 1
+MAX_SEND_N = 3
+
+default_video = {
+    '泛生活': [20463342, 14095344, 13737337],
+    '泛历史': [13586800, 12794884, 12117356],
+}
+
 
 def get_and_update_gh_ids(run_dt):
     db = MysqlHelper(CONFIG.MYSQL_GROWTH_INFO)
     gh_type = AutoReplyAccountType.EXTERNAL_GZH.value
     sqlstr = f"""
         SELECT gh_id, gh_name, category1, category2, channel,
-               video_ids, strategy_status
+               video_ids, strategy_status,
+               autoreply_send_minigram_num as send_n
         FROM {GH_DETAIL}
         WHERE is_delete = 0 AND `type` = {gh_type}
         """
@@ -63,6 +72,11 @@ def get_and_update_gh_ids(run_dt):
     account_df = account_df.drop_duplicates(subset=['gh_id'])
     global GH_IDS
     GH_IDS = tuple(account_df['gh_id'])
+    global account_map
+    account_map = { x['gh_id']: x for x in account_df.to_dict(orient='records') }
+    for gh_id in account_map:
+        account_info = account_map[gh_id]
+        account_info['send_n'] = account_info.get('send_n', 1)
     return account_df
 
 
@@ -134,13 +148,23 @@ def rank_for_layer1(run_dt, run_hour, project, table, gh):
     df['score'] = 1.0
     # 按照 category1 分类后进行加权随机抽样
     sampled_df = df.groupby('category1').apply(
-        lambda x: x.sample(n=SEND_N, weights=x['score'], replace=False)).reset_index(drop=True)
+        lambda x: x.sample(n=MAX_SEND_N, weights=x['score'], replace=False)).reset_index(drop=True)
     sampled_df['sort'] = sampled_df.groupby('category1')['score'].rank(method='first', ascending=False).astype(int)
     # 按得分排序
     sampled_df = sampled_df.sort_values(by=['category1', 'score'], ascending=[True, False]).reset_index(drop=True)
     sampled_df['strategy_key'] = EXPLORE1_GROUP_NAME
     sampled_df['dt_version'] = dt_version
-    extend_df = sampled_df.merge(gh, on='category1')
+
+    merged_dfs = []
+    for gh_id in GH_IDS:
+        sub_gh_df = gh.query(f'gh_id == "{gh_id}"')
+        account_info = account_map[gh_id]
+        send_n = account_info['send_n']
+        sub_video_df = sampled_df.query(f'sort <= {send_n}').copy()
+        merged_df = sub_video_df.merge(sub_gh_df, on='category1')
+        merged_df['sort'] = send_n + 1 - merged_df['sort']
+        merged_dfs.append(merged_df)
+    extend_df = pd.concat(merged_dfs)
     result_df = extend_df[['strategy_key', 'dt_version', 'gh_id', 'sort', 'video_id', 'score']]
     return result_df
 
@@ -160,7 +184,7 @@ def rank_for_layer2(run_dt, run_hour, project, stats_table, rank_table):
     sampled_dfs = []
     # 处理default逻辑(default-explore2)
     default_stats_df = stats_df.query('gh_id == "default"')
-    sampled_df = default_stats_df.sample(n=SEND_N, weights=default_stats_df['score'])
+    sampled_df = default_stats_df.sample(n=DEFAULT_SEND_N, weights=default_stats_df['score'])
     sampled_df['sort'] = range(1, len(sampled_df) + 1)
     sampled_dfs.append(sampled_df)
 
@@ -174,15 +198,37 @@ def rank_for_layer2(run_dt, run_hour, project, stats_table, rank_table):
     for gh_id in GH_IDS:
         if gh_id == 'default':
             continue
+        account_info = account_map[gh_id]
+        send_n = account_info['send_n']
         sub_df = df.query(f'gh_id == "{gh_id}"')
-        if len(sub_df) < SEND_N:
+        if len(sub_df) < send_n:
             LOGGER.warning(
                 "gh_id[{}] rows[{}] not enough for layer2, fallback to base"
                 .format(gh_id, len(sub_df)))
             sub_df = base_strategy_df.query(f'gh_id == "{gh_id}"').copy()
+            if len(sub_df) < send_n:
+                LOGGER.warning(
+                    "gh_id[{}] rows[{}] still not enough for layer2, add backup"
+                    .format(gh_id, len(sub_df)))
+                rows = []
+                idx = len(sub_df)
+                exist_video_ids = sub_df['video_id'].unique()
+                for video_id in default_video[account_info['category1']]:
+                    if video_id in exist_video_ids:
+                        continue
+                    row = {
+                        'gh_id': gh_id,
+                        'sort': idx + 1,
+                        'video_id': video_id,
+                        'strategy_key': '' # this is not important
+                    }
+                    rows.append(row)
+                appx_df = pd.DataFrame(rows)
+                sub_df = pd.concat([sub_df, appx_df])
             sub_df['score'] = sub_df['sort']
-        sampled_df = sub_df.sample(n=SEND_N, weights=sub_df['score'])
-        sampled_df['sort'] = range(1, len(sampled_df) + 1)
+
+        sampled_df = sub_df.sample(n=send_n, weights=sub_df['score'])
+        sampled_df['sort'] = range(send_n, 0, -1)
         sampled_dfs.append(sampled_df)
 
     extend_df = pd.concat(sampled_dfs)
@@ -227,7 +273,37 @@ def rank_for_base(run_dt, run_hour, project, stats_table, rank_table, stg_key):
         top_n['sort'] = range(1, len(top_n) + 1)
         return top_n
 
-    ranked_df = grouped_stats_df.groupby('gh_id').apply(set_top_n, SEND_N)
+    ranked_df = grouped_stats_df.groupby('gh_id').apply(set_top_n, MAX_SEND_N)
+    sampled_dfs = []
+    for gh_id in GH_IDS:
+        account_info = account_map[gh_id]
+        send_n = account_info['send_n']
+        sub_df = ranked_df.query(f'gh_id == "{gh_id}" and sort <= {send_n}').copy()
+        if len(sub_df) < send_n:
+            LOGGER.warning(
+                "gh_id[{}] rows[{}] still not enough for base, add backup"
+                .format(gh_id, len(sub_df)))
+            rows = []
+            idx = len(sub_df)
+            exist_video_ids = sub_df['video_id'].unique()
+            for video_id in default_video[account_info['category1']]:
+                if video_id in exist_video_ids:
+                    continue
+                row = {
+                    'gh_id': gh_id,
+                    'sort': idx + 1,
+                    'video_id': video_id,
+                    'score': 0.0,
+                    'strategy_key': '' # this is not important
+                }
+                rows.append(row)
+                if len(sub_df) + len(rows) >= send_n:
+                    break
+            appx_df = pd.DataFrame(rows)
+            sub_df = pd.concat([sub_df, appx_df])
+        sub_df['sort'] = send_n + 1 - sub_df['sort']
+        sampled_dfs.append(sub_df)
+    ranked_df = pd.concat(sampled_dfs)
     ranked_df = ranked_df.reset_index(drop=True)
     ranked_df['strategy_key'] = stg_key
     ranked_df['dt_version'] = dt_version
@@ -238,10 +314,11 @@ def rank_for_base(run_dt, run_hour, project, stats_table, rank_table, stg_key):
 def check_result_data(df):
     check_unsafe_video(df)
     for gh_id in GH_IDS:
+        account_info = account_map[gh_id]
         for key in (EXPLORE1_GROUP_NAME, EXPLORE2_GROUP_NAME, BASE_GROUP_NAME):
             sub_df = df.query(f'gh_id == "{gh_id}" and strategy_key == "{key}"')
             n_records = len(sub_df)
-            if n_records != SEND_N:
+            if n_records != account_info['send_n']:
                 raise Exception(f"Unexpected record count: {gh_id},{key},{n_records}")
 
 
@@ -256,11 +333,12 @@ def postprocess_override_by_config(df, gh_df, dt_version):
 
     for row in override_config:
         gh_id = row['gh_id']
+        account_info = account_map[gh_id]
         try:
             video_ids = json.loads(row['video_ids'])
             if not isinstance(video_ids, list):
                 raise Exception("video_ids is not list")
-            video_ids = video_ids[:SEND_N]
+            video_ids = video_ids[:account_info['send_n']]
         except Exception as e:
             LOGGER.error(f"json parse error: {e}. content: {row['video_ids']}")
             continue
@@ -294,9 +372,6 @@ def build_and_transfer_base_mode(gh_df, run_dt, run_hour, dt_version, dry_run):
 
     final_df = join_video_info(final_rank_df)
 
-    # reverse sending order
-    final_df['sort'] = SEND_N + 1 - final_df['sort']
-
     if dry_run:
         print("==== ALL DATA ====")
         print(final_df[['strategy_key', 'gh_id', 'sort', 'video_id', 'score', 'title']]
@@ -346,7 +421,6 @@ def build_and_transfer_delta_mode(account_df, dt_version, dry_run):
     # 获取最新策略信息, 策略表dt_version(ctime partition)采用当前时间
     last_strategy, last_dt_version = get_last_strategy_result(
         ODS_PROJECT, ODPS_RANK_RESULT_TABLE, dt_version, BASE_GROUP_NAME)
-    account_map = { x['gh_id']: x for x in account_df.to_dict(orient='records') }
     all_accounts = account_df['gh_id'].unique()
     accounts_in_strategy = last_strategy['gh_id'].unique()
     delta_accounts = [x for x in set(all_accounts) - set(accounts_in_strategy)]
@@ -357,11 +431,6 @@ def build_and_transfer_delta_mode(account_df, dt_version, dry_run):
         LOGGER.info('Found 0 new account, do nothing.')
         return
 
-    default_video = {
-        '泛生活': [20463342],
-        '泛历史': [13586800],
-    }
-
     # 新增账号,不存在历史,可直接忽略strategy_status字段
     # TODO: set default by history stats
     groups = (BASE_GROUP_NAME, EXPLORE1_GROUP_NAME, EXPLORE2_GROUP_NAME)
@@ -375,7 +444,7 @@ def build_and_transfer_delta_mode(account_df, dt_version, dry_run):
         else:
             video_ids = default_video[account_info['category1']]
         for group_key in groups:
-            for idx in range(SEND_N):
+            for idx in range(account_info['send_n']):
                 row = {
                     'strategy_key': group_key,
                     'gh_id': gh_id,