import argparse
import datetime
import os
import sys

import jieba.analyse
import numpy as np
import pandas as pd
from tqdm import tqdm

sys.path.append(os.getcwd())
from functions import MySQLClientSpider
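
# Third-party dependencies assumed by this script (install names only, versions
# not pinned here):
#   pip install numpy pandas tqdm jieba
# `functions.MySQLClientSpider` is a project-local MySQL helper made importable
# by the sys.path.append(os.getcwd()) call above.
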
class SpiderProcess(object):
    """
    Process crawled (spider) data into features for LightGBM training and prediction.
    """

    def __init__(self):
        self.client_spider = MySQLClientSpider()
        self.spider_features = [
            "channel",
            "out_user_id",
            "mode",
            "out_play_cnt",
            "out_like_cnt",
            "out_share_cnt"
        ]

    def spider_lop(self, video_id):
        """
        Look up a video's stats and compute lop (like / play), smoothed with
        additive priors.
        :param video_id: id of the video in crawler_video
        :return: (lop, duration), or (0, 0) if the lookup fails
        """
        # NOTE: video_id is interpolated directly into the SQL string, so
        # inputs are assumed to come from trusted internal tables.
        sql = f"""SELECT like_cnt, play_cnt, duration FROM crawler_video WHERE video_id = '{video_id}';"""
        try:
            like_cnt, play_cnt, duration = self.client_spider.select(sql)[0]
            # Additive smoothing: the priors (700 likes / 18000 plays) keep
            # the ratio stable for videos with few plays.
            lop = (like_cnt + 700) / (play_cnt + 18000)
            return lop, duration
        except Exception as e:
            print(video_id, "\t", e)
            return 0, 0
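
    # Illustration of the additive smoothing above (numbers are made up):
    # a video with 2000 likes / 10000 plays has a raw ratio of 0.2 but a
    # smoothed lop of (2000 + 700) / (10000 + 18000) ≈ 0.096, while a
    # barely-viewed video with 1 like / 10 plays is pulled to
    # (1 + 700) / (10 + 18000) ≈ 0.0389, close to the prior 700 / 18000.
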
    def spider_data_produce(self, flag, dt_time):
        """
        Read crawler data from the database, convert it to a DataFrame, and
        save it locally for training or prediction.
        :param flag: "train" or "predict"
        :param dt_time: date string in YYYYMMDD format
        :return:
        """
        dt_time = datetime.datetime.strptime(dt_time, "%Y%m%d")
        # Include rows up to the date four days after dt_time.
        cutoff_date = dt_time + datetime.timedelta(days=4)
        temp_time = cutoff_date.strftime("%Y%m%d")
        if flag == "train":
            select_sql = f"""SELECT video_id, video_title, rov_label, channel, out_user_id, spider_mode, out_play_cnt, out_like_cnt, out_share_cnt FROM lightgbm_data WHERE type = 'spider' AND daily_dt_str <= '{temp_time}' AND rov_label > 0;"""
            des_path = "/root/luojunhui/alg/data/train_data/spider_train_{}.json".format(
                datetime.datetime.today().strftime("%Y%m%d"))
        elif flag == "predict":
            select_sql = f"""SELECT video_id, video_title, rov_label, channel, out_user_id, spider_mode, out_play_cnt, out_like_cnt, out_share_cnt FROM lightgbm_data WHERE type = 'spider' AND daily_dt_str = '{temp_time}' AND rov_label > 0;"""
            des_path = "/root/luojunhui/alg/data/predict_data/predict_{}.json".format(dt_time.strftime("%Y%m%d"))
        else:
            return
        data_list = self.client_spider.select(select_sql)
        rows = []
        for line in tqdm(data_list):
            try:
                temp = list(line)
                video_id = line[0]
                title = line[1]
                lop, duration = self.spider_lop(video_id)
                temp.append(lop)
                temp.append(duration)
                # Extract up to three keyword tags from the title and pad
                # with None so every row has exactly three tag columns.
                tags = list(jieba.analyse.textrank(title, topK=3))
                temp.extend(tags + [None] * (3 - len(tags)))
                rows.append(temp)
            except Exception:
                continue
        df = pd.DataFrame(
            rows,
            columns=['video_id', 'video_title', 'label', 'channel', 'out_user_id', 'mode',
                     'out_play_cnt', 'out_like_cnt', 'out_share_cnt', 'lop', 'duration',
                     'tag1', 'tag2', 'tag3'])
        df.to_json(des_path, orient='records')
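
# A minimal usage sketch for SpiderProcess (the date is illustrative; assumes
# the MySQL tables and the /root/luojunhui/alg/data/ directories exist):
#   processor = SpiderProcess()
#   processor.spider_data_produce(flag="train", dt_time="20240101")
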
class UserProcess(object):
    """
    Process user-upload data into features for LightGBM training and prediction.
    """

    def __init__(self):
        self.client_spider = MySQLClientSpider()
        self.user_features = [
            "label",
            "uid",
            "channel",
            "user_fans",
            "user_view_30",
            "user_share_30",
            "user_return_30",
            "user_rov",
            "user_str",
            "user_return_videos_30",
            "user_return_videos_3",
            "user_return_3",
            "user_view_3",
            "user_share_3",
            "address",
            "tag1",
            "tag2",
            "tag3"
        ]

    def generate_user_data(self, flag, dt_time):
        """
        Generate user training or prediction data.
        :param flag: "train" or "predict"
        :param dt_time: date string in YYYYMMDD format
        :return:
        """
        dt_time = datetime.datetime.strptime(dt_time, "%Y%m%d")
        # Include rows up to the date four days after dt_time.
        cutoff_date = dt_time + datetime.timedelta(days=4)
        temp_time = cutoff_date.strftime("%Y%m%d")
        if flag == "train":
            sql = f"""SELECT video_title, label, user_id, channel, user_fans, user_view_30, user_share_30, user_return_30, user_rov, user_str, user_return_videos_30, user_return_videos_3, user_return_3, user_view_3, user_share_3, address FROM lightgbm_data WHERE type = 'userupload' AND daily_dt_str <= '{temp_time}';"""
            des_path = "/root/luojunhui/alg/data/train_data/user_train_{}.json".format(
                datetime.datetime.today().strftime("%Y%m%d"))
        elif flag == "predict":
            sql = f"""SELECT video_title, label, user_id, channel, user_fans, user_view_30, user_share_30, user_return_30, user_rov, user_str, user_return_videos_30, user_return_videos_3, user_return_3, user_view_3, user_share_3, address FROM lightgbm_data WHERE type = 'userupload' AND daily_dt_str = '{temp_time}';"""
            des_path = "/root/luojunhui/alg/data/predict_data/user_predict_{}.json".format(dt_time.strftime("%Y%m%d"))
        else:
            return
        dt_list = self.client_spider.select(sql)
        rows = []
        for line in tqdm(dt_list):
            title = line[0]
            temp = list(line)
            # Extract up to three keyword tags from the title and pad with
            # None so every row has exactly three tag columns.
            tags = list(jieba.analyse.textrank(title, topK=3))
            temp.extend(tags + [None] * (3 - len(tags)))
            rows.append(temp[1:])  # drop video_title; it is not a feature
        df = pd.DataFrame(rows, columns=self.user_features)
        # Derived share/view ratios; each is guarded against a zero denominator.
        df['ros_30'] = np.where(df['user_share_30'] != 0, df['user_return_30'] / df['user_share_30'], np.nan)
        df['rov_30'] = np.where(df['user_view_30'] != 0, df['user_return_30'] / df['user_view_30'], np.nan)
        df['ros_3'] = np.where(df['user_share_3'] != 0, df['user_return_3'] / df['user_share_3'], np.nan)
        df['rov_3'] = np.where(df['user_view_3'] != 0, df['user_return_3'] / df['user_view_3'], np.nan)
        df.to_json(des_path, orient='records')
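
# Worked example of the derived ratios above (illustrative numbers): a user
# with user_return_30 = 500, user_share_30 = 2000 and user_view_30 = 100000
# gets ros_30 = 500 / 2000 = 0.25 and rov_30 = 500 / 100000 = 0.005; any row
# with a zero denominator gets NaN from the np.where guard instead.
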
class AllProcess(object):
    """
    Process all data (spider and user upload combined).
    """

    def __init__(self):
        self.client_spider = MySQLClientSpider()
        self.all_features = [
            # "video_title",
            "rov_label",
            "channel",
            "type",
            # "out_play_cnt",
            # "out_like_cnt",
            # "out_share_cnt",
            "tag1",
            "tag2",
            "tag3"
        ]

    def read_all_data(self, flag, dt_time):
        """
        Generate combined training or prediction data across all types.
        :param flag: "train" or "predict"
        :param dt_time: date string in YYYYMMDD format
        :return:
        """
        dt_time = datetime.datetime.strptime(dt_time, "%Y%m%d")
        # Include rows up to the date four days after dt_time.
        cutoff_date = dt_time + datetime.timedelta(days=4)
        temp_time = cutoff_date.strftime("%Y%m%d")
        if flag == "train":
            sql = f"""SELECT video_title, rov_label, channel, type FROM lightgbm_data WHERE daily_dt_str <= '{temp_time}' AND rov_label > 0;"""
            des_path = "/root/luojunhui/alg/data/train_data/all_train_{}.json".format(
                datetime.datetime.today().strftime("%Y%m%d"))
        elif flag == "predict":
            sql = f"""SELECT video_title, rov_label, channel, type FROM lightgbm_data WHERE daily_dt_str = '{temp_time}';"""
            des_path = "/root/luojunhui/alg/data/predict_data/all_predict_{}.json".format(dt_time.strftime("%Y%m%d"))
        else:
            return
        dt_list = self.client_spider.select(sql)
        rows = []
        for line in tqdm(dt_list):
            title = line[0]
            try:
                tags = list(jieba.analyse.textrank(title, topK=3))
                if not tags:
                    # Unlike the other processors, rows whose titles yield no
                    # tags are dropped here rather than padded with None.
                    continue
                temp = list(line)
                temp.extend(tags + [None] * (3 - len(tags)))
                rows.append(temp[1:])  # drop video_title; it is not a feature
            except Exception as e:
                print("title is empty or tag extraction failed\t", e)
        df = pd.DataFrame(rows, columns=self.all_features)
        df.to_json(des_path, orient='records')
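
# Each processor writes JSON with orient='records', i.e. a list of row
# objects such as (schematic, not real data):
#   [{"rov_label": 1, "channel": "...", "type": "...",
#     "tag1": "...", "tag2": "...", "tag3": null}, ...]
# which pd.read_json(des_path) can load back for the LightGBM stage.
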
if __name__ == '__main__':
    parser = argparse.ArgumentParser()  # command-line interface
    parser.add_argument("--cate", help="data category: spider / user_info / all")
    parser.add_argument("--flag", help="train or predict")
    parser.add_argument("--dt", help="date string in YYYYMMDD format")
    args = parser.parse_args()
    cate = args.cate
    flag = args.flag
    dt = args.dt
    # Structural pattern matching requires Python 3.10+.
    match cate:
        case "spider":
            S = SpiderProcess()
            S.spider_data_produce(flag=flag, dt_time=dt)
        case "user_info":
            U = UserProcess()
            U.generate_user_data(flag=flag, dt_time=dt)
        case "all":
            A = AllProcess()
            A.read_all_data(flag=flag, dt_time=dt)
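
# Example invocations (the script name and dates are illustrative):
#   python process_data.py --cate spider    --flag train   --dt 20240101
#   python process_data.py --cate user_info --flag predict --dt 20240101
#   python process_data.py --cate all       --flag train   --dt 20240101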