123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125 |
import logging
from typing import List, Dict

from applications.utils.mysql import ContentChunks, Contents, Dataset
from applications.utils.async_utils import run_tasks_with_asyncio_task_group
from applications.utils.milvus import async_delete_chunk
- class DeleteTask:
- def __init__(self, resource):
- self.mysql_client = resource.mysql_client
- self.milvus_client = resource.milvus_client
- self.es_client = resource.es_client
- self.content_manager = None
- self.chunk_manager = None
- self.dataset_manager = None
- async def search_id_from_es(self, filters: Dict) -> List:
- must_clauses = []
- for field, value in filters.items():
- must_clauses.append({"term": {field: value}})
- query = {
- "query": {"bool": {"must": must_clauses}},
- "_source": False,
- }
- try:
- resp = await self.es_client.async_search(query=query)
- return [hit["_id"] for hit in resp["hits"]["hits"]]
- except Exception as e:
- print(f"search failed: {e}")
- return []
- async def delete_ids_from_es(self, ids: List):
- return await run_tasks_with_asyncio_task_group(
- task_list=ids,
- handler=self.es_client.async_delete,
- description="delete IDs From ElasticSearch",
- unit="chunk",
- max_concurrency=10
- )
- async def delete_by_query(self, filters: Dict):
- must_clauses = []
- for field, value in filters.items():
- must_clauses.append({"term": {field: value}})
- query = {
- "query": {
- "bool": {"must": must_clauses}
- }
- }
- await self.es_client.async_delete_by_query(query=query)
- async def delete_ids_from_milvus(self, ids: List):
- return await async_delete_chunk(self.milvus_client, ids)
- async def delete_by_filters(self, filters: Dict):
- # step2, 查出 es 中的 id,并且删除
- ids = await self.search_id_from_es(filters)
- if not ids:
- return
- # step3, delete from milvus
- await self.delete_ids_from_milvus(ids)
- # step4, delete from es by query
- await self.delete_by_query(filters)
- async def delete_chunk(self, params):
- doc_id = params["doc_id"]
- chunk_id = params["chunk_id"]
- try:
- self.chunk_manager = ContentChunks(self.mysql_client)
- await self.chunk_manager.update_doc_chunk_status(doc_id, chunk_id, 1, 0)
- await self.delete_by_filters({"doc_id": doc_id, "chunk_id": chunk_id})
- return {"doc_id": doc_id, "chunk_id": chunk_id, "status": "success"}
- except Exception as e:
- print(f"delete chunk failed: {e}")
- return {"doc_id": doc_id, "chunk_id": chunk_id, "status": "failed"}
- async def delete_doc(self, params):
- doc_id = params["doc_id"]
- try:
- self.chunk_manager = ContentChunks(self.mysql_client)
- self.content_manager = Contents(self.mysql_client)
- await self.chunk_manager.update_doc_status(doc_id, 1, 0)
- await self.content_manager.update_doc_status(doc_id, 1, 0)
- await self.delete_by_filters({"doc_id": doc_id})
- return {"doc_id": doc_id, "status": "success"}
- except Exception as e:
- print(f"delete chunk failed: {e}")
- return {"doc_id": doc_id, "status": "failed"}
- async def delete_dataset(self, params):
- dataset_id = params["dataset_id"]
- try:
- self.chunk_manager = ContentChunks(self.mysql_client)
- self.content_manager = Contents(self.mysql_client)
- self.dataset_manager = Dataset(self.mysql_client)
- await self.chunk_manager.update_dataset_status(dataset_id, 1, 0)
- await self.content_manager.update_dataset_status(dataset_id, 1, 0)
- await self.dataset_manager.update_dataset_status(dataset_id, 1, 0)
- await self.delete_by_filters({"dataset_id": dataset_id})
- return {"dataset_id": dataset_id, "status": "success"}
- except Exception as e:
- print(f"delete dataset failed: {e}")
- return {"dataset_id": dataset_id, "status": "failed"}
- async def deal(self, level, params):
- """
- :param level: 删除级别
- :param params: 删除 id 信息
- :return:
- """
- match level:
- case "dataset":
- return await self.delete_dataset(params)
- case "doc":
- return await self.delete_doc(params)
- case "chunk":
- return await self.delete_chunk(params)
- case _:
- return {"error": "error level"}
|