Browse files

Merge branch 'dev-xym-category' of algorithm/rov-offline into master

fengzhoutian 6 months ago
parent
commit
ce2fb1fb1b
2 changed files with 68 additions and 32 deletions
  1. alg_growth_3rd_gh_reply_video_v1.py  (+33 -16)
  2. alg_growth_gh_reply_video_v1.py  (+35 -16)

alg_growth_3rd_gh_reply_video_v1.py  (+33 -16)

@@ -15,6 +15,7 @@ import numpy as np
 from log import Log
 import os
 from argparse import ArgumentParser
+from constants import AutoReplyAccountType
 
 CONFIG, _ = set_config()
 LOGGER = Log()
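Both scripts gain the `from constants import AutoReplyAccountType` import; the `constants` module itself is not part of this commit. Judging from the members referenced (`EXTERNAL_GZH` in this file, `SELF_OWNED_GZH` in alg_growth_gh_reply_video_v1.py, plus the literal `type: [2]` used for the default row below), it is presumably a small enum along these lines. This is a hypothetical sketch; the member values are assumptions, not taken from the repository:

from enum import Enum

class AutoReplyAccountType(Enum):
    # assumed values; the real definition lives in constants.py,
    # which this commit does not show
    SELF_OWNED_GZH = 1   # self-owned official accounts
    EXTERNAL_GZH = 2     # third-party ("3rd") official accounts

# usage as in the diff below:
# gh = gh[gh['type'] == AutoReplyAccountType.EXTERNAL_GZH.value]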
@@ -267,10 +268,26 @@ EXPLORE_POOL_TABLE = 'alg_growth_video_return_stats_history'
 GH_REPLY_STATS_TABLE = 'alg_growth_3rd_gh_reply_video_stats'
 # ODPS_RANK_RESULT_TABLE = 'alg_gh_autoreply_video_rank_data'
 ODPS_3RD_RANK_RESULT_TABLE = 'alg_3rd_gh_autoreply_video_rank_data'
+GH_DETAIL = 'gh_detail'
 RDS_RANK_RESULT_TABLE = 'alg_gh_autoreply_video_rank_data'
 STATS_PERIOD_DAYS = 5
 SEND_N = 1
 
+def get_and_update_gh_ids(run_dt):
+    gh = get_odps_df_of_max_partition(ODS_PROJECT, GH_DETAIL, {'dt': run_dt})
+    gh = gh.to_pandas()
+    gh = gh[gh['type'] == AutoReplyAccountType.EXTERNAL_GZH.value]
+    # handle the 'default' account separately
+    if 'default' not in gh['gh_id'].values:
+        new_row = pd.DataFrame({'gh_id': ['default'], 'gh_name': ['默认'], 'type': [2], 'category1': ['泛生活']},
+                               index=[0])
+        gh = pd.concat([gh, new_row], ignore_index=True)
+
+    gh = gh.drop_duplicates(subset=['gh_id'])
+    global GH_IDS
+    GH_IDS = tuple(gh['gh_id'])
+    return gh
+
 
 def check_data_partition(project, table, data_dt, data_hr=None):
     """检查数据是否准备好"""
@@ -323,7 +340,7 @@ def process_reply_stats(project, table, period, run_dt):
     return merged_df
 
 
-def rank_for_layer1(run_dt, run_hour, project, table):
+def rank_for_layer1(run_dt, run_hour, project, table, gh):
     # TODO: add review & retirement handling
     df = get_odps_df_of_max_partition(project, table, {'dt': run_dt})
     df = df.to_pandas()
@@ -333,17 +350,15 @@ def rank_for_layer1(run_dt, run_hour, project, table):
 
     # TODO: revise the weight calculation strategy
     df['score'] = df['ros']
-
-    sampled_df = df.sample(n=SEND_N, weights=df['score'])
-    sampled_df['sort'] = range(1, len(sampled_df) + 1)
+    # weighted random sampling within each category1 group
+    sampled_df = df.groupby('category1').apply(
+        lambda x: x.sample(n=SEND_N, weights=x['score'], replace=False)).reset_index(drop=True)
+    sampled_df['sort'] = sampled_df.groupby('category1')['score'].rank(method='first', ascending=False).astype(int)
+    # sort by score
+    sampled_df = sampled_df.sort_values(by=['category1', 'score'], ascending=[True, False]).reset_index(drop=True)
     sampled_df['strategy_key'] = EXPLORE1_GROUP_NAME
     sampled_df['dt_version'] = dt_version
-
-    gh_name_df = pd.DataFrame({'gh_id': GH_IDS + ('default',)})
-    sampled_df['_tmpkey'] = 1
-    gh_name_df['_tmpkey'] = 1
-    extend_df = sampled_df.merge(gh_name_df, on='_tmpkey').drop('_tmpkey', axis=1)
-
+    extend_df = sampled_df.merge(gh, on='category1')
     result_df = extend_df[['strategy_key', 'dt_version', 'gh_id', 'sort', 'video_id', 'score']]
     return result_df
 
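The reworked layer-1 sampling above replaces one global weighted draw with a per-`category1` draw: `SEND_N` videos are sampled inside every category bucket, weighted by `score` and without replacement, then ranked 1..SEND_N within the bucket. A minimal self-contained sketch of the same pandas pattern on toy data (every category label except '泛生活' is made up):

import numpy as np
import pandas as pd

SEND_N = 1
np.random.seed(2024062300 + 1)  # seeded from dt_version, as in the diff

df = pd.DataFrame({
    'video_id':  [101, 102, 201, 202],
    'category1': ['泛生活', '泛生活', 'cat_b', 'cat_b'],
    'score':     [0.5, 1.5, 2.0, 0.1],
})

# weighted random sample of SEND_N rows inside each category1 bucket
sampled_df = df.groupby('category1').apply(
    lambda x: x.sample(n=SEND_N, weights=x['score'], replace=False)
).reset_index(drop=True)
# rank 1..SEND_N within each bucket, highest score first
sampled_df['sort'] = (sampled_df.groupby('category1')['score']
                      .rank(method='first', ascending=False).astype(int))
print(sampled_df)

Note that pandas' DataFrame.sample raises a ValueError when a bucket holds fewer than `SEND_N` rows and `replace=False`, so every category in the pool must supply at least `SEND_N` candidates.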
@@ -375,6 +390,8 @@ def rank_for_layer2(run_dt, run_hour, project, stats_table, rank_table):
         project, rank_table, dt_version, BASE_GROUP_NAME)
 
     for gh_id in GH_IDS:
+        if gh_id == 'default':
+            continue
         sub_df = df.query(f'gh_id == "{gh_id}"')
         if len(sub_df) < SEND_N:
             LOGGER.warning(
@@ -428,7 +445,6 @@ def rank_for_base(run_dt, run_hour, project, stats_table, rank_table, stg_key):
 
     ranked_df = grouped_stats_df.groupby('gh_id').apply(set_top_n, SEND_N)
     ranked_df = ranked_df.reset_index(drop=True)
-    # ranked_df['sort'] = grouped_stats_df.groupby('gh_id')['score'].rank(ascending=False)
     ranked_df['strategy_key'] = stg_key
     ranked_df['dt_version'] = dt_version
     ranked_df = ranked_df[['strategy_key', 'dt_version', 'gh_id', 'sort', 'video_id', 'score']]
@@ -436,18 +452,18 @@ def rank_for_base(run_dt, run_hour, project, stats_table, rank_table, stg_key):
 
 
 def check_result_data(df):
-    for gh_id in GH_IDS + ('default',):
+    for gh_id in GH_IDS:
         for key in (EXPLORE1_GROUP_NAME, EXPLORE2_GROUP_NAME, BASE_GROUP_NAME):
             sub_df = df.query(f'gh_id == "{gh_id}" and strategy_key == "{key}"')
             if len(sub_df) != SEND_N:
-                raise Exception(f"Result not enough for gh_id[{gh_id}]")
+                raise Exception(f"Result not enough for gh_id[{gh_id}], group[{key}]")
 
 
 def rank_for_base_designate(run_dt, run_hour, stg_key):
     dt_version = f'{run_dt}{run_hour}'
     ranked_df = pd.DataFrame()  # initialize an empty DataFrame
 
-    for gh_id in GH_IDS + ('default',):
+    for gh_id in GH_IDS:
         if gh_id in TARGET_GH_IDS:
             temp_df = pd.DataFrame({
                 'strategy_key': [stg_key],
@@ -474,10 +490,11 @@ def build_and_transfer_data(run_dt, run_hour, project, **kwargs):
     dt_version = f'{run_dt}{run_hour}'
     dry_run = kwargs.get('dry_run', False)
 
-    # layer1_rank = rank_for_layer1(run_dt, run_hour, ODS_PROJECT, EXPLORE_POOL_TABLE)
+    gh_df = get_and_update_gh_ids(run_dt)
+
+    layer1_rank = rank_for_layer1(run_dt, run_hour, ODS_PROJECT, EXPLORE_POOL_TABLE, gh_df)
     # layer2_rank = rank_for_layer2(run_dt, run_hour, ODS_PROJECT, GH_REPLY_STATS_TABLE, ODPS_3RD_RANK_RESULT_TABLE)
     # base_rank = rank_for_base(run_dt, run_hour, ODS_PROJECT, GH_REPLY_STATS_TABLE, ODPS_3RD_RANK_RESULT_TABLE,BASE_GROUP_NAME)
-    layer1_rank = rank_for_base_designate(run_dt, run_hour, EXPLORE1_GROUP_NAME)
     layer2_rank = rank_for_base_designate(run_dt, run_hour, EXPLORE2_GROUP_NAME)
     base_rank = rank_for_base_designate(run_dt, run_hour, BASE_GROUP_NAME)
 
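The distribution step in `rank_for_layer1` also changes behavior, not just shape: the removed `_tmpkey` cross join gave every account (plus 'default') the same globally sampled list, while the new `sampled_df.merge(gh, on='category1')` is an inner join, so each account now receives only the videos drawn for its own `category1`; an account whose category produced no sample drops out of the result entirely, which `check_result_data` would then surface. Continuing the sketch above, with made-up account rows:

# fan the per-category sample out to the accounts of that category
gh = pd.DataFrame({
    'gh_id':     ['gh_aaa', 'gh_bbb', 'default'],  # hypothetical ids
    'category1': ['泛生活', 'cat_b', '泛生活'],
})
extend_df = sampled_df.merge(gh, on='category1')  # inner join on category
result_df = extend_df[['gh_id', 'sort', 'video_id', 'score']]
print(result_df)  # one row per (account, sampled video) pair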

alg_growth_gh_reply_video_v1.py  (+35 -16)

@@ -15,6 +15,7 @@ import numpy as np
 from log import Log
 import os
 from argparse import ArgumentParser
+from constants import AutoReplyAccountType
 
 CONFIG, _ = set_config()
 LOGGER = Log()
@@ -22,7 +23,6 @@ LOGGER = Log()
 BASE_GROUP_NAME = 'stg0909-base'
 EXPLORE1_GROUP_NAME = 'stg0909-explore1'
 EXPLORE2_GROUP_NAME = 'stg0909-explore2'
-#TODO: fetch gh_id from external data source
 GH_IDS = ('gh_ac43e43b253b', 'gh_93e00e187787', 'gh_77f36c109fb1',
           'gh_68e7fdc09fe4', 'gh_b181786a6c8c')
 CDN_IMG_OPERATOR = "?x-oss-process=image/resize,m_fill,w_600,h_480,limit_0/format,jpg/watermark,image_eXNoL3BpYy93YXRlcm1hcmtlci9pY29uX3BsYXlfd2hpdGUucG5nP3gtb3NzLXByb2Nlc3M9aW1hZ2UvcmVzaXplLHdfMTQ0,g_center"
@@ -32,9 +32,28 @@ EXPLORE_POOL_TABLE = 'alg_growth_video_return_stats_history'
 GH_REPLY_STATS_TABLE = 'alg_growth_gh_reply_video_stats'
 ODPS_RANK_RESULT_TABLE = 'alg_gh_autoreply_video_rank_data'
 RDS_RANK_RESULT_TABLE = 'alg_gh_autoreply_video_rank_data'
+GH_DETAIL = 'gh_detail'
 STATS_PERIOD_DAYS = 5
 SEND_N = 2
 
+pd.set_option('display.max_rows', None)  # print DataFrames in full, without row truncation
+
+def get_and_update_gh_ids(run_dt):
+    gh = get_odps_df_of_max_partition(ODS_PROJECT, GH_DETAIL, {'dt': run_dt})
+    gh = gh.to_pandas()
+    gh = gh[gh['type'] == AutoReplyAccountType.SELF_OWNED_GZH.value]
+    # handle the 'default' account separately
+    if 'default' not in gh['gh_id'].values:
+        new_row = pd.DataFrame({'gh_id': ['default'], 'gh_name': ['默认'], 'type': [2], 'category1': ['泛生活']},
+                               index=[0])
+        gh = pd.concat([gh, new_row], ignore_index=True)
+
+    gh = gh.drop_duplicates(subset=['gh_id'])
+    global GH_IDS
+    GH_IDS = tuple(gh['gh_id'])
+    return gh
+
+
 def check_data_partition(project, table, data_dt, data_hr=None):
     """检查数据是否准备好"""
     try:
@@ -86,27 +105,23 @@ def process_reply_stats(project, table, period, run_dt):
     return merged_df
 
 
-def rank_for_layer1(run_dt, run_hour, project, table):
+def rank_for_layer1(run_dt, run_hour, project, table, gh_df):
     # TODO: add review & retirement handling
     df = get_odps_df_of_max_partition(project, table, {'dt': run_dt})
     df = df.to_pandas()
     # ensure reruns produce consistent results
     dt_version = f'{run_dt}{run_hour}'
-    np.random.seed(int(dt_version)+1)
+    np.random.seed(int(dt_version) + 1)
 
     # TODO: revise the weight calculation strategy
     df['score'] = df['ros']
-
-    sampled_df = df.sample(n=SEND_N, weights=df['score'])
-    sampled_df['sort'] = range(1, len(sampled_df) + 1)
+    # weighted random sampling within each category1 group
+    sampled_df = df.groupby('category1').apply(
+        lambda x: x.sample(n=SEND_N, weights=x['score'], replace=False)).reset_index(drop=True)
+    sampled_df['sort'] = sampled_df.groupby('category1')['score'].rank(method='first', ascending=False).astype(int)
     sampled_df['strategy_key'] = EXPLORE1_GROUP_NAME
     sampled_df['dt_version'] = dt_version
-
-    gh_name_df = pd.DataFrame({'gh_id': GH_IDS + ('default', )})
-    sampled_df['_tmpkey'] = 1
-    gh_name_df['_tmpkey'] = 1
-    extend_df = sampled_df.merge(gh_name_df, on='_tmpkey').drop('_tmpkey', axis=1)
-
+    extend_df = sampled_df.merge(gh_df, on='category1')
     result_df = extend_df[['strategy_key', 'dt_version', 'gh_id', 'sort', 'video_id', 'score']]
     return result_df
 
@@ -135,8 +150,9 @@ def rank_for_layer2(run_dt, run_hour, project, stats_table, rank_table):
     # fallback to base if necessary
     base_strategy_df = get_last_strategy_result(
         project, rank_table, dt_version, BASE_GROUP_NAME)
-
     for gh_id in GH_IDS:
+        if gh_id == 'default':
+            continue
         sub_df = df.query(f'gh_id == "{gh_id}"')
         if len(sub_df) < SEND_N:
             LOGGER.warning(
@@ -196,18 +212,21 @@ def rank_for_base(run_dt, run_hour, project, stats_table, rank_table):
 
 
 def check_result_data(df):
-    for gh_id in GH_IDS + ('default', ):
+    for gh_id in GH_IDS:
         for key in (EXPLORE1_GROUP_NAME, EXPLORE2_GROUP_NAME, BASE_GROUP_NAME):
             sub_df = df.query(f'gh_id == "{gh_id}" and strategy_key == "{key}"')
             if len(sub_df) != SEND_N:
-                raise Exception(f"Result not enough for gh_id[{gh_id}] group[{group}]")
+                raise Exception(f"Result not enough for gh_id[{gh_id}] group[{key}]")
 
 
 def build_and_transfer_data(run_dt, run_hour, project, **kwargs):
     dt_version = f'{run_dt}{run_hour}'
     dry_run = kwargs.get('dry_run', False)
 
-    layer1_rank = rank_for_layer1(run_dt, run_hour, ODS_PROJECT, EXPLORE_POOL_TABLE)
+    next_dt = (datetime.strptime(run_dt, "%Y%m%d") + timedelta(1)).strftime("%Y%m%d")
+    gh_df = get_and_update_gh_ids(next_dt)
+
+    layer1_rank = rank_for_layer1(run_dt, run_hour, ODS_PROJECT, EXPLORE_POOL_TABLE, gh_df)
     layer2_rank = rank_for_layer2(run_dt, run_hour, ODS_PROJECT,
                                   GH_REPLY_STATS_TABLE, ODPS_RANK_RESULT_TABLE)
     base_rank = rank_for_base(run_dt, run_hour, ODS_PROJECT,
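One asymmetry between the two scripts is worth flagging: the 3rd-party script calls `get_and_update_gh_ids(run_dt)` with the run date itself, while this one looks up the `gh_detail` partition of the following day via `next_dt`, presumably because the self-owned account table is partitioned one day ahead; the commit does not say why. The date arithmetic in isolation:

from datetime import datetime, timedelta

run_dt = '20240623'  # example value; real runs receive it via ArgumentParser
next_dt = (datetime.strptime(run_dt, '%Y%m%d') + timedelta(1)).strftime('%Y%m%d')
assert next_dt == '20240624'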