liqian 2 éve
szülő
commit
13b98a0544
3 módosított fájl, 106 hozzáadás és 48 törlés
  1. 104 46
      region_rule_rank_h.py
  2. 1 1
      region_rule_rank_h_by24h.py
  3. 1 1
      rule_rank_h_by_24h.py

+ 104 - 46
region_rule_rank_h.py

@@ -715,10 +715,58 @@ def rank_by_h(project, table, now_date, now_h, rule_params, region_code_list, ru
     #                   )
 
 
+def h_bottom_process(param, rule_params_item, region_code_list, key_prefix, redis_dt, redis_h,
+                     now_date, now_h, rule_rank_h_flag):  # handles ONE params_list entry; h_rank_bottom submits it per param to a process pool
+    redis_helper = RedisHelper()
+    data_key = param.get('data')
+    rule_key = param.get('rule')
+    rule_param = rule_params_item.get(rule_key)  # NOTE(review): presumably a dict; a missing rule_key would give None and break the .get calls below - confirm
+    log_.info(f"data_key = {data_key}, rule_key = {rule_key}, rule_param = {rule_param}")
+    region_24h_rule_key = rule_param.get('region_24h_rule_key', 'rule1')
+    by_24h_rule_key = rule_param.get('24h_rule_key', None)
+    by_48h_rule_key = rule_param.get('48h_rule_key', None)
+    # Political-content video filter setting (None when the param does not provide one)
+    political_filter = param.get('political_filter', None)
+    # Shielded (blocked) video filter config; falls back to config_.SHIELD_CONFIG when absent
+    shield_config = param.get('shield_config', config_.SHIELD_CONFIG)
+    for region in region_code_list:
+        log_.info(f"region = {region}")
+        key_name = f"{key_prefix}{region}:{data_key}:{rule_key}:{redis_dt}:{redis_h}"  # source key, addressed by (redis_dt, redis_h)
+        initial_data = redis_helper.get_all_data_from_zset(key_name=key_name, with_scores=True)
+        if initial_data is None:  # treat a None result as "no data"
+            initial_data = []
+        final_data = dict()  # video_id -> score, copied as read
+        h_video_ids = []  # the same video ids converted to int, passed to dup_to_redis
+        for video_id, score in initial_data:
+            final_data[video_id] = score
+            h_video_ids.append(int(video_id))
+        # Store the copied data under the key for now_date/now_h; skipped when nothing was read
+        final_key_name = \
+            f"{key_prefix}{region}:{data_key}:{rule_key}:{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
+        if len(final_data) > 0:
+            redis_helper.add_data_with_zset(key_name=final_key_name, data=final_data, expire_time=2 * 24 * 3600)
+        # De-duplicate against the other recall video pools and store the result in redis (delegated to dup_to_redis; called even when h_video_ids is empty)
+        dup_to_redis(h_video_ids=h_video_ids, now_date=now_date, now_h=now_h, rule_key=rule_key,
+                     region_24h_rule_key=region_24h_rule_key, region=region,
+                     data_key=data_key, by_24h_rule_key=by_24h_rule_key,
+                     by_48h_rule_key=by_48h_rule_key, rule_rank_h_flag=rule_rank_h_flag,
+                     political_filter=political_filter, shield_config=shield_config)
+    # Prepare video data for special cities: per region in config_.REGION_CITY_MAPPING, spawn one copy_data_for_city task per city
+    for region, city_list in config_.REGION_CITY_MAPPING.items():
+        t = [
+            gevent.spawn(
+                copy_data_for_city,
+                region, city_code, data_key, rule_key, now_date, now_h, shield_config
+            )
+            for city_code in city_list
+        ]
+        gevent.joinall(t)  # wait for this region's tasks before moving to the next region
+
+
 def h_rank_bottom(now_date, now_h, rule_params, region_code_list, rule_rank_h_flag):
     """未按时更新数据,用上一小时结果作为当前小时的数据"""
     # 获取rov模型结果
-    redis_helper = RedisHelper()
+    # redis_helper = RedisHelper()
     if now_h == 0:
         redis_dt = datetime.datetime.strftime(now_date - datetime.timedelta(days=1), '%Y%m%d')
         redis_h = 23
@@ -729,50 +777,59 @@ def h_rank_bottom(now_date, now_h, rule_params, region_code_list, rule_rank_h_fl
     # 以上一小时的地域分组数据作为当前小时的数据
     key_prefix = config_.RECALL_KEY_NAME_PREFIX_REGION_BY_H
     rule_params_item = rule_params.get('rule_params')
-    for param in rule_params.get('params_list'):
-        data_key = param.get('data')
-        rule_key = param.get('rule')
-        rule_param = rule_params_item.get(rule_key)
-        log_.info(f"data_key = {data_key}, rule_key = {rule_key}, rule_param = {rule_param}")
-        region_24h_rule_key = rule_param.get('region_24h_rule_key', 'rule1')
-        by_24h_rule_key = rule_param.get('24h_rule_key', None)
-        by_48h_rule_key = rule_param.get('48h_rule_key', None)
-        # 涉政视频过滤
-        political_filter = param.get('political_filter', None)
-        # 屏蔽视频过滤
-        shield_config = param.get('shield_config', config_.SHIELD_CONFIG)
-        for region in region_code_list:
-            log_.info(f"region = {region}")
-            key_name = f"{key_prefix}{region}:{data_key}:{rule_key}:{redis_dt}:{redis_h}"
-            initial_data = redis_helper.get_all_data_from_zset(key_name=key_name, with_scores=True)
-            if initial_data is None:
-                initial_data = []
-            final_data = dict()
-            h_video_ids = []
-            for video_id, score in initial_data:
-                final_data[video_id] = score
-                h_video_ids.append(int(video_id))
-            # 存入对应的redis
-            final_key_name = \
-                f"{key_prefix}{region}:{data_key}:{rule_key}:{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
-            if len(final_data) > 0:
-                redis_helper.add_data_with_zset(key_name=final_key_name, data=final_data, expire_time=2 * 24 * 3600)
-            # 与其他召回视频池去重,存入对应的redis
-            dup_to_redis(h_video_ids=h_video_ids, now_date=now_date, now_h=now_h, rule_key=rule_key,
-                         region_24h_rule_key=region_24h_rule_key, region=region,
-                         data_key=data_key, by_24h_rule_key=by_24h_rule_key,
-                         by_48h_rule_key=by_48h_rule_key, rule_rank_h_flag=rule_rank_h_flag,
-                         political_filter=political_filter, shield_config=shield_config)
-        # 特殊城市视频数据准备
-        for region, city_list in config_.REGION_CITY_MAPPING.items():
-            t = [
-                gevent.spawn(
-                    copy_data_for_city,
-                    region, city_code, data_key, rule_key, now_date, now_h, shield_config
-                )
-                for city_code in city_list
-            ]
-            gevent.joinall(t)
+    params_list = rule_params.get('params_list')
+    pool = multiprocessing.Pool(processes=len(params_list))
+    for param in params_list:
+        pool.apply_async(
+            func=h_bottom_process,
+            args=(param, rule_params_item, region_code_list, key_prefix, redis_dt, redis_h, now_date, now_h, rule_rank_h_flag)
+        )
+    pool.close()
+    pool.join()
+    # for param in rule_params.get('params_list'):
+    #     data_key = param.get('data')
+    #     rule_key = param.get('rule')
+    #     rule_param = rule_params_item.get(rule_key)
+    #     log_.info(f"data_key = {data_key}, rule_key = {rule_key}, rule_param = {rule_param}")
+    #     region_24h_rule_key = rule_param.get('region_24h_rule_key', 'rule1')
+    #     by_24h_rule_key = rule_param.get('24h_rule_key', None)
+    #     by_48h_rule_key = rule_param.get('48h_rule_key', None)
+    #     # 涉政视频过滤
+    #     political_filter = param.get('political_filter', None)
+    #     # 屏蔽视频过滤
+    #     shield_config = param.get('shield_config', config_.SHIELD_CONFIG)
+    #     for region in region_code_list:
+    #         log_.info(f"region = {region}")
+    #         key_name = f"{key_prefix}{region}:{data_key}:{rule_key}:{redis_dt}:{redis_h}"
+    #         initial_data = redis_helper.get_all_data_from_zset(key_name=key_name, with_scores=True)
+    #         if initial_data is None:
+    #             initial_data = []
+    #         final_data = dict()
+    #         h_video_ids = []
+    #         for video_id, score in initial_data:
+    #             final_data[video_id] = score
+    #             h_video_ids.append(int(video_id))
+    #         # 存入对应的redis
+    #         final_key_name = \
+    #             f"{key_prefix}{region}:{data_key}:{rule_key}:{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
+    #         if len(final_data) > 0:
+    #             redis_helper.add_data_with_zset(key_name=final_key_name, data=final_data, expire_time=2 * 24 * 3600)
+    #         # 与其他召回视频池去重,存入对应的redis
+    #         dup_to_redis(h_video_ids=h_video_ids, now_date=now_date, now_h=now_h, rule_key=rule_key,
+    #                      region_24h_rule_key=region_24h_rule_key, region=region,
+    #                      data_key=data_key, by_24h_rule_key=by_24h_rule_key,
+    #                      by_48h_rule_key=by_48h_rule_key, rule_rank_h_flag=rule_rank_h_flag,
+    #                      political_filter=political_filter, shield_config=shield_config)
+    #     # 特殊城市视频数据准备
+    #     for region, city_list in config_.REGION_CITY_MAPPING.items():
+    #         t = [
+    #             gevent.spawn(
+    #                 copy_data_for_city,
+    #                 region, city_code, data_key, rule_key, now_date, now_h, shield_config
+    #             )
+    #             for city_code in city_list
+    #         ]
+    #         gevent.joinall(t)
 
 
 def h_timer_check():
@@ -793,6 +850,7 @@ def h_timer_check():
         if now_h == 0:
             h_rank_bottom(now_date=now_date, now_h=now_h, rule_params=rule_params, region_code_list=region_code_list,
                           rule_rank_h_flag=rule_rank_h_flag)
+            log_.info(f"region_h_data end!")
             return
         # 查看当前小时更新的数据是否已准备好
         h_data_count = h_data_check(project=project, table=table, now_date=now_date)
@@ -802,7 +860,7 @@ def h_timer_check():
             rank_by_h(now_date=now_date, now_h=now_h, rule_params=rule_params,
                       project=project, table=table, region_code_list=region_code_list, rule_rank_h_flag=rule_rank_h_flag)
             log_.info(f"region_h_data end!")
-        elif now_min > 50:
+        elif now_min > 40:
             log_.info('h_recall data is None, use bottom data!')
             h_rank_bottom(now_date=now_date, now_h=now_h, rule_params=rule_params, region_code_list=region_code_list,
                           rule_rank_h_flag=rule_rank_h_flag)

+ 1 - 1
region_rule_rank_h_by24h.py

@@ -491,7 +491,7 @@ def h_timer_check():
             rank_by_24h(now_date=now_date, now_h=now_h, rule_params=rule_params,
                         project=project, table=table, region_code_list=region_code_list)
             log_.info(f"region_24h_data end!")
-        elif now_min > 50:
+        elif now_min > 40:
             log_.info('24h_recall data is None, use bottom data!')
             h_rank_bottom(now_date=now_date, now_h=now_h, rule_params=rule_params, region_code_list=region_code_list)
             log_.info(f"region_24h_data end!")

+ 1 - 1
rule_rank_h_by_24h.py

@@ -452,7 +452,7 @@ def h_timer_check():
             # 数据准备好,进行更新
             rank_by_h(now_date=now_date, now_h=now_h, rule_params=rule_params, project=project, table=table)
             log_.info(f"24h_data end!")
-        elif now_min > 45:
+        elif now_min > 40:
             log_.info('h_by24h_recall data is None, use bottom data!')
             h_rank_bottom(now_date=now_date, now_h=now_h, rule_params=rule_params)
             log_.info(f"24h_data end!")