import time
import datetime
import pandas as pd
import math
import random
from odps import ODPS
from threading import Timer
from get_data import get_data_from_odps
from db_helper import RedisHelper, MysqlHelper
from config import set_config
from log import Log
from utils import request_post

config_, env = set_config()
log_ = Log()


def data_check(project, table, now_date):
    """Return the number of rows in ``project.table`` for the partition of ``now_date``.

    :param project: ODPS project name.
    :param table: ODPS table name.
    :param now_date: ``datetime`` whose ``%Y%m%d`` form selects the ``dt`` value.
    :return: row count reported by the ODPS reader, or 0 if the query failed
        for any reason (the failure is logged, not raised).
    """
    odps = ODPS(
        access_id=config_.ODPS_CONFIG['ACCESSID'],
        secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
        project=project,
        endpoint=config_.ODPS_CONFIG['ENDPOINT'],
        connect_timeout=3000,
        read_timeout=500000,
        pool_maxsize=1000,
        pool_connections=1000
    )
    try:
        dt = datetime.datetime.strftime(now_date, '%Y%m%d')
        sql = f'select * from {project}.{table} where dt = {dt}'
        with odps.execute_sql(sql=sql).open_reader() as reader:
            data_count = reader.count
    except Exception as e:
        # Best effort: a failed check is treated as "data not ready" so the
        # caller keeps retrying, but the cause is logged instead of being
        # swallowed silently.
        log_.error(f'data_check failed for {project}.{table}: {e}')
        data_count = 0
    return data_count


def get_special_videos(now_date, project, table):
    """Load the video list for special mids, store it in Redis and notify the backend.

    Every video id read from ODPS is given a random score in [0, 100]. The
    id -> score mapping is written to a Redis zset whose key is suffixed with
    the date, and the same data is posted to the backend fallback-video
    endpoint.

    :param now_date: ``datetime`` selecting the partition to read and the key suffix.
    :param project: ODPS project name.
    :param table: ODPS table name.
    """
    # Fetch the video ids for the given date.
    dt = datetime.datetime.strftime(now_date, '%Y%m%d')
    records = get_data_from_odps(date=dt, project=project, table=table)
    video_id_list = [record['videoid'] for record in records]

    # Assign each video a random score; keep both the zset mapping and the
    # payload for the backend in sync by sharing the same score.
    final_result = {}
    json_data = []
    for video_id in video_id_list:
        score = random.uniform(0, 100)
        final_result[int(video_id)] = score
        json_data.append({'videoId': video_id, 'rovScore': score})

    # Write to the date-specific redis key (expires after two days).
    key_name = f"{config_.KEY_NAME_PREFIX_SPECIAL_VIDEOS}{dt}"
    if final_result:
        redis_helper = RedisHelper()
        redis_helper.add_data_with_zset(key_name=key_name, data=final_result, expire_time=2 * 24 * 3600)

    # Notify the backend to update its fallback video data.
    log_.info('json_data count = {}'.format(len(json_data)))
    result = request_post(request_url=config_.NOTIFY_BACKEND_updateFallBackVideoList_URL,
                          request_data={'videos': json_data})
    # Guard against a missing or malformed response so a failed request is
    # reported as a failure instead of raising on the subscript.
    if isinstance(result, dict) and result.get('code') == 0:
        log_.info('notify backend updateFallBackVideoList success!')
    else:
        log_.error('notify backend updateFallBackVideoList fail!')


def h_timer_check():
    """Update the special videos if today's data is ready, otherwise retry later.

    When ``data_check`` reports no rows for today, a ``threading.Timer`` is
    started that calls this function again after five minutes.
    """
    project = config_.SPECIAL_MID_VIDEOS_PROJECT.get('videos')
    table = config_.SPECIAL_MID_VIDEOS_TABLE.get('videos')
    now_date = datetime.datetime.today()
    log_.info(f"now_date: {datetime.datetime.strftime(now_date, '%Y%m%d')}")
    # Check whether today's data has been produced yet.
    data_count = data_check(project=project, table=table, now_date=now_date)
    if data_count > 0:
        log_.info(f'special_videos_count = {data_count}')
        # Data is ready: run the update.
        get_special_videos(now_date=now_date, project=project, table=table)
    else:
        # Data is not ready: check again in 5 minutes.
        Timer(5 * 60, h_timer_check).start()


if __name__ == '__main__':
    h_timer_check()