"""
Generalize top-performing TOP100 articles: for each qualifying title, extract
search keywords with an LLM, pull similar candidate articles from the crawler
article pool, create a crawler plan for them, and bind that plan to a
generate plan.
"""

import datetime
from typing import List, Dict

from pymysql.cursors import DictCursor

from applications import aiditApi
from applications.api import fetch_deepseek_completion
from applications.api import similarity_between_title_list
from applications.db import DatabaseConnector
from config import long_articles_config, denet_config

extract_keywords_prompt = """
你是一名优秀的中文专家
## 任务说明
需要你从输入的标题和总结中提取3个搜索词
### 输出
输出结构为JSON,格式如下
{output_format}
## 输入
标题:{title}
总结:{summary}
"""


class TopArticleGeneralize:
    def __init__(self):
        self.long_articles_client = DatabaseConnector(long_articles_config)
        self.long_articles_client.connect()

        self.denet_client = DatabaseConnector(denet_config)
        self.denet_client.connect()

    def fetch_distinct_top_titles(self) -> List[Dict]:
        """
        Fetch distinct article titles (with their source ids) from the TOP100 produce plan.
        """
        fetch_query = """
            select distinct title, source_id
            from datastat_sort_strategy
            where produce_plan_name = 'TOP100' and source_id is not null;
        """
        return self.long_articles_client.fetch(fetch_query, cursor_type=DictCursor)

    def get_title_read_info_detail(self, title: str) -> bool:
        """
        Check the read-rate multiples of the latest 3 articles for this title;
        return True only if every one of them is at least 1.2.
        """
        fetch_query = f"""
            select read_rate
            from datastat_sort_strategy
            where produce_plan_name = 'TOP100' and title = '{title}'
            order by date_str desc
            limit 3;
        """
        fetch_response = self.long_articles_client.fetch(
            fetch_query, cursor_type=DictCursor
        )
        read_rate_list = [i["read_rate"] for i in fetch_response]
        for read_rate in read_rate_list:
            if read_rate < 1.2:
                return False
        return True

    def get_article_summary(self, source_id: str) -> str:
        """
        Use source_id to get the article summary produced by module type 18.
        """
        fetch_query = f"""
            select output
            from produce_plan_module_output
            where plan_exe_id = '{source_id}' and produce_module_type = 18;
        """
        fetch_response = self.denet_client.fetch(fetch_query, cursor_type=DictCursor)
        return fetch_response[0]["output"]

    def get_keys_by_ai(self, title_obj: Dict) -> List[str]:
        """
        Extract 3 search keywords for a title via the LLM.
        """
        title = title_obj["title"]
        source_id = title_obj["source_id"]
        article_summary = self.get_article_summary(source_id)
        output_format = {"keys": ["key1", "key2", "key3"]}
        prompt = extract_keywords_prompt.format(
            output_format=output_format, title=title, summary=article_summary
        )
        response = fetch_deepseek_completion(
            model="deepseek-V3", prompt=prompt, output_type="json"
        )
        return response["keys"]


class TopArticleGeneralizeFromArticlePool(TopArticleGeneralize):
    def get_candidate_articles(self, key: str) -> List[Dict]:
        """
        Fetch unpublished, non-sensitive articles whose title contains the keyword.
        """
        # the keyword is interpolated directly into a LIKE pattern
        fetch_query = f"""
            select article_id, title, link, llm_sensitivity, score, category_by_ai
            from crawler_meta_article
            where status = 1
              and title_sensitivity = 0
              and title like '%{key}%';
        """
        fetch_response = self.long_articles_client.fetch(
            fetch_query, cursor_type=DictCursor
        )
        return fetch_response

    def change_article_status_while_publishing(self, article_id_list):
        """
        :param article_id_list: unique ids of the articles to mark as published
        :return: number of affected rows
        """
        update_sql = """
            update crawler_meta_article
            set status = %s
            where article_id in %s and status = %s;
        """
        affect_rows = self.long_articles_client.save(
            query=update_sql, params=(2, tuple(article_id_list), 1)
        )
        return affect_rows

    def deal(self):
        title_obj_list = self.fetch_distinct_top_titles()
        publishing_article_list = []
        for title_obj in title_obj_list:
            # currently restricted to a single hardcoded title
            if (
                title_obj["title"]
                == "母亲去世136天后,女子回到家,在锅盖上留下一句话,瞬间泪崩!"
            ):
                if self.get_title_read_info_detail(title_obj["title"]):
                    # collect candidate articles for every extracted keyword
                    temp = []
                    keys = self.get_keys_by_ai(title_obj)
                    for key in keys:
                        candidate_articles = self.get_candidate_articles(key)
                        temp += candidate_articles

                    if temp:
                        print(title_obj["title"])
                        title_list = [i["title"] for i in temp]
                        # sort candidates by similarity to the top title
                        similarity_array = similarity_between_title_list(
                            title_list, [title_obj["title"]]
                        )
                        print(similarity_array)
                        print(title_list)
                        response_with_similarity_list = []
                        for index, item in enumerate(temp):
                            item["similarity"] = similarity_array[index][0]
                            response_with_similarity_list.append(item)

                        sorted_response_with_similarity_list = sorted(
                            response_with_similarity_list,
                            key=lambda k: k["similarity"],
                            reverse=True,
                        )
                        # keep the 10 most similar candidates
                        publishing_article_list += sorted_response_with_similarity_list[:10]

        url_list = [i["link"] for i in publishing_article_list]
        if url_list:
            # create crawler plan
            crawler_plan_response = aiditApi.auto_create_crawler_task(
                plan_id=None,
                plan_name="自动绑定-Top内容泛化-{}--{}".format(
                    str(datetime.date.today()), len(url_list)
                ),
                plan_tag="Top内容泛化",
                article_source="weixin",
                url_list=url_list,
            )
            crawler_plan_id = crawler_plan_response["data"]["id"]
            crawler_plan_name = crawler_plan_response["data"]["name"]

            # auto bind to generate plan
            new_crawler_task_list = [
                {
                    "contentType": 1,
                    "inputSourceType": 2,
                    "inputSourceSubType": None,
                    "fieldName": None,
                    "inputSourceValue": crawler_plan_id,
                    "inputSourceLabel": crawler_plan_name,
                    "inputSourceModal": 3,
                    "inputSourceChannel": 5,
                }
            ]
            generate_plan_response = aiditApi.bind_crawler_task_to_generate_task(
                crawler_task_list=new_crawler_task_list,
                generate_task_id="20250703081329508785665",
            )

            # mark the selected pool articles as published
            article_id_list = [i["article_id"] for i in publishing_article_list]
            self.change_article_status_while_publishing(article_id_list=article_id_list)
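

# ---------------------------------------------------------------------------
# Usage sketch (assumption): the module does not show how it is invoked, so
# the entry point below is illustrative only. It assumes the script is meant
# to be run directly and that deal() drives the whole generalize-and-publish
# flow end to end.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    task = TopArticleGeneralizeFromArticlePool()
    task.deal()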