# top_article_generalize.py
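"""
Generalize top-performing (TOP100) article titles: extract search keywords
with DeepSeek, pull matching candidate articles from the crawler pool, rank
them by title similarity, create a crawler plan via aiditApi, bind it to a
generate plan, and mark the selected articles as publishing.
"""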

import datetime
from typing import List, Dict

from pymysql.cursors import DictCursor

from applications import aiditApi
from applications.api import fetch_deepseek_completion
from applications.api import similarity_between_title_list
from applications.db import DatabaseConnector
from config import long_articles_config, denet_config

# Keyword-extraction prompt (kept in Chinese, since it is sent to the model
# verbatim): asks for three search keywords from a title plus summary,
# returned as JSON in the given format.
extract_keywords_prompt = """
你是一名优秀的中文专家
## 任务说明
需要你从输入的标题和总结中提取3个搜索词
### 输出
输出结构为JSON,格式如下
{output_format}
## 输入
标题:{title}
总结:{summary}
"""


class TopArticleGeneralize:
    """Base class: shared DB clients and keyword-extraction helpers."""

    def __init__(self):
        self.long_articles_client = DatabaseConnector(long_articles_config)
        self.long_articles_client.connect()
        self.denet_client = DatabaseConnector(denet_config)
        self.denet_client.connect()

    def fetch_distinct_top_titles(self) -> List[Dict]:
        """
        Fetch the distinct article titles (and source ids) produced by the
        TOP100 plan.
        """
        fetch_query = """
            select distinct title, source_id
            from datastat_sort_strategy
            where produce_plan_name = 'TOP100' and source_id is not null;
        """
        return self.long_articles_client.fetch(fetch_query, cursor_type=DictCursor)

    def get_title_read_info_detail(self, title: str) -> bool:
        """
        Check whether each of the title's three most recent articles beat
        the average read rate by at least 1.2x.
        """
        # NOTE: the title is interpolated directly into the SQL, so this
        # assumes it contains no quotes (SQL-injection risk for hostile input).
        fetch_query = f"""
            select read_rate
            from datastat_sort_strategy
            where produce_plan_name = 'TOP100' and title = '{title}'
            order by date_str desc limit 3;
        """
        fetch_response = self.long_articles_client.fetch(
            fetch_query, cursor_type=DictCursor
        )
        read_rate_list = [i["read_rate"] for i in fetch_response]
        # every fetched read rate must clear the 1.2x threshold
        return all(read_rate >= 1.2 for read_rate in read_rate_list)

    def get_article_summary(self, source_id: str) -> str:
        """
        Use source_id to look up the article summary produced by module
        type 18 of the plan execution.
        """
        fetch_query = f"""
            select output
            from produce_plan_module_output
            where plan_exe_id = '{source_id}' and produce_module_type = 18;
        """
        fetch_response = self.denet_client.fetch(fetch_query, cursor_type=DictCursor)
        # assumes exactly one summary row exists for the plan execution
        return fetch_response[0]["output"]

    def get_keys_by_ai(self, title_obj: Dict) -> List[str]:
        """
        Extract three search keywords for the title via DeepSeek.
        """
        title = title_obj["title"]
        source_id = title_obj["source_id"]
        article_summary = self.get_article_summary(source_id)
        output_format = {"keys": ["key1", "key2", "key3"]}
        prompt = extract_keywords_prompt.format(
            output_format=output_format, title=title, summary=article_summary
        )
        response = fetch_deepseek_completion(
            model="deepseek-V3", prompt=prompt, output_type="json"
        )
        return response["keys"]


class TopArticleGeneralizeFromArticlePool(TopArticleGeneralize):
    def get_candidate_articles(self, key: str) -> List[Dict]:
        """
        Find unpublished, title-safe articles in the crawler pool whose
        titles contain the keyword.
        """
        fetch_query = f"""
            select article_id, title, link, llm_sensitivity, score, category_by_ai
            from crawler_meta_article
            where status = 1
            and title_sensitivity = 0
            and title like '%{key}%';
        """
        fetch_response = self.long_articles_client.fetch(
            fetch_query, cursor_type=DictCursor
        )
        return fetch_response

    def change_article_status_while_publishing(self, article_id_list):
        """
        Mark articles as picked up for publishing (status 1 -> 2).

        :param article_id_list: unique ids of the articles
        :return: number of affected rows
        """
        update_sql = """
            update crawler_meta_article
            set status = %s
            where article_id in %s and status = %s;
        """
        affected_rows = self.long_articles_client.save(
            query=update_sql, params=(2, tuple(article_id_list), 1)
        )
        return affected_rows

    def deal(self):
        title_obj_list = self.fetch_distinct_top_titles()
        publishing_article_list = []
        for title_obj in title_obj_list:
            # NOTE: processing is currently restricted to one hardcoded
            # title; this looks like a leftover debug/test filter.
            if (
                title_obj["title"]
                == "母亲去世136天后,女子回到家,在锅盖上留下一句话,瞬间泪崩!"
            ):
                if self.get_title_read_info_detail(title_obj["title"]):
                    temp = []
                    keys = self.get_keys_by_ai(title_obj)
                    for key in keys:
                        candidate_articles = self.get_candidate_articles(key)
                        temp += candidate_articles
                    if temp:
                        print(title_obj["title"])
                        title_list = [i["title"] for i in temp]
                        # sort candidates by similarity to the seed title
                        similarity_array = similarity_between_title_list(
                            title_list, [title_obj["title"]]
                        )
                        print(similarity_array)
                        print(title_list)
                        response_with_similarity_list = []
                        for index, item in enumerate(temp):
                            item["similarity"] = similarity_array[index][0]
                            response_with_similarity_list.append(item)
                        sorted_response_with_similarity_list = sorted(
                            response_with_similarity_list,
                            key=lambda k: k["similarity"],
                            reverse=True,
                        )
                        # keep the ten most similar candidates per seed title
                        publishing_article_list += sorted_response_with_similarity_list[
                            :10
                        ]

        url_list = [i["link"] for i in publishing_article_list]
        if url_list:
            # create a crawler plan for the selected article links
            crawler_plan_response = aiditApi.auto_create_crawler_task(
                plan_id=None,
                plan_name="自动绑定-Top内容泛化-{}--{}".format(
                    str(datetime.date.today()), len(url_list)
                ),
                plan_tag="Top内容泛化",
                article_source="weixin",
                url_list=url_list,
            )
            crawler_plan_id = crawler_plan_response["data"]["id"]
            crawler_plan_name = crawler_plan_response["data"]["name"]
            # auto bind the crawler plan to the generate plan
            new_crawler_task_list = [
                {
                    "contentType": 1,
                    "inputSourceType": 2,
                    "inputSourceSubType": None,
                    "fieldName": None,
                    "inputSourceValue": crawler_plan_id,
                    "inputSourceLabel": crawler_plan_name,
                    "inputSourceModal": 3,
                    "inputSourceChannel": 5,
                }
            ]
            aiditApi.bind_crawler_task_to_generate_task(
                crawler_task_list=new_crawler_task_list,
                generate_task_id="20250703081329508785665",
            )
            # mark the published articles; the ids come from the selected
            # candidates (the bind response does not carry article ids)
            article_id_list = [i["article_id"] for i in publishing_article_list]
            self.change_article_status_while_publishing(article_id_list=article_id_list)
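

# Minimal entry-point sketch: run the article-pool generalization end to end.
# Assumes the configured databases and the aiditApi / DeepSeek endpoints are
# reachable from this environment.
if __name__ == "__main__":
    task = TopArticleGeneralizeFromArticlePool()
    task.deal()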