special_mid_videos_update.py 3.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889
  1. import time
  2. import datetime
  3. import pandas as pd
  4. import math
  5. import random
  6. from odps import ODPS
  7. from threading import Timer
  8. from get_data import get_data_from_odps
  9. from db_helper import RedisHelper, MysqlHelper
  10. from my_config import set_config
  11. from log import Log
  12. from my_utils import request_post
# Module-wide configuration: config_ supplies the ODPS credentials, table
# names and Redis key prefixes used below; env is returned alongside it
# (presumably the environment name — confirm in my_config.set_config).
config_, env = set_config()
# Shared logger used by every function in this module.
log_ = Log()
  15. def data_check(project, table, now_date):
  16. """检查数据是否准备好"""
  17. odps = ODPS(
  18. access_id=config_.ODPS_CONFIG['ACCESSID'],
  19. secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
  20. project=project,
  21. endpoint=config_.ODPS_CONFIG['ENDPOINT'],
  22. connect_timeout=3000,
  23. read_timeout=500000,
  24. pool_maxsize=1000,
  25. pool_connections=1000
  26. )
  27. try:
  28. dt = datetime.datetime.strftime(now_date, '%Y%m%d')
  29. sql = f'select * from {project}.{table} where dt = {dt}'
  30. with odps.execute_sql(sql=sql).open_reader() as reader:
  31. data_count = reader.count
  32. except Exception as e:
  33. data_count = 0
  34. return data_count
  35. def get_special_videos(now_date, project, table):
  36. """获取特殊mid指定的视频列表"""
  37. # 获取videoId
  38. dt = datetime.datetime.strftime(now_date, '%Y%m%d')
  39. records = get_data_from_odps(date=dt, project=project, table=table)
  40. video_id_list = [record['videoid'] for record in records]
  41. # 排序合并,随机给定分数
  42. final_result = {}
  43. # json_data = []
  44. for video_id in video_id_list:
  45. score = random.uniform(0, 100)
  46. final_result[int(video_id)] = score
  47. # json_data.append({'videoId': video_id, 'rovScore': score})
  48. # 写入对应的redis
  49. key_name = \
  50. f"{config_.KEY_NAME_PREFIX_SPECIAL_VIDEOS}{datetime.datetime.strftime(now_date, '%Y%m%d')}"
  51. if len(final_result) > 0:
  52. redis_helper = RedisHelper()
  53. redis_helper.add_data_with_zset(key_name=key_name, data=final_result, expire_time=2 * 24 * 3600)
  54. # 通知后端更新兜底视频数据
  55. # log_.info('json_data count = {}'.format(len(json_data)))
  56. # result = request_post(request_url=config_.NOTIFY_BACKEND_updateFallBackVideoList_URL,
  57. # request_data={'videos': json_data})
  58. # if result['code'] == 0:
  59. # log_.info('notify backend updateFallBackVideoList success!')
  60. # else:
  61. # log_.error('notify backend updateFallBackVideoList fail!')
  62. def h_timer_check():
  63. project = config_.SPECIAL_MID_VIDEOS_PROJECT.get('videos')
  64. table = config_.SPECIAL_MID_VIDEOS_TABLE.get('videos')
  65. now_date = datetime.datetime.today()
  66. log_.info(f"now_date: {datetime.datetime.strftime(now_date, '%Y%m%d')}")
  67. # 查看当天更新的数据是否已准备好
  68. data_count = data_check(project=project, table=table, now_date=now_date)
  69. if data_count > 0:
  70. log_.info(f'special_videos_count = {data_count}')
  71. # 数据准备好,进行更新
  72. get_special_videos(now_date=now_date, project=project, table=table)
  73. else:
  74. # 数据没准备好,1分钟后重新检查
  75. Timer(5 * 60, h_timer_check).start()
if __name__ == '__main__':
    # Script entry point: run the first readiness check immediately; when the
    # data is not ready, h_timer_check reschedules itself via threading.Timer.
    h_timer_check()