"""
@author: luojunhui

Backfill training data: read daily rows from the ODPS table
``loghubods.content_quality_base_eachday`` and insert them into the MySQL
table ``lightgbm``.
"""
import os
import sys
import json

import pymysql
from tqdm import tqdm
from concurrent.futures.thread import ThreadPoolExecutor

sys.path.append(os.getcwd())

from functions import PyODPS, generate_hourly_strings, generate_daily_strings
from functions import MySQLClientSpider


# NOTE(review): credentials are hard-coded in source; consider loading them
# from environment variables or a secrets store instead.
MYSQL_CONFIG = dict(
    host="rm-bp1159bu17li9hi94.mysql.rds.aliyuncs.com",  # database host (internal network address)
    port=3306,
    user="crawler",
    passwd="crawler123456@",
    db="piaoquan-crawler",
    charset="utf8mb4",  # the stored text is utf8mb4-encoded
)

# Parameterized statement: values are escaped by the driver, so quotes in
# titles/descriptions no longer break the query (and cannot inject SQL), and
# None values are stored as NULL instead of the literal text "None".
INSERT_SQL = """
    INSERT INTO lightgbm
        (video_id, title, recommend_status, total_time, width, height, app_type, descr, uid, type, channel,
         user_fans, user_view_cnt_30days, user_share_cnt_30days, user_return_cnt_30days, dt)
    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
"""


def _or_empty(value):
    """Return ``value`` unchanged, or '' when it is falsy (None, '', 0, ...)."""
    return value if value else ''


def _build_row(video_obj):
    """Extract the INSERT parameters, in INSERT_SQL column order, from one ODPS row."""
    return (
        video_obj['videoid'],
        video_obj['title'],
        video_obj['recommend_status'],
        video_obj['total_time'],
        video_obj['width'],
        video_obj['height'],
        _or_empty(video_obj['app_type']),
        _or_empty(video_obj['descr']),
        video_obj['uid'],
        _or_empty(video_obj['type']),
        _or_empty(video_obj['channel']),
        video_obj['user_fans'],
        video_obj['user_view_cnt_30days'],
        video_obj['user_share_cnt_30days'],
        video_obj['user_return_cnt_30days'],
        video_obj['dt'],
    )


def insert_into_db(video_obj):
    """
    Insert one video row into the MySQL ``lightgbm`` table.

    Database errors (including failing to connect) are printed and swallowed,
    so one bad row does not abort the surrounding batch. A missing column in
    ``video_obj`` still raises ``KeyError``.

    :param video_obj: one ODPS row supporting ``video_obj['column']`` access
    :return: None
    """
    params = _build_row(video_obj)
    connection = None
    try:
        connection = pymysql.connect(**MYSQL_CONFIG)
        with connection.cursor() as cursor:
            cursor.execute(INSERT_SQL, params)
        connection.commit()
    except Exception as e:  # best-effort per row: log and continue
        # params[0] is the 'videoid' value (the old code read a non-existent
        # 'video_id' key here and crashed inside the handler).
        print("{}插入失败, 失败原因是: {}".format(params[0], e))
    finally:
        if connection is not None:
            connection.close()


class ReadDataFromOdps(object):
    """
    Read daily data from ODPS and write it to MySQL as training data.
    """

    def __init__(self):
        self.oo = PyODPS()
        # NOTE(review): not referenced elsewhere in this file; kept for callers.
        self.spider_ = MySQLClientSpider()

    def read_data_with_multi_threads(self, start_date="20240301", end_date="20240410", max_workers=8):
        """
        Read ODPS data day by day and insert each day's rows into MySQL using
        a thread pool.

        :param start_date: first day to read, formatted "YYYYMMDD"
        :param end_date: last day to read, formatted "YYYYMMDD". The previous
            hard-coded value "20200410" lay before the start date, which looks
            like a typo for "20240410".
        :param max_workers: number of insert threads
        :return: None
        """
        dt_list = generate_daily_strings(start_date=start_date, end_date=end_date)
        # One pool for the whole run instead of one per day.
        with ThreadPoolExecutor(max_workers=max_workers) as pool:
            for dt in tqdm(dt_list):
                daily_info = self.read_daily_data(dt)
                # Consume the results so that exceptions raised in worker
                # threads surface here instead of being silently dropped.
                list(pool.map(insert_into_db, daily_info))

    def read_daily_data(self, dt):
        """
        Fetch one day of rows from ``loghubods.content_quality_base_eachday``.

        :param dt: day partition value, as produced by generate_daily_strings
        :return: whatever ``PyODPS.select`` returns for the query
        """
        sql = f"""select * from loghubods.content_quality_base_eachday where dt = '{dt}';"""
        daily_data = self.oo.select(sql)
        return daily_data


if __name__ == '__main__':
    R = ReadDataFromOdps()
    R.read_data_with_multi_threads()