# _mapper.py
from typing import Dict, List, Optional

from app.core.database import DatabaseManager

from ._const import DecodeMaterialConst
# Table the mapper reads materials from and whose `status` column it updates.
TABLE_SOURCE = "growth_daily_material"
# Table the mapper inserts decode tasks into and later updates with results.
TABLE_TASK = "long_articles_decode_tasks"
  6. class MaterialDecodeTaskMapper(DecodeMaterialConst):
  7. """素材解构 Mapper — 操作 growth_daily_material 与 long_articles_decode_tasks"""
  8. def __init__(self, pool: DatabaseManager):
  9. self.pool = pool
  10. # ——— growth_daily_material ———
  11. async def fetch_materials(self) -> List[Dict]:
  12. """获取待解构素材:status=INIT"""
  13. query = f"""
  14. SELECT id, material_id, material_title, material_cover
  15. FROM {TABLE_SOURCE}
  16. WHERE status = %s
  17. LIMIT %s
  18. """
  19. return await self.pool.async_fetch(
  20. query=query, params=(self.TaskStatus.INIT, self.TASK_BATCH)
  21. )
  22. async def update_material_status(
  23. self, id_: int, ori_status: int, new_status: int
  24. ) -> int:
  25. """更新素材解构状态(乐观锁)"""
  26. query = f"""
  27. UPDATE {TABLE_SOURCE}
  28. SET status = %s
  29. WHERE id = %s AND status = %s
  30. """
  31. return await self.pool.async_save(
  32. query=query, params=(new_status, id_, ori_status)
  33. )
  34. # ——— long_articles_decode_tasks ———
  35. async def insert_decode_task(
  36. self,
  37. source_id: str,
  38. payload: str,
  39. remark: str = None,
  40. status: int = None,
  41. ) -> int:
  42. if status is not None:
  43. query = f"""
  44. INSERT IGNORE INTO {TABLE_TASK}
  45. (source_id, config_id, source, channel, payload, remark, status)
  46. VALUES (%s, %s, %s, %s, %s, %s, %s)
  47. """
  48. params = (
  49. source_id,
  50. self.CONFIG_ID,
  51. self.SourceType.MATERIAL,
  52. self.TaskChannel.MATERIAL,
  53. payload,
  54. remark,
  55. status,
  56. )
  57. else:
  58. query = f"""
  59. INSERT IGNORE INTO {TABLE_TASK}
  60. (source_id, config_id, source, channel, payload, remark)
  61. VALUES (%s, %s, %s, %s, %s, %s)
  62. """
  63. params = (
  64. source_id,
  65. self.CONFIG_ID,
  66. self.SourceType.MATERIAL,
  67. self.TaskChannel.MATERIAL,
  68. payload,
  69. remark,
  70. )
  71. return await self.pool.async_save(query=query, params=params)
  72. async def set_decode_result(
  73. self,
  74. source_id: str,
  75. result: str,
  76. remark: str = None,
  77. ) -> int:
  78. query = f"""
  79. UPDATE {TABLE_TASK}
  80. SET status = %s, result = %s, remark = %s
  81. WHERE source_id = %s AND status IN (%s, %s) AND config_id = %s
  82. """
  83. return await self.pool.async_save(
  84. query=query,
  85. params=(
  86. self.TaskStatus.SUCCESS,
  87. result,
  88. remark,
  89. source_id,
  90. self.TaskStatus.INIT,
  91. self.TaskStatus.PROCESSING,
  92. self.CONFIG_ID,
  93. ),
  94. )
  95. async def fetch_pending_tasks(self) -> List[Dict]:
  96. query = f"""
  97. SELECT source_id
  98. FROM {TABLE_TASK}
  99. WHERE status IN (%s, %s) AND source = %s AND config_id = %s
  100. LIMIT %s
  101. """
  102. return await self.pool.async_fetch(
  103. query=query,
  104. params=(
  105. self.TaskStatus.INIT,
  106. self.TaskStatus.PROCESSING,
  107. self.SourceType.MATERIAL,
  108. self.CONFIG_ID,
  109. self.TASK_BATCH,
  110. ),
  111. )
  112. async def update_task_status_by_source_id(
  113. self,
  114. source_id: str,
  115. new_status: int,
  116. remark: str = None,
  117. ) -> int:
  118. query = f"""
  119. UPDATE {TABLE_TASK}
  120. SET status = %s, remark = %s
  121. WHERE source_id = %s AND status IN (%s, %s) AND config_id = %s
  122. """
  123. return await self.pool.async_save(
  124. query=query,
  125. params=(
  126. new_status,
  127. remark,
  128. source_id,
  129. self.TaskStatus.INIT,
  130. self.TaskStatus.PROCESSING,
  131. self.CONFIG_ID,
  132. ),
  133. )
  134. async def fetch_existing_source_ids(self, source_ids: List[str]) -> set:
  135. """批量查询已有任务记录的 source_id,用于去重跳过"""
  136. if not source_ids:
  137. return set()
  138. placeholders = ",".join(["%s"] * len(source_ids))
  139. query = f"""
  140. SELECT source_id FROM {TABLE_TASK}
  141. WHERE source_id IN ({placeholders})
  142. AND config_id = %s
  143. AND source = %s
  144. AND status IN (%s, %s, %s, %s)
  145. """
  146. return {
  147. r["source_id"]
  148. for r in await self.pool.async_fetch(
  149. query=query,
  150. params=(
  151. *source_ids,
  152. self.CONFIG_ID,
  153. self.SourceType.MATERIAL,
  154. self.TaskStatus.INIT,
  155. self.TaskStatus.PROCESSING,
  156. self.TaskStatus.SUCCESS,
  157. self.TaskStatus.FAILED,
  158. ),
  159. )
  160. }
# Public API of this module: only the mapper class is exported.
__all__ = ["MaterialDecodeTaskMapper"]