""" process the data to satisfy the lightgbm """ import sys import os import json import asyncio import argparse from tqdm import tqdm import jieba.analyse from concurrent.futures.thread import ThreadPoolExecutor sys.path.append(os.getcwd()) from functions import generate_label_date, generate_daily_strings, MysqlClient, MySQLClientSpider class DataProcessor(object): """ Insert some information to lightgbm_data """ def __init__(self, ): self.client = MysqlClient() self.client_spider = MySQLClientSpider() self.label_data = {} def generate_train_label(self, item, y_ori_data, cate): """ 生成训练数据,用 np.array矩阵的方式返回, :return: x_train, 训练数据, y_train, 训练 label """ video_id = item["video_id"] dt = item["dt"] useful_features = [ "uid", "type", "channel", "fans", "view_count_user_30days", "share_count_user_30days", "return_count_user_30days", "rov_user", "str_user", "out_user_id", "mode", "out_play_cnt", "out_like_cnt", "out_share_cnt", "out_collection_cnt", ] spider_features = [ "channel", "out_user_id", "mode", "out_play_cnt", "out_like_cnt", "out_share_cnt" ] user_features = [ "uid", "channel", "fans", "view_count_user_30days", "share_count_user_30days", "return_count_user_30days", "rov_user", "str_user" ] match self.ll: case "all": item_features = [item[i] for i in useful_features] case "user": if item['type'] == "userupload": item_features = [item[i] for i in user_features] else: return None, None case "spider": if item['type'] == "spider": item_features = [item[i] for i in spider_features] lop, duration = self.cal_lop(video_id) item_features.append(lop) item_features.append(duration) else: return None, None keywords_textrank = self.title_processor(video_id) if keywords_textrank: for i in range(3): try: item_features.append(keywords_textrank[i]) except: item_features.append(None) else: item_features.append(None) item_features.append(None) item_features.append(None) label_dt = generate_label_date(dt) label_obj = y_ori_data.get(label_dt, {}).get(video_id) if label_obj: label = int(label_obj[cate]) if label_obj[cate] else 0 else: label = 0 return label, item_features def producer(self): """ 生成数据 :return:none """ # 把 label, video_title, daily_dt_str, 存储到 mysql 数据库中去 label_path = "data/train_data/daily-label-20240101-20240325.json" with open(label_path, encoding="utf-8") as f: self.label_data = json.loads(f.read()) def read_title(client, video_id): """ read_title_from mysql """ sql = f"""SELECT title from wx_video where id = {video_id};""" # print("title", sql) try: title = client.select(sql)[0][0] return title except Exception as e: print(video_id, "\t", e) return "" def generate_label(video_id, hourly_dt_str, label_info): """ generate label daily_dt_str for mysql :param label_info: :param video_id: :param hourly_dt_str: :return: label, daily_dt_str """ label_dt = generate_label_date(hourly_dt_str) label_obj = label_info.get(label_dt, {}).get(video_id) if label_obj: label = int(label_obj["total_return"]) if label_obj["total_return"] else 0 print(label) else: label = 0 return label, label_dt def process_info(item_): """ Insert data into MySql :param item_: """ video_id, hour_dt = item_ label_info = self.label_data if not label_info: print(label_info) # print(len(label_info)) title = read_title(client=self.client, video_id=video_id) label, dt_daily = generate_label(video_id, hour_dt, label_info) insert_sql = f"""UPDATE lightgbm_data set video_title = '{title}', label = '{label}', daily_dt_str = '{dt_daily}' where video_id = '{video_id}' ;""" self.client_spider.update(insert_sql) select_sql = "SELECT video_id, hour_dt_str FROM lightgbm_data where label is NULL and hour_dt_str < '20240327';" init_data_tuple = self.client_spider.select(select_sql) init_list = list(init_data_tuple) for item in tqdm(init_list): # print(item) process_info(item) # with ThreadPoolExecutor(max_workers=10) as Pool: # Pool.map(process_info, init_list) class SpiderProcess(object): """ Spider Data Process and Process data for lightgbm training """ def __init__(self): self.client_spider = MySQLClientSpider() def spider_lop(self, video_id): """ Spider lop = like / play :param video_id: :return: """ sql = f"""SELECT like_cnt, play_cnt, duration from crawler_video where video_id = '{video_id}';""" try: like_cnt, play_cnt, duration = self.client_spider.select(sql)[0] lop = (like_cnt + 700) / (play_cnt + 18000) return lop, duration except Exception as e: print(video_id, "\t", e) return 0, 0 def spider_data_produce(self): """ 把 spider_duration 存储到数据库中 :return: """ return class UserProcess(object): """ User Data Process """ def __init__(self): self.client = MysqlClient() self.user_features = [ "uid", "channel", "user_fans", "user_view_30", "user_share_30", "user_return_30", "user_rov", "user_str", "user_return_videos_30", "user_return_videos_3", "user_return_3", "user_view_3", "user_share_3", "address" ] def title_processor(self, video_id): """ 通过 video_id 去获取title, 然后通过 title 再分词,把关键词作为 feature :param video_id: the video id :return: tag_list [tag, tag, tag, tag......] """ sql = f"""SELECT title from wx_video where id = {video_id};""" try: title = self.client.select(sql)[0][0] keywords_textrank = jieba.analyse.textrank(title, topK=3) return list(keywords_textrank) except Exception as e: print(video_id, "\t", e) return [] def user_data_process(self): """ 把 user_return_3, user_view_3, user_share_3 user_return_videos_3, user_return_videos_30 address 存储到 mysql 数据库中 :return: """ user_path = '/data' if __name__ == "__main__": # D = DataProcessor() # D.producer() # parser = argparse.ArgumentParser() # 新建参数解释器对象 # parser.add_argument("--mode") # parser.add_argument("--category") # parser.add_argument("--dtype", default="whole") # args = parser.parse_args() # mode = args.mode # category = args.category # dtype = args.dtype D = DataProcessor() D.producer() # if mode == "train": # print("Loading data and process for training.....") # D = DataProcessor(flag="train", ll=category) # D.producer("whole") # elif mode == "predict": # print("Loading data and process for prediction for each day......") # D = DataProcessor(flag="predict", ll=category) # if dtype == "single": # date_str = str(input("Please enter the date of the prediction")) # D.producer(date_str) # elif dtype == "days": # start_date_str = str(input("Please enter the start date of the prediction")) # end_date_str = str(input("Please enter the end date of the prediction")) # dt_list = generate_daily_strings(start_date=start_date_str, end_date=end_date_str) # for d in dt_list: # D.producer()