|
@@ -35,18 +35,14 @@ class DeleteTask:
|
|
|
handler=self.es_client.async_delete,
|
|
|
description="delete IDs From ElasticSearch",
|
|
|
unit="chunk",
|
|
|
- max_concurrency=10
|
|
|
+ max_concurrency=10,
|
|
|
)
|
|
|
|
|
|
async def delete_by_query(self, filters: Dict):
|
|
|
must_clauses = []
|
|
|
for field, value in filters.items():
|
|
|
must_clauses.append({"term": {field: value}})
|
|
|
- query = {
|
|
|
- "query": {
|
|
|
- "bool": {"must": must_clauses}
|
|
|
- }
|
|
|
- }
|
|
|
+ query = {"query": {"bool": {"must": must_clauses}}}
|
|
|
await self.es_client.async_delete_by_query(query=query)
|
|
|
|
|
|
async def delete_ids_from_milvus(self, ids: List):
|
|
@@ -64,7 +60,6 @@ class DeleteTask:
|
|
|
# step4, delete from es by query
|
|
|
await self.delete_by_query(filters)
|
|
|
|
|
|
-
|
|
|
async def delete_chunk(self, params):
|
|
|
doc_id = params["doc_id"]
|
|
|
chunk_id = params["chunk_id"]
|
|
@@ -107,7 +102,6 @@ class DeleteTask:
|
|
|
print(f"delete dataset failed: {e}")
|
|
|
return {"dataset_id": dataset_id, "status": "failed"}
|
|
|
|
|
|
-
|
|
|
async def deal(self, level, params):
|
|
|
"""
|
|
|
:param level: 删除级别
|
|
@@ -122,4 +116,4 @@ class DeleteTask:
|
|
|
case "chunk":
|
|
|
return await self.delete_chunk(params)
|
|
|
case _:
|
|
|
- return {"error": "error level"}
|
|
|
+ return {"error": "error level"}
|