# special_mid_videos_update.py
  1. import time
  2. import datetime
  3. import pandas as pd
  4. import math
  5. import random
  6. from odps import ODPS
  7. from threading import Timer
  8. from get_data import get_data_from_odps
  9. from db_helper import RedisHelper, MysqlHelper
  10. from config import set_config
  11. from log import Log
  12. from utils import filter_video_status_with_applet_rec
# Module-level shared state, built once at import time.
# set_config() returns a pair; only the first element (config_) is used in
# this file, `env` is unpacked but not referenced below.
config_, env = set_config()
# Shared logger instance used by the functions in this module.
log_ = Log()
  15. def data_check(project, table, now_date):
  16. """检查数据是否准备好"""
  17. odps = ODPS(
  18. access_id=config_.ODPS_CONFIG['ACCESSID'],
  19. secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
  20. project=project,
  21. endpoint=config_.ODPS_CONFIG['ENDPOINT'],
  22. connect_timeout=3000,
  23. read_timeout=500000,
  24. pool_maxsize=1000,
  25. pool_connections=1000
  26. )
  27. try:
  28. dt = datetime.datetime.strftime(now_date, '%Y%m%d')
  29. sql = f'select * from {project}.{table} where dt = {dt}'
  30. with odps.execute_sql(sql=sql).open_reader() as reader:
  31. data_count = reader.count
  32. except Exception as e:
  33. data_count = 0
  34. return data_count
  35. def get_special_videos(now_date, project, table):
  36. """获取特殊mid指定的视频列表"""
  37. # 获取videoId
  38. dt = datetime.datetime.strftime(now_date, '%Y%m%d')
  39. records = get_data_from_odps(date=dt, project=project, table=table)
  40. video_id_list = [record['videoid'] for record in records]
  41. # 排序合并,随机给定分数
  42. final_result = {}
  43. for video_id in video_id_list:
  44. final_result[int(video_id)] = random.uniform(0, 100)
  45. # 写入对应的redis
  46. key_name = \
  47. f"{config_.KEY_NAME_PREFIX_SPECIAL_VIDEOS}{datetime.datetime.strftime(now_date, '%Y%m%d')}"
  48. if len(final_result) > 0:
  49. redis_helper = RedisHelper()
  50. redis_helper.add_data_with_zset(key_name=key_name, data=final_result, expire_time=2 * 24 * 3600)
  51. def h_timer_check():
  52. project = config_.SPECIAL_MID_VIDEOS_PROJECT.get('videos')
  53. table = config_.SPECIAL_MID_VIDEOS_TABLE.get('videos')
  54. now_date = datetime.datetime.today()
  55. log_.info(f"now_date: {datetime.datetime.strftime(now_date, '%Y%m%d')}")
  56. # 查看当天更新的数据是否已准备好
  57. data_count = data_check(project=project, table=table, now_date=now_date)
  58. if data_count > 0:
  59. log_.info(f'special_videos_count = {data_count}')
  60. # 数据准备好,进行更新
  61. get_special_videos(now_date=now_date, project=project, table=table)
  62. else:
  63. # 数据没准备好,1分钟后重新检查
  64. Timer(5 * 60, h_timer_check).start()
# Script entry point: start the check/update cycle when run directly.
if __name__ == '__main__':
    h_timer_check()