""" @author: luojunhui """ import time import datetime import traceback from pymysql.cursors import DictCursor from tqdm import tqdm from applications import log from applications.api import fetch_deepseek_response from applications.const import VideoToTextConst from applications.db import DatabaseConnector from config import long_articles_config from coldStartTasks.ai_pipeline.basic import generate_summary_prompt from coldStartTasks.ai_pipeline.basic import update_video_pool_status from coldStartTasks.ai_pipeline.basic import update_task_queue_status const = VideoToTextConst() class ArticleSummaryTask(object): """ 文章总结任务 """ def __init__(self): self.db_client = DatabaseConnector(db_config=long_articles_config) self.db_client.connect() def get_summary_task_list(self) -> list[dict]: """ 获取任务列表 """ fetch_query = f""" select id, content_trace_id, video_text from video_content_understanding where summary_status = {const.INIT_STATUS} and understanding_status = {const.SUCCESS_STATUS} limit {const.SUMMARY_BATCH_SIZE}; """ task_list = self.db_client.fetch(query=fetch_query, cursor_type=DictCursor) return task_list def rollback_lock_tasks(self) -> int: """ rollback tasks which have been locked for a long time """ now_timestamp = int(time.time()) timestamp_threshold = now_timestamp - const.MAX_PROCESSING_TIME update_sql = f""" update video_content_understanding set summary_status = %s where summary_status = %s and summary_status_ts < %s; """ rollback_rows = self.db_client.save( query=update_sql, params=(const.INIT_STATUS, const.PROCESSING_STATUS, timestamp_threshold), ) return rollback_rows def handle_task_execution(self, task): """ :param task: keys: [id, video_text] """ task_id = task["id"] content_trace_id = task["content_trace_id"] video_text = task["video_text"] # Lock Task affected_rows = update_task_queue_status( db_client=self.db_client, task_id=task_id, process="summary", ori_status=const.INIT_STATUS, new_status=const.PROCESSING_STATUS, ) if not affected_rows: return # fetch summary text from AI try: # generate prompt prompt = generate_summary_prompt(video_text) # get result from deep seek AI result = fetch_deepseek_response(model="DeepSeek-R1", prompt=prompt) if result: # set as success and update summary text self.set_summary_text_for_task(task_id, result.strip()) task_status = const.SUCCESS_STATUS else: # set as fail update_task_queue_status( db_client=self.db_client, task_id=task_id, process="summary", ori_status=const.PROCESSING_STATUS, new_status=const.FAIL_STATUS, ) task_status = const.FAIL_STATUS except Exception as e: # set as fail update_task_queue_status( db_client=self.db_client, task_id=task_id, process="summary", ori_status=const.PROCESSING_STATUS, new_status=const.FAIL_STATUS, ) task_status = const.FAIL_STATUS log( task="article_summary_task", function="fetch_deepseek_response", message="fetch_deepseek_response failed", data={"error": str(e), "trace_back": traceback.format_exc()}, ) # update video pool status update_video_pool_status( self.db_client, content_trace_id, const.PROCESSING_STATUS, task_status ) def set_summary_text_for_task(self, task_id, text): """ successfully get summary text and update summary text to database """ update_sql = f""" update video_content_understanding set summary_status = %s, summary_text = %s, understanding_status_ts = %s where id = %s and summary_status = %s; """ affected_rows = self.db_client.save( query=update_sql, params=( const.SUCCESS_STATUS, text, datetime.datetime.now(), task_id, const.PROCESSING_STATUS, ), ) return affected_rows def deal(self): """ entrance function for this class """ # first of all rollback tasks which have been locked for a long time rollback_rows = self.rollback_lock_tasks() tqdm.write("rollback_lock_tasks: {}".format(rollback_rows)) # get task list task_list = self.get_summary_task_list() for task in tqdm(task_list, desc="handle each task"): try: self.handle_task_execution(task=task) except Exception as e: log( task="article_summary_task", function="deal", message="fetch_deepseek_response", data={"error": str(e), "trace_back": traceback.format_exc()}, )