# top_article_generalize.py
import json
import time
import datetime
import traceback

from tqdm import tqdm
from typing import List, Dict
from pymysql.cursors import DictCursor

from applications import aiditApi
from applications.api import ElasticSearchClient
from applications.api import fetch_deepseek_completion
from applications.api import similarity_between_title_list
from applications.db import DatabaseConnector
from config import long_articles_config, denet_config
from config.es_mappings import index_name

# Prompt (in Chinese) asking the model to extract three search keywords from an
# article title and summary; the placeholders are filled in get_keys_by_ai.
extract_keywords_prompt = """
你是一名优秀的中文专家
## 任务说明
需要你从输入的标题和总结中提取3个搜索词
### 输出
输出结构为JSON,格式如下
{output_format}
## 输入
标题:{title}
总结:{summary}
"""


class TopArticleGeneralize:

    def __init__(self):
        self.long_articles_client = DatabaseConnector(long_articles_config)
        self.long_articles_client.connect()
        self.denet_client = DatabaseConnector(denet_config)
        self.denet_client.connect()
        self.elastic_search = ElasticSearchClient(index_=index_name)

    def fetch_distinct_top_titles(self) -> List[Dict]:
        """
        Fetch the distinct article titles of the TOP100 produce plan.
        """
        fetch_query = """
            select distinct title, source_id
            from datastat_sort_strategy
            where produce_plan_name = 'TOP100' and source_id is not null;
        """
        return self.long_articles_client.fetch(fetch_query, cursor_type=DictCursor)

    def get_title_read_info_detail(self, title: str) -> bool:
        """
        Check the read-rate multiples of the 3 most recent articles with this
        title; return True only if every one of them is at least 1.2.
        """
        fetch_query = """
            select read_rate
            from datastat_sort_strategy
            where produce_plan_name = 'TOP100' and title = %s
            order by date_str desc limit 3;
        """
        fetch_response = self.long_articles_client.fetch(
            fetch_query, cursor_type=DictCursor, params=(title,)
        )
        read_rate_list = [i["read_rate"] for i in fetch_response]
        for read_rate in read_rate_list:
            if read_rate < 1.2:
                return False
        return True

    def get_article_summary(self, source_id: str) -> str:
        """
        Use source_id to fetch the article summary (produce_module_type = 18).
        """
        fetch_query = """
            select output
            from produce_plan_module_output
            where plan_exe_id = %s and produce_module_type = 18;
        """
        fetch_response = self.denet_client.fetch(
            fetch_query, cursor_type=DictCursor, params=(source_id,)
        )
        return fetch_response[0]["output"]

    def get_keys_by_ai(self, title_obj: Dict) -> List[str]:
        """
        Ask the LLM to extract search keywords for a title + summary pair.
        """
        title = title_obj["title"]
        source_id = title_obj["source_id"]
        article_summary = self.get_article_summary(source_id)
        output_format = {"keys": ["key1", "key2", "key3"]}
        prompt = extract_keywords_prompt.format(
            output_format=output_format, title=title, summary=article_summary
        )
        response = fetch_deepseek_completion(
            model="deepseek-V3", prompt=prompt, output_type="json"
        )
        return response["keys"]

    def migrate_article_to_es(self, max_article_id: int = 0):
        """
        Migrate published articles with article_id > max_article_id into Elasticsearch.
        """
        fetch_query = """
            select article_id, platform, out_account_id, title
            from crawler_meta_article
            where status = 1 and article_id > %s
            order by article_id limit 10000;
        """
        # execute the query
        results = self.long_articles_client.fetch(
            fetch_query, cursor_type=DictCursor, params=(max_article_id,)
        )
        docs = [
            {
                "_index": index_name,
                "_id": item["article_id"],
                "_source": {
                    "article_id": item["article_id"],
                    "platform": item["platform"],
                    "out_account_id": item["out_account_id"],
                    "title": item["title"],
                },
            }
            for item in results
        ]
        self.elastic_search.bulk_insert(docs)


class TopArticleGeneralizeFromArticlePool(TopArticleGeneralize):

    def get_candidate_articles(self, article_id_tuple):
        fetch_query = """
            select article_id, title, link, llm_sensitivity, score, category_by_ai
            from crawler_meta_article
            where status = %s
            and title_sensitivity = %s
            and article_id in %s;
        """
        fetch_response = self.long_articles_client.fetch(
            fetch_query, cursor_type=DictCursor, params=(1, 0, article_id_tuple)
        )
        return fetch_response

    def change_article_status_while_publishing(self, article_id_list):
        """
        Flip article status from 1 to 2 once the articles are picked for publishing.

        :param article_id_list: unique article ids
        :return: number of affected rows
        """
        update_sql = """
            update crawler_meta_article
            set status = %s
            where article_id in %s and status = %s;
        """
        affect_rows = self.long_articles_client.save(
            query=update_sql, params=(2, tuple(article_id_list), 1)
        )
        return affect_rows

    def deal(self):
        # migrate the newest articles into Elasticsearch
        max_id = self.elastic_search.get_max_article_id()
        self.migrate_article_to_es(max_id)

        # fetch top titles and collect related candidate articles
        title_obj_list = self.fetch_distinct_top_titles()
        publishing_article_list = []
        for title_obj in tqdm(title_obj_list):
            if self.get_title_read_info_detail(title_obj["title"]):
                try:
                    keys = self.get_keys_by_ai(title_obj)
                    related_articles = self.elastic_search.search(
                        search_keys=",".join(keys), size=50
                    )
                    if related_articles:
                        article_id_list = [i["article_id"] for i in related_articles]
                        article_list = self.get_candidate_articles(
                            tuple(article_id_list)
                        )
                        title_list = [i["title"] for i in article_list]
                        # rank candidates by similarity to the top title
                        similarity_array = similarity_between_title_list(
                            title_list, [title_obj["title"]]
                        )
                        response_with_similarity_list = []
                        for index, item in enumerate(article_list):
                            item["similarity"] = similarity_array[index][0]
                            response_with_similarity_list.append(item)

                        sorted_response_with_similarity_list = sorted(
                            response_with_similarity_list,
                            key=lambda k: k["similarity"],
                            reverse=True,
                        )
                        publishing_article_list += sorted_response_with_similarity_list[:10]
                except Exception as e:
                    print(e)
                    print(traceback.format_exc())

        url_list = [i["link"] for i in publishing_article_list]
        if url_list:
            # create a crawler plan for the selected article urls
            crawler_plan_response = aiditApi.auto_create_crawler_task(
                plan_id=None,
                plan_name="自动绑定-Top内容泛化-{}--{}".format(
                    str(datetime.date.today()), len(url_list)
                ),
                plan_tag="Top内容泛化",
                article_source="weixin",
                url_list=url_list,
            )

            # save to db
            crawler_plan_id = crawler_plan_response["data"]["id"]
            crawler_plan_name = crawler_plan_response["data"]["name"]

            # auto bind to the generate plan
            new_crawler_task_list = [
                {
                    "contentType": 1,
                    "inputSourceType": 2,
                    "inputSourceSubType": None,
                    "fieldName": None,
                    "inputSourceValue": crawler_plan_id,
                    "inputSourceLabel": crawler_plan_name,
                    "inputSourceModal": 3,
                    "inputSourceChannel": 5,
                }
            ]
            # bind the crawler plan to the generate plan
            generate_plan_response = aiditApi.bind_crawler_task_to_generate_task(
                crawler_task_list=new_crawler_task_list,
                generate_task_id="20250703081329508785665",
            )

            # mark the selected articles as taken for publishing
            article_id_list = [i["article_id"] for i in publishing_article_list]
            self.change_article_status_while_publishing(article_id_list=article_id_list)


class TopArticleGeneralizeFromVideoPool(TopArticleGeneralize):

    def get_candidate_videos(self, key):
        fetch_query = """
            select article_title, content_trace_id, audit_video_id
            from publish_single_video_source
            where status = 0 and bad_status = 0 and article_title like %s
        """
        fetch_response = self.long_articles_client.fetch(
            fetch_query, cursor_type=DictCursor, params=(f"%{key}%",)
        )
        return fetch_response

    def deal(self):
        title_obj_list = self.fetch_distinct_top_titles()
        publishing_article_list = []
        for title_obj in tqdm(title_obj_list):
            if self.get_title_read_info_detail(title_obj["title"]):
                temp = []
                keys = self.get_keys_by_ai(title_obj)
                for key in keys:
                    candidate_articles = self.get_candidate_videos(key)
                    temp += candidate_articles
                print(json.dumps(temp, ensure_ascii=False, indent=4))
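

# Minimal usage sketch (an assumption: the original file may define its own entry
# point in lines not shown here). It only illustrates how the two pipelines above
# are intended to be invoked.
if __name__ == "__main__":
    # generalize from the crawled-article pool: migrate to ES, match by keyword,
    # rank by title similarity, then create and bind a crawler plan
    TopArticleGeneralizeFromArticlePool().deal()

    # generalize from the video pool: currently only prints candidate videos
    TopArticleGeneralizeFromVideoPool().deal()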