import datetime
import sys
import os
import argparse

import numpy as np
from tqdm import tqdm
import jieba.analyse
import pandas as pd

sys.path.append(os.getcwd())

from functions import MySQLClientSpider


class SpiderProcess(object):
    """
    Spider data process: prepare spider data for lightgbm training
    """

    def __init__(self):
        self.client_spider = MySQLClientSpider()
        self.spider_features = [
            "channel",
            "out_user_id",
            "mode",
            "out_play_cnt",
            "out_like_cnt",
            "out_share_cnt"
        ]

    def spider_lop(self, video_id):
        """
        Spider lop = like / play (smoothed)
        :param video_id:
        :return: (lop, duration), or (0, 0) if the video cannot be found
        """
        sql = f"""SELECT like_cnt, play_cnt, duration from crawler_video where video_id = '{video_id}';"""
        try:
            like_cnt, play_cnt, duration = self.client_spider.select(sql)[0]
            # additive smoothing so videos with very few plays do not dominate
            lop = (like_cnt + 700) / (play_cnt + 18000)
            return lop, duration
        except Exception as e:
            print(video_id, "\t", e)
            return 0, 0

    def spider_data_produce(self, flag, dt_time):
        """
        Read spider data from the database, convert it into a DataFrame and
        store it locally for training and prediction.
        :param flag: "train" or "predict"
        :param dt_time: date string, YYYYMMDD
        :return:
        """
        dt_time = datetime.datetime.strptime(dt_time, "%Y%m%d")
        # filter date: four days after dt_time
        three_date_before = dt_time + datetime.timedelta(days=4)
        temp_time = three_date_before.strftime("%Y%m%d")
        if flag == "train":
            select_sql = f"""SELECT video_id, video_title, rov_label, channel, out_user_id, spider_mode, out_play_cnt, out_like_cnt, out_share_cnt FROM lightgbm_data WHERE type = 'spider' and daily_dt_str <= '{temp_time}' and rov_label > 0;"""
            des_path = "/root/luojunhui/alg/data/train_data/spider_train_{}.json".format(
                datetime.datetime.today().strftime("%Y%m%d"))
        elif flag == "predict":
            select_sql = f"""SELECT video_id, video_title, rov_label, channel, out_user_id, spider_mode, out_play_cnt, out_like_cnt, out_share_cnt FROM lightgbm_data WHERE type = 'spider' and daily_dt_str = '{temp_time}' and rov_label > 0;"""
            des_path = "/root/luojunhui/alg/data/predict_data/predict_{}.json".format(dt_time.strftime("%Y%m%d"))
        else:
            return
        data_list = self.client_spider.select(select_sql)
        df = []
        for line in tqdm(data_list):
            try:
                temp = list(line)
                video_id = line[0]
                title = line[1]
                lop, duration = self.spider_lop(video_id)
                # top-3 keywords extracted from the title via TextRank
                title_tags = list(jieba.analyse.textrank(title, topK=3))
                temp.append(lop)
                temp.append(duration)
                if title_tags:
                    for i in range(3):
                        try:
                            temp.append(title_tags[i])
                        except IndexError:
                            temp.append(None)
                else:
                    temp.append(None)
                    temp.append(None)
                    temp.append(None)
                df.append(temp)
            except Exception:
                continue
        df = pd.DataFrame(df, columns=['video_id', 'video_title', 'label', 'channel', 'out_user_id', 'mode',
                                       'out_play_cnt', 'out_like_cnt', 'out_share_cnt', 'lop', 'duration',
                                       'tag1', 'tag2', 'tag3'])
        df.to_json(des_path, orient='records')


class UserProcess(object):
    """
    User data process
    """

    def __init__(self):
        self.client_spider = MySQLClientSpider()
        self.user_features = [
            "label",
            "uid",
            "channel",
            "user_fans",
            "user_view_30",
            "user_share_30",
            "user_return_30",
            "user_rov",
            "user_str",
            "user_return_videos_30",
            "user_return_videos_3",
            "user_return_3",
            "user_view_3",
            "user_share_3",
            "address",
            "tag1",
            "tag2",
            "tag3"
        ]

    def generate_user_data(self, flag, dt_time):
        """
        Generate user training / prediction data.
        :param flag: "train" or "predict"
        :param dt_time: date string, YYYYMMDD
        :return:
        """
        dt_time = datetime.datetime.strptime(dt_time, "%Y%m%d")
        # filter date: four days after dt_time
        three_date_before = dt_time + datetime.timedelta(days=4)
        temp_time = three_date_before.strftime("%Y%m%d")
        if flag == "train":
            sql = f"""select video_title, label, user_id, channel, user_fans, user_view_30, user_share_30, user_return_30, user_rov, user_str, user_return_videos_30, user_return_videos_3, user_return_3, user_view_3, user_share_3, address from lightgbm_data where type = 'userupload' and daily_dt_str <= '{temp_time}';"""
            des_path = "/root/luojunhui/alg/data/train_data/user_train_{}.json".format(
                datetime.datetime.today().strftime("%Y%m%d"))
        elif flag == "predict":
            sql = f"""select video_title, label, user_id, channel, user_fans, user_view_30, user_share_30, user_return_30, user_rov, user_str, user_return_videos_30, user_return_videos_3, user_return_3, user_view_3, user_share_3, address from lightgbm_data where type = 'userupload' and daily_dt_str = '{temp_time}';"""
            des_path = "/root/luojunhui/alg/data/predict_data/user_predict_{}.json".format(dt_time.strftime("%Y%m%d"))
        else:
            return
        dt_list = self.client_spider.select(sql)
        df = []
        for line in tqdm(dt_list):
            title = line[0]
            temp = list(line)
            title_tags = list(jieba.analyse.textrank(title, topK=3))
            if title_tags:
                for i in range(3):
                    try:
                        temp.append(title_tags[i])
                    except IndexError:
                        temp.append(None)
            else:
                temp.append(None)
                temp.append(None)
                temp.append(None)
            # drop the title column; the remaining fields match self.user_features
            df.append(temp[1:])
        df = pd.DataFrame(df, columns=self.user_features)
        # return-on-share and return-on-view ratios; guard against zero denominators
        df['ros_30'] = np.where(df['user_share_30'] != 0, df['user_return_30'] / df['user_share_30'], np.nan)
        df['rov_30'] = np.where(df['user_view_30'] != 0, df['user_return_30'] / df['user_view_30'], np.nan)
        df['ros_3'] = np.where(df['user_share_3'] != 0, df['user_return_3'] / df['user_share_3'], np.nan)
        df['rov_3'] = np.where(df['user_view_3'] != 0, df['user_return_3'] / df['user_view_3'], np.nan)
        df.to_json(des_path, orient='records')


class AllProcess(object):
    """
    All data (spider and user upload combined)
    """

    def __init__(self):
        self.client_spider = MySQLClientSpider()
        self.all_features = [
            # "video_title",
            "rov_label",
            "channel",
            "type",
            # "out_play_cnt",
            # "out_like_cnt",
            # "out_share_cnt",
            "tag1",
            "tag2",
            "tag3"
        ]

    def read_all_data(self, flag, dt_time):
        """
        Generate the combined dataset.
        :param flag: predict/train
        :param dt_time: date string, YYYYMMDD
        :return:
        """
        dt_time = datetime.datetime.strptime(dt_time, "%Y%m%d")
        # filter date: four days after dt_time
        three_date_before = dt_time + datetime.timedelta(days=4)
        temp_time = three_date_before.strftime("%Y%m%d")
        if flag == "train":
            sql = f"""select video_title, rov_label, channel, type from lightgbm_data where daily_dt_str <= '{temp_time}' and rov_label > 0;"""
            des_path = "/root/luojunhui/alg/data/train_data/all_train_{}.json".format(
                datetime.datetime.today().strftime("%Y%m%d"))
        elif flag == "predict":
            sql = f"""select video_title, rov_label, channel, type from lightgbm_data where daily_dt_str = '{temp_time}';"""
            des_path = "/root/luojunhui/alg/data/predict_data/all_predict_{}.json".format(dt_time.strftime("%Y%m%d"))
        else:
            return
        dt_list = self.client_spider.select(sql)
        df = []
        for line in tqdm(dt_list):
            title = line[0]
            try:
                title_tags = list(jieba.analyse.textrank(title, topK=3))
                temp = list(line)
                if title_tags:
                    for i in range(3):
                        try:
                            temp.append(title_tags[i])
                        except IndexError:
                            temp.append(None)
                    # drop the title column; the remaining fields match self.all_features
                    df.append(temp[1:])
                else:
                    continue
            except Exception as e:
                print("title is empty\t", e)
        df = pd.DataFrame(df, columns=self.all_features)
        df.to_json(des_path, orient='records')


if __name__ == '__main__':
    parser = argparse.ArgumentParser()  # build the command-line argument parser
    parser.add_argument("--cate", help="data category: spider / user_info / all")
    parser.add_argument("--flag", help="train or predict")
    parser.add_argument("--dt", help="date string, YYYYMMDD")
    args = parser.parse_args()
    cate = args.cate
    flag = args.flag
    dt = args.dt
    match cate:
        case "spider":
            S = SpiderProcess()
            S.spider_data_produce(flag=flag, dt_time=dt)
        case "user_info":
            U = UserProcess()
            U.generate_user_data(flag=flag, dt_time=dt)
        case "all":
            A = AllProcess()
            A.read_all_data(flag=flag, dt_time=dt)
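# Usage sketch: how the entry point above is typically invoked. The script filename
# "data_process.py" and the example date are hypothetical; --cate, --flag and --dt are
# the arguments parsed above (dt in YYYYMMDD form).
#   python data_process.py --cate spider    --flag train   --dt 20240101
#   python data_process.py --cate user_info --flag predict --dt 20240101
#   python data_process.py --cate all       --flag predict --dt 20240101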