carry_data_handle.py 2.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667
  1. import json
  2. import os
  3. import time
  4. import uuid
  5. import schedule
  6. from loguru import logger
  7. from carry_video.carry_video import CarryViode
  8. from common import Feishu, AliyunLogger
  9. from common.ffmpeg import FFmpeg
  10. from common.redis import get_carry_data, in_carry_video_data
  11. ENV = os.getenv('ENV', 'dev')
  12. NAME = os.getenv('NAME')
  13. REDIS_NAME = os.getenv('REDIS_NAME')
  14. CACHE_DIR = '/app/cache/' if ENV == 'prod' else os.path.expanduser('~/Downloads/')
  15. def video_task_start():
  16. logger.info(f"[+] {REDIS_NAME}任务开始redis获取")
  17. data = get_carry_data(REDIS_NAME)
  18. if not data:
  19. return
  20. try:
  21. logger.info(f"[+] {NAME}任务开始,数据为{data}")
  22. uid = str(uuid.uuid4())
  23. file_path = os.path.join(CACHE_DIR, uid)
  24. carry_video = CarryViode()
  25. mark = carry_video.main(json.loads(data), REDIS_NAME, file_path)
  26. print(f"返回用户名: {mark}")
  27. logger.info(f"[+] {NAME}处理一条成功")
  28. for filename in os.listdir(CACHE_DIR):
  29. # 检查文件名是否包含关键字
  30. if uid in filename:
  31. file_path = os.path.join(CACHE_DIR, filename)
  32. try:
  33. # 删除文件
  34. os.remove(file_path)
  35. logger.info(f"已删除文件: {file_path}")
  36. except Exception as e:
  37. logger.error(f"删除文件时出错: {file_path}, 错误: {e}")
  38. return
  39. except Exception as e:
  40. data = json.loads(data)
  41. in_carry_video_data(REDIS_NAME, json.dumps(data, ensure_ascii=False, indent=4))
  42. text = (
  43. f"**负责人**: {data['name']}\n"
  44. f"**内容**: {data}\n"
  45. f"**失败信息**: 视频处理失败,等待重新处理\n"
  46. )
  47. AliyunLogger.logging(data["name"], "效率工具", data["tag_transport_channel"], data["video_url"],
  48. f"视频处理失败,失败信息{e}", "3002", str(data))
  49. Feishu.finish_bot(text,
  50. "https://open.feishu.cn/open-apis/bot/v2/hook/65bc5463-dee9-46d0-bc2d-ec6c49a8f3cd",
  51. "【 搬运&改造效率工具失败通知 】")
  52. logger.error(f"[+] {data}处理失败,失败信息{e}")
  53. return
  54. def schedule_tasks():
  55. schedule.every(6).minutes.do(video_task_start)
  56. if __name__ == '__main__':
  57. schedule_tasks() # 调用任务调度函数
  58. while True:
  59. schedule.run_pending()
  60. time.sleep(1) # 每秒钟检查一次
  61. # video_task_start()