religion_videos_update.py 2.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374
  1. import datetime
  2. import random
  3. from odps import ODPS
  4. from threading import Timer
  5. from get_data import get_data_from_odps
  6. from db_helper import RedisHelper, MysqlHelper
  7. from config import set_config
  8. from log import Log
# Shared module-level objects used by the functions below.
# set_config() is unpacked into a (config, env) pair; `env` is not referenced
# anywhere else in the visible part of this file.
config_, env = set_config()
# Project logger instance.
log_ = Log()
  11. def data_check(project, table, now_date):
  12. """检查数据是否准备好"""
  13. odps = ODPS(
  14. access_id=config_.ODPS_CONFIG['ACCESSID'],
  15. secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
  16. project=project,
  17. endpoint=config_.ODPS_CONFIG['ENDPOINT'],
  18. connect_timeout=3000,
  19. read_timeout=500000,
  20. pool_maxsize=1000,
  21. pool_connections=1000
  22. )
  23. try:
  24. dt = datetime.datetime.strftime(now_date, '%Y%m%d')
  25. sql = f'select * from {project}.{table} where dt = {dt}'
  26. with odps.execute_sql(sql=sql).open_reader() as reader:
  27. data_count = reader.count
  28. except Exception as e:
  29. data_count = 0
  30. return data_count
  31. def get_religion_videos(now_date, project, table):
  32. """获取宗教视频列表"""
  33. # 获取videoId
  34. dt = datetime.datetime.strftime(now_date, '%Y%m%d')
  35. records = get_data_from_odps(date=dt, project=project, table=table)
  36. video_id_list = [record['videoid'] for record in records]
  37. # 排序合并,随机给定分数
  38. final_result = {}
  39. for video_id in video_id_list:
  40. score = random.uniform(0, 100)
  41. final_result[int(video_id)] = score
  42. # 写入对应的redis
  43. key_name = \
  44. f"{config_.KEY_NAME_PREFIX_SPECIAL_VIDEOS}{datetime.datetime.strftime(now_date, '%Y%m%d')}"
  45. if len(final_result) > 0:
  46. redis_helper = RedisHelper()
  47. redis_helper.add_data_with_zset(key_name=key_name, data=final_result, expire_time=2 * 24 * 3600)
  48. def h_timer_check():
  49. project = config_.RELIGION_VIDEOS_PROJECT
  50. table = config_.RELIGION_VIDEOS_TABLE
  51. now_date = datetime.datetime.today()
  52. log_.info(f"now_date: {datetime.datetime.strftime(now_date, '%Y%m%d')}")
  53. # 查看当天更新的数据是否已准备好
  54. data_count = data_check(project=project, table=table, now_date=now_date)
  55. if data_count > 0:
  56. log_.info(f'religion_videos_count = {data_count}')
  57. # 数据准备好,进行更新
  58. get_religion_videos(now_date=now_date, project=project, table=table)
  59. else:
  60. # 数据没准备好,1分钟后重新检查
  61. Timer(5 * 60, h_timer_check).start()
  62. if __name__ == '__main__':
  63. h_timer_check()