# article_summary_task.py (3.9 KB)

  1. """
  2. @author: luojunhui
  3. """
  4. from pymysql.cursors import DictCursor
  5. from applications.api import deep_seek_api
  6. from applications.db import DatabaseConnector
  7. from config import long_articles_config
  8. def generate_prompt(text):
  9. """
  10. 生成prompt
  11. """
  12. prompt = f"""
  13. 你是1个优秀的公众号文章写作大师,我对你有以下要求
  14. 文章: {text}
  15. 1.请仔细阅读以上公众号文章,挑选文章中最吸引人的情节或话题,总结为100字左右文章精彩总结(字数计算包括标点符号)。
  16. 句子段落之间以悬念承接,可以吸引读者往下读第二句。
  17. 2.在这100字内容的结尾处,增加1-2句话的引导,引导大家去观看上面的视频了解详情。注意是点击上面的视频,不是下面的视频。
  18. 你最终输出一段总结内容,不用加标题或者主题,也不用写第几段、多少字这样的话。整体的语言风格要口语化、直接点,要让60岁以上的老年人能看懂、能共情。人的名字尽量用全名,不用简称。
  19. """
  20. return prompt
  21. class ArticleSummaryTask(object):
  22. """
  23. 文章总结任务
  24. """
  25. def __init__(self):
  26. self.db_client = None
  27. def connect_db(self):
  28. """
  29. 连接数据库
  30. """
  31. self.db_client = DatabaseConnector(db_config=long_articles_config)
  32. self.db_client.connect()
  33. def get_task_list(self):
  34. """
  35. 获取任务列表
  36. """
  37. select_sql = f"""
  38. select t1.video_text, t2.audit_video_id
  39. from video_content_understanding t1 join publish_single_video_source t2 on t1.pq_vid = t2.audit_video_id
  40. where t1.status = 2 and t2.bad_status = 0 and t2.extract_status = 1 limit 20;
  41. """
  42. task_list = self.db_client.fetch(select_sql, cursor_type=DictCursor)
  43. return task_list
  44. def process_each_task(self, task):
  45. """
  46. task: {
  47. "video_text": "视频内容",
  48. "audit_video_id": "视频id"
  49. }
  50. """
  51. video_text = task["video_text"]
  52. audit_video_id = task["audit_video_id"]
  53. # 开始处理,将extract_status更新为101
  54. update_sql = f"""
  55. update publish_single_video_source set extract_status = %s where audit_video_id = %s
  56. """
  57. affected_rows = self.db_client.save(
  58. query=update_sql,
  59. params=(101, audit_video_id)
  60. )
  61. if not affected_rows:
  62. return
  63. # 生成prompt
  64. prompt = generate_prompt(video_text)
  65. response = deep_seek_api(model="DeepSeek-R1", prompt=prompt)
  66. if response:
  67. update_sql = f"""
  68. update publish_single_video_source
  69. set extract_status = %s, summary_text = %s
  70. where audit_video_id = %s and extract_status = %s;
  71. """
  72. affected_rows = self.db_client.save(
  73. query=update_sql,
  74. params=(2, response, audit_video_id, 101)
  75. )
  76. print(affected_rows)
  77. else:
  78. update_sql = f"""
  79. update publish_single_video_source
  80. set extract_status = %s
  81. where audit_video_id = %s and extract_status = %s;
  82. """
  83. affected_rows = self.db_client.save(
  84. query=update_sql,
  85. params=(99, audit_video_id, 101)
  86. )
  87. print(affected_rows)
  88. def deal(self):
  89. """
  90. 开始处理任务
  91. """
  92. task_list = self.get_task_list()
  93. for task in task_list:
  94. try:
  95. self.process_each_task(task)
  96. except Exception as e:
  97. print(e)
  98. continue
  99. if __name__ == '__main__':
  100. article_summary_task = ArticleSummaryTask()
  101. article_summary_task.connect_db()
  102. article_summary_task.deal()