|  | @@ -0,0 +1,105 @@
 | 
	
		
			
				|  |  | +import datetime
 | 
	
		
			
				|  |  | +import sys
 | 
	
		
			
				|  |  | +import traceback
 | 
	
		
			
				|  |  | +import pandas as pd
 | 
	
		
			
				|  |  | +from odps import ODPS
 | 
	
		
			
				|  |  | +from threading import Timer
 | 
	
		
			
				|  |  | +from get_data import get_data_from_odps
 | 
	
		
			
				|  |  | +from utils import send_msg_to_feishu
 | 
	
		
			
				|  |  | +from db_helper import RedisHelper
 | 
	
		
			
				|  |  | +from config import set_config
 | 
	
		
			
				|  |  | +from log import Log
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +config_, env = set_config()
 | 
	
		
			
				|  |  | +log_ = Log()
 | 
	
		
			
				|  |  | +features = ['videoid', 'play_count_total', 'gmt_create']
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +def data_check(project, table, now_date):
 | 
	
		
			
				|  |  | +    """检查数据是否准备好"""
 | 
	
		
			
				|  |  | +    odps = ODPS(
 | 
	
		
			
				|  |  | +        access_id=config_.ODPS_CONFIG['ACCESSID'],
 | 
	
		
			
				|  |  | +        secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
 | 
	
		
			
				|  |  | +        project=project,
 | 
	
		
			
				|  |  | +        endpoint=config_.ODPS_CONFIG['ENDPOINT'],
 | 
	
		
			
				|  |  | +        connect_timeout=3000,
 | 
	
		
			
				|  |  | +        read_timeout=500000,
 | 
	
		
			
				|  |  | +        pool_maxsize=1000,
 | 
	
		
			
				|  |  | +        pool_connections=1000
 | 
	
		
			
				|  |  | +    )
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    try:
 | 
	
		
			
				|  |  | +        dt = datetime.datetime.strftime(now_date, '%Y%m%d')
 | 
	
		
			
				|  |  | +        sql = f'select * from {project}.{table} where dt = {dt}'
 | 
	
		
			
				|  |  | +        with odps.execute_sql(sql=sql).open_reader() as reader:
 | 
	
		
			
				|  |  | +            data_count = reader.count
 | 
	
		
			
				|  |  | +    except Exception as e:
 | 
	
		
			
				|  |  | +        data_count = 0
 | 
	
		
			
				|  |  | +    return data_count
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +def get_religion_videos(now_date, project, table, key_name_prefix):
 | 
	
		
			
				|  |  | +    """获取宗教视频列表"""
 | 
	
		
			
				|  |  | +    # 获取videoId
 | 
	
		
			
				|  |  | +    dt = datetime.datetime.strftime(now_date, '%Y%m%d')
 | 
	
		
			
				|  |  | +    records = get_data_from_odps(date=dt, project=project, table=table)
 | 
	
		
			
				|  |  | +    feature_data = []
 | 
	
		
			
				|  |  | +    for record in records:
 | 
	
		
			
				|  |  | +        item = {}
 | 
	
		
			
				|  |  | +        for feature_name in features:
 | 
	
		
			
				|  |  | +            item[feature_name] = record[feature_name]
 | 
	
		
			
				|  |  | +        feature_data.append(item)
 | 
	
		
			
				|  |  | +    feature_df = pd.DataFrame(feature_data)
 | 
	
		
			
				|  |  | +    # 按照发布时间和播放量进行倒序
 | 
	
		
			
				|  |  | +    feature_df = feature_df.sort_values(by=['gmt_create', 'play_count_total'], ascending=False)
 | 
	
		
			
				|  |  | +    print(feature_df)
 | 
	
		
			
				|  |  | +    video_id_list = feature_df['videoid'].to_list()
 | 
	
		
			
				|  |  | +    # 按照排序给定分数
 | 
	
		
			
				|  |  | +    final_result = {}
 | 
	
		
			
				|  |  | +    step = 100 / (len(video_id_list) * 2)
 | 
	
		
			
				|  |  | +    for i, video_id in enumerate(video_id_list):
 | 
	
		
			
				|  |  | +        score = 100 - i * step
 | 
	
		
			
				|  |  | +        final_result[int(video_id)] = score
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    # 写入对应的redis
 | 
	
		
			
				|  |  | +    key_name = \
 | 
	
		
			
				|  |  | +        f"{key_name_prefix}{datetime.datetime.strftime(now_date, '%Y%m%d')}"
 | 
	
		
			
				|  |  | +    if len(final_result) > 0:
 | 
	
		
			
				|  |  | +        redis_helper = RedisHelper()
 | 
	
		
			
				|  |  | +        redis_helper.add_data_with_zset(key_name=key_name, data=final_result, expire_time=2 * 24 * 3600)
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +def timer_check(religion_name):
 | 
	
		
			
				|  |  | +    project = config_.RELIGION_VIDEOS[religion_name]['project']
 | 
	
		
			
				|  |  | +    table = config_.RELIGION_VIDEOS[religion_name]['table']
 | 
	
		
			
				|  |  | +    key_name_prefix = config_.RELIGION_VIDEOS[religion_name]['key_name_prefix']
 | 
	
		
			
				|  |  | +    now_date = datetime.datetime.today()
 | 
	
		
			
				|  |  | +    log_.info(f"now_date: {datetime.datetime.strftime(now_date, '%Y%m%d')}")
 | 
	
		
			
				|  |  | +    # 查看当天更新的数据是否已准备好
 | 
	
		
			
				|  |  | +    data_count = data_check(project=project, table=table, now_date=now_date)
 | 
	
		
			
				|  |  | +    if data_count > 0:
 | 
	
		
			
				|  |  | +        log_.info(f'religion_catholicism_videos_count = {data_count}')
 | 
	
		
			
				|  |  | +        # 数据准备好,进行更新
 | 
	
		
			
				|  |  | +        get_religion_videos(now_date=now_date, project=project, table=table, key_name_prefix=key_name_prefix)
 | 
	
		
			
				|  |  | +    else:
 | 
	
		
			
				|  |  | +        # 数据没准备好,1分钟后重新检查
 | 
	
		
			
				|  |  | +        Timer(5 * 60, timer_check, args=[religion_name]).start()
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +def main():
 | 
	
		
			
				|  |  | +    try:
 | 
	
		
			
				|  |  | +        religion_name = sys.argv[1]
 | 
	
		
			
				|  |  | +        timer_check(religion_name)
 | 
	
		
			
				|  |  | +    except Exception as e:
 | 
	
		
			
				|  |  | +        log_.error(f"宗教视频数据更新失败, exception: {e}, traceback: {traceback.format_exc()}")
 | 
	
		
			
				|  |  | +        send_msg_to_feishu(
 | 
	
		
			
				|  |  | +            webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
 | 
	
		
			
				|  |  | +            key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
 | 
	
		
			
				|  |  | +            msg_text=f"rov-offline{config_.ENV_TEXT} - 宗教视频数据更新失败\n"
 | 
	
		
			
				|  |  | +                     f"exception: {e}\n"
 | 
	
		
			
				|  |  | +                     f"traceback: {traceback.format_exc()}"
 | 
	
		
			
				|  |  | +        )
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +if __name__ == '__main__':
 | 
	
		
			
				|  |  | +    main()
 |