# article_summary_task.py (5.5 KB)
"""
Generate short article summaries from video-understanding text.

Rows in ``video_content_understanding`` whose ``video_text`` was produced
successfully are summarized with DeepSeek-R1, and the summary is written
back to the same table (see ``ArticleSummaryTask``).

@author: luojunhui
"""
import time
import traceback

from pymysql.cursors import DictCursor
from tqdm import tqdm

from applications.api import fetch_deepseek_response
from applications.const import VideoToTextConst
from applications.db import DatabaseConnector
from config import long_articles_config

# Shared constants used by ArticleSummaryTask: summary status codes,
# the batch size and the maximum lock (processing) time.
const = VideoToTextConst()
  13. def generate_prompt(text):
  14. """
  15. 生成prompt
  16. """
  17. prompt = f"""
  18. 你是1个优秀的公众号文章写作大师,我对你有以下要求
  19. 文章: {text}
  20. 1.请仔细阅读以上公众号文章,挑选文章中最吸引人的情节或话题,总结为100字左右文章精彩总结(字数计算包括标点符号)。
  21. 句子段落之间以悬念承接,可以吸引读者往下读第二句。
  22. 2.在这100字内容的结尾处,增加1-2句话的引导,引导大家去观看上面的视频了解详情。注意是点击上面的视频,不是下面的视频。
  23. 你最终输出一段总结内容,不用加标题或者主题,也不用写第几段、多少字这样的话。整体的语言风格要口语化、直接点,要让60岁以上的老年人能看懂、能共情。人的名字尽量用全名,不用简称。
  24. """
  25. return prompt
  26. class ArticleSummaryTask(object):
  27. """
  28. 文章总结任务
  29. """
  30. def __init__(self):
  31. self.db_client = None
  32. def connect_db(self):
  33. """
  34. 连接数据库
  35. """
  36. self.db_client = DatabaseConnector(db_config=long_articles_config)
  37. self.db_client.connect()
  38. def get_task_list(self):
  39. """
  40. 获取任务列表
  41. """
  42. select_sql = f"""
  43. select id, video_text
  44. from video_content_understanding
  45. where summary_status = {const.SUMMARY_INIT_STATUS} and status = {const.VIDEO_UNDERSTAND_SUCCESS_STATUS}
  46. limit {const.SUMMARY_BATCH_SIZE};
  47. """
  48. task_list = self.db_client.fetch(select_sql, cursor_type=DictCursor)
  49. return task_list
  50. def rollback_lock_tasks(self):
  51. """
  52. rollback tasks which have been locked for a long time
  53. """
  54. now_timestamp = int(time.time())
  55. timestamp_threshold = now_timestamp - const.MAX_PROCESSING_TIME
  56. update_sql = f"""
  57. update video_content_understanding
  58. set summary_status = %s
  59. where summary_status = %s and status_update_timestamp < %s;
  60. """
  61. rollback_rows = self.db_client.save(
  62. query=update_sql,
  63. params=(const.SUMMARY_INIT_STATUS, const.SUMMARY_LOCK, timestamp_threshold),
  64. )
  65. return rollback_rows
  66. def handle_task_execution(self, task):
  67. """
  68. :param task: keys: [id, video_text]
  69. """
  70. task_id = task["id"]
  71. video_text = task["video_text"]
  72. # Lock Task
  73. affected_rows = self.update_task_status(
  74. task_id, const.SUMMARY_INIT_STATUS, const.SUMMARY_LOCK
  75. )
  76. if not affected_rows:
  77. return
  78. try:
  79. # generate prompt
  80. prompt = generate_prompt(video_text)
  81. # get result from deep seek AI
  82. result = fetch_deepseek_response(model="DeepSeek-R1", prompt=prompt)
  83. if result:
  84. # set as success and update summary text
  85. self.set_summary_text_for_task(task_id, result.strip())
  86. else:
  87. # set as fail
  88. self.update_task_status(
  89. task_id, const.SUMMARY_LOCK, const.SUMMARY_FAIL_STATUS
  90. )
  91. except Exception as e:
  92. print(e)
  93. print(traceback.format_exc())
  94. # set as fail
  95. self.update_task_status(
  96. task_id, const.SUMMARY_LOCK, const.SUMMARY_FAIL_STATUS
  97. )
  98. def set_summary_text_for_task(self, task_id, text):
  99. """
  100. successfully get summary text and update summary text to database
  101. """
  102. update_sql = f"""
  103. update video_content_understanding
  104. set summary_status = %s, summary_text = %s, status_update_timestamp = %s
  105. where id = %s and summary_status = %s;
  106. """
  107. affected_rows = self.db_client.save(
  108. query=update_sql,
  109. params=(
  110. const.SUMMARY_SUCCESS_STATUS,
  111. text,
  112. int(time.time()),
  113. task_id,
  114. const.SUMMARY_LOCK,
  115. ),
  116. )
  117. return affected_rows
  118. def update_task_status(self, task_id, ori_status, new_status):
  119. """
  120. 修改任务状态
  121. """
  122. update_sql = f"""
  123. update video_content_understanding
  124. set summary_status = %s, status_update_timestamp = %s
  125. where id = %s and summary_status = %s;
  126. """
  127. update_rows = self.db_client.save(
  128. update_sql, (new_status, int(time.time()), task_id, ori_status)
  129. )
  130. return update_rows
  131. def deal(self):
  132. """
  133. entrance function for this class
  134. """
  135. # first of all rollback tasks which have been locked for a long time
  136. rollback_rows = self.rollback_lock_tasks()
  137. print("rollback_lock_tasks: {}".format(rollback_rows))
  138. # get task list
  139. task_list = self.get_task_list()
  140. for task in tqdm(task_list, desc="handle each task"):
  141. try:
  142. self.handle_task_execution(task=task)
  143. except Exception as e:
  144. print("error: {}".format(e))
  145. print(traceback.format_exc())