""" @author: luojunhui """ from pymysql.cursors import DictCursor from tqdm import tqdm from applications.api import deep_seek_api from applications.const import VideoToTextConst from applications.db import DatabaseConnector from config import long_articles_config const = VideoToTextConst() def generate_prompt(text): """ 生成prompt """ prompt = f""" 你是1个优秀的公众号文章写作大师,我对你有以下要求 文章: {text} 1.请仔细阅读以上公众号文章,挑选文章中最吸引人的情节或话题,总结为100字左右文章精彩总结(字数计算包括标点符号)。 句子段落之间以悬念承接,可以吸引读者往下读第二句。 2.在这100字内容的结尾处,增加1-2句话的引导,引导大家去观看上面的视频了解详情。注意是点击上面的视频,不是下面的视频。 你最终输出一段总结内容,不用加标题或者主题,也不用写第几段、多少字这样的话。整体的语言风格要口语化、直接点,要让60岁以上的老年人能看懂、能共情。人的名字尽量用全名,不用简称。 """ return prompt class ArticleSummaryTask(object): """ 文章总结任务 """ def __init__(self): self.db_client = None def connect_db(self): """ 连接数据库 """ self.db_client = DatabaseConnector(db_config=long_articles_config) self.db_client.connect() def get_task_list(self): """ 获取任务列表 """ select_sql = f""" select t1.video_text, t2.audit_video_id from video_content_understanding t1 join publish_single_video_source t2 on t1.pq_vid = t2.audit_video_id where t1.status = {const.VIDEO_UNDERSTAND_SUCCESS_STATUS} and t2.bad_status = {const.ARTICLE_GOOD_STATUS} and t2.extract_status = {const.EXTRACT_INIT_STATUS}; """ task_list = self.db_client.fetch(select_sql, cursor_type=DictCursor) return task_list def process_each_task(self, task): """ 处理每个任务 """ video_text = task["video_text"] audit_video_id = task["audit_video_id"] # 开始处理,将extract_status更新为101 update_sql = f""" update publish_single_video_source set extract_status = %s where audit_video_id = %s and extract_status = %s; """ affected_rows = self.db_client.save( query=update_sql, params=(const.EXTRACT_PROCESSING_STATUS, audit_video_id, const.EXTRACT_INIT_STATUS) ) if not affected_rows: return try: # 生成prompt prompt = generate_prompt(video_text) response = deep_seek_api(model="DeepSeek-R1", prompt=prompt) if response: update_sql = f""" update publish_single_video_source set extract_status = %s, summary_text = %s where audit_video_id = %s and extract_status = %s; """ affected_rows = self.db_client.save( query=update_sql, params=( const.EXTRACT_SUCCESS_STATUS, response.strip(), audit_video_id, const.EXTRACT_PROCESSING_STATUS ) ) print(affected_rows) else: update_sql = f""" update publish_single_video_source set extract_status = %s where audit_video_id = %s and extract_status = %s; """ affected_rows = self.db_client.save( query=update_sql, params=( const.EXTRACT_FAIL_STATUS, audit_video_id, const.EXTRACT_PROCESSING_STATUS ) ) print(affected_rows) except Exception as e: print(e) # set as fail update_sql = f""" update publish_single_video_source set extract_status = %s where audit_video_id = %s and extract_status = %s; """ self.db_client.save( query=update_sql, params=( const.EXTRACT_FAIL_STATUS, audit_video_id, const.EXTRACT_PROCESSING_STATUS ) ) def deal(self): """ 开始处理任务 """ task_list = self.get_task_list() for task in tqdm(task_list): try: self.process_each_task(task) except Exception as e: print(e) continue