import datetime import os import sys from apscheduler.schedulers.blocking import BlockingScheduler from apscheduler.triggers.cron import CronTrigger from loguru import logger sys.path.append('/app') from utils.feishu_form import Material from utils.redis import RedisHelper class StartGetRecommend(object): @classmethod def run(cls): dt = int(datetime.datetime.now().strftime('%Y%m%d%H')) fs_data = os.getenv("FS_DATA") logger.info(f"[FS] {fs_data},时区为{dt}") fs_data_list = fs_data.split(',') name = fs_data_list[0] fs_sheet = fs_data_list[1] redis_name = fs_data_list[2] logger.info(f"[FS] 开始获取{name},时区为{dt}") data = Material.get_carry_data(dt, fs_sheet, name) if not data: logger.info(f"[FS] {name},时区为{dt}没有获取到数据") return RedisHelper().get_client().rpush(redis_name, *data) logger.info(f"[FS] {name},时区为{dt}共获取{len(data)}条,写入成功") def run(): scheduler = BlockingScheduler() try: scheduler.add_job(StartGetRecommend.run, trigger=CronTrigger(minute=15, second=0)) # 每小时获取一次 scheduler.start() except KeyboardInterrupt: pass except Exception as e: pass finally: scheduler.shutdown() if __name__ == '__main__': run()