migrate.py 854 B

12345678910111213141516171819202122232425262728293031323334
  1. """
  2. @author: luojunhui
  3. """
  4. """
  5. @author: luojunhui
  6. """
  7. import time
  8. from concurrent.futures.thread import ThreadPoolExecutor
  9. from applications.mysql import MySQL
  10. from applications.odps_server import PyODPS
  11. from applications.functions import generate_daily_strings
  12. def migrate_daily(dt):
  13. """
  14. 迁移当天到数据
  15. :param dt:
  16. :return:
  17. """
  18. PO = PyODPS()
  19. M = MySQL()
  20. select_sql = f"""select * from loghubods.video_return_top_500_new where dt = '{dt}';"""
  21. data = PO.select(select_sql)
  22. a = time.time()
  23. with ThreadPoolExecutor(max_workers=8) as pool:
  24. pool.map(M.migrate_data_to_mysql, data)
  25. b = time.time()
  26. print("{} successfully insert {} rows, totally cost {} seconds".format(dt, len(data), b - a))
  27. dt_list = generate_daily_strings("20240613", "20240617")
  28. for dt in dt_list:
  29. migrate_daily(dt)