select_work.py 1.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344
  1. import datetime
  2. import os
  3. import sys
  4. from apscheduler.schedulers.blocking import BlockingScheduler
  5. from apscheduler.triggers.cron import CronTrigger
  6. from loguru import logger
  7. sys.path.append('/app')
  8. from utils.feishu_form import Material
  9. from utils.redis import RedisHelper
  10. class StartGetRecommend(object):
  11. @classmethod
  12. def run(cls):
  13. dt = int(datetime.datetime.now().strftime('%Y%m%d%H'))
  14. fs_data = os.getenv("FS_DATA")
  15. fs_data_list = fs_data.split(',')
  16. name = fs_data_list[0]
  17. fs_sheet = fs_data_list[1]
  18. redis_name = fs_data_list[2]
  19. logger.info(f"[FS] 开始获取{name},时区为{dt}")
  20. data = Material.get_carry_data(dt, fs_sheet, name)
  21. if not data:
  22. logger.info(f"[FS] {name},时区为{dt}没有获取到数据")
  23. return
  24. RedisHelper().get_client().rpush(redis_name, *data)
  25. logger.info(f"[FS] {name},时区为{dt}共获取{len(data)}条,写入成功")
  26. def run():
  27. scheduler = BlockingScheduler()
  28. try:
  29. scheduler.add_job(StartGetRecommend.run, trigger=CronTrigger(minute=10, second=0)) # 每小时获取一次
  30. scheduler.start()
  31. except KeyboardInterrupt:
  32. pass
  33. except Exception as e:
  34. pass
  35. finally:
  36. scheduler.shutdown()
  37. if __name__ == '__main__':
  38. run()