"""
@author: luojunhui
"""
- import time
- import datetime
- import traceback
- from pymysql.cursors import DictCursor
- from tqdm import tqdm
- from applications import log
- from applications.api import fetch_deepseek_response
- from applications.const import VideoToTextConst
- from applications.db import DatabaseConnector
- from config import long_articles_config
- from coldStartTasks.ai_pipeline.basic import generate_summary_prompt
- from coldStartTasks.ai_pipeline.basic import update_video_pool_status
- from coldStartTasks.ai_pipeline.basic import update_task_queue_status
# Shared pipeline constants. This module reads INIT_STATUS, PROCESSING_STATUS,
# SUCCESS_STATUS, FAIL_STATUS, SUMMARY_BATCH_SIZE and MAX_PROCESSING_TIME from it.
const = VideoToTextConst()
class ArticleSummaryTask:
    """
    Article summary task.

    Selects rows of ``video_content_understanding`` whose ``summary_status`` is
    still INIT and whose ``understanding_status`` is SUCCESS, asks DeepSeek for
    a summary of each row's ``video_text``, stores the result on the row and
    then reports the final status through ``update_video_pool_status``.
    """

    def __init__(self):
        self.db_client = DatabaseConnector(db_config=long_articles_config)
        self.db_client.connect()

    def get_summary_task_list(self) -> list[dict]:
        """
        Fetch the next batch of tasks waiting for a summary.

        :return: rows with keys ``id``, ``content_trace_id`` and ``video_text``;
            at most ``const.SUMMARY_BATCH_SIZE`` of them.
        """
        # Only module constants are interpolated here, never external input.
        fetch_query = f"""
            select id, content_trace_id, video_text
            from video_content_understanding
            where summary_status = {const.INIT_STATUS} and understanding_status = {const.SUCCESS_STATUS}
            limit {const.SUMMARY_BATCH_SIZE};
        """
        task_list = self.db_client.fetch(query=fetch_query, cursor_type=DictCursor)
        return task_list

    def rollback_lock_tasks(self) -> int:
        """
        Roll back tasks which have been locked for a long time.

        Rows still in PROCESSING whose ``summary_status_ts`` is older than
        ``const.MAX_PROCESSING_TIME`` seconds are reset to INIT so that they
        can be picked up again.

        :return: whatever ``db_client.save`` returns (used as the number of
            rolled-back rows by the caller).
        """
        timestamp_threshold = int(time.time()) - const.MAX_PROCESSING_TIME
        update_sql = """
            update video_content_understanding
            set summary_status = %s
            where summary_status = %s and summary_status_ts < %s;
        """
        rollback_rows = self.db_client.save(
            query=update_sql,
            params=(const.INIT_STATUS, const.PROCESSING_STATUS, timestamp_threshold),
        )
        return rollback_rows

    def _mark_summary_failed(self, task_id):
        """
        Move a locked task from PROCESSING to FAIL and return the FAIL status.
        """
        update_task_queue_status(
            db_client=self.db_client,
            task_id=task_id,
            process="summary",
            ori_status=const.PROCESSING_STATUS,
            new_status=const.FAIL_STATUS,
        )
        return const.FAIL_STATUS

    def handle_task_execution(self, task) -> None:
        """
        Lock one task, generate its summary and persist the outcome.

        :param task: keys: [id, content_trace_id, video_text]
        """
        task_id = task["id"]
        content_trace_id = task["content_trace_id"]
        video_text = task["video_text"]

        # Lock the task (INIT -> PROCESSING). No affected rows means the row
        # was not in INIT status any more, so there is nothing to do here.
        affected_rows = update_task_queue_status(
            db_client=self.db_client,
            task_id=task_id,
            process="summary",
            ori_status=const.INIT_STATUS,
            new_status=const.PROCESSING_STATUS,
        )
        if not affected_rows:
            return

        try:
            prompt = generate_summary_prompt(video_text)
            result = fetch_deepseek_response(model="DeepSeek-R1", prompt=prompt)
            # A whitespace-only answer carries no summary; treat it like an
            # empty response instead of storing "" with a success status.
            summary_text = result.strip() if result else ""
            if summary_text:
                self.set_summary_text_for_task(task_id, summary_text)
                task_status = const.SUCCESS_STATUS
            else:
                task_status = self._mark_summary_failed(task_id)
        except Exception as e:
            # Log first: if the status update below raises as well, the
            # original error has already been recorded.
            log(
                task="article_summary_task",
                function="fetch_deepseek_response",
                message="fetch_deepseek_response failed",
                data={"error": str(e), "trace_back": traceback.format_exc()},
            )
            task_status = self._mark_summary_failed(task_id)

        # Report the final status of this task to the video pool.
        update_video_pool_status(
            self.db_client, content_trace_id, const.PROCESSING_STATUS, task_status
        )

    def set_summary_text_for_task(self, task_id, text):
        """
        Store a successfully generated summary and mark the task as SUCCESS.

        The update only applies while the row is still in PROCESSING status.

        :param task_id: ``id`` of the row in ``video_content_understanding``.
        :param text: summary text to store in ``summary_text``.
        :return: whatever ``db_client.save`` returns for the update.
        """
        # NOTE(review): this writes `understanding_status_ts` with a datetime,
        # while rollback_lock_tasks compares `summary_status_ts` against an
        # integer Unix timestamp. Looks like a copy/paste slip -- confirm the
        # intended column and value type against the table schema.
        update_sql = """
            update video_content_understanding
            set summary_status = %s, summary_text = %s, understanding_status_ts = %s
            where id = %s and summary_status = %s;
        """
        affected_rows = self.db_client.save(
            query=update_sql,
            params=(
                const.SUCCESS_STATUS,
                text,
                datetime.datetime.now(),
                task_id,
                const.PROCESSING_STATUS,
            ),
        )
        return affected_rows

    def deal(self) -> None:
        """
        Entrance function for this class.

        Rolls back stale locks, fetches one batch of tasks and processes them
        one by one; a failure in one task is logged and does not stop the rest.
        """
        # First of all, roll back tasks which have been locked for a long time.
        rollback_rows = self.rollback_lock_tasks()
        tqdm.write(f"rollback_lock_tasks: {rollback_rows}")

        # Guard against the DB layer handing back None instead of a list
        # (presumably on a failed query -- verify against DatabaseConnector.fetch).
        task_list = self.get_summary_task_list() or []
        for task in tqdm(task_list, desc="handle each task"):
            try:
                self.handle_task_execution(task=task)
            except Exception as e:
                log(
                    task="article_summary_task",
                    function="deal",
                    message="handle_task_execution failed",
                    data={"error": str(e), "trace_back": traceback.format_exc()},
                )
|