article_summary_task.py 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163
  1. """
  2. @author: luojunhui
  3. """
  4. import time
  5. import datetime
  6. import traceback
  7. from pymysql.cursors import DictCursor
  8. from tqdm import tqdm
  9. from applications.api import fetch_deepseek_response
  10. from applications.const import VideoToTextConst
  11. from applications.db import DatabaseConnector
  12. from config import long_articles_config
# Shared constants object: the code below reads its status codes
# (INIT_STATUS, PROCESSING_STATUS, SUCCESS_STATUS, FAIL_STATUS),
# SUMMARY_BATCH_SIZE and MAX_PROCESSING_TIME.
const = VideoToTextConst()
  14. def generate_prompt(text):
  15. """
  16. 生成prompt
  17. """
  18. prompt = f"""
  19. 你是1个优秀的公众号文章写作大师,我对你有以下要求
  20. 视频总结:{text}
  21. 第一个要求:请仔细阅读以上视频总结,挑选其中最吸引人的情节或话题,总结为100字左右文章精彩总结(字数计算包括标点符号),这部分内容为段落1。
  22. 句子段落之间以悬念承接,可以吸引读者往下读第二句。
  23. 第二个要求:在这100字内容的结尾处,增加1-2句话的引导,引导大家去观看上面的视频了解详情,可以加一些emoji表情。注意是点击上面的视频,不是下面的视频。这部分内容为段落2。
  24. 你最终输出一段总结内容,将第一段和第二段之间空格一行,并且对所有文字进行加粗处理。不用加标题或者主题,也不用写第几段、多少字这样的话。整体的语言风格要口语化、直接点,要让60岁以上的老年人能看懂、能共情。人的名字尽量用全名,不用简称。
  25. """
  26. return prompt
  27. class ArticleSummaryTask(object):
  28. """
  29. 文章总结任务
  30. """
  31. def __init__(self):
  32. self.db_client = DatabaseConnector(db_config=long_articles_config)
  33. self.db_client.connect()
  34. def get_task_list(self):
  35. """
  36. 获取任务列表
  37. """
  38. select_sql = f"""
  39. select id, video_text
  40. from video_content_understanding
  41. where summary_status = {const.INIT_STATUS} and understanding_status = {const.SUCCESS_STATUS}
  42. limit {const.SUMMARY_BATCH_SIZE};
  43. """
  44. task_list = self.db_client.fetch(select_sql, cursor_type=DictCursor)
  45. return task_list
  46. def rollback_lock_tasks(self):
  47. """
  48. rollback tasks which have been locked for a long time
  49. """
  50. now_timestamp = int(time.time())
  51. timestamp_threshold = now_timestamp - const.MAX_PROCESSING_TIME
  52. update_sql = f"""
  53. update video_content_understanding
  54. set summary_status = %s
  55. where summary_status = %s and summary_status_ts < %s;
  56. """
  57. rollback_rows = self.db_client.save(
  58. query=update_sql,
  59. params=(const.INIT_STATUS, const.PROCESSING_STATUS, timestamp_threshold),
  60. )
  61. return rollback_rows
  62. def handle_task_execution(self, task):
  63. """
  64. :param task: keys: [id, video_text]
  65. """
  66. task_id = task["id"]
  67. video_text = task["video_text"]
  68. # Lock Task
  69. affected_rows = self.update_task_status(
  70. task_id, const.INIT_STATUS, const.PROCESSING_STATUS
  71. )
  72. if not affected_rows:
  73. return
  74. try:
  75. # generate prompt
  76. prompt = generate_prompt(video_text)
  77. # get result from deep seek AI
  78. result = fetch_deepseek_response(model="DeepSeek-R1", prompt=prompt)
  79. if result:
  80. # set as success and update summary text
  81. self.set_summary_text_for_task(task_id, result.strip())
  82. else:
  83. # set as fail
  84. self.update_task_status(
  85. task_id, const.PROCESSING_STATUS, const.FAIL_STATUS
  86. )
  87. except Exception as e:
  88. print(e)
  89. print(traceback.format_exc())
  90. # set as fail
  91. self.update_task_status(
  92. task_id, const.PROCESSING_STATUS, const.FAIL_STATUS
  93. )
  94. def set_summary_text_for_task(self, task_id, text):
  95. """
  96. successfully get summary text and update summary text to database
  97. """
  98. update_sql = f"""
  99. update video_content_understanding
  100. set summary_status = %s, summary_text = %s, understanding_status_ts = %s
  101. where id = %s and summary_status = %s;
  102. """
  103. affected_rows = self.db_client.save(
  104. query=update_sql,
  105. params=(
  106. const.SUCCESS_STATUS,
  107. text,
  108. datetime.datetime.now(),
  109. task_id,
  110. const.PROCESSING_STATUS
  111. ),
  112. )
  113. return affected_rows
  114. def update_task_status(self, task_id, ori_status, new_status):
  115. """
  116. 修改任务状态
  117. """
  118. update_sql = f"""
  119. update video_content_understanding
  120. set summary_status = %s, summary_status_ts = %s
  121. where id = %s and summary_status = %s;
  122. """
  123. update_rows = self.db_client.save(
  124. update_sql, (new_status, datetime.datetime.now(), task_id, ori_status)
  125. )
  126. return update_rows
  127. def deal(self):
  128. """
  129. entrance function for this class
  130. """
  131. # first of all rollback tasks which have been locked for a long time
  132. rollback_rows = self.rollback_lock_tasks()
  133. print("rollback_lock_tasks: {}".format(rollback_rows))
  134. # get task list
  135. task_list = self.get_task_list()
  136. for task in tqdm(task_list, desc="handle each task"):
  137. try:
  138. self.handle_task_execution(task=task)
  139. except Exception as e:
  140. print("error: {}".format(e))
  141. print(traceback.format_exc())