from applications.api import AsyncElasticSearchClient from applications.api import fetch_deepseek_completion class HotKeysGenerate: def __init__(self, pool, log_client, trace_id): self.elastic_client = AsyncElasticSearchClient() self.pool = pool self.log_client = log_client self.trace_id = trace_id @staticmethod async def generate_prompt(formated_titles): """ 生成热键的prompt """ prompt = f""" 请你根据以下的热点时事,帮我生成一些热搜词组,我需要用这些词组来搜索相关的文章。 生成的每个词组不能太短,需要能够覆盖热点时事的主要内容。如果遇到相似的热点时事,需要合并为一个词组。 如果遇到敏感话题,直接过滤掉即可 ## 输入 {formated_titles} ## 输出 输出要求是 JSON 格式,返回一个数组,数组中的每个元素都是一个字符串,字符串是一个热搜词组。 参考结构:{{ "hot_keys": ["key1", "key2", "key3", ...] }} """ return prompt async def get_hot_titles(self): query = """ SELECT title FROM hot_point_titles WHERE useful = 2 AND create_time > DATE_SUB(CURDATE(), INTERVAL 3 DAY) ; """ response = await self.pool.async_fetch(query=query) hot_titles = [item["title"] for item in response if '习近平' not in item['title']] return "\n".join(hot_titles) async def deal(self): article_id_set = set() hot_titles = await self.get_hot_titles() prompt = await self.generate_prompt(hot_titles) completion = fetch_deepseek_completion(model="default", prompt=prompt, output_type="json") hot_keys = completion["hot_keys"] if not hot_keys: return list(article_id_set) for key in hot_keys: response = await self.elastic_client.search( search_keys=key ) for item in response[:5]: article_id_set.add(item["article_id"]) return list(article_id_set)