| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307 |
import asyncio
import json
import uuid
from typing import List, Optional, Dict, Any

from applications.config import (
    ES_HOSTS,
    ELASTIC_SEARCH_INDEX,
    ES_PASSWORD,
    MILVUS_CONFIG,
    DEFAULT_MODEL
)
from applications.resource import init_resource_manager
from applications.utils.milvus import async_insert_chunk, async_update_embedding
from applications.api import get_basic_embedding, fetch_deepseek_completion
# Initialize the shared resource manager (Elasticsearch + Milvus config).
# NOTE(review): connections appear to be opened later via
# resource_manager.startup() (awaited in run_aggregate_pattern) — confirm
# against init_resource_manager's implementation.
resource_manager = init_resource_manager(
    es_hosts=ES_HOSTS,
    es_index=ELASTIC_SEARCH_INDEX,
    es_password=ES_PASSWORD,
    milvus_config=MILVUS_CONFIG,
)
def hits_to_json(hits, min_score: float = 0.8):
    """Convert a Milvus search response into plain JSON-serializable dicts.

    Only the first query's hit list (``hits[0]``) is consumed — the callers
    search with a single query vector. Hits scoring at or below
    ``min_score`` are dropped.

    Args:
        hits: Milvus search response (list of per-query hit lists), or a
            falsy value when the search returned nothing.
        min_score: similarity cutoff; a hit is kept only when
            ``hit.distance > min_score``. Defaults to 0.8, the previously
            hard-coded threshold, so existing callers are unaffected.

    Returns:
        A list of dicts with ``pk`` (hit id), ``score`` (distance) and the
        hit's entity fields flattened in; array-valued special fields are
        coerced to plain lists so the result is JSON-serializable.
    """
    if not hits:
        return []
    # Fields stored as Milvus arrays; coerce to plain lists for JSON.
    special_keys = {"entities", "concepts", "questions", "keywords"}
    return [
        {
            "pk": hit.id,
            "score": hit.distance,
            **{
                key: list(value) if key in special_keys else value
                for key, value in (hit.get("entity", {}) or {}).items()
            },
        }
        for hit in hits[0]
        if hit.distance > min_score
    ]
def format_json_file(json_obj):
    """Render a mapping as newline-terminated ``key: value`` lines.

    Iterates the object's keys in their native order and looks each value
    up by subscript, so any dict-like object works.
    """
    return "".join(f"{key}: {json_obj[key]}\n" for key in json_obj)
