123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155 |
- """
- Created on Mon Mar 18, 2024
- @author: luojunhui
- Read data from ODPS and save it to a local JSON file
- """
- import os
- import sys
- import json
- from tqdm import tqdm
- sys.path.append(os.getcwd())
- from functions import PyODPS, generate_hourly_strings, generate_daily_strings
class VideoDataGenerator:
    """
    Generate training data and test data.

    Rows are fetched through ``self.oo.select`` and reshaped into plain
    dicts whose keys are listed in the field tables below.
    """

    # (output key, source column) pairs for hourly rows, in output order.
    _HOUR_FIELDS = (
        ("uid", "uid"),
        ("video_id", "videoid"),
        ("type", "type"),
        ("channel", "channel"),
        ("fst", "flowpool_start_type"),
        ("fsl", "flowpool_start_level"),
        ("fet", "flowpool_end_type"),
        ("fel", "flowpool_end_level"),
        ("f_view", "flowpool_distribute_view_times"),
        ("f_share", "flowpool_share_times"),
        ("f_return", "flowpool_return_users"),
        ("f3_view", "flowpool_3days_distribute_view_times"),
        ("f3_share", "flowpool_3days_share_times"),
        ("f3_return", "flowpool_3days_return_users"),
        ("ros_dms", "ros_dms"),
        ("rov_dms", "rov_dms"),
        ("ros_sls", "ros_sls"),
        ("rov_sls", "rov_sls"),
        ("fans", "fans"),
        ("view_count_user_30days", "view_cnt_user_30days"),
        ("share_count_user_30days", "share_cnt_user_30days"),
        ("return_count_user_30days", "return_cnt_user_30days"),
        ("rov_user", "rov_user"),
        ("str_user", "str_user"),  # share / view
        ("out_user_id", "out_user_id"),
        ("mode", "strategy"),
        ("out_play_cnt", "out_play_cnt"),
        ("out_like_cnt", "out_like_cnt"),
        ("out_share_cnt", "out_share_cnt"),
        ("out_collection_cnt", "out_collection_cnt"),
        ("up_level_time_hour", "up_level_time_hour"),
        ("dt", "dt"),
    )

    # (output key, source column) pairs for daily rows, in output order.
    _DAILY_FIELDS = (
        ("video_id", "videoid"),
        ("total_view", "flowpool_distribute_view_times"),
        ("total_share", "flowpool_share_times"),
        ("total_return", "flowpool_return_users"),
        ("3day_view", "flowpool_3days_distribute_view_times"),
        ("3day_share", "flowpool_3days_share_times"),
        ("3day_return", "flowpool_3days_return_users"),
        ("dt", "dt"),
    )

    def __init__(self):
        self.oo = PyODPS()

    @staticmethod
    def _checked_dt(dt):
        """
        Return ``dt`` as text after checking it contains only ASCII digits.

        The value is spliced into a SQL string literal, so anything other
        than digits (quotes in particular) is refused before a query is built.

        :param dt: partition value, e.g. 20240101 or 2024010108
        :return: ``str(dt)``
        :raises ValueError: if the text is empty or holds a non-digit character
        """
        dt_text = str(dt)
        if not dt_text or any(ch not in "0123456789" for ch in dt_text):
            raise ValueError(f"dt must contain only digits, got {dt!r}")
        return dt_text

    @staticmethod
    def _project(row, fields):
        """Build ``{output_key: row[column]}`` for every pair in ``fields``."""
        return {key: row[column] for key, column in fields}

    def get_hour_data(self, dt):
        """
        Fetch the hourly-level new videos.

        :param dt: hour parameter (digits only)
        :return: list of dicts keyed as described by ``_HOUR_FIELDS``
        :raises ValueError: if ``dt`` is not made of digits
        """
        dt = self._checked_dt(dt)
        # NOTE(review): table name is spelled "conten_" exactly as in the
        # original query - confirm against the real table before renaming.
        sql = f"""select * from loghubods.conten_quality_base_hour where dt = '{dt}';"""
        return [self._project(line, self._HOUR_FIELDS) for line in self.oo.select(sql)]

    def get_daily_data(self, dt):
        """
        The daily table stores the video performance labels; the hourly
        video_id is used to look up a video's performance.

        :param dt: 20240101
        :return: data_list, dicts keyed as described by ``_DAILY_FIELDS``
        :raises ValueError: if ``dt`` is not made of digits
        """
        dt = self._checked_dt(dt)
        sql = f"""select * from loghubods.conten_quality_base where dt = '{dt}';"""
        return [self._project(item, self._DAILY_FIELDS) for item in self.oo.select(sql)]
