"""Scheduled worker: pull one carry-video task from Redis and process it.

Every run fetches a raw task payload with ``get_carry_data(REDIS_NAME)``,
hands the decoded payload to ``CarryViode().main`` and, when processing
raises, pushes the task back with ``in_carry_video_data`` and sends failure
notifications.
"""
import json
import os
import time
import uuid

import schedule
from loguru import logger

from carry_video.carry_video import CarryViode
from common import Feishu, AliyunLogger
from common.redis import get_carry_data, in_carry_video_data

ENV = os.getenv('ENV', 'dev')
NAME = os.getenv('NAME')
REDIS_NAME = os.getenv('REDIS_NAME')
CACHE_DIR = '/app/cache/' if ENV == 'prod' else os.path.expanduser('~/Downloads/')

# NOTE(review): the webhook token is embedded in source; consider moving it
# to configuration. The value is unchanged from the original code.
FAILURE_WEBHOOK_URL = "https://open.feishu.cn/open-apis/bot/v2/hook/65bc5463-dee9-46d0-bc2d-ec6c49a8f3cd"


def _remove_cached_files(uid):
    """Delete every entry in CACHE_DIR whose file name contains *uid*.

    Deletion is best-effort: each failure is logged and the remaining
    files are still attempted. Nothing is raised to the caller.
    """
    try:
        filenames = os.listdir(CACHE_DIR)
    except OSError as e:
        # A missing/unreadable cache directory must not be treated as a
        # failure of the video task itself.
        logger.error(f"删除文件时出错: {CACHE_DIR}, 错误: {e}")
        return

    for filename in filenames:
        # Only files tagged with this run's uid belong to this task.
        if uid not in filename:
            continue
        cached_path = os.path.join(CACHE_DIR, filename)
        try:
            os.remove(cached_path)
            logger.info(f"已删除文件: {cached_path}")
        except Exception as e:
            logger.error(f"删除文件时出错: {cached_path}, 错误: {e}")


def _report_failure(raw_data, error):
    """Re-queue the task and send failure notifications.

    Args:
        raw_data: The raw JSON payload as fetched from Redis. It must already
            be known to decode successfully.
        error: The exception raised while processing the task.
    """
    # Decode the raw payload again instead of reusing the dict that was
    # passed to the processor, so the re-queued task is the original payload
    # even if the processor mutated its argument.
    data = json.loads(raw_data)
    in_carry_video_data(REDIS_NAME, json.dumps(data, ensure_ascii=False, indent=4))
    text = (
        f"**负责人**: {data['name']}\n"
        f"**内容**: {data}\n"
        f"**失败信息**: 视频处理失败,等待重新处理\n"
    )
    AliyunLogger.logging(data["name"], "效率工具", data["tag_transport_channel"], data["video_url"],
                         f"视频处理失败,失败信息{error}", "3002", str(data))
    Feishu.finish_bot(text,
                      FAILURE_WEBHOOK_URL,
                      "【 搬运&改造效率工具失败通知 】")
    logger.error(f"[+] {data}处理失败,失败信息{error}")


def video_task_start():
    """Fetch one task from Redis and process it.

    Returns ``None`` in every case. When no task is available the function
    returns immediately. A payload that is not valid JSON is logged and
    dropped (it cannot be processed or meaningfully re-queued). When
    processing raises, the task is re-queued and failure notifications are
    sent. Cache files created for this run are removed whether processing
    succeeded or failed.
    """
    logger.info(f"[+] {REDIS_NAME}任务开始redis获取")
    data = get_carry_data(REDIS_NAME)
    if not data:
        return

    logger.info(f"[+] {NAME}任务开始,数据为{data}")
    try:
        task = json.loads(data)
    except (TypeError, ValueError) as e:
        # Previously a malformed payload raised a second time inside the
        # failure handler and terminated the scheduler loop.
        logger.error(f"[+] {data}处理失败,失败信息{e}")
        return

    # The uid tags every file this run writes into CACHE_DIR.
    uid = str(uuid.uuid4())
    file_path = os.path.join(CACHE_DIR, uid)
    try:
        carry_video = CarryViode()
        mark = carry_video.main(task, REDIS_NAME, file_path)
        print(f"返回用户名: {mark}")
        logger.info(f"[+] {NAME}处理一条成功")
    except Exception as e:
        _report_failure(data, e)
    finally:
        # Clean up on both paths: a failed run gets a fresh uid on retry, so
        # its leftovers would otherwise never be removed. Keeping cleanup out
        # of the ``try`` also stops a cleanup error from re-queuing a task
        # that was processed successfully.
        _remove_cached_files(uid)


def schedule_tasks():
    """Register ``video_task_start`` to run every 6 minutes."""
    schedule.every(6).minutes.do(video_task_start)


if __name__ == '__main__':
    schedule_tasks()  # Register the periodic job.
    while True:
        schedule.run_pending()
        time.sleep(1)  # Poll for due jobs once per second.
    # For a one-off manual run, call video_task_start() directly instead.