whole_movies_update.py 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168
  1. import time
  2. import datetime
  3. import pandas as pd
  4. import math
  5. import random
  6. from odps import ODPS
  7. from threading import Timer
  8. from get_data import get_data_from_odps
  9. from db_helper import RedisHelper, MysqlHelper
  10. from my_config import set_config
  11. from log import Log
  12. from my_utils import filter_video_status_with_applet_rec
  # set_config() yields the project config object plus a second value bound to
  # ``env`` (not read anywhere in the visible code).
  config_, env = set_config()

  # Logger instance shared by every function in this script.
  log_ = Log()

  # Record fields copied into the feature DataFrame by get_feature_data().
  features = [
      '视频id',          # video id
      '抓取时间',        # crawl time
      '进入黑名单时间',  # time the video entered the blacklist
      '站外播放量',      # off-site play count (used as the sort key in video_rank)
      'praise_count',
      'transfer_count',
  ]
  def h_data_check(project, table, now_date):
      """Check whether the hourly data is ready by counting its rows.

      Args:
          project: ODPS project name; used for the client and inside the query.
          table: Table name inside ``project``.
          now_date: ``datetime`` whose ``%Y%m%d%H`` rendering is compared
              against the table's ``dt`` column.

      Returns:
          The record count reported by the ODPS reader, or 0 when building or
          running the query raises any exception.
      """
      odps = ODPS(
          access_id=config_.ODPS_CONFIG['ACCESSID'],
          secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
          project=project,
          endpoint=config_.ODPS_CONFIG['ENDPOINT'],
          connect_timeout=3000,
          read_timeout=500000,
          pool_maxsize=1000,
          pool_connections=1000
      )
      try:
          dt = datetime.datetime.strftime(now_date, '%Y%m%d%H')
          sql = f'select * from {project}.{table} where dt = {dt}'
          with odps.execute_sql(sql=sql).open_reader() as reader:
              data_count = reader.count
      except Exception as e:
          # Best-effort check: any failure is reported as "no data yet" so the
          # caller keeps polling, but the cause is logged instead of being
          # silently discarded.
          log_.info(f'h_data_check failed, treat data as not ready: {e!r}')
          data_count = 0
      return data_count
  def get_feature_data(now_date, project, table):
      """Fetch the feature data for one hour as a DataFrame.

      Args:
          now_date: ``datetime`` rendered as ``%Y%m%d%H`` and passed to
              ``get_data_from_odps`` as ``date``.
          project: Forwarded to ``get_data_from_odps``.
          table: Forwarded to ``get_data_from_odps``.

      Returns:
          A ``pandas.DataFrame`` with one row per fetched record, holding only
          the fields listed in the module-level ``features``.
      """
      hour_key = datetime.datetime.strftime(now_date, '%Y%m%d%H')
      fetched = get_data_from_odps(date=hour_key, project=project, table=table)
      # Project each record down to the configured feature fields.
      rows = [
          {field: record[field] for field in features}
          for record in fetched
      ]
      return pd.DataFrame(rows)
  def video_rank(df, now_date, now_h):
      """Rank the videos and write the scored result to a Redis sorted set.

      Videos whose applet-rec status is 1 are placed first, followed by those
      with status -6; inside each group rows are ordered by '站外播放量'
      descending. Scores start at 100 and decrease by a fixed step per position.

      Args:
          df: Feature DataFrame; must contain the '视频id' and '站外播放量' columns.
          now_date: ``datetime`` whose ``%Y%m%d`` rendering is part of the key.
          now_h: Hour value appended to the key after a dot.

      Returns:
          None. The result is stored through ``RedisHelper.add_data_with_zset``;
          nothing is written when no video survives the filtering.
      """
      df = df.fillna(0)
      # Video status filtering
      log_.info(f'initial_df count = {len(df)}')
      # The id list sent to the status filter is taken before de-duplication.
      video_ids = [int(video_id) for video_id in df['视频id']]
      df['视频id'] = df['视频id'].astype(int)
      # keep=False drops every row whose id occurs more than once.
      df = df.drop_duplicates(['视频id'], keep=False)
      log_.info(f'df length = {len(df)}')

      # Videos pending recommendation (applet_rec_status=-6)
      filtered_result_6 = filter_video_status_with_applet_rec(video_ids=video_ids, applet_rec_status=-6)
      filtered_df_6 = df[df['视频id'].isin(filtered_result_6)]
      filtered_df_6 = filtered_df_6.sort_values(by=['站外播放量'], ascending=False)
      log_.info(f'filtered_df_6 count = {len(filtered_df_6)}')

      # Videos with normal recommendation (applet_rec_status=1)
      filtered_result_1 = filter_video_status_with_applet_rec(video_ids=video_ids, applet_rec_status=1)
      filtered_df_1 = df[df['视频id'].isin(filtered_result_1)]
      filtered_df_1 = filtered_df_1.sort_values(by=['站外播放量'], ascending=False)
      log_.info(f'filtered_df_1 count = {len(filtered_df_1)}')

      # Merge in ranking order. pd.concat replaces DataFrame.append, which was
      # deprecated in pandas 1.4 and removed in pandas 2.0.
      merge_df = pd.concat([filtered_df_1, filtered_df_6])
      merge_df = merge_df.drop_duplicates(['视频id'], keep=False)
      merge_videos = merge_df['视频id'].to_list()
      if not merge_videos:
          # Nothing to rank, so nothing is written to Redis.
          return

      # Position i receives 100 - i * step, so the first video scores 100.
      step = round(100 / len(merge_videos), 3)
      final_result = {
          int(video_id): 100 - i * step
          for i, video_id in enumerate(merge_videos)
      }

      # Write to the matching Redis key.
      key_name = \
          f"{config_.RECALL_KEY_NAME_PREFIX_WHOLE_MOVIES}{datetime.datetime.strftime(now_date, '%Y%m%d')}.{now_h}"
      redis_helper = RedisHelper()
      # NOTE(review): expire_time looks like 23 hours expressed in seconds - confirm against RedisHelper.
      redis_helper.add_data_with_zset(key_name=key_name, data=final_result, expire_time=23 * 3600)
  def rank_by_h(now_date, now_h, project, table):
      """Load the feature data for ``now_date`` and pass it to ``video_rank``.

      Args:
          now_date: ``datetime`` used for both the data fetch and the ranking.
          now_h: Hour value forwarded to ``video_rank``.
          project: Forwarded to ``get_feature_data``.
          table: Forwarded to ``get_feature_data``.
      """
      video_rank(
          df=get_feature_data(now_date=now_date, project=project, table=table),
          now_date=now_date,
          now_h=now_h,
      )
  def h_rank_bottom(now_date, now_h):
      """Fallback: reuse the previous hour's result as the current hour's data.

      Used when the data was not updated in time. The sorted set stored under
      the previous hour's key is copied to the key for ``now_date``/``now_h``.

      Args:
          now_date: ``datetime`` whose ``%Y%m%d`` rendering is part of the key.
          now_h: Current hour; 0 makes the source key use the previous day
              with hour 23.

      Returns:
          None. Nothing is written when the previous hour has no data.
      """
      redis_helper = RedisHelper()
      if now_h == 0:
          # Hour 0 falls back to hour 23 of the previous day.
          redis_dt = datetime.datetime.strftime(now_date - datetime.timedelta(days=1), '%Y%m%d')
          redis_h = 23
      else:
          redis_dt = datetime.datetime.strftime(now_date, '%Y%m%d')
          redis_h = now_h - 1

      key_prefix = config_.RECALL_KEY_NAME_PREFIX_WHOLE_MOVIES
      key_name = f"{key_prefix}{redis_dt}.{redis_h}"
      initial_data = redis_helper.get_data_zset_with_index(key_name=key_name, start=0, end=-1, with_scores=True)
      # The helper's result for a missing key is not visible here; a falsy
      # value (None or empty) is treated as "no data" instead of crashing.
      final_data = dict(initial_data or [])
      if not final_data:
          log_.info(f'no bottom data found in {key_name}, nothing written')
          return

      # Store under the current hour's key.
      final_key_name = f"{key_prefix}{datetime.datetime.strftime(now_date, '%Y%m%d')}.{now_h}"
      redis_helper.add_data_with_zset(key_name=final_key_name, data=final_data, expire_time=23 * 3600)
  def h_timer_check():
      """Update the hourly ranking once data is ready, else fall back or retry.

      Flow:
          * data present  -> ``rank_by_h`` recomputes and stores the ranking;
          * no data and the minute is past 50 -> ``h_rank_bottom`` copies the
            previous hour's result;
          * otherwise -> the check is re-scheduled 60 seconds later.
      """
      project = config_.WHOLE_MOVIES_PROJECT
      table = config_.WHOLE_MOVIES_TABLE
      # Take a single clock reading so the date, hour and minute always agree,
      # even when the call happens right at an hour boundary.
      now_date = datetime.datetime.today()
      now_h = now_date.hour
      now_min = now_date.minute
      log_.info(f"now_date: {datetime.datetime.strftime(now_date, '%Y%m%d%H')}")
      # Check whether the data for this hour is ready.
      h_data_count = h_data_check(project=project, table=table, now_date=now_date)
      if h_data_count > 0:
          log_.info(f'whole_movies_data_count = {h_data_count}')
          # Data is ready: run the update.
          rank_by_h(now_date=now_date, now_h=now_h, project=project, table=table)
      elif now_min > 50:
          log_.info('whole_movies data is None, use bottom data!')
          h_rank_bottom(now_date=now_date, now_h=now_h)
      else:
          # Data is not ready yet: check again in one minute.
          Timer(60, h_timer_check).start()
  # Script entry point: start the hourly check when run directly.
  if __name__ == "__main__":
      h_timer_check()