""" Created on Mon Mar 18, 2024 @author: luojunhui Read data from odps and save to json file in local files """ import os import sys import json from tqdm import tqdm from concurrent.futures import ThreadPoolExecutor sys.path.append(os.getcwd()) from functions import PyODPS, generate_hourly_strings, generate_daily_strings from functions import MysqlClient, MySQLClientSpider class VideoDataGenerator(object): """ 生成训练数据,测试数据 """ def __init__(self): self.oo = PyODPS() # self.v_mysql = MysqlClient() self.spider_mysql = MySQLClientSpider() def insert_into_database(self, dt): """ 把小时级数据插入 MySQL 数据库 :return: """ data_list = self.get_hour_data(dt) for obj in tqdm(data_list): insert_sql = f""" INSERT INTO lightgbm_data (video_id, user_id, type, channel, user_fans, user_view_30, user_share_30, user_return_30, user_rov, user_str, out_user_id, spider_mode, out_play_cnt, out_like_cnt, out_share_cnt, hour_dt_str) values ({obj['video_id']}, '{obj['uid']}', '{obj['type']}', '{obj['channel']}', {obj['fans']}, {obj['view_count_user_30days']}, {obj['share_count_user_30days']}, {obj['return_count_user_30days']}, {obj['rov_user']}, {obj['str_user']}, '{obj['out_user_id']}', '{obj['mode']}', {obj['out_play_cnt']}, {obj['out_like_cnt']}, {obj['out_share_cnt']}, '{obj['dt']}'); """ self.spider_mysql.update(insert_sql) def get_hour_data(self, dt): """ 获取小时级的新视频 :param dt: 小时参数 :return: """ sql = f"""select * from loghubods.conten_quality_base_hour where dt = '{dt}';""" hour_data = self.oo.select(sql) result = [] for line in hour_data: obj = { "uid": line["uid"] if line['uid'] else "", "video_id": line["videoid"] if line['videoid'] else "", "type": line["type"] if line['type'] else "", "channel": line["channel"] if line['channel'] else "", "fst": line["flowpool_start_type"], "fsl": line["flowpool_start_level"], "fet": line["flowpool_end_type"], "fel": line["flowpool_end_level"], "f_view": line["flowpool_distribute_view_times"], "f_share": line["flowpool_share_times"], "f_return": line["flowpool_return_users"], "f3_view": line["flowpool_3days_distribute_view_times"], "f3_share": line["flowpool_3days_share_times"], "f3_return": line["flowpool_3days_return_users"], "ros_dms": line["ros_dms"], "rov_dms": line["rov_dms"], "ros_sls": line["ros_sls"], "rov_sls": line["rov_sls"], "fans": line["fans"] if line["fans"] else 0, "view_count_user_30days": line["view_cnt_user_30days"] if line["view_cnt_user_30days"] else 0, "share_count_user_30days": line["share_cnt_user_30days"] if line["share_cnt_user_30days"] else 0, "return_count_user_30days": line["return_cnt_user_30days"] if line["return_cnt_user_30days"] else 0, "rov_user": line["rov_user"] if line["rov_user"] else 0, "str_user": line["str_user"] if line["str_user"] else 0, "out_user_id": line["out_user_id"] if line["out_user_id"] else "", "mode": line["strategy"] if line["strategy"] else "", "out_play_cnt": line["out_play_cnt"] if line["out_play_cnt"] else 0, "out_like_cnt": line["out_like_cnt"] if line["out_like_cnt"] else 0, "out_share_cnt": line["out_share_cnt"] if line["out_share_cnt"] else 0, "out_collection_cnt": line["out_collection_cnt"], "up_level_time_hour": line["up_level_time_hour"], "dt": line["dt"], } result.append(obj) return result def get_daily_data(self, dt): """ 天级表里面存储了视频的表现 label, 通过小时级的 video_id 去获取视频的表现 :param dt: 20240101 :return: data_list """ sql = f"""select * from loghubods.conten_quality_base where dt = '{dt}';""" data = self.oo.select(sql) result = [ { "video_id": item["videoid"], "total_view": item["flowpool_distribute_view_times"], "total_share": item["flowpool_share_times"], "total_return": item["flowpool_return_users"], "3day_view": item["flowpool_3days_distribute_view_times"], "3day_share": item["flowpool_3days_share_times"], "3day_return": item["flowpool_3days_return_users"], "3day_up_level": item["up_level_3_days"], "dt": item["dt"], } for item in data ] return result def get_daily_user_info(self, dt): """ 获取用户的信息以及特征 :param dt: """ sql = f"""select * from loghubods.conten_quality_base_day_new where dt = '{dt}';""" data = self.oo.select(sql) result = [ { "uid": item["uid"], "video_id": item["videoid"], "address": item["city"], "return_3days": item["return_count_user_3days"], "view_3days": item["view_count_user_3days"], "share_3days": item["share_count_3days"], "3day_return_500_videos": item["return_count_user_3days_videocnt"], "30day_return_2000_videos": item["return_count_user_30days_videocnt"], "dt": item["dt"], } for item in data ] return result def save_daily_data(start_date, end_date, save_path): """ 获取日期范围内数据,并且保存到指定路径 :param start_date: :param end_date: :param save_path: :return: """ date_list = generate_daily_strings(start_date, end_date) V = VideoDataGenerator() L = {} for date_str in tqdm(date_list): L[date_str] = {} data_list = V.get_daily_data(date_str) for obj in tqdm(data_list): video_id = obj["video_id"] L[date_str][video_id] = obj with open(save_path, "w") as f: f.write(json.dumps(L, ensure_ascii=False)) def download_hour_video_data(date_str): """ 获取日期参数 :param date_str: :return: """ V = VideoDataGenerator() data_list = V.get_hour_data(date_str) L = [] for obj in tqdm(data_list): L.append(obj) temp_path = "data/temp_data/hour_{}.json".format(date_str) with open(temp_path, "w") as f: f.write(json.dumps(L, ensure_ascii=False)) def save_hourly_data(start_date, end_date): """ save hourly data :param start_date: :param end_date: :param save_path: :return: """ # print(save_path) V = VideoDataGenerator() date_list = generate_hourly_strings(start_date, end_date) for obj in tqdm(date_list): V.insert_into_database(obj) # with ThreadPoolExecutor(max_workers=5) as Pool: # Pool.map(V.insert_into_database, date_list) def save_daily_user_info(start_date, end_date, save_path): """ save daily user_info :param start_date: :param end_date: :param save_path: :return: """ date_list = generate_daily_strings(start_date, end_date) V = VideoDataGenerator() L = {} for date_str in tqdm(date_list): L[date_str] = {} data_list = V.get_daily_user_info(date_str) for obj in tqdm(data_list): video_id = obj["video_id"] L[date_str][video_id] = obj with open(save_path, "w") as f: f.write(json.dumps(L, ensure_ascii=False)) if __name__ == "__main__": flag = int( input( "请输入标识符,输入 1 获取小时级数据\n输入 2 获取天级数据\n输入 3 获取用户信息数据: \n" ) ) if flag == 1: start = str(input("请输入开始字符串, 格式为 yyyymmddhh:\n")) end = str(input("请输入结束字符串, 格式为 yyyymmddhh: \n")) if len(start) == 10 and len(end) == 10: save_hourly_data(start, end) else: print("Time format is not ok") elif flag == 2: start = str(input("请输入开始字符串, 格式为 yyyymmdd:\n")) end = str(input("请输入结束字符串, 格式为 yymmdd: \n")) save_p = "data/train_data/daily-label-{}-{}.json".format(start, end) if len(start) == 8 and len(end) == 8: save_daily_data(start, end, save_p) else: print("Time format is not ok") elif flag == 3: start = str(input("请输入开始字符串, 格式为 yyyymmdd:\n")) end = str(input("请输入结束字符串, 格式为 yymmdd: \n")) save_p = "data/train_data/daily-user-info-{}-{}.json".format(start, end) if len(start) == 8 and len(end) == 8: save_daily_user_info(start, end, save_p) else: print("Time format is not ok") else: print("Input Error ! Make sure your input is 1 or 2 or 3!!")