whole_movies_update.py 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126
  1. import time
  2. import datetime
  3. import pandas as pd
  4. import math
  5. import random
  6. from odps import ODPS
  7. from threading import Timer
  8. from get_data import get_data_from_odps
  9. from db_helper import RedisHelper, MysqlHelper
  10. from config import set_config
  11. from log import Log
  12. from utils import filter_video_status_with_applet_rec
# set_config() returns a pair: the configuration object read by the functions
# below (ODPS credentials, Redis key prefix) and a second value bound to `env`.
config_, env = set_config()
# Module-wide logger instance shared by every function in this file.
log_ = Log()
  15. features = [
  16. '视频id',
  17. '抓取时间',
  18. '进入黑名单时间',
  19. '站外播放量'
  20. ]
  21. def h_data_check(project, table, now_date):
  22. """检查数据是否准备好"""
  23. odps = ODPS(
  24. access_id=config_.ODPS_CONFIG['ACCESSID'],
  25. secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
  26. project=project,
  27. endpoint=config_.ODPS_CONFIG['ENDPOINT'],
  28. connect_timeout=3000,
  29. read_timeout=500000,
  30. pool_maxsize=1000,
  31. pool_connections=1000
  32. )
  33. try:
  34. dt = datetime.datetime.strftime(now_date, '%Y%m%d')
  35. sql = f'select * from {project}.{table} where dt = {dt}'
  36. with odps.execute_sql(sql=sql).open_reader() as reader:
  37. data_count = reader.count
  38. except Exception as e:
  39. data_count = 0
  40. return data_count
  41. def get_feature_data(now_date, project, table):
  42. """获取特征数据"""
  43. dt = datetime.datetime.strftime(now_date, '%Y%m%d')
  44. # dt = '2022041310'
  45. records = get_data_from_odps(date=dt, project=project, table=table)
  46. feature_data = []
  47. for record in records:
  48. item = {}
  49. for feature_name in features:
  50. item[feature_name] = record[feature_name]
  51. feature_data.append(item)
  52. feature_df = pd.DataFrame(feature_data)
  53. return feature_df
  54. def video_rank(app_type, df, now_date, now_h, return_count):
  55. """
  56. 对视频进行排序
  57. :param app_type:
  58. :param df:
  59. :param now_date:
  60. :param now_h:
  61. :param return_count: 小时级数据回流限制数
  62. :return:
  63. """
  64. # 视频状态过滤
  65. log_.info(f'initial_df count = {len(df)}')
  66. video_ids = [int(video_id) for video_id in df['videoid']]
  67. df['videoid'] = df['videoid'].astype(int)
  68. # 获取待推荐
  69. filtered_result_6 = filter_video_status_with_applet_rec(video_ids=video_ids, applet_rec_status=-6)
  70. filtered_df_6 = df[df['videoid'].isin(filtered_result_6)]
  71. filtered_df_6 = filtered_df_6.drop_duplicates(['videoid'], keep=False)
  72. log_.info(f'filtered_df_6 count = {len(filtered_df_6)}')
  73. # 获取普通推荐
  74. filtered_result_1 = filter_video_status_with_applet_rec(video_ids=video_ids, applet_rec_status=1)
  75. filtered_df_1 = df[df['videoid'].isin(filtered_result_1)]
  76. filtered_df_1 = filtered_df_1.drop_duplicates(['videoid'], keep=False)
  77. log_.info(f'filtered_df_1 count = {len(filtered_df_1)}')
  78. log_.info(f'df length = {len(df)}')
  79. # 获取符合进入召回源条件的视频,进入条件:小时级回流>=20 && score>=0.005
  80. h_recall_df = df[(df['lastonehour_return'] >= return_count) & (df['score'] >= 0.005)]
  81. h_recall_videos = h_recall_df['videoid'].to_list()
  82. log_.info(f'h_recall videos count = {len(h_recall_videos)}')
  83. # 不符合进入召回源条件的视频
  84. df = df.append(h_recall_df)
  85. h_else_df = df.drop_duplicates(['videoid'], keep=False)
  86. h_else_df = h_else_df.sort_values(by=['score'], ascending=False)
  87. h_else_videos = h_else_df['videoid'].to_list()
  88. # 合并,给定分数
  89. final_videos = h_recall_videos + h_else_videos
  90. final_result = {}
  91. step = round(100/len(final_videos), 3)
  92. for i, video_id in enumerate(final_videos):
  93. score = 100 - i * step
  94. final_result[int(video_id)] = score
  95. # 写入对应的redis
  96. key_name = \
  97. f"{config_.RECALL_KEY_NAME_PREFIX_APP_TYPE}{app_type}.{datetime.datetime.strftime(now_date, '%Y%m%d')}.{now_h}"
  98. if len(final_result) > 0:
  99. redis_helper = RedisHelper()
  100. redis_helper.add_data_with_zset(key_name=key_name, data=final_result, expire_time=23 * 3600)
  101. def rank_by_h(app_type, now_date, now_h, return_count_list, project, table):
  102. # 获取特征数据
  103. feature_df = get_feature_data(now_date=now_date, project=project, table=table)
  104. # 计算score
  105. score_df = cal_score(df=feature_df)
  106. # rank
  107. for cnt in return_count_list:
  108. log_.info(f"return_count = {cnt}")
  109. video_rank(app_type=app_type, df=score_df, now_date=now_date, now_h=now_h, return_count=cnt)
  110. # to-csv
  111. score_filename = f"score_{app_type}_{datetime.datetime.strftime(now_date, '%Y%m%d%H')}.csv"
  112. score_df.to_csv(f'./data/{score_filename}')