"""Aggregate raw "modes" into standard modes.

For every row of ``modes`` that has not been standardized yet, the script
embeds a short textual description, searches Milvus for an existing standard
mode, and then either creates a new standard mode or merges the new knowledge
into the closest existing one with the help of an LLM.
"""
import asyncio
import json
from typing import List, Optional, Dict, Any
import uuid

from applications.config import (
    ES_HOSTS,
    ELASTIC_SEARCH_INDEX,
    ES_PASSWORD,
    MILVUS_CONFIG,
    DEFAULT_MODEL
)
from applications.resource import init_resource_manager
from applications.utils.milvus import async_insert_chunk, async_update_embedding
from applications.api import get_basic_embedding, fetch_deepseek_completion

# Initialize the resource manager
resource_manager = init_resource_manager(
    es_hosts=ES_HOSTS,
    es_index=ELASTIC_SEARCH_INDEX,
    es_password=ES_PASSWORD,
    milvus_config=MILVUS_CONFIG,
)


def hits_to_json(hits, min_score: float = 0.8):
    """Convert a vector-search response into a list of plain dicts.

    Only the first result set (``hits[0]``) is read, i.e. the hits of the
    first query vector.

    Args:
        hits: Search response; falsy values yield an empty list.
        min_score: Hits whose ``distance`` is not strictly greater than this
            value are dropped. Defaults to the previously hard-coded ``0.8``.

    Returns:
        One dict per retained hit containing ``pk`` (``hit.id``), ``score``
        (``hit.distance``) and every field of the hit's ``entity`` mapping.
        The fields ``entities``, ``concepts``, ``questions`` and ``keywords``
        are converted to ``list``.
    """
    if not hits:
        return []

    special_keys = {"entities", "concepts", "questions", "keywords"}
    return [
        {
            "pk": hit.id,
            "score": hit.distance,
            **{
                key: list(value) if key in special_keys else value
                # ``entity`` may be missing or None; treat both as empty.
                for key, value in (hit.get("entity", {}) or {}).items()
            },
        }
        for hit in hits[0]
        if hit.distance > min_score
    ]


def format_json_file(json_obj):
    """Render a mapping as ``key: value`` lines, each terminated by a newline."""
    return "".join(f"{key}: {value}\n" for key, value in json_obj.items())


