""" @author: luojunhui """ """ @author: luojunhui """ import time from concurrent.futures.thread import ThreadPoolExecutor from applications.mysql import MySQL from applications.odps_server import PyODPS from applications.functions import generate_daily_strings def migrate_daily(dt): """ 迁移当天到数据 :param dt: :return: """ PO = PyODPS() M = MySQL() select_sql = f"""select * from loghubods.video_return_top_500_new where dt = '{dt}';""" data = PO.select(select_sql) a = time.time() with ThreadPoolExecutor(max_workers=8) as pool: pool.map(M.migrate_data_to_mysql, data) b = time.time() print("{} successfully insert {} rows, totally cost {} seconds".format(dt, len(data), b - a)) dt_list = generate_daily_strings("20240613", "20240617") for dt in dt_list: migrate_daily(dt)