"""
Created on Mon Mar 18, 2024
@author: luojunhui

Read data from ODPS and save it to JSON files locally, or insert the hourly
data into MySQL.
"""
import os
import sys
import json

from tqdm import tqdm

# Make the project root importable so ``functions`` resolves when the script
# is launched from the repository root.
sys.path.append(os.getcwd())

from functions import PyODPS, generate_hourly_strings, generate_daily_strings
from functions import MySQLClientSpider


def _sql_str(value):
    """Escape *value* for use inside a single-quoted MySQL string literal.

    Backslashes and single quotes are doubled so a value such as ``O'Brien``
    cannot terminate the literal early and break (or alter) the statement.

    NOTE(review): a parameterized query would be preferable if
    ``MySQLClientSpider.update`` accepts bind parameters — confirm its API.
    """
    return str(value).replace("\\", "\\\\").replace("'", "''")


def _dump_json(obj, path):
    """Write *obj* to *path* as UTF-8 JSON, creating parent directories first.

    ``ensure_ascii=False`` keeps non-ASCII text readable in the file, which is
    why the file must be opened with an explicit UTF-8 encoding rather than
    the platform default.
    """
    directory = os.path.dirname(path)
    if directory:
        os.makedirs(directory, exist_ok=True)
    with open(path, "w", encoding="utf-8") as f:
        json.dump(obj, f, ensure_ascii=False)


def _is_time_str(value, length):
    """Return True if *value* is a digit-only string of exactly *length* chars."""
    return len(value) == length and value.isdigit()


class VideoDataGenerator:
    """Generate training and test data from ODPS tables."""

    def __init__(self):
        # ODPS client used for all ``select`` queries below.
        self.oo = PyODPS()
        # MySQL client used by insert_into_database.
        self.spider_mysql = MySQLClientSpider()

    def insert_into_database(self, dt):
        """Insert one hour of data into the ``lightgbm_data`` MySQL table.

        :param dt: hour partition string, passed to :meth:`get_hour_data`
        :return: None
        """
        data_list = self.get_hour_data(dt)
        for obj in tqdm(data_list):
            # NOTE(review): video_id is inserted unquoted; get_hour_data falls
            # back to "" for a missing videoid, which would produce invalid SQL.
            insert_sql = f"""
                INSERT INTO lightgbm_data
                    (video_id, user_id, type, channel, user_fans, user_view_30,
                     user_share_30, user_return_30, user_rov, user_str, out_user_id,
                     spider_mode, out_play_cnt, out_like_cnt, out_share_cnt, hour_dt_str)
                values
                    ({obj['video_id']}, '{_sql_str(obj['uid'])}', '{_sql_str(obj['type'])}',
                     '{_sql_str(obj['channel'])}', {obj['fans']}, {obj['view_count_user_30days']},
                     {obj['share_count_user_30days']}, {obj['return_count_user_30days']},
                     {obj['rov_user']}, {obj['str_user']}, '{_sql_str(obj['out_user_id'])}',
                     '{_sql_str(obj['mode'])}', {obj['out_play_cnt']}, {obj['out_like_cnt']},
                     {obj['out_share_cnt']}, '{_sql_str(obj['dt'])}');
            """
            self.spider_mysql.update(insert_sql)

    def get_hour_data(self, dt):
        """Fetch the hourly rows for new videos.

        Falsy string columns fall back to ``""`` and falsy user/out-platform
        counters fall back to ``0``; the flow-pool columns are copied as-is.

        :param dt: hour partition string
        :return: list of dicts, one per row of ``conten_quality_base_hour``
        """
        sql = f"""select * from loghubods.conten_quality_base_hour where dt = '{dt}';"""
        hour_data = self.oo.select(sql)
        result = []
        for line in hour_data:
            obj = {
                "uid": line["uid"] or "",
                "video_id": line["videoid"] or "",
                "type": line["type"] or "",
                "channel": line["channel"] or "",
                "fst": line["flowpool_start_type"],
                "fsl": line["flowpool_start_level"],
                "fet": line["flowpool_end_type"],
                "fel": line["flowpool_end_level"],
                "f_view": line["flowpool_distribute_view_times"],
                "f_share": line["flowpool_share_times"],
                "f_return": line["flowpool_return_users"],
                "f3_view": line["flowpool_3days_distribute_view_times"],
                "f3_share": line["flowpool_3days_share_times"],
                "f3_return": line["flowpool_3days_return_users"],
                "ros_dms": line["ros_dms"],
                "rov_dms": line["rov_dms"],
                "ros_sls": line["ros_sls"],
                "rov_sls": line["rov_sls"],
                "fans": line["fans"] or 0,
                "view_count_user_30days": line["view_cnt_user_30days"] or 0,
                "share_count_user_30days": line["share_cnt_user_30days"] or 0,
                "return_count_user_30days": line["return_cnt_user_30days"] or 0,
                "rov_user": line["rov_user"] or 0,
                "str_user": line["str_user"] or 0,
                "out_user_id": line["out_user_id"] or "",
                "mode": line["strategy"] or "",
                "out_play_cnt": line["out_play_cnt"] or 0,
                "out_like_cnt": line["out_like_cnt"] or 0,
                "out_share_cnt": line["out_share_cnt"] or 0,
                "out_collection_cnt": line["out_collection_cnt"],
                "up_level_time_hour": line["up_level_time_hour"],
                "dt": line["dt"],
            }
            result.append(obj)
        return result

    def get_daily_data(self, dt):
        """Fetch the daily per-video performance (the label data).

        Rows are later keyed by ``video_id`` so they can be matched against the
        hourly data.

        :param dt: day partition string, e.g. ``20240101``
        :return: list of dicts, one per row of ``conten_quality_base``
        """
        sql = f"""select * from loghubods.conten_quality_base where dt = '{dt}';"""
        data = self.oo.select(sql)
        return [
            {
                "video_id": item["videoid"],
                "total_view": item["flowpool_distribute_view_times"],
                "total_share": item["flowpool_share_times"],
                "total_return": item["flowpool_return_users"],
                "3day_view": item["flowpool_3days_distribute_view_times"],
                "3day_share": item["flowpool_3days_share_times"],
                "3day_return": item["flowpool_3days_return_users"],
                "3day_up_level": item["up_level_3_days"],
                "uplevel": item["uplevel"],
                "dt": item["dt"],
            }
            for item in data
        ]

    def get_daily_user_info(self, dt):
        """Fetch the daily user information and features.

        :param dt: day partition string, e.g. ``20240101``
        :return: list of dicts, one per row of ``conten_quality_base_day_new``
        """
        sql = f"""select * from loghubods.conten_quality_base_day_new where dt = '{dt}';"""
        data = self.oo.select(sql)
        return [
            {
                "uid": item["uid"],
                "video_id": item["videoid"],
                "address": item["city"],
                "return_3days": item["return_count_user_3days"],
                "view_3days": item["view_count_user_3days"],
                "share_3days": item["share_count_3days"],
                "3day_return_500_videos": item["return_count_user_3days_videocnt"],
                "30day_return_2000_videos": item["return_count_user_30days_videocnt"],
                "dt": item["dt"],
            }
            for item in data
        ]


