"""Scheduled driver that generates videos for one platform's material list.

A daily `schedule` job calls `video_start`, which fans the material items
out to a thread pool; every worker passes through a shared throttle before
handing its item to `AGC.video`.
"""

import concurrent.futures
import os
import threading
import time

import schedule

from common import Material
from video_agc.agc_video import AGC

# Parameters that control the throttle applied before each job.
MAX_BPS = 1 * 1024 * 1024  # target throughput: 1 MiB per second
# os.cpu_count() may return None when the CPU count cannot be determined;
# fall back to 1 so the module still imports.
MAX_WORKERS = (os.cpu_count() or 1) * 5  # maximum number of pool threads
READ_WRITE_CHUNK_SIZE = 512 * 1024  # size of one read/write chunk: 512 KiB
# Delay applied per operation: chunk size / throughput = 0.5 seconds.
SLEEP_INTERVAL = READ_WRITE_CHUNK_SIZE / MAX_BPS

# Global lock used to serialize the throttle delay across worker threads.
lock = threading.Lock()

# Platform code -> label passed as the second argument of AGC.video.
_PLATFORM_LABELS = {
    "gs": "跟随",
    "cg": "常规",
    "bk": "爆款",
}


def controlled_io_operation(platform, data):
    """Wait for the shared throttle, then generate one video.

    Args:
        platform: Platform code; "gs", "cg" and "bk" are recognized.
        data: One material item, forwarded unchanged to `AGC.video`.

    Any other platform value still waits for the throttle but performs no
    generation call (same as before).
    """
    with lock:
        # Use the monotonic clock: a wall-clock adjustment must not be able
        # to inflate the remaining delay while the global lock is held.
        started = time.monotonic()
        time.sleep(SLEEP_INTERVAL)
        remaining = SLEEP_INTERVAL - (time.monotonic() - started)
        if remaining > 0:
            # Top up if the first sleep returned early.
            time.sleep(remaining)

    # NOTE(review): the original indentation was lost; the dispatch is
    # assumed to run after the lock is released so jobs can overlap —
    # confirm against the original file.
    label = _PLATFORM_LABELS.get(platform)
    if label is not None:
        AGC.video(data, label)


def _load_materials(platform):
    """Return the material list for *platform* or raise ValueError."""
    if platform == "cg":
        return Material.feishu_list()
    if platform == "gs":
        return Material.feishu_gs_list()
    if platform == "bk":
        return Material.feishu_bk_list()
    # Previously an unknown platform surfaced later as an UnboundLocalError
    # on `data`; fail here with a message that names the bad value.
    raise ValueError(f"unknown platform: {platform!r}")


def video_start(platform):
    """Generate videos for every material item of *platform*.

    Each item is submitted to a thread pool of `MAX_WORKERS` threads and the
    outcome of every job is printed as it completes. A failing job is
    reported and does not stop the remaining jobs.

    Args:
        platform: Platform code: "cg", "gs" or "bk".

    Raises:
        ValueError: If *platform* is not one of the recognized codes.
    """
    print("开始执行生成视频脚本.")
    data = _load_materials(platform)

    with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        futures = [
            executor.submit(controlled_io_operation, platform, user_data)
            for user_data in data
        ]
        for future in concurrent.futures.as_completed(futures):
            try:
                future.result()
                print("处理结果: 成功")
            except concurrent.futures.TimeoutError:
                print("任务超时,已取消.")
            except Exception as e:
                print("处理任务时出现异常:", e)

    print("执行生成视频脚本结束.")


# Register the daily run for the "cg" platform at 04:10.
schedule.every().day.at("04:10").do(video_start, "cg")


if __name__ == "__main__":
    # video_start("cg")  # uncomment to run once immediately
    while True:
        try:
            schedule.run_pending()
        except Exception as e:
            # Keep the scheduler loop alive when a job raises.
            print("执行调度任务时出现异常:", e)
        time.sleep(1)