dev.py 871 B

12345678910111213141516171819202122232425262728293031
  1. """
  2. @author: luojunhui
  3. """
  4. import time
  5. from concurrent.futures.thread import ThreadPoolExecutor
  6. from applications.mysql import MySQL
  7. from applications.odps_server import PyODPS
  8. from applications.functions import generate_daily_strings
  9. def migrate_daily(dt):
  10. """
  11. 迁移当天到数据
  12. :param dt:
  13. :return:
  14. """
  15. PO = PyODPS()
  16. M = MySQL()
  17. select_sql = f"""select * from loghubods.video_return_top_500_new where dt = '{dt}';"""
  18. data = PO.select(select_sql)
  19. a = time.time()
  20. with ThreadPoolExecutor(max_workers=8) as pool:
  21. pool.map(M.migrate_data_to_mysql, data)
  22. b = time.time()
  23. print("{} successfully insert {} rows, totally cost {} seconds".format(dt, len(data), b - a))
  24. dt_list = generate_daily_strings("20240522", "20240525")
  25. with ThreadPoolExecutor(max_workers=8) as Pool:
  26. Pool.map(migrate_daily, dt_list)