12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697 |
- """
- @author: luojunhui
- """
- import os
- import sys
- import json
- import pymysql
- from tqdm import tqdm
- from concurrent.futures.thread import ThreadPoolExecutor
- sys.path.append(os.getcwd())
- from functions import PyODPS, generate_hourly_strings, generate_daily_strings
- from functions import MySQLClientSpider
def insert_into_db(video_obj):
    """
    Insert one video record into the MySQL ``lightgbm`` table.

    A new connection is opened for every call and closed before returning.
    An error raised while executing or committing the INSERT is caught and
    printed, not re-raised. An error raised while connecting is not caught
    here and propagates to the caller.

    :param video_obj: row object supporting ``video_obj[key]`` for the keys
        read below (``videoid``, ``title``, ``recommend_status``,
        ``total_time``, ``width``, ``height``, ``app_type``, ``descr``,
        ``uid``, ``type``, ``channel``, ``user_fans``,
        ``user_view_cnt_30days``, ``user_share_cnt_30days``,
        ``user_return_cnt_30days``, ``dt``).
    :return: None
    """
    video_id = video_obj['videoid']
    # Falsy values of the optional text fields are stored as empty strings.
    params = (
        video_id,
        video_obj['title'],
        video_obj['recommend_status'],
        video_obj['total_time'],
        video_obj['width'],
        video_obj['height'],
        video_obj['app_type'] or '',
        video_obj['descr'] or '',
        video_obj['uid'],
        video_obj['type'] or '',
        video_obj['channel'] or '',
        video_obj['user_fans'],
        video_obj['user_view_cnt_30days'],
        video_obj['user_share_cnt_30days'],
        video_obj['user_return_cnt_30days'],
        video_obj['dt'],
    )
    # Placeholders let the driver escape every value, so quotes or backslashes
    # in free text such as the title cannot break (or inject into) the statement.
    sql = """
        INSERT INTO lightgbm
            (video_id, title, recommend_status, total_time, width, height,
             app_type, descr, uid, type, channel, user_fans,
             user_view_cnt_30days, user_share_cnt_30days,
             user_return_cnt_30days, dt)
        VALUES
            (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
    """
    # NOTE(review): credentials are hard-coded in source; consider moving them
    # to configuration or environment variables.
    connection = pymysql.connect(
        host="rm-bp1159bu17li9hi94.mysql.rds.aliyuncs.com",  # database address (intranet address)
        port=3306,  # port number
        user="crawler",  # MySQL user name
        passwd="crawler123456@",  # MySQL login password
        db="piaoquan-crawler",  # database name
        charset="utf8mb4"  # connection character set
    )
    try:
        # The cursor is released as soon as the statement has been executed.
        with connection.cursor() as cursor:
            cursor.execute(sql, params)
        connection.commit()
    except Exception as e:
        # Report with the id read above: the row key is 'videoid', so looking
        # up 'video_id' here could raise KeyError and hide the real error.
        print("{}插入失败, 失败原因是: {}".format(video_id, e))
    finally:
        connection.close()
class ReadDataFromOdps(object):
    """
    Read daily rows from ODPS and copy them into MySQL.
    """

    def __init__(self):
        self.oo = PyODPS()
        # Not referenced by any method of this class; kept so existing
        # callers that read the attribute keep working.
        self.spider_ = MySQLClientSpider()

    def read_data_with_multi_threads(self, start_date="20240302", end_date="20240410", max_workers=8):
        """
        Read ODPS data day by day and write each day's rows to MySQL with a
        thread pool, to be used as training data.

        All inserts of one day finish before the next day is read.

        :param start_date: first day passed to ``generate_daily_strings``
        :param end_date: last day passed to ``generate_daily_strings``
        :param max_workers: number of insert threads used per day
        :return: None
        """
        dt_list = generate_daily_strings(start_date=start_date, end_date=end_date)
        print(dt_list)
        for dt in tqdm(dt_list):
            daily_info = self.read_daily_data(dt)
            if daily_info is None:
                # NOTE(review): assumes the select helper may return None when
                # a query yields nothing usable; skip the day and keep going.
                print("no data returned for dt={}, skipped".format(dt))
                continue
            # Leaving the ``with`` block waits for every submitted insert.
            with ThreadPoolExecutor(max_workers=max_workers) as pool:
                futures = [pool.submit(insert_into_db, row) for row in daily_info]
            # Exceptions raised inside a worker are stored on its future and
            # stay invisible unless inspected; report them without aborting.
            for future in futures:
                error = future.exception()
                if error is not None:
                    print("insert task failed for dt={}: {}".format(dt, error))

    def read_daily_data(self, dt):
        """
        Fetch the rows of one day.

        :param dt: day string matched against the ``dt`` column
        :return: whatever ``self.oo.select`` returns for the query
        """
        sql = f"""select * from loghubods.content_quality_base_eachday where dt = '{dt}';"""
        daily_data = self.oo.select(sql)
        return daily_data
if __name__ == '__main__':
    # Script entry point: build the reader and run the full ODPS -> MySQL copy.
    odps_reader = ReadDataFromOdps()
    odps_reader.read_data_with_multi_threads()
|