"""
Created on Mon Mar 18, 2024
@author: luojunhui
"""
import os
import sys
import json
from tqdm import tqdm
from datetime import datetime, timedelta

# Make the project root importable when the script is run from there.
sys.path.append(os.getcwd())

from functions.odps_function import PyODPS


def _generate_date_strings(start_date, end_date, fmt, step):
    """
    Build the inclusive list of timestamps from start_date to end_date.

    :param start_date: first timestamp, formatted according to ``fmt``
    :param end_date: last timestamp (inclusive), formatted according to ``fmt``
    :param fmt: strptime/strftime format shared by input and output
    :param step: timedelta between two consecutive entries
    :return: list of formatted strings; empty when start_date is after end_date
    :raises ValueError: if either bound does not match ``fmt``
    """
    current = datetime.strptime(start_date, fmt)
    end = datetime.strptime(end_date, fmt)
    date_strings = []
    while current <= end:
        date_strings.append(current.strftime(fmt))
        current += step
    return date_strings


def generate_hourly_strings(start_date, end_date):
    """
    Generate hourly date strings between two bounds (both inclusive).

    :param start_date: first hour, formatted as YYYYMMDDHH
    :param end_date: last hour, formatted as YYYYMMDDHH
    :return: list of YYYYMMDDHH strings, one per hour
    :raises ValueError: if a bound is not a valid YYYYMMDDHH timestamp
    """
    return _generate_date_strings(
        start_date, end_date, '%Y%m%d%H', timedelta(hours=1)
    )


def generate_daily_strings(start_date, end_date):
    """
    Generate daily date strings between two bounds (both inclusive).

    :param start_date: first day, formatted as YYYYMMDD
    :param end_date: last day, formatted as YYYYMMDD
    :return: list of YYYYMMDD strings, one per day
    :raises ValueError: if a bound is not a valid YYYYMMDD date
    """
    return _generate_date_strings(
        start_date, end_date, '%Y%m%d', timedelta(days=1)
    )


def generate_label_date(now_dt):
    """
    Return the date that lies 4 days after ``now_dt``.

    NOTE(review): the previous docstring said "3 days" while the code has
    always added 4 days; the offset is left untouched -- confirm which one
    is intended.

    :param now_dt: timestamp formatted as YYYYMMDDHH
    :return: date string formatted as YYYYMMDD (the hour is dropped)
    :raises ValueError: if ``now_dt`` is not a valid YYYYMMDDHH timestamp
    """
    now_date = datetime.strptime(now_dt, "%Y%m%d%H")
    label_date = now_date + timedelta(days=4)
    return label_date.strftime("%Y%m%d")


class VideoDataGenerator(object):
    """
    Generate training data and test data.
    """

    # (output key, source column) pairs used to reshape an hourly-table row.
    _HOUR_FIELDS = (
        ("uid", "uid"),
        ("video_id", "videoid"),
        ("type", "type"),
        ("channel", "channel"),
        ("fst", "flowpool_start_type"),
        ("fsl", "flowpool_start_level"),
        ("fet", "flowpool_end_type"),
        ("fel", "flowpool_end_level"),
        ("f_view", "flowpool_distribute_view_times"),
        ("f_share", "flowpool_share_times"),
        ("f_return", "flowpool_return_users"),
        ("f3_view", "flowpool_3days_distribute_view_times"),
        ("f3_share", "flowpool_3days_share_times"),
        ("f3_return", "flowpool_3days_return_users"),
        ("ros_dms", "ros_dms"),
        ("rov_dms", "rov_dms"),
        ("ros_sls", "ros_sls"),
        ("rov_sls", "rov_sls"),
        ("fans", "fans"),
        ("view_count_user_30days", "view_cnt_user_30days"),
        ("share_count_user_30days", "share_cnt_user_30days"),
        ("return_count_user_30days", "return_cnt_user_30days"),
        ("rov_user", "rov_user"),
        ("str_user", "str_user"),  # share / view
        ("out_user_id", "out_user_id"),
        ("mode", "strategy"),
        ("out_play_cnt", "out_play_cnt"),
        ("out_like_cnt", "out_like_cnt"),
        ("out_share_cnt", "out_share_cnt"),
        ("out_collection_cnt", "out_collection_cnt"),
        ("up_level_time_hour", "up_level_time_hour"),
        ("dt", "dt"),
    )

    # (output key, source column) pairs used to reshape a daily-table row.
    _DAILY_FIELDS = (
        ("video_id", "videoid"),
        ("total_view", "flowpool_distribute_view_times"),
        ("total_share", "flowpool_share_times"),
        ("total_return", "flowpool_return_users"),
        ("3day_view", "flowpool_3days_distribute_view_times"),
        ("3day_share", "flowpool_3days_share_times"),
        ("3day_return", "flowpool_3days_return_users"),
        ("dt", "dt"),
    )

    def __init__(self):
        # ODPS client used to run the SELECT statements below.
        self.oo = PyODPS()

    def get_hour_data(self, dt):
        """
        Fetch the new videos stored in the hourly table for one partition.

        :param dt: hour partition value (e.g. 2024010100); it is interpolated
            into the SQL text as-is, so it must come from a trusted source
        :return: list of dicts, one per row, keyed as listed in _HOUR_FIELDS
        """
        sql = f"""select * from loghubods.conten_quality_base_hour where dt = '{dt}';"""
        hour_data = self.oo.select(sql)
        return [
            {key: line[column] for key, column in self._HOUR_FIELDS}
            for line in hour_data
        ]

    def get_daily_data(self, dt):
        """
        Fetch video performance labels from the daily table.

        The daily table stores each video's performance (the label); rows are
        meant to be matched with hourly rows through their video_id.

        :param dt: day partition value, e.g. 20240101; it is interpolated
            into the SQL text as-is, so it must come from a trusted source
        :return: list of dicts, one per row, keyed as listed in _DAILY_FIELDS
        """
        sql = f"""select * from loghubods.conten_quality_base where dt = '{dt}';"""
        data = self.oo.select(sql)
        return [
            {key: item[column] for key, column in self._DAILY_FIELDS}
            for item in data
        ]


if __name__ == '__main__':
    # The end bound used to be "2024013134"; hour 34 does not exist, so
    # strptime raised ValueError before anything else ran. Use the last hour
    # of January 31 instead.
    date_list = generate_hourly_strings("2024010100", "2024013123")
    V = VideoDataGenerator()
    L = {}
    # NOTE: the per-hour collection loop is currently disabled, which means
    # an empty mapping is written to disk. Re-enable it to fill the file.
    # for date_str in tqdm(date_list):
    #     L[date_str] = {}
    #     data_list = V.get_hour_data(date_str)
    #     for obj in tqdm(data_list):
    #         video_id = obj['video_id']
    #         L[date_str][video_id] = obj
    # ensure_ascii=False can emit non-ASCII characters, so the file encoding
    # is pinned to UTF-8 rather than left to the platform default.
    with open('../data/january_train.json', 'w', encoding='utf-8') as f:
        f.write(json.dumps(L, ensure_ascii=False, indent=4))