class AggregatePattern:
    """Standardize pending modes using MySQL, Milvus and an LLM."""

    def __init__(self, resource):
        """Keep references to the MySQL and Milvus clients of ``resource``."""
        self.mysql_client = resource.mysql_client
        self.milvus_client = resource.milvus_client

    async def get_task(self):
        """Fetch all modes with ``standardization_status = 0`` joined with their outputs."""
        query = """
            SELECT t1.id, dim_name, name, t1.description, t1.detail,
                   t2.output_type, t2.content,t2.constrains
            FROM modes t1
            JOIN outputs t2 ON t1.output_id = t2.output_id
            WHERE standardization_status = 0;
        """
        response = await self.mysql_client.async_fetch(query=query)
        return response

    async def base_vector_search(
        self,
        query_vec: List[float],
        anns_field: str = "mode_vector",
        limit: int = 5,
        expr: Optional[str] = None,
        search_params: Optional[Dict[str, Any]] = None,
    ):
        """Run a vector search in a worker thread and return the filtered hits.

        Args:
            query_vec: Query embedding.
            anns_field: Name of the vector field to search.
            limit: Maximum number of hits requested from the client.
            expr: Optional filter expression forwarded to the client.
            search_params: Search parameters; defaults to COSINE with ``ef=64``.

        Returns:
            ``{"results": [...]}`` with at most 10 hits converted by
            :func:`hits_to_json`; each hit carries the ``id`` and ``mode_id``
            output fields.
        """
        if search_params is None:
            search_params = {"metric_type": "COSINE", "params": {"ef": 64}}

        # The client's search call is synchronous; keep it off the event loop.
        response = await asyncio.to_thread(
            self.milvus_client.search,
            data=[query_vec],
            anns_field=anns_field,
            param=search_params,
            limit=limit,
            expr=expr,
            output_fields=["id", "mode_id"],
        )
        print(response)
        return {"results": hits_to_json(response)[:10]}

    @staticmethod
    async def get_result_by_llm(task):
        """Ask the LLM to summarise one task into a reusable knowledge pattern.

        Args:
            task: Row produced by :meth:`get_task`. ``detail``, ``content`` and
                ``constrains`` are parsed with ``json.loads``; ``detail`` must
                contain the key ``不变的`` and ``constrains`` must decode to an
                iterable of mappings.

        Returns:
            Whatever ``fetch_deepseek_completion`` returns for a JSON output
            request; callers read the keys ``name``, ``description`` and
            ``details`` from it.
        """
        output_type = task['output_type']
        content = task['content']
        constrains = task['constrains']
        detail = task['detail']
        mode_name = task['name']
        dim = task['dim_name']
        decr = task['description']

        constrains_string = "".join(
            format_json_file(item) + "\n" for item in json.loads(constrains)
        )
        detail_string = format_json_file(json.loads(detail)['不变的'])
        content_string = format_json_file(json.loads(content))

        prompt = f"""
        请基于以下输入信息,总结出一套可复用的知识模式。

        ## 输入信息
        **知识维度**:{dim}
        **模式名称**:{mode_name}
        **模式描述**:{decr}
        **模式详情**:{detail_string}
        **产出类型**:{output_type}
        **产出内容**:{content_string}
        **产出格式约束**:{constrains_string}

        ## 输出要求
        请按照以下结构输出JSON格式的结果:
        1. **模式名称**:直接使用输入中的模式名称或基于其提炼
        2. **简要描述**:用1-2句话概括模式的核心价值和适用场景
        3. **所有知识的总结**:详细阐述以下方面:
           - 灵感来源:模式的创意起点和驱动因素
           - 内容结构:固定的内容组织形式和要素
           - 写作方法:具体的创作技巧和表达方式
           - 核心逻辑:模式运作的基本原则和策略
           - 产出模板:可复用的内容框架和变量说明
           - 应用场景:模式的适用领域和使用价值
        请确保总结全面、结构清晰,直接基于输入信息进行提炼,不要添加额外信息。

        ## 输出格式
        {{
            "name": "模式名称",
            "description": "简要描述",
            "details": "详细的知识总结,包含灵感来源、内容结构、写作方法、核心逻辑、产出模板、应用场景等完整要素"
        }}
        """
        response = await fetch_deepseek_completion(
            prompt=prompt,
            model="DeepSeek-R1",
            output_type="json"
        )
        return response

    async def merge_as_new_result(self, most_related_mode_id, new_result, pk_id, mode_id):
        """Merge ``new_result`` into an existing standard mode.

        The existing standard mode is loaded from MySQL, fused with
        ``new_result`` by the LLM, and the fused version is written back to
        ``standard_mode`` and to the Milvus embedding. The source row in
        ``modes`` is set to ``standardization_status = 2`` and linked to the
        standard mode.

        Args:
            most_related_mode_id: ``standard_id`` of the standard mode to merge into.
            new_result: Mapping with ``name``, ``description`` and ``details``.
            pk_id: Milvus ``id`` of the standard mode's vector.
            mode_id: ``id`` of the row in ``modes`` being standardized.

        Returns:
            ``None``. Nothing is written when the standard mode is not found.
        """
        # Fetch the existing standard mode
        fetch_query = """select name, description, result from standard_mode where standard_id = %s"""
        rows = await self.mysql_client.async_fetch(
            query=fetch_query,
            params=(most_related_mode_id,)
        )
        if not rows:
            return

        old_result = rows[0]
        merge_prompt = f"""
        ## 任务说明
        您需要将一个新的模式知识与标准模式进行知识融合,创建一个综合性的知识模式。

        ## 融合要求
        1. **名称融合**:基于标准模式名称和新的模式知识名称,创建一个新的、有意义的名称,体现两者的所有特征
        2. **描述融合**:合并标准模式描述和新的模式知识描述,创建一个全面综合的描述,体现两者的所有特征
        3. **知识总结融合**:整合标准模式总结和新的模式知识,确保包含所有相关信息,按照以下结构组织:
           - 灵感来源
           - 内容结构
           - 写作方法
           - 核心逻辑
           - 产出模板
           - 应用场景

        ## 输入信息
        **标准模式名称**:{old_result['name']}
        **标准模式描述**:{old_result['description']}
        **标准模式总结**:{old_result['result']}

        **新的模式知识名称**:{new_result['name']}
        **新的模式知识描述**:{new_result['description']}
        **新的模式知识**:{new_result['details']}

        ## 输出要求
        请严格按照以下JSON格式输出,无需考虑输出长度,不要丢失信息。
        输出 JSON 的每一个字段的 value 字段都必须是字符串类型,不能是其他类型。

        ## 输出格式
        {{
            "name": "融合后的模式名称,保留所有信息",
            "description": "融合后的综合描述,保留所有信息",
            "details": "融合后的详细知识总结,保留所有信息,必须包含以下完整要素:灵感来源、内容结构、写作方法、核心逻辑、产出模板、应用场景"
        }}

        请确保融合后的知识模式包含两个模式的所有信息。输出前请校验,合并后的知识模式是否涵盖输入二者的所有元素,如果有缺失,请补全
        Please think step by step.
        """
        print(merge_prompt)
        response = await fetch_deepseek_completion(
            prompt=merge_prompt,
            model="DeepSeek-R1",
            output_type="json"
        )
        print(json.dumps(response, ensure_ascii=False, indent=4))

        # Read the merged fields BEFORE touching the database: if the LLM
        # response is malformed this raises here, so the mode is not marked as
        # standardized while the standard mode was never actually updated.
        merged_name = response['name']
        merged_description = response['description']
        merged_details = response['details']

        update_query1 = """
            UPDATE modes
            SET standardization_status = %s,
                standard_mode_id = %s,
                result = %s
            WHERE id = %s;
        """
        await self.mysql_client.async_save(
            query=update_query1,
            params=(
                2, most_related_mode_id, new_result['details'], mode_id
            )
        )

        update_query2 = """
            update standard_mode
            set name = %s, description = %s, result = %s
            where standard_id = %s
        """
        await self.mysql_client.async_save(
            query=update_query2,
            params=(
                merged_name, merged_description, merged_details, most_related_mode_id
            )
        )

        # Update Milvus with the embedding of the merged name/description
        text = f"模式名称:{merged_name},模式描述:{merged_description}"
        embedding = await get_basic_embedding(text, DEFAULT_MODEL)
        data = {
            "id": pk_id,
            "mode_id": most_related_mode_id,
            "mode_vector": embedding,
        }
        await async_update_embedding(self.milvus_client, data)

    async def save_to_mysql_and_milvus(self, task, result):
        """Persist ``result`` as a brand-new standard mode.

        Inserts a row into ``standard_mode`` under a fresh
        ``standard-<uuid4>`` id, inserts the matching embedding into Milvus,
        and sets the source row in ``modes`` to ``standardization_status = 2``
        linked to the new standard mode.

        Args:
            task: Row produced by :meth:`get_task` (``id`` and ``dim_name`` are read).
            result: Mapping with ``name``, ``description`` and ``details``.
        """
        standard_id = f"standard-{str(uuid.uuid4())}"
        query = """
            INSERT INTO standard_mode (standard_id, name, description, result)
            VALUES (%s, %s, %s, %s);
        """
        await self.mysql_client.async_save(
            query=query,
            params=(
                standard_id, result['name'], result['description'], result['details']
            )
        )

        text = f"维度:{task['dim_name']},模式名称:{result['name']},模式描述:{result['description']}"
        embedding = await get_basic_embedding(text, DEFAULT_MODEL)
        data = {
            "mode_id": standard_id,
            "mode_vector": embedding,
        }
        await async_insert_chunk(self.milvus_client, data)

        update_query = """
            UPDATE modes
            SET standardization_status = %s, standard_mode_id = %s, result = %s
            WHERE id = %s;
        """
        await self.mysql_client.async_save(
            query=update_query,
            params=(
                2, standard_id, result['details'], task['id']
            )
        )

    async def deal(self):
        """Process every pending task sequentially.

        For each task an embedding of its dimension, name and description is
        searched in Milvus. Without a sufficiently similar hit the LLM summary
        is stored as a new standard mode; otherwise it is merged into the
        best-ranked hit.
        """
        tasks = await self.get_task()
        if not tasks:
            return

        for task in tasks:
            text = f"维度:{task['dim_name']},模式名称:{task['name']},模式描述:{task['description']}"
            print(text)
            embedding = await get_basic_embedding(text, DEFAULT_MODEL)
            search_response = await self.base_vector_search(query_vec=embedding)
            results = search_response['results']

            if not results:
                # No similar standard mode: register this one as new.
                print("set as new standard mode")
                new_result = await self.get_result_by_llm(task)
                print(json.dumps(new_result, ensure_ascii=False, indent=4))
                await self.save_to_mysql_and_milvus(task, new_result)
                continue

            most_related_mode_id = results[0]['mode_id']
            pk_id = results[0]['id']
            new_result = await self.get_result_by_llm(task)
            print("new result")
            print(json.dumps(new_result, ensure_ascii=False, indent=4))
            await self.merge_as_new_result(
                most_related_mode_id, new_result, pk_id, task['id']
            )


async def run_aggregate_pattern():
    """Start the shared resources, run the aggregation, and always shut down."""
    await resource_manager.startup()
    try:
        aggregate_pattern = AggregatePattern(resource_manager)
        await aggregate_pattern.deal()
    finally:
        # Release connections even when a task fails midway.
        await resource_manager.shutdown()


if __name__ == "__main__":
    asyncio.run(run_aggregate_pattern())