_mapper.py 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194
  1. from typing import Dict, List
  2. from app.core.database import DatabaseManager
  3. from ._const import DecodeCardConst
  4. TABLE_SOURCE = "auto_reply_top_cards_daily"
  5. TABLE_TASK = "long_articles_decode_tasks"
  6. class CardDecodeTaskMapper(DecodeCardConst):
  7. """卡片解构 Mapper — 操作 auto_reply_top_cards_daily 与 long_articles_decode_tasks"""
  8. def __init__(self, pool: DatabaseManager):
  9. self.pool = pool
  10. # ——— auto_reply_top_cards_daily ———
  11. async def fetch_cards(self) -> List[Dict]:
  12. """获取待解构卡片:status=INIT"""
  13. query = f"""
  14. SELECT id, channel, share_cover, share_title, card_cover_id
  15. FROM {TABLE_SOURCE}
  16. WHERE status = %s
  17. LIMIT %s
  18. """
  19. return await self.pool.async_fetch(
  20. query=query, params=(self.TaskStatus.INIT, self.TASK_BATCH)
  21. )
  22. async def update_card_status(
  23. self, id_: int, ori_status: int, new_status: int
  24. ) -> int:
  25. """更新卡片解构状态(乐观锁)"""
  26. query = f"""
  27. UPDATE {TABLE_SOURCE}
  28. SET status = %s
  29. WHERE id = %s AND status = %s
  30. """
  31. return await self.pool.async_save(
  32. query=query, params=(new_status, id_, ori_status)
  33. )
  34. async def set_card_cover_id(self, id_: int, cover_id: str) -> int:
  35. """回填 card_cover_id"""
  36. query = f"""
  37. UPDATE {TABLE_SOURCE}
  38. SET card_cover_id = %s
  39. WHERE id = %s
  40. """
  41. return await self.pool.async_save(query=query, params=(cover_id, id_))
  42. # ——— long_articles_decode_tasks ———
  43. async def insert_decode_task(
  44. self,
  45. source_id: str,
  46. config_id: int,
  47. source: int,
  48. payload: str,
  49. remark: str = None,
  50. status: int = None,
  51. ) -> int:
  52. if status is not None:
  53. query = f"""
  54. INSERT IGNORE INTO {TABLE_TASK}
  55. (source_id, config_id, source, channel, payload, remark, status)
  56. VALUES (%s, %s, %s, %s, %s, %s, %s)
  57. """
  58. params = (
  59. source_id,
  60. config_id,
  61. source,
  62. self.TaskChannel.PARTNER_CARD,
  63. payload,
  64. remark,
  65. status,
  66. )
  67. else:
  68. query = f"""
  69. INSERT IGNORE INTO {TABLE_TASK}
  70. (source_id, config_id, source, channel, payload, remark)
  71. VALUES (%s, %s, %s, %s, %s, %s)
  72. """
  73. params = (
  74. source_id,
  75. config_id,
  76. source,
  77. self.TaskChannel.PARTNER_CARD,
  78. payload,
  79. remark,
  80. )
  81. return await self.pool.async_save(query=query, params=params)
  82. async def set_decode_result(
  83. self,
  84. source_id: str,
  85. config_id: int,
  86. result: str,
  87. remark: str = None,
  88. ) -> int:
  89. query = f"""
  90. UPDATE {TABLE_TASK}
  91. SET status = %s, result = %s, remark = %s
  92. WHERE source_id = %s AND status IN (%s, %s) AND config_id = %s
  93. """
  94. return await self.pool.async_save(
  95. query=query,
  96. params=(
  97. self.TaskStatus.SUCCESS,
  98. result,
  99. remark,
  100. source_id,
  101. self.TaskStatus.INIT,
  102. self.TaskStatus.PROCESSING,
  103. config_id,
  104. ),
  105. )
  106. async def fetch_pending_tasks(self) -> List[Dict]:
  107. query = f"""
  108. SELECT source_id, config_id
  109. FROM {TABLE_TASK}
  110. WHERE status IN (%s, %s) AND source IN (%s, %s)
  111. ORDER BY config_id
  112. LIMIT %s
  113. """
  114. return await self.pool.async_fetch(
  115. query=query,
  116. params=(
  117. self.TaskStatus.INIT,
  118. self.TaskStatus.PROCESSING,
  119. self.SourceType.PARTNER_CARD_TOULIU,
  120. self.SourceType.PARTNER_CARD_COOPERATE,
  121. self.TASK_BATCH,
  122. ),
  123. )
  124. async def update_task_status_by_source_id(
  125. self,
  126. source_id: str,
  127. config_id: int,
  128. new_status: int,
  129. remark: str = None,
  130. ) -> int:
  131. query = f"""
  132. UPDATE {TABLE_TASK}
  133. SET status = %s, remark = %s
  134. WHERE source_id = %s AND status IN (%s, %s) AND config_id = %s
  135. """
  136. return await self.pool.async_save(
  137. query=query,
  138. params=(
  139. new_status,
  140. remark,
  141. source_id,
  142. self.TaskStatus.INIT,
  143. self.TaskStatus.PROCESSING,
  144. config_id,
  145. ),
  146. )
  147. async def fetch_existing_source_ids(
  148. self, source_ids: List[str], config_id: int, source: int
  149. ) -> set:
  150. """批量查询已有任务记录的 source_id,用于去重跳过"""
  151. if not source_ids:
  152. return set()
  153. placeholders = ",".join(["%s"] * len(source_ids))
  154. query = f"""
  155. SELECT source_id FROM {TABLE_TASK}
  156. WHERE source_id IN ({placeholders})
  157. AND config_id = %s
  158. AND source = %s
  159. AND status IN (%s, %s, %s, %s)
  160. """
  161. return {
  162. r["source_id"]
  163. for r in await self.pool.async_fetch(
  164. query=query,
  165. params=(
  166. *source_ids,
  167. config_id,
  168. source,
  169. self.TaskStatus.INIT,
  170. self.TaskStatus.PROCESSING,
  171. self.TaskStatus.SUCCESS,
  172. self.TaskStatus.FAILED,
  173. ),
  174. )
  175. }
  176. __all__ = ["CardDecodeTaskMapper"]