""" @author: luojunhui 迁移rootSourceId """ import json import time import datetime import schedule from tqdm import tqdm from applications import Functions, PQMySQL, log class UpdateRootSourceId(object): """ 更新 rootSourceId """ db_client = PQMySQL() source_id_list = { 'longArticles_2d311f88a9c1bd5a90ce88339ae93e78': 1, 'longArticles_8d9fd0553c988e7a6bf3a6198f78d890': 1, 'longArticles_99763b3ad92c781194dbd3eb3321542c': 1, 'longArticles_2a27f501ef0d758c35dd3b70cf3bbfa3': 1, "touliu_tencentGzhArticle_cc284926a7d1c19f9a4e6abe5520468b": 1, "touliu_tencentGzhArticle_2e4c21de3707f3b368b0cc4500d120f0": 1, "touliu_tencentGzhArticle_a18c11dd294df014334f7db72830221a": 1, "touliu_tencentGzhArticle_c2debdc233827497e24b633dea36c57c": 1, "touliu_tencentGzhArticle_d66796826916665a23c667472ef4dd56": 1, "touliu_tencentGzhArticle_f8e97355f3687f57fd4efeb635a7a3a2": 1, "touliu_tencentGzhArticle_gh_68e7fdc09fe4_90bb12e53f6628fd5330310c7c3cc344": 1, "touliu_tencentGzhArticle_gh_68e7fdc09fe4_cd602a61ea073e41404572fce51eb297": 1, "touliu_tencentGzhArticle_gh_68e7fdc09fe4_d8fca9b2712f829d625d98bec37db228": 1, "touliu_tencentGzhArticle_gh_77f36c109fb1_1401a97f6537f32b14496cd5fe6caa70": 1, "touliu_tencentGzhArticle_gh_77f36c109fb1_926713998cd1513370b910ba20adda44": 1, "touliu_tencentGzhArticle_gh_77f36c109fb1_4ca7c1c6223501ff4f80913f8363309f": 1 } @classmethod def getDataList(cls, request_time_stamp): """ :param request_time_stamp: :return: """ start_dt = request_time_stamp - 1 * 24 * 3600 sql = f""" select trace_id, gh_id, account_name, article_title, result1, result2, result3, request_time_stamp from long_articles_video where request_time_stamp > {start_dt} and request_time_stamp < {request_time_stamp} and content_status = 2; """ result = cls.db_client.select(sql) log( task="migrateRootSourceId", function="getDataList", message="一共找到了: {} 条记录".format(len(result)) ) return result @classmethod def processEachData(cls, data_tuple): """ 处理数据 :param data_tuple: :return: """ trace_id = data_tuple[0] gh_id = data_tuple[1] account_name = data_tuple[2] title = data_tuple[3] result_1 = data_tuple[4] result_2 = data_tuple[5] result_3 = data_tuple[6] request_time_stamp = data_tuple[7] result_list = [result_1, result_2, result_3] for result in result_list: if result: source_id = json.loads(result)['productionPath'].split("rootSourceId%3D")[1] video_id = json.loads(result)['productionPath'].split("videos%3Fid%3D")[1].split("%26su%")[0] sql = f""" INSERT INTO long_articles_root_source_id (rootSourceId, accountName, ghId, articleTitle, requestTime, trace_id, push_type, video_id) values (%s, %s, %s, %s, %s, %s, %s, %s); """ try: cls.db_client.update( sql=sql, params=( source_id, account_name, gh_id, title, request_time_stamp, trace_id, cls.source_id_list.get(source_id, 2), video_id ) ) log( task="migrateRootSourceId", function="processEachData", message="更新消息成功", data={"trace_id": trace_id} ) except Exception as e: log( task="migrateRootSourceId", function="processEachData", message="更新消息失败,报错信息是: {}".format(e), status="fail", data={"trace_id": trace_id} ) else: print("No result") @classmethod def sourceIdJob(cls): """ 执行代码 :return: """ today_string = datetime.datetime.today().strftime("%Y-%m-%d") time_stamp = datetime.datetime.strptime(today_string, '%Y-%m-%d').timestamp() data_list = cls.getDataList(int(time_stamp)) for item in tqdm(data_list): try: cls.processEachData(item) except Exception as e: print(e) def source_id_job(): """ :return: """ S = UpdateRootSourceId() S.sourceIdJob() if __name__ == '__main__': # source_id_job() schedule.every().day.at("01:00").do(Functions().job_with_thread, source_id_job) while True: schedule.run_pending() time.sleep(1) # log( # task="migrateRootSourceId", # function="main", # message="迁移 source_id 任务正常执行" # )