
Sync 1-hour recall

zhangbo 1 year ago
parent commit ef0b2adabe
2 changed files with 68 additions and 170 deletions
  1. alg_recsys_recall_1h_region.py  +67 -170
  2. check_video_limit_distribute.py  +1 -0

+ 67 - 170
alg_recsys_recall_1h_region.py

@@ -29,11 +29,27 @@ RULE_PARAMS = {
             'view_type': 'video-show-region', 'platform_return_rate': 0.001,
             'region_24h_rule_key': 'rule66', '24h_rule_key': 'rule66'
         },
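+        # rule67/68/69 are variants of rule66: rule67 adds an 'h_rule_key',
+        # rule68 switches scoring to 'back_rate_exponential_weighting1',
+        # and rule69 repeats rule66's parameters unchanged.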
+        'rule67': {
+            'view_type': 'video-show-region', 'platform_return_rate': 0.001,
+            'region_24h_rule_key': 'rule66', '24h_rule_key': 'rule66', 'h_rule_key': 'rule66'
+        },
+        'rule68': {
+            'view_type': 'video-show-region', 'platform_return_rate': 0.001,
+            'region_24h_rule_key': 'rule66', '24h_rule_key': 'rule66',
+            'score_func': 'back_rate_exponential_weighting1'
+        },
+        'rule69': {
+            'view_type': 'video-show-region', 'platform_return_rate': 0.001,
+            'region_24h_rule_key': 'rule66', '24h_rule_key': 'rule66',
+        },
     },
     'data_params': config_.DATA_PARAMS,
     'params_list': [
         # 532
         {'data': 'data66', 'rule': 'rule66'},
+        {'data': 'data66', 'rule': 'rule67'},  # 523->510
+        {'data': 'data66', 'rule': 'rule68'},  # 523->514
+        {'data': 'data66', 'rule': 'rule69'},  # 523->518
     ],
 }
 
@@ -616,75 +632,77 @@ def video_rank(df, now_date, now_h, rule_key, param, region, data_key, rule_rank
     h_recall_df = h_recall_df.drop_duplicates(subset=['videoid'], keep='first')
     h_recall_df['videoid'] = h_recall_df['videoid'].astype(int)
 
+    log_.info(f"各种规则过滤后,一共有多少个视频 = {len(h_recall_df)}")
    # add the salvaged high-quality videos
     if add_videos_with_pre_h is True:
         add_func = param.get('add_func', None)
         h_recall_df = add_videos(initial_df=h_recall_df, now_date=now_date, rule_key=rule_key,
                                  region=region, data_key=data_key, hour_count=hour_count, top=10, add_func=add_func)
-
+        log_.info(f"打捞优质视频完成")
     h_recall_videos = h_recall_df['videoid'].to_list()
-    # log_.info(f'h_recall videos count = {len(h_recall_videos)}')
-
+    log_.info(f"各种规则增加后,一共有多少个视频 = {len(h_recall_df)}")
    # filter by video status
     if data_key in ['data7', ]:
         filtered_videos = filter_video_status_app(h_recall_videos)
     else:
         filtered_videos = filter_video_status(h_recall_videos)
-    # log_.info('filtered_videos count = {}'.format(len(filtered_videos)))
 
    # filter shielded videos
     shield_config = param.get('shield_config', config_.SHIELD_CONFIG)
     shield_key_name_list = shield_config.get(region, None)
     if shield_key_name_list is not None:
         filtered_videos = filter_shield_video(video_ids=filtered_videos, shield_key_name_list=shield_key_name_list)
-        # log_.info(f"shield filtered_videos count = {len(filtered_videos)}")
 
    # filter politically sensitive videos
     political_filter = param.get('political_filter', None)
     if political_filter is True:
-        # log_.info(f"political filter videos count = {len(filtered_videos)}")
         filtered_videos = filter_political_videos(video_ids=filtered_videos)
-        # log_.info(f"political filtered videos count = {len(filtered_videos)}")
+    log_.info(f"视频状态-涉政等-过滤后,一共有多少个视频 = {len(filtered_videos)}")
+
 
-    # write to the corresponding redis
     h_video_ids = []
     by_30day_rule_key = param.get('30day_rule_key', None)
     if by_30day_rule_key is not None:
        # dedupe against the relative 30-day list
         h_video_ids = get_day_30day_videos(now_date=now_date, data_key=data_key, rule_key=by_30day_rule_key)
-        # log_.info(f"h_video_ids count = {len(h_video_ids)}")
         if h_video_ids is not None:
             filtered_videos = [video_id for video_id in filtered_videos if int(video_id) not in h_video_ids]
-            # log_.info(f"filtered_videos count = {len(filtered_videos)}")
 
+    # write results to the corresponding redis
     h_recall_result = {}
     for video_id in filtered_videos:
         score = h_recall_df[h_recall_df['videoid'] == video_id]['score']
-        # print(score)
         h_recall_result[int(video_id)] = float(score)
         h_video_ids.append(int(video_id))
     h_recall_key_name = \
         f"{config_.RECALL_KEY_NAME_PREFIX_REGION_BY_H}{region}:{data_key}:{rule_key}:" \
         f"{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
+    log_.info("打印地域1小时的某个地域{},redis key:{}".format(region, h_recall_key_name))
     if len(h_recall_result) > 0:
-        # log_.info(f"h_recall_result count = {len(h_recall_result)}")
+        log_.info(f"开始写入头部数据:count = {len(h_recall_result)}, key = {h_recall_key_name}")
         redis_helper.add_data_with_zset(key_name=h_recall_key_name, data=h_recall_result, expire_time=2 * 24 * 3600)
        # adjust scores of rate-limited videos
-        update_limit_video_score(initial_videos=h_recall_result, key_name=h_recall_key_name)
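+        # update_limit_video_score now returns the adjusted score dict, or None
+        # when nothing was adjusted (see check_video_limit_distribute.py below)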
+        tmp = update_limit_video_score(initial_videos=h_recall_result, key_name=h_recall_key_name)
+        if tmp:
+            log_.info(f"After rate-limit adjustment: count = {len(h_recall_result)}, key = {h_recall_key_name}")
+        else:
+            log_.info("Rate-limit logic ran but redis was not modified; no effect.")
        # clear the online filter application list
         # redis_helper.del_keys(key_name=f"{config_.REGION_H_VIDEO_FILER}{region}.{app_type}.{data_key}.{rule_key}")
+    else:
+        log_.info(f"无数据,不写入。")
 
-    h_rule_key = param.get('h_rule_key', None)
-    region_24h_rule_key = param.get('region_24h_rule_key', 'rule1')
-    by_24h_rule_key = param.get('24h_rule_key', None)
-    by_48h_rule_key = param.get('48h_rule_key', None)
-    dup_remove = param.get('dup_remove', True)
-    # dedupe against other recall pools and write to the corresponding redis
-    dup_to_redis(h_video_ids=h_video_ids, now_date=now_date, now_h=now_h, rule_key=rule_key, h_rule_key=h_rule_key,
-                 region_24h_rule_key=region_24h_rule_key, by_24h_rule_key=by_24h_rule_key,
-                 by_48h_rule_key=by_48h_rule_key, region=region, data_key=data_key,
-                 rule_rank_h_flag=rule_rank_h_flag, political_filter=political_filter,
-                 shield_config=shield_config, dup_remove=dup_remove)
+    # h_rule_key = param.get('h_rule_key', None)
+    # region_24h_rule_key = param.get('region_24h_rule_key', 'rule1')
+    # by_24h_rule_key = param.get('24h_rule_key', None)
+    # by_48h_rule_key = param.get('48h_rule_key', None)
+    # dup_remove = param.get('dup_remove', True)
+    # # dedupe against other recall pools and write to the corresponding redis
+    # dup_to_redis(h_video_ids=h_video_ids, now_date=now_date, now_h=now_h, rule_key=rule_key, h_rule_key=h_rule_key,
+    #              region_24h_rule_key=region_24h_rule_key, by_24h_rule_key=by_24h_rule_key,
+    #              by_48h_rule_key=by_48h_rule_key, region=region, data_key=data_key,
+    #              rule_rank_h_flag=rule_rank_h_flag, political_filter=political_filter,
+    #              shield_config=shield_config, dup_remove=dup_remove)
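+    # dedupe against the other recall pools (dup_to_redis) is disabled for now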
 
 
 def dup_data(h_video_ids, initial_key_name, dup_key_name, region, political_filter, shield_config, dup_remove):
@@ -836,15 +854,14 @@ def merge_df_with_score(df_left, df_right):
 
 def process_with_region(region, df_merged, data_key, rule_key, rule_param, now_date, now_h,
                         rule_rank_h_flag, add_videos_with_pre_h, hour_count):
-    log_.info(f"region = {region} start...")
-    # compute score
+    log_.info(f"多协程的region = {region} 开始执行")
     region_df = df_merged[df_merged['code'] == region]
-    log_.info(f'region = {region}, region_df count = {len(region_df)}')
+    log_.info(f'Region = {region}, row count = {len(region_df)}')
     score_df = cal_score(df=region_df, param=rule_param)
     video_rank(df=score_df, now_date=now_date, now_h=now_h, rule_key=rule_key, param=rule_param,
                region=region, data_key=data_key, rule_rank_h_flag=rule_rank_h_flag,
                add_videos_with_pre_h=add_videos_with_pre_h, hour_count=hour_count)
-    log_.info(f"region = {region} end!")
+    log_.info(f"多协程的region = {region} 完成执行")
 
 
 def process_with_region2(region, df_merged, data_key, rule_key, rule_param, now_date, now_h,
@@ -942,15 +959,13 @@ def copy_data_for_city(region, city_code, data_key, rule_key, now_date, now_h, s
 
 
 def process_with_param(param, data_params_item, rule_params_item, region_code_list, feature_df, now_date, now_h, rule_rank_h_flag):
-    log_.info(f"param = {param} start...")
-
     data_key = param.get('data')
     data_param = data_params_item.get(data_key)
-    log_.info(f"data_key = {data_key}, data_param = {data_param}")
     rule_key = param.get('rule')
     rule_param = rule_params_item.get(rule_key)
-    log_.info(f"rule_key = {rule_key}, rule_param = {rule_param}")
     merge_func = rule_param.get('merge_func', None)
+    log_.info("数据采用:{},统计采用{}.".format(data_key, rule_key))
+    log_.info("具体的规则是:{}.".format(rule_param))
    # whether to add salvaged high-quality videos to the regional hourly data
     add_videos_with_pre_h = rule_param.get('add_videos_with_pre_h', False)
     hour_count = rule_param.get('hour_count', 0)
@@ -987,18 +1002,17 @@ def process_with_param(param, data_params_item, rule_params_item, region_code_li
 
    # prepare video data for special cities
    # filter shielded videos
-    shield_config = rule_param.get('shield_config', config_.SHIELD_CONFIG)
-    for region, city_list in config_.REGION_CITY_MAPPING.items():
-        t = [
-            gevent.spawn(
-                copy_data_for_city,
-                region, city_code, data_key, rule_key, now_date, now_h, shield_config
-            )
-            for city_code in city_list
-        ]
-        gevent.joinall(t)
-
-    log_.info(f"param = {param} end!")
+    # shield_config = rule_param.get('shield_config', config_.SHIELD_CONFIG)
+    # for region, city_list in config_.REGION_CITY_MAPPING.items():
+    #     t = [
+    #         gevent.spawn(
+    #             copy_data_for_city,
+    #             region, city_code, data_key, rule_key, now_date, now_h, shield_config
+    #         )
+    #         for city_code in city_list
+    #     ]
+    #     gevent.joinall(t)
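+    # the special-city data copy (copy_data_for_city) above is disabled for now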
+    log_.info(f"多进程的 param = {param} 完成执行!")
 
 
 def rank_by_h(project, table, now_date, now_h, rule_params, region_code_list, rule_rank_h_flag):
@@ -1019,80 +1033,6 @@ def rank_by_h(project, table, now_date, now_h, rule_params, region_code_list, ru
 
 
 
-
-    # pool = multiprocessing.Pool(processes=len(config_.APP_TYPE))
-    # for app_type, params in rule_params.items():
-    #     pool.apply_async(func=process_with_app_type,
-    #                      args=(app_type, params, region_code_list, feature_df, now_date, now_h, rule_rank_h_flag))
-    # pool.close()
-    # pool.join()
-
-    """
-    for app_type, params in rule_params.items():
-        log_.info(f"app_type = {app_type} start...")
-        data_params_item = params.get('data_params')
-        rule_params_item = params.get('rule_params')
-        for param in params.get('params_list'):
-            log_.info(f"param = {param} start...")
-            data_key = param.get('data')
-            data_param = data_params_item.get(data_key)
-            log_.info(f"data_key = {data_key}, data_param = {data_param}")
-            df_list = [feature_df[feature_df['apptype'] == apptype] for apptype in data_param]
-            df_merged = reduce(merge_df, df_list)
-            rule_key = param.get('rule')
-            rule_param = rule_params_item.get(rule_key)
-            log_.info(f"rule_key = {rule_key}, rule_param = {rule_param}")
-
-            task_list = []
-            for region in region_code_list:
-                t = Thread(target=process_with_region,
-                           args=(region, df_merged, app_type, data_key, rule_key, rule_param, now_date, now_h)
-                           )
-                t.start()
-                task_list.append(t)
-            for t in task_list:
-                t.join()
-            log_.info(f"param = {param} end!")
-        log_.info(f"app_type = {app_type} end!")
-    """
-
-    # for app_type, params in rule_params.items():
-    #     log_.info(f"app_type = {app_type}")
-    #     for data_key, data_param in params['data_params'].items():
-    #         log_.info(f"data_key = {data_key}, data_param = {data_param}")
-    #         df_list = [feature_df[feature_df['apptype'] == apptype] for apptype in data_param]
-    #         df_merged = reduce(merge_df, df_list)
-    #         for rule_key, rule_param in params['rule_params'].items():
-    #             log_.info(f"rule_key = {rule_key}, rule_param = {rule_param}")
-    #             task_list = [
-    #                 gevent.spawn(process_with_region, region, df_merged, app_type, data_key, rule_key, rule_param, now_date, now_h)
-    #                 for region in region_code_list
-    #             ]
-    #             gevent.joinall(task_list)
-
-    # rank
-    # for key, value in rule_params.items():
-    #     log_.info(f"rule = {key}, param = {value}")
-    #     for region in region_code_list:
-    #         log_.info(f"region = {region}")
-    #         # compute score
-    #         region_df = feature_df[feature_df['code'] == region]
-    #         log_.info(f'region_df count = {len(region_df)}')
-    #         score_df = cal_score(df=region_df, param=value)
-    #         video_rank(df=score_df, now_date=now_date, now_h=now_h, rule_key=key, param=value, region=region)
-    #         # to-csv
-    #         score_filename = f"score_{region}_{key}_{datetime.datetime.strftime(now_date, '%Y%m%d%H')}.csv"
-    #         score_df.to_csv(f'./data/{score_filename}')
-    #         # to-logs
-    #         log_.info({"date": datetime.datetime.strftime(now_date, '%Y%m%d%H'),
-    #                    "region_code": region,
-    #                    "redis_key_prefix": config_.RECALL_KEY_NAME_PREFIX_REGION_BY_H,
-    #                    "rule_key": key,
-    #                    # "score_df": score_df[['videoid', 'score']]
-    #                    }
-    #                   )
-
-
 def h_bottom_process(param, rule_params_item, region_code_list, key_prefix, redis_dt, redis_h,
                      now_date, now_h, rule_rank_h_flag):
     redis_helper = RedisHelper()
@@ -1166,56 +1106,12 @@ def h_rank_bottom(now_date, now_h, rule_params, region_code_list, rule_rank_h_fl
         )
     pool.close()
     pool.join()
-    # for param in rule_params.get('params_list'):
-    #     data_key = param.get('data')
-    #     rule_key = param.get('rule')
-    #     rule_param = rule_params_item.get(rule_key)
-    #     log_.info(f"data_key = {data_key}, rule_key = {rule_key}, rule_param = {rule_param}")
-    #     region_24h_rule_key = rule_param.get('region_24h_rule_key', 'rule1')
-    #     by_24h_rule_key = rule_param.get('24h_rule_key', None)
-    #     by_48h_rule_key = rule_param.get('48h_rule_key', None)
-    #     # politically sensitive video filter
-    #     political_filter = param.get('political_filter', None)
-    #     # shielded video filter
-    #     shield_config = param.get('shield_config', config_.SHIELD_CONFIG)
-    #     for region in region_code_list:
-    #         log_.info(f"region = {region}")
-    #         key_name = f"{key_prefix}{region}:{data_key}:{rule_key}:{redis_dt}:{redis_h}"
-    #         initial_data = redis_helper.get_all_data_from_zset(key_name=key_name, with_scores=True)
-    #         if initial_data is None:
-    #             initial_data = []
-    #         final_data = dict()
-    #         h_video_ids = []
-    #         for video_id, score in initial_data:
-    #             final_data[video_id] = score
-    #             h_video_ids.append(int(video_id))
-    #         # write to the corresponding redis
-    #         final_key_name = \
-    #             f"{key_prefix}{region}:{data_key}:{rule_key}:{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
-    #         if len(final_data) > 0:
-    #             redis_helper.add_data_with_zset(key_name=final_key_name, data=final_data, expire_time=2 * 24 * 3600)
-    #     # dedupe against other recall pools and write to the corresponding redis
-    #         dup_to_redis(h_video_ids=h_video_ids, now_date=now_date, now_h=now_h, rule_key=rule_key,
-    #                      region_24h_rule_key=region_24h_rule_key, region=region,
-    #                      data_key=data_key, by_24h_rule_key=by_24h_rule_key,
-    #                      by_48h_rule_key=by_48h_rule_key, rule_rank_h_flag=rule_rank_h_flag,
-    #                      political_filter=political_filter, shield_config=shield_config)
-    #     # prepare video data for special cities
-    #     for region, city_list in config_.REGION_CITY_MAPPING.items():
-    #         t = [
-    #             gevent.spawn(
-    #                 copy_data_for_city,
-    #                 region, city_code, data_key, rule_key, now_date, now_h, shield_config
-    #             )
-    #             for city_code in city_list
-    #         ]
-    #         gevent.joinall(t)
+
 
 
 def h_timer_check():
     try:
         rule_rank_h_flag = sys.argv[1]
-        # rule_rank_h_flag = '24h'
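+        # sys.argv[1] selects the rule set: '48h' uses the 48h params, anything else the default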
         if rule_rank_h_flag == '48h':
             rule_params = config_.RULE_PARAMS_REGION_APP_TYPE_48H
         else:
@@ -1224,27 +1120,28 @@ def h_timer_check():
         table = config_.TABLE_REGION_APP_TYPE
         region_code_list = [code for region, code in region_code.items()]
         now_date = datetime.datetime.today()
-        log_.info(f"now_date: {datetime.datetime.strftime(now_date, '%Y%m%d%H')}, rule_rank_h_flag: {rule_rank_h_flag}")
+        log_.info(f"开始执行: {datetime.datetime.strftime(now_date, '%Y%m%d%H')}")
         now_h = datetime.datetime.now().hour
         now_min = datetime.datetime.now().minute
         if now_h == 0:
+            log_.info("当前时间{}小时,使用bottom的data,开始。".format(now_h))
             h_rank_bottom(now_date=now_date, now_h=now_h, rule_params=rule_params, region_code_list=region_code_list,
                           rule_rank_h_flag=rule_rank_h_flag)
-            log_.info(f"region_h_data end!")
+            log_.info("----------当前时间{}小时,使用bottom的data,完成----------".format(now_h))
             return
        # check whether this hour's updated data is ready
         h_data_count = h_data_check(project=project, table=table, now_date=now_date)
         if h_data_count > 0:
-            log_.info(f'region_h_data_count = {h_data_count}')
+            log_.info('Upstream table row count h_data_count = {}; starting computation.'.format(h_data_count))
            # data is ready, run the update
             rank_by_h(now_date=now_date, now_h=now_h, rule_params=rule_params,
                       project=project, table=table, region_code_list=region_code_list, rule_rank_h_flag=rule_rank_h_flag)
-            log_.info(f"region_h_data end!")
+            log_.info("----------正常完成----------")
         elif now_min > 40:
-            log_.info('h_recall data is None, use bottom data!')
+            log_.info('Past minute 40; the run is not expected to finish in time, using bottom data!')
             h_rank_bottom(now_date=now_date, now_h=now_h, rule_params=rule_params, region_code_list=region_code_list,
                           rule_rank_h_flag=rule_rank_h_flag)
-            log_.info(f"region_h_data end!")
+            log_.info('---------- Past minute 40; bottom data done ----------')
         else:
            # data not ready; re-check in 1 minute
             Timer(60, h_timer_check).start()
@@ -1268,5 +1165,5 @@ def h_timer_check():
 
 
 if __name__ == '__main__':
-    log_.info(f"region_h_data start...")
+    log_.info("文件alg_recsys_recall_1h_region.py:「1小时地域」 开始执行")
     h_timer_check()

+ 1 - 0
check_video_limit_distribute.py

@@ -69,6 +69,7 @@ def update_limit_video_score(initial_videos, key_name):
     if len(limit_video_final_score) == 0:
         return
     redis_helper.add_data_with_zset(key_name=key_name, data=limit_video_final_score, expire_time=2 * 24 * 3600)
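+    # return the adjusted scores so callers can tell whether the adjustment took effect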
+    return limit_video_final_score
 
 
 def check_videos_distribute():