liqian 2 лет назад
Родитель
Commit
8bea859a42
3 измененных файлов с 91 добавлено и 26 удалено
  1. 7 0
      config.py
  2. 77 26
      whole_movies_update.py
  3. 7 0
      whole_movies_update_task.sh

+ 7 - 0
config.py

@@ -148,6 +148,13 @@ class BaseConfig(object):
     OLD_VIDEOS_PROJECT = 'loghubods'
     OLD_VIDEOS_TABLE = 'xcx_test_video'
 
+    # Source data used for the whole-movies update (project and table names)
+    WHOLE_MOVIES_PROJECT = 'loghubods'
+    WHOLE_MOVIES_TABLE = 'whole_movies'
+
+    # Redis key prefix under which the whole-movies update result is stored;
+    # full key format: 'com.weiqu.video.recall.whole.movies.item.{date}.{h}'
+    RECALL_KEY_NAME_PREFIX_WHOLE_MOVIES = 'com.weiqu.video.recall.whole.movies.item.'
+
     # 小程序离线ROV模型结果存放 redis key前缀,完整格式:com.weiqu.video.recall.hot.item.score.{date}
     RECALL_KEY_NAME_PREFIX = 'com.weiqu.video.recall.hot.item.score.'
     # 小程序小时级更新结果存放 redis key前缀,完整格式:com.weiqu.video.recall.item.score.h.{rule_key}.{date}.{h}

+ 77 - 26
whole_movies_update.py

@@ -18,7 +18,9 @@ features = [
     '视频id',
     '抓取时间',
     '进入黑名单时间',
-    '站外播放量'
+    '站外播放量',
+    'praise_count',
+    'transfer_count'
 ]
 
 
@@ -36,7 +38,7 @@ def h_data_check(project, table, now_date):
     )
 
     try:
-        dt = datetime.datetime.strftime(now_date, '%Y%m%d')
+        dt = datetime.datetime.strftime(now_date, '%Y%m%d%H')
         sql = f'select * from {project}.{table} where dt = {dt}'
         with odps.execute_sql(sql=sql).open_reader() as reader:
             data_count = reader.count
@@ -47,7 +49,7 @@ def h_data_check(project, table, now_date):
 
 def get_feature_data(now_date, project, table):
     """获取特征数据"""
-    dt = datetime.datetime.strftime(now_date, '%Y%m%d')
+    dt = datetime.datetime.strftime(now_date, '%Y%m%d%H')
     # dt = '2022041310'
     records = get_data_from_odps(date=dt, project=project, table=table)
     feature_data = []
@@ -60,58 +62,107 @@ def get_feature_data(now_date, project, table):
     return feature_df
 
 
-def video_rank(app_type, df, now_date):
+def video_rank(df, now_date, now_h):
     """
     Rank the videos and store the scored result in redis.
-    :param app_type:
     :param df: feature DataFrame; the '视频id' and '站外播放量' columns are read here
     :param now_date: datetime whose '%Y%m%d' form is the date part of the redis key
+    :param now_h: hour appended to the redis key after the date
     :return: None; the {video_id: score} mapping is written via RedisHelper.add_data_with_zset
     """
     df = df.fillna(0)
     # Filter by video status
     log_.info(f'initial_df count = {len(df)}')
-    video_ids = [int(video_id) for video_id in df['videoid']]
-    df['videoid'] = df['videoid'].astype(int)
-    df = df.drop_duplicates(['videoid'], keep=False)
+    video_ids = [int(video_id) for video_id in df['视频id']]
+    df['视频id'] = df['视频id'].astype(int)
+    # keep=False drops every row whose id occurs more than once (no copy of a duplicated id is kept)
+    df = df.drop_duplicates(['视频id'], keep=False)
     log_.info(f'df length = {len(df)}')
 
     # Pending-recommendation videos (applet_rec_status=-6), sorted by '站外播放量' descending
     filtered_result_6 = filter_video_status_with_applet_rec(video_ids=video_ids, applet_rec_status=-6)
-    filtered_df_6 = df[df['videoid'].isin(filtered_result_6)]
+    filtered_df_6 = df[df['视频id'].isin(filtered_result_6)]
     filtered_df_6 = filtered_df_6.sort_values(by=['站外播放量'], ascending=False)
     log_.info(f'filtered_df_6 count = {len(filtered_df_6)}')
 
     # Normally recommended videos (applet_rec_status=1), sorted by '站外播放量' descending
     filtered_result_1 = filter_video_status_with_applet_rec(video_ids=video_ids, applet_rec_status=1)
-    filtered_df_1 = df[df['videoid'].isin(filtered_result_1)]
+    filtered_df_1 = df[df['视频id'].isin(filtered_result_1)]
     filtered_df_1 = filtered_df_1.sort_values(by=['站外播放量'], ascending=False)
     log_.info(f'filtered_df_1 count = {len(filtered_df_1)}')
 
     # Merge the sorted groups (status 1 first, then status -6) and assign scores descending from 100 in equal steps
     merge_df = filtered_df_1.append(filtered_df_6)
-    merge_df = merge_df.drop_duplicates(['videoid'], keep=False)
-    merge_videos = merge_df['videoid'].to_list()
+    merge_df = merge_df.drop_duplicates(['视频id'], keep=False)
+    merge_videos = merge_df['视频id'].to_list()
     final_result = {}
