Преглед на файлове

Merge branch 'feature_2023121113_liqian_update_rule_rank_h_by_24h' into test

liqian преди 1 година
родител
ревизия
7af6b5bdb1
променени са 2 файла, в които са добавени 82 реда и са изтрити 35 реда
  1. 1 1
      config.py
  2. 81 34
      rule_rank_h_by_24h.py

+ 1 - 1
config.py

@@ -2787,7 +2787,7 @@ class ProductionConfig(BaseConfig):
 def set_config():
     # 获取环境变量 ROV_OFFLINE_ENV
     env = os.environ.get('ROV_OFFLINE_ENV')
-    print("ROV_OFFLINE_ENV:{}".format(str(env)))
+    # print("ROV_OFFLINE_ENV:{}".format(str(env)))
     # env = 'dev'
     if env is None:
         # log_.error('ENV ERROR: is None!')

+ 81 - 34
rule_rank_h_by_24h.py

@@ -1,4 +1,5 @@
 import pandas as pd
+import multiprocessing
 import math
 import traceback
 from functools import reduce
@@ -303,6 +304,43 @@ def merge_df_with_score(df_left, df_right):
     return df_merged[feature_list]
 
 
+def process_with_param(param, data_params_item, rule_params_item, feature_df, now_date, now_h):
+    """Score the feature data for one parameter entry and pass it to video_rank_h.
+
+    Args:
+        param: Mapping read with .get() for 'notify_backend' (default False),
+            'data' and 'rule'.
+        data_params_item: Mapping from a data key to a data parameter; the
+            selected value is iterated with .items() as (apptype, weight).
+        rule_params_item: Mapping from a rule key to a rule parameter; the
+            selected value supplies 'merge_func' (default 1) and is passed
+            on to cal_score and video_rank_h.
+        feature_df: Table filtered on its 'apptype' column
+            (presumably a pandas DataFrame -- TODO confirm).
+        now_date: Forwarded unchanged to video_rank_h.
+        now_h: Forwarded unchanged to video_rank_h.
+
+    Returns:
+        None. The function has no return statement; its result is the call
+        to video_rank_h.
+
+    NOTE(review): if the two mappings behave like dict, an unknown 'rule'
+    key makes rule_param None and rule_param.get(...) raises AttributeError;
+    an unknown 'data' key fails the same way on data_param.items().
+    NOTE(review): both reduce() calls have no initial value, so an empty
+    data_param raises TypeError.
+    NOTE(review): rank_by_h submits this function through pool.apply_async
+    and the visible code never reads the returned result, so an exception
+    raised here is not reported by that caller -- confirm this is intended.
+    """
+    log_.info(f"param = {param} start...")
+    score_df_list = []
+    notify_backend = param.get('notify_backend', False)
+    # Resolve the data parameter selected by this entry.
+    data_key = param.get('data')
+    data_param = data_params_item.get(data_key)
+    log_.info(f"data_key = {data_key}, data_param = {data_param}")
+    # Resolve the rule parameter selected by this entry.
+    rule_key = param.get('rule')
+    rule_param = rule_params_item.get(rule_key)
+    log_.info(f"rule_key = {rule_key}, rule_param = {rule_param}")
+    # cal_score_func = rule_param.get('cal_score_func', 1)
+    merge_func = rule_param.get('merge_func', 1)
+
+    if merge_func == 2:
+        # Score each apptype subset on its own, scale it by that apptype's
+        # weight, then combine the scored frames.
+        for apptype, weight in data_param.items():
+            df = feature_df[feature_df['apptype'] == apptype]
+            # Compute the score
+            score_df = cal_score(df=df, param=rule_param)
+            score_df['score'] = score_df['score'] * weight
+            score_df_list.append(score_df)
+        # Merge the scores
+        df_merged = reduce(merge_df_with_score, score_df_list)
+        # Update the platform return rate
+        df_merged['platform_return_rate'] = df_merged['platform_return'] / df_merged['回流人数']
+        video_rank_h(df=df_merged, now_date=now_date, now_h=now_h,
+                     rule_key=rule_key, param=rule_param, data_key=data_key,
+                     notify_backend=notify_backend)
+    else:
+        # Any other merge_func: merge the per-apptype subsets first and score
+        # the merged frame once; the weights in data_param are not used here.
+        df_list = [feature_df[feature_df['apptype'] == apptype] for apptype, _ in data_param.items()]
+        df_merged = reduce(merge_df, df_list)
+        score_df = cal_score(df=df_merged, param=rule_param)
+        video_rank_h(df=score_df, now_date=now_date, now_h=now_h,
+                     rule_key=rule_key, param=rule_param, data_key=data_key,
+                     notify_backend=notify_backend)
+    log_.info(f"param = {param} end!")
+
+
 def rank_by_h(now_date, now_h, rule_params, project, table):
     # 获取特征数据
     feature_df = get_feature_data(now_date=now_date, now_h=now_h, project=project, table=table)