def save_daily_data(start_date, end_date, save_path):
    """
    Fetch the data for a date range and save it to the given path.

    The file holds a single JSON object shaped as
    ``{date_str: {video_id: row}}``; when a video id repeats within one
    date, the last row wins.

    :param start_date: start of the range, passed to ``generate_daily_strings``
    :param end_date: end of the range, passed to ``generate_daily_strings``
    :param save_path: path of the JSON file to write (UTF-8)
    :return: None
    """
    date_list = generate_daily_strings(start_date, end_date)
    generator = VideoDataGenerator()
    rows_by_date = {}
    for date_str in tqdm(date_list):
        # Index the day's rows by video id; the outer bar already reports
        # progress per date.
        rows_by_date[date_str] = {
            row['video_id']: row for row in generator.get_daily_data(date_str)
        }

    # Create the target directory if needed so the fetched data is not lost
    # to a missing folder.
    parent_dir = os.path.dirname(save_path)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    # ensure_ascii=False emits raw non-ASCII characters, so the encoding is
    # pinned instead of depending on the platform default.
    with open(save_path, "w", encoding="utf-8") as f:
        json.dump(rows_by_date, f, ensure_ascii=False)
def save_hourly_data(start_date, end_date, save_path):
    """
    Fetch the hourly data for a range and save it to the given path.

    The file holds a single JSON array containing every row returned for
    every hour string, in the order the hours were queried.

    :param start_date: start of the range, passed to ``generate_hourly_strings``
    :param end_date: end of the range, passed to ``generate_hourly_strings``
    :param save_path: path of the JSON file to write (UTF-8)
    :return: None
    """
    date_list = generate_hourly_strings(start_date, end_date)
    generator = VideoDataGenerator()
    rows = []
    for date_str in tqdm(date_list):
        # The outer bar already reports progress per hour.
        rows.extend(generator.get_hour_data(date_str))

    # Create the target directory if needed so the fetched data is not lost
    # to a missing folder.
    parent_dir = os.path.dirname(save_path)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    # ensure_ascii=False emits raw non-ASCII characters, so the encoding is
    # pinned instead of depending on the platform default.
    with open(save_path, "w", encoding="utf-8") as f:
        json.dump(rows, f, ensure_ascii=False)
if __name__ == '__main__':
    # Interactive entry point: ask which dataset to export, read the range
    # bounds, validate them, then write the JSON file under data/.
    try:
        flag = int(input("请输入标识符,输入 1 获取小时级数据, 输入 2 获取天级数据: \n"))
    except ValueError:
        # Non-numeric input falls through to the "Input Error" branch below
        # instead of ending in a traceback.
        flag = None

    if flag == 1:
        start = input("请输入开始字符串, 格式为 yyyymmddhh:\n").strip()
        end = input("请输入结束字符串, 格式为 yyyymmddhh: \n").strip()
        # Hour strings are exactly 10 digits.
        if all(len(value) == 10 and value.isdigit() for value in (start, end)):
            save_p = "data/hourly-train-{}-{}.json".format(start, end)
            save_hourly_data(start, end, save_p)
        else:
            print("Time format is not ok")
    elif flag == 2:
        start = input("请输入开始字符串, 格式为 yyyymmdd:\n").strip()
        end = input("请输入结束字符串, 格式为 yyyymmdd: \n").strip()
        # Day strings are exactly 8 digits.
        if all(len(value) == 8 and value.isdigit() for value in (start, end)):
            save_p = "data/daily-label-{}-{}.json".format(start, end)
            save_daily_data(start, end, save_p)
        else:
            print("Time format is not ok")
    else:
        print("Input Error ! Make sure your input is 1 or 2!!")
|