from typing import List, Dict from applications.utils.mysql import ContentChunks, Contents, Dataset from applications.utils.async_utils import run_tasks_with_asyncio_task_group from applications.utils.milvus import async_delete_chunk class DeleteTask: def __init__(self, resource): self.mysql_client = resource.mysql_client self.milvus_client = resource.milvus_client self.es_client = resource.es_client self.content_manager = None self.chunk_manager = None self.dataset_manager = None async def search_id_from_es(self, filters: Dict) -> List: must_clauses = [] for field, value in filters.items(): must_clauses.append({"term": {field: value}}) query = { "query": {"bool": {"must": must_clauses}}, "_source": False, } try: resp = await self.es_client.async_search(query=query) return [int(hit["_id"]) for hit in resp["hits"]["hits"]] except Exception as e: print(f"search failed: {e}") return [] async def delete_ids_from_es(self, ids: List): return await run_tasks_with_asyncio_task_group( task_list=ids, handler=self.es_client.async_delete, description="delete IDs From ElasticSearch", unit="chunk", max_concurrency=10, ) async def delete_by_query(self, filters: Dict): must_clauses = [] for field, value in filters.items(): must_clauses.append({"term": {field: value}}) query = {"query": {"bool": {"must": must_clauses}}} await self.es_client.async_delete_by_query(query=query) async def delete_ids_from_milvus(self, ids: List): return await async_delete_chunk(self.milvus_client, ids) async def delete_by_filters(self, filters: Dict): # step2, 查出 es 中的 id,并且删除 ids = await self.search_id_from_es(filters) if not ids: return # step3, delete from milvus await self.delete_ids_from_milvus(ids) # step4, delete from es by query await self.delete_by_query(filters) async def delete_chunk(self, params): doc_id = params["doc_id"] chunk_id = params["chunk_id"] try: self.chunk_manager = ContentChunks(self.mysql_client) await self.chunk_manager.update_doc_chunk_status(doc_id, chunk_id, 1, 0) await self.delete_by_filters({"doc_id": doc_id, "chunk_id": chunk_id}) return {"doc_id": doc_id, "chunk_id": chunk_id, "status": "success"} except Exception as e: print(f"delete chunk failed: {e}") return {"doc_id": doc_id, "chunk_id": chunk_id, "status": "failed"} async def delete_doc(self, params): doc_id = params["doc_id"] try: self.chunk_manager = ContentChunks(self.mysql_client) self.content_manager = Contents(self.mysql_client) await self.chunk_manager.update_doc_status(doc_id, 1, 0) await self.content_manager.update_doc_status(doc_id, 1, 0) await self.delete_by_filters({"doc_id": doc_id}) return {"doc_id": doc_id, "status": "success"} except Exception as e: print(f"delete chunk failed: {e}") return {"doc_id": doc_id, "status": "failed"} async def delete_dataset(self, params): dataset_id = params["dataset_id"] try: self.chunk_manager = ContentChunks(self.mysql_client) self.content_manager = Contents(self.mysql_client) self.dataset_manager = Dataset(self.mysql_client) await self.chunk_manager.update_dataset_status(dataset_id, 1, 0) await self.content_manager.update_dataset_status(dataset_id, 1, 0) await self.dataset_manager.update_dataset_status(dataset_id, 1, 0) await self.delete_by_filters({"dataset_id": dataset_id}) return {"dataset_id": dataset_id, "status": "success"} except Exception as e: print(f"delete dataset failed: {e}") return {"dataset_id": dataset_id, "status": "failed"} async def deal(self, level, params): """ :param level: 删除级别 :param params: 删除 id 信息 :return: """ match level: case "dataset": return await self.delete_dataset(params) case "doc": return await self.delete_doc(params) case "chunk": return await self.delete_chunk(params) case _: return {"error": "error level"}