"""
Prepare and process data for LightGBM training and prediction.
"""
import datetime
import sys
import os
import json
import argparse

import numpy as np
from tqdm import tqdm
import jieba.analyse
import pandas as pd

sys.path.append(os.getcwd())

from functions import generate_label_date, generate_daily_strings, MysqlClient, MySQLClientSpider


class DataProcessor(object):
    """
    Backfill the label, video_title and daily_dt_str columns of the lightgbm_data table.
    """

    def __init__(self):
        self.client = MysqlClient()
        self.client_spider = MySQLClientSpider()
        self.label_data = {}

    def producer(self):
        """
        Generate the data: write label, video_title and daily_dt_str back to MySQL.
        :return: None
        """
        # Store label, video_title and daily_dt_str into the MySQL database.
        label_path = "data/train_data/daily-label-20240326-20240331.json"
        with open(label_path, encoding="utf-8") as f:
            self.label_data = json.loads(f.read())
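        # The daily-label JSON is assumed (inferred from generate_label below) to map
        # "YYYYMMDD" -> {video_id: {"total_return": <int>, ...}}.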

        def read_title(client, video_id):
            """
            Read the video title from MySQL.
            """
            sql = f"""SELECT title from wx_video where id = {video_id};"""
            # print("title", sql)
            try:
                title = client.select(sql)[0][0]
                return title.strip()
            except Exception as e:
                print(video_id, "\t", e)
                return ""

        def generate_label(video_id, hourly_dt_str, label_info):
            """
            Generate the label and the daily dt string to write back to MySQL.
            :param label_info: daily label lookup loaded from the JSON file
            :param video_id:
            :param hourly_dt_str:
            :return: label, daily_dt_str
            """
            label_dt = generate_label_date(hourly_dt_str)
            label_obj = label_info.get(label_dt, {}).get(video_id)
            if label_obj:
                label = int(label_obj["total_return"]) if label_obj["total_return"] else 0
                # print(label)
            else:
                label = 0
            return label, label_dt

        def process_info(item_):
            """
            Update one row of lightgbm_data in MySQL with title, label and daily dt.
            :param item_: (video_id, hour_dt_str) tuple
            """
            video_id, hour_dt = item_
            # print(type(video_id))
            label_info = self.label_data
            title = read_title(client=self.client, video_id=video_id)
            label, dt_daily = generate_label(str(video_id), hour_dt, label_info)

            update_sql = f"""UPDATE lightgbm_data set video_title = '{title}', label = '{label}', daily_dt_str = '{dt_daily}' where video_id = '{video_id}';"""
            # print(update_sql)
            self.client_spider.update(update_sql)

        # Backfill every row whose label is still NULL.
        select_sql = "SELECT video_id, hour_dt_str FROM lightgbm_data where label is NULL and hour_dt_str >= '2024032700';"
        init_data_tuple = self.client_spider.select(select_sql)
        init_list = list(init_data_tuple)
        for item in tqdm(init_list):
            try:
                process_info(item)
            except Exception as e:
                print("Update failed:", e)


class SpiderProcess(object):
    """
    Process spider (crawler) data into features for LightGBM training.
    """

    def __init__(self):
        self.client_spider = MySQLClientSpider()
        self.spider_features = [
            "channel",
            "out_user_id",
            "mode",
            "out_play_cnt",
            "out_like_cnt",
            "out_share_cnt"
        ]

    def spider_lop(self, video_id):
        """
        lop = like / play, smoothed with additive constants.
        :param video_id:
        :return: (lop, duration)
        """
        sql = f"""SELECT like_cnt, play_cnt, duration from crawler_video where video_id = '{video_id}';"""
        try:
            like_cnt, play_cnt, duration = self.client_spider.select(sql)[0]
            # Additive smoothing so that low-play videos do not get extreme ratios.
            lop = (like_cnt + 700) / (play_cnt + 18000)
            return lop, duration
        except Exception as e:
            print(video_id, "\t", e)
            return 0, 0

    def spider_data_produce(self, flag, dt_time=None):
        """
        Export spider features (including lop and duration) to a JSON file.
        :param flag: "train" or "predict"
        :param dt_time: prediction date string, "YYYYmmdd" (required when flag == "predict")
        :return: None
        """
        if flag == "train":
            select_sql = "SELECT video_id, video_title, label, channel, out_user_id, spider_mode, out_play_cnt, out_like_cnt, out_share_cnt FROM lightgbm_data WHERE type = 'spider' order by daily_dt_str;"
            des_path = "data/train_data/spider_train_{}".format(datetime.datetime.today().strftime("%Y%m%d"))
        elif flag == "predict":
            dt_time = datetime.datetime.strptime(dt_time, "%Y%m%d")
            three_date_before = dt_time + datetime.timedelta(days=4)
            temp_time = three_date_before.strftime("%Y%m%d")
            select_sql = f"""SELECT video_id, video_title, label, channel, out_user_id, spider_mode, out_play_cnt, out_like_cnt, out_share_cnt FROM lightgbm_data WHERE type = 'spider' and daily_dt_str = '{temp_time}';"""
            print(select_sql)
            des_path = "data/predict_data/predict_{}.json".format(dt_time.strftime("%Y%m%d"))
        else:
            return
        data_list = self.client_spider.select(select_sql)
        df = []
        for line in tqdm(data_list):
            try:
                temp = list(line)
                video_id = line[0]
                title = line[1]
                lop, duration = self.spider_lop(video_id)
                title_tags = list(jieba.analyse.textrank(title, topK=3))
                temp.append(lop)
                temp.append(duration)
                # Pad the title tags so there are always exactly three tag columns.
                if title_tags:
                    for i in range(3):
                        try:
                            temp.append(title_tags[i])
                        except IndexError:
                            temp.append(None)
                else:
                    temp.append(None)
                    temp.append(None)
                    temp.append(None)

                # Drop video_id and video_title; keep only the label and feature columns.
                df.append(temp[2:])
            except Exception:
                continue
        df = pd.DataFrame(df, columns=['label', 'channel', 'out_user_id', 'mode', 'out_play_cnt', 'out_like_cnt',
                                       'out_share_cnt', 'lop', 'duration', 'tag1', 'tag2', 'tag3'])

        df.to_json(des_path, orient='records')
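
# A minimal sketch (an assumption, not part of the original pipeline) of how the
# JSON written by spider_data_produce could be fed to LightGBM; the column names
# mirror the DataFrame built above, and the path is hypothetical.
#
#   import lightgbm as lgb
#   import pandas as pd
#
#   train_df = pd.read_json("data/train_data/spider_train_20240331")
#   for col in ["channel", "out_user_id", "mode", "tag1", "tag2", "tag3"]:
#       train_df[col] = train_df[col].astype("category")
#   dataset = lgb.Dataset(train_df.drop(columns=["label"]), label=train_df["label"])
#   booster = lgb.train({"objective": "regression"}, dataset)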


class UserProcess(object):
    """
    Process user-upload data into features for LightGBM training.
    """

    def __init__(self):
        self.client_spider = MySQLClientSpider()
        self.user_features = [
            "label",
            "uid",
            "channel",
            "user_fans",
            "user_view_30",
            "user_share_30",
            "user_return_30",
            "user_rov",
            "user_str",
            "user_return_videos_30",
            "user_return_videos_3",
            "user_return_3",
            "user_view_3",
            "user_share_3",
            "address",
            "tag1",
            "tag2",
            "tag3"
        ]

    def userinfo_to_mysql(self, start_date, end_date):
        """
        Store user_return_3, user_view_3, user_share_3,
        user_return_videos_3, user_return_videos_30
        and address into the MySQL database.
        :return: None
        """
        user_path = 'data/train_data/daily-user-info-{}-{}.json'.format(start_date, end_date)
        with open(user_path) as f:
            data = json.loads(f.read())
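        # The daily-user-info JSON is assumed (inferred from the key accesses below)
        # to map "YYYYMMDD" -> {video_id: {"video_id": ..., "address": ...,
        # "return_3days": ..., "view_3days": ..., "share_3days": ...,
        # "3day_return_500_videos": ..., "30day_return_2000_videos": ...}}.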
        sql = "select video_id, hour_dt_str from lightgbm_data where type = 'userupload' and address is NULL;"
        dt_list = self.client_spider.select(sql)
        for item in tqdm(dt_list):
            video_id, dt = item
            dt = dt[:8]
            user_info_obj = data.get(dt, {}).get(str(video_id))
            if user_info_obj:
                try:
                    video_id = user_info_obj['video_id']
                    address = user_info_obj['address']
                    return_3 = user_info_obj['return_3days']
                    view_3 = user_info_obj['view_3days']
                    share_3 = user_info_obj['share_3days']
                    return_videos_3 = user_info_obj['3day_return_500_videos']
                    return_videos_30 = user_info_obj['30day_return_2000_videos']
                    update_sql = f"""UPDATE lightgbm_data set address='{address}', user_return_3={return_3}, user_view_3={view_3}, user_share_3={share_3}, user_return_videos_3={return_videos_3}, user_return_videos_30={return_videos_30} where video_id = '{video_id}';"""
                    self.client_spider.update(update_sql)
                except Exception as e:
                    print(e)
            else:
                print("No user info")

    def generate_user_data(self, flag, dt_time=None):
        """
        Generate the user training / prediction data and export it to JSON.
        :param flag: "train" or "predict"
        :param dt_time: prediction date string, "YYYYmmdd" (required when flag == "predict")
        :return: None
        """
        if flag == "train":
            sql = "select video_title, label, user_id, channel, user_fans, user_view_30, user_share_30, user_return_30, user_rov, user_str, user_return_videos_30, user_return_videos_3, user_return_3, user_view_3, user_share_3, address from lightgbm_data where type = 'userupload' and daily_dt_str >= '20240305';"
            des_path = "data/train_data/user_train_{}.json".format(datetime.datetime.today().strftime("%Y%m%d"))
        elif flag == "predict":
            # Only parse dt_time when predicting, so the 'train' path does not need a date.
            dt_time = datetime.datetime.strptime(dt_time, "%Y%m%d")
            three_date_before = dt_time + datetime.timedelta(days=4)
            temp_time = three_date_before.strftime("%Y%m%d")
            sql = f"""select video_title, label, user_id, channel, user_fans, user_view_30, user_share_30, user_return_30, user_rov, user_str, user_return_videos_30, user_return_videos_3, user_return_3, user_view_3, user_share_3, address from lightgbm_data where type = 'userupload' and daily_dt_str = '{temp_time}';"""
            des_path = "data/predict_data/user_predict_{}.json".format(dt_time.strftime("%Y%m%d"))
        else:
            return
        dt_list = self.client_spider.select(sql)
        df = []
        for line in tqdm(dt_list):
            title = line[0]
            temp = list(line)
            title_tags = list(jieba.analyse.textrank(title, topK=3))
            # Pad the title tags so there are always exactly three tag columns.
            if title_tags:
                for i in range(3):
                    try:
                        temp.append(title_tags[i])
                    except IndexError:
                        temp.append(None)
            else:
                temp.append(None)
                temp.append(None)
                temp.append(None)

            # Drop video_title; keep the label and feature columns.
            df.append(temp[1:])
        df = pd.DataFrame(df, columns=self.user_features)
        # Derived ratio features: return-over-share and return-over-view for the 30-day and 3-day windows.
        df['ros_30'] = np.where(df['user_share_30'] != 0, df['user_return_30'] / df['user_share_30'], np.nan)
        df['rov_30'] = np.where(df['user_view_30'] != 0, df['user_return_30'] / df['user_view_30'], np.nan)
        df['ros_3'] = np.where(df['user_share_3'] != 0, df['user_return_3'] / df['user_share_3'], np.nan)
        df['rov_3'] = np.where(df['user_view_3'] != 0, df['user_return_3'] / df['user_view_3'], np.nan)
        df.to_json(des_path, orient='records')


if __name__ == "__main__":
    parser = argparse.ArgumentParser()  # build the command-line argument parser
    parser.add_argument("--mode")
    parser.add_argument("--de")
    parser.add_argument("--dt")
    args = parser.parse_args()
    mode = args.mode
    D = args.de
    dt = args.dt
    match D:
        case "spider":
            S = SpiderProcess()
            S.spider_data_produce(flag=mode, dt_time=dt)
        case "user":
            U = UserProcess()
            if mode == "generate":
                sd = str(input("Enter the start date, format YYYYmmdd: "))
                ed = str(input("Enter the end date, format YYYYmmdd: "))
                U.userinfo_to_mysql(start_date=sd, end_date=ed)
            else:
                U.generate_user_data(flag=mode, dt_time=dt)
            # else:
            #     print("Error")
        case "Data":
            D = DataProcessor()
            D.producer()

    # if mode == "train":
    #     print("Loading data and process for training.....")
    #     D = DataProcessor(flag="train", ll=category)
    #     D.producer("whole")
    # elif mode == "predict":
    #     print("Loading data and process for prediction for each day......")
    #     D = DataProcessor(flag="predict", ll=category)
    #     if dtype == "single":
    #         date_str = str(input("Please enter the date of the prediction"))
    #         D.producer(date_str)
    #     elif dtype == "days":
    #         start_date_str = str(input("Please enter the start date of the prediction"))
    #         end_date_str = str(input("Please enter the end date of the prediction"))
    #         dt_list = generate_daily_strings(start_date=start_date_str, end_date=end_date_str)
    #         for d in dt_list:
    #             D.producer()