# mapper.py 3.0 KB
import json

from applications.config import Chunk
  3. class TaskConst:
  4. INIT_STATUS = 0
  5. PROCESSING_STATUS = 1
  6. FINISHED_STATUS = 2
  7. FAILED_STATUS = 3
  8. class BaseMySQLClient:
  9. def __init__(self, pool):
  10. self.pool = pool
  11. class Contents(BaseMySQLClient):
  12. async def insert_content(self, doc_id, text):
  13. query = """
  14. INSERT IGNORE INTO contents
  15. (doc_id, text)
  16. VALUES (%s, %s);
  17. """
  18. return await self.pool.async_save(query=query, params=(doc_id, text))
  19. async def update_content_status(self, doc_id, ori_status, new_status):
  20. query = """
  21. UPDATE contents
  22. SET status = %s
  23. WHERE doc_id = %s AND status = %s;
  24. """
  25. return await self.pool.async_save(
  26. query=query, params=(new_status, doc_id, ori_status)
  27. )
  28. class ContentChunks(BaseMySQLClient):
  29. async def insert_chunk(self, chunk: Chunk) -> int:
  30. query = """
  31. INSERT IGNORE INTO content_chunks
  32. (chunk_id, doc_id, text, tokens, topic_purity)
  33. VALUES (%s, %s, %s, %s, %s);
  34. """
  35. return await self.pool.async_save(
  36. query=query,
  37. params=(
  38. chunk.chunk_id,
  39. chunk.doc_id,
  40. chunk.text,
  41. chunk.tokens,
  42. chunk.topic_purity,
  43. ),
  44. )
  45. async def update_chunk_status(self, doc_id, chunk_id, ori_status, new_status):
  46. query = """
  47. UPDATE content_chunks
  48. SET chunk_status = %s
  49. WHERE doc_id = %s AND chunk_id = %s AND chunk_status = %s;
  50. """
  51. return await self.pool.async_save(
  52. query=query, params=(new_status, doc_id, chunk_id, ori_status)
  53. )
  54. async def update_embedding_status(self, doc_id, chunk_id, ori_status, new_status):
  55. query = """
  56. UPDATE content_chunks
  57. SET embedding_status = %s
  58. WHERE doc_id = %s AND chunk_id = %s AND embedding_status = %s;
  59. """
  60. return await self.pool.async_save(
  61. query=query, params=(new_status, doc_id, chunk_id, ori_status)
  62. )
  63. async def set_chunk_result(self, chunk: Chunk, ori_status, new_status):
  64. query = """
  65. UPDATE content_chunks
  66. SET summary = %s, topic = %s, domain = %s, task_type = %s, concepts = %s, keywords = %s, questions = %s, chunk_status = %s
  67. WHERE doc_id = %s AND chunk_id = %s AND chunk_status = %s;
  68. """
  69. return await self.pool.async_save(
  70. query=query,
  71. params=(
  72. chunk.summary,
  73. chunk.topic,
  74. chunk.domain,
  75. chunk.task_type,
  76. json.dumps(chunk.concepts),
  77. json.dumps(chunk.keywords),
  78. json.dumps(chunk.questions),
  79. new_status,
  80. chunk.doc_id,
  81. chunk.chunk_id,
  82. ori_status,
  83. ),
  84. )