agc_job.py 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142
  1. import os
  2. import concurrent.futures
  3. import re
  4. import schedule
  5. import time
  6. import threading
  7. from common import Material, Common, Feishu
  8. from video_agc.agc_video_method import AgcVidoe
  9. # 控制读写速度的参数
  10. MAX_BPS = 120 * 1024 * 1024 # 120MB/s
  11. MAX_WORKERS = os.cpu_count() * 2 # 线程池最大工作线程数量
  12. READ_WRITE_CHUNK_SIZE = 1024 * 1024 # 每次读写的块大小 (1MB)
  13. SLEEP_INTERVAL = READ_WRITE_CHUNK_SIZE / MAX_BPS # 控制每次读写的延迟时间
  14. # 全局锁,用于同步读写操作
  15. lock = threading.Lock()
  16. # 记录今天已经返回的用户名
  17. gs_today = []
  18. cg_today = []
  19. bk_today = []
  20. def gs_video_start(user_data):
  21. global gs_today
  22. user_data_mark = user_data["mark"]
  23. video_call = user_data["video_call"]
  24. mark_name = user_data['mark_name']
  25. if user_data_mark is not None and user_data_mark in gs_today:
  26. Common.logger("video").info(f"视频脚本参数中的用户名 {user_data_mark} 今天已经返回过,不再启动线程。今天已经返回的用户名:{returned_usernames_today}")
  27. print(f"视频脚本参数中的用户名 {user_data_mark} 今天已经返回过,不再启动线程。")
  28. return # 如果返回了某个用户名,并且今天已经返回过,则不启动线程
  29. if video_call is not None and video_call in gs_today:
  30. print(f"视频脚本参数中的脚本{user_data_mark} 今天已经返回过,不再启动线程。")
  31. return # 如果返回了某个用户名,并且今天已经返回过,则不启动线程
  32. else:
  33. print(f"视频脚本参数{user_data}")
  34. mark = AgcVidoe.video_gs_stitching(user_data)
  35. print(f"返回用户名{mark}")
  36. if mark:
  37. Common.logger("video").info(f"返回用户名{mark}")
  38. gs_today.append(mark)
  39. zd_count = user_data["zd_count"] # 生成总条数
  40. # 总条数
  41. result = re.match(r'([^0-9]+)', user_data_mark).group()
  42. all_count = AgcVidoe.get_link_gs_count(result)
  43. if all_count >= int(zd_count):
  44. Feishu.bot('recommend', 'AGC完成通知', '今日脚本跟随视频拼接任务完成啦~', user_data_mark.split("-")[0], mark_name)
  45. def cg_video_start(user_data):
  46. global cg_today
  47. user_data_mark = user_data["mark"]
  48. if user_data_mark and user_data_mark in cg_today:
  49. Common.logger("video").info(f"视频脚本参数中的用户名 {user_data_mark} 今天已经返回过,不再启动线程。")
  50. print(f"视频脚本参数中的用户名 {user_data_mark} 今天已经返回过,不再启动线程。")
  51. return
  52. print(f"视频脚本参数 {user_data}")
  53. mark = AgcVidoe.video_stitching(user_data)
  54. print(f"返回用户名 {mark}")
  55. if mark:
  56. Common.logger("video").info(f"返回用户名 {mark}")
  57. cg_today.append(user_data_mark)
  58. def bk_video_start(user_data):
  59. global bk_today
  60. user_data_mark = user_data["mark"]
  61. # 开始准备执行生成视频脚本
  62. if user_data_mark is not None and user_data_mark in bk_today:
  63. Common.logger("video").info(f"视频脚本参数中的用户名 {user_data_mark} 今天已经返回过,不再启动线程。今天已经返回的用户名:{returned_usernames_today}")
  64. print(f"视频脚本参数中的用户名 {user_data_mark} 今天已经返回过,不再启动线程。")
  65. return
  66. mark = AgcVidoe.video_bk_stitching(user_data)
  67. print(f"返回用户名{mark}")
  68. if mark:
  69. bk_today.append(mark)
  70. Common.logger("video").info(f"返回用户名{mark}")
  71. def controlled_io_operation(platform, data):
  72. with lock:
  73. start_time = time.time()
  74. time.sleep(SLEEP_INTERVAL)
  75. end_time = time.time()
  76. elapsed_time = end_time - start_time
  77. if elapsed_time < SLEEP_INTERVAL:
  78. time.sleep(SLEEP_INTERVAL - elapsed_time)
  79. if platform == "gs":
  80. gs_video_start(data)
  81. elif platform == "cg":
  82. cg_video_start(data)
  83. elif platform == "bk":
  84. bk_video_start(data)
  85. def video_start(platform):
  86. print("开始执行生成视频脚本.")
  87. if platform == "cg":
  88. data = Material.feishu_list()
  89. elif platform == "gs":
  90. data = Material.feishu_gs_list()
  91. elif platform == "bk":
  92. data = Material.feishu_bk_list()
  93. with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
  94. futures = {executor.submit(controlled_io_operation, platform, user_data): user_data for user_data in data}
  95. for future in concurrent.futures.as_completed(futures):
  96. try:
  97. future.result()
  98. print("处理结果: 成功")
  99. except concurrent.futures.TimeoutError:
  100. print("任务超时,已取消.")
  101. except Exception as e:
  102. print("处理任务时出现异常:", e)
  103. print("执行生成视频脚本结束.")
  104. def gs_usernames_today():
  105. gs_today.clear()
  106. print("gs_usernames_today 已清空")
  107. def cg_usernames_today():
  108. cg_today.clear()
  109. print("cg_usernames_today 已清空")
  110. def bk_usernames_today():
  111. bk_today.clear()
  112. print("bk_usernames_today 已清空")
  113. # 定时任务设置
  114. schedule.every().day.at("00:10").do(gs_usernames_today)
  115. schedule.every().day.at("04:10").do(cg_usernames_today)
  116. schedule.every().day.at("19:10").do(bk_usernames_today)
  117. schedule.every(10).minutes.do(video_start, "cg")
  118. schedule.every(10).minutes.do(video_start, "gs")
  119. schedule.every(10).minutes.do(video_start, "bk")
  120. if __name__ == "__main__":
  121. while True:
  122. schedule.run_pending()
  123. time.sleep(1)