# import os # import concurrent.futures # import re # # import schedule # import time # import threading # from common import Material, Common, Feishu # from video_agc.agc_video_method import AgcVidoe # # # 控制读写速度的参数 # MAX_BPS = 1 * 1024 * 1024 # 120MB/s # MAX_WORKERS = os.cpu_count() # 线程池最大工作线程数量 # READ_WRITE_CHUNK_SIZE = 512 * 1024 # 每次读写的块大小 (1MB) # SLEEP_INTERVAL = READ_WRITE_CHUNK_SIZE / MAX_BPS # 控制每次读写的延迟时间 # # # 全局锁,用于同步读写操作 # lock = threading.Lock() # # # 记录今天已经返回的用户名 # gs_today = [] # cg_today = [] # bk_today = [] # # def gs_video_start(user_data): # global gs_today # user_data_mark = user_data["mark"] # video_call = user_data["video_call"] # mark_name = user_data['mark_name'] # # if user_data_mark is not None and user_data_mark in gs_today: # Common.logger("video").info(f"视频脚本参数中的用户名 {user_data_mark} 今天已经返回过,不再启动线程") # print(f"视频脚本参数中的用户名 {user_data_mark} 今天已经返回过,不再启动线程。") # return # 如果返回了某个用户名,并且今天已经返回过,则不启动线程 # if video_call is not None and video_call in gs_today: # print(f"视频脚本参数中的脚本{user_data_mark} 今天已经返回过,不再启动线程。") # return # 如果返回了某个用户名,并且今天已经返回过,则不启动线程 # else: # print(f"视频脚本参数{user_data}") # mark = AgcVidoe.video_gs_stitching(user_data) # print(f"返回用户名{mark}") # if mark: # Common.logger("video").info(f"返回用户名{mark}") # gs_today.append(mark) # zd_count = user_data["zd_count"] # 生成总条数 # # 总条数 # result = re.match(r'([^0-9]+)', user_data_mark).group() # all_count = AgcVidoe.get_link_gs_count(result) # if all_count >= int(zd_count): # Feishu.bot('recommend', 'AGC完成通知', '今日脚本跟随视频拼接任务完成啦~', user_data_mark.split("-")[0], mark_name) # # def cg_video_start(user_data): # global cg_today # user_data_mark = user_data["mark"] # if user_data_mark and user_data_mark in cg_today: # Common.logger("video").info(f"视频脚本参数中的用户名 {user_data_mark} 今天已经返回过,不再启动线程。") # print(f"视频脚本参数中的用户名 {user_data_mark} 今天已经返回过,不再启动线程。") # return # print(f"视频脚本参数 {user_data}") # mark = AgcVidoe.video_stitching(user_data) # print(f"返回用户名 {mark}") # if mark: # Common.logger("video").info(f"返回用户名 {mark}") # cg_today.append(user_data_mark) # # def bk_video_start(user_data): # global bk_today # user_data_mark = user_data["mark"] # # 开始准备执行生成视频脚本 # if user_data_mark is not None and user_data_mark in bk_today: # Common.logger("video").info(f"视频脚本参数中的用户名 {user_data_mark} 今天已经返回过,不再启动线程。") # print(f"视频脚本参数中的用户名 {user_data_mark} 今天已经返回过,不再启动线程。") # return # mark = AgcVidoe.video_bk_stitching(user_data) # print(f"返回用户名{mark}") # if mark: # bk_today.append(mark) # Common.logger("video").info(f"返回用户名{mark}") # # def controlled_io_operation(platform, data): # with lock: # start_time = time.time() # time.sleep(SLEEP_INTERVAL) # end_time = time.time() # elapsed_time = end_time - start_time # if elapsed_time < SLEEP_INTERVAL: # time.sleep(SLEEP_INTERVAL - elapsed_time) # if platform == "gs": # gs_video_start(data) # elif platform == "cg": # cg_video_start(data) # elif platform == "bk": # bk_video_start(data) # # def video_start(platform): # print("开始执行生成视频脚本.") # if platform == "cg": # data = Material.feishu_list() # elif platform == "gs": # data = Material.feishu_gs_list() # elif platform == "bk": # data = Material.feishu_bk_list() # with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor: # futures = {executor.submit(controlled_io_operation, platform, user_data): user_data for user_data in data} # for future in concurrent.futures.as_completed(futures): # try: # future.result() # print("处理结果: 成功") # except concurrent.futures.TimeoutError: # print("任务超时,已取消.") # except Exception as e: # print("处理任务时出现异常:", e) # print("执行生成视频脚本结束.") # # def gs_usernames_today(): # gs_today.clear() # print("gs_usernames_today 已清空") # def cg_usernames_today(): # cg_today.clear() # print("cg_usernames_today 已清空") # def bk_usernames_today(): # bk_today.clear() # print("bk_usernames_today 已清空") # # # 定时任务设置 # schedule.every().day.at("21:10").do(gs_usernames_today) # schedule.every().day.at("04:10").do(cg_usernames_today) # schedule.every().day.at("00:10").do(bk_usernames_today) # # schedule.every(10).minutes.do(video_start, "cg") # schedule.every(10).minutes.do(video_start, "gs") # # schedule.every(10).minutes.do(video_start, "bk") # schedule.every().day.at("00:20").do(video_start, "bk") # # # if __name__ == "__main__": # while True: # try: # schedule.run_pending() # except Exception as e: # print("执行调度任务时出现异常:", e) # time.sleep(1) from common import Material from video_agc.agc_video import AGC data = Material.feishu_list() AGC.video(data[3], "常规")