import datetime
import pandas as pd
from odps import ODPS
from threading import Timer
from get_data import get_data_from_odps
from db_helper import RedisHelper
from my_config import set_config
from log import Log
from my_utils import filter_video_status_with_applet_rec

config_, env = set_config()
log_ = Log()

# Feature columns read from the ODPS table. The Chinese names are the actual
# column identifiers: 视频id = video id, 抓取时间 = crawl time,
# 进入黑名单时间 = blacklist entry time, 站外播放量 = off-site play count.
features = [
    '视频id',
    '抓取时间',
    '进入黑名单时间',
    '站外播放量',
    'praise_count',
    'transfer_count',
]


def h_data_check(project, table, now_date):
    """Check whether the hourly data is ready; return its row count (0 if not)."""
    odps = ODPS(
        access_id=config_.ODPS_CONFIG['ACCESSID'],
        secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
        project=project,
        endpoint=config_.ODPS_CONFIG['ENDPOINT'],
        connect_timeout=3000,
        read_timeout=500000,
        pool_maxsize=1000,
        pool_connections=1000,
    )
    try:
        dt = datetime.datetime.strftime(now_date, '%Y%m%d%H')
        sql = f'select * from {project}.{table} where dt = {dt}'
        with odps.execute_sql(sql=sql).open_reader() as reader:
            data_count = reader.count
    except Exception as e:
        # Treat any query failure as "data not ready yet".
        log_.info(f'h_data_check error: {e}')
        data_count = 0
    return data_count


def get_feature_data(now_date, project, table):
    """Fetch the hourly feature data and return it as a DataFrame."""
    dt = datetime.datetime.strftime(now_date, '%Y%m%d%H')
    records = get_data_from_odps(date=dt, project=project, table=table)
    feature_data = []
    for record in records:
        item = {feature_name: record[feature_name] for feature_name in features}
        feature_data.append(item)
    feature_df = pd.DataFrame(feature_data)
    return feature_df


def video_rank(df, now_date, now_h):
    """
    Rank videos by off-site play count and write the scores to Redis.
    :param df: feature DataFrame for the current hour
    :param now_date: current date
    :param now_h: current hour
    :return:
    """
    df = df.fillna(0)
    log_.info(f'initial_df count = {len(df)}')
    # Video status filtering; keep=False drops every video id that occurs
    # more than once.
    video_ids = [int(video_id) for video_id in df['视频id']]
    df['视频id'] = df['视频id'].astype(int)
    df = df.drop_duplicates(['视频id'], keep=False)
    log_.info(f'df length = {len(df)}')

    # Videos pending recommendation (applet_rec_status = -6), sorted by
    # off-site play count.
    filtered_result_6 = filter_video_status_with_applet_rec(video_ids=video_ids, applet_rec_status=-6)
    filtered_df_6 = df[df['视频id'].isin(filtered_result_6)]
    filtered_df_6 = filtered_df_6.sort_values(by=['站外播放量'], ascending=False)
    log_.info(f'filtered_df_6 count = {len(filtered_df_6)}')

    # Normally recommendable videos (applet_rec_status = 1), sorted by
    # off-site play count.
    filtered_result_1 = filter_video_status_with_applet_rec(video_ids=video_ids, applet_rec_status=1)
    filtered_df_1 = df[df['视频id'].isin(filtered_result_1)]
    filtered_df_1 = filtered_df_1.sort_values(by=['站外播放量'], ascending=False)
    log_.info(f'filtered_df_1 count = {len(filtered_df_1)}')

    # Merge both lists (status 1 first) and assign evenly spaced scores from
    # 100 downward, so the zset preserves the rank order.
    merge_df = pd.concat([filtered_df_1, filtered_df_6])
    merge_df = merge_df.drop_duplicates(['视频id'], keep=False)
    merge_videos = merge_df['视频id'].to_list()
    final_result = {}
    if len(merge_videos) > 0:
        step = round(100 / len(merge_videos), 3)
        for i, video_id in enumerate(merge_videos):
            score = 100 - i * step
            final_result[int(video_id)] = score

    # Write the scores to the hourly Redis zset.
    key_name = \
        f"{config_.RECALL_KEY_NAME_PREFIX_WHOLE_MOVIES}{datetime.datetime.strftime(now_date, '%Y%m%d')}.{now_h}"
    if len(final_result) > 0:
        redis_helper = RedisHelper()
        redis_helper.add_data_with_zset(key_name=key_name, data=final_result, expire_time=23 * 3600)


def rank_by_h(now_date, now_h, project, table):
    # Fetch the feature data and rank it.
    feature_df = get_feature_data(now_date=now_date, project=project, table=table)
    video_rank(df=feature_df, now_date=now_date, now_h=now_h)


def h_rank_bottom(now_date, now_h):
    """Data not updated on time: reuse the previous hour's result for the current hour."""
    redis_helper = RedisHelper()
    if now_h == 0:
        # Hour 0 rolls back to 23:00 of the previous day.
        redis_dt = datetime.datetime.strftime(now_date - datetime.timedelta(days=1), '%Y%m%d')
        redis_h = 23
    else:
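        # Any other hour reuses the previous hour of the same day.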
        redis_dt = datetime.datetime.strftime(now_date, '%Y%m%d')
        redis_h = now_h - 1
    # Read the previous hour's zset and copy it to the current hour's key.
    key_prefix = config_.RECALL_KEY_NAME_PREFIX_WHOLE_MOVIES
    key_name = f"{key_prefix}{redis_dt}.{redis_h}"
    initial_data = redis_helper.get_data_zset_with_index(key_name=key_name, start=0, end=-1, with_scores=True)
    final_data = dict()
    for video_id, score in initial_data:
        final_data[video_id] = score
    # Write to the current hour's Redis key.
    final_key_name = f"{key_prefix}{datetime.datetime.strftime(now_date, '%Y%m%d')}.{now_h}"
    if len(final_data) > 0:
        redis_helper.add_data_with_zset(key_name=final_key_name, data=final_data, expire_time=23 * 3600)


def h_timer_check():
    project = config_.WHOLE_MOVIES_PROJECT
    table = config_.WHOLE_MOVIES_TABLE
    # Read the clock once so date, hour and minute stay consistent.
    now = datetime.datetime.now()
    now_date = now
    now_h = now.hour
    now_min = now.minute
    log_.info(f"now_date: {datetime.datetime.strftime(now_date, '%Y%m%d%H')}")
    # Check whether the current hour's data is ready.
    h_data_count = h_data_check(project=project, table=table, now_date=now_date)
    if h_data_count > 0:
        # Data is ready: run the hourly update.
        log_.info(f'whole_movies_data_count = {h_data_count}')
        rank_by_h(now_date=now_date, now_h=now_h, project=project, table=table)
    elif now_min > 50:
        # Still no data near the end of the hour: fall back to last hour's result.
        log_.info('whole_movies data is None, use bottom data!')
        h_rank_bottom(now_date=now_date, now_h=now_h)
    else:
        # Data not ready yet: check again in one minute.
        Timer(60, h_timer_check).start()


if __name__ == '__main__':
    h_timer_check()
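

# --- Illustrative sketch, not used by the job above -----------------------
# Shows how a consumer might read back the hourly ranking written by
# video_rank / h_rank_bottom. It relies only on the RedisHelper calls already
# used in this file; the function name and the `top_n` parameter are
# hypothetical additions for illustration.
def read_whole_movies_rank(now_date, now_h, top_n=100):
    """Return up to top_n (video_id, score) pairs, highest score first."""
    redis_helper = RedisHelper()
    key_name = \
        f"{config_.RECALL_KEY_NAME_PREFIX_WHOLE_MOVIES}{datetime.datetime.strftime(now_date, '%Y%m%d')}.{now_h}"
    data = redis_helper.get_data_zset_with_index(key_name=key_name, start=0, end=-1, with_scores=True) or []
    # Sort client-side so the result does not depend on the helper's return order.
    ranked = sorted(data, key=lambda pair: pair[1], reverse=True)
    return ranked[:top_n]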