delete_task.py 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134
  1. from typing import List, Dict
  2. from applications.utils.mysql import ContentChunks, Contents, Dataset
  3. from applications.utils.async_utils import run_tasks_with_asyncio_task_group
  4. from applications.utils.milvus import async_delete_chunk
  5. class DeleteTask:
  6. USEFUL_STATUS = 1
  7. DELETE_STATUS = 0
  8. def __init__(self, resource):
  9. self.mysql_client = resource.mysql_client
  10. self.milvus_client = resource.milvus_client
  11. self.es_client = resource.es_client
  12. self.content_manager = None
  13. self.chunk_manager = None
  14. self.dataset_manager = None
  15. async def search_id_from_es(self, filters: Dict) -> List:
  16. must_clauses = []
  17. for field, value in filters.items():
  18. must_clauses.append({"term": {field: value}})
  19. query = {
  20. "query": {"bool": {"must": must_clauses}},
  21. "_source": False,
  22. }
  23. try:
  24. resp = await self.es_client.async_search(query=query)
  25. return [int(hit["_id"]) for hit in resp["hits"]["hits"]]
  26. except Exception as e:
  27. print(f"search failed: {e}")
  28. return []
  29. async def delete_ids_from_es(self, ids: List):
  30. return await run_tasks_with_asyncio_task_group(
  31. task_list=ids,
  32. handler=self.es_client.async_delete,
  33. description="delete IDs From ElasticSearch",
  34. unit="chunk",
  35. max_concurrency=10,
  36. )
  37. async def delete_by_query(self, filters: Dict):
  38. must_clauses = []
  39. for field, value in filters.items():
  40. must_clauses.append({"term": {field: value}})
  41. query = {"query": {"bool": {"must": must_clauses}}}
  42. await self.es_client.async_delete_by_query(query=query)
  43. async def delete_ids_from_milvus(self, ids: List):
  44. return await async_delete_chunk(self.milvus_client, ids)
  45. async def delete_by_filters(self, filters: Dict):
  46. # step2, 查出 es 中的 id,并且删除
  47. ids = await self.search_id_from_es(filters)
  48. if not ids:
  49. return
  50. # step3, delete from milvus
  51. await self.delete_ids_from_milvus(ids)
  52. # step4, delete from es by query
  53. await self.delete_by_query(filters)
  54. async def delete_chunk(self, params):
  55. doc_id = params["doc_id"]
  56. chunk_id = params["chunk_id"]
  57. try:
  58. self.chunk_manager = ContentChunks(self.mysql_client)
  59. await self.chunk_manager.update_doc_chunk_status(
  60. doc_id, chunk_id, self.USEFUL_STATUS, self.DELETE_STATUS
  61. )
  62. await self.delete_by_filters({"doc_id": doc_id, "chunk_id": chunk_id})
  63. return {"doc_id": doc_id, "chunk_id": chunk_id, "status": "success"}
  64. except Exception as e:
  65. print(f"delete chunk failed: {e}")
  66. return {"doc_id": doc_id, "chunk_id": chunk_id, "status": "failed"}
  67. async def delete_doc(self, params):
  68. doc_id = params["doc_id"]
  69. try:
  70. self.chunk_manager = ContentChunks(self.mysql_client)
  71. self.content_manager = Contents(self.mysql_client)
  72. await self.chunk_manager.update_doc_status(
  73. doc_id, self.USEFUL_STATUS, self.DELETE_STATUS
  74. )
  75. await self.content_manager.update_doc_status(
  76. doc_id, self.USEFUL_STATUS, self.DELETE_STATUS
  77. )
  78. await self.delete_by_filters({"doc_id": doc_id})
  79. return {"doc_id": doc_id, "status": "success"}
  80. except Exception as e:
  81. print(f"delete chunk failed: {e}")
  82. return {"doc_id": doc_id, "status": "failed"}
  83. async def delete_dataset(self, params):
  84. dataset_id = params["dataset_id"]
  85. try:
  86. self.chunk_manager = ContentChunks(self.mysql_client)
  87. self.content_manager = Contents(self.mysql_client)
  88. self.dataset_manager = Dataset(self.mysql_client)
  89. await self.chunk_manager.update_dataset_status(
  90. dataset_id, self.USEFUL_STATUS, self.DELETE_STATUS
  91. )
  92. await self.content_manager.update_dataset_status(
  93. dataset_id, self.USEFUL_STATUS, self.DELETE_STATUS
  94. )
  95. await self.dataset_manager.update_dataset_status(
  96. dataset_id, self.USEFUL_STATUS, self.DELETE_STATUS
  97. )
  98. await self.delete_by_filters({"dataset_id": dataset_id})
  99. return {"dataset_id": dataset_id, "status": "success"}
  100. except Exception as e:
  101. print(f"delete dataset failed: {e}")
  102. return {"dataset_id": dataset_id, "status": "failed"}
  103. async def deal(self, level, params):
  104. """
  105. :param level: 删除级别
  106. :param params: 删除 id 信息
  107. :return:
  108. """
  109. match level:
  110. case "dataset":
  111. return await self.delete_dataset(params)
  112. case "doc":
  113. return await self.delete_doc(params)
  114. case "chunk":
  115. return await self.delete_chunk(params)
  116. case _:
  117. return {"error": "error level"}