1234567891011121314151617181920212223242526272829303132333435363738394041424344 |
- import datetime
- import os
- import sys
- from apscheduler.schedulers.blocking import BlockingScheduler
- from apscheduler.triggers.cron import CronTrigger
- from loguru import logger
- sys.path.append('/app')
- from utils.feishu_form import Material
- from utils.redis import RedisHelper
class StartGetRecommend(object):
    """Job that fetches data for the current hour and appends it to a Redis list."""

    @classmethod
    def run(cls):
        """Fetch the current hour's data and push it onto a Redis list.

        Configuration comes from the ``FS_DATA`` environment variable, a
        comma-separated string whose first three fields are, in order:
        the name passed to ``Material.get_carry_data``, the sheet argument
        passed to the same call, and the Redis key that receives the data.

        Returns:
            None. When no data is returned the Redis list is left untouched.

        Raises:
            ValueError: If ``FS_DATA`` is unset, empty, or has fewer than
                three comma-separated fields.
        """
        # Hour-granular stamp such as 2024010112, built from local time.
        dt = int(datetime.datetime.now().strftime('%Y%m%d%H'))

        fs_data = os.getenv("FS_DATA")
        if not fs_data:
            # Fail with a clear message instead of an AttributeError on None.
            raise ValueError("FS_DATA environment variable is not set or is empty")

        fs_data_list = fs_data.split(',')
        if len(fs_data_list) < 3:
            # Fail with a clear message instead of a bare IndexError.
            raise ValueError(
                "FS_DATA must contain at least three comma-separated fields "
                f"(name,sheet,redis_key); got {len(fs_data_list)}"
            )
        name, fs_sheet, redis_name = fs_data_list[:3]

        logger.info(f"[FS] 开始获取{name},时区为{dt}")
        data = Material.get_carry_data(dt, fs_sheet, name)
        if not data:
            logger.info(f"[FS] {name},时区为{dt}没有获取到数据")
            return

        # rpush needs at least one value; the empty case returned above.
        RedisHelper().get_client().rpush(redis_name, *data)
        logger.info(f"[FS] {name},时区为{dt}共获取{len(data)}条,写入成功")
def run():
    """Start a blocking scheduler that runs ``StartGetRecommend.run`` hourly.

    The job fires at minute 10, second 0 of every hour. This call blocks
    until the scheduler stops. Errors are logged rather than propagated,
    and the scheduler is shut down on exit if it was actually started.
    """
    scheduler = BlockingScheduler()
    try:
        # Runs once per hour, at HH:10:00.
        scheduler.add_job(StartGetRecommend.run, trigger=CronTrigger(minute=10, second=0))
        scheduler.start()
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop the process; exit quietly.
        pass
    except Exception:
        # Still best-effort (no re-raise), but record the failure and
        # traceback instead of silently swallowing it.
        logger.exception("[FS] scheduler stopped due to an unexpected error")
    finally:
        # shutdown() raises SchedulerNotRunningError if the scheduler never
        # started (e.g. add_job failed), which would hide the real error.
        if scheduler.running:
            scheduler.shutdown()
# Script entry point: start the scheduler only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    run()
|