_mapper.py 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166
from typing import Dict, List, Optional, Set

from app.core.database import DatabaseManager

from ._const import DecodeMaterialConst
  4. TABLE_SOURCE = "growth_daily_material"
  5. TABLE_TASK = "long_articles_decode_tasks"
  6. class MaterialDecodeTaskMapper(DecodeMaterialConst):
  7. """素材解构 Mapper — 操作 growth_daily_material 与 long_articles_decode_tasks"""
  8. def __init__(self, pool: DatabaseManager):
  9. self.pool = pool
  10. # ——— growth_daily_material ———
  11. async def fetch_materials(self) -> List[Dict]:
  12. """获取待解构素材:status=INIT"""
  13. query = f"""
  14. SELECT id, material_id, material_title, material_cover
  15. FROM {TABLE_SOURCE}
  16. WHERE status = %s
  17. LIMIT %s
  18. """
  19. return await self.pool.async_fetch(
  20. query=query, params=(self.TaskStatus.INIT, self.TASK_BATCH)
  21. )
  22. async def update_material_status(
  23. self, id_: int, ori_status: int, new_status: int
  24. ) -> int:
  25. """更新素材解构状态(乐观锁)"""
  26. query = f"""
  27. UPDATE {TABLE_SOURCE}
  28. SET status = %s
  29. WHERE id = %s AND status = %s
  30. """
  31. return await self.pool.async_save(
  32. query=query, params=(new_status, id_, ori_status)
  33. )
  34. # ——— long_articles_decode_tasks ———
  35. async def insert_decode_task(
  36. self,
  37. source_id: str,
  38. payload: str,
  39. remark: str = None,
  40. status: int = None,
  41. ) -> int:
  42. if status is not None:
  43. query = f"""
  44. INSERT IGNORE INTO {TABLE_TASK}
  45. (source_id, config_id, source, channel, payload, remark, status)
  46. VALUES (%s, %s, %s, %s, %s, %s, %s)
  47. """
  48. params = (
  49. source_id, self.CONFIG_ID, self.SourceType.MATERIAL,
  50. self.TaskChannel.MATERIAL, payload, remark, status,
  51. )
  52. else:
  53. query = f"""
  54. INSERT IGNORE INTO {TABLE_TASK}
  55. (source_id, config_id, source, channel, payload, remark)
  56. VALUES (%s, %s, %s, %s, %s, %s)
  57. """
  58. params = (
  59. source_id, self.CONFIG_ID, self.SourceType.MATERIAL,
  60. self.TaskChannel.MATERIAL, payload, remark,
  61. )
  62. return await self.pool.async_save(query=query, params=params)
  63. async def set_decode_result(
  64. self,
  65. source_id: str,
  66. result: str,
  67. remark: str = None,
  68. ) -> int:
  69. query = f"""
  70. UPDATE {TABLE_TASK}
  71. SET status = %s, result = %s, remark = %s
  72. WHERE source_id = %s AND status IN (%s, %s) AND config_id = %s
  73. """
  74. return await self.pool.async_save(
  75. query=query,
  76. params=(
  77. self.TaskStatus.SUCCESS,
  78. result,
  79. remark,
  80. source_id,
  81. self.TaskStatus.INIT,
  82. self.TaskStatus.PROCESSING,
  83. self.CONFIG_ID,
  84. ),
  85. )
  86. async def fetch_pending_tasks(self) -> List[Dict]:
  87. query = f"""
  88. SELECT source_id
  89. FROM {TABLE_TASK}
  90. WHERE status IN (%s, %s) AND source = %s AND config_id = %s
  91. LIMIT %s
  92. """
  93. return await self.pool.async_fetch(
  94. query=query,
  95. params=(
  96. self.TaskStatus.INIT,
  97. self.TaskStatus.PROCESSING,
  98. self.SourceType.MATERIAL,
  99. self.CONFIG_ID,
  100. self.TASK_BATCH,
  101. ),
  102. )
  103. async def update_task_status_by_source_id(
  104. self,
  105. source_id: str,
  106. new_status: int,
  107. remark: str = None,
  108. ) -> int:
  109. query = f"""
  110. UPDATE {TABLE_TASK}
  111. SET status = %s, remark = %s
  112. WHERE source_id = %s AND status IN (%s, %s) AND config_id = %s
  113. """
  114. return await self.pool.async_save(
  115. query=query,
  116. params=(
  117. new_status, remark, source_id,
  118. self.TaskStatus.INIT, self.TaskStatus.PROCESSING,
  119. self.CONFIG_ID,
  120. ),
  121. )
  122. async def fetch_existing_source_ids(self, source_ids: List[str]) -> set:
  123. """批量查询已有任务记录的 source_id,用于去重跳过"""
  124. if not source_ids:
  125. return set()
  126. placeholders = ",".join(["%s"] * len(source_ids))
  127. query = f"""
  128. SELECT source_id FROM {TABLE_TASK}
  129. WHERE source_id IN ({placeholders})
  130. AND config_id = %s
  131. AND source = %s
  132. AND status IN (%s, %s, %s, %s)
  133. """
  134. return {
  135. r["source_id"]
  136. for r in await self.pool.async_fetch(
  137. query=query,
  138. params=(
  139. *source_ids,
  140. self.CONFIG_ID,
  141. self.SourceType.MATERIAL,
  142. self.TaskStatus.INIT,
  143. self.TaskStatus.PROCESSING,
  144. self.TaskStatus.SUCCESS,
  145. self.TaskStatus.FAILED,
  146. ),
  147. )
  148. }
  149. __all__ = ["MaterialDecodeTaskMapper"]