""" @author luojunhui @desc find best frame from each video """ import os import datetime import traceback from tqdm import tqdm from pymysql.cursors import DictCursor from applications import log from applications.api import GoogleAIAPI from applications.const import GoogleVideoUnderstandTaskConst from applications.db import DatabaseConnector from config import long_articles_config from coldStartTasks.ai_pipeline.basic import download_file from coldStartTasks.ai_pipeline.basic import update_task_queue_status from coldStartTasks.ai_pipeline.basic import roll_back_lock_tasks from coldStartTasks.ai_pipeline.basic import extract_best_frame_prompt from coldStartTasks.ai_pipeline.basic import get_video_cover from coldStartTasks.ai_pipeline.basic import normalize_time_str const = GoogleVideoUnderstandTaskConst() google_ai = GoogleAIAPI() class ExtractVideoBestFrame: """ extract video best frame from each video by GeminiAI """ def __init__(self): self.db_client = DatabaseConnector(db_config=long_articles_config) self.db_client.connect() def _roll_back_lock_tasks(self, task: str) -> int: return roll_back_lock_tasks( db_client=self.db_client, task=task, init_status=const.INIT_STATUS, processing_status=const.PROCESSING_STATUS, max_process_time=const.MAX_PROCESSING_TIME, ) def _lock_task(self, task_id: int, task_name) -> int: return update_task_queue_status( db_client=self.db_client, task_id=task_id, task=task_name, ori_status=const.INIT_STATUS, new_status=const.PROCESSING_STATUS, ) def get_upload_task_list(self, task_num: int = const.POOL_SIZE) -> list[dict]: """ get upload task list """ fetch_query = f""" select id, video_oss_path from {const.TABLE_NAME} where upload_status = {const.INIT_STATUS} order by priority desc limit {task_num}; """ upload_task_list = self.db_client.fetch( query=fetch_query, cursor_type=DictCursor ) return upload_task_list def get_extract_task_list(self, task_num: int = const.POOL_SIZE) -> list[dict]: """ get extract task list """ fetch_query = f""" select id, file_name from {const.TABLE_NAME} where upload_status = {const.SUCCESS_STATUS} and extract_status = {const.INIT_STATUS} order by file_expire_time limit {task_num}; """ extract_task_list = self.db_client.fetch( query=fetch_query, cursor_type=DictCursor ) return extract_task_list def get_cover_task_list(self) -> list[dict]: """ get cover task list """ fetch_query = f""" select id, video_oss_path, best_frame_time_ms from {const.TABLE_NAME} where extract_status = {const.SUCCESS_STATUS} and get_cover_status = {const.INIT_STATUS}; """ extract_task_list = self.db_client.fetch( query=fetch_query, cursor_type=DictCursor ) return extract_task_list def get_processing_task_pool_size(self) -> int: """ get processing task pool size """ fetch_query = f""" select count(1) as pool_size from {const.TABLE_NAME} where upload_status = {const.SUCCESS_STATUS} and file_state = 'PROCESSING' and extract_status = {const.INIT_STATUS}; """ fetch_response = self.db_client.fetch(query=fetch_query, cursor_type=DictCursor) processing_task_pool_size = ( fetch_response[0]["pool_size"] if fetch_response else 0 ) return processing_task_pool_size def set_upload_result( self, task_id: int, file_name: str, file_state: str, file_expire_time: str ) -> int: update_query = f""" update {const.TABLE_NAME} set upload_status = %s, upload_status_ts = %s, file_name = %s, file_state = %s, file_expire_time = %s where id = %s and upload_status = %s; """ update_rows = self.db_client.save( query=update_query, params=( const.SUCCESS_STATUS, datetime.datetime.now(), file_name, file_state, file_expire_time, task_id, const.PROCESSING_STATUS, ), ) return update_rows def set_extract_result( self, task_id: int, file_state: str, best_frame_tims_ms: str ) -> int: update_query = f""" update {const.TABLE_NAME} set extract_status = %s, extract_status_ts = %s, file_state = %s, best_frame_time_ms = %s where id = %s and extract_status = %s; """ update_rows = self.db_client.save( query=update_query, params=( const.SUCCESS_STATUS, datetime.datetime.now(), file_state, best_frame_tims_ms, task_id, const.PROCESSING_STATUS, ), ) return update_rows def set_cover_result(self, task_id: int, cover_oss_path: str) -> int: update_query = f""" update {const.TABLE_NAME} set cover_oss_path = %s, get_cover_status = %s, get_cover_status_ts = %s where id = %s and get_cover_status = %s; """ update_rows = self.db_client.save( query=update_query, params=( cover_oss_path, const.SUCCESS_STATUS, datetime.datetime.now(), task_id, const.PROCESSING_STATUS, ), ) return update_rows def upload_each_video(self, task: dict) -> None: lock_status = self._lock_task(task_id=task["id"], task_name="upload") if not lock_status: return None try: file_path = download_file(task["id"], task["video_oss_path"]) upload_response = google_ai.upload_file(file_path) if upload_response: file_name, file_state, expire_time = upload_response self.set_upload_result( task_id=task["id"], file_name=file_name, file_state=file_state, file_expire_time=expire_time, ) return None else: # set status as fail update_task_queue_status( db_client=self.db_client, task_id=task["id"], task="upload", ori_status=const.PROCESSING_STATUS, new_status=const.FAIL_STATUS, ) return None except Exception as e: log( task=const.TASK_NAME, function="upload_video_to_gemini_ai", message="task_failed", data={ "task_id": task["id"], "track_back": traceback.format_exc(), "error": str(e), }, ) update_task_queue_status( db_client=self.db_client, task_id=task["id"], task="upload", ori_status=const.PROCESSING_STATUS, new_status=const.FAIL_STATUS, ) return None def upload_video_to_gemini_ai( self, max_processing_pool_size: int = const.POOL_SIZE ) -> None: # upload video to gemini ai roll_back_lock_tasks_count = self._roll_back_lock_tasks(task="upload") log( task=const.TASK_NAME, function="upload_video_to_gemini_ai", message=f"roll_back_lock_tasks_count: {roll_back_lock_tasks_count}", ) processing_task_num = self.get_processing_task_pool_size() res_task_num = max_processing_pool_size - processing_task_num if res_task_num: upload_task_list = self.get_upload_task_list(task_num=res_task_num) for task in tqdm(upload_task_list, desc="upload_video_to_gemini_ai"): self.upload_each_video(task=task) else: log( task=const.TASK_NAME, function="upload_video_to_gemini_ai", message="reach pool size, no more space for task to upload", ) def extract_each_video(self, task: dict) -> None: # lock task lock_status = self._lock_task(task_id=task["id"], task_name="extract") if not lock_status: return None file_name = task["file_name"] video_local_path = os.path.join(const.DIR_NAME, "{}.mp4".format(task["id"])) try: google_file = google_ai.get_google_file(file_name) state = google_file.state.name match state: case "PROCESSING": # google is still processing this video update_task_queue_status( db_client=self.db_client, task_id=task["id"], task="extract", ori_status=const.PROCESSING_STATUS, new_status=const.INIT_STATUS, ) log( task=const.TASK_NAME, function="extract_best_frame_with_gemini_ai", message="google is still processing this video", data={ "task_id": task["id"], "file_name": file_name, "state": state, }, ) case "FAILED": # google process this video failed update_query = f""" update {const.TABLE_NAME} set file_state = %s, extract_status = %s, extract_status_ts = %s where id = %s and extract_status = %s; """ self.db_client.save( query=update_query, params=( "FAILED", const.FAIL_STATUS, datetime.datetime.now(), task["id"], const.PROCESSING_STATUS, ), ) log( task=const.TASK_NAME, function="extract_best_frame_with_gemini_ai", message="google process this video failed", data={ "task_id": task["id"], "file_name": file_name, "state": state, }, ) case "ACTIVE": # video process successfully try: best_frame_tims_ms = google_ai.fetch_info_from_google_ai( prompt=extract_best_frame_prompt(), video_file=google_file, ) if best_frame_tims_ms: self.set_extract_result( task_id=task["id"], file_state="ACTIVE", best_frame_tims_ms=best_frame_tims_ms.strip(), ) else: update_task_queue_status( db_client=self.db_client, task_id=task["id"], task="extract", ori_status=const.PROCESSING_STATUS, new_status=const.FAIL_STATUS, ) # delete local file and google file if os.path.exists(video_local_path): os.remove(video_local_path) google_ai.delete_video(file_name) log( task=const.TASK_NAME, function="extract_best_frame_with_gemini_ai", message="video process successfully", data={ "task_id": task["id"], "file_name": file_name, "state": state, "best_frame_tims_ms": best_frame_tims_ms, }, ) except Exception as e: log( task=const.TASK_NAME, function="extract_best_frame_with_gemini_ai", message="task_failed_inside_cycle", data={ "task_id": task["id"], "track_back": traceback.format_exc(), "error": str(e), }, ) update_task_queue_status( db_client=self.db_client, task_id=task["id"], task="extract", ori_status=const.PROCESSING_STATUS, new_status=const.FAIL_STATUS, ) except Exception as e: log( task=const.TASK_NAME, function="extract_best_frame_with_gemini_ai", message="task_failed_outside_cycle", data={ "task_id": task["id"], "track_back": traceback.format_exc(), "error": str(e), }, ) update_task_queue_status( db_client=self.db_client, task_id=task["id"], task="extract", ori_status=const.PROCESSING_STATUS, new_status=const.FAIL_STATUS, ) def extract_best_frame_with_gemini_ai(self): # roll back lock tasks roll_back_lock_tasks_count = self._roll_back_lock_tasks(task="extract") log( task=const.TASK_NAME, function="extract_best_frame_with_gemini_ai", message=f"roll_back_lock_tasks_count: {roll_back_lock_tasks_count}", ) # do extract frame task task_list = self.get_extract_task_list() for task in tqdm(task_list, desc="extract_best_frame_with_gemini_ai"): self.extract_each_video(task=task) def get_each_cover(self, task: dict) -> None: lock_status = self._lock_task(task_id=task["id"], task_name="get_cover") if not lock_status: return None time_str = normalize_time_str(task["best_frame_time_ms"]) if time_str: response = get_video_cover( video_oss_path=task["video_oss_path"], time_millisecond_str=time_str ) log( task=const.TASK_NAME, function="extract_cover_with_ffmpeg", message="get_video_cover_with_ffmpeg", data={ "task_id": task["id"], "video_oss_path": task["video_oss_path"], "time_millisecond_str": time_str, "response": response, }, ) if response["success"] and response["data"]: cover_oss_path = response["data"] self.set_cover_result(task_id=task["id"], cover_oss_path=cover_oss_path) else: update_task_queue_status( db_client=self.db_client, task_id=task["id"], task="get_cover", ori_status=const.PROCESSING_STATUS, new_status=const.FAIL_STATUS, ) else: log( task=const.TASK_NAME, function="extract_cover_with_ffmpeg", message="time_str format is not correct", data={ "task_id": task["id"], "video_oss_path": task["video_oss_path"], "time_millisecond_str": time_str, }, ) update_task_queue_status( db_client=self.db_client, task_id=task["id"], task="get_cover", ori_status=const.PROCESSING_STATUS, new_status=const.FAIL_STATUS, ) def get_cover_with_best_frame(self): """ get cover with best frame """ # roll back lock tasks roll_back_lock_tasks_count = self._roll_back_lock_tasks(task="get_cover") log( task=const.TASK_NAME, function="extract_cover_with_ffmpeg", message=f"roll_back_lock_tasks_count: {roll_back_lock_tasks_count}", ) # get task list task_list = self.get_cover_task_list() for task in tqdm(task_list, desc="extract_cover_with_ffmpeg"): self.get_each_cover(task=task)