123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354 |
- """
- @author: luojunhui
- """
- import os
- import time
- import datetime
- import traceback
- from pymysql.cursors import DictCursor
- from tqdm import tqdm
- from applications import log
- from applications.api import GoogleAIAPI
- from applications.const import VideoToTextConst
- from applications.db import DatabaseConnector
- from config import long_articles_config
- from config import apolloConfig
- from coldStartTasks.ai_pipeline.basic import download_file
- from coldStartTasks.ai_pipeline.basic import update_task_queue_status
- from coldStartTasks.ai_pipeline.basic import roll_back_lock_tasks
# Uncomment to enable the proxy required when debugging from the office network.
# os.environ["HTTP_PROXY"] = "http://192.168.100.20:1087"
# os.environ["HTTPS_PROXY"] = "http://192.168.100.20:1087"

const = VideoToTextConst()
config = apolloConfig(env="prod")

# Integer settings read from Apollo (prod); each value must be int()-convertible.
#   POOL_SIZE  -> default cap passed as ``max_processing_video_count`` to the upload task
#   BATCH_SIZE -> ``LIMIT`` used when fetching video-to-text extraction tasks
POOL_SIZE, BATCH_SIZE = (
    int(config.getConfigValue(setting_key))
    for setting_key in ("video_extract_pool_size", "video_extract_batch_size")
)
- class GenerateTextFromVideo(object):
- """
- 从视频中生成文本
- """
- def __init__(self):
- self.google_ai_api = GoogleAIAPI()
- self.db = DatabaseConnector(db_config=long_articles_config)
- self.db.connect()
- def get_upload_task_list(self, task_length: int) -> list[dict]:
- """
- 获取上传视频任务,优先处理高流量池视频内容
- """
- fetch_query = f"""
- select t1.id, t1.video_oss_path
- from video_content_understanding t1
- join publish_single_video_source t2 on t1.content_trace_id = t2.content_trace_id
- where t1.upload_status = {const.INIT_STATUS}
- and t2.video_pool_audit_status = {const.AUDIT_SUCCESS_STATUS}
- and t2.bad_status = {const.ARTICLE_GOOD_STATUS}
- order by t2.flow_pool_level
- limit {task_length};
- """
- task_list = self.db.fetch(query=fetch_query, cursor_type=DictCursor)
- return task_list
- def get_extract_task_list(self) -> list[dict]:
- """
- 获取处理视频转文本任务
- """
- fetch_query = f"""
- select id, file_name, video_ori_title
- from video_content_understanding
- where upload_status = {const.SUCCESS_STATUS} and understanding_status = {const.INIT_STATUS}
- order by file_expire_time
- limit {BATCH_SIZE};
- """
- task_list = self.db.fetch(query=fetch_query, cursor_type=DictCursor)
- return task_list
- def get_processing_task_num(self) -> int:
- """
- get the number of processing task
- """
- select_query = f"""
- select count(1) as processing_count
- from video_content_understanding
- where file_state = 'PROCESSING' and upload_status = {const.SUCCESS_STATUS};
- """
- fetch_response = self.db.fetch(query=select_query, cursor_type=DictCursor)
- processing_task_num = (
- fetch_response[0]["processing_count"] if fetch_response else 0
- )
- return processing_task_num
- def set_upload_result_for_task(
- self, task_id: str, file_name: str, file_state: str, expire_time: str
- ) -> int:
- """
- set upload result for task
- """
- update_query = f"""
- update video_content_understanding
- set upload_status = %s, upload_status_ts = %s,
- file_name = %s, file_state = %s, file_expire_time = %s
- where id = %s and upload_status = %s;
- """
- affected_rows = self.db.save(
- query=update_query,
- params=(
- const.SUCCESS_STATUS,
- datetime.datetime.now(),
- file_name,
- file_state,
- expire_time,
- task_id,
- const.PROCESSING_STATUS,
- ),
- )
- return affected_rows
- def set_understanding_result_for_task(
- self, task_id: str, state: str, text: str
- ) -> int:
- update_query = f"""
- update video_content_understanding
- set understanding_status = %s, video_text = %s, file_state = %s
- where id = %s and understanding_status = %s;
- """
- affected_rows = self.db.save(
- query=update_query,
- params=(
- const.SUCCESS_STATUS,
- text,
- state,
- task_id,
- const.PROCESSING_STATUS,
- ),
- )
- return affected_rows
- def upload_video_to_google_ai_task(
- self, max_processing_video_count: int = POOL_SIZE
- ):
- """
- upload video to google AI and wait for processing
- """
- # rollback lock tasks
- rollback_rows = roll_back_lock_tasks(
- db_client=self.db,
- process="upload",
- init_status=const.INIT_STATUS,
- processing_status=const.PROCESSING_STATUS,
- max_process_time=const.MAX_PROCESSING_TIME,
- )
- tqdm.write("upload rollback_lock_tasks: {}".format(rollback_rows))
- processing_task_num = self.get_processing_task_num()
- rest_video_count = max_processing_video_count - processing_task_num
- if rest_video_count:
- task_list = self.get_upload_task_list(rest_video_count)
- for task in tqdm(task_list, desc="upload_video_task"):
- lock_rows = update_task_queue_status(
- db_client=self.db,
- task_id=task["id"],
- process="upload",
- ori_status=const.INIT_STATUS,
- new_status=const.PROCESSING_STATUS,
- )
- if not lock_rows:
- continue
- try:
- file_path = download_file(task["id"], task["video_oss_path"])
- google_upload_result = self.google_ai_api.upload_file(file_path)
- if google_upload_result:
- file_name, file_state, expire_time = google_upload_result
- self.set_upload_result_for_task(
- task_id=task["id"],
- file_name=file_name,
- file_state=file_state,
- expire_time=expire_time,
- )
- else:
- # roll back status
- update_task_queue_status(
- db_client=self.db,
- task_id=task["id"],
- process="upload",
- ori_status=const.PROCESSING_STATUS,
- new_status=const.FAIL_STATUS,
- )
- log(
- task="video_to_text",
- function="upload_video_to_google_ai_task",
- message="upload_video_to_google_ai_task failed",
- data={
- "task_id": task["id"],
- },
- )
- except Exception as e:
- log(
- task="video_to_text",
- function="upload_video_to_google_ai_task",
- message="upload_video_to_google_ai_task failed",
- data={
- "error": str(e),
- "traceback": traceback.format_exc(),
- "task_id": task["id"],
- },
- )
- # roll back status
- update_task_queue_status(
- db_client=self.db,
- task_id=task["id"],
- process="upload",
- ori_status=const.PROCESSING_STATUS,
- new_status=const.FAIL_STATUS,
- )
- else:
- log(
- task="video_to_text",
- function="upload_video_to_google_ai_task",
- message="task pool is full",
- )
- def convert_video_to_text_with_google_ai_task(self):
- """
- 处理视频转文本任务
- """
- rollback_rows = roll_back_lock_tasks(
- db_client=self.db,
- process="understanding",
- init_status=const.INIT_STATUS,
- processing_status=const.PROCESSING_STATUS,
- max_process_time=const.MAX_PROCESSING_TIME,
- )
- tqdm.write("extract rollback_lock_tasks: {}".format(rollback_rows))
- task_list = self.get_extract_task_list()
- for task in tqdm(task_list, desc="convert video to text"):
- # LOCK TASK
- lock_row = update_task_queue_status(
- db_client=self.db,
- task_id=task["id"],
- process="understanding",
- ori_status=const.INIT_STATUS,
- new_status=const.PROCESSING_STATUS,
- )
- if not lock_row:
- print("Task has benn locked by other process")
- continue
- file_name = task["file_name"]
- video_local_path = "static/{}.mp4".format(task["id"])
- try:
- google_file = self.google_ai_api.get_google_file(file_name)
- state = google_file.state.name
- match state:
- case "ACTIVE":
- try:
- video_text = self.google_ai_api.get_video_text(
- prompt="分析我上传的视频的画面和音频,用叙述故事的风格将视频所描述的事件进行总结,需要保证视频内容的完整性,并且用中文进行输出,直接返回生成的文本",
- video_file=google_file,
- )
- if video_text:
- self.set_understanding_result_for_task(
- task_id=task["id"], state=state, text=video_text
- )
- # delete local file and google file
- if os.path.exists(video_local_path):
- os.remove(video_local_path)
- tqdm.write(
- "video transform to text success, delete local file"
- )
- task_list.remove(task)
- self.google_ai_api.delete_video(file_name)
- tqdm.write(
- "delete video from google success: {}".format(
- file_name
- )
- )
- else:
- # roll back status and wait for next process
- update_task_queue_status(
- db_client=self.db,
- task_id=task["id"],
- process="understanding",
- ori_status=const.PROCESSING_STATUS,
- new_status=const.INIT_STATUS,
- )
- except Exception as e:
- # roll back status
- update_task_queue_status(
- db_client=self.db,
- task_id=task["id"],
- process="understanding",
- ori_status=const.PROCESSING_STATUS,
- new_status=const.FAIL_STATUS,
- )
- tqdm.write(str(e))
- continue
- case "PROCESSING":
- update_task_queue_status(
- db_client=self.db,
- task_id=task["id"],
- process="understanding",
- ori_status=const.PROCESSING_STATUS,
- new_status=const.INIT_STATUS,
- )
- tqdm.write("video is still processing")
- case "FAILED":
- update_sql = f"""
- update video_content_understanding
- set file_state = %s, understanding_status = %s, understanding_status_ts = %s
- where id = %s and understanding_status = %s;
- """
- self.db.save(
- query=update_sql,
- params=(
- state,
- const.FAIL_STATUS,
- datetime.datetime.now(),
- task["id"],
- const.PROCESSING_STATUS,
- ),
- )
- # delete local file and google file
- if os.path.exists(video_local_path):
- os.remove(video_local_path)
- self.google_ai_api.delete_video(file_name)
- task_list.remove(task)
- tqdm.write("video process failed, delete local file")
- time.sleep(const.SLEEP_SECONDS)
- except Exception as e:
- log(
- task="video_to_text",
- function="extract_video_to_text_task",
- message="extract video to text task failed",
- data={
- "error": str(e),
- "traceback": traceback.format_exc(),
- "task_id": task["id"],
- },
- )
- update_task_queue_status(
- db_client=self.db,
- task_id=task["id"],
- process="understanding",
- ori_status=const.PROCESSING_STATUS,
- new_status=const.FAIL_STATUS,
- )
|