Merge branch 'feature/luojunhui/2025-09-19-delete_knowledge' of Server/llm_vector_server into master

luojunhui 2 weeks ago
parent
commit
e26e49ee67

+ 2 - 1
applications/async_task/__init__.py

@@ -1,4 +1,5 @@
 from .chunk_task import ChunkEmbeddingTask
+from .delete_task import DeleteTask
 
 
-__all__ = ["ChunkEmbeddingTask"]
+__all__ = ["ChunkEmbeddingTask", "DeleteTask"]

+ 119 - 0
applications/async_task/delete_task.py

@@ -0,0 +1,119 @@
+from typing import List, Dict
+
+from applications.utils.mysql import ContentChunks, Contents, Dataset
+from applications.utils.async_utils import run_tasks_with_asyncio_task_group
+from applications.utils.milvus import async_delete_chunk
+
+
+class DeleteTask:
+    def __init__(self, resource):
+        self.mysql_client = resource.mysql_client
+        self.milvus_client = resource.milvus_client
+        self.es_client = resource.es_client
+        self.content_manager = None
+        self.chunk_manager = None
+        self.dataset_manager = None
+
+    async def search_id_from_es(self, filters: Dict) -> List:
+        must_clauses = []
+        for field, value in filters.items():
+            must_clauses.append({"term": {field: value}})
+        query = {
+            "query": {"bool": {"must": must_clauses}},
+            "_source": False,
+        }
+        try:
+            resp = await self.es_client.async_search(query=query)
+            return [int(hit["_id"]) for hit in resp["hits"]["hits"]]
+        except Exception as e:
+            print(f"search failed: {e}")
+            return []
+
+    async def delete_ids_from_es(self, ids: List):
+        return await run_tasks_with_asyncio_task_group(
+            task_list=ids,
+            handler=self.es_client.async_delete,
+            description="delete IDs From ElasticSearch",
+            unit="chunk",
+            max_concurrency=10,
+        )
+
+    async def delete_by_query(self, filters: Dict):
+        must_clauses = []
+        for field, value in filters.items():
+            must_clauses.append({"term": {field: value}})
+        query = {"query": {"bool": {"must": must_clauses}}}
+        await self.es_client.async_delete_by_query(query=query)
+
+    async def delete_ids_from_milvus(self, ids: List):
+        return await async_delete_chunk(self.milvus_client, ids)
+
+    async def delete_by_filters(self, filters: Dict):
+        # step 2: look up the matching ids in ES
+        ids = await self.search_id_from_es(filters)
+        if not ids:
+            return
+
+        # step 3: delete from Milvus
+        await self.delete_ids_from_milvus(ids)
+
+        # step 4: delete from ES by query
+        await self.delete_by_query(filters)
+
+    async def delete_chunk(self, params):
+        doc_id = params["doc_id"]
+        chunk_id = params["chunk_id"]
+        try:
+            self.chunk_manager = ContentChunks(self.mysql_client)
+            await self.chunk_manager.update_doc_chunk_status(doc_id, chunk_id, 1, 0)
+            await self.delete_by_filters({"doc_id": doc_id, "chunk_id": chunk_id})
+            return {"doc_id": doc_id, "chunk_id": chunk_id, "status": "success"}
+        except Exception as e:
+            print(f"delete chunk failed: {e}")
+            return {"doc_id": doc_id, "chunk_id": chunk_id, "status": "failed"}
+
+    async def delete_doc(self, params):
+        doc_id = params["doc_id"]
+        try:
+            self.chunk_manager = ContentChunks(self.mysql_client)
+            self.content_manager = Contents(self.mysql_client)
+            await self.chunk_manager.update_doc_status(doc_id, 1, 0)
+            await self.content_manager.update_doc_status(doc_id, 1, 0)
+
+            await self.delete_by_filters({"doc_id": doc_id})
+            return {"doc_id": doc_id, "status": "success"}
+        except Exception as e:
+            print(f"delete chunk failed: {e}")
+            return {"doc_id": doc_id, "status": "failed"}
+
+    async def delete_dataset(self, params):
+        dataset_id = params["dataset_id"]
+        try:
+            self.chunk_manager = ContentChunks(self.mysql_client)
+            self.content_manager = Contents(self.mysql_client)
+            self.dataset_manager = Dataset(self.mysql_client)
+            await self.chunk_manager.update_dataset_status(dataset_id, 1, 0)
+            await self.content_manager.update_dataset_status(dataset_id, 1, 0)
+            await self.dataset_manager.update_dataset_status(dataset_id, 1, 0)
+
+            await self.delete_by_filters({"dataset_id": dataset_id})
+            return {"dataset_id": dataset_id, "status": "success"}
+        except Exception as e:
+            print(f"delete dataset failed: {e}")
+            return {"dataset_id": dataset_id, "status": "failed"}
+
+    async def deal(self, level, params):
+        """
+        :param level: deletion level ("dataset", "doc", or "chunk")
+        :param params: the ids identifying what to delete
+        :return:
+        """
+        match level:
+            case "dataset":
+                return await self.delete_dataset(params)
+            case "doc":
+                return await self.delete_doc(params)
+            case "chunk":
+                return await self.delete_chunk(params)
+            case _:
+                return {"error": "error level"}

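For orientation, a minimal sketch of how the new DeleteTask is driven (the flow mirrors the /delete route added in routes/buleprint.py below; the literal ids are placeholders, not values from the diff):

    from applications.resource import get_resource_manager
    from applications.async_task import DeleteTask

    async def delete_one_doc():
        resource = get_resource_manager()  # assumes the resource manager is already initialized
        task = DeleteTask(resource)
        # level selects the granularity: "dataset", "doc", or "chunk"
        result = await task.deal("doc", {"doc_id": "doc-123"})  # "doc-123" is hypothetical
        # expected: {"doc_id": "doc-123", "status": "success"} or {..., "status": "failed"}
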
+ 0 - 1
applications/utils/chunks/kg_classifier.py

@@ -9,7 +9,6 @@ from applications.api import get_basic_embedding
 
 
 class KGClassifier:
-
     def __init__(self, kg_spec: Dict[str, Any]):
         self.root = kg_spec["root"]
         self._embed_cache: Dict[str, np.ndarray] = {}

+ 0 - 1
applications/utils/chunks/topic_aware_chunking.py

@@ -88,7 +88,6 @@ class BoundaryDetector:
 
 
 class TopicAwareChunker(BoundaryDetector, SplitTextIntoSentences):
-
     INIT_STATUS = 0
     PROCESSING_STATUS = 1
     FINISHED_STATUS = 2

+ 8 - 28
applications/utils/elastic_search/client.py

@@ -1,11 +1,8 @@
 from elasticsearch import AsyncElasticsearch
 from elasticsearch.helpers import async_bulk
 
-from applications.utils.async_utils import run_tasks_with_asyncio_task_group
-
 
 class AsyncElasticSearchClient:
-
     def __init__(self, index_name, hosts, password):
         self.es = AsyncElasticsearch(hosts=hosts, basic_auth=("elastic", password))
         self.index_name = index_name
@@ -29,39 +26,22 @@ class AsyncElasticSearchClient:
         except Exception as e:
             print("fail to create index, reason:", e)
 
-    async def search(self, query):
+    async def async_search(self, query):
         resp = await self.es.search(index=self.index_name, body=query)
         return resp
 
-    async def update(self, obj):
+    async def async_update(self, obj):
         return await self.es.update(
             index=self.index_name, id=obj["es_id"], body=obj["doc"]
         )
 
-    async def update_by_filed(self, field_name: str, field_value: str, doc: dict):
-        try:
-            # first look up the matching doc ids
-            query = {"query": {"term": {field_name: field_value}}}
-            resp = await self.es.search(index=self.index_name, body=query)
-            if not resp["hits"]["hits"]:
-                print(f"No document found with {field_name}={field_value}")
-                return None
-
-            task_list = [
-                {"es_id": hit["_id"], "doc": doc} for hit in resp["hits"]["hits"]
-            ]
+    async def async_delete(self, es_id):
+        await self.es.delete(index=self.index_name, id=es_id)
 
-            # update by ids
-            return await run_tasks_with_asyncio_task_group(
-                task_list=task_list,
-                handler=self.es.update,
-                description="update by filed",
-                unit="document",
-                max_concurrency=10,
-            )
-        except Exception as e:
-            print(f"fail to update by {field_name}={field_value}, reason:", e)
-            return None
+    async def async_delete_by_query(self, query):
+        await self.es.delete_by_query(
+            index=self.index_name, body=query, conflicts="proceed", refresh=True
+        )
 
     async def bulk_insert(self, docs):
         success, errors = await async_bulk(self.es, docs, request_timeout=10)

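For reference, the body that DeleteTask.delete_by_query hands to the new async_delete_by_query is a plain bool/term query; a sketch for a single-field filter (the doc_id value is hypothetical):

    query = {"query": {"bool": {"must": [{"term": {"doc_id": "doc-123"}}]}}}
    await es_client.async_delete_by_query(query=query)

Passing conflicts="proceed" tells Elasticsearch to skip documents that hit version conflicts instead of aborting the whole operation, and refresh=True makes the deletions visible to subsequent searches immediately.
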
+ 2 - 2
applications/utils/elastic_search/search_strategy.py

@@ -3,7 +3,7 @@ from typing import List, Dict
 
 class ElasticSearchStrategy:
     def __init__(self, es):
-        self.es = es
+        self.es_client = es
 
     async def base_search(
         self,
@@ -26,7 +26,7 @@ class ElasticSearchStrategy:
             "_source": _source,
         }
         try:
-            resp = await self.es.search(query=query)
+            resp = await self.es_client.async_search(query=query)
             return [
                 hit["_source"] if _source else hit["_id"]
                 for hit in resp["hits"]["hits"]

+ 2 - 2
applications/utils/milvus/__init__.py

@@ -1,9 +1,9 @@
-from .functions import async_insert_chunk, async_search_chunk
+from .functions import async_insert_chunk, async_delete_chunk
 from .search import MilvusSearch
 
 
 __all__ = [
     "async_insert_chunk",
-    "async_search_chunk",
+    "async_delete_chunk",
     "MilvusSearch",
 ]

+ 0 - 26
applications/utils/milvus/collection.py

@@ -1,26 +0,0 @@
-# from pymilvus import connections, CollectionSchema, Collection
-# from applications.utils.milvus.field import fields
-# from applications.config import MILVUS_CONFIG
-#
-#
-# connections.connect("default", **MILVUS_CONFIG)
-#
-# schema = CollectionSchema(
-#     fields, description="Chunk multi-vector embeddings with metadata"
-# )
-# milvus_collection = Collection(name="chunk_multi_embeddings_v2", schema=schema)
-#
-# # create index
-# vector_index_params = {
-#     "index_type": "IVF_FLAT",
-#     "metric_type": "COSINE",
-#     "params": {"M": 16, "efConstruction": 200},
-# }
-#
-# milvus_collection.create_index("vector_text", vector_index_params)
-# milvus_collection.create_index("vector_summary", vector_index_params)
-# milvus_collection.create_index("vector_questions", vector_index_params)
-#
-# milvus_collection.load()
-#
-# __all__ = ["milvus_collection"]

+ 18 - 16
applications/utils/milvus/functions.py

@@ -14,21 +14,23 @@ async def async_insert_chunk(collection: pymilvus.Collection, data: Dict) -> Lis
     return result.primary_keys
 
 
-async def async_search_chunk(
-    collection: pymilvus.Collection, query_vector, params: Dict
-):
+async def async_delete_chunk(
+    collection: pymilvus.Collection, ids: List[int]
+) -> List[int]:
     """
-    :param query_vector: the query vector
-    :param collection:
-    :param params: search parameters
-    :return:
+    Delete entities by ids from a Milvus collection asynchronously.
+
+    :param collection: Milvus Collection object
+    :param ids: List of primary key ids to delete
+    :return: List of successfully deleted ids
     """
-    expr = None
-    return await asyncio.to_thread(
-        collection.search,
-        data=[query_vector],
-        param={"metric_type": "COSINE", "params": {"nprobe": 10}},
-        limit=params["limit"],
-        anns_field="vector_text",
-        expr=expr,
-    )
+    if not ids:
+        return []
+    expr = f"id in {ids}"
+    result = await asyncio.to_thread(collection.delete, expr)
+    await asyncio.to_thread(collection.flush)
+    success_count = result.succ_count
+    if success_count == len(ids):
+        return ids
+    else:
+        return ids[:success_count]

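The delete expression leans on Python's list repr lining up with Milvus boolean-expression syntax; a short sketch of what async_delete_chunk builds (the ids are hypothetical):

    ids = [11, 42, 77]
    expr = f"id in {ids}"  # renders as: id in [11, 42, 77]

succ_count is read off the pymilvus MutationResult returned by collection.delete.
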
+ 0 - 2
applications/utils/milvus/search.py

@@ -3,7 +3,6 @@ from typing import List, Optional, Dict, Any, Union
 
 
 class MilvusBase:
-
     output_fields = [
         "id",
         "doc_id",
@@ -33,7 +32,6 @@ class MilvusBase:
 
 
 class MilvusSearch(MilvusBase):
-
     # coarse search by vector
     async def base_vector_search(
         self,

+ 2 - 4
applications/utils/mysql/__init__.py

@@ -1,7 +1,5 @@
 from .pool import DatabaseManager
-from .mapper import Contents, ContentChunks
+from .mapper import Contents, ContentChunks, Dataset
 
-# global database manager instance
-# mysql_manager = DatabaseManager()
 
-__all__ = ["Contents", "ContentChunks", "DatabaseManager"]
+__all__ = ["Contents", "ContentChunks", "DatabaseManager", "Dataset"]

+ 64 - 3
applications/utils/mysql/mapper.py

@@ -10,13 +10,22 @@ class TaskConst:
 
 
 class BaseMySQLClient:
-
     def __init__(self, pool):
         self.pool = pool
 
 
-class Contents(BaseMySQLClient):
+class Dataset(BaseMySQLClient):
+    async def update_dataset_status(self, dataset_id, ori_status, new_status):
+        query = """
+            UPDATE dataset SET status = %s WHERE id = %s AND status = %s;
+        """
+        return await self.pool.async_save(
+            query=query,
+            params=(new_status, dataset_id, ori_status),
+        )
+
 
+class Contents(BaseMySQLClient):
     async def insert_content(self, doc_id, text, text_type, title, dataset_id):
         query = """
             INSERT IGNORE INTO contents
@@ -37,9 +46,34 @@ class Contents(BaseMySQLClient):
             query=query, params=(new_status, doc_id, ori_status)
         )
 
+    async def update_dataset_status(self, dataset_id, ori_status, new_status):
+        query = """
+            UPDATE contents
+            SET status = %s
+            WHERE dataset_id = %s AND status = %s;
+        """
+        return await self.pool.async_save(
+            query=query, params=(new_status, dataset_id, ori_status)
+        )
+
+    async def update_doc_status(self, doc_id, ori_status, new_status):
+        """
+        Update the doc_status of a single document, guarded by its current status.
+        :param doc_id:
+        :param ori_status:
+        :param new_status:
+        :return:
+        """
+        query = """
+            UPDATE contents
+            SET doc_status = %s WHERE doc_id = %s AND doc_status = %s;
+        """
+        return await self.pool.async_save(
+            query=query, params=(new_status, doc_id, ori_status)
+        )
 
-class ContentChunks(BaseMySQLClient):
 
+class ContentChunks(BaseMySQLClient):
     async def insert_chunk(self, chunk: Chunk) -> int:
         query = """
             INSERT IGNORE INTO content_chunks
@@ -112,3 +146,30 @@ class ContentChunks(BaseMySQLClient):
         return await self.pool.async_save(
             query=query, params=(new_status, doc_id, chunk_id, ori_status)
         )
+
+    async def update_doc_chunk_status(self, doc_id, chunk_id, ori_status, new_status):
+        query = """
+            UPDATE content_chunks SET status = %s
+            WHERE doc_id = %s AND chunk_id = %s AND status = %s;
+        """
+        return await self.pool.async_save(
+            query=query, params=(new_status, doc_id, chunk_id, ori_status)
+        )
+
+    async def update_doc_status(self, doc_id, ori_status, new_status):
+        query = """
+            UPDATE content_chunks SET status = %s
+            WHERE doc_id = %s AND status = %s;
+        """
+        return await self.pool.async_save(
+            query=query, params=(new_status, doc_id, ori_status)
+        )
+
+    async def update_dataset_status(self, dataset_id, ori_status, new_status):
+        query = """
+            UPDATE content_chunks SET status = %s
+            WHERE dataset_id = %s AND status = %s;
+        """
+        return await self.pool.async_save(
+            query=query, params=(new_status, dataset_id, ori_status)
+        )

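All of the new status updaters share one guarded-UPDATE pattern: a row changes only if it is still in ori_status, so retried deletes are no-ops. Note the argument order: the methods take (id, ori_status, new_status) while the SQL binds (new_status, id, ori_status). A minimal usage sketch (the pool and the dataset id are placeholders):

    dataset_mapper = Dataset(mysql_pool)  # mysql_pool: an initialized DatabaseManager pool (assumed)
    # flip dataset 5's status from 1 to 0, mirroring DeleteTask.delete_dataset
    await dataset_mapper.update_dataset_status(5, 1, 0)
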
+ 0 - 1
applications/utils/nlp/split_text_into_sentences.py

@@ -6,7 +6,6 @@ from typing import List
 
 
 class SplitTextIntoSentences:
-
     @staticmethod
     def nltk_sent_tokenize(text: str) -> List[str]:
         """especially for English"""

+ 0 - 1
applications/utils/response/base_response.py

@@ -1,5 +1,4 @@
 class BaseResponse:
-
     @staticmethod
     def negative_response():
         pass

+ 1 - 0
requirements.txt

@@ -1,4 +1,5 @@
 aiodns==3.5.0
+requests
 aiomysql==0.2.0
 black==25.1.0
 bottleneck==1.4.2

+ 14 - 1
routes/buleprint.py

@@ -13,7 +13,7 @@ from applications.config import (
 from applications.resource import get_resource_manager
 from applications.api import get_basic_embedding
 from applications.api import get_img_embedding
-from applications.async_task import ChunkEmbeddingTask
+from applications.async_task import ChunkEmbeddingTask, DeleteTask
 from applications.search import HybridSearch
 
 
@@ -43,6 +43,19 @@ async def img_embed():
     return jsonify(embedding)
 
 
+@server_bp.route("/delete", methods=["POST"])
+async def delete():
+    body = await request.get_json()
+    level = body.get("level")
+    params = body.get("params")
+    if not level or not params:
+        return jsonify({"error": "error  level or params"})
+    resource = get_resource_manager()
+    delete_task = DeleteTask(resource)
+    response = await delete_task.deal(level, params)
+    return jsonify(response)
+
+
 @server_bp.route("/chunk", methods=["POST"])
 async def chunk():
     body = await request.get_json()
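
A hedged example request against the new endpoint (host, port, and the blueprint being mounted at the root are assumptions, not part of the diff):

    import requests  # the dependency added to requirements.txt above

    resp = requests.post(
        "http://localhost:8080/delete",  # host/port are assumptions
        json={"level": "chunk", "params": {"doc_id": "doc-123", "chunk_id": 7}},
    )
    print(resp.json())  # e.g. {"doc_id": "doc-123", "chunk_id": 7, "status": "success"}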