"""
Created on Mon Mar 18, 2024
@author: luojunhui
Read data from ODPS and save it to JSON files on local disk.
"""
- import os
- import sys
- import json
- from tqdm import tqdm
- from concurrent.futures import ThreadPoolExecutor
- sys.path.append(os.getcwd())
- from functions import PyODPS, generate_hourly_strings, generate_daily_strings
class VideoDataGenerator(object):
    """
    Generate training data and test data from the ODPS content-quality tables.
    """

    # (output key, source column) pairs for rows of the hourly table,
    # listed in the order the keys appear in each output dict.
    _HOUR_FIELDS = (
        ("uid", "uid"),
        ("video_id", "videoid"),
        ("type", "type"),
        ("channel", "channel"),
        ("fst", "flowpool_start_type"),
        ("fsl", "flowpool_start_level"),
        ("fet", "flowpool_end_type"),
        ("fel", "flowpool_end_level"),
        ("f_view", "flowpool_distribute_view_times"),
        ("f_share", "flowpool_share_times"),
        ("f_return", "flowpool_return_users"),
        ("f3_view", "flowpool_3days_distribute_view_times"),
        ("f3_share", "flowpool_3days_share_times"),
        ("f3_return", "flowpool_3days_return_users"),
        ("ros_dms", "ros_dms"),
        ("rov_dms", "rov_dms"),
        ("ros_sls", "ros_sls"),
        ("rov_sls", "rov_sls"),
        ("fans", "fans"),
        ("view_count_user_30days", "view_cnt_user_30days"),
        ("share_count_user_30days", "share_cnt_user_30days"),
        ("return_count_user_30days", "return_cnt_user_30days"),
        ("rov_user", "rov_user"),
        ("str_user", "str_user"),  # share / view
        ("out_user_id", "out_user_id"),
        ("mode", "strategy"),
        ("out_play_cnt", "out_play_cnt"),
        ("out_like_cnt", "out_like_cnt"),
        ("out_share_cnt", "out_share_cnt"),
        ("out_collection_cnt", "out_collection_cnt"),
        ("up_level_time_hour", "up_level_time_hour"),
        ("dt", "dt"),
    )

    # (output key, source column) pairs for rows of the daily table.
    _DAILY_FIELDS = (
        ("video_id", "videoid"),
        ("total_view", "flowpool_distribute_view_times"),
        ("total_share", "flowpool_share_times"),
        ("total_return", "flowpool_return_users"),
        ("3day_view", "flowpool_3days_distribute_view_times"),
        ("3day_share", "flowpool_3days_share_times"),
        ("3day_return", "flowpool_3days_return_users"),
        ("3day_up_level", "up_level_3_days"),
        ("dt", "dt"),
    )

    def __init__(self):
        self.oo = PyODPS()

    @staticmethod
    def _project(row, fields):
        """Copy the listed source columns of *row* into a new dict under their output keys."""
        return {key: row[column] for key, column in fields}

    def get_hour_data(self, dt):
        """
        Fetch the new videos stored in one partition of the hourly table.

        :param dt: hour partition value, interpolated into the query as-is
        :return: list with one dict per selected row, keyed by the short output names
        """
        query = f"""select * from loghubods.conten_quality_base_hour where dt = '{dt}';"""
        rows = self.oo.select(query)
        return [self._project(row, self._HOUR_FIELDS) for row in rows]

    def get_daily_data(self, dt):
        """
        Fetch the performance labels stored in one partition of the daily table.

        The daily table holds the performance label of each video; rows are
        meant to be matched to hourly rows through their video_id.

        :param dt: day partition value, e.g. 20240101
        :return: list with one dict per selected row
        """
        query = f"""select * from loghubods.conten_quality_base where dt = '{dt}';"""
        rows = self.oo.select(query)
        labels = []
        for row in rows:
            labels.append(self._project(row, self._DAILY_FIELDS))
        return labels