class AggregatePattern:
    """Aggregate raw knowledge "modes" into standardized modes.

    Pipeline per unprocessed mode (modes.standardization_status = 0):
      1. embed the mode's dimension/name/description text;
      2. vector-search Milvus for a similar existing standard mode;
      3. summarize the mode with an LLM, then either
         - store it as a brand-new standard mode, or
         - merge it into the most similar existing standard mode.
    State is persisted to MySQL (modes / standard_mode tables) and to the
    Milvus embedding index.
    """

    def __init__(self, resource):
        # Clients are owned by the resource manager; this class only borrows them.
        self.mysql_client = resource.mysql_client
        self.milvus_client = resource.milvus_client

    async def get_task(self):
        """Fetch up to 5 mode rows that still need standardization."""
        query = """
            SELECT t1.id, dim_name, name, t1.description, t1.detail,
                t2.output_type, t2.content,t2.constrains
            FROM modes t1 JOIN outputs t2 ON t1.output_id = t2.output_id
            WHERE standardization_status = 0
            LIMIT 5;
        """
        response = await self.mysql_client.async_fetch(query=query)
        return response

    async def base_vector_search(
        self,
        query_vec: List[float],
        anns_field: str = "mode_vector",
        limit: int = 5,
        expr: Optional[str] = None,
        search_params: Optional[Dict[str, Any]] = None,
    ):
        """Cosine-similarity search over the Milvus collection.

        The blocking milvus_client.search call is pushed off the event loop
        via asyncio.to_thread. Hits are shaped (and score-filtered) by
        hits_to_json.
        """
        if search_params is None:
            # HNSW search parameters; "ef" trades recall for latency.
            search_params = {"metric_type": "COSINE", "params": {"ef": 64}}
        response = await asyncio.to_thread(
            self.milvus_client.search,
            data=[query_vec],
            anns_field=anns_field,
            param=search_params,
            limit=limit,
            expr=expr,
            output_fields=["id", "mode_id"],
        )
        print(response)
        # [:10] is a safety cap; with the default limit=5 it is a no-op.
        return {"results": hits_to_json(response)[:10]}

    @staticmethod
    async def get_result_by_llm(task):
        """Distill one task row into a reusable mode summary via the LLM.

        Returns the parsed JSON object with keys "name", "description" and
        "details" (output_type="json" requests structured output).
        """
        output_type = task['output_type']
        content = task['content']
        constrains = task['constrains']
        detail = task['detail']
        mode_name = task['name']
        dim = task['dim_name']
        decr = task['description']
        # "constrains" is a JSON-encoded list; flatten each element into
        # "key: value" lines for the prompt.
        constrains_string = ""
        for item in json.loads(constrains):
            constrains_string += format_json_file(item) + "\n"
        prompt = f"""
        请基于以下输入信息,总结出一套可复用的知识模式。
        ## 输入信息
        **知识维度**:{dim}
        **模式名称**:{mode_name}
        **模式描述**:{decr}
        **模式详情**:{format_json_file(json.loads(detail)['不变的'])}
        **产出类型**:{output_type}
        **产出内容**:{format_json_file(json.loads(content))}
        **产出格式约束**:{constrains_string}
        ## 输出要求
        请按照以下结构输出JSON格式的结果:
        1. **模式名称**:直接使用输入中的模式名称或基于其提炼
        2. **简要描述**:用1-2句话概括模式的核心价值和适用场景
        3. **所有知识的总结**:详细阐述以下方面:
        - 灵感来源:模式的创意起点和驱动因素
        - 内容结构:固定的内容组织形式和要素
        - 写作方法:具体的创作技巧和表达方式
        - 核心逻辑:模式运作的基本原则和策略
        - 产出模板:可复用的内容框架和变量说明
        - 应用场景:模式的适用领域和使用价值
        请确保总结全面、结构清晰,直接基于输入信息进行提炼,不要添加额外信息。
        ## 输出格式
        {{
            "name": "模式名称",
            "description": "简要描述",
            "details": "详细的知识总结,包含灵感来源、内容结构、写作方法、核心逻辑、产出模板、应用场景等完整要素"
        }}
        """
        response = await fetch_deepseek_completion(
            prompt=prompt,
            model="DeepSeek-R1",
            output_type="json"
        )
        return response

    async def merge_as_new_result(self, most_related_mode_id, new_result, pk_id, mode_id):
        """Merge new_result into the most similar existing standard mode.

        Args (types inferred from usage — confirm against callers):
            most_related_mode_id: standard_mode.standard_id of the best hit.
            new_result: dict from get_result_by_llm (name/description/details).
            pk_id: Milvus primary key of the existing embedding row.
            mode_id: modes.id of the task row being processed.
        """
        # Fetch the existing standard-mode record.
        fetch_query = f"""select name, description, result from standard_mode where standard_id = %s"""
        response = await self.mysql_client.async_fetch(
            query=fetch_query, params=(most_related_mode_id,)
        )
        if not response:
            # Referenced standard mode is gone; leave the task untouched.
            return
        else:
            old_result = response[0]
            merge_prompt = f"""
            ## 任务说明
            您需要将一个新的模式知识与标准模式进行知识融合,创建一个综合性的知识模式。
            ## 融合要求
            1. **名称融合**:基于标准模式名称和新的模式知识名称,创建一个新的、有意义的名称,体现两者的所有特征
            2. **描述融合**:合并标准模式描述和新的模式知识描述,创建一个全面综合的描述,体现两者的所有特征
            3. **知识总结融合**:整合标准模式总结和新的模式知识,确保包含所有相关信息,按照以下结构组织:
            - 灵感来源
            - 内容结构
            - 写作方法
            - 核心逻辑
            - 产出模板
            - 应用场景
            ## 输入信息
            **标准模式名称**:{old_result['name']}
            **标准模式描述**:{old_result['description']}
            **标准模式总结**:{old_result['result']}
            **新的模式知识名称**:{new_result['name']}
            **新的模式知识描述**:{new_result['description']}
            **新的模式知识**:{new_result['details']}
            ## 输出要求
            请严格按照以下JSON格式输出,无需考虑输出长度,不要丢失信息。
            ## 输出格式
            {{
                "name": "融合后的模式名称,保留所有信息",
                "description": "融合后的综合描述,保留所有信息",
                "details": "融合后的详细知识总结,保留所有信息,必须包含以下完整要素:灵感来源、内容结构、写作方法、核心逻辑、产出模板、应用场景"
            }}
            请确保融合后的知识模式包含两个模式的所有信息。输出前请校验,合并后的知识模式是否涵盖输入二者的所有元素,如果有缺失,请补全
            Please think step by step.
            """
            print(merge_prompt)
            # "response" is rebound: it now holds the merged LLM output.
            response = await fetch_deepseek_completion(
                prompt=merge_prompt,
                model="DeepSeek-R1",
                output_type="json"
            )
            print(json.dumps(response, ensure_ascii=False, indent=4))
            # Mark the task row as standardized (status 2) and link it to the
            # standard mode. NOTE(review): modes.result stores this task's own
            # summary (new_result['details']) while standard_mode below gets
            # the merged response — confirm this asymmetry is intended.
            update_query1 = """
                UPDATE modes
                SET standardization_status = %s, \
                    standard_mode_id = %s, \
                    result = %s \
                WHERE id = %s; \
            """
            await self.mysql_client.async_save(
                query=update_query1, params=(
                    2,
                    most_related_mode_id,
                    new_result['details'],
                    mode_id
                )
            )
            # Overwrite the standard mode with the merged name/description/details.
            update_query2 = """
                update standard_mode
                set name = %s,
                    description = %s,
                    result = %s
                where standard_id = %s
            """
            await self.mysql_client.async_save(
                query=update_query2, params=(
                    response['name'],
                    response['description'],
                    response['details'],
                    most_related_mode_id
                )
            )
            # Update the Milvus embedding to reflect the merged text.
            text = f"模式名称:{response['name']},模式描述:{response['description']}"
            embedding = await get_basic_embedding(text, DEFAULT_MODEL)
            data = {
                "id": pk_id,
                "mode_id": most_related_mode_id,
                "mode_vector": embedding,
            }
            await async_update_embedding(self.milvus_client, data)

    async def save_to_mysql_and_milvus(self, task, result):
        """Persist a brand-new standard mode to MySQL and index it in Milvus."""
        standard_id = f"standard-{str(uuid.uuid4())}"
        query = """
            INSERT INTO standard_mode (standard_id, name, description, result) VALUES
            (%s, %s, %s, %s);
        """
        await self.mysql_client.async_save(
            query=query, params=(
                standard_id,
                result['name'],
                result['description'],
                result['details']
            )
        )
        # Embed the same style of text that deal() searches with, so future
        # tasks can find this standard mode.
        text = f"维度:{task['dim_name']},模式名称:{result['name']},模式描述:{result['description']}"
        embedding = await get_basic_embedding(text, DEFAULT_MODEL)
        data = {
            "mode_id": standard_id,
            "mode_vector": embedding,
        }
        await async_insert_chunk(self.milvus_client, data)
        # Mark the source task as standardized (status 2) and link it.
        update_query = """
            UPDATE modes
            SET standardization_status = %s, standard_mode_id = %s, result = %s WHERE id = %s;
        """
        await self.mysql_client.async_save(
            query=update_query, params=(
                2,
                standard_id,
                result['details'],
                task['id']
            )
        )

    async def deal(self):
        """Process one batch of pending tasks end to end."""
        tasks = await self.get_task()
        if not tasks:
            return
        else:
            for task in tasks:
                text = f"维度:{task['dim_name']},模式名称:{task['name']},模式描述:{task['description']}"
                print(text)
                embedding = await get_basic_embedding(text, DEFAULT_MODEL)
                response = await self.base_vector_search(query_vec=embedding)
                results = response['results']
                if not results:
                    # set as new: no similar standard mode above the score cutoff.
                    print("set as new standard mode")
                    response = await self.get_result_by_llm(task)
                    print(json.dumps(response, ensure_ascii=False, indent=4))
                    await self.save_to_mysql_and_milvus(task, response)
                else:
                    # Merge into the best-scoring existing standard mode.
                    most_related_mode_id = results[0]['mode_id']
                    pk_id = results[0]['id']
                    response = await self.get_result_by_llm(task)
                    print("new result")
                    print(json.dumps(response, ensure_ascii=False, indent=4))
                    await self.merge_as_new_result(most_related_mode_id, response, pk_id, task['id'])
async def run_aggregate_pattern():
    """Entry point: start resources, run one aggregation pass, shut down.

    Shutdown is wrapped in try/finally so the ES/Milvus/MySQL resources are
    released even when the aggregation pass raises mid-batch (previously an
    exception in deal() skipped resource_manager.shutdown()).
    """
    await resource_manager.startup()
    try:
        aggregate_pattern = AggregatePattern(resource_manager)
        await aggregate_pattern.deal()
    finally:
        # Always release connections, even on failure.
        await resource_manager.shutdown()
if __name__ == "__main__":
    # asyncio is imported at module level: base_vector_search needs it even
    # when this module is imported rather than executed as a script, so the
    # guard-local import was a latent NameError for library callers.
    asyncio.run(run_aggregate_pattern())
|