# top_article_generalize.py
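"""
Top-article generalization task.

Fetches TOP100 titles whose recent read rates stay above 1.2, asks DeepSeek for
three search keywords per title, pulls matching candidate articles from the
crawler article pool, ranks them by title similarity, then creates a crawler
plan, binds it to a generate plan, and marks the selected articles as published.
"""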

import datetime
from typing import List, Dict

from pymysql.cursors import DictCursor

from applications import aiditApi
from applications.api import fetch_deepseek_completion
from applications.api import similarity_between_title_list
from applications.db import DatabaseConnector
from config import long_articles_config, denet_config

# Keyword-extraction prompt, kept in Chinese because it is sent verbatim to the
# model: extract three search keywords from a title and summary, output as JSON.
extract_keywords_prompt = """
你是一名优秀的中文专家
## 任务说明
需要你从输入的标题和总结中提取3个搜索词
### 输出
输出结构为JSON,格式如下
{output_format}
## 输入
标题:{title}
总结:{summary}
"""


class TopArticleGeneralize:
    def __init__(self):
        # connections to the long_articles and denet databases
        self.long_articles_client = DatabaseConnector(long_articles_config)
        self.long_articles_client.connect()
        self.denet_client = DatabaseConnector(denet_config)
        self.denet_client.connect()

    def fetch_distinct_top_titles(self) -> List[Dict]:
        """
        Fetch distinct article titles (and source ids) from the TOP100 produce plan.
        """
        fetch_query = """
            select distinct title, source_id
            from datastat_sort_strategy
            where produce_plan_name = 'TOP100' and source_id is not null;
        """
        return self.long_articles_client.fetch(fetch_query, cursor_type=DictCursor)

    def get_title_read_info_detail(self, title: str) -> bool:
        """
        Check whether each of the title's three most recent articles
        has a read_rate of at least 1.2.
        """
        fetch_query = f"""
            select read_rate
            from datastat_sort_strategy
            where produce_plan_name = 'TOP100' and title = '{title}'
            order by date_str desc limit 3;
        """
        fetch_response = self.long_articles_client.fetch(
            fetch_query, cursor_type=DictCursor
        )
        read_rate_list = [i["read_rate"] for i in fetch_response]
        for read_rate in read_rate_list:
            if read_rate < 1.2:
                return False
        return True

    def get_article_summary(self, source_id: str) -> str:
        """
        Fetch the article summary (produce module type 18 output)
        for the given plan execution id.
        """
        fetch_query = f"""
            select output
            from produce_plan_module_output
            where plan_exe_id = '{source_id}' and produce_module_type = 18;
        """
        fetch_response = self.denet_client.fetch(fetch_query, cursor_type=DictCursor)
        return fetch_response[0]["output"]

    def get_keys_by_ai(self, title_obj: Dict) -> List[str]:
        """
        Extract three search keywords for the title via DeepSeek,
        using the article summary as additional context.
        """
        title = title_obj["title"]
        source_id = title_obj["source_id"]
        article_summary = self.get_article_summary(source_id)
        output_format = {"keys": ["key1", "key2", "key3"]}
        prompt = extract_keywords_prompt.format(
            output_format=output_format, title=title, summary=article_summary
        )
        response = fetch_deepseek_completion(
            model="deepseek-V3", prompt=prompt, output_type="json"
        )
        return response["keys"]


class TopArticleGeneralizeFromArticlePool(TopArticleGeneralize):
    def get_candidate_articles(self, key: str) -> List[Dict]:
        """
        Fetch candidate articles (status = 1, non-sensitive titles)
        whose title contains the given keyword.
        """
        fetch_query = f"""
            select article_id, title, link, llm_sensitivity, score, category_by_ai
            from crawler_meta_article
            where status = 1
              and title_sensitivity = 0
              and title like '%{key}%';
        """
        fetch_response = self.long_articles_client.fetch(
            fetch_query, cursor_type=DictCursor
        )
        return fetch_response

    def change_article_status_while_publishing(self, article_id_list):
        """
        :param article_id_list: unique ids of the articles being published
        :return: number of affected rows
        """
        update_sql = """
            update crawler_meta_article
            set status = %s
            where article_id in %s and status = %s;
        """
        affect_rows = self.long_articles_client.save(
            query=update_sql, params=(2, tuple(article_id_list), 1)
        )
        return affect_rows

    def deal(self):
        title_obj_list = self.fetch_distinct_top_titles()
        publishing_article_list = []
        for title_obj in title_obj_list:
            if self.get_title_read_info_detail(title_obj["title"]):
                temp = []
                keys = self.get_keys_by_ai(title_obj)
                for key in keys:
                    candidate_articles = self.get_candidate_articles(key)
                    temp += candidate_articles
                if temp:
                    title_list = [i["title"] for i in temp]
                    # rank candidates by similarity to the top title
                    similarity_array = similarity_between_title_list(
                        title_list, [title_obj["title"]]
                    )
                    response_with_similarity_list = []
                    for index, item in enumerate(temp):
                        item["similarity"] = similarity_array[index][0]
                        response_with_similarity_list.append(item)
                    sorted_response_with_similarity_list = sorted(
                        response_with_similarity_list,
                        key=lambda k: k["similarity"],
                        reverse=True,
                    )
                    # keep the 10 most similar candidates per top title
                    publishing_article_list += sorted_response_with_similarity_list[:10]

        url_list = [i["link"] for i in publishing_article_list]
        if url_list:
            # create a crawler plan from the selected article urls
            crawler_plan_response = aiditApi.auto_create_crawler_task(
                plan_id=None,
                plan_name="自动绑定-Top内容泛化-{}--{}".format(
                    str(datetime.date.today()), len(url_list)
                ),
                plan_tag="Top内容泛化",
                article_source="weixin",
                url_list=url_list,
            )
            crawler_plan_id = crawler_plan_response["data"]["id"]
            crawler_plan_name = crawler_plan_response["data"]["name"]
            # bind the new crawler plan to the generate plan
            new_crawler_task_list = [
                {
                    "contentType": 1,
                    "inputSourceType": 2,
                    "inputSourceSubType": None,
                    "fieldName": None,
                    "inputSourceValue": crawler_plan_id,
                    "inputSourceLabel": crawler_plan_name,
                    "inputSourceModal": 3,
                    "inputSourceChannel": 5,
                }
            ]
            generate_plan_response = aiditApi.bind_crawler_task_to_generate_task(
                crawler_task_list=new_crawler_task_list,
                generate_task_id="20250703081329508785665",
            )
            # mark the selected articles as published (status 1 -> 2)
            article_id_list = [i["article_id"] for i in publishing_article_list]
            self.change_article_status_while_publishing(article_id_list=article_id_list)
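

# A minimal usage sketch (an addition, not part of the original file; it assumes
# the database and AIDIT credentials imported from `config` are valid): run one
# full pass of the top-article generalization task.
if __name__ == "__main__":
    task = TopArticleGeneralizeFromArticlePool()
    task.deal()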