""" process the data to satisfy the lightgbm """ import sys import os import json from tqdm import tqdm import jieba.analyse sys.path.append(os.getcwd()) from functions import generate_label_date, MysqlClient class DataProcessor(object): """ Process the data to satisfy the lightGBM """ def __init__(self, flag, c="useful"): self.client = MysqlClient() self.flag = flag self.c = c def generate_train_label(self, item, y_ori_data, cate): """ 生成训练数据,用 np.array矩阵的方式返回, :return: x_train, 训练数据, y_train, 训练 label """ video_id = item["video_id"] dt = item["dt"] useful_features = [ "uid", "type", "channel", "fans", "view_count_user_30days", "share_count_user_30days", "return_count_user_30days", "rov_user", "str_user", "out_user_id", "mode", "out_play_cnt", "out_like_cnt", "out_share_cnt", "out_collection_cnt", ] spider_features = [ "channel", "view_count_user_30days", "share_count_user_30days", "return_count_user_30days", "rov_user", "str_user", "out_user_id", "mode", "out_play_cnt", "out_like_cnt", "out_share_cnt" ] user_features = [ "uid", "channel", "fans", "view_count_user_30days", "share_count_user_30days", "return_count_user_30days", "rov_user", "str_user" ] match self.c: case "useful": item_features = [item[i] for i in useful_features] case "user": item_features = [item[i] for i in user_features] case "spider": if item['type'] == "spider": item_features = [item[i] for i in spider_features] else: return None, None keywords_textrank = self.title_processor(video_id) if keywords_textrank: for i in range(3): try: item_features.append(keywords_textrank[i]) except: item_features.append(None) else: item_features.append(None) item_features.append(None) item_features.append(None) label_dt = generate_label_date(dt) label_obj = y_ori_data.get(label_dt, {}).get(video_id) if label_obj: label = int(label_obj[cate]) if label_obj[cate] else 0 else: label = 0 return label, item_features def title_processor(self, video_id): """ 通过 video_id 去获取title, 然后通过 title 再分词,把关键词作为 feature :param video_id: the video id :return: tag_list [tag, tag, tag, tag......] """ sql = f"""SELECT title from wx_video where id = {video_id};""" try: title = self.client.select(sql)[0][0] keywords_textrank = jieba.analyse.textrank(title, topK=3) return list(keywords_textrank) except Exception as e: print(video_id, "\t", e) return [] def producer(self, dt): """ 生成数据 :return:none """ if self.flag == "train": x_path = "data/train_data/train_2024010100_2024031523.json" y_path = "data/train_data/daily-label-20240101-20240325.json" elif self.flag == "predict": x_path = "data/pred_data/pred_202403{}00_202403{}23.json".format(dt, dt) y_path = "data/train_data/daily-label-20240101-20240325.json" else: return with open(x_path) as f: x_data = json.loads(f.read()) with open(y_path) as f: y_data = json.loads(f.read()) cate_list = ["total_return"] for c in cate_list: x_list = [] y_list = [] for video_obj in tqdm(x_data): our_label, features = self.generate_train_label(video_obj, y_data, c) if features: x_list.append(features) y_list.append(our_label) with open("data/produce_data/x_data_{}_{}_{}_{}.json".format(c, self.flag, dt, self.c), "w") as f1: f1.write(json.dumps(x_list, ensure_ascii=False)) with open("data/produce_data/y_data_{}_{}_{}_{}.json".format(c, self.flag, dt, self.c), "w") as f2: f2.write(json.dumps(y_list, ensure_ascii=False)) if __name__ == "__main__": flag = int(input("please input method train or predict:\n ")) if flag == 1: t = "train" D = DataProcessor(flag=t, c="spider") D.producer(dt="whole") else: t = "predict" D = DataProcessor(flag=t, c="spider") for d in range(16, 22): D.producer(d)