# article_summary_task.py (3.8 KB)
  1. """
  2. @author: luojunhui
  3. """
  4. from pymysql.cursors import DictCursor
  5. from tqdm import tqdm
  6. from applications.api import deep_seek_api
  7. from applications.db import DatabaseConnector
  8. from config import long_articles_config
  9. def generate_prompt(text):
  10. """
  11. 生成prompt
  12. """
  13. prompt = f"""
  14. 你是1个优秀的公众号文章写作大师,我对你有以下要求
  15. 文章: {text}
  16. 1.请仔细阅读以上公众号文章,挑选文章中最吸引人的情节或话题,总结为100字左右文章精彩总结(字数计算包括标点符号)。
  17. 句子段落之间以悬念承接,可以吸引读者往下读第二句。
  18. 2.在这100字内容的结尾处,增加1-2句话的引导,引导大家去观看上面的视频了解详情。注意是点击上面的视频,不是下面的视频。
  19. 你最终输出一段总结内容,不用加标题或者主题,也不用写第几段、多少字这样的话。整体的语言风格要口语化、直接点,要让60岁以上的老年人能看懂、能共情。人的名字尽量用全名,不用简称。
  20. """
  21. return prompt
  22. class ArticleSummaryTask(object):
  23. """
  24. 文章总结任务
  25. """
  26. def __init__(self):
  27. self.db_client = None
  28. def connect_db(self):
  29. """
  30. 连接数据库
  31. """
  32. self.db_client = DatabaseConnector(db_config=long_articles_config)
  33. self.db_client.connect()
  34. def get_task_list(self):
  35. """
  36. 获取任务列表
  37. """
  38. select_sql = f"""
  39. select t1.video_text, t2.audit_video_id
  40. from video_content_understanding t1 join publish_single_video_source t2 on t1.pq_vid = t2.audit_video_id
  41. where t1.status = 2 and t2.bad_status = 0 and t2.extract_status = 0;
  42. """
  43. task_list = self.db_client.fetch(select_sql, cursor_type=DictCursor)
  44. return task_list
  45. def process_each_task(self, task):
  46. """
  47. task: {
  48. "video_text": "视频内容",
  49. "audit_video_id": "视频id"
  50. }
  51. """
  52. video_text = task["video_text"]
  53. audit_video_id = task["audit_video_id"]
  54. # 开始处理,将extract_status更新为101
  55. update_sql = f"""
  56. update publish_single_video_source set extract_status = %s where audit_video_id = %s and extract_status = %s
  57. """
  58. affected_rows = self.db_client.save(
  59. query=update_sql,
  60. params=(101, audit_video_id, 0)
  61. )
  62. if not affected_rows:
  63. return
  64. # 生成prompt
  65. prompt = generate_prompt(video_text)
  66. response = deep_seek_api(model="DeepSeek-R1", prompt=prompt)
  67. if response:
  68. update_sql = f"""
  69. update publish_single_video_source
  70. set extract_status = %s, summary_text = %s
  71. where audit_video_id = %s and extract_status = %s;
  72. """
  73. affected_rows = self.db_client.save(
  74. query=update_sql,
  75. params=(2, response.strip(), audit_video_id, 101)
  76. )
  77. print(affected_rows)
  78. else:
  79. update_sql = f"""
  80. update publish_single_video_source
  81. set extract_status = %s
  82. where audit_video_id = %s and extract_status = %s;
  83. """
  84. affected_rows = self.db_client.save(
  85. query=update_sql,
  86. params=(99, audit_video_id, 101)
  87. )
  88. print(affected_rows)
  89. def deal(self):
  90. """
  91. 开始处理任务
  92. """
  93. task_list = self.get_task_list()
  94. for task in tqdm(task_list):
  95. try:
  96. self.process_each_task(task)
  97. except Exception as e:
  98. print(e)
  99. continue