commit bd009f6617 by zhangbo · 1 year ago
2 changed files with 26 additions and 102 deletions:
  1. alg_recsys_recall_24h_region.py (+21 −91)
  2. alg_recsys_recall_hour_region_task.sh (+5 −11)

alg_recsys_recall_24h_region.py (+21 −91)

@@ -23,8 +23,7 @@ region_code = config_.REGION_CODE
 
 RULE_PARAMS = {
     'rule_params': {
-        'rule66': {'view_type': 'video-show', 'return_count': 21, 'score_rule': 0,
-                  'platform_return_rate': 0.001},
+        'rule66': {'view_type': 'video-show', 'return_count': 21, 'score_rule': 0, 'platform_return_rate': 0.001},
     },
     'data_params': config_.DATA_PARAMS,
     'params_list': [
@@ -165,38 +164,35 @@ def video_rank(df, now_date, now_h, rule_key, param, region, data_key):
     platform_return_rate = param.get('platform_return_rate', 0)
     h_recall_df = df[(df['lastday_return'] >= return_count) & (df['score'] >= score_value)
                      & (df['platform_return_rate'] >= platform_return_rate)]
-    log_.info(f'h_recall_df count = {len(h_recall_df)}')
     # when videoid is duplicated, keep the higher score
     h_recall_df = h_recall_df.sort_values(by=['score'], ascending=False)
     h_recall_df = h_recall_df.drop_duplicates(subset=['videoid'], keep='first')
     h_recall_df['videoid'] = h_recall_df['videoid'].astype(int)
     h_recall_videos = h_recall_df['videoid'].to_list()
-    log_.info(f'h_recall_videos count = {len(h_recall_videos)}')
-    log_.info('h_recall_videos:{}'.format('-'.join([str(i) for i in h_recall_videos])))
-
+    log_.info(f"各种规则过滤后,一共有多少个视频 = {len(h_recall_videos)}")
     # 视频状态过滤
     if data_key in ['data7', ]:
         filtered_videos = filter_video_status_app(h_recall_videos)
     else:
         filtered_videos = filter_video_status(h_recall_videos)
-    log_.info('filtered_videos count = {}'.format(len(filtered_videos)))
+    log_.info(f"视频状态-过滤后,一共有多少个视频 = {len(filtered_videos)}")
 
     # write results into the corresponding redis
     h_video_ids = []
     day_recall_result = {}
     for video_id in filtered_videos:
         score = h_recall_df[h_recall_df['videoid'] == video_id]['score']
-        # print(score)
         day_recall_result[int(video_id)] = float(score)
         h_video_ids.append(int(video_id))
     day_recall_key_name = \
         f"{config_.RECALL_KEY_NAME_PREFIX_REGION_BY_24H}{region}:{data_key}:{rule_key}:" \
         f"{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
-    log_.info("day_recall_result.type:{}".format(str(type(day_recall_result))))
-    log_.info("begin to write redis for day_recall_key_name:{} with {}".format(day_recall_key_name,
-                                                                               str(len(day_recall_result))))
+    log_.info("打印地域24小时的某个地域{},redis key:{}".format(region, day_recall_key_name))
     if len(day_recall_result) > 0:
+        log_.info(f"开始写入头部数据:count = {len(day_recall_result)}, key = {day_recall_key_name}")
         redis_helper.add_data_with_zset(key_name=day_recall_key_name, data=day_recall_result, expire_time=2 * 3600)
+    else:
+        log_.info(f"无数据,不写入。")
         # 清空线上过滤应用列表
         # redis_helper.del_keys(key_name=f"{config_.REGION_H_VIDEO_FILER_24H}{region}.{app_type}.{data_key}.{rule_key}")
 
@@ -238,14 +234,13 @@ def merge_df_with_score(df_left, df_right):
 
 
 def process_with_region(region, df_merged, data_key, rule_key, rule_param, now_date, now_h):
-    log_.info(f"region = {region} start...")
-    # compute score
+    log_.info(f"gevent task for region = {region} started")
     region_df = df_merged[df_merged['code'] == region]
-    log_.info(f'region = {region}, region_df count = {len(region_df)}')
+    log_.info(f'region = {region}, row count = {len(region_df)}')
     score_df = cal_score(df=region_df, param=rule_param)
     video_rank(df=score_df, now_date=now_date, now_h=now_h, region=region,
                rule_key=rule_key, param=rule_param, data_key=data_key)
-    log_.info(f"region = {region} end!")
+    log_.info(f"多协程的region = {region} 完成执行")
 
 
 def process_with_region2(region, df_merged, data_key, rule_key, rule_param, now_date, now_h):
@@ -281,14 +276,14 @@ def process_with_app_type(app_type, params, region_code_list, feature_df, now_da
 
 
 def process_with_param(param, data_params_item, rule_params_item, region_code_list, feature_df, now_date, now_h):
-    log_.info(f"param = {param} start...")
     data_key = param.get('data')
     data_param = data_params_item.get(data_key)
-    log_.info(f"data_key = {data_key}, data_param = {data_param}")
     rule_key = param.get('rule')
     rule_param = rule_params_item.get(rule_key)
-    log_.info(f"rule_key = {rule_key}, rule_param = {rule_param}")
     merge_func = rule_param.get('merge_func', None)
+    log_.info("数据采用:{},统计采用{}.".format(data_key, rule_key))
+    log_.info("具体的规则是:{}.".format(rule_param))
+
     if merge_func == 2:
         score_df_list = []
         for apptype, weight in data_param.items():
@@ -314,7 +309,7 @@ def process_with_param(param, data_params_item, rule_params_item, region_code_li
         ]
 
     gevent.joinall(task_list)
-    log_.info(f"param = {param} end!")
+    log_.info(f"多进程的 param = {param} 完成执行!")
 
 
 def rank_by_24h(project, table, now_date, now_h, rule_params, region_code_list):
@@ -343,42 +338,6 @@ def rank_by_24h(project, table, now_date, now_h, rule_params, region_code_list):
     pool.join()
     """
 
-    # for app_type, params in rule_params.items():
-    #     log_.info(f"app_type = {app_type}")
-    #     for data_key, data_param in params['data_params'].items():
-    #         log_.info(f"data_key = {data_key}, data_param = {data_param}")
-    #         df_list = [feature_df[feature_df['apptype'] == apptype] for apptype in data_param]
-    #         df_merged = reduce(merge_df, df_list)
-    #         for rule_key, rule_param in params['rule_params'].items():
-    #             log_.info(f"rule_key = {rule_key}, rule_param = {rule_param}")
-    #             task_list = [
-    #                 gevent.spawn(process_with_region, region, df_merged, app_type, data_key, rule_key, rule_param,
-    #                              now_date, now_h)
-    #                 for region in region_code_list
-    #             ]
-    #             gevent.joinall(task_list)
-
-
-    # for key, value in rule_params.items():
-    #     log_.info(f"rule = {key}, param = {value}")
-    #     for region in region_code_list:
-    #         log_.info(f"region = {region}")
-    #         # compute score
-    #         region_df = feature_df[feature_df['code'] == region]
-    #         log_.info(f'region_df count = {len(region_df)}')
-    #         score_df = cal_score(df=region_df, param=value)
-    #         video_rank(df=score_df, now_date=now_date, now_h=now_h, rule_key=key, param=value, region=region)
-    #         # to-csv
-    #         score_filename = f"score_24h_{region}_{key}_{datetime.datetime.strftime(now_date, '%Y%m%d%H')}.csv"
-    #         score_df.to_csv(f'./data/{score_filename}')
-    #         # to-logs
-    #         log_.info({"date": datetime.datetime.strftime(now_date, '%Y%m%d%H'),
-    #                    "region_code": region,
-    #                    "redis_key_prefix": config_.RECALL_KEY_NAME_PREFIX_REGION_BY_24H,
-    #                    "rule_key": key,
-    #                    # "score_df": score_df[['videoid', 'score']]
-    #                    })
-
 
 def dup_to_redis(h_video_ids, now_date, now_h, rule_key, region):
     """将地域分组小时级数据与其他召回视频池去重,存入对应的redis"""
@@ -451,35 +410,7 @@ def h_rank_bottom(now_date, now_h, rule_params, region_code_list):
             if len(final_data) > 0:
                 redis_helper.add_data_with_zset(key_name=final_key_name, data=final_data, expire_time=2 * 3600)
 
-    """
-    for app_type, params in rule_params.items():
-        log_.info(f"app_type = {app_type}")
-        for param in params.get('params_list'):
-            data_key = param.get('data')
-            rule_key = param.get('rule')
-            log_.info(f"data_key = {data_key}, rule_key = {rule_key}")
-            for region in region_code_list:
-                log_.info(f"region = {region}")
-                key_name = f"{key_prefix}{region}:{app_type}:{data_key}:{rule_key}:{redis_dt}:{redis_h}"
-                initial_data = redis_helper.get_all_data_from_zset(key_name=key_name, with_scores=True)
-                if initial_data is None:
-                    initial_data = []
-                final_data = dict()
-                h_video_ids = []
-                for video_id, score in initial_data:
-                    final_data[video_id] = score
-                    h_video_ids.append(int(video_id))
-                # store into the corresponding redis
-                final_key_name = \
-                    f"{key_prefix}{region}:{app_type}:{data_key}:{rule_key}:{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
-                if len(final_data) > 0:
-                    redis_helper.add_data_with_zset(key_name=final_key_name, data=final_data, expire_time=2 * 3600)
-                # clear the online filter list
-                # redis_helper.del_keys(key_name=f"{config_.REGION_H_VIDEO_FILER_24H}{region}.{app_type}.{data_key}.{rule_key}")
-
-                # dedupe against other recall video pools and store into the corresponding redis
-                # dup_to_redis(h_video_ids=h_video_ids, now_date=now_date, now_h=now_h, rule_key=rule_key, region=region)
-    """
+
 
 
 def h_timer_check():
@@ -491,19 +422,18 @@ def h_timer_check():
         now_date = datetime.datetime.today()
         now_h = datetime.datetime.now().hour
         now_min = datetime.datetime.now().minute
-        log_.info(f"now_date: {datetime.datetime.strftime(now_date, '%Y%m%d%H')}")
+        log_.info(f"开始执行: {datetime.datetime.strftime(now_date, '%Y%m%d%H')}")
         # check whether today's updated data is ready
         h_data_count = data_check(project=project, table=table, now_date=now_date)
         if h_data_count > 0:
-            log_.info(f'region_24h_data_count = {h_data_count}')
-            # 数据准备好,进行更新
+            log_.info('upstream table row count h_data_count = {}, starting computation.'.format(h_data_count))
             rank_by_24h(now_date=now_date, now_h=now_h, rule_params=rule_params,
                         project=project, table=table, region_code_list=region_code_list)
-            log_.info(f"region_24h_data end!")
+            log_.info("----------正常完成----------")
         elif now_min > 40:
-            log_.info('24h_recall data is None, use bottom data!')
+            log_.info('past minute 40 of the hour, the run is unlikely to finish in time, using bottom data!')
             h_rank_bottom(now_date=now_date, now_h=now_h, rule_params=rule_params, region_code_list=region_code_list)
-            log_.info(f"region_24h_data end!")
+            log_.info('---------- past minute 40, used bottom data, done ----------')
         else:
             # data not ready, re-check in 1 minute
             Timer(60, h_timer_check).start()
@@ -520,5 +450,5 @@ def h_timer_check():
 
 
 if __name__ == '__main__':
-    log_.info(f"region_24h_data start...")
+    log_.info("文件alg_recsys_recall_24h_region.py:「24小时地域」 开始执行")
     h_timer_check()
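
For context on the write path whose logging this commit reworks: video_rank dedupes scores per videoid, then persists them as a Redis sorted set with a 2-hour expiry. A minimal sketch of that pattern with plain redis-py, assuming a local Redis and hypothetical key/score values (the repo's redis_helper.add_data_with_zset may wrap extra behavior):

    import redis

    r = redis.Redis(host="localhost", port=6379)

    # Hypothetical recall output: videoid -> score, already deduplicated
    # (highest score kept per videoid, as the pandas drop_duplicates above does).
    day_recall_result = {1001: 0.83, 1002: 0.47}
    key = "recall:region24h:110000:data1:rule66:20240101:10"  # assumed key shape

    if day_recall_result:
        r.zadd(key, mapping={str(v): s for v, s in day_recall_result.items()})
        r.expire(key, 2 * 3600)  # same TTL as expire_time=2 * 3600 above
    else:
        print("no data, skipping write.")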

alg_recsys_recall_hour_region_task.sh (+5 −11)

@@ -10,20 +10,14 @@ echo "start time: {$cur_time}-{$cur_h}"
 
 if [[ $ROV_OFFLINE_ENV == 'test' ]]; then
   cd /root/zhangbo/rov-offline
-  /root/anaconda3/bin/python alg_recsys_recall_24h_noregion.py > "logs_dir/alg_recsys_recall_24h_noregion_{$cur_time}_{$cur_h}.log" &
-  /root/anaconda3/bin/python alg_recsys_recall_24h_region.py > "logs_dir/alg_recsys_recall_24h_region_{$cur_time}_{$cur_h}.log"
+  /root/anaconda3/bin/python alg_recsys_recall_24h_noregion.py &
+  /root/anaconda3/bin/python alg_recsys_recall_24h_region.py &
+
   wait
   echo "并行执行时间:{$(date "+%Y-%m-%d %H:%M:%S")}"
-  /root/anaconda3/bin/python alg_recsys_recall_1h_region.py '24h' > "logs_dir/alg_recsys_recall_1h_region_{$cur_time}_{$cur_h}.log"
+  /root/anaconda3/bin/python alg_recsys_recall_1h_region.py '24h'
   echo "结束执行时间:{$(date "+%Y-%m-%d %H:%M:%S")}"
   echo "all done"
 elif [[ $ROV_OFFLINE_ENV == 'pro' ]]; then
-  cd /root/zhangbo/rov-offline
-  /root/anaconda3/bin/python alg_recsys_recall_24h_noregion.py > "logs_dir/alg_recsys_recall_24h_noregion_{$cur_time}_{$cur_h}.log" &
-  /root/anaconda3/bin/python alg_recsys_recall_24h_region.py > "logs_dir/alg_recsys_recall_24h_region_{$cur_time}_{$cur_h}.log"
-  wait
-  echo "并行执行时间:{$(date "+%Y-%m-%d %H:%M:%S")}"
-  /root/anaconda3/bin/python alg_recsys_recall_1h_region.py '24h' > "logs_dir/alg_recsys_recall_1h_region_{$cur_time}_{$cur_h}.log"
-  echo "结束执行时间:{$(date "+%Y-%m-%d %H:%M:%S")}"
-  echo "all done"
+  :  # no-op placeholder: an elif branch with no command is a bash syntax error
 fi
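
The rewritten test branch fans both 24h recall jobs out with "&", blocks on "wait", then runs the hourly job; note it no longer redirects stdout into per-run files under logs_dir. A minimal sketch of the same orchestration in Python, reusing the interpreter path and script names from the script above (everything else is an assumption):

    import subprocess

    PY = "/root/anaconda3/bin/python"

    # Start both 24h recall jobs concurrently: the shell's `cmd1 & cmd2 & wait`.
    procs = [
        subprocess.Popen([PY, script])
        for script in ("alg_recsys_recall_24h_noregion.py",
                       "alg_recsys_recall_24h_region.py")
    ]
    for p in procs:
        p.wait()

    # Only after both have exited, run the hourly region job with '24h'.
    subprocess.run([PY, "alg_recsys_recall_1h_region.py", "24h"], check=True)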