import json import os import time import uuid import schedule from apscheduler.schedulers.blocking import BlockingScheduler from apscheduler.triggers.interval import IntervalTrigger from loguru import logger from common.feishu_utils import Feishu from common.redis import get_carry_data, in_carry_video_data from pq_video.pq_video import PqViode ENV = os.getenv('ENV', 'dev') NAME = os.getenv('NAME') REDIS_NAME = os.getenv('REDIS_NAME') CACHE_DIR = '/app/cache/' if ENV == 'prod' else os.path.expanduser('~/Downloads/') class ConsumptionRecommend(object): @classmethod def run(cls): logger.info(f"[+] {REDIS_NAME}任务开始redis获取") for i in range(20): data = get_carry_data(REDIS_NAME) if not data: return try: logger.info(f"[+] {NAME}任务开始,数据为{data}") carry_video = PqViode() mark = carry_video.main(json.loads(data), REDIS_NAME) print(f"返回用户名: {mark}") logger.info(f"[+] {NAME}处理一条成功") continue except Exception as e: data = json.loads(data) in_carry_video_data(REDIS_NAME, json.dumps(data, ensure_ascii=False, indent=4)) text = ( f"**负责人**: {data['name']}\n" f"**内容**: {data}\n" f"**失败信息**: 站内视频重发失败,等待重新处理\n" ) Feishu.finish_bot(text, "https://open.feishu.cn/open-apis/bot/v2/hook/65bc5463-dee9-46d0-bc2d-ec6c49a8f3cd", "【 老内容重发通知 】") logger.error(f"[+] {data}处理失败,失败信息{e}") continue return def run(): scheduler = BlockingScheduler() try: logger.info(f"[+] 开始启动") scheduler.add_job(ConsumptionRecommend.run, trigger=IntervalTrigger(minutes=5)) # 每5分钟启动一次 scheduler.start() except KeyboardInterrupt: pass except Exception as e: logger.error(f"[+] 启动异常,异常信息:{e}") pass finally: scheduler.shutdown() if __name__ == '__main__': run()