"""
@author: luojunhui
"""

import time
import datetime
import traceback

from pymysql.cursors import DictCursor
from tqdm import tqdm

from applications.api import fetch_deepseek_response
from applications.const import VideoToTextConst
from applications.db import DatabaseConnector
from config import long_articles_config

const = VideoToTextConst()


def generate_prompt(text):
    """
    Build the summarisation prompt for one video.

    :param text: video understanding text; interpolated verbatim into the prompt
    :return: the complete prompt string
    """
    prompt = f"""
    你是1个优秀的公众号文章写作大师,我对你有以下要求
    视频总结:{text}
    第一个要求:请仔细阅读以上视频总结,挑选其中最吸引人的情节或话题,总结为100字左右文章精彩总结(字数计算包括标点符号),这部分内容为段落1。
    句子段落之间以悬念承接,可以吸引读者往下读第二句。
    第二个要求:在这100字内容的结尾处,增加1-2句话的引导,引导大家去观看上面的视频了解详情,可以加一些emoji表情。注意是点击上面的视频,不是下面的视频。这部分内容为段落2。
    你最终输出一段总结内容,将第一段和第二段之间空格一行,并且对所有文字进行加粗处理。不用加标题或者主题,也不用写第几段、多少字这样的话。整体的语言风格要口语化、直接点,要让60岁以上的老年人能看懂、能共情。人的名字尽量用全名,不用简称。
    """
    return prompt


class ArticleSummaryTask:
    """
    Article summary task.

    Reads rows from ``video_content_understanding`` whose understanding step
    succeeded and whose summary step has not started, asks the model for a
    summary of each ``video_text`` and writes the result back. Progress is
    tracked in the ``summary_status`` / ``summary_status_ts`` columns.
    """

    def __init__(self):
        # The connection is opened eagerly, so constructing the task does I/O.
        self.db_client = DatabaseConnector(db_config=long_articles_config)
        self.db_client.connect()

    def get_task_list(self):
        """
        Fetch the next batch of rows waiting for a summary.

        :return: result of ``db_client.fetch`` with ``DictCursor``; each row
            is read as a mapping with keys ``id`` and ``video_text``
        """
        # Only project constants are interpolated here, no external input.
        select_sql = f"""
            select id, video_text
            from video_content_understanding
            where summary_status = {const.INIT_STATUS}
                and understanding_status = {const.SUCCESS_STATUS}
            limit {const.SUMMARY_BATCH_SIZE};
        """
        task_list = self.db_client.fetch(select_sql, cursor_type=DictCursor)
        return task_list

    def rollback_lock_tasks(self):
        """
        Reset tasks that have been locked in processing for too long.

        Rows still in ``PROCESSING_STATUS`` whose ``summary_status_ts`` is
        older than ``const.MAX_PROCESSING_TIME`` are put back to
        ``INIT_STATUS`` so that they are picked up again.

        :return: whatever ``db_client.save`` returns (printed by ``deal`` as
            the rollback row count)
        """
        now_timestamp = int(time.time())
        timestamp_threshold = now_timestamp - const.MAX_PROCESSING_TIME
        # NOTE(review): summary_status_ts is written by update_task_status
        # with datetime.datetime.now(), but is compared here against an
        # integer epoch value. Confirm the column type so this comparison
        # selects the intended rows.
        update_sql = """
            update video_content_understanding
            set summary_status = %s
            where summary_status = %s and summary_status_ts < %s;
        """
        rollback_rows = self.db_client.save(
            query=update_sql,
            params=(const.INIT_STATUS, const.PROCESSING_STATUS, timestamp_threshold),
        )
        return rollback_rows

    def handle_task_execution(self, task):
        """
        Lock one task, generate its summary and persist the outcome.

        :param task: mapping with keys ``id`` and ``video_text``
        :return: None
        """
        task_id = task["id"]
        video_text = task["video_text"]

        # Lock the task. The update is conditional on the row still being in
        # INIT_STATUS, so a falsy result means the lock was not acquired.
        affected_rows = self.update_task_status(
            task_id, const.INIT_STATUS, const.PROCESSING_STATUS
        )
        if not affected_rows:
            return

        try:
            # generate prompt
            prompt = generate_prompt(video_text)
            # get result from deep seek AI
            result = fetch_deepseek_response(model="DeepSeek-R1", prompt=prompt)
            if result:
                # set as success and update summary text
                self.set_summary_text_for_task(task_id, result.strip())
            else:
                # empty response: set as fail
                self.update_task_status(
                    task_id, const.PROCESSING_STATUS, const.FAIL_STATUS
                )
        except Exception as e:
            print(e)
            print(traceback.format_exc())
            # any error while generating or saving: set as fail
            self.update_task_status(
                task_id, const.PROCESSING_STATUS, const.FAIL_STATUS
            )

    def set_summary_text_for_task(self, task_id, text):
        """
        Store the generated summary and mark the task as successful.

        The update only applies while the row is still in
        ``PROCESSING_STATUS``.

        :param task_id: ``id`` of the row to update
        :param text: summary text to store in ``summary_text``
        :return: whatever ``db_client.save`` returns
        """
        # The completion time goes to summary_status_ts, the column the other
        # summary-state transitions in this class use. The understanding
        # stage's own timestamp (understanding_status_ts) is left untouched.
        update_sql = """
            update video_content_understanding
            set summary_status = %s, summary_text = %s, summary_status_ts = %s
            where id = %s and summary_status = %s;
        """
        affected_rows = self.db_client.save(
            query=update_sql,
            params=(
                const.SUCCESS_STATUS,
                text,
                datetime.datetime.now(),
                task_id,
                const.PROCESSING_STATUS,
            ),
        )
        return affected_rows

    def update_task_status(self, task_id, ori_status, new_status):
        """
        Move a task from one summary status to another.

        The update is conditional on the row currently being in
        ``ori_status``; ``summary_status_ts`` is set to the current time.

        :param task_id: ``id`` of the row to update
        :param ori_status: status the row must currently have
        :param new_status: status to set
        :return: whatever ``db_client.save`` returns; callers treat a falsy
            value as "no row was updated"
        """
        update_sql = """
            update video_content_understanding
            set summary_status = %s, summary_status_ts = %s
            where id = %s and summary_status = %s;
        """
        update_rows = self.db_client.save(
            update_sql, (new_status, datetime.datetime.now(), task_id, ori_status)
        )
        return update_rows

    def deal(self):
        """
        Entry point: release stale locks, then process one batch of tasks.

        An exception raised while handling one task is printed and does not
        stop the remaining tasks.
        """
        # first of all rollback tasks which have been locked for a long time
        rollback_rows = self.rollback_lock_tasks()
        print("rollback_lock_tasks: {}".format(rollback_rows))

        # get task list
        task_list = self.get_task_list()
        for task in tqdm(task_list, desc="handle each task"):
            try:
                self.handle_task_execution(task=task)
            except Exception as e:
                print("error: {}".format(e))
                print(traceback.format_exc())