# top_article_generalize.py

import time
import datetime
from typing import Dict, List

from tqdm import tqdm
from pymysql.cursors import DictCursor

from applications import aiditApi
from applications.api import fetch_deepseek_completion
from applications.api import similarity_between_title_list
from applications.db import DatabaseConnector
from config import long_articles_config, denet_config

# Prompt (kept in Chinese, since it is sent verbatim to the model): asks the model
# to extract three search keywords from an article title and summary and to return
# them as JSON matching {output_format}.
extract_keywords_prompt = """
你是一名优秀的中文专家
## 任务说明
需要你从输入的标题和总结中提取3个搜索词
### 输出
输出结构为JSON,格式如下
{output_format}
## 输入
标题:{title}
总结:{summary}
"""


class TopArticleGeneralize:
    def __init__(self):
        self.long_articles_client = DatabaseConnector(long_articles_config)
        self.long_articles_client.connect()
        self.denet_client = DatabaseConnector(denet_config)
        self.denet_client.connect()

    def fetch_distinct_top_titles(self) -> List[Dict]:
        """
        Fetch distinct article titles (and their source_id) from the TOP100 produce plan.
        """
        fetch_query = """
            select distinct title, source_id
            from datastat_sort_strategy
            where produce_plan_name = 'TOP100' and source_id is not null;
        """
        return self.long_articles_client.fetch(fetch_query, cursor_type=DictCursor)

    def get_title_read_info_detail(self, title: str) -> bool:
        """
        Return True only if the title's three most recent TOP100 articles
        all have a read_rate of at least 1.2.
        """
        # NOTE: the title is interpolated directly into the SQL string, so titles
        # containing single quotes will break this query.
        fetch_query = f"""
            select read_rate
            from datastat_sort_strategy
            where produce_plan_name = 'TOP100' and title = '{title}'
            order by date_str desc limit 3;
        """
        fetch_response = self.long_articles_client.fetch(
            fetch_query, cursor_type=DictCursor
        )
        read_rate_list = [i["read_rate"] for i in fetch_response]
        for read_rate in read_rate_list:
            if read_rate < 1.2:
                return False
        return True

    def get_article_summary(self, source_id: str) -> str:
        """
        Use source_id (plan_exe_id) to fetch the article summary produced by module type 18.
        """
        fetch_query = f"""
            select output
            from produce_plan_module_output
            where plan_exe_id = '{source_id}' and produce_module_type = 18;
        """
        fetch_response = self.denet_client.fetch(fetch_query, cursor_type=DictCursor)
        return fetch_response[0]["output"]

    def get_keys_by_ai(self, title_obj: Dict) -> List[str]:
        """
        Extract three search keywords for the title via the LLM.
        """
        title = title_obj["title"]
        source_id = title_obj["source_id"]
        article_summary = self.get_article_summary(source_id)
        output_format = {"keys": ["key1", "key2", "key3"]}
        prompt = extract_keywords_prompt.format(
            output_format=output_format, title=title, summary=article_summary
        )
        response = fetch_deepseek_completion(
            model="deepseek-V3", prompt=prompt, output_type="json"
        )
        return response["keys"]
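
# Usage sketch for the base class (illustrative only; assumes the databases
# configured in `config` and the DeepSeek API are reachable from this environment):
#
#   generalize = TopArticleGeneralize()
#   for row in generalize.fetch_distinct_top_titles()[:1]:
#       print(row["title"], generalize.get_keys_by_ai(row))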


class TopArticleGeneralizeFromArticlePool(TopArticleGeneralize):
    def get_candidate_articles(self, key):
        """
        Fetch rows from crawler_meta_article with status = 1, title_sensitivity = 0,
        and a title containing the given keyword.
        """
        fetch_query = f"""
            select article_id, title, link, llm_sensitivity, score, category_by_ai
            from crawler_meta_article
            where status = 1
              and title_sensitivity = 0
              and title like '%{key}%';
        """
        fetch_response = self.long_articles_client.fetch(
            fetch_query, cursor_type=DictCursor
        )
        return fetch_response

    def change_article_status_while_publishing(self, article_id_list):
        """
        Mark the given articles as picked for publishing (status 1 -> 2).

        :param article_id_list: unique ids of the articles
        :return: number of affected rows
        """
        update_sql = """
            update crawler_meta_article
            set status = %s
            where article_id in %s and status = %s;
        """
        return self.long_articles_client.save(
            query=update_sql, params=(2, tuple(article_id_list), 1)
        )

    def deal(self):
        title_obj_list = self.fetch_distinct_top_titles()
        publishing_article_list = []
        for title_obj in tqdm(title_obj_list):
            # only generalize titles whose recent read_rate stayed above the threshold
            if self.get_title_read_info_detail(title_obj["title"]):
                temp = []
                keys = self.get_keys_by_ai(title_obj)
                for key in keys:
                    candidate_articles = self.get_candidate_articles(key)
                    temp += candidate_articles
                if temp:
                    title_list = [i["title"] for i in temp]
                    # rank candidates by similarity to the top title
                    similarity_array = similarity_between_title_list(
                        title_list, [title_obj["title"]]
                    )
                    response_with_similarity_list = []
                    for index, item in enumerate(temp):
                        item["similarity"] = similarity_array[index][0]
                        response_with_similarity_list.append(item)
                    sorted_response_with_similarity_list = sorted(
                        response_with_similarity_list,
                        key=lambda k: k["similarity"],
                        reverse=True,
                    )
                    # keep the 10 most similar candidates per top title
                    publishing_article_list += sorted_response_with_similarity_list[:10]

        url_list = [i["link"] for i in publishing_article_list]
        if url_list:
            # create crawler plan
            crawler_plan_response = aiditApi.auto_create_crawler_task(
                plan_id=None,
                plan_name="自动绑定-Top内容泛化-{}--{}".format(
                    str(datetime.date.today()), len(url_list)
                ),
                plan_tag="Top内容泛化",
                article_source="weixin",
                url_list=url_list,
            )
            # save to db
            crawler_plan_id = crawler_plan_response["data"]["id"]
            crawler_plan_name = crawler_plan_response["data"]["name"]
            # auto bind to generate plan
            new_crawler_task_list = [
                {
                    "contentType": 1,
                    "inputSourceType": 2,
                    "inputSourceSubType": None,
                    "fieldName": None,
                    "inputSourceValue": crawler_plan_id,
                    "inputSourceLabel": crawler_plan_name,
                    "inputSourceModal": 3,
                    "inputSourceChannel": 5,
                }
            ]
            # bind the crawler task to the generate plan
            generate_plan_response = aiditApi.bind_crawler_task_to_generate_task(
                crawler_task_list=new_crawler_task_list,
                generate_task_id="20250703081329508785665",
            )
            # change article status
            article_id_list = [i["article_id"] for i in publishing_article_list]
            self.change_article_status_while_publishing(article_id_list=article_id_list)
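

# Minimal entry point (sketch): runs the end-to-end top-article generalization
# flow. Assumes the database credentials in `config` and the aiditApi / DeepSeek
# endpoints are reachable; the generate_task_id is the hard-coded value used in
# deal() above.
if __name__ == "__main__":
    task = TopArticleGeneralizeFromArticlePool()
    task.deal()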