@@ -330,40 +368,49 @@ def rank_by_h(now_date, now_h, rule_params, project, table):
         video_rank_h(df=score_df, now_date=now_date, now_h=now_h,
                      rule_key=rule_key, param=rule_param, data_key=data_key)
     """
-
-    for param in rule_params.get('params_list'):
-        score_df_list = []
-        notify_backend = param.get('notify_backend', False)
-        data_key = param.get('data')
-        data_param = data_params_item.get(data_key)
-        log_.info(f"data_key = {data_key}, data_param = {data_param}")
-        rule_key = param.get('rule')
-        rule_param = rule_params_item.get(rule_key)
-        log_.info(f"rule_key = {rule_key}, rule_param = {rule_param}")
-        # cal_score_func = rule_param.get('cal_score_func', 1)
-        merge_func = rule_param.get('merge_func', 1)
-
-        if merge_func == 2:
-            for apptype, weight in data_param.items():
-                df = feature_df[feature_df['apptype'] == apptype]
-                # 计算score
-                score_df = cal_score(df=df, param=rule_param)
-                score_df['score'] = score_df['score'] * weight
-                score_df_list.append(score_df)
-            # 分数合并
-            df_merged = reduce(merge_df_with_score, score_df_list)
-            # 更新平台回流比
-            df_merged['platform_return_rate'] = df_merged['platform_return'] / df_merged['回流人数']
-            video_rank_h(df=df_merged, now_date=now_date, now_h=now_h,
-                         rule_key=rule_key, param=rule_param, data_key=data_key,
-                         notify_backend=notify_backend)
-        else:
-            df_list = [feature_df[feature_df['apptype'] == apptype] for apptype, _ in data_param.items()]
-            df_merged = reduce(merge_df, df_list)
-            score_df = cal_score(df=df_merged, param=rule_param)
-            video_rank_h(df=score_df, now_date=now_date, now_h=now_h,
-                         rule_key=rule_key, param=rule_param, data_key=data_key,
-                         notify_backend=notify_backend)
+    params_list = rule_params.get('params_list')
+    pool = multiprocessing.Pool(processes=len(params_list))
+    for param in params_list:
+        pool.apply_async(
+            func=process_with_param,
+            args=(param, data_params_item, rule_params_item, feature_df, now_date, now_h)
+        )
+    pool.close()
+    pool.join()
+
+    # for param in rule_params.get('params_list'):
+    #     score_df_list = []
+    #     notify_backend = param.get('notify_backend', False)
+    #     data_key = param.get('data')
+    #     data_param = data_params_item.get(data_key)
+    #     log_.info(f"data_key = {data_key}, data_param = {data_param}")
+    #     rule_key = param.get('rule')
+    #     rule_param = rule_params_item.get(rule_key)
+    #     log_.info(f"rule_key = {rule_key}, rule_param = {rule_param}")
+    #     # cal_score_func = rule_param.get('cal_score_func', 1)
+    #     merge_func = rule_param.get('merge_func', 1)
+    #
+    #     if merge_func == 2:
+    #         for apptype, weight in data_param.items():
+    #             df = feature_df[feature_df['apptype'] == apptype]
+    #             # 计算score
+    #             score_df = cal_score(df=df, param=rule_param)
+    #             score_df['score'] = score_df['score'] * weight
+    #             score_df_list.append(score_df)
+    #         # 分数合并
+    #         df_merged = reduce(merge_df_with_score, score_df_list)
+    #         # 更新平台回流比
+    #         df_merged['platform_return_rate'] = df_merged['platform_return'] / df_merged['回流人数']
+    #         video_rank_h(df=df_merged, now_date=now_date, now_h=now_h,
+    #                      rule_key=rule_key, param=rule_param, data_key=data_key,
+    #                      notify_backend=notify_backend)
+    #     else:
+    #         df_list = [feature_df[feature_df['apptype'] == apptype] for apptype, _ in data_param.items()]
+    #         df_merged = reduce(merge_df, df_list)
+    #         score_df = cal_score(df=df_merged, param=rule_param)
+    #         video_rank_h(df=score_df, now_date=now_date, now_h=now_h,
+    #                      rule_key=rule_key, param=rule_param, data_key=data_key,
+    #                      notify_backend=notify_backend)
 
     #     # to-csv
     #     score_filename = f"score_by24h_{key}_{datetime.strftime(now_date, '%Y%m%d%H')}.csv"