123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115 |
- """
- @author: luojunhui
- """
- from pymysql.cursors import DictCursor
- from tqdm import tqdm
- from applications.api import deep_seek_api
- from applications.db import DatabaseConnector
- from config import long_articles_config
def generate_prompt(text):
    """
    Build the summarization prompt sent to the language model.

    The (Chinese) template asks the model to pick the most engaging plot
    point or topic of the article, condense it into a teaser of roughly
    100 characters whose sentences are linked by suspense, and close with
    one or two sentences steering readers to the video shown *above* the
    text. It also requests a plain, colloquial tone that readers over 60
    can follow, and full personal names instead of abbreviations.

    :param text: article text; interpolated verbatim into the template
    :return: the complete prompt string
    """
    # The template is runtime data consumed by the model: keep it verbatim.
    return f"""
你是1个优秀的公众号文章写作大师,我对你有以下要求
文章: {text}
1.请仔细阅读以上公众号文章,挑选文章中最吸引人的情节或话题,总结为100字左右文章精彩总结(字数计算包括标点符号)。
句子段落之间以悬念承接,可以吸引读者往下读第二句。
2.在这100字内容的结尾处,增加1-2句话的引导,引导大家去观看上面的视频了解详情。注意是点击上面的视频,不是下面的视频。
你最终输出一段总结内容,不用加标题或者主题,也不用写第几段、多少字这样的话。整体的语言风格要口语化、直接点,要让60岁以上的老年人能看懂、能共情。人的名字尽量用全名,不用简称。
"""
class ArticleSummaryTask:
    """
    Article summary task.

    Selects pending rows from ``publish_single_video_source`` (joined with
    ``video_content_understanding``), asks DeepSeek for a short summary of
    each video text and writes the result back.

    ``extract_status`` transitions performed by this class::

        0 (pending) -> 101 (processing) -> 2  (summary stored)
                                        -> 99 (no usable summary / error)
    """

    # extract_status values on publish_single_video_source
    INIT_STATUS = 0
    PROCESSING_STATUS = 101
    SUCCESS_STATUS = 2
    FAIL_STATUS = 99

    def __init__(self):
        # Created by connect_db(); deal() connects on demand when still None.
        self.db_client = None

    def connect_db(self):
        """
        Create the database client from ``long_articles_config`` and connect.
        """
        self.db_client = DatabaseConnector(db_config=long_articles_config)
        self.db_client.connect()

    def get_task_list(self):
        """
        Fetch the tasks waiting for a summary.

        :return: whatever ``db_client.fetch`` yields for the query; with
            ``DictCursor`` presumably a sequence of dicts holding
            ``video_text`` and ``audit_video_id`` (verify against
            DatabaseConnector).
        """
        select_sql = """
            select t1.video_text, t2.audit_video_id
            from video_content_understanding t1
            join publish_single_video_source t2 on t1.pq_vid = t2.audit_video_id
            where t1.status = 2 and t2.bad_status = 0 and t2.extract_status = 0;
        """
        return self.db_client.fetch(select_sql, cursor_type=DictCursor)

    def _request_summary(self, video_text):
        """
        Ask the model for a summary of ``video_text`` and return its raw reply.
        """
        prompt = generate_prompt(video_text)
        return deep_seek_api(model="DeepSeek-R1", prompt=prompt)

    def _update_extract_status(
        self, audit_video_id, ori_status, new_status, summary_text=None
    ):
        """
        Move one row from ``ori_status`` to ``new_status``.

        The ``extract_status = ori_status`` condition makes the update a
        compare-and-set: it only matches while the row is still in the
        expected state. ``summary_text`` is written too when it is given.

        :return: the result of ``db_client.save`` (presumably the number of
            affected rows - verify against DatabaseConnector).
        """
        if summary_text is None:
            update_sql = """
                update publish_single_video_source
                set extract_status = %s
                where audit_video_id = %s and extract_status = %s;
            """
            params = (new_status, audit_video_id, ori_status)
        else:
            update_sql = """
                update publish_single_video_source
                set extract_status = %s, summary_text = %s
                where audit_video_id = %s and extract_status = %s;
            """
            params = (new_status, summary_text, audit_video_id, ori_status)
        return self.db_client.save(query=update_sql, params=params)

    def process_each_task(self, task):
        """
        Summarize a single task and persist the outcome.

        task: {
            "video_text": "video content",
            "audit_video_id": "video id"
        }

        Any exception raised while producing the summary is re-raised after
        the row has been marked as failed, so a claimed row is never left
        in the processing state.
        """
        video_text = task["video_text"]
        audit_video_id = task["audit_video_id"]

        # Claim the row (0 -> 101); a falsy result means nothing was updated,
        # i.e. the row is no longer pending, so there is nothing to do.
        claimed = self._update_extract_status(
            audit_video_id, self.INIT_STATUS, self.PROCESSING_STATUS
        )
        if not claimed:
            return

        try:
            response = self._request_summary(video_text)
            summary = response.strip() if response else ""
        except Exception:
            # Without this the row would stay at 101 and never be selected
            # again by get_task_list().
            affected_rows = self._update_extract_status(
                audit_video_id, self.PROCESSING_STATUS, self.FAIL_STATUS
            )
            print(affected_rows)
            raise

        if summary:
            affected_rows = self._update_extract_status(
                audit_video_id,
                self.PROCESSING_STATUS,
                self.SUCCESS_STATUS,
                summary_text=summary,
            )
        else:
            # Empty or whitespace-only reply: do not store it as a success.
            affected_rows = self._update_extract_status(
                audit_video_id, self.PROCESSING_STATUS, self.FAIL_STATUS
            )
        print(affected_rows)

    def deal(self):
        """
        Process every pending task; one failing task does not stop the run.
        """
        if self.db_client is None:
            self.connect_db()

        for task in tqdm(self.get_task_list()):
            try:
                self.process_each_task(task)
            except Exception as e:
                # Best effort: report the error and move on to the next task.
                print(e)
|