"""
Created on Mon Mar 18, 2024
@author: luojunhui
Read data from ODPS and save it to local JSON files.
"""
import os
import sys
import json

from tqdm import tqdm

sys.path.append(os.getcwd())

from functions import PyODPS, generate_hourly_strings, generate_daily_strings
from functions import MySQLClientSpider
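
# NOTE: generate_hourly_strings / generate_daily_strings are assumed (from how they are
# used below) to yield hour strings in yyyymmddhh format and day strings in yyyymmdd
# format covering the given start/end range; their exact behaviour lives in the local
# `functions` module and is not shown in this file.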


class VideoDataGenerator(object):
    """
    Generate training data and test data.
    """

    def __init__(self):
        self.oo = PyODPS()
        # self.v_mysql = MysqlClient()
        self.spider_mysql = MySQLClientSpider()

    def insert_into_database(self, dt):
        """
        Insert the hourly data into the MySQL database.
        :param dt: hour string, format yyyymmddhh
        :return:
        """
        data_list = self.get_hour_data(dt)
        for obj in tqdm(data_list):
            insert_sql = f"""
                INSERT INTO lightgbm_data
                (video_id, user_id, type, channel, user_fans, user_view_30, user_share_30, user_return_30, user_rov, user_str, out_user_id, spider_mode, out_play_cnt, out_like_cnt, out_share_cnt, hour_dt_str)
                values
                ({obj['video_id']}, '{obj['uid']}', '{obj['type']}', '{obj['channel']}', {obj['fans']}, {obj['view_count_user_30days']}, {obj['share_count_user_30days']}, {obj['return_count_user_30days']}, {obj['rov_user']}, {obj['str_user']}, '{obj['out_user_id']}', '{obj['mode']}', {obj['out_play_cnt']}, {obj['out_like_cnt']}, {obj['out_share_cnt']}, '{obj['dt']}');
            """
            self.spider_mysql.update(insert_sql)
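
    # NOTE (illustrative, not the original behaviour): interpolating record values
    # directly into the SQL string breaks if any text field contains a single quote
    # and is open to injection. If the underlying MySQL client accepts bound
    # parameters (an assumption -- MySQLClientSpider's API is not shown in this
    # file), the statement could instead use %s placeholders and pass the values
    # separately, as with cursor.execute(sql, params) on a plain pymysql cursor.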

    def get_hour_data(self, dt):
        """
        Fetch the new videos for a given hour.
        :param dt: hour string, format yyyymmddhh
        :return: list of feature dicts, one per video
        """
        sql = f"""select * from loghubods.conten_quality_base_hour where dt = '{dt}';"""
        hour_data = self.oo.select(sql)
        result = []
        for line in hour_data:
            obj = {
                "uid": line["uid"] if line["uid"] else "",
                "video_id": line["videoid"] if line["videoid"] else "",
                "type": line["type"] if line["type"] else "",
                "channel": line["channel"] if line["channel"] else "",
                "fst": line["flowpool_start_type"],
                "fsl": line["flowpool_start_level"],
                "fet": line["flowpool_end_type"],
                "fel": line["flowpool_end_level"],
                "f_view": line["flowpool_distribute_view_times"],
                "f_share": line["flowpool_share_times"],
                "f_return": line["flowpool_return_users"],
                "f3_view": line["flowpool_3days_distribute_view_times"],
                "f3_share": line["flowpool_3days_share_times"],
                "f3_return": line["flowpool_3days_return_users"],
                "ros_dms": line["ros_dms"],
                "rov_dms": line["rov_dms"],
                "ros_sls": line["ros_sls"],
                "rov_sls": line["rov_sls"],
                "fans": line["fans"] if line["fans"] else 0,
                "view_count_user_30days": line["view_cnt_user_30days"] if line["view_cnt_user_30days"] else 0,
                "share_count_user_30days": line["share_cnt_user_30days"] if line["share_cnt_user_30days"] else 0,
                "return_count_user_30days": line["return_cnt_user_30days"] if line["return_cnt_user_30days"] else 0,
                "rov_user": line["rov_user"] if line["rov_user"] else 0,
                "str_user": line["str_user"] if line["str_user"] else 0,
                "out_user_id": line["out_user_id"] if line["out_user_id"] else "",
                "mode": line["strategy"] if line["strategy"] else "",
                "out_play_cnt": line["out_play_cnt"] if line["out_play_cnt"] else 0,
                "out_like_cnt": line["out_like_cnt"] if line["out_like_cnt"] else 0,
                "out_share_cnt": line["out_share_cnt"] if line["out_share_cnt"] else 0,
                "out_collection_cnt": line["out_collection_cnt"],
                "up_level_time_hour": line["up_level_time_hour"],
                "dt": line["dt"],
            }
            result.append(obj)
        return result
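
    # The `x if x else default` pattern above coalesces NULL/empty ODPS values to ""
    # or 0, so insert_into_database() can interpolate every field without hitting None.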

    def get_daily_data(self, dt):
        """
        The daily table stores each video's performance labels; the hourly video_ids
        are looked up here to obtain those labels.
        :param dt: 20240101
        :return: data_list
        """
        sql = f"""select * from loghubods.conten_quality_base where dt = '{dt}';"""
        data = self.oo.select(sql)
        result = [
            {
                "video_id": item["videoid"],
                "total_view": item["flowpool_distribute_view_times"],
                "total_share": item["flowpool_share_times"],
                "total_return": item["flowpool_return_users"],
                "3day_view": item["flowpool_3days_distribute_view_times"],
                "3day_share": item["flowpool_3days_share_times"],
                "3day_return": item["flowpool_3days_return_users"],
                "3day_up_level": item["up_level_3_days"],
                "uplevel": item["uplevel"],
                "dt": item["dt"],
            }
            for item in data
        ]
        return result
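
    # These per-day records carry the label side of the training data (total and 3-day
    # views, shares, returns plus the up-level flags); save_daily_data() below keys
    # them by video_id within each date.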

    def get_daily_user_info(self, dt):
        """
        Fetch user information and features.
        :param dt: day string, format yyyymmdd
        """
        sql = f"""select * from loghubods.conten_quality_base_day_new where dt = '{dt}';"""
        data = self.oo.select(sql)
        result = [
            {
                "uid": item["uid"],
                "video_id": item["videoid"],
                "address": item["city"],
                "return_3days": item["return_count_user_3days"],
                "view_3days": item["view_count_user_3days"],
                "share_3days": item["share_count_3days"],
                "3day_return_500_videos": item["return_count_user_3days_videocnt"],
                "30day_return_2000_videos": item["return_count_user_30days_videocnt"],
                "dt": item["dt"],
            }
            for item in data
        ]
        return result


def save_daily_data(start_date, end_date, save_path):
    """
    Fetch the daily data for a date range and save it to the given path.
    :param start_date:
    :param end_date:
    :param save_path:
    :return:
    """
    date_list = generate_daily_strings(start_date, end_date)
    V = VideoDataGenerator()
    L = {}
    for date_str in tqdm(date_list):
        L[date_str] = {}
        data_list = V.get_daily_data(date_str)
        for obj in tqdm(data_list):
            video_id = obj["video_id"]
            L[date_str][video_id] = obj
    with open(save_path, "w") as f:
        f.write(json.dumps(L, ensure_ascii=False))
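
# The resulting JSON is a dict keyed first by date string and then by video_id,
# e.g. {"20240101": {"<video_id>": {...label fields...}}, ...}.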


def download_hour_video_data(date_str):
    """
    Fetch the hourly data for the given hour string and dump it to a temp JSON file.
    :param date_str: hour string, format yyyymmddhh
    :return:
    """
    V = VideoDataGenerator()
    data_list = V.get_hour_data(date_str)
    L = []
    for obj in tqdm(data_list):
        L.append(obj)
    temp_path = "data/temp_data/hour_{}.json".format(date_str)
    with open(temp_path, "w") as f:
        f.write(json.dumps(L, ensure_ascii=False))
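
# open() does not create directories, so data/temp_data/ (and data/train_data/ used
# below) must already exist, otherwise the write raises FileNotFoundError.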


def save_hourly_data(start_date, end_date):
    """
    Insert the hourly data for every hour in the range into MySQL.
    :param start_date:
    :param end_date:
    :return:
    """
    V = VideoDataGenerator()
    date_list = generate_hourly_strings(start_date, end_date)
    for obj in tqdm(date_list):
        V.insert_into_database(obj)
    # with ThreadPoolExecutor(max_workers=5) as Pool:
    #     Pool.map(V.insert_into_database, date_list)
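
# The commented-out thread-pool variant above would also need
# `from concurrent.futures import ThreadPoolExecutor` at the top of the file; whether
# the ODPS/MySQL clients used here are thread-safe is not established, so it stays
# disabled.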


def save_daily_user_info(start_date, end_date, save_path):
    """
    Save daily user_info.
    :param start_date:
    :param end_date:
    :param save_path:
    :return:
    """
    date_list = generate_daily_strings(start_date, end_date)
    V = VideoDataGenerator()
    L = {}
    for date_str in tqdm(date_list):
        L[date_str] = {}
        data_list = V.get_daily_user_info(date_str)
        for obj in tqdm(data_list):
            video_id = obj["video_id"]
            L[date_str][video_id] = obj
    with open(save_path, "w") as f:
        f.write(json.dumps(L, ensure_ascii=False))
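
# Records are keyed by video_id within each date, so if the user-info table returns
# several rows for the same video on one day, later rows overwrite earlier ones.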


if __name__ == "__main__":
    flag = int(
        input(
            "Enter a flag: 1 to fetch hourly data\n2 to fetch daily data\n3 to fetch user info data:\n"
        )
    )
    if flag == 1:
        start = str(input("Enter the start string, format yyyymmddhh:\n"))
        end = str(input("Enter the end string, format yyyymmddhh:\n"))
        if len(start) == 10 and len(end) == 10:
            save_hourly_data(start, end)
        else:
            print("Time format is not ok")
    elif flag == 2:
        start = str(input("Enter the start string, format yyyymmdd:\n"))
        end = str(input("Enter the end string, format yyyymmdd:\n"))
        save_p = "data/train_data/daily-label-{}-{}.json".format(start, end)
        if len(start) == 8 and len(end) == 8:
            save_daily_data(start, end, save_p)
        else:
            print("Time format is not ok")
    elif flag == 3:
        start = str(input("Enter the start string, format yyyymmdd:\n"))
        end = str(input("Enter the end string, format yyyymmdd:\n"))
        save_p = "data/train_data/daily-user-info-{}-{}.json".format(start, end)
        if len(start) == 8 and len(end) == 8:
            save_daily_user_info(start, end, save_p)
        else:
            print("Time format is not ok")
    else:
        print("Input Error! Make sure your input is 1, 2 or 3!")
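
# Example interactive run (assumes PyODPS credentials and the loghubods tables are
# reachable; the script file name below is illustrative):
#   $ python odps_to_json.py
#   flag: 2
#   start: 20240101, end: 20240103
#   -> writes data/train_data/daily-label-20240101-20240103.json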