_mapper.py 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197
from typing import Dict, List, Optional, Set

from app.core.database import DatabaseManager
from ._const import DecodeMaterialConst
  4. TABLE_SOURCE = "growth_daily_material"
  5. TABLE_TASK = "long_articles_decode_tasks"
  6. class MaterialDecodeTaskMapper(DecodeMaterialConst):
  7. """素材解构 Mapper — 操作 growth_daily_material 与 long_articles_decode_tasks"""
  8. def __init__(self, pool: DatabaseManager):
  9. self.pool = pool
  10. # ——— growth_daily_material ———
  11. async def count_pending_tasks(self) -> int:
  12. """统计队列中正在进行(INIT + PROCESSING)的素材任务数,用于控制消费端压力"""
  13. query = f"""
  14. SELECT COUNT(*) AS cnt
  15. FROM {TABLE_TASK}
  16. WHERE status IN (%s, %s) AND source = %s AND config_id = %s
  17. """
  18. rows = await self.pool.async_fetch(
  19. query=query,
  20. params=(
  21. self.TaskStatus.INIT,
  22. self.TaskStatus.PROCESSING,
  23. self.SourceType.MATERIAL,
  24. self.CONFIG_ID,
  25. ),
  26. )
  27. return rows[0]["cnt"] if rows else 0
  28. async def fetch_materials(self, limit: int = None) -> List[Dict]:
  29. """获取待解构素材:status=INIT"""
  30. query = f"""
  31. SELECT id, material_id, material_title, material_cover
  32. FROM {TABLE_SOURCE}
  33. WHERE status = %s
  34. LIMIT %s
  35. """
  36. return await self.pool.async_fetch(
  37. query=query,
  38. params=(self.TaskStatus.INIT, limit if limit is not None else self.TASK_BATCH),
  39. )
  40. async def update_material_status(
  41. self, id_: int, ori_status: int, new_status: int
  42. ) -> int:
  43. """更新素材解构状态(乐观锁)"""
  44. query = f"""
  45. UPDATE {TABLE_SOURCE}
  46. SET status = %s
  47. WHERE id = %s AND status = %s
  48. """
  49. return await self.pool.async_save(
  50. query=query, params=(new_status, id_, ori_status)
  51. )
  52. # ——— long_articles_decode_tasks ———
  53. async def insert_decode_task(
  54. self,
  55. source_id: str,
  56. payload: str,
  57. remark: str = None,
  58. status: int = None,
  59. ) -> int:
  60. if status is not None:
  61. query = f"""
  62. INSERT IGNORE INTO {TABLE_TASK}
  63. (source_id, config_id, source, channel, payload, remark, status)
  64. VALUES (%s, %s, %s, %s, %s, %s, %s)
  65. """
  66. params = (
  67. source_id,
  68. self.CONFIG_ID,
  69. self.SourceType.MATERIAL,
  70. self.TaskChannel.MATERIAL,
  71. payload,
  72. remark,
  73. status,
  74. )
  75. else:
  76. query = f"""
  77. INSERT IGNORE INTO {TABLE_TASK}
  78. (source_id, config_id, source, channel, payload, remark)
  79. VALUES (%s, %s, %s, %s, %s, %s)
  80. """
  81. params = (
  82. source_id,
  83. self.CONFIG_ID,
  84. self.SourceType.MATERIAL,
  85. self.TaskChannel.MATERIAL,
  86. payload,
  87. remark,
  88. )
  89. return await self.pool.async_save(query=query, params=params)
  90. async def set_decode_result(
  91. self,
  92. source_id: str,
  93. result: str,
  94. remark: str = None,
  95. ) -> int:
  96. query = f"""
  97. UPDATE {TABLE_TASK}
  98. SET status = %s, result = %s, remark = %s
  99. WHERE source_id = %s AND status IN (%s, %s) AND config_id = %s
  100. """
  101. return await self.pool.async_save(
  102. query=query,
  103. params=(
  104. self.TaskStatus.SUCCESS,
  105. result,
  106. remark,
  107. source_id,
  108. self.TaskStatus.INIT,
  109. self.TaskStatus.PROCESSING,
  110. self.CONFIG_ID,
  111. ),
  112. )
  113. async def fetch_pending_tasks(self) -> List[Dict]:
  114. query = f"""
  115. SELECT source_id
  116. FROM {TABLE_TASK}
  117. WHERE status IN (%s, %s) AND source = %s AND config_id = %s
  118. LIMIT %s
  119. """
  120. return await self.pool.async_fetch(
  121. query=query,
  122. params=(
  123. self.TaskStatus.INIT,
  124. self.TaskStatus.PROCESSING,
  125. self.SourceType.MATERIAL,
  126. self.CONFIG_ID,
  127. self.TASK_BATCH,
  128. ),
  129. )
  130. async def update_task_status_by_source_id(
  131. self,
  132. source_id: str,
  133. new_status: int,
  134. remark: str = None,
  135. ) -> int:
  136. query = f"""
  137. UPDATE {TABLE_TASK}
  138. SET status = %s, remark = %s
  139. WHERE source_id = %s AND status IN (%s, %s) AND config_id = %s
  140. """
  141. return await self.pool.async_save(
  142. query=query,
  143. params=(
  144. new_status,
  145. remark,
  146. source_id,
  147. self.TaskStatus.INIT,
  148. self.TaskStatus.PROCESSING,
  149. self.CONFIG_ID,
  150. ),
  151. )
  152. async def fetch_existing_source_ids(self, source_ids: List[str]) -> set:
  153. """批量查询已有任务记录的 source_id,用于去重跳过"""
  154. if not source_ids:
  155. return set()
  156. placeholders = ",".join(["%s"] * len(source_ids))
  157. query = f"""
  158. SELECT source_id FROM {TABLE_TASK}
  159. WHERE source_id IN ({placeholders})
  160. AND config_id = %s
  161. AND source = %s
  162. AND status IN (%s, %s, %s, %s)
  163. """
  164. return {
  165. r["source_id"]
  166. for r in await self.pool.async_fetch(
  167. query=query,
  168. params=(
  169. *source_ids,
  170. self.CONFIG_ID,
  171. self.SourceType.MATERIAL,
  172. self.TaskStatus.INIT,
  173. self.TaskStatus.PROCESSING,
  174. self.TaskStatus.SUCCESS,
  175. self.TaskStatus.FAILED,
  176. ),
  177. )
  178. }
  179. __all__ = ["MaterialDecodeTaskMapper"]