""" @author: luojunhui 迁移rootSourceId """ import json import time import datetime import schedule from tqdm import tqdm from applications import Functions, longArticlesMySQL, PQMySQL, log class UpdateRootSourceId(object): """ 更新 rootSourceId """ db_client = PQMySQL() lam = longArticlesMySQL() source_id_list = { 'longArticles_2d311f88a9c1bd5a90ce88339ae93e78': 1, 'longArticles_8d9fd0553c988e7a6bf3a6198f78d890': 1, 'longArticles_99763b3ad92c781194dbd3eb3321542c': 1, 'longArticles_2a27f501ef0d758c35dd3b70cf3bbfa3': 1, "touliu_tencentGzhArticle_cc284926a7d1c19f9a4e6abe5520468b": 1, "touliu_tencentGzhArticle_2e4c21de3707f3b368b0cc4500d120f0": 1, "touliu_tencentGzhArticle_a18c11dd294df014334f7db72830221a": 1, "touliu_tencentGzhArticle_c2debdc233827497e24b633dea36c57c": 1, "touliu_tencentGzhArticle_d66796826916665a23c667472ef4dd56": 1, "touliu_tencentGzhArticle_f8e97355f3687f57fd4efeb635a7a3a2": 1, "touliu_tencentGzhArticle_gh_68e7fdc09fe4_90bb12e53f6628fd5330310c7c3cc344": 1, "touliu_tencentGzhArticle_gh_68e7fdc09fe4_cd602a61ea073e41404572fce51eb297": 1, "touliu_tencentGzhArticle_gh_68e7fdc09fe4_d8fca9b2712f829d625d98bec37db228": 1, "touliu_tencentGzhArticle_gh_77f36c109fb1_1401a97f6537f32b14496cd5fe6caa70": 1, "touliu_tencentGzhArticle_gh_77f36c109fb1_926713998cd1513370b910ba20adda44": 1, "touliu_tencentGzhArticle_gh_77f36c109fb1_4ca7c1c6223501ff4f80913f8363309f": 1 } @classmethod def getDataList(cls, request_timestamp): """ :param request_timestamp: :return: """ start_dt = request_timestamp - 1 * 24 * 3600 sql = f""" select trace_id, gh_id, account_name, response, request_timestamp from long_articles_match_videos where request_timestamp >= {start_dt} and request_timestamp <= {request_timestamp} and content_status = 4; """ result = cls.lam.select(sql) log( task="migrateRootSourceId", function="getDataList", message="一共找到了: {} 条记录".format(len(result)) ) return result @classmethod def processEachData(cls, data_tuple): """ 处理数据 :param data_tuple: :return: """ trace_id = data_tuple[0] gh_id = data_tuple[1] account_name = data_tuple[2] request_timestamp = data_tuple[4] response = json.loads(data_tuple[3]) if response: for result in response: source_id = result['rootSourceId'] video_id = result['videoId'] sql = f""" INSERT INTO long_articles_root_source_id (rootSourceId, accountName, ghId, requestTime, trace_id, push_type, video_id) values (%s, %s, %s, %s, %s, %s, %s); """ try: cls.db_client.update( sql=sql, params=( source_id, account_name, gh_id, request_timestamp, trace_id, cls.source_id_list.get(source_id, 2), video_id ) ) log( task="migrateRootSourceId", function="processEachData", message="更新消息成功", data={"trace_id": trace_id} ) except Exception as e: log( task="migrateRootSourceId", function="processEachData", message="更新消息失败,报错信息是: {}".format(e), status="fail", data={"trace_id": trace_id} ) else: print("No result") @classmethod def sourceIdJob(cls): """ 执行代码 :return: """ today_string = datetime.datetime.today().strftime("%Y-%m-%d") time_stamp = datetime.datetime.strptime(today_string, '%Y-%m-%d').timestamp() data_list = cls.getDataList(int(time_stamp)) for item in tqdm(data_list): try: cls.processEachData(item) except Exception as e: print(e) def source_id_job(): """ :return: """ S = UpdateRootSourceId() S.sourceIdJob() if __name__ == '__main__': # source_id_job() schedule.every().day.at("02:00").do(Functions().job_with_thread, source_id_job) while True: schedule.run_pending() time.sleep(1)