- """
- @author: luojunhui
- 迁移rootSourceId
- """
- import json
- import pymysql
- import datetime
- from concurrent.futures.thread import ThreadPoolExecutor
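# touliu tencentGzhArticle rootSourceIds; currently not referenced by the functions below.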
source_list = [
    "touliu_tencentGzhArticle_cc284926a7d1c19f9a4e6abe5520468b",
    "touliu_tencentGzhArticle_2e4c21de3707f3b368b0cc4500d120f0",
    "touliu_tencentGzhArticle_a18c11dd294df014334f7db72830221a",
    "touliu_tencentGzhArticle_c2debdc233827497e24b633dea36c57c",
    "touliu_tencentGzhArticle_d66796826916665a23c667472ef4dd56",
    "touliu_tencentGzhArticle_f8e97355f3687f57fd4efeb635a7a3a2"
]
source_id_list = {
    'longArticles_2d311f88a9c1bd5a90ce88339ae93e78': 1,
    'longArticles_8d9fd0553c988e7a6bf3a6198f78d890': 1,
    'longArticles_99763b3ad92c781194dbd3eb3321542c': 1,
    'longArticles_2a27f501ef0d758c35dd3b70cf3bbfa3': 1,
    "touliu_tencentGzhArticle_cc284926a7d1c19f9a4e6abe5520468b": 1,
    "touliu_tencentGzhArticle_2e4c21de3707f3b368b0cc4500d120f0": 1,
    "touliu_tencentGzhArticle_a18c11dd294df014334f7db72830221a": 1,
    "touliu_tencentGzhArticle_c2debdc233827497e24b633dea36c57c": 1,
    "touliu_tencentGzhArticle_d66796826916665a23c667472ef4dd56": 1,
    "touliu_tencentGzhArticle_f8e97355f3687f57fd4efeb635a7a3a2": 1,
    "touliu_tencentGzhArticle_gh_68e7fdc09fe4_90bb12e53f6628fd5330310c7c3cc344": 1,
    "touliu_tencentGzhArticle_gh_68e7fdc09fe4_cd602a61ea073e41404572fce51eb297": 1,
    "touliu_tencentGzhArticle_gh_68e7fdc09fe4_d8fca9b2712f829d625d98bec37db228": 1,
    "touliu_tencentGzhArticle_gh_77f36c109fb1_1401a97f6537f32b14496cd5fe6caa70": 1,
    "touliu_tencentGzhArticle_gh_77f36c109fb1_926713998cd1513370b910ba20adda44": 1,
    "touliu_tencentGzhArticle_gh_77f36c109fb1_4ca7c1c6223501ff4f80913f8363309f": 1
}
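# rootSourceIds listed in source_id_list are recorded with push_type 1;
# any other id falls back to push_type 2 (see process_each_data).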


def get_data_list(request_time_stamp):
    """
    Fetch long_articles_video rows from the 24 hours before request_time_stamp.
    :param request_time_stamp: unix timestamp (seconds) marking the end of the window
    :return: tuple of result rows
    """
    connection = pymysql.connect(
        host="rm-bp1159bu17li9hi94.mysql.rds.aliyuncs.com",
        port=3306,
        user="crawler",
        passwd="crawler123456@",
        db="piaoquan-crawler",
        charset="utf8mb4"
    )
    sql = """
        select trace_id, gh_id, account_name, article_title, result1, result2, result3, request_time_stamp
        from long_articles_video
        where request_time_stamp > %s and request_time_stamp < %s;
    """
    cursor = connection.cursor()
    cursor.execute(
        sql,
        (request_time_stamp - 24 * 60 * 60, request_time_stamp)
    )
    print(request_time_stamp)
    data = cursor.fetchall()
    connection.close()
    return data


def process_each_data(data_tuple):
    """
    Parse one long_articles_video row and insert a record into
    long_articles_root_source_id for each non-empty result.
    :param data_tuple: (trace_id, gh_id, account_name, article_title, result1, result2, result3, request_time_stamp)
    :return:
    """
    trace_id, gh_id, account_name, title, result_1, result_2, result_3, request_time_stamp = data_tuple
    result_list = [result_1, result_2, result_3]
    connection = pymysql.connect(
        host="rm-bp1159bu17li9hi94.mysql.rds.aliyuncs.com",
        port=3306,
        user="crawler",
        passwd="crawler123456@",
        db="piaoquan-crawler",
        charset="utf8mb4"
    )
    for result in result_list:
        if result:
            # productionPath is URL-encoded; extract the rootSourceId and video id from it
            production_path = json.loads(result)['productionPath']
            source_id = production_path.split("rootSourceId%3D")[1]
            video_id = production_path.split("videos%3Fid%3D")[1].split("%26su%")[0]
            sql = """
                INSERT INTO long_articles_root_source_id
                (rootSourceId, accountName, ghId, articleTitle, requestTime, trace_id, push_type, video_id)
                values
                (%s, %s, %s, %s, %s, %s, %s, %s);
            """
            cursor = connection.cursor()
            cursor.execute(
                sql,
                (
                    source_id,
                    account_name,
                    gh_id,
                    title,
                    request_time_stamp,
                    trace_id,
                    source_id_list.get(source_id, 2),
                    video_id
                )
            )
            connection.commit()
        else:
            print("No result")
    connection.close()


def source_id_job():
    """
    Run the migration for the previous calendar day (midnight to midnight).
    :return:
    """
    today_string = datetime.datetime.today().strftime("%Y-%m-%d")
    time_stamp = datetime.datetime.strptime(today_string, '%Y-%m-%d').timestamp()
    data_list = get_data_list(int(time_stamp))
    with ThreadPoolExecutor(max_workers=10) as pool:
        pool.map(process_each_data, data_list)


# if __name__ == '__main__':
#     # requires `import schedule` and `import time`
#     schedule.every().day.at("01:00").do(source_id_job)
#     while True:
#         schedule.run_pending()
#         time.sleep(1)
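# To run the job once immediately instead of on a schedule:
# source_id_job()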