import os import concurrent.futures import re import schedule import time import threading from common import Material, Common, Feishu # 控制读写速度的参数 from video_rewriting.video_prep import getVideo MAX_BPS = 120 * 1024 * 1024 # 120MB/s MAX_WORKERS = os.cpu_count() * 2 # 线程池最大工作线程数量 READ_WRITE_CHUNK_SIZE = 1024 * 1024 # 每次读写的块大小 (1MB) SLEEP_INTERVAL = READ_WRITE_CHUNK_SIZE / MAX_BPS # 控制每次读写的延迟时间 # 全局锁,用于同步读写操作 lock = threading.Lock() # 记录今天已经返回的用户名 today = [] def video_task_start(data): global today user_data_mark = data["mark"] # 开始准备执行生成视频脚本 if user_data_mark is not None and user_data_mark in today: Common.logger("log").info(f"视频脚本参数中的用户名 {user_data_mark} 今天已经返回过,不再启动线程。今天已经返回的用户名:{user_data_mark}") print(f"视频脚本参数中的用户名 {user_data_mark} 今天已经返回过,不再启动线程。") return mark = getVideo.video_task(data) print(f"返回用户名{mark}") if mark: today.append(mark) Common.logger("log").info(f"返回用户名{mark}") # data = Material.feishu_list() # video_task_start(data[0]) def controlled_io_operation(data): with lock: start_time = time.time() time.sleep(SLEEP_INTERVAL) end_time = time.time() elapsed_time = end_time - start_time if elapsed_time < SLEEP_INTERVAL: time.sleep(SLEEP_INTERVAL - elapsed_time) video_task_start(data) def video_start(): print("开始执行生成视频脚本.") data = Material.feishu_list() with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor: futures = {executor.submit(controlled_io_operation, user_data): user_data for user_data in data} for future in concurrent.futures.as_completed(futures): try: future.result() print("处理结果: 成功") except concurrent.futures.TimeoutError: print("任务超时,已取消.") except Exception as e: print("处理任务时出现异常:", e) print("执行生成视频脚本结束.") def usernames_today(): today.clear() print("today 已清空") video_start() # 定时任务设置 schedule.every().day.at("01:00").do(usernames_today) schedule.every(12).hours.do(video_start) while True: schedule.run_pending() time.sleep(1)