import os import concurrent.futures import re import schedule import time import threading from common import Material, Common, Feishu from video_agc.agc_video_method import AgcVidoe # 控制读写速度的参数 MAX_BPS = 1 * 1024 * 1024 # 120MB/s MAX_WORKERS = os.cpu_count() # 线程池最大工作线程数量 READ_WRITE_CHUNK_SIZE = 512 * 1024 # 每次读写的块大小 (1MB) SLEEP_INTERVAL = READ_WRITE_CHUNK_SIZE / MAX_BPS # 控制每次读写的延迟时间 # 全局锁,用于同步读写操作 lock = threading.Lock() # 记录今天已经返回的用户名 gs_today = [] cg_today = [] bk_today = [] def gs_video_start(user_data): global gs_today user_data_mark = user_data["mark"] video_call = user_data["video_call"] mark_name = user_data['mark_name'] if user_data_mark is not None and user_data_mark in gs_today: Common.logger("video").info(f"视频脚本参数中的用户名 {user_data_mark} 今天已经返回过,不再启动线程。今天已经返回的用户名:{returned_usernames_today}") print(f"视频脚本参数中的用户名 {user_data_mark} 今天已经返回过,不再启动线程。") return # 如果返回了某个用户名,并且今天已经返回过,则不启动线程 if video_call is not None and video_call in gs_today: print(f"视频脚本参数中的脚本{user_data_mark} 今天已经返回过,不再启动线程。") return # 如果返回了某个用户名,并且今天已经返回过,则不启动线程 else: print(f"视频脚本参数{user_data}") mark = AgcVidoe.video_gs_stitching(user_data) print(f"返回用户名{mark}") if mark: Common.logger("video").info(f"返回用户名{mark}") gs_today.append(mark) zd_count = user_data["zd_count"] # 生成总条数 # 总条数 result = re.match(r'([^0-9]+)', user_data_mark).group() all_count = AgcVidoe.get_link_gs_count(result) if all_count >= int(zd_count): Feishu.bot('recommend', 'AGC完成通知', '今日脚本跟随视频拼接任务完成啦~', user_data_mark.split("-")[0], mark_name) def cg_video_start(user_data): global cg_today user_data_mark = user_data["mark"] if user_data_mark and user_data_mark in cg_today: Common.logger("video").info(f"视频脚本参数中的用户名 {user_data_mark} 今天已经返回过,不再启动线程。") print(f"视频脚本参数中的用户名 {user_data_mark} 今天已经返回过,不再启动线程。") return print(f"视频脚本参数 {user_data}") mark = AgcVidoe.video_stitching(user_data) print(f"返回用户名 {mark}") if mark: Common.logger("video").info(f"返回用户名 {mark}") cg_today.append(user_data_mark) def bk_video_start(user_data): global bk_today user_data_mark = user_data["mark"] # 开始准备执行生成视频脚本 if user_data_mark is not None and user_data_mark in bk_today: Common.logger("video").info(f"视频脚本参数中的用户名 {user_data_mark} 今天已经返回过,不再启动线程。今天已经返回的用户名:{returned_usernames_today}") print(f"视频脚本参数中的用户名 {user_data_mark} 今天已经返回过,不再启动线程。") return mark = AgcVidoe.video_bk_stitching(user_data) print(f"返回用户名{mark}") if mark: bk_today.append(mark) Common.logger("video").info(f"返回用户名{mark}") def controlled_io_operation(platform, data): with lock: start_time = time.time() time.sleep(SLEEP_INTERVAL) end_time = time.time() elapsed_time = end_time - start_time if elapsed_time < SLEEP_INTERVAL: time.sleep(SLEEP_INTERVAL - elapsed_time) if platform == "gs": gs_video_start(data) elif platform == "cg": cg_video_start(data) elif platform == "bk": bk_video_start(data) def video_start(platform): print("开始执行生成视频脚本.") if platform == "cg": data = Material.feishu_list() elif platform == "gs": data = Material.feishu_gs_list() elif platform == "bk": data = Material.feishu_bk_list() with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor: futures = {executor.submit(controlled_io_operation, platform, user_data): user_data for user_data in data} timeout = 20 * 60 # 设置超时时间为20分钟 for future in concurrent.futures.as_completed(futures, timeout=timeout): try: future.result() print("处理结果: 成功") except concurrent.futures.TimeoutError: print("任务超时,已取消.") except Exception as e: print("处理任务时出现异常:", e) print("执行生成视频脚本结束.") def gs_usernames_today(): gs_today.clear() print("gs_usernames_today 已清空") def cg_usernames_today(): cg_today.clear() print("cg_usernames_today 已清空") def bk_usernames_today(): bk_today.clear() print("bk_usernames_today 已清空") # 定时任务设置 schedule.every().day.at("00:10").do(gs_usernames_today) schedule.every().day.at("04:10").do(cg_usernames_today) schedule.every().day.at("19:10").do(bk_usernames_today) schedule.every(10).minutes.do(video_start, "cg") schedule.every(10).minutes.do(video_start, "gs") schedule.every(10).minutes.do(video_start, "bk") if __name__ == "__main__": while True: try: schedule.run_pending() except Exception as e: print("执行调度任务时出现异常:", e) time.sleep(1)