def _save_daily(fetch, start_date, end_date, save_path):
    """Call *fetch* for each day and save ``{date: {video_id: row}}`` as JSON.

    Days come from ``generate_daily_strings(start_date, end_date)``. Within a
    day, a later row with the same video_id overwrites an earlier one.
    """
    result = {}
    for date_str in tqdm(generate_daily_strings(start_date, end_date)):
        result[date_str] = {obj["video_id"]: obj for obj in tqdm(fetch(date_str))}
    _dump_json(result, save_path)


def save_daily_data(start_date, end_date, save_path):
    """Fetch daily label data for a date range and save it to *save_path*.

    :param start_date: first day, ``yyyymmdd``
    :param end_date: last day, ``yyyymmdd``
    :param save_path: output JSON file path
    :return: None
    """
    generator = VideoDataGenerator()
    _save_daily(generator.get_daily_data, start_date, end_date, save_path)


def download_hour_video_data(date_str):
    """Download one hour partition to ``data/temp_data/hour_<date_str>.json``.

    :param date_str: hour partition string
    :return: None
    """
    generator = VideoDataGenerator()
    data_list = generator.get_hour_data(date_str)
    temp_path = "data/temp_data/hour_{}.json".format(date_str)
    _dump_json(data_list, temp_path)


def save_hourly_data(start_date, end_date):
    """Insert every hour between *start_date* and *end_date* into MySQL.

    :param start_date: first hour, ``yyyymmddhh``
    :param end_date: last hour, ``yyyymmddhh``
    :return: None
    """
    generator = VideoDataGenerator()
    for hour_str in tqdm(generate_hourly_strings(start_date, end_date)):
        generator.insert_into_database(hour_str)


def save_daily_user_info(start_date, end_date, save_path):
    """Fetch daily user info for a date range and save it to *save_path*.

    :param start_date: first day, ``yyyymmdd``
    :param end_date: last day, ``yyyymmdd``
    :param save_path: output JSON file path
    :return: None
    """
    generator = VideoDataGenerator()
    _save_daily(generator.get_daily_user_info, start_date, end_date, save_path)


def main():
    """Interactive entry point: ask which dataset to fetch and for which range."""
    try:
        flag = int(
            input(
                "请输入标识符,输入 1 获取小时级数据\n输入 2 获取天级数据\n输入 3 获取用户信息数据: \n"
            )
        )
    except ValueError:
        # Non-numeric input: report it the same way as an out-of-range number.
        print("Input Error ! Make sure your input is 1 or 2 or 3!!")
        return

    if flag == 1:
        start = input("请输入开始字符串, 格式为 yyyymmddhh:\n")
        end = input("请输入结束字符串, 格式为 yyyymmddhh: \n")
        if _is_time_str(start, 10) and _is_time_str(end, 10):
            save_hourly_data(start, end)
        else:
            print("Time format is not ok")
    elif flag in (2, 3):
        start = input("请输入开始字符串, 格式为 yyyymmdd:\n")
        end = input("请输入结束字符串, 格式为 yyyymmdd: \n")
        if not (_is_time_str(start, 8) and _is_time_str(end, 8)):
            print("Time format is not ok")
            return
        if flag == 2:
            save_p = "data/train_data/daily-label-{}-{}.json".format(start, end)
            save_daily_data(start, end, save_p)
        else:
            save_p = "data/train_data/daily-user-info-{}-{}.json".format(start, end)
            save_daily_user_info(start, end, save_p)
    else:
        print("Input Error ! Make sure your input is 1 or 2 or 3!!")


if __name__ == "__main__":
    main()