|
@@ -0,0 +1,96 @@
|
|
|
+"""
|
|
|
+@author: luojunhui
|
|
|
+"""
|
|
|
+import os
|
|
|
+import sys
|
|
|
+import json
|
|
|
+import pymysql
|
|
|
+from tqdm import tqdm
|
|
|
+from concurrent.futures.thread import ThreadPoolExecutor
|
|
|
+
|
|
|
+sys.path.append(os.getcwd())
|
|
|
+
|
|
|
+from functions import PyODPS, generate_hourly_strings, generate_daily_strings
|
|
|
+from functions import MySQLClientSpider
|
|
|
+
|
|
|
+
|
|
|
def insert_into_db(video_obj):
    """
    Insert one video record into the MySQL ``lightgbm`` table.

    A new connection is opened for every call, a single parameterized
    INSERT is executed and committed, and the connection is always closed.
    Errors raised while executing or committing are printed and swallowed,
    so one bad row does not stop the caller.

    :param video_obj: mapping that provides the keys ``videoid``, ``title``,
        ``recommend_status``, ``total_time``, ``width``, ``height``,
        ``app_type``, ``descr``, ``uid``, ``type``, ``channel``,
        ``user_fans``, ``user_view_cnt_30days``, ``user_share_cnt_30days``,
        ``user_return_cnt_30days`` and ``dt``.
    :return: None
    :raises KeyError: if ``video_obj`` lacks one of the keys listed above.
    """
    video_id = video_obj['videoid']
    # Values in column order. Falsy text fields are stored as '' exactly as
    # before; everything else is handed to the driver untouched so that it
    # can quote and escape each value itself.
    row = (
        video_id,
        video_obj['title'],
        video_obj['recommend_status'],
        video_obj['total_time'],
        video_obj['width'],
        video_obj['height'],
        video_obj['app_type'] or '',
        video_obj['descr'] or '',
        video_obj['uid'],
        video_obj['type'] or '',
        video_obj['channel'] or '',
        video_obj['user_fans'],
        video_obj['user_view_cnt_30days'],
        video_obj['user_share_cnt_30days'],
        video_obj['user_return_cnt_30days'],
        video_obj['dt'],
    )
    # Placeholders instead of f-string interpolation: titles/descriptions
    # containing quotes used to break the statement (and were injectable).
    sql = """
        INSERT INTO lightgbm
            (video_id, title, recommend_status, total_time, width, height, app_type, descr, uid, type, channel, user_fans, user_view_cnt_30days, user_share_cnt_30days, user_return_cnt_30days, dt)
        values
            (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
    """
    # NOTE(review): credentials are hard-coded in source; consider loading
    # them from configuration or the environment instead.
    connection = pymysql.connect(
        host="rm-bp1159bu17li9hi94.mysql.rds.aliyuncs.com",  # database host (intranet address)
        port=3306,  # port number
        user="crawler",  # MySQL user name
        passwd="crawler123456@",  # MySQL login password
        db="piaoquan-crawler",  # database name
        charset="utf8mb4"  # must match the text encoding used by the database
    )
    try:
        # The context manager closes the cursor even when execute() fails.
        with connection.cursor() as cursor:
            cursor.execute(sql, row)
        connection.commit()
    except Exception as e:
        # Report with the id read above: the record has no 'video_id' key,
        # so looking it up here would raise KeyError and hide the real error.
        print("{}插入失败, 失败原因是: {}".format(video_id, e))
    finally:
        connection.close()
|
|
|
+
|
|
|
+
|
|
|
class ReadDataFromOdps(object):
    """
    Read daily rows from ODPS and insert them into MySQL.
    """

    def __init__(self):
        # ODPS client used by read_daily_data().
        self.oo = PyODPS()
        # Not referenced by the methods of this class; kept so construction
        # behaves exactly as before.
        self.spider_ = MySQLClientSpider()

    def read_data_with_multi_threads(self, start_date="20240301", end_date="20200410", max_workers=8):
        """
        Copy ODPS data to MySQL day by day, inserting rows with a thread pool.

        For every date string produced by ``generate_daily_strings`` the
        day's rows are read from ODPS and each row is passed to
        ``insert_into_db`` on a pool of worker threads. The pool is created
        per day, so all inserts of one day finish before the next day is read.

        :param start_date: first day passed to ``generate_daily_strings``.
        :param end_date: last day passed to ``generate_daily_strings``.
        :param max_workers: number of worker threads used for the inserts.
        :return: None
        """
        # NOTE(review): the default end_date ("20200410") is earlier than the
        # default start_date ("20240301"); this looks like a typo for
        # "20240410" -- confirm. Defaults are kept unchanged so existing
        # callers see the same behavior.
        dt_list = generate_daily_strings(start_date=start_date, end_date=end_date)
        for dt in tqdm(dt_list):
            daily_info = self.read_daily_data(dt)
            with ThreadPoolExecutor(max_workers=max_workers) as pool:
                # The map() results are never consumed, so an exception that
                # escapes insert_into_db is not re-raised here.
                pool.map(insert_into_db, daily_info)

    def read_daily_data(self, dt):
        """
        Fetch the rows of one day from ODPS.

        :param dt: partition value matched against the ``dt`` column.
        :return: whatever ``self.oo.select`` returns for the query.
        """
        sql = f"""select * from loghubods.content_quality_base_eachday where dt = '{dt}';"""
        daily_data = self.oo.select(sql)
        return daily_data
|
|
|
+
|
|
|
+
|
|
|
if __name__ == '__main__':
    # Script entry point: build the reader and run the ODPS -> MySQL copy.
    odps_reader = ReadDataFromOdps()
    odps_reader.read_data_with_multi_threads()
|