123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283 |
- """
- process the data to satisfy the lightgbm
- """
- import sys
- import os
- import json
- import asyncio
- import argparse
- from tqdm import tqdm
- import jieba.analyse
- from concurrent.futures.thread import ThreadPoolExecutor
- sys.path.append(os.getcwd())
- from functions import generate_label_date, generate_daily_strings, MysqlClient, MySQLClientSpider
- class DataProcessor(object):
- """
- Insert some information to lightgbm_data
- """
- def __init__(self, ):
- self.client = MysqlClient()
- self.client_spider = MySQLClientSpider()
- self.label_data = {}
- def generate_train_label(self, item, y_ori_data, cate):
- """
- 生成训练数据,用 np.array矩阵的方式返回,
- :return: x_train, 训练数据, y_train, 训练 label
- """
- video_id = item["video_id"]
- dt = item["dt"]
- useful_features = [
- "uid",
- "type",
- "channel",
- "fans",
- "view_count_user_30days",
- "share_count_user_30days",
- "return_count_user_30days",
- "rov_user",
- "str_user",
- "out_user_id",
- "mode",
- "out_play_cnt",
- "out_like_cnt",
- "out_share_cnt",
- "out_collection_cnt",
- ]
- spider_features = [
- "channel",
- "out_user_id",
- "mode",
- "out_play_cnt",
- "out_like_cnt",
- "out_share_cnt"
- ]
- user_features = [
- "uid",
- "channel",
- "fans",
- "view_count_user_30days",
- "share_count_user_30days",
- "return_count_user_30days",
- "rov_user",
- "str_user"
- ]
- match self.ll:
- case "all":
- item_features = [item[i] for i in useful_features]
- case "user":
- if item['type'] == "userupload":
- item_features = [item[i] for i in user_features]
- else:
- return None, None
- case "spider":
- if item['type'] == "spider":
- item_features = [item[i] for i in spider_features]
- lop, duration = self.cal_lop(video_id)
- item_features.append(lop)
- item_features.append(duration)
- else:
- return None, None
- keywords_textrank = self.title_processor(video_id)
- if keywords_textrank:
- for i in range(3):
- try:
- item_features.append(keywords_textrank[i])
- except:
- item_features.append(None)
- else:
- item_features.append(None)
- item_features.append(None)
- item_features.append(None)
- label_dt = generate_label_date(dt)
- label_obj = y_ori_data.get(label_dt, {}).get(video_id)
- if label_obj:
- label = int(label_obj[cate]) if label_obj[cate] else 0
- else:
- label = 0
- return label, item_features
- def producer(self):
- """
- 生成数据
- :return:none
- """
- # 把 label, video_title, daily_dt_str, 存储到 mysql 数据库中去
- label_path = "data/train_data/daily-label-20240101-20240325.json"
- with open(label_path, encoding="utf-8") as f:
- self.label_data = json.loads(f.read())
- def read_title(client, video_id):
- """
- read_title_from mysql
- """
- sql = f"""SELECT title from wx_video where id = {video_id};"""
- # print("title", sql)
- try:
- title = client.select(sql)[0][0]
- return title
- except Exception as e:
- print(video_id, "\t", e)
- return ""
- def generate_label(video_id, hourly_dt_str, label_info):
- """
- generate label daily_dt_str for mysql
- :param label_info:
- :param video_id:
- :param hourly_dt_str:
- :return: label, daily_dt_str
- """
- label_dt = generate_label_date(hourly_dt_str)
- label_obj = label_info.get(label_dt, {}).get(video_id)
- if label_obj:
- label = int(label_obj["total_return"]) if label_obj["total_return"] else 0
- print(label)
- else:
- label = 0
- return label, label_dt
- def process_info(item_):
- """
- Insert data into MySql
- :param item_:
- """
- video_id, hour_dt = item_
- label_info = self.label_data
- if not label_info:
- print(label_info)
- # print(len(label_info))
- title = read_title(client=self.client, video_id=video_id)
- label, dt_daily = generate_label(video_id, hour_dt, label_info)
- insert_sql = f"""UPDATE lightgbm_data
- set video_title = '{title}', label = '{label}', daily_dt_str = '{dt_daily}'
- where video_id = '{video_id}'
- ;"""
- self.client_spider.update(insert_sql)
- select_sql = "SELECT video_id, hour_dt_str FROM lightgbm_data where label is NULL and hour_dt_str < '20240327';"
- init_data_tuple = self.client_spider.select(select_sql)
- init_list = list(init_data_tuple)
- for item in tqdm(init_list):
- # print(item)
- process_info(item)
- # with ThreadPoolExecutor(max_workers=10) as Pool:
- # Pool.map(process_info, init_list)
class SpiderProcess(object):
    """
    Spider-side data access: derives crawler features for lightgbm training.
    """

    def __init__(self):
        self.client_spider = MySQLClientSpider()

    def spider_lop(self, video_id):
        """
        Compute the smoothed like/play ratio ("lop") and duration of a video.

        lop = (like_cnt + 700) / (play_cnt + 18000), queried from
        crawler_video. Falls back to (0, 0) when the row is missing or the
        query/arithmetic fails for any reason (failure is printed).

        :param video_id: crawler_video.video_id value.
        :return: (lop, duration) tuple.
        """
        sql = f"""SELECT like_cnt, play_cnt, duration from crawler_video where video_id = '{video_id}';"""
        try:
            likes, plays, duration = self.client_spider.select(sql)[0]
            return (likes + 700) / (plays + 18000), duration
        except Exception as err:
            print(video_id, "\t", err)
            return 0, 0

    def spider_data_produce(self):
        """
        Persist spider_duration to the database.
        (Stub — not implemented yet.)
        :return: None
        """
        return
class UserProcess(object):
    """
    User-side data processing: user-level feature columns and title keywords.
    """

    def __init__(self):
        self.client = MysqlClient()
        # Column names of the user-level features this processor works with.
        self.user_features = [
            "uid",
            "channel",
            "user_fans",
            "user_view_30",
            "user_share_30",
            "user_return_30",
            "user_rov",
            "user_str",
            "user_return_videos_30",
            "user_return_videos_3",
            "user_return_3",
            "user_view_3",
            "user_share_3",
            "address"
        ]

    def title_processor(self, video_id):
        """
        Fetch a video's title and extract its top-3 TextRank keywords as
        features.

        :param video_id: wx_video.id value.
        :return: list of up to three keyword strings; [] on any failure
            (the failure is printed alongside the video id).
        """
        sql = f"""SELECT title from wx_video where id = {video_id};"""
        try:
            title = self.client.select(sql)[0][0]
            return list(jieba.analyse.textrank(title, topK=3))
        except Exception as err:
            print(video_id, "\t", err)
            return []

    def user_data_process(self):
        """
        Persist user_return_3 / user_view_3 / user_share_3,
        user_return_videos_3 / user_return_videos_30 and address to MySQL.
        (Stub — only the source path constant exists so far.)
        :return: None
        """
        user_path = '/data'
if __name__ == "__main__":
    # CLI scaffold kept for reference (was commented out in the original):
    #   parser = argparse.ArgumentParser()
    #   parser.add_argument("--mode")       # "train" or "predict"
    #   parser.add_argument("--category")
    #   parser.add_argument("--dtype", default="whole")
    # "train" would run DataProcessor(flag="train", ll=category).producer(),
    # "predict" would loop producer() over dates from generate_daily_strings
    # (single date or a start/end range, depending on --dtype).
    D = DataProcessor()
    D.producer()
|