# top_article_generalize.py

import json
import datetime
import traceback

from tqdm import tqdm
from typing import List, Dict
from pymysql.cursors import DictCursor

from applications import aiditApi
from applications.api import ElasticSearchClient
from applications.api import fetch_deepseek_completion
from applications.api import similarity_between_title_list
from applications.db import DatabaseConnector
from config import long_articles_config, denet_config
from config.es_mappings import index_name

extract_keywords_prompt = """
你是一名优秀的中文专家
## 任务说明
需要你从输入的标题和总结中提取3个搜索词
### 输出
输出结构为JSON,格式如下
{output_format}
## 输入
标题:{title}
总结:{summary}
"""


class TopArticleGeneralize:
    def __init__(self):
        self.long_articles_client = DatabaseConnector(long_articles_config)
        self.long_articles_client.connect()
        self.denet_client = DatabaseConnector(denet_config)
        self.denet_client.connect()
        self.elastic_search = ElasticSearchClient(index_=index_name)

    def fetch_distinct_top_titles(self) -> List[Dict]:
        """
        Fetch the distinct article titles belonging to the TOP100 produce plan.
        """
        fetch_query = """
            select distinct title, source_id
            from datastat_sort_strategy
            where produce_plan_name = 'TOP100' and source_id is not null;
        """
        return self.long_articles_client.fetch(fetch_query, cursor_type=DictCursor)

    def get_title_read_info_detail(self, title: str) -> bool:
        """
        Check the read-rate multiples of the title's three most recent articles;
        return True only if every one of them is at least 1.2.
        """
        fetch_query = """
            select read_rate
            from datastat_sort_strategy
            where produce_plan_name = 'TOP100' and title = %s
            order by date_str desc limit 3;
        """
        fetch_response = self.long_articles_client.fetch(
            fetch_query, cursor_type=DictCursor, params=(title,)
        )
        read_rate_list = [i["read_rate"] for i in fetch_response]
        for read_rate in read_rate_list:
            if read_rate < 1.2:
                return False
        return True
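
    # Worked example of the rule above: read rates [1.5, 1.3, 1.25] pass, while
    # [1.5, 1.1, 1.3] fail, because a single value below 1.2 disqualifies the title.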

    def get_article_summary(self, source_id: str) -> str:
        """
        Use source_id to fetch the article summary (output of produce module type 18).
        """
        fetch_query = """
            select output
            from produce_plan_module_output
            where plan_exe_id = %s and produce_module_type = 18;
        """
        fetch_response = self.denet_client.fetch(
            fetch_query, cursor_type=DictCursor, params=(source_id,)
        )
        return fetch_response[0]["output"]

    def get_keys_by_ai(self, title_obj: Dict) -> List[str]:
        """
        Extract search keywords for a title via the LLM.
        """
        title = title_obj["title"]
        source_id = title_obj["source_id"]
        article_summary = self.get_article_summary(source_id)
        output_format = {"keys": ["key1", "key2", "key3"]}
        prompt = extract_keywords_prompt.format(
            output_format=output_format, title=title, summary=article_summary
        )
        response = fetch_deepseek_completion(
            model="deepseek-V3", prompt=prompt, output_type="json"
        )
        return response["keys"]

    def migrate_article_to_es(self, max_article_id: int = 0):
        """
        Copy crawler articles with article_id greater than max_article_id into
        Elasticsearch (at most 10,000 rows per call).
        """
        fetch_query = """
            select article_id, platform, out_account_id, title
            from crawler_meta_article
            where status = 1 and article_id > %s
            order by article_id limit 10000;
        """
        # run the query
        results = self.long_articles_client.fetch(
            fetch_query, cursor_type=DictCursor, params=(max_article_id,)
        )
        docs = [
            {
                "_index": index_name,
                "_id": item["article_id"],
                "_source": {
                    "article_id": item["article_id"],
                    "platform": item["platform"],
                    "out_account_id": item["out_account_id"],
                    "title": item["title"],
                },
            }
            for item in results
        ]
        self.elastic_search.bulk_insert(docs)
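

# A minimal sketch (assumed usage, mirroring the first step of deal() below): sync the
# ES index incrementally before searching, starting from the largest article_id already
# indexed.
#
#     generalizer = TopArticleGeneralize()
#     generalizer.migrate_article_to_es(generalizer.elastic_search.get_max_article_id())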


class TopArticleGeneralizeFromArticlePool(TopArticleGeneralize):

    def get_candidate_articles(self, article_id_tuple):
        """
        Fetch candidate articles (status = 1, title_sensitivity = 0) for the given ids.
        """
        fetch_query = """
            select article_id, title, link, llm_sensitivity, score, category_by_ai
            from crawler_meta_article
            where status = %s
              and title_sensitivity = %s
              and article_id in %s;
        """
        fetch_response = self.long_articles_client.fetch(
            fetch_query, cursor_type=DictCursor, params=(1, 0, article_id_tuple)
        )
        return fetch_response

    def change_article_status_while_publishing(self, article_id_list):
        """
        :param article_id_list: unique ids of the articles being published
        :return:
        """
        update_sql = """
            update crawler_meta_article
            set status = %s
            where article_id in %s and status = %s;
        """
        affect_rows = self.long_articles_client.save(
            query=update_sql, params=(2, tuple(article_id_list), 1)
        )
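
    # Status transition sketch: candidate rows carry status = 1; once deal() hands them
    # to a new crawler plan they are flipped to status = 2, and the extra "status = %s"
    # guard (bound to 1) keeps rows that already left the candidate pool from being
    # updated again.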

    def deal(self):
        # migrate new crawler articles into Elasticsearch first
        max_id = self.elastic_search.get_max_article_id()
        self.migrate_article_to_es(max_id)

        # fetch the TOP100 titles and expand each qualified one into candidate articles
        title_obj_list = self.fetch_distinct_top_titles()
        publishing_article_list = []
        for title_obj in tqdm(title_obj_list):
            if self.get_title_read_info_detail(title_obj["title"]):
                try:
                    keys = self.get_keys_by_ai(title_obj)
                    related_articles = self.elastic_search.search(
                        search_keys=",".join(keys), size=50
                    )
                    if related_articles:
                        article_id_list = [i["article_id"] for i in related_articles]
                        article_list = self.get_candidate_articles(
                            tuple(article_id_list)
                        )
                        title_list = [i["title"] for i in article_list]
                        # rank candidates by similarity to the source title
                        similarity_array = similarity_between_title_list(
                            title_list, [title_obj["title"]]
                        )
                        response_with_similarity_list = []
                        for index, item in enumerate(article_list):
                            item["similarity"] = similarity_array[index][0]
                            response_with_similarity_list.append(item)
                        sorted_response_with_similarity_list = sorted(
                            response_with_similarity_list,
                            key=lambda k: k["similarity"],
                            reverse=True,
                        )
                        # drop articles whose similarity to the source title exceeds 0.8
                        sorted_response_with_similarity_list = [
                            i
                            for i in sorted_response_with_similarity_list
                            if i["similarity"] <= 0.8
                        ]
                        publishing_article_list += sorted_response_with_similarity_list[:10]
                except Exception as e:
                    print(e)
                    print(traceback.format_exc())

        url_list = [i["link"] for i in publishing_article_list]
        if url_list:
            # create a crawler plan from the selected article links
            crawler_plan_response = aiditApi.auto_create_crawler_task(
                plan_id=None,
                plan_name="自动绑定-Top内容泛化-{}--{}".format(
                    str(datetime.date.today()), len(url_list)
                ),
                plan_tag="Top内容泛化",
                article_source="weixin",
                url_list=url_list,
            )
            # record the new crawler plan's id and name
            crawler_plan_id = crawler_plan_response["data"]["id"]
            crawler_plan_name = crawler_plan_response["data"]["name"]

            # bind the crawler plan to the generate plan
            new_crawler_task_list = [
                {
                    "contentType": 1,
                    "inputSourceType": 2,
                    "inputSourceSubType": None,
                    "fieldName": None,
                    "inputSourceValue": crawler_plan_id,
                    "inputSourceLabel": crawler_plan_name,
                    "inputSourceModal": 3,
                    "inputSourceChannel": 5,
                }
            ]
            generate_plan_response = aiditApi.bind_crawler_task_to_generate_task(
                crawler_task_list=new_crawler_task_list,
                generate_task_id="20250703081329508785665",
            )

            # update the selected articles' status (1 -> 2)
            article_id_list = [i["article_id"] for i in publishing_article_list]
            self.change_article_status_while_publishing(article_id_list=article_id_list)


class TopArticleGeneralizeFromVideoPool(TopArticleGeneralize):

    def get_candidate_videos(self, key):
        """
        Fetch candidate videos (status = 0, bad_status = 0) whose title contains the keyword.
        """
        fetch_query = """
            select article_title, content_trace_id, audit_video_id
            from publish_single_video_source
            where status = 0 and bad_status = 0 and article_title like %s;
        """
        fetch_response = self.long_articles_client.fetch(
            fetch_query, cursor_type=DictCursor, params=(f"%{key}%",)
        )
        return fetch_response

    def deal(self):
        title_obj_list = self.fetch_distinct_top_titles()
        publishing_article_list = []
        for title_obj in tqdm(title_obj_list):
            if self.get_title_read_info_detail(title_obj["title"]):
                temp = []
                keys = self.get_keys_by_ai(title_obj)
                for key in keys:
                    candidate_articles = self.get_candidate_videos(key)
                    temp += candidate_articles
                print(json.dumps(temp, ensure_ascii=False, indent=4))
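

# A minimal entry-point sketch (an assumption -- the original file does not define one):
# run the article-pool generalization end to end, or dump video candidates for inspection.
if __name__ == "__main__":
    TopArticleGeneralizeFromArticlePool().deal()
    # TopArticleGeneralizeFromVideoPool().deal()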