"""
Generate short summary text for rows of ``video_content_understanding``
by sending each row's ``video_text`` to the DeepSeek model.

@author: luojunhui
"""

import time
import traceback

from pymysql.cursors import DictCursor
from tqdm import tqdm

from applications.api import fetch_deepseek_response
from applications.const import VideoToTextConst
from applications.db import DatabaseConnector
from config import long_articles_config

const = VideoToTextConst()


def generate_prompt(text) -> str:
    """
    Build the summarization prompt for one article.

    :param text: the article text that is interpolated into the prompt
    :return: the complete prompt string
    """
    prompt = f"""
    你是1个优秀的公众号文章写作大师,我对你有以下要求

    文章:
    {text}

    1.请仔细阅读以上公众号文章,挑选文章中最吸引人的情节或话题,总结为100字左右文章精彩总结(字数计算包括标点符号)。
    句子段落之间以悬念承接,可以吸引读者往下读第二句。
    2.在这100字内容的结尾处,增加1-2句话的引导,引导大家去观看上面的视频了解详情。注意是点击上面的视频,不是下面的视频。

    你最终输出一段总结内容,不用加标题或者主题,也不用写第几段、多少字这样的话。整体的语言风格要口语化、直接点,要让60岁以上的老年人能看懂、能共情。人的名字尽量用全名,不用简称。
    """
    return prompt


class ArticleSummaryTask:
    """
    Article summary task.

    ``deal()`` is the entry point. It uses ``self.db_client``, which is only
    set by ``connect_db()``, so ``connect_db()`` must be called first.
    """

    def __init__(self):
        # Created lazily by connect_db(); stays None until then.
        self.db_client = None

    def connect_db(self):
        """
        Create the database client from ``long_articles_config`` and connect.
        """
        self.db_client = DatabaseConnector(db_config=long_articles_config)
        self.db_client.connect()

    def get_task_list(self):
        """
        Fetch one batch of tasks that still need a summary.

        Selects ``id`` and ``video_text`` for rows whose ``summary_status`` is
        the init status and whose ``status`` is the video-understand-success
        status, limited to ``const.SUMMARY_BATCH_SIZE`` rows.

        :return: whatever ``db_client.fetch`` returns for a ``DictCursor``
                 query (rows keyed by column name)
        """
        # Only module-level constants are interpolated here, never user input.
        select_sql = f"""
            select id, video_text
            from video_content_understanding
            where summary_status = {const.SUMMARY_INIT_STATUS}
                and status = {const.VIDEO_UNDERSTAND_SUCCESS_STATUS}
            limit {const.SUMMARY_BATCH_SIZE};
        """
        task_list = self.db_client.fetch(select_sql, cursor_type=DictCursor)
        return task_list

    def rollback_lock_tasks(self):
        """
        Roll back tasks which have been locked for a long time.

        Rows still in the lock status whose ``status_update_timestamp`` is
        older than ``now - const.MAX_PROCESSING_TIME`` (seconds, since it is
        subtracted from ``int(time.time())``) are reset to the init status.

        :return: the value returned by ``db_client.save`` (presumably the
                 number of affected rows -- confirm against DatabaseConnector)
        """
        now_timestamp = int(time.time())
        timestamp_threshold = now_timestamp - const.MAX_PROCESSING_TIME
        update_sql = """
            update video_content_understanding
            set summary_status = %s
            where summary_status = %s and status_update_timestamp < %s;
        """
        rollback_rows = self.db_client.save(
            query=update_sql,
            params=(
                const.SUMMARY_INIT_STATUS,
                const.SUMMARY_LOCK,
                timestamp_threshold,
            ),
        )
        return rollback_rows

    def handle_task_execution(self, task):
        """
        Process a single task: lock it, ask the model for a summary, and
        store the outcome.

        :param task: mapping with keys ``id`` and ``video_text``
        :return: None
        """
        task_id = task["id"]
        video_text = task["video_text"]

        # Lock the task. The update only matches rows still in the init
        # status, so a falsy result means the row was not in that state
        # (presumably claimed elsewhere) and must be skipped.
        affected_rows = self.update_task_status(
            task_id, const.SUMMARY_INIT_STATUS, const.SUMMARY_LOCK
        )
        if not affected_rows:
            return

        try:
            # generate prompt
            prompt = generate_prompt(video_text)

            # get result from deep seek AI
            result = fetch_deepseek_response(model="DeepSeek-R1", prompt=prompt)

            # Strip BEFORE testing: a whitespace-only reply is truthy but
            # would otherwise be stored as an empty "successful" summary.
            summary_text = result.strip() if result else ""
            if summary_text:
                # set as success and update summary text
                self.set_summary_text_for_task(task_id, summary_text)
            else:
                # set as fail
                self.update_task_status(
                    task_id, const.SUMMARY_LOCK, const.SUMMARY_FAIL_STATUS
                )
        except Exception as e:
            print(e)
            print(traceback.format_exc())
            # set as fail
            self.update_task_status(
                task_id, const.SUMMARY_LOCK, const.SUMMARY_FAIL_STATUS
            )

    def set_summary_text_for_task(self, task_id, text):
        """
        Store the summary text and mark the task as successful.

        The update only applies while the row is still in the lock status.

        :param task_id: ``id`` of the row to update
        :param text: summary text to store in ``summary_text``
        :return: the value returned by ``db_client.save``
        """
        update_sql = """
            update video_content_understanding
            set summary_status = %s, summary_text = %s, status_update_timestamp = %s
            where id = %s and summary_status = %s;
        """
        affected_rows = self.db_client.save(
            query=update_sql,
            params=(
                const.SUMMARY_SUCCESS_STATUS,
                text,
                int(time.time()),
                task_id,
                const.SUMMARY_LOCK,
            ),
        )
        return affected_rows

    def update_task_status(self, task_id, ori_status, new_status):
        """
        Change a task's ``summary_status`` from ``ori_status`` to ``new_status``.

        The row is only updated if its current ``summary_status`` equals
        ``ori_status``; ``status_update_timestamp`` is set to the current time.

        :param task_id: ``id`` of the row to update
        :param ori_status: status the row is expected to be in
        :param new_status: status to write
        :return: the value returned by ``db_client.save``
        """
        update_sql = """
            update video_content_understanding
            set summary_status = %s, status_update_timestamp = %s
            where id = %s and summary_status = %s;
        """
        update_rows = self.db_client.save(
            update_sql, (new_status, int(time.time()), task_id, ori_status)
        )
        return update_rows

    def deal(self):
        """
        Entrance function for this class.

        Rolls back stale locked tasks, fetches one batch of tasks and handles
        each of them. An exception from one task is printed and does not stop
        the remaining tasks.
        """
        # first of all rollback tasks which have been locked for a long time
        rollback_rows = self.rollback_lock_tasks()
        print(f"rollback_lock_tasks: {rollback_rows}")

        # get task list
        task_list = self.get_task_list()

        # NOTE(review): guard in case fetch yields None instead of an empty
        # sequence -- confirm against DatabaseConnector.fetch.
        for task in tqdm(task_list or [], desc="handle each task"):
            try:
                self.handle_task_execution(task=task)
            except Exception as e:
                print(f"error: {e}")
                print(traceback.format_exc())