浏览代码

what解构存储

xueyiming 2 周之前
父节点
当前提交
a0b473800b

+ 8 - 0
applications/resource/resource_manager.py

@@ -2,6 +2,7 @@ from pymilvus import connections, CollectionSchema, Collection
 from neo4j import AsyncGraphDatabase, AsyncDriver
 
 from applications.config import NEO4j_CONFIG
+from applications.utils.milvus.what_field import what_fields
 from applications.utils.mysql import DatabaseManager
 from applications.utils.milvus.field import fields
 from applications.utils.elastic_search import AsyncElasticSearchClient
@@ -16,6 +17,7 @@ class ResourceManager:
 
         self.es_client: AsyncElasticSearchClient | None = None
         self.milvus_client: Collection | None = None
+        self.what_milvus_client: Collection | None = None
         self.mysql_client: DatabaseManager | None = None
         self.graph_client: AsyncDriver | None = None
 
@@ -38,6 +40,12 @@ class ResourceManager:
         self.milvus_client.create_index("vector_questions", vector_index_params)
         self.milvus_client.load()
 
+        what_schema = CollectionSchema(
+            what_fields, description="What multi-vector embeddings with metadata"
+        )
+        self.what_milvus_client = Collection(name="what_multi_embeddings", schema=what_schema)
+        self.what_milvus_client.create_index("vector_text", vector_index_params)
+
     async def startup(self):
         # 初始化 Elasticsearch
         self.es_client = AsyncElasticSearchClient(

+ 26 - 0
applications/utils/milvus/what_field.py

@@ -0,0 +1,26 @@
+from pymilvus import FieldSchema, DataType
+
+# milvus 向量数据库
+what_fields = [
+    # 主键 ID
+    FieldSchema(
+        name="id",
+        dtype=DataType.INT64,
+        is_primary=True,
+        auto_id=True,
+        description="自增id",
+    ),
+    # 文档 id 字段
+    FieldSchema(
+        name="doc_id", dtype=DataType.VARCHAR, max_length=64, description="文档id"
+    ),
+    FieldSchema(
+        name="vector_text",
+        dtype=DataType.FLOAT_VECTOR,
+        dim=2560,
+        description="",
+    )
+]
+
+
+__all__ = ["what_fields"]

+ 2 - 0
applications/utils/mysql/__init__.py

@@ -1,4 +1,5 @@
 from .books import Books
+from .deconstruction_what import DeconstructionWhat
 from .pool import DatabaseManager
 from .mapper import Dataset, ChatResult
 from .content_chunks import ContentChunks
@@ -12,4 +13,5 @@ __all__ = [
     "Dataset",
     "ChatResult",
     "Books",
+    "DeconstructionWhat",
 ]

+ 31 - 0
applications/utils/mysql/deconstruction_what.py

@@ -0,0 +1,31 @@
+from .base import BaseMySQLClient
+
+
+class DeconstructionWhat(BaseMySQLClient):
+    async def insert_deconstruction_what(self, content_id, doc_id, what_text, description):
+        query = """
+               INSERT INTO deconstruction_what (doc_id, content_id, what_text, description)
+                VALUES (%s, %s, %s, %s);
+           """
+        return await self.pool.async_save(
+            query=query, params=(doc_id, content_id, what_text, description)
+        )
+
+    async def update_deconstruction_what_embedding_status(self, doc_id):
+        query = """
+               UPDATE deconstruction_what
+               SET embedding_status = 1
+               WHERE doc_id = %s
+           """
+        return await self.pool.async_save(
+            query=query, params=(doc_id,)
+        )
+
+    async def insert_deconstruction_what_contents(self, content_id, content):
+        query = """
+                       INSERT INTO deconstruction_what_contents (content_id, content)
+                        VALUES (%s, %s);
+                   """
+        return await self.pool.async_save(
+            query=query, params=(content_id, content,)
+        )

+ 36 - 1
routes/blueprint.py

@@ -20,7 +20,8 @@ from applications.config import (
 from applications.resource import get_resource_manager
 from applications.search import HybridSearch
 from applications.utils.chat import RAGChatAgent
-from applications.utils.mysql import Dataset, Contents, ContentChunks, ChatResult, Books
+from applications.utils.milvus import async_insert_chunk
+from applications.utils.mysql import Dataset, Contents, ContentChunks, ChatResult, Books, DeconstructionWhat
 from applications.api.qwen import QwenClient
 from applications.utils.oss.oss_client import OSSClient
 from applications.utils.task.async_task import (
@@ -578,3 +579,37 @@ async def process_book():
     asyncio.create_task(handle_books())
     # 返回立即响应
     return jsonify({"status": "success", "message": "任务已提交后台处理"}), 200
+
+
+@server_bp.route("/what/add", methods=["POST"])
+async def test():
+    body = await request.get_json()
+    resource = get_resource_manager()
+    deconstruction_what_mapper = DeconstructionWhat(resource.mysql_client)
+    content_id = f"what-{uuid.uuid4()}"
+    await deconstruction_what_mapper.insert_deconstruction_what_contents(content_id,
+                                                                           json.dumps(body, ensure_ascii=False))
+    target = body.get("帖子包含元素")
+    res = []
+    for data in target:
+        what = data.get("what")
+        desc = data.get("描述")
+        res.append({"what": what, "desc": json.dumps(desc, ensure_ascii=False)})
+        children = data.get("子节点元素")
+        for child in children:
+            child_what = child.get("what")
+            child_desc = child.get("描述")
+            res.append({"what": child_what, "desc": json.dumps(child_desc, ensure_ascii=False)})
+    for data in res:
+        doc_id = f"doc-{uuid.uuid4()}"
+        deconstruction_what_mapper = DeconstructionWhat(resource.mysql_client)
+        await deconstruction_what_mapper.insert_deconstruction_what(content_id, doc_id, data.get("what"), data.get("desc"))
+        what_embedding = await get_basic_embedding(text=data.get("what"), model=DEFAULT_MODEL)
+        data = {
+            "doc_id": doc_id,
+            "vector_text": what_embedding
+        }
+        res = await async_insert_chunk(resource.what_milvus_client, data)
+        if res:
+            await deconstruction_what_mapper.update_deconstruction_what_embedding_status(doc_id)
+    return jsonify({"status_code": 200, "detail": "success"})