top_article_generalize.py

import json
import time
import datetime
import traceback

from tqdm import tqdm
from typing import List, Dict
from pymysql.cursors import DictCursor

from applications import aiditApi
from applications.api import fetch_deepseek_completion
from applications.api import similarity_between_title_list
from applications.db import DatabaseConnector
from config import long_articles_config, denet_config

# Chinese prompt template: asks the model to extract three search keywords
# from the given title and summary, returned as JSON in {output_format}.
extract_keywords_prompt = """
你是一名优秀的中文专家
## 任务说明
需要你从输入的标题和总结中提取3个搜索词
### 输出
输出结构为JSON,格式如下
{output_format}
## 输入
标题:{title}
总结:{summary}
"""


class TopArticleGeneralize:
    def __init__(self):
        self.long_articles_client = DatabaseConnector(long_articles_config)
        self.long_articles_client.connect()

        self.denet_client = DatabaseConnector(denet_config)
        self.denet_client.connect()

    def fetch_distinct_top_titles(self) -> List[Dict]:
        """
        Fetch the distinct article titles (and source ids) of the TOP100 produce plan.
        """
        fetch_query = """
            select distinct title, source_id
            from datastat_sort_strategy
            where produce_plan_name = 'TOP100' and source_id is not null;
        """
        return self.long_articles_client.fetch(fetch_query, cursor_type=DictCursor)

    def get_title_read_info_detail(self, title: str) -> bool:
        """
        Check whether each of the title's three most recent articles reached a
        read-rate multiple of at least 1.2.
        """
        fetch_query = f"""
            select read_rate
            from datastat_sort_strategy
            where produce_plan_name = 'TOP100' and title = '{title}'
            order by date_str desc limit 3;
        """
        fetch_response = self.long_articles_client.fetch(
            fetch_query, cursor_type=DictCursor
        )
        read_rate_list = [i["read_rate"] for i in fetch_response]
        for read_rate in read_rate_list:
            if read_rate < 1.2:
                return False
        return True

    def get_article_summary(self, source_id: str) -> str:
        """
        use source_id to get article summary
        """
        fetch_query = f"""
            select output
            from produce_plan_module_output
            where plan_exe_id = '{source_id}' and produce_module_type = 18;
        """
        fetch_response = self.denet_client.fetch(fetch_query, cursor_type=DictCursor)
        return fetch_response[0]["output"]

    def get_keys_by_ai(self, title_obj: Dict) -> List[str]:
        """
        Extract search keywords for a title via the LLM.
        """
        title = title_obj["title"]
        source_id = title_obj["source_id"]
        article_summary = self.get_article_summary(source_id)
        output_format = {"keys": ["key1", "key2", "key3"]}
        prompt = extract_keywords_prompt.format(
            output_format=output_format, title=title, summary=article_summary
        )
        response = fetch_deepseek_completion(
            model="deepseek-V3", prompt=prompt, output_type="json"
        )
        return response["keys"]


class TopArticleGeneralizeFromArticlePool(TopArticleGeneralize):
    def get_candidate_articles(self, key):
        fetch_query = f"""
            select article_id, title, link, llm_sensitivity, score, category_by_ai
            from crawler_meta_article
            where status = 1
              and title_sensitivity = 0
              and title like '%{key}%';
        """
        fetch_response = self.long_articles_client.fetch(
            fetch_query, cursor_type=DictCursor
        )
        return fetch_response

    def change_article_status_while_publishing(self, article_id_list):
        """
        :param article_id_list: unique ids of the articles being published
        :return: number of affected rows
        """
        update_sql = """
            update crawler_meta_article
            set status = %s
            where article_id in %s and status = %s;
        """
        affect_rows = self.long_articles_client.save(
            query=update_sql, params=(2, tuple(article_id_list), 1)
        )
        return affect_rows

    def deal(self):
        title_obj_list = self.fetch_distinct_top_titles()
        publishing_article_list = []
        for title_obj in tqdm(title_obj_list):
            if self.get_title_read_info_detail(title_obj["title"]):
                try:
                    temp = []
                    keys = self.get_keys_by_ai(title_obj)
                    for key in keys:
                        candidate_articles = self.get_candidate_articles(key)
                        temp += candidate_articles
                    if temp:
                        title_list = [i["title"] for i in temp]
                        # rank candidates by similarity to the top title
                        similarity_array = similarity_between_title_list(
                            title_list, [title_obj["title"]]
                        )
                        response_with_similarity_list = []
                        for index, item in enumerate(temp):
                            item["similarity"] = similarity_array[index][0]
                            response_with_similarity_list.append(item)
                        sorted_response_with_similarity_list = sorted(
                            response_with_similarity_list,
                            key=lambda k: k["similarity"],
                            reverse=True,
                        )
                        publishing_article_list += sorted_response_with_similarity_list[
                            :10
                        ]
                except Exception as e:
                    print(e)
                    print(traceback.format_exc())

        url_list = [i["link"] for i in publishing_article_list]
        if url_list:
            # create crawler plan
            crawler_plan_response = aiditApi.auto_create_crawler_task(
                plan_id=None,
                plan_name="自动绑定-Top内容泛化-{}--{}".format(
                    str(datetime.date.today()), len(url_list)
                ),
                plan_tag="Top内容泛化",
                article_source="weixin",
                url_list=url_list,
            )

            # save to db
            crawler_plan_id = crawler_plan_response["data"]["id"]
            crawler_plan_name = crawler_plan_response["data"]["name"]

            # auto bind to generate plan
            new_crawler_task_list = [
                {
                    "contentType": 1,
                    "inputSourceType": 2,
                    "inputSourceSubType": None,
                    "fieldName": None,
                    "inputSourceValue": crawler_plan_id,
                    "inputSourceLabel": crawler_plan_name,
                    "inputSourceModal": 3,
                    "inputSourceChannel": 5,
                }
            ]
            # bind the crawler plan to the generate plan
            generate_plan_response = aiditApi.bind_crawler_task_to_generate_task(
                crawler_task_list=new_crawler_task_list,
                generate_task_id="20250703081329508785665",
            )

            # change article status
            article_id_list = [i["article_id"] for i in publishing_article_list]
            self.change_article_status_while_publishing(article_id_list=article_id_list)


class TopArticleGeneralizeFromVideoPool(TopArticleGeneralize):
    def get_candidate_videos(self, key):
        fetch_query = f"""
            select article_title, content_trace_id, audit_video_id
            from publish_single_video_source
            where status = 0 and bad_status = 0 and article_title like '%{key}%';
        """
        fetch_response = self.long_articles_client.fetch(
            fetch_query, cursor_type=DictCursor
        )
        return fetch_response

    def deal(self):
        title_obj_list = self.fetch_distinct_top_titles()
        publishing_article_list = []
        for title_obj in tqdm(title_obj_list):
            if self.get_title_read_info_detail(title_obj["title"]):
                temp = []
                keys = self.get_keys_by_ai(title_obj)
                for key in keys:
                    candidate_articles = self.get_candidate_videos(key)
                    temp += candidate_articles
                print(json.dumps(temp, ensure_ascii=False, indent=4))


# TopArticleGeneralizeFromVideoPool().deal()
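
# Entry-point sketch, not part of the original file beyond the commented-out
# call above: it assumes the script is meant to be run directly and that the
# article-pool variant is the production path, with the video-pool variant
# kept as a dry run. Adjust to match how the job is actually scheduled.
if __name__ == "__main__":
    TopArticleGeneralizeFromArticlePool().deal()
    # TopArticleGeneralizeFromVideoPool().deal()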