""" Created on Mon Mar 18, 2024 @author: luojunhui Read data from odps and save to json file in local files """ import os import sys import json from tqdm import tqdm from concurrent.futures import ThreadPoolExecutor sys.path.append(os.getcwd()) from functions import PyODPS, generate_hourly_strings, generate_daily_strings class VideoDataGenerator(object): """ 生成训练数据,测试数据 """ def __init__(self): self.oo = PyODPS() def get_hour_data(self, dt): """ 获取小时级的新视频 :param dt: 小时参数 :return: """ sql = f"""select * from loghubods.conten_quality_base_hour where dt = '{dt}';""" hour_data = self.oo.select(sql) result = [] for line in hour_data: obj = { "uid": line["uid"], "video_id": line["videoid"], "type": line["type"], "channel": line["channel"], "fst": line["flowpool_start_type"], "fsl": line["flowpool_start_level"], "fet": line["flowpool_end_type"], "fel": line["flowpool_end_level"], "f_view": line["flowpool_distribute_view_times"], "f_share": line["flowpool_share_times"], "f_return": line["flowpool_return_users"], "f3_view": line["flowpool_3days_distribute_view_times"], "f3_share": line["flowpool_3days_share_times"], "f3_return": line["flowpool_3days_return_users"], "ros_dms": line["ros_dms"], "rov_dms": line["rov_dms"], "ros_sls": line["ros_sls"], "rov_sls": line["rov_sls"], "fans": line["fans"], "view_count_user_30days": line["view_cnt_user_30days"], "share_count_user_30days": line["share_cnt_user_30days"], "return_count_user_30days": line["return_cnt_user_30days"], "rov_user": line["rov_user"], "str_user": line["str_user"], # share / view "out_user_id": line["out_user_id"], "mode": line["strategy"], "out_play_cnt": line["out_play_cnt"], "out_like_cnt": line["out_like_cnt"], "out_share_cnt": line["out_share_cnt"], "out_collection_cnt": line["out_collection_cnt"], "up_level_time_hour": line["up_level_time_hour"], "dt": line["dt"], } result.append(obj) return result def get_daily_data(self, dt): """ 天级表里面存储了视频的表现 label, 通过小时级的 video_id 去获取视频的表现 :param dt: 20240101 :return: data_list """ sql = f"""select * from loghubods.conten_quality_base where dt = '{dt}';""" data = self.oo.select(sql) result = [ { "video_id": item["videoid"], "total_view": item["flowpool_distribute_view_times"], "total_share": item["flowpool_share_times"], "total_return": item["flowpool_return_users"], "3day_view": item["flowpool_3days_distribute_view_times"], "3day_share": item["flowpool_3days_share_times"], "3day_return": item["flowpool_3days_return_users"], "3day_up_level": item["up_level_3_days"], "dt": item["dt"], } for item in data ] return result def save_daily_data(start_date, end_date, save_path): """ 获取日期范围内数据,并且保存到指定路径 :param start_date: :param end_date: :param save_path: :return: """ date_list = generate_daily_strings(start_date, end_date) V = VideoDataGenerator() L = {} for date_str in tqdm(date_list): L[date_str] = {} data_list = V.get_daily_data(date_str) for obj in tqdm(data_list): video_id = obj["video_id"] L[date_str][video_id] = obj with open(save_path, "w") as f: f.write(json.dumps(L, ensure_ascii=False)) def download_hour_video_data(date_str): """ 获取日期参数 :param date_str: :return: """ V = VideoDataGenerator() data_list = V.get_hour_data(date_str) L = [] for obj in data_list: L.append(obj) temp_path = "data/temp_data/hour_{}.json".format(date_str) with open(temp_path, "w") as f: f.write(json.dumps(L, ensure_ascii=False)) def save_hourly_data(start_date, end_date, save_path): """ save hourly data :param start_date: :param end_date: :param save_path: :return: """ print(save_path) date_list = generate_hourly_strings(start_date, end_date) with ThreadPoolExecutor(max_workers=10) as Pool: Pool.map(download_hour_video_data, date_list) # for date_str in tqdm(date_list): # data_list = V.get_hour_data(date_str) # for obj in tqdm(data_list): # L.append(obj) # with open(save_path, "w") as f: # f.write(json.dumps(L, ensure_ascii=False)) if __name__ == "__main__": flag = int(input("请输入标识符,输入 1 获取小时级数据, 输入 2 获取天级数据: \n")) if flag == 1: start = str(input("请输入开始字符串, 格式为 yyyymmddhh:\n")) end = str(input("请输入结束字符串, 格式为 yyyymmddhh: \n")) save_p = "data/hourly-train-{}-{}.json".format(start, end) if len(start) == 10 and len(end) == 10: save_hourly_data(start, end, save_p) else: print("Time format is not ok") elif flag == 2: start = str(input("请输入开始字符串, 格式为 yyyymmdd:\n")) end = str(input("请输入结束字符串, 格式为 yymmdd: \n")) save_p = "data/train_data/daily-label-{}-{}.json".format(start, end) if len(start) == 8 and len(end) == 8: save_daily_data(start, end, save_p) else: print("Time format is not ok") else: print("Input Error ! Make sure your input is 1 or 2!!")