1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889 |
- import time
- import datetime
- import pandas as pd
- import math
- import random
- from odps import ODPS
- from threading import Timer
- from get_data import get_data_from_odps
- from db_helper import RedisHelper, MysqlHelper
- from my_config import set_config
- from log import Log
- from my_utils import request_post
# Module-level setup: set_config() returns a pair unpacked into config_
# (read below for ODPS credentials, key prefixes and table names) and env
# (presumably the active environment name — not referenced in this file).
config_, env = set_config()
# Shared logger used by every function in this module.
log_ = Log()
def data_check(project, table, now_date):
    """Return how many rows the ``dt`` partition of ``project.table`` holds.

    Used as a readiness probe: a result greater than 0 means the data for
    ``now_date`` is available.

    Args:
        project: ODPS project name; also used to qualify the table in the SQL.
        table: Table name inside ``project``.
        now_date: date/datetime whose ``%Y%m%d`` rendering is the ``dt`` value.

    Returns:
        int: row count of the partition, or 0 if the query failed (the
        failure is logged).
    """
    odps = ODPS(
        access_id=config_.ODPS_CONFIG['ACCESSID'],
        secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
        project=project,
        endpoint=config_.ODPS_CONFIG['ENDPOINT'],
        connect_timeout=3000,
        read_timeout=500000,
        pool_maxsize=1000,
        pool_connections=1000
    )
    dt = now_date.strftime('%Y%m%d')
    # count(*) aggregates server-side instead of materialising every row of the
    # partition just to read its length. dt is quoted because partition columns
    # are normally STRING (TODO confirm for this table); an unquoted number
    # forces an implicit type conversion and can defeat partition pruning.
    sql = f"select count(*) from {project}.{table} where dt = '{dt}'"
    data_count = 0
    try:
        with odps.execute_sql(sql=sql).open_reader() as reader:
            for record in reader:
                data_count = int(record[0])
    except Exception as e:
        # Best-effort probe: a failure counts as "not ready" so the caller just
        # retries later, but it is logged rather than silently swallowed.
        log_.error(f'data_check failed, table = {project}.{table}, dt = {dt}, error = {e}')
        data_count = 0
    return data_count
def get_special_videos(now_date, project, table):
    """Load the day's special-mid video ids from ODPS and cache them in Redis.

    Every video id read from the ``dt`` partition gets a random score between
    0 and 100. The resulting ``{video_id: score}`` mapping is written as a
    sorted set under ``config_.KEY_NAME_PREFIX_SPECIAL_VIDEOS + <YYYYMMDD>``
    with a two-day expiry. Nothing is written when no ids are found.

    Args:
        now_date: date/datetime whose ``%Y%m%d`` rendering selects the partition
            and suffixes the Redis key.
        project: ODPS project holding the source table.
        table: ODPS table with a ``videoid`` column.
    """
    dt = now_date.strftime('%Y%m%d')
    records = get_data_from_odps(date=dt, project=project, table=table)
    # Scores are deliberately random (no ranking signal is applied here);
    # a duplicated video id keeps the score drawn for its last occurrence.
    final_result = {int(record['videoid']): random.uniform(0, 100) for record in records}
    if not final_result:
        log_.info(f'no special videos found for dt = {dt}, redis not updated')
        return

    key_name = f'{config_.KEY_NAME_PREFIX_SPECIAL_VIDEOS}{dt}'
    redis_helper = RedisHelper()
    redis_helper.add_data_with_zset(key_name=key_name, data=final_result, expire_time=2 * 24 * 3600)
    log_.info(f'special videos written to {key_name}, count = {len(final_result)}')
    # NOTE: a backend notification (updateFallBackVideoList) used to be sent
    # here; it was disabled and has been removed.
def h_timer_check(retry_interval=5 * 60):
    """Refresh the special-mid video cache once today's ODPS data is ready.

    Probes today's partition of the configured table with ``data_check``. If it
    has rows, ``get_special_videos`` is run. Otherwise the check reschedules
    itself on a ``threading.Timer`` and keeps retrying until data appears.

    Args:
        retry_interval: seconds to wait before re-checking when the data is not
            ready yet (default 5 minutes, matching the previous fixed delay).
    """
    project = config_.SPECIAL_MID_VIDEOS_PROJECT.get('videos')
    table = config_.SPECIAL_MID_VIDEOS_TABLE.get('videos')
    now_date = datetime.datetime.today()
    log_.info(f"now_date: {now_date.strftime('%Y%m%d')}")
    # Check whether today's data is ready.
    data_count = data_check(project=project, table=table, now_date=now_date)
    if data_count > 0:
        log_.info(f'special_videos_count = {data_count}')
        # Data is ready: update the cache.
        get_special_videos(now_date=now_date, project=project, table=table)
    else:
        # Data not ready: re-check after retry_interval seconds (5 minutes by
        # default; the old comment wrongly said 1 minute).
        log_.info(f'special videos data not ready, retry in {retry_interval} s')
        Timer(retry_interval, h_timer_check, kwargs={'retry_interval': retry_interval}).start()
if __name__ == '__main__':
    # Run the first readiness check; h_timer_check reschedules itself via a
    # threading.Timer until the day's data is available.
    h_timer_check()
|