liqian 3 years ago
parent
commit
644fb84a51
1 changed files with 42 additions and 74 deletions
  1. 42 74
      rule_rank_h_18_19.py

+ 42 - 74
rule_rank_h_18_19.py

@@ -87,9 +87,10 @@ def cal_score(df):
     return df
 
 
-def video_rank(df, now_date, now_h, return_count):
+def video_rank(app_type, df, now_date, now_h, return_count):
     """
     根据回流数量,对视频进行二次排序
+    :param app_type:
     :param df:
     :param now_date:
     :param now_h:
@@ -108,69 +109,36 @@ def video_rank(df, now_date, now_h, return_count):
     h_else_videos = h_else_df['videoid'].to_list
     # 合并,给定分数
     final_videos = h_recall_videos + h_else_videos
+    final_result = {}
+    step = round(100/len(final_videos), 3)
     for i, video_id in enumerate(final_videos):
+        score = 100 - i * step
+        final_result[int(video_id)] = score
+    # 写入对应的redis
+    key_name = \
+        f"{config_.RECALL_KEY_NAME_PREFIX_APP_TYPE}{app_type}.{datetime.datetime.strftime(now_date, '%Y%m%d')}.{now_h}"
+    if len(final_result) > 0:
+        redis_helper = RedisHelper()
+        redis_helper.add_data_with_zset(key_name=key_name, data=final_result, expire_time=23 * 3600)
 
 
-    # 写入对应的redis
-    h_video_ids =[]
-    h_recall_result = {}
-    for video_id in h_recall_videos:
-        score = h_recall_df[h_recall_df['videoid'] == video_id]['score']
-        h_recall_result[int(video_id)] = float(score)
-        h_video_ids.append(int(video_id))
-    h_recall_key_name = \
-        f"{config_.RECALL_KEY_NAME_PREFIX_BY_H}{return_count}.{datetime.datetime.strftime(now_date, '%Y%m%d')}.{now_h}"
-    if len(h_recall_result) > 0:
-        redis_helper.add_data_with_zset(key_name=h_recall_key_name, data=h_recall_result, expire_time=23 * 3600)
-        # 清空线上过滤应用列表
-        redis_helper.del_keys(key_name=f"{config_.H_VIDEO_FILER}{return_count}")
-
-    # 去重更新rov模型结果,并另存为redis中
-    initial_data_dup = {}
-    for video_id, score in initial_data:
-        if int(video_id) not in h_video_ids:
-            initial_data_dup[int(video_id)] = score
-    log_.info(f"initial data dup count = {len(initial_data_dup)}")
-    initial_key_name = \
-        f"{config_.RECALL_KEY_NAME_PREFIX_DUP_H}{return_count}.{datetime.datetime.strftime(now_date, '%Y%m%d')}.{now_h}"
-    if len(initial_data_dup) > 0:
-        redis_helper.add_data_with_zset(key_name=initial_key_name, data=initial_data_dup, expire_time=23 * 3600)
-
-
-    # # 去重合并
-    # final_videos = [int(item) for item in h_recall_videos]
-    # temp_videos = [int(video_id) for video_id, _ in initial_data if int(video_id) not in final_videos]
-    # final_videos = final_videos + temp_videos
-    # log_.info(f'final videos count = {len(final_videos)}')
-    #
-    # # 重新给定score
-    # final_data = {}
-    # for i, video_id in enumerate(final_videos):
-    #     score = 100 - i * config_.ROV_SCORE_D
-    #     final_data[video_id] = score
-    #
-    # # 存入对应的redis
-    # final_key_name = f"{config_.RECALL_KEY_NAME_PREFIX_BY_H}{datetime.datetime.strftime(now_date, '%Y%m%d')}.{now_h}"
-    # redis_helper.add_data_with_zset(key_name=final_key_name, data=final_data, expire_time=24 * 3600)
-
-
-def rank_by_h(now_date, now_h, return_count_list):
+def rank_by_h(app_type, now_date, now_h, return_count_list, project, table):
     # 获取特征数据
-    feature_df = get_feature_data(now_date=now_date)
+    feature_df = get_feature_data(now_date=now_date, project=project, table=table)
     # 计算score
     score_df = cal_score(df=feature_df)
     # rank
     for cnt in return_count_list:
         log_.info(f"return_count = {cnt}")
-        video_rank(df=score_df, now_date=now_date, now_h=now_h, return_count=cnt)
+        video_rank(app_type=app_type, df=score_df, now_date=now_date, now_h=now_h, return_count=cnt)
     # to-csv
-    score_filename = f"score_{datetime.datetime.strftime(now_date, '%Y%m%d%H')}.csv"
+    score_filename = f"score_{app_type}_{datetime.datetime.strftime(now_date, '%Y%m%d%H')}.csv"
     score_df.to_csv(f'./data/{score_filename}')
 
 
-def h_rank_bottom(now_date, now_h, return_count):
+def h_rank_bottom(app_type, now_date, now_h):
     """未按时更新数据,用上一小时结果作为当前小时的数据"""
-    log_.info(f"return_count = {return_count}")
+    log_.info(f"app_type = {app_type}")
     # 获取rov模型结果
     redis_helper = RedisHelper()
     if now_h == 0:
@@ -179,42 +147,40 @@ def h_rank_bottom(now_date, now_h, return_count):
     else:
         redis_dt = datetime.datetime.strftime(now_date, '%Y%m%d')
         redis_h = now_h - 1
-    key_prefix_list = [config_.RECALL_KEY_NAME_PREFIX_BY_H, config_.RECALL_KEY_NAME_PREFIX_DUP_H]
-    for key_prefix in key_prefix_list:
-        key_name = f"{key_prefix}{return_count}.{redis_dt}.{redis_h}"
-        initial_data = redis_helper.get_data_zset_with_index(key_name=key_name, start=0, end=-1, with_scores=True)
-        final_data = dict()
-        for video_id, score in initial_data:
-            final_data[video_id] = score
-        # 存入对应的redis
-        final_key_name = \
-            f"{key_prefix}{return_count}.{datetime.datetime.strftime(now_date, '%Y%m%d')}.{now_h}"
-        if len(final_data) > 0:
-            redis_helper.add_data_with_zset(key_name=final_key_name, data=final_data, expire_time=23 * 3600)
-    # 清空线上过滤应用列表
-    redis_helper.del_keys(key_name=f"{config_.H_VIDEO_FILER}{return_count}")
-
-
-def h_timer_check():
-    return_count_list = [20, 10]
+    key_name = f"{config_.RECALL_KEY_NAME_PREFIX_APP_TYPE}{app_type}.{redis_dt}.{redis_h}"
+    initial_data = redis_helper.get_data_zset_with_index(key_name=key_name, start=0, end=-1, with_scores=True)
+    final_data = dict()
+    for video_id, score in initial_data:
+        final_data[video_id] = score
+    # 存入对应的redis
+    final_key_name = \
+        f"{config_.RECALL_KEY_NAME_PREFIX_APP_TYPE}{app_type}.{datetime.datetime.strftime(now_date, '%Y%m%d')}.{now_h}"
+    if len(final_data) > 0:
+        redis_helper.add_data_with_zset(key_name=final_key_name, data=final_data, expire_time=23 * 3600)
+
+
+def h_timer_check(app_type):
+    log_.info(f"app_type = {app_type}")
+    return_count_list = [20]
     now_date = datetime.datetime.today()
     log_.info(f"now_date: {datetime.datetime.strftime(now_date, '%Y%m%d%H')}")
     now_h = datetime.datetime.now().hour
     now_min = datetime.datetime.now().minute
     if now_h == 0:
-        for cnt in return_count_list:
-            h_rank_bottom(now_date=now_date, now_h=now_h, return_count=cnt)
+        h_rank_bottom(app_type=app_type, now_date=now_date, now_h=now_h)
         return
     # 查看当前小时更新的数据是否已准备好
+    project = config_.PREDICT_PROJECT_18_19[str(app_type)]
+    table = config_.PREDICT_TABLE_18_19[str(app_type)]
     h_data_count = h_data_check(project=project, table=table, now_date=now_date)
     if h_data_count > 0:
         log_.info(f'h_data_count = {h_data_count}')
         # 数据准备好,进行更新
-        rank_by_h(now_date=now_date, now_h=now_h, return_count_list=return_count_list)
+        rank_by_h(app_type=app_type, now_date=now_date, now_h=now_h,
+                  return_count_list=return_count_list, project=project, table=table)
     elif now_min > 50:
         log_.info('h_recall data is None, use bottom data!')
-        for cnt in return_count_list:
-            h_rank_bottom(now_date=now_date, now_h=now_h, return_count=cnt)
+        h_rank_bottom(app_type=app_type, now_date=now_date, now_h=now_h)
     else:
         # 数据没准备好,1分钟后重新检查
         Timer(60, h_timer_check).start()
@@ -225,4 +191,6 @@ if __name__ == '__main__':
     # res = cal_score(df=df1)
     # video_rank(df=res, now_date=datetime.datetime.today())
     # rank_by_h()
-    h_timer_check()
+    app_type_list = [18, 19]
+    for app_type in app_type_list:
+        h_timer_check(app_type=app_type)