- """
- @author luojunhui
- @desc find best frame from each video
- """
- import os
- import datetime
- import traceback
- from tqdm import tqdm
- from pymysql.cursors import DictCursor
- from applications import log
- from applications.api import GoogleAIAPI
- from applications.const import GoogleVideoUnderstandTaskConst
- from applications.db import DatabaseConnector
- from config import long_articles_config
- from coldStartTasks.ai_pipeline.basic import download_file
- from coldStartTasks.ai_pipeline.basic import update_task_queue_status
- from coldStartTasks.ai_pipeline.basic import roll_back_lock_tasks
- from coldStartTasks.ai_pipeline.basic import extract_best_frame_prompt
- from coldStartTasks.ai_pipeline.basic import get_video_cover
- from coldStartTasks.ai_pipeline.basic import normalize_time_str
- const = GoogleVideoUnderstandTaskConst()
- google_ai = GoogleAIAPI()


class ExtractVideoBestFrame:
    """
    Extract the best frame from each video with Gemini AI.
    """

    def __init__(self):
        self.db_client = DatabaseConnector(db_config=long_articles_config)
        self.db_client.connect()

    def _roll_back_lock_tasks(self, task: str) -> int:
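        """
        Release tasks that have been stuck in PROCESSING status for longer than
        MAX_PROCESSING_TIME by rolling them back to INIT so they can be retried.
        """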
        return roll_back_lock_tasks(
            db_client=self.db_client,
            task=task,
            init_status=const.INIT_STATUS,
            processing_status=const.PROCESSING_STATUS,
            max_process_time=const.MAX_PROCESSING_TIME,
        )

    def _lock_task(self, task_id: int, task_name: str) -> int:
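        """
        Atomically move a task from INIT to PROCESSING for the given stage.
        Returns the number of affected rows; 0 means the task is no longer in
        INIT status and should be skipped.
        """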
        return update_task_queue_status(
            db_client=self.db_client,
            task_id=task_id,
            task=task_name,
            ori_status=const.INIT_STATUS,
            new_status=const.PROCESSING_STATUS,
        )

    def get_upload_task_list(self, task_num: int = const.POOL_SIZE) -> list[dict]:
        """
        Fetch up to task_num videos whose upload_status is INIT, highest priority first.
        """
        fetch_query = f"""
            select id, video_oss_path from {const.TABLE_NAME}
            where upload_status = {const.INIT_STATUS}
            order by priority desc
            limit {task_num};
        """
        upload_task_list = self.db_client.fetch(
            query=fetch_query, cursor_type=DictCursor
        )
        return upload_task_list

    def get_extract_task_list(self, task_num: int = const.POOL_SIZE) -> list[dict]:
        """
        Fetch up to task_num uploaded videos that are waiting for best-frame
        extraction, ordered by the earliest file_expire_time.
        """
        fetch_query = f"""
            select id, file_name from {const.TABLE_NAME}
            where upload_status = {const.SUCCESS_STATUS} and extract_status = {const.INIT_STATUS}
            order by file_expire_time
            limit {task_num};
        """
        extract_task_list = self.db_client.fetch(
            query=fetch_query, cursor_type=DictCursor
        )
        return extract_task_list

    def get_cover_task_list(self) -> list[dict]:
        """
        Fetch videos whose best frame has been extracted but whose cover has
        not been generated yet.
        """
        fetch_query = f"""
            select id, video_oss_path, best_frame_time_ms from {const.TABLE_NAME}
            where extract_status = {const.SUCCESS_STATUS} and get_cover_status = {const.INIT_STATUS};
        """
        cover_task_list = self.db_client.fetch(
            query=fetch_query, cursor_type=DictCursor
        )
        return cover_task_list

    def get_processing_task_pool_size(self) -> int:
        """
        Count uploaded videos that Gemini is still processing; used to cap how
        many new uploads can be started.
        """
        fetch_query = f"""
            select count(1) as pool_size from {const.TABLE_NAME}
            where upload_status = {const.SUCCESS_STATUS} and file_state = 'PROCESSING' and extract_status = {const.INIT_STATUS};
        """
        fetch_response = self.db_client.fetch(query=fetch_query, cursor_type=DictCursor)
        processing_task_pool_size = (
            fetch_response[0]["pool_size"] if fetch_response else 0
        )
        return processing_task_pool_size

    def set_upload_result(
        self, task_id: int, file_name: str, file_state: str, file_expire_time: str
    ) -> int:
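        """
        Record the Gemini file handle for a finished upload and flip
        upload_status from PROCESSING to SUCCESS.
        """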
- update_query = f"""
- update {const.TABLE_NAME}
- set upload_status = %s, upload_status_ts = %s,
- file_name = %s, file_state = %s, file_expire_time = %s
- where id = %s and upload_status = %s;
- """
- update_rows = self.db_client.save(
- query=update_query,
- params=(
- const.SUCCESS_STATUS,
- datetime.datetime.now(),
- file_name,
- file_state,
- file_expire_time,
- task_id,
- const.PROCESSING_STATUS,
- ),
- )
- return update_rows
- def set_extract_result(
- self, task_id: int, file_state: str, best_frame_time_ms: str
- ) -> int:
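        """
        Store the best-frame timestamp returned by Gemini and flip
        extract_status from PROCESSING to SUCCESS.
        """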
        update_query = f"""
            update {const.TABLE_NAME}
            set extract_status = %s, extract_status_ts = %s,
                file_state = %s, best_frame_time_ms = %s
            where id = %s and extract_status = %s;
        """
        update_rows = self.db_client.save(
            query=update_query,
            params=(
                const.SUCCESS_STATUS,
                datetime.datetime.now(),
                file_state,
                best_frame_time_ms,
                task_id,
                const.PROCESSING_STATUS,
            ),
        )
        return update_rows

    def set_cover_result(self, task_id: int, cover_oss_path: str) -> int:
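        """
        Store the OSS path of the generated cover image and flip
        get_cover_status from PROCESSING to SUCCESS.
        """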
        update_query = f"""
            update {const.TABLE_NAME}
            set cover_oss_path = %s, get_cover_status = %s, get_cover_status_ts = %s
            where id = %s and get_cover_status = %s;
        """
        update_rows = self.db_client.save(
            query=update_query,
            params=(
                cover_oss_path,
                const.SUCCESS_STATUS,
                datetime.datetime.now(),
                task_id,
                const.PROCESSING_STATUS,
            ),
        )
        return update_rows

    def upload_each_video(self, task: dict) -> None:
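        """
        Lock one task, download the video from OSS, and upload it to Gemini.
        On success the returned file handle is persisted; on failure the task
        is marked as FAIL.
        """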
        lock_status = self._lock_task(task_id=task["id"], task_name="upload")
        if not lock_status:
            return None

        try:
            file_path = download_file(task["id"], task["video_oss_path"])
            upload_response = google_ai.upload_file(file_path)
            if upload_response:
                file_name, file_state, expire_time = upload_response
                self.set_upload_result(
                    task_id=task["id"],
                    file_name=file_name,
                    file_state=file_state,
                    file_expire_time=expire_time,
                )
                return None
            else:
                # set status as fail
                update_task_queue_status(
                    db_client=self.db_client,
                    task_id=task["id"],
                    task="upload",
                    ori_status=const.PROCESSING_STATUS,
                    new_status=const.FAIL_STATUS,
                )
                return None
        except Exception as e:
            log(
                task=const.TASK_NAME,
                function="upload_video_to_gemini_ai",
                message="task_failed",
                data={
                    "task_id": task["id"],
                    "track_back": traceback.format_exc(),
                    "error": str(e),
                },
            )
            update_task_queue_status(
                db_client=self.db_client,
                task_id=task["id"],
                task="upload",
                ori_status=const.PROCESSING_STATUS,
                new_status=const.FAIL_STATUS,
            )
            return None

    def upload_video_to_gemini_ai(
        self, max_processing_pool_size: int = const.POOL_SIZE
    ) -> None:
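        """
        Entry point for the upload stage: roll back timed-out locks, then
        upload pending videos until the Gemini processing pool is full.
        """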
        # upload video to gemini ai
        roll_back_lock_tasks_count = self._roll_back_lock_tasks(task="upload")
        log(
            task=const.TASK_NAME,
            function="upload_video_to_gemini_ai",
            message=f"roll_back_lock_tasks_count: {roll_back_lock_tasks_count}",
        )
        processing_task_num = self.get_processing_task_pool_size()
        res_task_num = max_processing_pool_size - processing_task_num
        if res_task_num > 0:
            upload_task_list = self.get_upload_task_list(task_num=res_task_num)
            for task in tqdm(upload_task_list, desc="upload_video_to_gemini_ai"):
                self.upload_each_video(task=task)
        else:
            log(
                task=const.TASK_NAME,
                function="upload_video_to_gemini_ai",
                message="reach pool size, no more space for task to upload",
            )

    def extract_each_video(self, task: dict) -> None:
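        """
        Lock one task and check its Gemini file state:
        - PROCESSING: release the lock and retry on a later run;
        - FAILED: mark the task as failed;
        - ACTIVE: ask Gemini for the best-frame timestamp, persist it (or mark
          the task as failed), then delete the local copy and the Gemini file.
        """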
        # lock task
        lock_status = self._lock_task(task_id=task["id"], task_name="extract")
        if not lock_status:
            return None

        file_name = task["file_name"]
        video_local_path = os.path.join(const.DIR_NAME, "{}.mp4".format(task["id"]))
        try:
            google_file = google_ai.get_google_file(file_name)
            state = google_file.state.name

            match state:
                case "PROCESSING":
                    # google is still processing this video
                    update_task_queue_status(
                        db_client=self.db_client,
                        task_id=task["id"],
                        task="extract",
                        ori_status=const.PROCESSING_STATUS,
                        new_status=const.INIT_STATUS,
                    )
                    log(
                        task=const.TASK_NAME,
                        function="extract_best_frame_with_gemini_ai",
                        message="google is still processing this video",
                        data={
                            "task_id": task["id"],
                            "file_name": file_name,
                            "state": state,
                        },
                    )

                case "FAILED":
                    # google process this video failed
                    update_query = f"""
                        update {const.TABLE_NAME}
                        set file_state = %s, extract_status = %s, extract_status_ts = %s
                        where id = %s and extract_status = %s;
                    """
                    self.db_client.save(
                        query=update_query,
                        params=(
                            "FAILED",
                            const.FAIL_STATUS,
                            datetime.datetime.now(),
                            task["id"],
                            const.PROCESSING_STATUS,
                        ),
                    )
                    log(
                        task=const.TASK_NAME,
                        function="extract_best_frame_with_gemini_ai",
                        message="google process this video failed",
                        data={
                            "task_id": task["id"],
                            "file_name": file_name,
                            "state": state,
                        },
                    )

                case "ACTIVE":
                    # video process successfully
                    try:
                        best_frame_time_ms = google_ai.fetch_info_from_google_ai(
                            prompt=extract_best_frame_prompt(),
                            video_file=google_file,
                        )
                        if best_frame_time_ms:
                            self.set_extract_result(
                                task_id=task["id"],
                                file_state="ACTIVE",
                                best_frame_time_ms=best_frame_time_ms.strip(),
                            )
                        else:
                            update_task_queue_status(
                                db_client=self.db_client,
                                task_id=task["id"],
                                task="extract",
                                ori_status=const.PROCESSING_STATUS,
                                new_status=const.FAIL_STATUS,
                            )
                        # delete local file and google file
                        if os.path.exists(video_local_path):
                            os.remove(video_local_path)
                        google_ai.delete_video(file_name)
                        log(
                            task=const.TASK_NAME,
                            function="extract_best_frame_with_gemini_ai",
                            message="video process successfully",
                            data={
                                "task_id": task["id"],
                                "file_name": file_name,
                                "state": state,
                                "best_frame_time_ms": best_frame_time_ms,
                            },
                        )
                    except Exception as e:
                        log(
                            task=const.TASK_NAME,
                            function="extract_best_frame_with_gemini_ai",
                            message="task_failed_inside_cycle",
                            data={
                                "task_id": task["id"],
                                "track_back": traceback.format_exc(),
                                "error": str(e),
                            },
                        )
                        update_task_queue_status(
                            db_client=self.db_client,
                            task_id=task["id"],
                            task="extract",
                            ori_status=const.PROCESSING_STATUS,
                            new_status=const.FAIL_STATUS,
                        )

        except Exception as e:
            log(
                task=const.TASK_NAME,
                function="extract_best_frame_with_gemini_ai",
                message="task_failed_outside_cycle",
                data={
                    "task_id": task["id"],
                    "track_back": traceback.format_exc(),
                    "error": str(e),
                },
            )
            update_task_queue_status(
                db_client=self.db_client,
                task_id=task["id"],
                task="extract",
                ori_status=const.PROCESSING_STATUS,
                new_status=const.FAIL_STATUS,
            )

    def extract_best_frame_with_gemini_ai(self):
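        """
        Entry point for the extraction stage: roll back timed-out locks, then
        query Gemini for the best frame of every uploaded video.
        """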
        # roll back lock tasks
        roll_back_lock_tasks_count = self._roll_back_lock_tasks(task="extract")
        log(
            task=const.TASK_NAME,
            function="extract_best_frame_with_gemini_ai",
            message=f"roll_back_lock_tasks_count: {roll_back_lock_tasks_count}",
        )
        # do extract frame task
        task_list = self.get_extract_task_list()
        for task in tqdm(task_list, desc="extract_best_frame_with_gemini_ai"):
            self.extract_each_video(task=task)

    def get_each_cover(self, task: dict) -> None:
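        """
        Lock one task and cut the cover image at the model-selected timestamp
        with ffmpeg; store the resulting OSS path or mark the task as failed.
        """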
        lock_status = self._lock_task(task_id=task["id"], task_name="get_cover")
        if not lock_status:
            return None

        time_str = normalize_time_str(task["best_frame_time_ms"])
        if time_str:
            response = get_video_cover(
                video_oss_path=task["video_oss_path"], time_millisecond_str=time_str
            )
            log(
                task=const.TASK_NAME,
                function="extract_cover_with_ffmpeg",
                message="get_video_cover_with_ffmpeg",
                data={
                    "task_id": task["id"],
                    "video_oss_path": task["video_oss_path"],
                    "time_millisecond_str": time_str,
                    "response": response,
                },
            )
            if response["success"] and response["data"]:
                cover_oss_path = response["data"]
                self.set_cover_result(task_id=task["id"], cover_oss_path=cover_oss_path)
            else:
                update_task_queue_status(
                    db_client=self.db_client,
                    task_id=task["id"],
                    task="get_cover",
                    ori_status=const.PROCESSING_STATUS,
                    new_status=const.FAIL_STATUS,
                )
        else:
            log(
                task=const.TASK_NAME,
                function="extract_cover_with_ffmpeg",
                message="time_str format is not correct",
                data={
                    "task_id": task["id"],
                    "video_oss_path": task["video_oss_path"],
                    "time_millisecond_str": time_str,
                },
            )
            update_task_queue_status(
                db_client=self.db_client,
                task_id=task["id"],
                task="get_cover",
                ori_status=const.PROCESSING_STATUS,
                new_status=const.FAIL_STATUS,
            )

    def get_cover_with_best_frame(self):
        """
        Entry point for the cover stage: roll back timed-out locks, then cut a
        cover for every video with an extracted best frame.
        """
        # roll back lock tasks
        roll_back_lock_tasks_count = self._roll_back_lock_tasks(task="get_cover")
        log(
            task=const.TASK_NAME,
            function="extract_cover_with_ffmpeg",
            message=f"roll_back_lock_tasks_count: {roll_back_lock_tasks_count}",
        )
        # get task list
        task_list = self.get_cover_task_list()
        for task in tqdm(task_list, desc="extract_cover_with_ffmpeg"):
            self.get_each_cover(task=task)
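

# Minimal usage sketch, assuming the three stages are driven in order by an
# external scheduler (e.g. a cron job); each run only picks up tasks that are
# ready for its stage, so the calls are safe to repeat.
if __name__ == "__main__":
    extractor = ExtractVideoBestFrame()
    # stage 1: push pending videos from OSS to Gemini
    extractor.upload_video_to_gemini_ai()
    # stage 2: ask Gemini for the best-frame timestamp of processed videos
    extractor.extract_best_frame_with_gemini_ai()
    # stage 3: cut cover images at the selected timestamps with ffmpeg
    extractor.get_cover_with_best_frame()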