""" Created on Mon Mar 18, 2024 @author: luojunhui Read data from odps and save to json file in local files """ import os import sys import json from tqdm import tqdm sys.path.append(os.getcwd()) from functions import PyODPS, generate_hourly_strings, generate_daily_strings class VideoDataGenerator(object): """ 生成训练数据,测试数据 """ def __init__(self): self.oo = PyODPS() def get_hour_data(self, dt): """ 获取小时级的新视频 :param dt: 小时参数 :return: """ sql = f"""select * from loghubods.conten_quality_base_hour where dt = '{dt}';""" hour_data = self.oo.select(sql) result = [] for line in hour_data: obj = { "uid": line['uid'], "video_id": line['videoid'], "type": line['type'], "channel": line['channel'], "fst": line['flowpool_start_type'], "fsl": line['flowpool_start_level'], "fet": line['flowpool_end_type'], "fel": line['flowpool_end_level'], "f_view": line['flowpool_distribute_view_times'], "f_share": line['flowpool_share_times'], "f_return": line['flowpool_return_users'], "f3_view": line['flowpool_3days_distribute_view_times'], "f3_share": line['flowpool_3days_share_times'], "f3_return": line['flowpool_3days_return_users'], "ros_dms": line['ros_dms'], "rov_dms": line['rov_dms'], "ros_sls": line['ros_sls'], "rov_sls": line['rov_sls'], "fans": line['fans'], "view_count_user_30days": line['view_cnt_user_30days'], "share_count_user_30days": line['share_cnt_user_30days'], "return_count_user_30days": line['return_cnt_user_30days'], "rov_user": line['rov_user'], "str_user": line['str_user'], # share / view "out_user_id": line['out_user_id'], "mode": line['strategy'], "out_play_cnt": line['out_play_cnt'], "out_like_cnt": line['out_like_cnt'], "out_share_cnt": line['out_share_cnt'], "out_collection_cnt": line['out_collection_cnt'], "up_level_time_hour": line['up_level_time_hour'], "dt": line['dt'] } result.append(obj) return result def get_daily_data(self, dt): """ 天级表里面存储了视频的表现 label, 通过小时级的 video_id 去获取视频的表现 :param dt: 20240101 :return: data_list """ sql = f"""select * from loghubods.conten_quality_base where dt = '{dt}';""" data = self.oo.select(sql) result = [ { "video_id": item['videoid'], "total_view": item['flowpool_distribute_view_times'], "total_share": item['flowpool_share_times'], "total_return": item['flowpool_return_users'], "3day_view": item['flowpool_3days_distribute_view_times'], "3day_share": item['flowpool_3days_share_times'], "3day_return": item['flowpool_3days_return_users'], "dt": item['dt'] } for item in data ] return result def save_daily_data(start_date, end_date, save_path): """ 获取日期范围内数据,并且保存到指定路径 :param start_date: :param end_date: :param save_path: :return: """ date_list = generate_daily_strings(start_date, end_date) V = VideoDataGenerator() L = {} for date_str in tqdm(date_list): L[date_str] = {} data_list = V.get_daily_data(date_str) for obj in tqdm(data_list): video_id = obj['video_id'] L[date_str][video_id] = obj with open(save_path, "w") as f: f.write(json.dumps(L, ensure_ascii=False)) def save_hourly_data(start_date, end_date, save_path): """ save hourly data :param start_date: :param end_date: :param save_path: :return: """ date_list = generate_hourly_strings(start_date, end_date) V = VideoDataGenerator() L = [] for date_str in tqdm(date_list): data_list = V.get_hour_data(date_str) for obj in tqdm(data_list): L.append(obj) with open(save_path, "w") as f: f.write(json.dumps(L, ensure_ascii=False)) if __name__ == '__main__': flag = int(input("请输入标识符,输入 1 获取小时级数据, 输入 2 获取天级数据: \n")) if flag == 1: start = str(input("请输入开始字符串, 格式为 yyyymmddhh:\n")) end = str(input("请输入结束字符串, 格式为 yymmddhh: \n")) save_p = "data/hourly-train-{}-{}.json".format(start, end) if len(start) == 10 and len(end) == 10: save_hourly_data(start, end, save_p) else: print("Time format is not ok") elif flag == 2: start = str(input("请输入开始字符串, 格式为 yyyymmdd:\n")) end = str(input("请输入结束字符串, 格式为 yymmdd: \n")) save_p = "data/daily-label-{}-{}.json".format(start, end) if len(start) == 8 and len(end) == 8: save_daily_data(start, end, save_p) else: print("Time format is not ok") else: print("Input Error ! Make sure your input is 1 or 2!!")