|
@@ -0,0 +1,125 @@
|
|
|
+from typing import List, Dict
|
|
|
+
|
|
|
+from applications.utils.mysql import ContentChunks, Contents, Dataset
|
|
|
+from applications.utils.async_utils import run_tasks_with_asyncio_task_group
|
|
|
+from applications.utils.milvus import async_delete_chunk
|
|
|
+
|
|
|
+
|
|
|
+class DeleteTask:
|
|
|
+ def __init__(self, resource):
|
|
|
+ self.mysql_client = resource.mysql_client
|
|
|
+ self.milvus_client = resource.milvus_client
|
|
|
+ self.es_client = resource.es_client
|
|
|
+ self.content_manager = None
|
|
|
+ self.chunk_manager = None
|
|
|
+ self.dataset_manager = None
|
|
|
+
|
|
|
+ async def search_id_from_es(self, filters: Dict) -> List:
|
|
|
+ must_clauses = []
|
|
|
+ for field, value in filters.items():
|
|
|
+ must_clauses.append({"term": {field: value}})
|
|
|
+ query = {
|
|
|
+ "query": {"bool": {"must": must_clauses}},
|
|
|
+ "_source": False,
|
|
|
+ }
|
|
|
+ try:
|
|
|
+ resp = await self.es_client.async_search(query=query)
|
|
|
+ return [hit["_id"] for hit in resp["hits"]["hits"]]
|
|
|
+ except Exception as e:
|
|
|
+ print(f"search failed: {e}")
|
|
|
+ return []
|
|
|
+
|
|
|
+ async def delete_ids_from_es(self, ids: List):
|
|
|
+ return await run_tasks_with_asyncio_task_group(
|
|
|
+ task_list=ids,
|
|
|
+ handler=self.es_client.async_delete,
|
|
|
+ description="delete IDs From ElasticSearch",
|
|
|
+ unit="chunk",
|
|
|
+ max_concurrency=10
|
|
|
+ )
|
|
|
+
|
|
|
+ async def delete_by_query(self, filters: Dict):
|
|
|
+ must_clauses = []
|
|
|
+ for field, value in filters.items():
|
|
|
+ must_clauses.append({"term": {field: value}})
|
|
|
+ query = {
|
|
|
+ "query": {
|
|
|
+ "bool": {"must": must_clauses}
|
|
|
+ }
|
|
|
+ }
|
|
|
+ await self.es_client.async_delete_by_query(query=query)
|
|
|
+
|
|
|
+ async def delete_ids_from_milvus(self, ids: List):
|
|
|
+ return await async_delete_chunk(self.milvus_client, ids)
|
|
|
+
|
|
|
+ async def delete_by_filters(self, filters: Dict):
|
|
|
+ # step2, 查出 es 中的 id,并且删除
|
|
|
+ ids = await self.search_id_from_es(filters)
|
|
|
+ if not ids:
|
|
|
+ return
|
|
|
+
|
|
|
+ # step3, delete from milvus
|
|
|
+ await self.delete_ids_from_milvus(ids)
|
|
|
+
|
|
|
+ # step4, delete from es by query
|
|
|
+ await self.delete_by_query(filters)
|
|
|
+
|
|
|
+
|
|
|
+ async def delete_chunk(self, params):
|
|
|
+ doc_id = params["doc_id"]
|
|
|
+ chunk_id = params["chunk_id"]
|
|
|
+ try:
|
|
|
+ self.chunk_manager = ContentChunks(self.mysql_client)
|
|
|
+ await self.chunk_manager.update_doc_chunk_status(doc_id, chunk_id, 1, 0)
|
|
|
+ await self.delete_by_filters({"doc_id": doc_id, "chunk_id": chunk_id})
|
|
|
+ return {"doc_id": doc_id, "chunk_id": chunk_id, "status": "success"}
|
|
|
+ except Exception as e:
|
|
|
+ print(f"delete chunk failed: {e}")
|
|
|
+ return {"doc_id": doc_id, "chunk_id": chunk_id, "status": "failed"}
|
|
|
+
|
|
|
+ async def delete_doc(self, params):
|
|
|
+ doc_id = params["doc_id"]
|
|
|
+ try:
|
|
|
+ self.chunk_manager = ContentChunks(self.mysql_client)
|
|
|
+ self.content_manager = Contents(self.mysql_client)
|
|
|
+ await self.chunk_manager.update_doc_status(doc_id, 1, 0)
|
|
|
+ await self.content_manager.update_doc_status(doc_id, 1, 0)
|
|
|
+
|
|
|
+ await self.delete_by_filters({"doc_id": doc_id})
|
|
|
+ return {"doc_id": doc_id, "status": "success"}
|
|
|
+ except Exception as e:
|
|
|
+ print(f"delete chunk failed: {e}")
|
|
|
+ return {"doc_id": doc_id, "status": "failed"}
|
|
|
+
|
|
|
+ async def delete_dataset(self, params):
|
|
|
+ dataset_id = params["dataset_id"]
|
|
|
+ try:
|
|
|
+ self.chunk_manager = ContentChunks(self.mysql_client)
|
|
|
+ self.content_manager = Contents(self.mysql_client)
|
|
|
+ self.dataset_manager = Dataset(self.mysql_client)
|
|
|
+ await self.chunk_manager.update_dataset_status(dataset_id, 1, 0)
|
|
|
+ await self.content_manager.update_dataset_status(dataset_id, 1, 0)
|
|
|
+ await self.dataset_manager.update_dataset_status(dataset_id, 1, 0)
|
|
|
+
|
|
|
+ await self.delete_by_filters({"dataset_id": dataset_id})
|
|
|
+ return {"dataset_id": dataset_id, "status": "success"}
|
|
|
+ except Exception as e:
|
|
|
+ print(f"delete dataset failed: {e}")
|
|
|
+ return {"dataset_id": dataset_id, "status": "failed"}
|
|
|
+
|
|
|
+
|
|
|
+ async def deal(self, level, params):
|
|
|
+ """
|
|
|
+ :param level: 删除级别
|
|
|
+ :param params: 删除 id 信息
|
|
|
+ :return:
|
|
|
+ """
|
|
|
+ match level:
|
|
|
+ case "dataset":
|
|
|
+ return await self.delete_dataset(params)
|
|
|
+ case "doc":
|
|
|
+ return await self.delete_doc(params)
|
|
|
+ case "chunk":
|
|
|
+ return await self.delete_chunk(params)
|
|
|
+ case _:
|
|
|
+ return {"error": "error level"}
|