""" @author: luojunhui """ from pymysql.cursors import DictCursor from tqdm import tqdm from applications.api import deep_seek_api from applications.db import DatabaseConnector from config import long_articles_config def generate_prompt(text): """ 生成prompt """ prompt = f""" 你是1个优秀的公众号文章写作大师,我对你有以下要求 文章: {text} 1.请仔细阅读以上公众号文章,挑选文章中最吸引人的情节或话题,总结为100字左右文章精彩总结(字数计算包括标点符号)。 句子段落之间以悬念承接,可以吸引读者往下读第二句。 2.在这100字内容的结尾处,增加1-2句话的引导,引导大家去观看上面的视频了解详情。注意是点击上面的视频,不是下面的视频。 你最终输出一段总结内容,不用加标题或者主题,也不用写第几段、多少字这样的话。整体的语言风格要口语化、直接点,要让60岁以上的老年人能看懂、能共情。人的名字尽量用全名,不用简称。 """ return prompt class ArticleSummaryTask(object): """ 文章总结任务 """ def __init__(self): self.db_client = None def connect_db(self): """ 连接数据库 """ self.db_client = DatabaseConnector(db_config=long_articles_config) self.db_client.connect() def get_task_list(self): """ 获取任务列表 """ select_sql = f""" select t1.video_text, t2.audit_video_id from video_content_understanding t1 join publish_single_video_source t2 on t1.pq_vid = t2.audit_video_id where t1.status = 2 and t2.bad_status = 0 and t2.extract_status = 0; """ task_list = self.db_client.fetch(select_sql, cursor_type=DictCursor) return task_list def process_each_task(self, task): """ task: { "video_text": "视频内容", "audit_video_id": "视频id" } """ video_text = task["video_text"] audit_video_id = task["audit_video_id"] # 开始处理,将extract_status更新为101 update_sql = f""" update publish_single_video_source set extract_status = %s where audit_video_id = %s and extract_status = %s """ affected_rows = self.db_client.save( query=update_sql, params=(101, audit_video_id, 0) ) if not affected_rows: return # 生成prompt prompt = generate_prompt(video_text) response = deep_seek_api(model="DeepSeek-R1", prompt=prompt) if response: update_sql = f""" update publish_single_video_source set extract_status = %s, summary_text = %s where audit_video_id = %s and extract_status = %s; """ affected_rows = self.db_client.save( query=update_sql, params=(2, response.strip(), audit_video_id, 101) ) print(affected_rows) else: update_sql = f""" update publish_single_video_source set extract_status = %s where audit_video_id = %s and extract_status = %s; """ affected_rows = self.db_client.save( query=update_sql, params=(99, audit_video_id, 101) ) print(affected_rows) def deal(self): """ 开始处理任务 """ task_list = self.get_task_list() for task in tqdm(task_list): try: self.process_each_task(task) except Exception as e: print(e) continue