import json import datetime import traceback from tqdm import tqdm from typing import List, Dict from pymysql.cursors import DictCursor from applications import aiditApi from applications.api import ElasticSearchClient from applications.api import fetch_deepseek_completion from applications.api import similarity_between_title_list from applications.db import DatabaseConnector from config import long_articles_config, denet_config from config.es_mappings import index_name extract_keywords_prompt = """ 你是一名优秀的中文专家 ## 任务说明 需要你从输入的标题和总结中提取3个搜索词 ### 输出 输出结构为JSON,格式如下 {output_format} ## 输入 标题:{title} 总结:{summary} """ class TopArticleGeneralize: def __init__(self): self.long_articles_client = DatabaseConnector(long_articles_config) self.long_articles_client.connect() self.denet_client = DatabaseConnector(denet_config) self.denet_client.connect() self.elastic_search = ElasticSearchClient(index_=index_name) def fetch_distinct_top_titles(self) -> List[Dict]: """ 获取top100生成计划中的文章标题 """ fetch_query = f""" select distinct title, source_id from datastat_sort_strategy where produce_plan_name = 'TOP100' and source_id is not null; """ return self.long_articles_client.fetch(fetch_query, cursor_type=DictCursor) def get_title_read_info_detail(self, title: str) -> bool: """ 获取标题最近3篇文章的阅读均值倍数 """ fetch_query = f""" select read_rate from datastat_sort_strategy where produce_plan_name = 'TOP100' and title = '{title}' order by date_str desc limit 3; """ fetch_response = self.long_articles_client.fetch( fetch_query, cursor_type=DictCursor ) read_rate_list = [i["read_rate"] for i in fetch_response] for read_rate in read_rate_list: if read_rate < 1.2: return False return True def get_article_summary(self, source_id: str) -> str: """ use source_id to get article summary """ fetch_query = f""" select output from produce_plan_module_output where plan_exe_id = '{source_id}' and produce_module_type = 18; """ fetch_response = self.denet_client.fetch(fetch_query, cursor_type=DictCursor) return fetch_response[0]["output"] def get_keys_by_ai(self, title_obj: Dict) -> List[str]: """ 获取关键词 """ title = title_obj["title"] source_id = title_obj["source_id"] article_summary = self.get_article_summary(source_id) output_format = {"keys": ["key1", "key2", "key3"]} prompt = extract_keywords_prompt.format( output_format=output_format, title=title, summary=article_summary ) response = fetch_deepseek_completion( model="deepseek-V3", prompt=prompt, output_type="json" ) return response["keys"] def migrate_article_to_es(self, max_article_id: int = 0): fetch_query = f""" select article_id, platform, out_account_id, title from crawler_meta_article where status = 1 and article_id > %s order by article_id limit 10000; """ # 执行查询 results = self.long_articles_client.fetch( fetch_query, cursor_type=DictCursor, params=(max_article_id,) ) docs = [ { "_index": index_name, "_id": item["article_id"], "_source": { "article_id": item["article_id"], "platform": item["platform"], "out_account_id": item["out_account_id"], "title": item["title"], }, } for item in results ] self.elastic_search.bulk_insert(docs) class TopArticleGeneralizeFromArticlePool(TopArticleGeneralize): def get_candidate_articles(self, article_id_tuple): fetch_query = f""" select article_id, title, link, llm_sensitivity, score, category_by_ai from crawler_meta_article where status = %s and title_sensitivity = %s and article_id in %s; """ fetch_response = self.long_articles_client.fetch( fetch_query, cursor_type=DictCursor, params=(1, 0, article_id_tuple) ) return fetch_response def change_article_status_while_publishing(self, article_id_list): """ :param: article_id_list: 文章的唯一 id :return: """ update_sql = f""" update crawler_meta_article set status = %s where article_id in %s and status = %s; """ affect_rows = self.long_articles_client.save( query=update_sql, params=(2, tuple(article_id_list), 1) ) def deal(self): # migrate articles max_id = self.elastic_search.get_max_article_id() self.migrate_article_to_es(max_id) # fetch titles title_obj_list = self.fetch_distinct_top_titles() publishing_article_list = [] for title_obj in tqdm(title_obj_list): if self.get_title_read_info_detail(title_obj["title"]): try: keys = self.get_keys_by_ai(title_obj) related_articles = self.elastic_search.search( search_keys=",".join(keys), size=50 ) if related_articles: article_id_list = [i["article_id"] for i in related_articles] article_list = self.get_candidate_articles( tuple(article_id_list) ) title_list = [i["title"] for i in article_list] # 相关性排序 similarity_array = similarity_between_title_list( title_list, [title_obj["title"]] ) response_with_similarity_list = [] for index, item in enumerate(article_list): item["similarity"] = similarity_array[index][0] response_with_similarity_list.append(item) sorted_response_with_similarity_list = sorted( response_with_similarity_list, key=lambda k: k["similarity"], reverse=True, ) # 过滤相关性分大于0.8的文章 sorted_response_with_similarity_list = [ i for i in sorted_response_with_similarity_list if i["similarity"] <= 0.8 ] publishing_article_list += sorted_response_with_similarity_list[ :10 ] except Exception as e: print(e) print(traceback.format_exc()) url_list = [i["link"] for i in publishing_article_list] if url_list: # create_crawler_plan crawler_plan_response = aiditApi.auto_create_crawler_task( plan_id=None, plan_name="自动绑定-Top内容泛化-{}--{}".format( datetime.date.today().__str__(), len(url_list) ), plan_tag="Top内容泛化", article_source="weixin", url_list=url_list, ) # save to db crawler_plan_id = crawler_plan_response["data"]["id"] crawler_plan_name = crawler_plan_response["data"]["name"] # auto bind to generate plan new_crawler_task_list = [ { "contentType": 1, "inputSourceType": 2, "inputSourceSubType": None, "fieldName": None, "inputSourceValue": crawler_plan_id, "inputSourceLabel": crawler_plan_name, "inputSourceModal": 3, "inputSourceChannel": 5, } ] # 绑定至生成计划 generate_plan_response = aiditApi.bind_crawler_task_to_generate_task( crawler_task_list=new_crawler_task_list, generate_task_id="20250703081329508785665", ) # change article status article_id_list = [i["article_id"] for i in publishing_article_list] self.change_article_status_while_publishing(article_id_list=article_id_list) class TopArticleGeneralizeFromVideoPool(TopArticleGeneralize): def get_candidate_videos(self, key): fetch_query = f""" select article_title, content_trace_id, audit_video_id from publish_single_video_source where status = 0 and bad_status = 0 and article_title like '%{key}%' """ fetch_response = self.long_articles_client.fetch( fetch_query, cursor_type=DictCursor ) return fetch_response def deal(self): title_obj_list = self.fetch_distinct_top_titles() publishing_article_list = [] for title_obj in tqdm(title_obj_list): if self.get_title_read_info_detail(title_obj["title"]): temp = [] keys = self.get_keys_by_ai(title_obj) for key in keys: candidate_articles = self.get_candidate_videos(key) temp += candidate_articles print(json.dumps(temp, ensure_ascii=False, indent=4))