luojunhui 1 week ago
parent
commit
4d2bbd0162

+ 2 - 0
applications/async_task/__init__.py

@@ -3,6 +3,7 @@ from .chunk_task import ChunkBooksTask
 from .delete_task import DeleteTask
 from .auto_rechunk_task import AutoRechunkTask
 from .build_graph import BuildGraph
+from .record_pattern import RecordPattern
 
 
 __all__ = [
@@ -11,4 +12,5 @@ __all__ = [
     "AutoRechunkTask",
     "BuildGraph",
     "ChunkBooksTask",
+    "RecordPattern",
 ]

+ 307 - 0
applications/async_task/aggregate_pattern.py

@@ -0,0 +1,307 @@
+import asyncio
+import json
+import uuid
+from typing import List, Optional, Dict, Any
+
+from applications.config import (
+    ES_HOSTS,
+    ELASTIC_SEARCH_INDEX,
+    ES_PASSWORD,
+    MILVUS_CONFIG,
+    DEFAULT_MODEL
+)
+from applications.resource import init_resource_manager
+from applications.utils.milvus import async_insert_chunk, async_update_embedding
+from applications.api import get_basic_embedding, fetch_deepseek_completion
+
+
+# Initialize the resource manager
+resource_manager = init_resource_manager(
+    es_hosts=ES_HOSTS,
+    es_index=ELASTIC_SEARCH_INDEX,
+    es_password=ES_PASSWORD,
+    milvus_config=MILVUS_CONFIG,
+)
+
+def hits_to_json(hits):
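+    """Flatten the first query's Milvus hits into plain dicts, keeping only strong matches (distance > 0.8)."""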
+    if not hits:
+        return []
+
+    special_keys = {"entities", "concepts", "questions", "keywords"}
+    return [
+        {
+            "pk": hit.id,
+            "score": hit.distance,
+            **{
+                key: list(value) if key in special_keys else value
+                for key, value in (hit.get("entity", {}) or {}).items()
+            },
+        }
+        for hit in hits[0] if hit.distance > 0.8
+    ]
+
+def format_json_file(json_obj):
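+    """Render a dict as newline-separated "key: value" lines for prompt building."""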
+    output_string = ""
+    for key in json_obj:
+        value = json_obj[key]
+        output_string += f"{key}: {value}\n"
+    return output_string
+
+class AggregatePattern:
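+    """Standardizes pending modes: each is embedded and searched in Milvus; a miss creates a new standard mode, a hit is LLM-merged into the existing one."""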
+    def __init__(self, resource):
+        self.mysql_client = resource.mysql_client
+        self.milvus_client = resource.milvus_client
+
+    async def get_task(self):
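+        """Fetch up to 5 modes still awaiting standardization (standardization_status = 0), joined with their output records."""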
+        query = """
+            SELECT t1.id, dim_name, name, t1.description, t1.detail, 
+                   t2.output_type, t2.content, t2.constrains
+            FROM modes t1 JOIN outputs t2 ON t1.output_id = t2.output_id
+            WHERE standardization_status = 0
+            LIMIT 5;
+        """
+        response = await self.mysql_client.async_fetch(query=query)
+        return response
+
+    async def base_vector_search(
+        self,
+        query_vec: List[float],
+        anns_field: str = "mode_vector",
+        limit: int = 5,
+        expr: Optional[str] = None,
+        search_params: Optional[Dict[str, Any]] = None,
+    ):
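+        """Run an ANN search over `anns_field` and return {"results": [...]} limited to hits above the similarity cutoff."""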
+        if search_params is None:
+            search_params = {"metric_type": "COSINE", "params": {"ef": 64}}
+
+        response = await asyncio.to_thread(
+            self.milvus_client.search,
+            data=[query_vec],
+            anns_field=anns_field,
+            param=search_params,
+            limit=limit,
+            expr=expr,
+            output_fields=["id", "mode_id"],
+        )
+        print(response)
+        return {"results": hits_to_json(response)[:10]}
+
+    @staticmethod
+    async def get_result_by_llm(task):
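+        """Ask DeepSeek-R1 to distill one raw mode and its output into a standardized pattern: JSON with name, description, details."""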
+        output_type = task['output_type']
+        content = task['content']
+        constrains = task['constrains']
+        detail = task['detail']
+        mode_name = task['name']
+        dim = task['dim_name']
+        desc = task['description']
+        constrains_string = ""
+        for item in json.loads(constrains):
+            constrains_string += format_json_file(item) + "\n"
+        prompt = f"""
+Based on the input below, distill a reusable knowledge pattern.
+
+## Input
+**Knowledge dimension**: {dim}
+**Mode name**: {mode_name}
+**Mode description**: {desc}
+**Mode detail**: {format_json_file(json.loads(detail)['不变的'])}
+**Output type**: {output_type}
+**Output content**: {format_json_file(json.loads(content))}
+**Output format constraints**: {constrains_string}
+
+## Output requirements
+Produce a JSON result with the following structure:
+
+1. **Mode name**: use the input mode name directly, or distill a name from it
+2. **Brief description**: 1-2 sentences summarizing the mode's core value and applicable scenarios
+3. **Full knowledge summary**: elaborate on each of the following:
+   - Inspiration: the creative starting point and driving factors behind the mode
+   - Content structure: the fixed organization and elements of the content
+   - Writing method: the concrete writing techniques and modes of expression
+   - Core logic: the basic principles and strategies that make the mode work
+   - Output template: a reusable content framework with its variables explained
+   - Application scenarios: the domains where the mode applies and the value it provides
+
+Keep the summary complete and well structured, distilled strictly from the input; do not add extra information.
+
+## Output format
+{{
+    "name": "mode name",
+    "description": "brief description",
+    "details": "detailed knowledge summary containing the complete set of elements: inspiration, content structure, writing method, core logic, output template, application scenarios"
+}}
+        """
+        response = await fetch_deepseek_completion(
+            prompt=prompt,
+            model="DeepSeek-R1",
+            output_type="json"
+        )
+        return response
+
+    async def merge_as_new_result(self, most_related_mode_id, new_result, pk_id, mode_id):
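+        """LLM-fuse `new_result` into the closest existing standard mode, then update the MySQL rows and re-embed into Milvus."""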
+        # Fetch the existing standard mode
+        fetch_query = """SELECT name, description, result FROM standard_mode WHERE standard_id = %s;"""
+        response = await self.mysql_client.async_fetch(
+            query=fetch_query, params=(most_related_mode_id,)
+        )
+        if not response:
+            return
+        else:
+            old_result = response[0]
+            merge_prompt = f"""
+## Task
+Fuse a new mode's knowledge into a standard mode, creating one comprehensive knowledge pattern.
+
+## Fusion requirements
+1. **Name fusion**: based on the standard mode name and the new mode name, create a new, meaningful name reflecting the characteristics of both
+2. **Description fusion**: merge the standard mode description and the new mode description into one comprehensive description reflecting the characteristics of both
+3. **Knowledge summary fusion**: integrate the standard mode summary with the new mode knowledge, keeping all relevant information and organizing it as:
+   - Inspiration
+   - Content structure
+   - Writing method
+   - Core logic
+   - Output template
+   - Application scenarios
+
+## Input
+**Standard mode name**: {old_result['name']}
+**Standard mode description**: {old_result['description']}
+**Standard mode summary**: {old_result['result']}
+
+**New mode name**: {new_result['name']}
+**New mode description**: {new_result['description']}
+**New mode knowledge**: {new_result['details']}
+
+## Output requirements
+Output strictly in the JSON format below; do not worry about output length and do not drop any information.
+
+## Output format
+{{
+    "name": "fused mode name, preserving all information",
+    "description": "fused comprehensive description, preserving all information",
+    "details": "fused detailed knowledge summary, preserving all information; it must contain all of: inspiration, content structure, writing method, core logic, output template, application scenarios"
+}}
+
+Make sure the fused pattern contains all information from both modes. Before answering, verify that the merged pattern covers every element of both inputs; fill in anything missing.
+Please think step by step.
+"""
+            print(merge_prompt)
+            response = await fetch_deepseek_completion(
+                prompt=merge_prompt,
+                model="DeepSeek-R1",
+                output_type="json"
+            )
+            print(json.dumps(response, ensure_ascii=False, indent=4))
+
+            update_query1 = """
+                UPDATE modes
+                SET standardization_status = %s,
+                    standard_mode_id = %s,
+                    result = %s
+                WHERE id = %s;
+            """
+            await self.mysql_client.async_save(
+                query=update_query1, params=(
+                    2,
+                    most_related_mode_id,
+                    new_result['details'],
+                    mode_id
+                )
+            )
+
+            update_query2 = """
+                UPDATE standard_mode
+                SET name = %s,
+                    description = %s,
+                    result = %s
+                WHERE standard_id = %s;
+            """
+            await self.mysql_client.async_save(
+                query=update_query2, params=(
+                    response['name'],
+                    response['description'],
+                    response['details'],
+                    most_related_mode_id
+                )
+            )
+            # Update Milvus: re-embed the fused name/description and upsert under the same pk
+            text = f"Mode name: {response['name']}, mode description: {response['description']}"
+            embedding = await get_basic_embedding(text, DEFAULT_MODEL)
+            data = {
+                "id": pk_id,
+                "mode_id": most_related_mode_id,
+                "mode_vector": embedding,
+            }
+            await async_update_embedding(self.milvus_client, data)
+
+    async def save_to_mysql_and_milvus(self, task, result):
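+        """Register a brand-new standard mode: insert the MySQL row, index its embedding in Milvus, and mark the source mode as standardized."""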
+        standard_id = f"standard-{str(uuid.uuid4())}"
+        query = """
+            INSERT INTO standard_mode (standard_id, name, description, result) VALUES 
+                (%s, %s, %s, %s);
+        """
+        await self.mysql_client.async_save(
+            query=query, params=(
+                standard_id,
+                result['name'],
+                result['description'],
+                result['details']
+            )
+        )
+        text = f"Dimension: {task['dim_name']}, mode name: {result['name']}, mode description: {result['description']}"
+        embedding = await get_basic_embedding(text, DEFAULT_MODEL)
+        data = {
+            "mode_id": standard_id,
+            "mode_vector": embedding,
+        }
+        await async_insert_chunk(self.milvus_client, data)
+
+        update_query = """
+            UPDATE modes 
+            SET standardization_status = %s, standard_mode_id = %s, result = %s WHERE id = %s;
+        """
+        await self.mysql_client.async_save(
+            query=update_query, params=(
+                2,
+                standard_id,
+                result['details'],
+                task['id']
+            )
+        )
+
+    async def deal(self):
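+        """For each pending mode: embed it, search Milvus for a close standard mode, then either register it as new or merge it into the closest match."""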
+        tasks = await self.get_task()
+        if not tasks:
+            return
+        else:
+            for task in tasks:
+                text = f"Dimension: {task['dim_name']}, mode name: {task['name']}, mode description: {task['description']}"
+                print(text)
+                embedding = await get_basic_embedding(text, DEFAULT_MODEL)
+                response = await self.base_vector_search(query_vec=embedding)
+                results = response['results']
+                if not results:
+                    # set as new
+                    print("set as new standard mode")
+                    response = await self.get_result_by_llm(task)
+                    print(json.dumps(response, ensure_ascii=False, indent=4))
+                    await self.save_to_mysql_and_milvus(task, response)
+                else:
+                    most_related_mode_id = results[0]['mode_id']
+                    pk_id = results[0]['id']
+                    response = await self.get_result_by_llm(task)
+                    print("new result")
+                    print(json.dumps(response, ensure_ascii=False, indent=4))
+                    await self.merge_as_new_result(most_related_mode_id, response, pk_id, task['id'])
+
+
+async def run_aggregate_pattern():
+    await resource_manager.startup()
+    aggregate_pattern = AggregatePattern(resource_manager)
+    await aggregate_pattern.deal()
+    await resource_manager.shutdown()
+
+if __name__ == "__main__":
+    asyncio.run(run_aggregate_pattern())

+ 206 - 0
applications/async_task/record_pattern.py

@@ -0,0 +1,206 @@
+import json
+import traceback
+import uuid
+import logging
+from typing import Dict, List, Tuple, Optional
+from dataclasses import dataclass
+from applications.utils.mysql import Patterns
+
+# Module-level logger
+logger = logging.getLogger(__name__)
+
+
+@dataclass
+class ModeData:
+    """模式数据类"""
+
+    id: str
+    name: str
+    percentage: str
+    description: str
+    detail: str
+    output_id: str
+    dimension_name: str
+
+
+@dataclass
+class OutputData:
+    """产物数据类"""
+
+    id: str
+    type: str
+    description: str
+    content: str
+    constraints: str
+
+
+class RecordPattern:
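+    """Validates an incoming pattern-analysis payload and records each mode and its reusable output into MySQL."""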
+    def __init__(self, resource):
+        self.pattern_manager = Patterns(resource.mysql_client)
+        self.milvus_client = resource.milvus_client
+
+    async def record_mode(self, mode: ModeData) -> bool:
+        """记录模式数据"""
+        try:
+            mode_tuple = (
+                mode.id,
+                mode.name,
+                mode.percentage,
+                mode.description,
+                json.dumps(mode.detail, ensure_ascii=False),
+                mode.output_id,
+                mode.dimension_name,
+            )
+            result = await self.pattern_manager.insert_modes([mode_tuple])
+            logger.info(f"成功记录模式: {mode.name}")
+            return bool(result)
+        except Exception as e:
+            logger.error(f"记录模式失败: {mode.name}, 错误: {e}")
+            print(traceback.format_exc())
+            return False
+
+    async def record_output(self, output: OutputData) -> bool:
+        """记录产物数据"""
+        try:
+            output_tuple = (
+                output.id,
+                output.type,
+                output.description,
+                output.content,
+                output.constraints,
+            )
+            result = await self.pattern_manager.insert_outputs([output_tuple])
+            logger.info(f"成功记录产物: {output.type}")
+            return bool(result)
+        except Exception as e:
+            logger.error(f"记录产物失败: {output.type}, 错误: {e}")
+            return False
+
+    @staticmethod
+    def _validate_pattern_data(pattern: Dict) -> bool:
+        """验证模式数据格式"""
+        required_keys = ["维度模式分析"]
+        if not all(key in pattern for key in required_keys):
+            logger.error(f"模式数据缺少必要字段: {required_keys}")
+            return False
+
+        dims = pattern["维度模式分析"]
+        if not isinstance(dims, list):
+            logger.error("维度模式分析必须是列表类型")
+            return False
+
+        for dim in dims:
+            if not all(key in dim for key in ["维度名称", "模式列表"]):
+                logger.error("维度数据缺少必要字段")
+                return False
+
+            for method in dim["模式列表"]:
+                required_method_keys = [
+                    "模式命名",
+                    "模式占比",
+                    "模式说明",
+                    "分析详情",
+                    "可复用产物",
+                ]
+                if not all(key in method for key in required_method_keys):
+                    logger.error(f"模式数据缺少必要字段: {required_method_keys}")
+                    return False
+
+                output = method["可复用产物"]
+                required_output_keys = ["产物类型", "产物描述", "产物内容", "变量约束"]
+                if not all(key in output for key in required_output_keys):
+                    logger.error(f"产物数据缺少必要字段: {required_output_keys}")
+                    return False
+
+        return True
+
+    @staticmethod
+    def _extract_mode_data(method: Dict, dim_name: str) -> Optional[ModeData]:
+        """提取模式数据"""
+        try:
+            mode_id = f"mode-{uuid.uuid4()}"
+            output_id = f"output-{uuid.uuid4()}"
+
+            return ModeData(
+                id=mode_id,
+                name=method.get("模式命名", ""),
+                percentage=method.get("模式占比", ""),
+                description=method.get("模式说明", ""),
+                detail=method.get("分析详情", ""),
+                output_id=output_id,
+                dimension_name=dim_name,
+            )
+        except Exception as e:
+            logger.error(f"提取模式数据失败: {e}")
+            return None
+
+    @staticmethod
+    def _extract_output_data(output: Dict, output_id: str) -> Optional[OutputData]:
+        """提取产物数据"""
+        try:
+            return OutputData(
+                id=output_id,
+                type=output.get("产物类型", ""),
+                description=output.get("产物描述", ""),
+                content=json.dumps(output.get("产物内容", {}), ensure_ascii=False),
+                constraints=json.dumps(output.get("变量约束", {}), ensure_ascii=False),
+            )
+        except Exception as e:
+            logger.error(f"提取产物数据失败: {e}")
+            return None
+
+    async def deal(self, pattern: Dict) -> bool:
+        """
+        处理模式数据
+
+        Args:
+            pattern: 模式数据字典
+
+        Returns:
+            bool: 处理是否成功
+        """
+        # Validate the payload shape
+        if not self._validate_pattern_data(pattern):
+            logger.error("模式数据格式验证失败")
+            return False
+
+        dims = pattern["维度模式分析"]
+        success_count = 0
+        total_count = 0
+
+        for dim in dims:
+            dim_name = dim["维度名称"]
+
+            for method in dim["模式列表"]:
+                total_count += 1
+
+                # Extract the mode record
+                mode_data = self._extract_mode_data(method, dim_name)
+                if not mode_data:
+                    logger.error(f"提取模式数据失败: {dim_name}")
+                    continue
+
+                # Extract the output record
+                output = method["可复用产物"]
+                output_data = self._extract_output_data(output, mode_data.output_id)
+                logger.debug(output_data)
+                if not output_data:
+                    logger.error(f"提取产物数据失败: {mode_data.name}")
+                    continue
+
+                # Persist both records
+                mode_success = await self.record_mode(mode_data)
+                output_success = await self.record_output(output_data)
+
+                if mode_success and output_success:
+                    success_count += 1
+                    logger.info(f"成功处理模式: {mode_data.name}")
+                else:
+                    logger.error(f"处理模式失败: {mode_data.name}")
+
+        success_rate = (success_count / total_count) * 100 if total_count > 0 else 0
+        logger.info(
+            f"Pattern processing finished: {success_count}/{total_count} succeeded ({success_rate:.1f}%)"
+        )
+
+        return success_count > 0

+ 19 - 19
applications/resource/resource_manager.py

@@ -3,7 +3,7 @@ from neo4j import AsyncGraphDatabase, AsyncDriver
 
 from applications.config import NEO4j_CONFIG
 from applications.utils.mysql import DatabaseManager
-from applications.utils.milvus.field import fields
+from applications.utils.milvus.field import fields, mode_fields
 from applications.utils.elastic_search import AsyncElasticSearchClient
 
 
@@ -23,9 +23,9 @@ class ResourceManager:
         connections.connect("default", **self.milvus_config)
 
         schema = CollectionSchema(
-            fields, description="Chunk multi-vector embeddings with metadata"
+            mode_fields, description="Standard mode vector space"
         )
-        self.milvus_client = Collection(name="chunk_multi_embeddings_v2", schema=schema)
+        self.milvus_client = Collection(name="standard_mode_embeddings", schema=schema)
 
         # create index
         vector_index_params = {
@@ -33,20 +33,20 @@ class ResourceManager:
             "metric_type": "COSINE",
             "params": {"M": 16, "efConstruction": 200},
         }
-        self.milvus_client.create_index("vector_text", vector_index_params)
-        self.milvus_client.create_index("vector_summary", vector_index_params)
-        self.milvus_client.create_index("vector_questions", vector_index_params)
+        self.milvus_client.create_index("mode_vector", vector_index_params)
         self.milvus_client.load()
 
     async def startup(self):
         # Initialize Elasticsearch
-        self.es_client = AsyncElasticSearchClient(
-            index_name=self.es_index, hosts=self.es_hosts, password=self.es_password
-        )
-        if await self.es_client.es.ping():
-            print("✅ Elasticsearch connected")
-        else:
-            print("❌ Elasticsearch connection failed")
+        # self.es_client = AsyncElasticSearchClient(
+        #     index_name=self.es_index, hosts=self.es_hosts, password=self.es_password
+        # )
+        # if await self.es_client.es.ping():
+        #     print("✅ Elasticsearch connected")
+        # else:
+        #     print("❌ Elasticsearch connection failed")
 
         # Initialize MySQL
         self.mysql_client = DatabaseManager()
@@ -57,10 +57,10 @@ class ResourceManager:
         await self.load_milvus()
         print("✅ Milvus loaded")
 
-        uri: str = NEO4j_CONFIG["url"]
-        auth: tuple = NEO4j_CONFIG["user"], NEO4j_CONFIG["password"]
-        self.graph_client = AsyncGraphDatabase.driver(uri=uri, auth=auth)
-        print("✅ NEO4j loaded")
+        # uri: str = NEO4j_CONFIG["url"]
+        # auth: tuple = NEO4j_CONFIG["user"], NEO4j_CONFIG["password"]
+        # self.graph_client = AsyncGraphDatabase.driver(uri=uri, auth=auth)
+        # print("✅ NEO4j loaded")
 
     async def shutdown(self):
         # Close Elasticsearch
@@ -77,8 +77,8 @@ class ResourceManager:
             await self.mysql_client.close_pools()
             print("Mysql closed")
 
-        await self.graph_client.close()
-        print("Graph closed")
+        # await self.graph_client.close()
+        # print("Graph closed")
 
 
 _resource_manager: ResourceManager | None = None

+ 2 - 0
applications/utils/milvus/__init__.py

@@ -1,9 +1,11 @@
 from .functions import async_insert_chunk, async_delete_chunk
+from .functions import async_update_embedding
 from .search import MilvusSearch
 
 
 __all__ = [
     "async_insert_chunk",
     "async_delete_chunk",
+    "async_update_embedding",
     "MilvusSearch",
 ]

+ 22 - 1
applications/utils/milvus/field.py

@@ -36,5 +36,26 @@ fields = [
     ),
 ]
 
+# Fields for the standard-mode collection: auto-increment pk, external mode_id, and a 2560-dim embedding vector
+mode_fields = [
+    FieldSchema(
+        name="id",
+        dtype=DataType.INT64,
+        is_primary=True,
+        auto_id=True,
+        description="自增id",
+    ),
+    # Mode id field
+    FieldSchema(
+        name="mode_id", dtype=DataType.VARCHAR, max_length=64, description="模式id"
+    ),
+    FieldSchema(
+        name="mode_vector",
+        dtype=DataType.FLOAT_VECTOR,
+        dim=2560,
+        description="Mode text embedding",
+    ),
+]
+
 
-__all__ = ["fields"]
+__all__ = ["fields", "mode_fields"]

+ 24 - 0
applications/utils/milvus/functions.py

@@ -13,6 +13,30 @@ async def async_insert_chunk(collection: pymilvus.Collection, data: Dict) -> Lis
     result = await asyncio.to_thread(collection.insert, [data])
     return result.primary_keys
 
+
+async def async_update_embedding(
+    collection: pymilvus.Collection, data: Dict, flush: bool = True
+) -> List[int]:
+    """
+    Upsert an entity in a Milvus collection asynchronously (insert-or-update by primary key).
+
+    :param collection: Milvus Collection object
+    :param data: Entity dict keyed by field name; must include the primary key "id"
+    :param flush: Whether to flush the collection after the upsert
+    :return: List of primary key ids that were upserted
+    """
+    result = await asyncio.to_thread(collection.upsert, data=data)
+
+    if flush:
+        await asyncio.to_thread(collection.flush)
+
+    # Prefer server-returned PKs; fall back to the input pk
+    try:
+        return list(result.primary_keys)
+    except Exception:
+        return [data['id']]
+
 
 async def async_delete_chunk(
     collection: pymilvus.Collection, ids: List[int]

+ 2 - 0
applications/utils/mysql/__init__.py

@@ -3,6 +3,7 @@ from .pool import DatabaseManager
 from .mapper import Dataset, ChatResult
 from .content_chunks import ContentChunks
 from .contents import Contents
+from .patterns import Patterns
 
 
 __all__ = [
@@ -12,4 +13,5 @@ __all__ = [
     "Dataset",
     "ChatResult",
     "Books",
+    "Patterns",
 ]

+ 90 - 0
applications/utils/mysql/patterns.py

@@ -0,0 +1,90 @@
+from .base import BaseMySQLClient
+
+
+class Patterns(BaseMySQLClient):
+    """
+    模式管理类,继承自基础MySQL客户端
+    用于处理维度、模式和产物的数据库操作
+    """
+
+    async def insert_dimensions(self, dimensions):
+        """
+        批量插入维度数据到数据库
+
+        Args:
+            dimensions: 维度数据列表,每个元素包含(dim_name, dim_description, level)
+                        - dim_name: 维度名称
+                        - dim_description: 维度描述
+                        - level: 维度层级
+
+        Returns:
+            awaitable: 异步数据库操作结果
+        """
+        query = """
+            INSERT IGNORE INTO dimensions (dim_name, dim_description, level)
+                VALUES (%s, %s, %s);
+        """
+        return await self.pool.async_save(query=query, params=dimensions, batch=True)
+
+    async def update_dimension_status(self, dimension_id, ori_status, new_status):
+        """
+        更新维度状态
+
+        Args:
+            dimension_id: 维度ID
+            ori_status: 原始状态(用于条件检查)
+            new_status: 新状态
+
+        Returns:
+            awaitable: 异步数据库操作结果
+        """
+        query = """
+            UPDATE dimensions SET status = %s WHERE id = %s AND status = %s;
+        """
+        return await self.pool.async_save(
+            query=query, params=(new_status, dimension_id, ori_status)
+        )
+
+    async def insert_modes(self, modes):
+        """
+        批量插入模式数据到数据库
+
+        Args:
+            modes: 模式数据列表,每个元素包含(mode_id, name, mode_percentage, description, detail, output_id, dim_name)
+                   - mode_id: 模式ID
+                   - name: 模式名称
+                   - mode_percentage: 模式占比
+                   - description: 模式描述
+                   - detail: 模式详情
+                   - output_id: 关联的产物ID
+                   - dim_name: 所属维度名称
+
+        Returns:
+            awaitable: 异步数据库操作结果
+        """
+        query = """
+            INSERT IGNORE INTO modes (mode_id, name, mode_percentage, description, detail, output_id, dim_name)
+                VALUES (%s, %s, %s, %s, %s, %s, %s);
+        """
+        return await self.pool.async_save(query=query, params=modes, batch=True)
+
+    async def insert_outputs(self, outputs):
+        """
+        批量插入产物数据到数据库
+
+        Args:
+            outputs: 产物数据列表,每个元素包含(output_id, output_type, description, content, constrains)
+                     - output_id: 产物ID
+                     - output_type: 产物类型
+                     - description: 产物描述
+                     - content: 产物内容
+                     - constrains: 产物约束条件
+
+        Returns:
+            awaitable: 异步数据库操作结果
+        """
+        query = """
+            INSERT IGNORE INTO outputs (output_id, output_type, description, content, constrains)
+                VALUES (%s, %s, %s, %s, %s);
+        """
+        return await self.pool.async_save(query=query, params=outputs, batch=True)

+ 12 - 0
routes/blueprint.py

@@ -12,6 +12,7 @@ from applications.api import get_basic_embedding
 from applications.api import get_img_embedding
 from applications.async_task import AutoRechunkTask, BuildGraph
 from applications.async_task import ChunkEmbeddingTask, DeleteTask, ChunkBooksTask
+from applications.async_task import RecordPattern
 from applications.config import (
     DEFAULT_MODEL,
     LOCAL_MODEL_CONFIG,
@@ -27,6 +28,7 @@ from applications.utils.task.async_task import (
     handle_books,
     process_question,
     query_search,
 )
 
 server_bp = Blueprint("api", __name__, url_prefix="/api")
@@ -578,3 +580,13 @@ async def process_book():
     asyncio.create_task(handle_books())
     # 返回立即响应
     return jsonify({"status": "success", "message": "任务已提交后台处理"}), 200
+
+
+@server_bp.route("/record/pattern", methods=["POST"])
+async def record_pattern():
+    body = await request.get_json()
+    pattern = body.get("pattern", {})
+    resource = get_resource_manager()
+    record_pattern_task = RecordPattern(resource)
+    await record_pattern_task.deal(pattern)
+    return jsonify({"status_code": 200, "detail": "success", "data": {}})
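+
+
+# A minimal sketch of the request body this endpoint expects. The key names are
+# taken from RecordPattern._validate_pattern_data; the concrete values are
+# purely illustrative:
+#
+#   POST /api/record/pattern
+#   {
+#       "pattern": {
+#           "维度模式分析": [
+#               {
+#                   "维度名称": "example dimension",
+#                   "模式列表": [
+#                       {
+#                           "模式命名": "example mode",
+#                           "模式占比": "40%",
+#                           "模式说明": "what the mode does",
+#                           "分析详情": {"不变的": {}},
+#                           "可复用产物": {
+#                               "产物类型": "template",
+#                               "产物描述": "...",
+#                               "产物内容": {},
+#                               "变量约束": {}
+#                           }
+#                       }
+#                   ]
+#               }
+#           ]
+#       }
+#   }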