""" process the data to satisfy the lightgbm """ import sys import os import json import asyncio import argparse from tqdm import tqdm import jieba.analyse sys.path.append(os.getcwd()) from functions import generate_label_date, generate_daily_strings, MysqlClient, MySQLClientSpider class DataProcessor(object): """ Insert some information to lightgbm_data """ def __init__(self, ): self.client = MysqlClient() self.client_spider = MySQLClientSpider() def generate_train_label(self, item, y_ori_data, cate): """ 生成训练数据,用 np.array矩阵的方式返回, :return: x_train, 训练数据, y_train, 训练 label """ video_id = item["video_id"] dt = item["dt"] useful_features = [ "uid", "type", "channel", "fans", "view_count_user_30days", "share_count_user_30days", "return_count_user_30days", "rov_user", "str_user", "out_user_id", "mode", "out_play_cnt", "out_like_cnt", "out_share_cnt", "out_collection_cnt", ] spider_features = [ "channel", "out_user_id", "mode", "out_play_cnt", "out_like_cnt", "out_share_cnt" ] user_features = [ "uid", "channel", "fans", "view_count_user_30days", "share_count_user_30days", "return_count_user_30days", "rov_user", "str_user" ] match self.ll: case "all": item_features = [item[i] for i in useful_features] case "user": if item['type'] == "userupload": item_features = [item[i] for i in user_features] else: return None, None case "spider": if item['type'] == "spider": item_features = [item[i] for i in spider_features] lop, duration = self.cal_lop(video_id) item_features.append(lop) item_features.append(duration) else: return None, None keywords_textrank = self.title_processor(video_id) if keywords_textrank: for i in range(3): try: item_features.append(keywords_textrank[i]) except: item_features.append(None) else: item_features.append(None) item_features.append(None) item_features.append(None) label_dt = generate_label_date(dt) label_obj = y_ori_data.get(label_dt, {}).get(video_id) if label_obj: label = int(label_obj[cate]) if label_obj[cate] else 0 else: label = 0 return label, item_features async def producer(self): """ 生成数据 :return:none """ # 把 label, video_title, daily_dt_str, 存储到 mysql 数据库中去 label_path = "data/train_data/daily-label-20240101-20240325.json" with open(label_path, encoding="utf-8") as f: label_data = json.loads(f.read()) async def read_title(client, video_id): """ read_title_from mysql """ sql = f"""SELECT title from wx_video where id = {video_id};""" try: title = await client.select(sql)[0][0] return title except Exception as e: print(video_id, "\t", e) return "" def generate_label(video_id, hourly_dt_str, label_info): """ generate label daily_dt_str for mysql :param label_info: :param video_id: :param hourly_dt_str: :return: label, daily_dt_str """ label_dt = generate_label_date(hourly_dt_str) label_obj = label_info.get(label_dt, {}).get(video_id) if label_obj: label = int(label_obj["total_return"]) if label_obj["total_return"] else 0 else: label = 0 return label, label_dt async def process_info(item_, label_info): """ Insert data into MySql :param item_: :param label_info: """ video_id, hour_dt = item_ title = await read_title(client=self.client, video_id=video_id) label, dt_daily = generate_label(video_id, hour_dt, label_info) insert_sql = f"""INSERT INTO lightgbm_data (video_title, label, daily_dt_str) values ('{title}', '{label}', '{dt_daily}';""" await self.client_spider.update(insert_sql) select_sql = "SELECT video_id, hour_dt_str FROM lightgbm_data where label is NULL;" init_data_tuple = self.client_spider.select(select_sql) init_list = list(init_data_tuple) async_tasks = [] 
        for item in init_list:
            async_tasks.append(process_info(item, label_data))
        await asyncio.gather(*async_tasks)


class SpiderProcess(object):
    """
    Spider Data Process and Process data for lightgbm training
    """

    def __init__(self):
        self.client_spider = MySQLClientSpider()

    def spider_lop(self, video_id):
        """
        Spider lop = like / play
        :param video_id:
        :return:
        """
        sql = f"""SELECT like_cnt, play_cnt, duration from crawler_video where video_id = '{video_id}';"""
        try:
            like_cnt, play_cnt, duration = self.client_spider.select(sql)[0]
            # smoothed like/play ratio: the +700 / +18000 offsets damp the
            # ratio for videos with very few plays
            lop = (like_cnt + 700) / (play_cnt + 18000)
            return lop, duration
        except Exception as e:
            print(video_id, "\t", e)
            return 0, 0

    def spider_data_produce(self):
        """
        Store spider_duration into the database.
        :return:
        """
        return


class UserProcess(object):
    """
    User Data Process
    """

    def __init__(self):
        self.client = MysqlClient()
        self.user_features = [
            "uid", "channel", "user_fans", "user_view_30", "user_share_30",
            "user_return_30", "user_rov", "user_str",
            "user_return_videos_30", "user_return_videos_3",
            "user_return_3", "user_view_3", "user_share_3", "address"
        ]

    def title_processor(self, video_id):
        """
        Fetch the title by video_id, tokenize it, and use the keywords as features.
        :param video_id: the video id
        :return: tag_list [tag, tag, tag, tag......]
        """
        sql = f"""SELECT title from wx_video where id = {video_id};"""
        try:
            title = self.client.select(sql)[0][0]
            keywords_textrank = jieba.analyse.textrank(title, topK=3)
            return list(keywords_textrank)
        except Exception as e:
            print(video_id, "\t", e)
            return []

    def user_data_process(self):
        """
        Store user_return_3, user_view_3, user_share_3,
        user_return_videos_3, user_return_videos_30 and address
        into the MySQL database.
        :return:
        """
        user_path = '/data'


if __name__ == "__main__":
    # D = DataProcessor()
    # D.producer()
    # parser = argparse.ArgumentParser()  # build the CLI argument parser
    # parser.add_argument("--mode")
    # parser.add_argument("--category")
    # parser.add_argument("--dtype", default="whole")
    # args = parser.parse_args()
    # mode = args.mode
    # category = args.category
    # dtype = args.dtype
    D = DataProcessor()
    asyncio.run(D.producer())
    # if mode == "train":
    #     print("Loading data and process for training.....")
    #     D = DataProcessor(flag="train", ll=category)
    #     D.producer("whole")
    # elif mode == "predict":
    #     print("Loading data and process for prediction for each day......")
    #     D = DataProcessor(flag="predict", ll=category)
    #     if dtype == "single":
    #         date_str = str(input("Please enter the date of the prediction"))
    #         D.producer(date_str)
    #     elif dtype == "days":
    #         start_date_str = str(input("Please enter the start date of the prediction"))
    #         end_date_str = str(input("Please enter the end date of the prediction"))
    #         dt_list = generate_daily_strings(start_date=start_date_str, end_date=end_date_str)
    #         for d in dt_list:
    #             D.producer()
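    # Illustrative sketch only (not part of the original pipeline): how the
    # (label, item_features) pairs from generate_train_label() could be fed
    # to LightGBM once collected. `rows` and `label_data` are hypothetical
    # here (rows read back from lightgbm_data and the parsed daily-label
    # JSON); text/categorical columns would need encoding first.
    # import numpy as np
    # import lightgbm as lgb
    # pairs = [D.generate_train_label(r, label_data, "total_return") for r in rows]
    # pairs = [(y, x) for y, x in pairs if x is not None]
    # y_train = np.array([y for y, _ in pairs])
    # x_train = np.array([x for _, x in pairs], dtype=float)
    # model = lgb.train({"objective": "regression"}, lgb.Dataset(x_train, label=y_train))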