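"""
Top-article generalization tasks.

Reads TOP100 plan titles from MySQL, asks DeepSeek for search keywords,
pulls related candidates from Elasticsearch (article pool) or MySQL
(video pool), then creates a WeChat crawler plan from the selected links
and binds it to a generate plan.
"""
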
import json
import datetime
import traceback

from tqdm import tqdm
from typing import List, Dict
from pymysql.cursors import DictCursor

from applications import aiditApi
from applications.api import ElasticSearchClient
from applications.api import fetch_deepseek_completion
from applications.api import similarity_between_title_list
from applications.db import DatabaseConnector
from config import long_articles_config, denet_config
from config.es_mappings import index_name
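
# Prompt used by TopArticleGeneralize.get_keys_by_ai; {output_format}, {title} and
# {summary} are filled in via str.format before the prompt is sent to DeepSeek.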
extract_keywords_prompt = """
你是一名优秀的中文专家
## 任务说明
需要你从输入的标题和总结中提取3个搜索词
### 输出
输出结构为JSON,格式如下
{output_format}
## 输入
标题:{title}
总结:{summary}
"""


class TopArticleGeneralize:
    def __init__(self):
        self.long_articles_client = DatabaseConnector(long_articles_config)
        self.long_articles_client.connect()
        self.denet_client = DatabaseConnector(denet_config)
        self.denet_client.connect()
        self.elastic_search = ElasticSearchClient(index_=index_name)

    def fetch_distinct_top_titles(self) -> List[Dict]:
        """
        Fetch distinct article titles (and their source ids) from the TOP100 produce plan.
        """
        fetch_query = """
            select distinct title, source_id
            from datastat_sort_strategy
            where produce_plan_name = 'TOP100' and source_id is not null;
        """
        return self.long_articles_client.fetch(fetch_query, cursor_type=DictCursor)

    def get_title_read_info_detail(self, title: str) -> bool:
        """
        Return True only if the title's three most recent articles all have a read-rate multiple >= 1.2.
        """
        fetch_query = """
            select read_rate
            from datastat_sort_strategy
            where produce_plan_name = 'TOP100' and title = %s
            order by date_str desc limit 3;
        """
        fetch_response = self.long_articles_client.fetch(
            fetch_query, cursor_type=DictCursor, params=(title,)
        )
        read_rate_list = [i["read_rate"] for i in fetch_response]
        for read_rate in read_rate_list:
            if read_rate < 1.2:
                return False
        return True

    def get_article_summary(self, source_id: str) -> str:
        """
        Use source_id to fetch the article summary (produce_module_type = 18).
        """
        fetch_query = """
            select output
            from produce_plan_module_output
            where plan_exe_id = %s and produce_module_type = 18;
        """
        fetch_response = self.denet_client.fetch(
            fetch_query, cursor_type=DictCursor, params=(source_id,)
        )
        return fetch_response[0]["output"]

    def get_keys_by_ai(self, title_obj: Dict) -> List[str]:
        """
        Extract three search keywords for a title via the LLM.
        """
        title = title_obj["title"]
        source_id = title_obj["source_id"]
        article_summary = self.get_article_summary(source_id)
        output_format = {"keys": ["key1", "key2", "key3"]}
        prompt = extract_keywords_prompt.format(
            output_format=output_format, title=title, summary=article_summary
        )
        response = fetch_deepseek_completion(
            model="deepseek-V3", prompt=prompt, output_type="json"
        )
        return response["keys"]

    def migrate_article_to_es(self, max_article_id: int = 0):
        """
        Migrate crawler_meta_article rows with article_id > max_article_id into Elasticsearch.
        """
        fetch_query = """
            select article_id, platform, out_account_id, title
            from crawler_meta_article
            where status = 1 and article_id > %s
            order by article_id limit 10000;
        """
        results = self.long_articles_client.fetch(
            fetch_query, cursor_type=DictCursor, params=(max_article_id,)
        )
        docs = [
            {
                "_index": index_name,
                "_id": item["article_id"],
                "_source": {
                    "article_id": item["article_id"],
                    "platform": item["platform"],
                    "out_account_id": item["out_account_id"],
                    "title": item["title"],
                },
            }
            for item in results
        ]
        self.elastic_search.bulk_insert(docs)


class TopArticleGeneralizeFromArticlePool(TopArticleGeneralize):
    def get_candidate_articles(self, article_id_tuple):
        """
        Fetch publishable candidates (status = 1, title_sensitivity = 0) from the article pool by id.
        """
        fetch_query = """
            select article_id, title, link, llm_sensitivity, score, category_by_ai
            from crawler_meta_article
            where status = %s
            and title_sensitivity = %s
            and article_id in %s;
        """
        fetch_response = self.long_articles_client.fetch(
            fetch_query, cursor_type=DictCursor, params=(1, 0, article_id_tuple)
        )
        return fetch_response

    def change_article_status_while_publishing(self, article_id_list):
        """
        Mark the given articles as picked for publishing (status 1 -> 2).

        :param article_id_list: unique article ids
        :return: number of affected rows
        """
        update_sql = """
            update crawler_meta_article
            set status = %s
            where article_id in %s and status = %s;
        """
        affected_rows = self.long_articles_client.save(
            query=update_sql, params=(2, tuple(article_id_list), 1)
        )
        return affected_rows

    def deal(self):
        # migrate newly crawled articles into Elasticsearch first
        max_id = self.elastic_search.get_max_article_id()
        self.migrate_article_to_es(max_id)

        # fetch distinct TOP100 titles
        title_obj_list = self.fetch_distinct_top_titles()
        publishing_article_list = []
        for title_obj in tqdm(title_obj_list):
            if self.get_title_read_info_detail(title_obj["title"]):
                try:
                    keys = self.get_keys_by_ai(title_obj)
                    related_articles = self.elastic_search.search(
                        search_keys=",".join(keys), size=50
                    )
                    if related_articles:
                        article_id_list = [i["article_id"] for i in related_articles]
                        article_list = self.get_candidate_articles(
                            tuple(article_id_list)
                        )
                        title_list = [i["title"] for i in article_list]
                        # rank candidates by similarity to the source title
                        similarity_array = similarity_between_title_list(
                            title_list, [title_obj["title"]]
                        )
                        response_with_similarity_list = []
                        for index, item in enumerate(article_list):
                            item["similarity"] = similarity_array[index][0]
                            response_with_similarity_list.append(item)
                        sorted_response_with_similarity_list = sorted(
                            response_with_similarity_list,
                            key=lambda k: k["similarity"],
                            reverse=True,
                        )
                        # drop near-duplicates whose similarity is above 0.8
                        sorted_response_with_similarity_list = [
                            i
                            for i in sorted_response_with_similarity_list
                            if i["similarity"] <= 0.8
                        ]
                        publishing_article_list += sorted_response_with_similarity_list[:10]
                except Exception as e:
                    print(e)
                    print(traceback.format_exc())

        url_list = [i["link"] for i in publishing_article_list]
        if url_list:
            # create a crawler plan for the selected urls
            crawler_plan_response = aiditApi.auto_create_crawler_task(
                plan_id=None,
                plan_name="自动绑定-Top内容泛化-{}--{}".format(
                    str(datetime.date.today()), len(url_list)
                ),
                plan_tag="Top内容泛化",
                article_source="weixin",
                url_list=url_list,
            )
            crawler_plan_id = crawler_plan_response["data"]["id"]
            crawler_plan_name = crawler_plan_response["data"]["name"]

            # bind the new crawler plan to the generate plan
            new_crawler_task_list = [
                {
                    "contentType": 1,
                    "inputSourceType": 2,
                    "inputSourceSubType": None,
                    "fieldName": None,
                    "inputSourceValue": crawler_plan_id,
                    "inputSourceLabel": crawler_plan_name,
                    "inputSourceModal": 3,
                    "inputSourceChannel": 5,
                }
            ]
            generate_plan_response = aiditApi.bind_crawler_task_to_generate_task(
                crawler_task_list=new_crawler_task_list,
                generate_task_id="20250703081329508785665",
            )

            # mark the selected articles as picked
            article_id_list = [i["article_id"] for i in publishing_article_list]
            self.change_article_status_while_publishing(article_id_list=article_id_list)


class TopArticleGeneralizeFromVideoPool(TopArticleGeneralize):
    def get_candidate_videos(self, key):
        """
        Fetch unpublished, non-flagged videos whose title contains the keyword.
        """
        fetch_query = """
            select article_title, content_trace_id, audit_video_id
            from publish_single_video_source
            where status = 0 and bad_status = 0 and article_title like %s;
        """
        fetch_response = self.long_articles_client.fetch(
            fetch_query, cursor_type=DictCursor, params=(f"%{key}%",)
        )
        return fetch_response

    def deal(self):
        title_obj_list = self.fetch_distinct_top_titles()
        publishing_article_list = []
        for title_obj in tqdm(title_obj_list):
            if self.get_title_read_info_detail(title_obj["title"]):
                temp = []
                keys = self.get_keys_by_ai(title_obj)
                for key in keys:
                    candidate_articles = self.get_candidate_videos(key)
                    temp += candidate_articles
                print(json.dumps(temp, ensure_ascii=False, indent=4))
|