article_summary_task.py 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146
  1. """
  2. @author: luojunhui
  3. """
  4. from pymysql.cursors import DictCursor
  5. from tqdm import tqdm
  6. from applications.api import deep_seek_api
  7. from applications.const import VideoToTextConst
  8. from applications.db import DatabaseConnector
  9. from config import long_articles_config
# Shared constants instance; its status-code attributes (VIDEO_UNDERSTAND_*,
# ARTICLE_*, EXTRACT_*) are read by the queries and status updates below.
const = VideoToTextConst()
  11. def generate_prompt(text):
  12. """
  13. 生成prompt
  14. """
  15. prompt = f"""
  16. 你是1个优秀的公众号文章写作大师,我对你有以下要求
  17. 文章: {text}
  18. 1.请仔细阅读以上公众号文章,挑选文章中最吸引人的情节或话题,总结为100字左右文章精彩总结(字数计算包括标点符号)。
  19. 句子段落之间以悬念承接,可以吸引读者往下读第二句。
  20. 2.在这100字内容的结尾处,增加1-2句话的引导,引导大家去观看上面的视频了解详情。注意是点击上面的视频,不是下面的视频。
  21. 你最终输出一段总结内容,不用加标题或者主题,也不用写第几段、多少字这样的话。整体的语言风格要口语化、直接点,要让60岁以上的老年人能看懂、能共情。人的名字尽量用全名,不用简称。
  22. """
  23. return prompt
  24. class ArticleSummaryTask(object):
  25. """
  26. 文章总结任务
  27. """
  28. def __init__(self):
  29. self.db_client = None
  30. def connect_db(self):
  31. """
  32. 连接数据库
  33. """
  34. self.db_client = DatabaseConnector(db_config=long_articles_config)
  35. self.db_client.connect()
  36. def get_task_list(self):
  37. """
  38. 获取任务列表
  39. """
  40. select_sql = f"""
  41. select t1.video_text, t2.audit_video_id
  42. from video_content_understanding t1
  43. join publish_single_video_source t2
  44. on t1.pq_vid = t2.audit_video_id
  45. where t1.status = {const.VIDEO_UNDERSTAND_SUCCESS_STATUS}
  46. and t2.bad_status = {const.ARTICLE_GOOD_STATUS}
  47. and t2.extract_status = {const.EXTRACT_INIT_STATUS};
  48. """
  49. task_list = self.db_client.fetch(select_sql, cursor_type=DictCursor)
  50. return task_list
  51. def process_each_task(self, task):
  52. """
  53. 处理每个任务
  54. """
  55. video_text = task["video_text"]
  56. audit_video_id = task["audit_video_id"]
  57. # 开始处理,将extract_status更新为101
  58. update_sql = f"""
  59. update publish_single_video_source
  60. set extract_status = %s
  61. where audit_video_id = %s and extract_status = %s;
  62. """
  63. affected_rows = self.db_client.save(
  64. query=update_sql,
  65. params=(const.EXTRACT_PROCESSING_STATUS, audit_video_id, const.EXTRACT_INIT_STATUS)
  66. )
  67. if not affected_rows:
  68. return
  69. try:
  70. # 生成prompt
  71. prompt = generate_prompt(video_text)
  72. response = deep_seek_api(model="DeepSeek-R1", prompt=prompt)
  73. if response:
  74. update_sql = f"""
  75. update publish_single_video_source
  76. set extract_status = %s, summary_text = %s
  77. where audit_video_id = %s and extract_status = %s;
  78. """
  79. affected_rows = self.db_client.save(
  80. query=update_sql,
  81. params=(
  82. const.EXTRACT_SUCCESS_STATUS,
  83. response.strip(),
  84. audit_video_id,
  85. const.EXTRACT_PROCESSING_STATUS
  86. )
  87. )
  88. print(affected_rows)
  89. else:
  90. update_sql = f"""
  91. update publish_single_video_source
  92. set extract_status = %s
  93. where audit_video_id = %s and extract_status = %s;
  94. """
  95. affected_rows = self.db_client.save(
  96. query=update_sql,
  97. params=(
  98. const.EXTRACT_FAIL_STATUS,
  99. audit_video_id,
  100. const.EXTRACT_PROCESSING_STATUS
  101. )
  102. )
  103. print(affected_rows)
  104. except Exception as e:
  105. print(e)
  106. # set as fail
  107. update_sql = f"""
  108. update publish_single_video_source
  109. set extract_status = %s
  110. where audit_video_id = %s and extract_status = %s;
  111. """
  112. self.db_client.save(
  113. query=update_sql,
  114. params=(
  115. const.EXTRACT_FAIL_STATUS,
  116. audit_video_id,
  117. const.EXTRACT_PROCESSING_STATUS
  118. )
  119. )
  120. def deal(self):
  121. """
  122. 开始处理任务
  123. """
  124. task_list = self.get_task_list()
  125. for task in tqdm(task_list):
  126. try:
  127. self.process_each_task(task)
  128. except Exception as e:
  129. print(e)
  130. continue