# delete_task.py
import logging
from typing import Dict, List

from applications.utils.async_utils import run_tasks_with_asyncio_task_group
from applications.utils.milvus import async_delete_chunk
from applications.utils.mysql import ContentChunks, Contents, Dataset
  5. class DeleteTask:
  6. def __init__(self, resource):
  7. self.mysql_client = resource.mysql_client
  8. self.milvus_client = resource.milvus_client
  9. self.es_client = resource.es_client
  10. self.content_manager = None
  11. self.chunk_manager = None
  12. self.dataset_manager = None
  13. async def search_id_from_es(self, filters: Dict) -> List:
  14. must_clauses = []
  15. for field, value in filters.items():
  16. must_clauses.append({"term": {field: value}})
  17. query = {
  18. "query": {"bool": {"must": must_clauses}},
  19. "_source": False,
  20. }
  21. try:
  22. resp = await self.es_client.async_search(query=query)
  23. return [int(hit["_id"]) for hit in resp["hits"]["hits"]]
  24. except Exception as e:
  25. print(f"search failed: {e}")
  26. return []
  27. async def delete_ids_from_es(self, ids: List):
  28. return await run_tasks_with_asyncio_task_group(
  29. task_list=ids,
  30. handler=self.es_client.async_delete,
  31. description="delete IDs From ElasticSearch",
  32. unit="chunk",
  33. max_concurrency=10,
  34. )
  35. async def delete_by_query(self, filters: Dict):
  36. must_clauses = []
  37. for field, value in filters.items():
  38. must_clauses.append({"term": {field: value}})
  39. query = {"query": {"bool": {"must": must_clauses}}}
  40. await self.es_client.async_delete_by_query(query=query)
  41. async def delete_ids_from_milvus(self, ids: List):
  42. return await async_delete_chunk(self.milvus_client, ids)
  43. async def delete_by_filters(self, filters: Dict):
  44. # step2, 查出 es 中的 id,并且删除
  45. ids = await self.search_id_from_es(filters)
  46. if not ids:
  47. return
  48. # step3, delete from milvus
  49. await self.delete_ids_from_milvus(ids)
  50. # step4, delete from es by query
  51. await self.delete_by_query(filters)
  52. async def delete_chunk(self, params):
  53. doc_id = params["doc_id"]
  54. chunk_id = params["chunk_id"]
  55. try:
  56. self.chunk_manager = ContentChunks(self.mysql_client)
  57. await self.chunk_manager.update_doc_chunk_status(doc_id, chunk_id, 1, 0)
  58. await self.delete_by_filters({"doc_id": doc_id, "chunk_id": chunk_id})
  59. return {"doc_id": doc_id, "chunk_id": chunk_id, "status": "success"}
  60. except Exception as e:
  61. print(f"delete chunk failed: {e}")
  62. return {"doc_id": doc_id, "chunk_id": chunk_id, "status": "failed"}
  63. async def delete_doc(self, params):
  64. doc_id = params["doc_id"]
  65. try:
  66. self.chunk_manager = ContentChunks(self.mysql_client)
  67. self.content_manager = Contents(self.mysql_client)
  68. await self.chunk_manager.update_doc_status(doc_id, 1, 0)
  69. await self.content_manager.update_doc_status(doc_id, 1, 0)
  70. await self.delete_by_filters({"doc_id": doc_id})
  71. return {"doc_id": doc_id, "status": "success"}
  72. except Exception as e:
  73. print(f"delete chunk failed: {e}")
  74. return {"doc_id": doc_id, "status": "failed"}
  75. async def delete_dataset(self, params):
  76. dataset_id = params["dataset_id"]
  77. try:
  78. self.chunk_manager = ContentChunks(self.mysql_client)
  79. self.content_manager = Contents(self.mysql_client)
  80. self.dataset_manager = Dataset(self.mysql_client)
  81. await self.chunk_manager.update_dataset_status(dataset_id, 1, 0)
  82. await self.content_manager.update_dataset_status(dataset_id, 1, 0)
  83. await self.dataset_manager.update_dataset_status(dataset_id, 1, 0)
  84. await self.delete_by_filters({"dataset_id": dataset_id})
  85. return {"dataset_id": dataset_id, "status": "success"}
  86. except Exception as e:
  87. print(f"delete dataset failed: {e}")
  88. return {"dataset_id": dataset_id, "status": "failed"}
  89. async def deal(self, level, params):
  90. """
  91. :param level: 删除级别
  92. :param params: 删除 id 信息
  93. :return:
  94. """
  95. match level:
  96. case "dataset":
  97. return await self.delete_dataset(params)
  98. case "doc":
  99. return await self.delete_doc(params)
  100. case "chunk":
  101. return await self.delete_chunk(params)
  102. case _:
  103. return {"error": "error level"}