def save_daily_data(start_date, end_date, save_path):
    """
    Fetch the daily label rows for a range of days and save them to one JSON file.

    The file contains a two-level mapping ``{date_str: {video_id: row}}`` where
    ``row`` is a dict returned by ``VideoDataGenerator.get_daily_data``.

    :param start_date: start of the range, passed to generate_daily_strings
    :param end_date: end of the range, passed to generate_daily_strings
    :param save_path: path of the JSON file to write
    :return: None
    """
    date_list = generate_daily_strings(start_date, end_date)

    # Create the target directory before fetching anything, so a bad path
    # fails immediately instead of after every day has been downloaded.
    parent_dir = os.path.dirname(save_path)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    generator = VideoDataGenerator()
    labels_by_date = {}
    for date_str in tqdm(date_list):
        # Index the rows by video id; if an id repeats within a day, the last row wins.
        labels_by_date[date_str] = {
            row["video_id"]: row for row in generator.get_daily_data(date_str)
        }

    # ensure_ascii=False can emit non-ASCII characters, so the encoding must be
    # fixed explicitly rather than left to the platform default.
    with open(save_path, "w", encoding="utf-8") as f:
        f.write(json.dumps(labels_by_date, ensure_ascii=False))
def download_hour_video_data(date_str):
    """
    Download the rows of one hourly partition and save them as ``data/<date_str>.json``.

    :param date_str: hour partition value; also used as the output file name
    :return: None
    """
    rows = VideoDataGenerator().get_hour_data(date_str)

    # The output directory is hard-coded; create it on demand so a fresh
    # checkout does not fail on the first write.
    os.makedirs("data", exist_ok=True)
    temp_path = "data/{}.json".format(date_str)

    # ensure_ascii=False can emit non-ASCII characters, so the encoding must be
    # fixed explicitly rather than left to the platform default.
    with open(temp_path, "w", encoding="utf-8") as f:
        f.write(json.dumps(rows, ensure_ascii=False))
def save_hourly_data(start_date, end_date, save_path):
    """
    Download every hourly partition in a range, using up to 10 worker threads.

    Each hour is handled by ``download_hour_video_data``, which writes its own
    file; ``save_path`` is only printed and nothing is written to it here.

    A failure in one hour does not stop the others. Every failed hour is
    reported on stderr once all downloads have finished.

    :param start_date: start of the range, passed to generate_hourly_strings
    :param end_date: end of the range, passed to generate_hourly_strings
    :param save_path: printed for reference only
    :return: None
    """
    print(save_path)
    date_list = generate_hourly_strings(start_date, end_date)

    with ThreadPoolExecutor(max_workers=10) as pool:
        pending = [
            (date_str, pool.submit(download_hour_video_data, date_str))
            for date_str in date_list
        ]

    # Leaving the with-block waits for all tasks. Worker exceptions are stored
    # on the futures and would otherwise vanish silently, so surface them here.
    for date_str, future in pending:
        error = future.exception()
        if error is not None:
            print(
                "Failed to download hourly data for {}: {!r}".format(date_str, error),
                file=sys.stderr,
            )
if __name__ == "__main__":
    # Interactive entry point: 1 downloads hourly data, 2 downloads daily labels.
    # The prompts are user-facing runtime text and are kept in Chinese.
    try:
        flag = int(input("请输入标识符,输入 1 获取小时级数据, 输入 2 获取天级数据: \n"))
    except ValueError:
        # Non-numeric input falls through to the "Input Error" branch below
        # instead of aborting with a traceback.
        flag = None

    if flag == 1:
        start = input("请输入开始字符串, 格式为 yyyymmddhh:\n").strip()
        # The prompt now states the 10-character format that is actually required.
        end = input("请输入结束字符串, 格式为 yyyymmddhh: \n").strip()
        save_p = "data/hourly-train-{}-{}.json".format(start, end)
        # Require digits as well as the right length: the values end up in
        # date arithmetic and in SQL text further down the call chain.
        if all(len(value) == 10 and value.isdigit() for value in (start, end)):
            save_hourly_data(start, end, save_p)
        else:
            print("Time format is not ok")
    elif flag == 2:
        start = input("请输入开始字符串, 格式为 yyyymmdd:\n").strip()
        # The prompt now states the 8-character format that is actually required.
        end = input("请输入结束字符串, 格式为 yyyymmdd: \n").strip()
        save_p = "data/daily-label-{}-{}.json".format(start, end)
        if all(len(value) == 8 and value.isdigit() for value in (start, end)):
            save_daily_data(start, end, save_p)
        else:
            print("Time format is not ok")
    else:
        print("Input Error ! Make sure your input is 1 or 2!!")
|