summary_text.py 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165
  1. """
  2. @author: luojunhui
  3. """
  4. import time
  5. import datetime
  6. import traceback
  7. from pymysql.cursors import DictCursor
  8. from tqdm import tqdm
  9. from applications import log
  10. from applications.api import fetch_deepseek_response
  11. from applications.const import VideoToTextConst
  12. from applications.db import DatabaseConnector
  13. from config import long_articles_config
  14. from coldStartTasks.ai_pipeline.basic import generate_summary_prompt
  15. from coldStartTasks.ai_pipeline.basic import update_video_pool_status
  16. from coldStartTasks.ai_pipeline.basic import update_task_queue_status
  # Shared pipeline constants (statuses, batch size, max processing time) used below.
const = VideoToTextConst()


class ArticleSummaryTask:
    """
    Article summary task.

    Picks rows from ``video_content_understanding`` whose video understanding
    has succeeded but whose summary is still pending, asks DeepSeek-R1 for a
    summary of ``video_text``, and records the outcome on the row and in the
    video pool.
    """

    def __init__(self):
        self.db_client = DatabaseConnector(db_config=long_articles_config)
        self.db_client.connect()

    def get_summary_task_list(self) -> list[dict]:
        """
        Fetch a batch of pending summary tasks.

        A row is pending when ``summary_status`` is INIT and
        ``understanding_status`` is SUCCESS; at most ``SUMMARY_BATCH_SIZE``
        rows are returned.

        :return: rows (fetched with DictCursor) with keys ``id``,
            ``content_trace_id`` and ``video_text``
        """
        # Only constants from ``const`` are interpolated here, never external input.
        fetch_query = f"""
            select id, content_trace_id, video_text
            from video_content_understanding
            where summary_status = {const.INIT_STATUS} and understanding_status = {const.SUCCESS_STATUS}
            limit {const.SUMMARY_BATCH_SIZE};
        """
        return self.db_client.fetch(query=fetch_query, cursor_type=DictCursor)

    def rollback_lock_tasks(self) -> int:
        """
        Roll back summary tasks that have been locked for too long.

        Rows stuck in PROCESSING whose ``summary_status_ts`` is older than
        ``MAX_PROCESSING_TIME`` seconds are reset to INIT so that a later run
        can pick them up again.

        :return: row count returned by ``db_client.save``
        """
        # The *_status_ts columns are written with ``datetime`` values (see
        # set_summary_text_for_task), so the threshold is a datetime as well;
        # a raw integer epoch would not be compared as the intended point in time.
        # NOTE(review): assumes summary_status_ts is a DATETIME column — confirm
        # against the table schema.
        timestamp_threshold = datetime.datetime.fromtimestamp(
            int(time.time()) - const.MAX_PROCESSING_TIME
        )
        update_sql = """
            update video_content_understanding
            set summary_status = %s
            where summary_status = %s and summary_status_ts < %s;
        """
        return self.db_client.save(
            query=update_sql,
            params=(const.INIT_STATUS, const.PROCESSING_STATUS, timestamp_threshold),
        )

    def handle_task_execution(self, task: dict) -> None:
        """
        Summarize a single task and record the outcome.

        Flow: lock the row (INIT -> PROCESSING), ask DeepSeek-R1 for a summary
        of ``video_text``, store it (PROCESSING -> SUCCESS) or mark the row as
        failed (PROCESSING -> FAIL), then forward the outcome to the video pool
        through ``update_video_pool_status``.

        If the final status update matches no PROCESSING row (e.g. the task was
        rolled back by another run), this run no longer owns the task: the
        video pool is left untouched and the event is logged.

        :param task: row from ``get_summary_task_list`` with keys
            ``id``, ``content_trace_id`` and ``video_text``
        """
        task_id = task["id"]
        content_trace_id = task["content_trace_id"]
        video_text = task["video_text"]

        # Lock the task; 0 affected rows means it is no longer INIT (taken elsewhere).
        locked_rows = update_task_queue_status(
            db_client=self.db_client,
            task_id=task_id,
            process="summary",
            ori_status=const.INIT_STATUS,
            new_status=const.PROCESSING_STATUS,
        )
        if not locked_rows:
            return

        task_status = const.FAIL_STATUS
        try:
            prompt = generate_summary_prompt(video_text)
            result = fetch_deepseek_response(model="DeepSeek-R1", prompt=prompt)
            # An empty or whitespace-only answer is not a usable summary.
            summary_text = result.strip() if result else ""
            if summary_text:
                saved_rows = self.set_summary_text_for_task(task_id, summary_text)
                if not saved_rows:
                    # Nothing was saved, so do not report success to the pool.
                    self._log_lost_lock(task_id)
                    return
                task_status = const.SUCCESS_STATUS
        except Exception as e:
            log(
                task="article_summary_task",
                function="fetch_deepseek_response",
                message="fetch_deepseek_response failed",
                data={"error": str(e), "trace_back": traceback.format_exc()},
            )

        if task_status == const.FAIL_STATUS:
            failed_rows = update_task_queue_status(
                db_client=self.db_client,
                task_id=task_id,
                process="summary",
                ori_status=const.PROCESSING_STATUS,
                new_status=const.FAIL_STATUS,
            )
            if not failed_rows:
                self._log_lost_lock(task_id)
                return

        update_video_pool_status(
            self.db_client, content_trace_id, const.PROCESSING_STATUS, task_status
        )

    def _log_lost_lock(self, task_id) -> None:
        """Log that a task was no longer PROCESSING when its outcome was written."""
        log(
            task="article_summary_task",
            function="handle_task_execution",
            message="summary task no longer PROCESSING, outcome not recorded",
            data={"task_id": task_id},
        )

    def set_summary_text_for_task(self, task_id, text: str) -> int:
        """
        Persist a generated summary and mark the task as succeeded.

        The update only applies while the row is still PROCESSING, so a task
        that was rolled back in the meantime is left untouched.

        :param task_id: ``id`` of the video_content_understanding row
        :param text: summary text to store
        :return: row count returned by ``db_client.save`` (0 means no row
            matched, e.g. it was no longer PROCESSING)
        """
        # Stamp summary_status_ts (not understanding_status_ts): this is the
        # summary stage, and overwriting the understanding timestamp would
        # clobber the previous stage's bookkeeping.
        update_sql = """
            update video_content_understanding
            set summary_status = %s, summary_text = %s, summary_status_ts = %s
            where id = %s and summary_status = %s;
        """
        return self.db_client.save(
            query=update_sql,
            params=(
                const.SUCCESS_STATUS,
                text,
                datetime.datetime.now(),
                task_id,
                const.PROCESSING_STATUS,
            ),
        )

    def deal(self):
        """
        Entry point: roll back stale locks, then process one batch of tasks.

        An exception raised while handling a single task is logged and does
        not stop the remaining tasks in the batch.
        """
        rollback_rows = self.rollback_lock_tasks()
        tqdm.write("rollback_lock_tasks: {}".format(rollback_rows))

        task_list = self.get_summary_task_list()
        for task in tqdm(task_list, desc="handle each task"):
            try:
                self.handle_task_execution(task=task)
            except Exception as e:
                log(
                    task="article_summary_task",
                    function="deal",
                    message="fetch_deepseek_response",
                    data={"error": str(e), "trace_back": traceback.format_exc()},
                )