""" @author: luojunhui """ import datetime import multiprocessing from applications import log from coldStartTasks.ai_pipeline import ExtractVideoBestFrame PROCESS_EXIT_TIMEOUT = 10 * 60 def start_task(): task = ExtractVideoBestFrame() # 查询有多少任务正在处理中 processing_tasks = task.get_processing_task_pool_size() if processing_tasks: print( f"{datetime.datetime.now()} 当前有 {processing_tasks} 个任务正在等待 google 处理..." ) task.extract_best_frame_with_gemini_ai() else: print(f"{datetime.datetime.now()} 没有任务正在处理中...") # upload video to google ai task.upload_video_to_gemini_ai() log( task="video_understanding_with_google", function="main", message="upload_video_to_google_ai_task", ) task.extract_best_frame_with_gemini_ai() # 调用接口,使用 ffmpeg 获取视频的最佳帧作为封面 task.get_cover_with_best_frame() def main(): # create sub process process = multiprocessing.Process(target=start_task) process.start() # wait for sub process to finish process.join(PROCESS_EXIT_TIMEOUT) if process.is_alive(): print( f"Process {process.pid} did not finish within {PROCESS_EXIT_TIMEOUT} seconds. Terminating..." ) process.terminate() process.join() if __name__ == "__main__": main()