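"""
Top-article generalization: for titles produced by the TOP100 plan, extract
search keywords with an LLM, pull keyword-matched candidates from the crawler
article pool, rank them by title similarity, create a crawler plan for the
best links, and bind it to a generate plan.
"""
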
import time
import datetime

from typing import List, Dict

from pymysql.cursors import DictCursor

from applications import aiditApi
from applications.api import fetch_deepseek_completion
from applications.api import similarity_between_title_list
from applications.db import DatabaseConnector
from config import long_articles_config, denet_config

# Prompt asking the model to extract three search keywords from an article
# title and summary, returned as JSON in the given format.
extract_keywords_prompt = """
你是一名优秀的中文专家
## 任务说明
需要你从输入的标题和总结中提取3个搜索词
### 输出
输出结构为JSON,格式如下
{output_format}
## 输入
标题:{title}
总结:{summary}
"""


class TopArticleGeneralize:
    """Common helpers: fetch TOP100 titles, check read performance, summarize, extract keywords."""

    def __init__(self):
        self.long_articles_client = DatabaseConnector(long_articles_config)
        self.long_articles_client.connect()
        self.denet_client = DatabaseConnector(denet_config)
        self.denet_client.connect()

    def fetch_distinct_top_titles(self) -> List[Dict]:
        """
        Fetch the distinct article titles (and their source ids) produced by the TOP100 plan.
        """
        fetch_query = """
            select distinct title, source_id
            from datastat_sort_strategy
            where produce_plan_name = 'TOP100' and source_id is not null;
        """
        return self.long_articles_client.fetch(fetch_query, cursor_type=DictCursor)

    def get_title_read_info_detail(self, title: str) -> bool:
        """
        Return True only if each of the title's 3 most recent publications has a
        read_rate (read multiple vs. the account average) of at least 1.2.
        """
        fetch_query = f"""
            select read_rate
            from datastat_sort_strategy
            where produce_plan_name = 'TOP100' and title = '{title}'
            order by date_str desc limit 3;
        """
        fetch_response = self.long_articles_client.fetch(
            fetch_query, cursor_type=DictCursor
        )
        read_rate_list = [i["read_rate"] for i in fetch_response]
        for read_rate in read_rate_list:
            if read_rate < 1.2:
                return False
        return True

    def get_article_summary(self, source_id: str) -> str:
        """
        Use source_id (plan execution id) to fetch the article summary
        produced by module type 18.
        """
        fetch_query = f"""
            select output
            from produce_plan_module_output
            where plan_exe_id = '{source_id}' and produce_module_type = 18;
        """
        fetch_response = self.denet_client.fetch(fetch_query, cursor_type=DictCursor)
        return fetch_response[0]["output"]

    def get_keys_by_ai(self, title_obj: Dict) -> List[str]:
        """
        Extract search keywords for the title via the LLM.
        """
        title = title_obj["title"]
        source_id = title_obj["source_id"]
        article_summary = self.get_article_summary(source_id)
        output_format = {"keys": ["key1", "key2", "key3"]}
        prompt = extract_keywords_prompt.format(
            output_format=output_format, title=title, summary=article_summary
        )
        response = fetch_deepseek_completion(
            model="deepseek-V3", prompt=prompt, output_type="json"
        )
        return response["keys"]


class TopArticleGeneralizeFromArticlePool(TopArticleGeneralize):
    """Generalize top titles by pulling matching articles from the crawler article pool."""

    def get_candidate_articles(self, key):
        """
        Fetch candidate articles (status = 1, title_sensitivity = 0) whose title contains the keyword.
        """
        fetch_query = f"""
            select article_id, title, link, llm_sensitivity, score, category_by_ai
            from crawler_meta_article
            where status = 1
            and title_sensitivity = 0
            and title like '%{key}%';
        """
        fetch_response = self.long_articles_client.fetch(
            fetch_query, cursor_type=DictCursor
        )
        return fetch_response

    def change_article_status_while_publishing(self, article_id_list):
        """
        Mark the given candidate articles as published (status 1 -> 2).

        :param article_id_list: unique ids of the articles
        :return: number of affected rows
        """
        update_sql = """
            update crawler_meta_article
            set status = %s
            where article_id in %s and status = %s;
        """
        affect_rows = self.long_articles_client.save(
            query=update_sql, params=(2, tuple(article_id_list), 1)
        )
        return affect_rows

    def deal(self):
        """
        For each qualifying top title, expand it into keyword-matched candidate
        articles, rank them by title similarity, and publish the best matches.
        """
        title_obj_list = self.fetch_distinct_top_titles()
        publishing_article_list = []
        for title_obj in title_obj_list:
            # currently restricted to this single title
            if (
                title_obj["title"]
                == "母亲去世136天后,女子回到家,在锅盖上留下一句话,瞬间泪崩!"
            ):
                if self.get_title_read_info_detail(title_obj["title"]):
                    temp = []
                    keys = self.get_keys_by_ai(title_obj)
                    for key in keys:
                        candidate_articles = self.get_candidate_articles(key)
                        temp += candidate_articles
                    if temp:
                        print(title_obj["title"])
                        title_list = [i["title"] for i in temp]
                        # rank candidates by similarity to the top title
                        similarity_array = similarity_between_title_list(
                            title_list, [title_obj["title"]]
                        )
                        print(similarity_array)
                        print(title_list)
                        response_with_similarity_list = []
                        for index, item in enumerate(temp):
                            item["similarity"] = similarity_array[index][0]
                            response_with_similarity_list.append(item)
                        sorted_response_with_similarity_list = sorted(
                            response_with_similarity_list,
                            key=lambda k: k["similarity"],
                            reverse=True,
                        )
                        publishing_article_list += sorted_response_with_similarity_list[
                            :10
                        ]

        url_list = [i["link"] for i in publishing_article_list]
        if url_list:
            # create crawler plan
            crawler_plan_response = aiditApi.auto_create_crawler_task(
                plan_id=None,
                plan_name="自动绑定-Top内容泛化-{}--{}".format(
                    datetime.date.today(), len(url_list)
                ),
                plan_tag="Top内容泛化",
                article_source="weixin",
                url_list=url_list,
            )
            # save to db
            crawler_plan_id = crawler_plan_response["data"]["id"]
            crawler_plan_name = crawler_plan_response["data"]["name"]
            # auto bind to generate plan
            new_crawler_task_list = [
                {
                    "contentType": 1,
                    "inputSourceType": 2,
                    "inputSourceSubType": None,
                    "fieldName": None,
                    "inputSourceValue": crawler_plan_id,
                    "inputSourceLabel": crawler_plan_name,
                    "inputSourceModal": 3,
                    "inputSourceChannel": 5,
                }
            ]
            # bind the crawler task to the generate plan
            generate_plan_response = aiditApi.bind_crawler_task_to_generate_task(
                crawler_task_list=new_crawler_task_list,
                generate_task_id="20250703081329508785665",
            )
            # mark the selected candidate articles as published
            article_id_list = [i["article_id"] for i in publishing_article_list]
            self.change_article_status_while_publishing(article_id_list=article_id_list)
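

# Minimal usage sketch: the original invocation/scheduling is not shown, so a
# direct entry point is assumed here; it simply runs the pool-based
# generalization end to end with the configured database connections.
if __name__ == "__main__":
    task = TopArticleGeneralizeFromArticlePool()
    task.deal()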