religion_videos_update.py 2.9 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485
  1. import datetime
  2. import pandas as pd
  3. from odps import ODPS
  4. from threading import Timer
  5. from get_data import get_data_from_odps
  6. from db_helper import RedisHelper
  7. from config import set_config
  8. from log import Log
  9. config_, env = set_config()
  10. log_ = Log()
  11. features = ['videoid', 'play_count', 'dt']
  12. def data_check(project, table, now_date):
  13. """检查数据是否准备好"""
  14. odps = ODPS(
  15. access_id=config_.ODPS_CONFIG['ACCESSID'],
  16. secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
  17. project=project,
  18. endpoint=config_.ODPS_CONFIG['ENDPOINT'],
  19. connect_timeout=3000,
  20. read_timeout=500000,
  21. pool_maxsize=1000,
  22. pool_connections=1000
  23. )
  24. try:
  25. dt = datetime.datetime.strftime(now_date, '%Y%m%d')
  26. sql = f'select * from {project}.{table} where dt = {dt}'
  27. with odps.execute_sql(sql=sql).open_reader() as reader:
  28. data_count = reader.count
  29. except Exception as e:
  30. data_count = 0
  31. return data_count
  32. def get_religion_videos(now_date, project, table):
  33. """获取宗教视频列表"""
  34. # 获取videoId
  35. dt = datetime.datetime.strftime(now_date, '%Y%m%d')
  36. records = get_data_from_odps(date=dt, project=project, table=table)
  37. feature_data = []
  38. for record in records:
  39. item = {}
  40. for feature_name in features:
  41. item[feature_name] = record[feature_name]
  42. feature_data.append(item)
  43. feature_df = pd.DataFrame(feature_data)
  44. # 按照发布时间和播放量进行倒序
  45. feature_df = feature_df.sort_values(by=['dt', 'play_count'], ascending=False)
  46. video_id_list = feature_df['videoid'].to_list()
  47. # 按照排序给定分数
  48. final_result = {}
  49. step = 100 / (len(video_id_list) * 2)
  50. for i, video_id in enumerate(video_id_list):
  51. score = 100 - i * step
  52. final_result[int(video_id)] = score
  53. # 写入对应的redis
  54. key_name = \
  55. f"{config_.KEY_NAME_PREFIX_RELIGION_VIDEOS}{datetime.datetime.strftime(now_date, '%Y%m%d')}"
  56. if len(final_result) > 0:
  57. redis_helper = RedisHelper()
  58. redis_helper.add_data_with_zset(key_name=key_name, data=final_result, expire_time=2 * 24 * 3600)
  59. def h_timer_check():
  60. project = config_.RELIGION_VIDEOS_PROJECT
  61. table = config_.RELIGION_VIDEOS_TABLE
  62. now_date = datetime.datetime.today()
  63. log_.info(f"now_date: {datetime.datetime.strftime(now_date, '%Y%m%d')}")
  64. # 查看当天更新的数据是否已准备好
  65. data_count = data_check(project=project, table=table, now_date=now_date)
  66. if data_count > 0:
  67. log_.info(f'religion_videos_count = {data_count}')
  68. # 数据准备好,进行更新
  69. get_religion_videos(now_date=now_date, project=project, table=table)
  70. else:
  71. # 数据没准备好,1分钟后重新检查
  72. Timer(5 * 60, h_timer_check).start()
  73. if __name__ == '__main__':
  74. h_timer_check()