Forráskód Böngészése

Merge branch 'feature/20241205-auto-refresh-gh' of algorithm/rov-offline into master

fengzhoutian 4 hónapja
szülő
commit
f5fe6c4e52
2 módosított fájl, 42 hozzáadás és 25 törlés
  1. 38 21
      alg_growth_3rd_gh_reply_video_v1.py
  2. 4 4
      my_config.py

+ 38 - 21
alg_growth_3rd_gh_reply_video_v1.py

@@ -239,9 +239,8 @@ def check_result_data(df):
                 raise Exception(f"Unexpected record count: {gh_id},{key},{n_records}")
 
 
-def postprocess_override_by_config(df, dt_version):
-    return df
-    config = json.load(open("configs/3rd_gh_reply_video.json"))
+def postprocess_override_by_config(df, gh_df, dt_version):
+    override_config = gh_df.query('strategy_status == 0').to_dict(orient='records')
     override_data = {
         'strategy_key': [],
         'gh_id': [],
@@ -249,17 +248,24 @@ def postprocess_override_by_config(df, dt_version):
         'video_id': []
     }
 
-    for gh_id in config:
-        gh_config = config[gh_id]
-        for key in gh_config:
-            for video_config in gh_config[key]:
-                # remove current
-                position = video_config['position']
-                video_id = video_config['video_id']
-                df = df.drop(df.query(f'gh_id == "{gh_id}" and strategy_key == "{key}" and sort == {position}').index)
+    for row in override_config:
+        gh_id = row['gh_id']
+        try:
+            video_ids = json.loads(row['video_ids'])
+            if not isinstance(video_ids, list):
+                raise Exception("video_ids is not list")
+            video_ids = video_ids[:SEND_N]
+        except Exception as e:
+            LOGGER.error(f"json parse error: {e}. content: {row['video_ids']}")
+            continue
+        for idx, video_id in enumerate(video_ids):
+            for key in (BASE_GROUP_NAME, EXPLORE1_GROUP_NAME, EXPLORE2_GROUP_NAME):
+                df = df.drop(df.query(
+                    f'gh_id == "{gh_id}" and strategy_key == "{key}" and sort == {idx + 1}'
+                ).index)
                 override_data['strategy_key'].append(key)
                 override_data['gh_id'].append(gh_id)
-                override_data['sort'].append(position)
+                override_data['sort'].append(idx + 1)
                 override_data['video_id'].append(video_id)
     n_records = len(override_data['strategy_key'])
     override_data['dt_version'] = [dt_version] * n_records
@@ -268,14 +274,13 @@ def postprocess_override_by_config(df, dt_version):
     df = pd.concat([df, df_to_append], ignore_index=True)
     return df
 
-
 def build_and_transfer_base_mode(gh_df, run_dt, run_hour, dt_version, dry_run):
     layer1_rank = rank_for_layer1(run_dt, run_hour, ODS_PROJECT, EXPLORE_POOL_TABLE, gh_df)
     layer2_rank = rank_for_layer2(run_dt, run_hour, ODS_PROJECT, GH_REPLY_STATS_TABLE, ODPS_RANK_RESULT_TABLE)
     base_rank = rank_for_base(run_dt, run_hour, ODS_PROJECT, GH_REPLY_STATS_TABLE, ODPS_RANK_RESULT_TABLE,BASE_GROUP_NAME)
 
     final_rank_df = pd.concat([layer1_rank, layer2_rank, base_rank]).reset_index(drop=True)
-    final_rank_df = postprocess_override_by_config(final_rank_df, dt_version)
+    final_rank_df = postprocess_override_by_config(final_rank_df, gh_df, dt_version)
     check_result_data(final_rank_df)
 
     final_df = join_video_info(final_rank_df)
@@ -400,15 +405,27 @@ def build_and_transfer_delta_mode(account_df, dt_version, dry_run):
 
 
 def join_video_info(df):
-    odps_instance = get_odps_instance(ODS_PROJECT)
-    odps_df = odps.DataFrame(df)
-
-    video_df = get_dataframe_from_odps('videoods', 'wx_video')
+    db = MysqlHelper(CONFIG.MYSQL_INFO)
+    video_ids = df['video_id'].unique().tolist()
+    video_ids_str = ','.join([str(x) for x in video_ids])
+    sql = f"""
+        SELECT id as video_id, title, cover_img_path FROM wx_video
+        WHERE id in ({video_ids_str})
+    """
+    rows = db.get_data(sql, DictCursor)
+    video_df = pd.DataFrame(rows)
     video_df['cover_url'] = video_df['cover_img_path'] + CDN_IMG_OPERATOR
-    video_df = video_df['id', 'title', 'cover_url']
-    final_df = odps_df.join(video_df, on=('video_id', 'id'))
+    final_df = df.merge(video_df, on='video_id')
+
+    # odps_instance = get_odps_instance(ODS_PROJECT)
+    # odps_df = odps.DataFrame(df)
+
+    # video_df = get_dataframe_from_odps('videoods', 'wx_video')
+    # video_df['cover_url'] = video_df['cover_img_path'] + CDN_IMG_OPERATOR
+    # video_df = video_df['id', 'title', 'cover_url']
+    # final_df = odps_df.join(video_df, on=('video_id', 'id'))
+    # final_df = final_df.to_pandas()
 
-    final_df = final_df.to_pandas()
     final_df = final_df[['strategy_key', 'dt_version', 'gh_id', 'sort', 'video_id', 'title', 'cover_url', 'score']]
     return final_df
 

+ 4 - 4
my_config.py

@@ -2431,7 +2431,7 @@ class DevelopmentConfig(BaseConfig):
         'user': 'wx2016_longvideo',
         'password': 'wx2016_longvideoP@assword1234',
         'db': 'longvideo',
-        'charset': 'utf8'
+        'charset': 'utf8mb4'
     }
 
     MYSQL_CRAWLER_INFO = {
@@ -2541,7 +2541,7 @@ class TestConfig(BaseConfig):
         'user': 'wx2016_longvideo',
         'password': 'wx2016_longvideoP@assword1234',
         'db': 'longvideo',
-        'charset': 'utf8'
+        'charset': 'utf8mb4'
     }
 
     MYSQL_CRAWLER_INFO = {
@@ -2650,7 +2650,7 @@ class PreProductionConfig(BaseConfig):
         'user': 'wx2016_longvideo',
         'password': 'wx2016_longvideoP@assword1234',
         'db': 'longvideo',
-        'charset': 'utf8'
+        'charset': 'utf8mb4'
     }
 
     MYSQL_CRAWLER_INFO = {
@@ -2752,7 +2752,7 @@ class ProductionConfig(BaseConfig):
         'user': 'wx2016_longvideo',
         'password': 'wx2016_longvideoP@assword1234',
         'db': 'longvideo',
-        'charset': 'utf8'
+        'charset': 'utf8mb4'
     }
 
     MYSQL_CRAWLER_INFO = {