-    step = round(100 / len(merge_videos), 3)
-    for i, video_id in enumerate(merge_videos):
-        score = 100 - i * step
-        final_result[int(video_id)] = score
+    if len(merge_videos) > 0:
+        step = round(100 / len(merge_videos), 3)
+        for i, video_id in enumerate(merge_videos):
+            score = 100 - i * step
+            final_result[int(video_id)] = score
     # Write the result to the corresponding redis key
-    # key_name = \
-    #     f"{}{app_type}.{datetime.datetime.strftime(now_date, '%Y%m%d')}"
-    # if len(final_result) > 0:
-    #     redis_helper = RedisHelper()
-    #     redis_helper.add_data_with_zset(key_name=key_name, data=final_result, expire_time=23 * 3600)
+    key_name = \
+        f"{config_.RECALL_KEY_NAME_PREFIX_WHOLE_MOVIES}{datetime.datetime.strftime(now_date, '%Y%m%d')}.{now_h}"
+    if len(final_result) > 0:
+        redis_helper = RedisHelper()
+        redis_helper.add_data_with_zset(key_name=key_name, data=final_result, expire_time=23 * 3600)
 
 
-def rank_by_h(app_type, now_date, now_h, return_count_list, project, table):
+def rank_by_h(now_date, now_h, project, table):
+    """Load the feature data for now_date from project.table and rank it via video_rank."""
     # Fetch the feature data
     feature_df = get_feature_data(now_date=now_date, project=project, table=table)
     # Rank the videos (video_rank also stores the result)
-    for cnt in return_count_list:
-        log_.info(f"return_count = {cnt}")
-        video_rank(app_type=app_type, df=feature_df, now_date=now_date)
+    video_rank(df=feature_df, now_date=now_date, now_h=now_h)
     # to-csv
     # score_filename = f"score_{app_type}_{datetime.datetime.strftime(now_date, '%Y%m%d%H')}.csv"
-    # score_df.to_csv(f'./data/{score_filename}')
+    # score_df.to_csv(f'./data/{score_filename}')
+
+
+def h_rank_bottom(now_date, now_h):
+    """未按时更新数据,用上一小时结果作为当前小时的数据"""
+    redis_helper = RedisHelper()
+    if now_h == 0:
+        redis_dt = datetime.datetime.strftime(now_date - datetime.timedelta(days=1), '%Y%m%d')
+        redis_h = 23
+    else:
+        redis_dt = datetime.datetime.strftime(now_date, '%Y%m%d')
+        redis_h = now_h - 1
+
+    # 以上一小时的数据作为当前小时的数据
+    key_prefix = config_.RECALL_KEY_NAME_PREFIX_WHOLE_MOVIES
+    key_name = f"{key_prefix}{redis_dt}.{redis_h}"
+    initial_data = redis_helper.get_data_zset_with_index(key_name=key_name, start=0, end=-1, with_scores=True)
+    final_data = dict()
+    h_video_ids = []
+    for video_id, score in initial_data:
+        final_data[video_id] = score
+        h_video_ids.append(int(video_id))
+    # 存入对应的redis
+    final_key_name = f"{key_prefix}{datetime.datetime.strftime(now_date, '%Y%m%d')}.{now_h}"
+    if len(final_data) > 0:
+        redis_helper.add_data_with_zset(key_name=final_key_name, data=final_data, expire_time=23 * 3600)
+
+
+def h_timer_check():
+    project = config_.WHOLE_MOVIES_PROJECT
+    table = config_.WHOLE_MOVIES_TABLE
+    now_date = datetime.datetime.today()
+    now_h = datetime.datetime.now().hour
+    now_min = datetime.datetime.now().minute
+    log_.info(f"now_date: {datetime.datetime.strftime(now_date, '%Y%m%d%H')}")
+    # 查看当天更新的数据是否已准备好
+    h_data_count = h_data_check(project=project, table=table, now_date=now_date)
+    if h_data_count > 0:
+        log_.info(f'whole_movies_data_count = {h_data_count}')
+        # 数据准备好,进行更新
+        rank_by_h(now_date=now_date, now_h=now_h, project=project, table=table)
+    elif now_min > 50:
+        log_.info('whole_movies data is None, use bottom data!')
+        h_rank_bottom(now_date=now_date, now_h=now_h)
+    else:
+        # 数据没准备好,1分钟后重新检查
+        Timer(60, h_timer_check).start()
+
+
+# Script entry point: start the check/update cycle when run directly
+if __name__ == '__main__':
+    h_timer_check()

+ 7 - 0
whole_movies_update_task.sh

@@ -0,0 +1,7 @@
+source /etc/profile
+echo $ROV_OFFLINE_ENV
+if [[ $ROV_OFFLINE_ENV == 'test' ]]; then
+    cd /data2/rov-offline && /root/anaconda3/bin/python /data2/rov-offline/whole_movies_update.py
+elif [[ $ROV_OFFLINE_ENV == 'pro' ]]; then
+    cd /data/rov-offline && /root/anaconda3/bin/python /data/rov-offline/whole_movies_update.py
+fi