123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157 |
"""
Process the raw data into the form required for LightGBM training.
"""
- import sys
- import os
- import json
- import argparse
- from tqdm import tqdm
- sys.path.append(os.getcwd())
- from functions import generate_label_date, MysqlClient, MySQLClientSpider
class DataProcessor(object):
    """
    Write labels and supplementary user features into the ``lightgbm_data``
    MySQL table (through ``MySQLClientSpider``) so the table can be used as
    LightGBM training data.
    """

    # Directory containing the daily-label-*.json / daily-user-info-*.json
    # dumps. Assign a different value on an instance to read from elsewhere.
    train_data_dir = "/root/luojunhui/alg/data/train_data"

    # (lightgbm_data column, key in the daily user-info JSON record) pairs
    # written by update_user_info, in the order they appear in the UPDATE.
    _USER_INFO_COLUMNS = (
        ("address", "address"),
        ("user_return_3", "return_3days"),
        ("user_view_3", "view_3days"),
        ("user_share_3", "share_3days"),
        ("user_return_videos_3", "3day_return_500_videos"),
        ("user_return_videos_30", "30day_return_2000_videos"),
    )

    def __init__(self):
        self.client = MysqlClient()
        self.client_spider = MySQLClientSpider()
        # label date (as returned by generate_label_date) -> str(video_id)
        # -> dict of flowpool counters; populated by update_label
        self.label_data = {}

    @staticmethod
    def _sql_literal(value):
        """
        Render a Python value as a MySQL literal for string-built SQL.

        ``None`` becomes ``NULL``; ints and floats are emitted unquoted; any
        other value is converted with ``str`` and single-quoted, with
        backslashes and single quotes escaped so that text such as an address
        containing ``'`` cannot break (or inject into) the statement.
        """
        if value is None:
            return "NULL"
        if isinstance(value, (int, float)):
            return repr(value)
        text = str(value).replace("\\", "\\\\").replace("'", "''")
        return "'{}'".format(text)

    @staticmethod
    def _compute_rov_label(label_obj):
        """
        Return ``flowpool_return_users / flowpool_distribute_view_times``.

        :param label_obj: counter dict for one video on one day, or None
        :return: the ratio as a float, or None when there is no data or the
            view count is zero/missing (avoids ZeroDivisionError)
        """
        if not label_obj:
            return None
        total_view = label_obj.get("flowpool_distribute_view_times", 0)
        if not total_view:
            return None
        return label_obj.get("flowpool_return_users", 0) / total_view

    def _update_row_label(self, video_id, hour_dt_str):
        """Compute and store rov_label / daily_dt_str for one lightgbm_data row."""
        label_dt = generate_label_date(hour_dt_str)
        label_obj = self.label_data.get(label_dt, {}).get(str(video_id))
        label = self._compute_rov_label(label_obj)
        update_sql = (
            "UPDATE lightgbm_data set rov_label = {}, daily_dt_str = {} "
            "where video_id = {} and hour_dt_str = {};"
        ).format(
            # A missing label is stored as NULL, not as the string 'None';
            # a present label keeps its original quoted textual form.
            self._sql_literal(None if label is None else str(label)),
            self._sql_literal(str(label_dt)),
            self._sql_literal(str(video_id)),
            # Restrict to this row: the label depends on its own hour_dt_str.
            self._sql_literal(str(hour_dt_str)),
        )
        self.client_spider.update(update_sql)

    def update_label(self, start_date, end_date):
        """
        Load the daily label file and write every row's ROV label to MySQL.

        For each (video_id, hour_dt_str) row of ``lightgbm_data`` the label
        date is ``generate_label_date(hour_dt_str)``. The label is
        ``flowpool_return_users / flowpool_distribute_view_times`` for that
        date and video. Both ``rov_label`` and ``daily_dt_str`` are updated.
        Rows without label data, or with zero views, get ``rov_label = NULL``.
        A failing row is reported and does not stop the loop.

        :param start_date: first day of the label file, ``YYYYmmdd``
        :param end_date: last day of the label file, ``YYYYmmdd``
        :return: None
        """
        label_path = "{}/daily-label-{}-{}.json".format(
            self.train_data_dir, start_date, end_date
        )
        with open(label_path, encoding="utf-8") as f:
            self.label_data = json.load(f)

        select_sql = "SELECT video_id, hour_dt_str FROM lightgbm_data;"
        # Materialise the rows so tqdm can show a total.
        rows = list(self.client_spider.select(select_sql))
        for item in tqdm(rows):
            try:
                video_id, hour_dt_str = item
                self._update_row_label(video_id, hour_dt_str)
            except Exception as e:
                print("操作失败", e)

    def update_user_info(self, start_date, end_date):
        """
        Fill the supplementary user features of user-uploaded rows.

        Reads the daily user-info file. For every ``lightgbm_data`` row with
        ``type = 'userupload'`` and ``address IS NULL``, it writes address,
        user_return_3, user_view_3, user_share_3, user_return_videos_3 and
        user_return_videos_30 (see ``_USER_INFO_COLUMNS`` for the JSON keys
        they come from). Rows with no matching entry are reported and
        skipped. A failing row is reported and does not stop the loop.

        :param start_date: first day of the user-info file, ``YYYYmmdd``
        :param end_date: last day of the user-info file, ``YYYYmmdd``
        :return: None
        """
        user_path = "{}/daily-user-info-{}-{}.json".format(
            self.train_data_dir, start_date, end_date
        )
        # Explicit utf-8, matching update_label; the default encoding is
        # platform dependent.
        with open(user_path, encoding="utf-8") as f:
            data = json.load(f)

        sql = "select video_id, hour_dt_str from lightgbm_data where type = 'userupload' and address is NULL;"
        for video_id, hour_dt_str in tqdm(self.client_spider.select(sql)):
            # The file is keyed by the first 8 chars of hour_dt_str
            # (presumably YYYYmmdd), then by the video id as a string.
            user_info_obj = data.get(hour_dt_str[:8], {}).get(str(video_id))
            if not user_info_obj:
                print("No user info")
                continue
            try:
                assignments = ", ".join(
                    "{}={}".format(column, self._sql_literal(user_info_obj[key]))
                    for column, key in self._USER_INFO_COLUMNS
                )
                # Restrict to the selected row rather than every row that
                # shares this video_id.
                update_sql = (
                    "UPDATE lightgbm_data set {} where video_id = {} and hour_dt_str = {};"
                ).format(
                    assignments,
                    self._sql_literal(str(video_id)),
                    self._sql_literal(str(hour_dt_str)),
                )
                self.client_spider.update(update_sql)
            except Exception as e:
                print(e)
- if __name__ == "__main__":
- parser = argparse.ArgumentParser() # 新建参数解释器对象
- parser.add_argument("--param")
- args = parser.parse_args()
- param = args.param
- D = DataProcessor()
- match param:
- case "label":
- sd = str(input("输入label日级表开始日期,格式为 YYYYmmdd"))
- ed = str(input("输入label日级表结束日期,格式为 YYYYmmdd"))
- D.update_label(start_date=sd, end_date=ed)
- case "user_info":
- sd = str(input("输入label日级表开始日期,格式为 YYYYmmdd"))
- ed = str(input("输入label日级表结束日期,格式为 YYYYmmdd"))
- D.update_user_info(start_date=sd, end_date=ed)
- # case "spider":
- # S = SpiderProcess()
- # S.spider_data_produce(flag=mode, dt_time=dt)
- # case "user":
- # U = UserProcess()
- # if mode == "generate":
- # sd = str(input("输入开始日期,格式为 YYYYmmdd"))
- # ed = str(input("输入结束日期,格式为 YYYYmmdd"))
- # U.userinfo_to_mysql(start_date=sd, end_date=ed)
- # else:
- # U.generate_user_data(flag=mode, dt_time=dt)
|