浏览代码

同步 1小时

zhangbo 1 年之前
父节点
当前提交
f9d2b10b77
共有 1 个文件被更改,包括 19 次插入15 次删除
  1. 19 15
      alg_recsys_recall_1h_noregion.py

+ 19 - 15
alg_recsys_recall_1h_noregion.py

@@ -186,7 +186,7 @@ def video_rank_h(df, now_date, now_h, rule_key, param, data_key):
     获取符合进入召回源条件的视频
     """
     redis_helper = RedisHelper()
-    log_.info(f"videos_count = {len(df)}")
+    log_.info(f"一共有多少个视频 = {len(df)}")
 
     # videoid重复时,保留分值高
     df = df.sort_values(by=['score'], ascending=False)
@@ -197,14 +197,14 @@ def video_rank_h(df, now_date, now_h, rule_key, param, data_key):
     platform_return_rate = param.get('platform_return_rate', 0)
     h_recall_df = df[df['platform_return_rate'] > platform_return_rate]
     h_recall_videos = h_recall_df['videoid'].to_list()
-    log_.info(f'h_recall videos count = {len(h_recall_videos)}')
+    log_.info(f"回流率-过滤后,一共有多少个视频 = {len(h_recall_videos)}")
 
     # 视频状态过滤
     if data_key in ['data7', ]:
         filtered_videos = filter_video_status_app(h_recall_videos)
     else:
         filtered_videos = filter_video_status(h_recall_videos)
-    log_.info('filtered_videos count = {}'.format(len(filtered_videos)))
+    log_.info(f"视频状态-过滤后,一共有多少个视频 = {len(filtered_videos)}")
 
     # 写入对应的redis
     now_dt = datetime.strftime(now_date, '%Y%m%d')
@@ -215,13 +215,15 @@ def video_rank_h(df, now_date, now_h, rule_key, param, data_key):
         h_recall_result[int(video_id)] = float(score)
         h_video_ids.append(int(video_id))
 
+    # recall:item:score:h:
     h_recall_key_name = \
         f"{config_.RECALL_KEY_NAME_PREFIX_BY_H_H}{data_key}:{rule_key}:{now_dt}:{now_h}"
-
+    log_.info("打印非地域24小时redis key:{}".format(h_recall_key_name))
     if len(h_recall_result) > 0:
-        log_.info(f"count = {len(h_recall_result)}, key = {h_recall_key_name}")
+        log_.info(f"开始写入头部数据:count = {len(h_recall_result)}, key = {h_recall_key_name}")
         redis_helper.add_data_with_zset(key_name=h_recall_key_name, data=h_recall_result, expire_time=2 * 3600)
-
+    else:
+        log_.info(f"无数据,不写入。")
 
 def rank_by_h(now_date, now_h, rule_params, project, table):
     # 获取特征数据
@@ -240,6 +242,8 @@ def rank_by_h(now_date, now_h, rule_params, project, table):
         rule_param = rule_params_item.get(rule_key)
         log_.info(f"rule_key = {rule_key}, rule_param = {rule_param}")
         merge_func = rule_param.get('merge_func', 1)
+        log_.info("数据采用:{},统计采用{}.".format(data_key, rule_key))
+        log_.info("具体的规则是:{}.".format(rule_param))
 
         if merge_func == 2:
             for apptype, weight in data_param.items():
@@ -269,28 +273,28 @@ def h_timer_check():
         table = config_.TABLE_H_APP_TYPE
         rule_params = RULE_PARAMS
         now_date = datetime.today()
-        log_.info(f"now_date: {datetime.strftime(now_date, '%Y%m%d%H')}")
+        log_.info(f"开始执行: {datetime.strftime(now_date, '%Y%m%d%H')}")
         now_min = datetime.now().minute
         now_h = datetime.now().hour
 
         if now_h == 0:
-            log_.info(f'now_h = {now_h} use bottom data!')
+            log_.info("当前时间{}小时,使用bottom的data,开始。".format(now_h))
             h_rank_bottom(now_date=now_date, now_h=now_h, rule_params=rule_params)
-            log_.info(f"h_data end!")
+            log_.info("----------当前时间{}小时,使用bottom的data,完成----------".format(now_h))
             return
         # 查看当前小时级更新的数据是否已准备好
         h_data_count = h_data_check(project=project, table=table, now_date=now_date)
         if h_data_count > 0:
-            log_.info(f'h_data_count = {h_data_count}')
+            log_.info('上游数据表查询数据条数 h_data_count = {},开始计算。'.format(h_data_count))
             # 数据准备好,进行更新
             rank_by_h(now_date=now_date, now_h=now_h, rule_params=rule_params, project=project, table=table)
-            log_.info(f"h_data end!")
+            log_.info("----------正常完成----------")
         elif now_min > 40:
-            log_.info('h_recall data is None, use bottom data!')
+            log_.info('当前分钟超过40,预计执行无法完成,使用 bottom data!')
             h_rank_bottom(now_date=now_date, now_h=now_h, rule_params=rule_params)
-            log_.info(f"h_data end!")
+            log_.info('----------当前分钟超过40,使用bottom的data,完成----------')
         else:
-            # 数据没准备好,1分钟后重新检查
+            log_.info("上游数据未就绪,等待...")
             Timer(60, h_timer_check).start()
 
     except Exception as e:
@@ -305,5 +309,5 @@ def h_timer_check():
 
 
 if __name__ == '__main__':
-    log_.info("文件alg_recsys_recall_24h_noregion.py:「24小时无地域」 开始执行")
+    log_.info("文件alg_recsys_recall_1h_noregion.py:「1小时无地域」 开始执行")
     h_timer_check()