""" @author: luojunhui 迁移rootSourceId """ import json import pymysql import datetime from concurrent.futures.thread import ThreadPoolExecutor source_list = [ "touliu_tencentGzhArticle_cc284926a7d1c19f9a4e6abe5520468b", "touliu_tencentGzhArticle_2e4c21de3707f3b368b0cc4500d120f0", "touliu_tencentGzhArticle_a18c11dd294df014334f7db72830221a", "touliu_tencentGzhArticle_c2debdc233827497e24b633dea36c57c", "touliu_tencentGzhArticle_d66796826916665a23c667472ef4dd56", "touliu_tencentGzhArticle_f8e97355f3687f57fd4efeb635a7a3a2" ] source_id_list = { 'longArticles_2d311f88a9c1bd5a90ce88339ae93e78': 1, 'longArticles_8d9fd0553c988e7a6bf3a6198f78d890': 1, 'longArticles_99763b3ad92c781194dbd3eb3321542c': 1, 'longArticles_2a27f501ef0d758c35dd3b70cf3bbfa3': 1, "touliu_tencentGzhArticle_cc284926a7d1c19f9a4e6abe5520468b": 1, "touliu_tencentGzhArticle_2e4c21de3707f3b368b0cc4500d120f0": 1, "touliu_tencentGzhArticle_a18c11dd294df014334f7db72830221a": 1, "touliu_tencentGzhArticle_c2debdc233827497e24b633dea36c57c": 1, "touliu_tencentGzhArticle_d66796826916665a23c667472ef4dd56": 1, "touliu_tencentGzhArticle_f8e97355f3687f57fd4efeb635a7a3a2": 1, "touliu_tencentGzhArticle_gh_68e7fdc09fe4_90bb12e53f6628fd5330310c7c3cc344": 1, "touliu_tencentGzhArticle_gh_68e7fdc09fe4_cd602a61ea073e41404572fce51eb297": 1, "touliu_tencentGzhArticle_gh_68e7fdc09fe4_d8fca9b2712f829d625d98bec37db228": 1, "touliu_tencentGzhArticle_gh_77f36c109fb1_1401a97f6537f32b14496cd5fe6caa70": 1, "touliu_tencentGzhArticle_gh_77f36c109fb1_926713998cd1513370b910ba20adda44": 1, "touliu_tencentGzhArticle_gh_77f36c109fb1_4ca7c1c6223501ff4f80913f8363309f": 1 } def get_data_list(request_time_stamp): """ 获取数据 :param request_time_stamp: :return: """ connection = pymysql.connect( host="rm-bp1159bu17li9hi94.mysql.rds.aliyuncs.com", port=3306, user="crawler", passwd="crawler123456@", db="piaoquan-crawler", charset="utf8mb4" ) sql = f""" select trace_id, gh_id, account_name, article_title, result1, result2, result3, request_time_stamp from long_articles_video where %s < request_time_stamp < %s; """ cursor = connection.cursor() cursor.execute( sql, (request_time_stamp - 24 * 60 * 60, request_time_stamp) ) print(request_time_stamp) data = cursor.fetchall() return data def process_each_data(data_tuple): """ 处理数据 :param data_tuple: :return: """ trace_id = data_tuple[0] gh_id = data_tuple[1] account_name = data_tuple[2] title = data_tuple[3] result_1 = data_tuple[4] result_2 = data_tuple[5] result_3 = data_tuple[6] request_time_stamp = data_tuple[7] result_list = [result_1, result_2, result_3] for result in result_list: if result: connection = pymysql.connect( host="rm-bp1159bu17li9hi94.mysql.rds.aliyuncs.com", port=3306, user="crawler", passwd="crawler123456@", db="piaoquan-crawler", charset="utf8mb4" ) source_id = json.loads(result)['productionPath'].split("rootSourceId%3D")[1] video_id = json.loads(result)['productionPath'].split("videos%3Fid%3D")[1].split("%26su%")[0] sql = f""" INSERT INTO long_articles_root_source_id (rootSourceId, accountName, ghId, articleTitle, requestTime, trace_id, push_type, video_id) values (%s, %s, %s, %s, %s, %s, %s, %s); """ cursor = connection.cursor() cursor.execute( sql, ( source_id, account_name, gh_id, title, request_time_stamp, trace_id, source_id_list.get(source_id, 2), video_id ) ) connection.commit() else: print("No result") def source_id_job(): """ 执行代码 :return: """ today_string = datetime.datetime.today().strftime("%Y-%m-%d") time_stamp = datetime.datetime.strptime(today_string, '%Y-%m-%d').timestamp() data_list = get_data_list(int(time_stamp)) with ThreadPoolExecutor(max_workers=10) as Pool: Pool.map(process_each_data, data_list) # if __name__ == '__main__': # schedule.every().day.at("01:00").do(job) # while True: # schedule.run_pending() # time.sleep(1)