delete_task.py 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125
  1. from typing import List, Dict
  2. from applications.utils.mysql import ContentChunks, Contents, Dataset
  3. from applications.utils.async_utils import run_tasks_with_asyncio_task_group
  4. from applications.utils.milvus import async_delete_chunk
  5. class DeleteTask:
  6. def __init__(self, resource):
  7. self.mysql_client = resource.mysql_client
  8. self.milvus_client = resource.milvus_client
  9. self.es_client = resource.es_client
  10. self.content_manager = None
  11. self.chunk_manager = None
  12. self.dataset_manager = None
  13. async def search_id_from_es(self, filters: Dict) -> List:
  14. must_clauses = []
  15. for field, value in filters.items():
  16. must_clauses.append({"term": {field: value}})
  17. query = {
  18. "query": {"bool": {"must": must_clauses}},
  19. "_source": False,
  20. }
  21. try:
  22. resp = await self.es_client.async_search(query=query)
  23. return [int(hit["_id"]) for hit in resp["hits"]["hits"]]
  24. except Exception as e:
  25. print(f"search failed: {e}")
  26. return []
  27. async def delete_ids_from_es(self, ids: List):
  28. return await run_tasks_with_asyncio_task_group(
  29. task_list=ids,
  30. handler=self.es_client.async_delete,
  31. description="delete IDs From ElasticSearch",
  32. unit="chunk",
  33. max_concurrency=10
  34. )
  35. async def delete_by_query(self, filters: Dict):
  36. must_clauses = []
  37. for field, value in filters.items():
  38. must_clauses.append({"term": {field: value}})
  39. query = {
  40. "query": {
  41. "bool": {"must": must_clauses}
  42. }
  43. }
  44. await self.es_client.async_delete_by_query(query=query)
  45. async def delete_ids_from_milvus(self, ids: List):
  46. return await async_delete_chunk(self.milvus_client, ids)
  47. async def delete_by_filters(self, filters: Dict):
  48. # step2, 查出 es 中的 id,并且删除
  49. ids = await self.search_id_from_es(filters)
  50. if not ids:
  51. return
  52. # step3, delete from milvus
  53. await self.delete_ids_from_milvus(ids)
  54. # step4, delete from es by query
  55. await self.delete_by_query(filters)
  56. async def delete_chunk(self, params):
  57. doc_id = params["doc_id"]
  58. chunk_id = params["chunk_id"]
  59. try:
  60. self.chunk_manager = ContentChunks(self.mysql_client)
  61. await self.chunk_manager.update_doc_chunk_status(doc_id, chunk_id, 1, 0)
  62. await self.delete_by_filters({"doc_id": doc_id, "chunk_id": chunk_id})
  63. return {"doc_id": doc_id, "chunk_id": chunk_id, "status": "success"}
  64. except Exception as e:
  65. print(f"delete chunk failed: {e}")
  66. return {"doc_id": doc_id, "chunk_id": chunk_id, "status": "failed"}
  67. async def delete_doc(self, params):
  68. doc_id = params["doc_id"]
  69. try:
  70. self.chunk_manager = ContentChunks(self.mysql_client)
  71. self.content_manager = Contents(self.mysql_client)
  72. await self.chunk_manager.update_doc_status(doc_id, 1, 0)
  73. await self.content_manager.update_doc_status(doc_id, 1, 0)
  74. await self.delete_by_filters({"doc_id": doc_id})
  75. return {"doc_id": doc_id, "status": "success"}
  76. except Exception as e:
  77. print(f"delete chunk failed: {e}")
  78. return {"doc_id": doc_id, "status": "failed"}
  79. async def delete_dataset(self, params):
  80. dataset_id = params["dataset_id"]
  81. try:
  82. self.chunk_manager = ContentChunks(self.mysql_client)
  83. self.content_manager = Contents(self.mysql_client)
  84. self.dataset_manager = Dataset(self.mysql_client)
  85. await self.chunk_manager.update_dataset_status(dataset_id, 1, 0)
  86. await self.content_manager.update_dataset_status(dataset_id, 1, 0)
  87. await self.dataset_manager.update_dataset_status(dataset_id, 1, 0)
  88. await self.delete_by_filters({"dataset_id": dataset_id})
  89. return {"dataset_id": dataset_id, "status": "success"}
  90. except Exception as e:
  91. print(f"delete dataset failed: {e}")
  92. return {"dataset_id": dataset_id, "status": "failed"}
  93. async def deal(self, level, params):
  94. """
  95. :param level: 删除级别
  96. :param params: 删除 id 信息
  97. :return:
  98. """
  99. match level:
  100. case "dataset":
  101. return await self.delete_dataset(params)
  102. case "doc":
  103. return await self.delete_doc(params)
  104. case "chunk":
  105. return await self.delete_chunk(params)
  106. case _:
  107. return {"error": "error level"}