""" process the data to satisfy the lightgbm """ import sys import os import json import argparse from tqdm import tqdm from concurrent.futures.thread import ThreadPoolExecutor sys.path.append(os.getcwd()) from functions import generate_label_date, MysqlClient, MySQLClientSpider class DataProcessor(object): """ Insert some information to lightgbm_data """ def __init__(self, ): self.client = MysqlClient() self.client_spider = MySQLClientSpider() self.label_data = {} def update_label(self, start_date, end_date): """ 生成数据 :return:none """ # 把 label, video_title, daily_dt_str, 存储到 mysql 数据库中去 label_path = "/root/luojunhui/alg/data/train_data/daily-label-{}-{}.json".format(start_date, end_date) with open(label_path, encoding="utf-8") as f: self.label_data = json.loads(f.read()) def read_title(client, video_id): """ read_title_from mysql """ sql = f"""SELECT title from wx_video where id = {video_id};""" # print("title", sql) try: title = client.select(sql)[0][0] return title.strip() except Exception as e: print(video_id, "\t", e) return "" def generate_label(video_id, hourly_dt_str, label_info): """ generate label daily_dt_str for mysql :param label_info: :param video_id: :param hourly_dt_str: :return: label, daily_dt_str """ label_dt = generate_label_date(hourly_dt_str) label_obj = label_info.get(label_dt, {}).get(video_id) if label_obj: total_return = label_obj.get('total_return', 0) total_view = label_obj.get('total_view', 0) if total_return is not None and total_view is not None: total_return = float(total_return) total_view = float(total_view) if total_view == 0: label = None else: if total_return == 0: label = None else: label = float(total_return) / float(total_view) elif total_return is None and total_view is not None: label = 0 else: label = None else: label = None return label, label_dt def process_info(item_): """ Insert data into MySql :param item_: """ video_id, hour_dt = item_ # print(type(video_id)) label_info = self.label_data # title = read_title(client=self.client, video_id=video_id) label, dt_daily = generate_label(str(video_id), hour_dt, label_info) insert_sql = f"""UPDATE lightgbm_data set rov_label = '{label}', daily_dt_str = '{dt_daily}' where video_id = '{video_id}';""" # print(insert_sql) self.client_spider.update(insert_sql) select_sql = "SELECT video_id, hour_dt_str FROM lightgbm_data where rov_label is NULL;" init_data_tuple = self.client_spider.select(select_sql) init_list = list(init_data_tuple) # with ThreadPoolExecutor(max_workers=4) as Pool: # Pool.map(process_info, init_list) for item in tqdm(init_list): try: process_info(item) except Exception as e: print("操作失败", e) def update_user_info(self, start_date, end_date): """ 把补充的 user_info更新到 mysql 中、 把 user_return_3, user_view_3, user_share_3 user_return_videos_3, user_return_videos_30 address 存储到 mysql 数据库中 :return: """ user_path = '/root/luojunhui/alg/data/train_data/daily-user-info-{}-{}.json'.format(start_date, end_date) with open(user_path) as f: data = json.loads(f.read()) sql = "select video_id, hour_dt_str from lightgbm_data where type = 'userupload' and address is NULL;" dt_list = self.client_spider.select(sql) for item in tqdm(dt_list): video_id, dt_info = item dt_info = dt_info[:8] user_info_obj = data.get(dt_info, {}).get(str(video_id)) if user_info_obj: try: video_id = user_info_obj['video_id'] address = user_info_obj['address'] return_3 = user_info_obj['return_3days'] view_3 = user_info_obj['view_3days'] share_3 = user_info_obj['share_3days'] return_videos_3 = user_info_obj['3day_return_500_videos'] return_videos_30 = user_info_obj['30day_return_2000_videos'] update_sql = f"""UPDATE lightgbm_data set address='{address}', user_return_3={return_3}, user_view_3={view_3}, user_share_3={share_3}, user_return_videos_3={return_videos_3}, user_return_videos_30={return_videos_30} where video_id = '{video_id}';""" self.client_spider.update(update_sql) except Exception as e: print(e) pass else: print("No user info") if __name__ == "__main__": parser = argparse.ArgumentParser() # 新建参数解释器对象 parser.add_argument("--param") args = parser.parse_args() param = args.param D = DataProcessor() match param: case "label": sd = str(input("输入label日级表开始日期,格式为 YYYYmmdd")) ed = str(input("输入label日级表结束日期,格式为 YYYYmmdd")) D.update_label(start_date=sd, end_date=ed) case "user_info": sd = str(input("输入label日级表开始日期,格式为 YYYYmmdd")) ed = str(input("输入label日级表结束日期,格式为 YYYYmmdd")) D.update_user_info(start_date=sd, end_date=ed) # case "spider": # S = SpiderProcess() # S.spider_data_produce(flag=mode, dt_time=dt) # case "user": # U = UserProcess() # if mode == "generate": # sd = str(input("输入开始日期,格式为 YYYYmmdd")) # ed = str(input("输入结束日期,格式为 YYYYmmdd")) # U.userinfo_to_mysql(start_date=sd, end_date=ed) # else: # U.generate_user_data(flag=mode, dt_time=dt)