123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163 |
- """
- @author: luojunhui
- """
- import time
- import datetime
- import traceback
- from pymysql.cursors import DictCursor
- from tqdm import tqdm
- from applications.api import fetch_deepseek_response
- from applications.const import VideoToTextConst
- from applications.db import DatabaseConnector
- from config import long_articles_config
# Shared constants for this module: task status codes, the summary batch size
# and the maximum time a task may stay locked are all read from this object.
const = VideoToTextConst()
def generate_prompt(text):
    """
    Build the prompt that asks the model for a short article-style summary.

    The given video summary is interpolated verbatim into a fixed Chinese
    template that requests a ~100 character teaser paragraph followed by a
    1-2 sentence call to action, all in bold.

    :param text: video summary text to embed in the template
    :return: the complete prompt string
    """
    # NOTE(review): the template lines are kept flush-left exactly as they
    # appear in the reviewed source; confirm the in-string indentation against
    # the original file if exact whitespace matters.
    return f"""
你是1个优秀的公众号文章写作大师,我对你有以下要求
视频总结:{text}

第一个要求:请仔细阅读以上视频总结,挑选其中最吸引人的情节或话题,总结为100字左右文章精彩总结(字数计算包括标点符号),这部分内容为段落1。
句子段落之间以悬念承接,可以吸引读者往下读第二句。

第二个要求:在这100字内容的结尾处,增加1-2句话的引导,引导大家去观看上面的视频了解详情,可以加一些emoji表情。注意是点击上面的视频,不是下面的视频。这部分内容为段落2。

你最终输出一段总结内容,将第一段和第二段之间空格一行,并且对所有文字进行加粗处理。不用加标题或者主题,也不用写第几段、多少字这样的话。整体的语言风格要口语化、直接点,要让60岁以上的老年人能看懂、能共情。人的名字尽量用全名,不用简称。
"""
class ArticleSummaryTask:
    """
    Article summary task.

    Picks rows of ``video_content_understanding`` whose understanding step has
    succeeded and whose summary is still pending, locks each row by moving
    ``summary_status`` from INIT to PROCESSING, asks DeepSeek for a summary and
    records the outcome as SUCCESS (with the text) or FAIL.
    """

    def __init__(self):
        self.db_client = DatabaseConnector(db_config=long_articles_config)
        self.db_client.connect()

    def get_task_list(self):
        """
        Fetch one batch of pending summary tasks.

        :return: whatever ``db_client.fetch`` yields for the query; each row is
            read as a mapping with keys ``id`` and ``video_text``
        """
        # Only internal constants are interpolated here, never external input.
        select_sql = f"""
            select id, video_text
            from video_content_understanding
            where summary_status = {const.INIT_STATUS} and understanding_status = {const.SUCCESS_STATUS}
            limit {const.SUMMARY_BATCH_SIZE};
        """
        return self.db_client.fetch(select_sql, cursor_type=DictCursor)

    def rollback_lock_tasks(self):
        """
        Release tasks that have been locked (PROCESSING) for too long.

        :return: the value returned by ``db_client.save`` (affected row count)
        """
        # summary_status_ts is written as a datetime by update_task_status, so
        # the cut-off has to be a datetime as well; an integer epoch would not
        # compare meaningfully against that column.
        lock_deadline = datetime.datetime.now() - datetime.timedelta(
            seconds=const.MAX_PROCESSING_TIME
        )
        update_sql = """
            update video_content_understanding
            set summary_status = %s
            where summary_status = %s and summary_status_ts < %s;
        """
        return self.db_client.save(
            query=update_sql,
            params=(const.INIT_STATUS, const.PROCESSING_STATUS, lock_deadline),
        )

    def handle_task_execution(self, task):
        """
        Lock a single task, generate its summary and persist the result.

        :param task: mapping with keys ``id`` and ``video_text``
        """
        task_id = task["id"]
        video_text = task["video_text"]

        # Lock the task. Zero affected rows means the row was no longer in
        # INIT status when we tried to claim it, so there is nothing to do.
        locked = self.update_task_status(
            task_id, const.INIT_STATUS, const.PROCESSING_STATUS
        )
        if not locked:
            return

        try:
            prompt = generate_prompt(video_text)
            result = fetch_deepseek_response(model="DeepSeek-R1", prompt=prompt)
            # A blank / whitespace-only answer is not a usable summary.
            summary_text = result.strip() if result else ""
            if summary_text:
                self.set_summary_text_for_task(task_id, summary_text)
                return
        except Exception as e:
            print(e)
            print(traceback.format_exc())

        # Either the model returned nothing usable or something raised above.
        self._mark_task_failed(task_id)

    def _mark_task_failed(self, task_id):
        """Move a locked task from PROCESSING to FAIL."""
        return self.update_task_status(
            task_id, const.PROCESSING_STATUS, const.FAIL_STATUS
        )

    def set_summary_text_for_task(self, task_id, text):
        """
        Store the generated summary and mark the task as SUCCESS.

        The update only applies while the row is still in PROCESSING status.

        :param task_id: primary key of the row
        :param text: summary text to store
        :return: the value returned by ``db_client.save`` (affected row count)
        """
        # This changes summary_status, so the summary timestamp is the one to
        # stamp (consistent with update_task_status); the previous code wrote
        # understanding_status_ts here.
        update_sql = """
            update video_content_understanding
            set summary_status = %s, summary_text = %s, summary_status_ts = %s
            where id = %s and summary_status = %s;
        """
        return self.db_client.save(
            query=update_sql,
            params=(
                const.SUCCESS_STATUS,
                text,
                datetime.datetime.now(),
                task_id,
                const.PROCESSING_STATUS,
            ),
        )

    def update_task_status(self, task_id, ori_status, new_status):
        """
        Move a task from ``ori_status`` to ``new_status`` and stamp the change.

        The ``where`` clause checks the current status, so the update is a
        no-op when the row is not in ``ori_status``.

        :param task_id: primary key of the row
        :param ori_status: status the row is expected to be in
        :param new_status: status to set
        :return: the value returned by ``db_client.save`` (affected row count)
        """
        update_sql = """
            update video_content_understanding
            set summary_status = %s, summary_status_ts = %s
            where id = %s and summary_status = %s;
        """
        return self.db_client.save(
            update_sql, (new_status, datetime.datetime.now(), task_id, ori_status)
        )

    def deal(self):
        """
        Entry point: release stale locks, then process one batch of tasks.
        """
        # first of all rollback tasks which have been locked for a long time
        rollback_rows = self.rollback_lock_tasks()
        print("rollback_lock_tasks: {}".format(rollback_rows))

        # A None result from the fetch is treated as "no tasks".
        task_list = self.get_task_list() or []
        for task in tqdm(task_list, desc="handle each task"):
            try:
                self.handle_task_execution(task=task)
            except Exception as e:
                # Keep going: one broken task must not stop the batch.
                print("error: {}".format(e))
                print(traceback.format_exc())
|