123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105 |
- import datetime
- import sys
- import traceback
- import pandas as pd
- from odps import ODPS
- from threading import Timer
- from get_data import get_data_from_odps
- from utils import send_msg_to_feishu
- from db_helper import RedisHelper
- from config import set_config
- from log import Log
# set_config() yields the settings object used throughout this script and a
# second value bound to `env` (not referenced in the code visible here).
config_, env = set_config()

# Logger shared by every function in this script.
log_ = Log()

# Fields copied out of each ODPS record when building the ranking DataFrame.
features = [
    'videoid',
    'play_count_total',
    'gmt_create',
]
def data_check(project, table, now_date):
    """Check whether today's data is ready by counting the partition's rows.

    Args:
        project: ODPS project name; used both for the connection and to
            qualify the table in the query.
        table: Table name inside ``project``.
        now_date: ``datetime`` whose ``%Y%m%d`` rendering selects the ``dt``
            partition to inspect.

    Returns:
        The row count reported by the ODPS reader, or 0 when building or
        running the query fails for any reason. The failure is logged rather
        than raised, so a caller can treat 0 as "not ready yet" and retry.
    """
    odps = ODPS(
        access_id=config_.ODPS_CONFIG['ACCESSID'],
        secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
        project=project,
        endpoint=config_.ODPS_CONFIG['ENDPOINT'],
        connect_timeout=3000,
        read_timeout=500000,
        pool_maxsize=1000,
        pool_connections=1000
    )
    try:
        dt = datetime.datetime.strftime(now_date, '%Y%m%d')
        # NOTE(review): dt is interpolated as a bare number; if the dt
        # partition column is a string this relies on implicit conversion --
        # confirm against the table schema.
        sql = f'select * from {project}.{table} where dt = {dt}'
        # Only the reader's row count is used; no rows are iterated here.
        with odps.execute_sql(sql=sql).open_reader() as reader:
            data_count = reader.count
    except Exception as e:
        # Deliberately best-effort: report the failure instead of hiding it,
        # but still return 0 so the existing retry flow keeps working.
        log_.error(
            f"data_check failed, project: {project}, table: {table}, "
            f"exception: {e}, traceback: {traceback.format_exc()}"
        )
        data_count = 0
    return data_count
def get_religion_videos(now_date, project, table, key_name_prefix):
    """Fetch the day's religion videos, rank them and store scores in Redis.

    Records are read from ODPS for the ``%Y%m%d`` partition of ``now_date``,
    sorted by ``gmt_create`` then ``play_count_total`` (both descending), and
    each video id is given a score that decreases linearly with its rank,
    starting at 100 and staying above 50. The id -> score mapping is written
    to the zset ``{key_name_prefix}{yyyymmdd}`` with a 2-day expiry.

    Args:
        now_date: ``datetime`` selecting the partition and the key suffix.
        project: ODPS project name passed to ``get_data_from_odps``.
        table: ODPS table name passed to ``get_data_from_odps``.
        key_name_prefix: Prefix of the Redis key that receives the scores.

    Returns:
        None. When no records are returned nothing is written to Redis.
    """
    # Fetch the raw records and keep only the fields needed for ranking.
    dt = datetime.datetime.strftime(now_date, '%Y%m%d')
    records = get_data_from_odps(date=dt, project=project, table=table)
    feature_data = [
        {feature_name: record[feature_name] for feature_name in features}
        for record in records
    ]

    # Guard against an empty result: an empty DataFrame has no columns to
    # sort by, and the step computation below would divide by zero.
    if not feature_data:
        log_.info(f'no religion videos found, project = {project}, table = {table}, dt = {dt}')
        return

    feature_df = pd.DataFrame(feature_data)
    # Newest first, ties broken by higher total play count.
    feature_df = feature_df.sort_values(by=['gmt_create', 'play_count_total'], ascending=False)
    print(feature_df)
    video_id_list = feature_df['videoid'].to_list()

    # Assign scores by rank. If a video id repeats, the later (lower) score
    # overwrites the earlier one.
    step = 100 / (len(video_id_list) * 2)
    final_result = {
        int(video_id): 100 - i * step
        for i, video_id in enumerate(video_id_list)
    }

    # Write the ranking to the date-suffixed Redis key.
    key_name = f"{key_name_prefix}{dt}"
    redis_helper = RedisHelper()
    redis_helper.add_data_with_zset(key_name=key_name, data=final_result, expire_time=2 * 24 * 3600)
def timer_check(religion_name):
    """Update the videos for ``religion_name`` once today's data is ready.

    Looks up the project, table and Redis key prefix configured for
    ``religion_name``, then checks whether today's partition has rows. If it
    does, the ranking is rebuilt immediately; otherwise this function
    re-schedules itself on a ``threading.Timer`` to check again in 5 minutes.

    Args:
        religion_name: Key into ``config_.RELIGION_VIDEOS``.
    """
    religion_conf = config_.RELIGION_VIDEOS[religion_name]
    project = religion_conf['project']
    table = religion_conf['table']
    key_name_prefix = religion_conf['key_name_prefix']

    now_date = datetime.datetime.today()
    log_.info(f"now_date: {now_date.strftime('%Y%m%d')}")

    # Check whether today's data has landed yet.
    data_count = data_check(project=project, table=table, now_date=now_date)
    if data_count <= 0:
        # Data not ready: check again in 5 minutes.
        # NOTE(review): the retry runs on a Timer thread, so an exception
        # raised there does not propagate to whoever made the first call.
        Timer(5 * 60, timer_check, args=[religion_name]).start()
        return

    # Data is ready: run the update.
    log_.info(f'religion_name = {religion_name}, religion_videos_count = {data_count}')
    get_religion_videos(
        now_date=now_date,
        project=project,
        table=table,
        key_name_prefix=key_name_prefix,
    )
def main():
    """Script entry point: update videos for the religion named in argv[1].

    Any exception raised while reading the argument or running the first
    ``timer_check`` call is logged and reported to the Feishu server robot
    instead of being propagated.
    """
    try:
        timer_check(sys.argv[1])
    except Exception as e:
        # Format the traceback once and reuse it for the log and the alert.
        tb_text = traceback.format_exc()
        log_.error(f"宗教视频数据更新失败, exception: {e}, traceback: {tb_text}")
        robot_conf = config_.FEISHU_ROBOT['server_robot']
        alert_text = (
            f"rov-offline{config_.ENV_TEXT} - 宗教视频数据更新失败\n"
            f"exception: {e}\n"
            f"traceback: {tb_text}"
        )
        send_msg_to_feishu(
            webhook=robot_conf.get('webhook'),
            key_word=robot_conf.get('key_word'),
            msg_text=alert_text,
        )


if __name__ == '__main__':
    main()
|