_mapper.py 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182
from typing import Dict, List, Optional, Set

from app.core.database import DatabaseManager

from ._const import DecodeCardConst
# Table of cards to decode: read by fetch_cards, updated by
# update_card_status and set_card_cover_id.
TABLE_SOURCE = "auto_reply_top_cards_daily"
# Table of decode tasks: written by insert_decode_task and the task
# status/result updaters, read by fetch_pending_tasks and
# fetch_existing_source_ids.
TABLE_TASK = "long_articles_decode_tasks"
  6. class CardDecodeTaskMapper(DecodeCardConst):
  7. """卡片解构 Mapper — 操作 auto_reply_top_cards_daily 与 long_articles_decode_tasks"""
  8. def __init__(self, pool: DatabaseManager):
  9. self.pool = pool
  10. # ——— auto_reply_top_cards_daily ———
  11. async def fetch_cards(self) -> List[Dict]:
  12. """获取待解构卡片:status=INIT"""
  13. query = f"""
  14. SELECT id, channel, share_cover, share_title, card_cover_id
  15. FROM {TABLE_SOURCE}
  16. WHERE status = %s
  17. LIMIT %s
  18. """
  19. return await self.pool.async_fetch(
  20. query=query, params=(self.TaskStatus.INIT, self.TASK_BATCH)
  21. )
  22. async def update_card_status(
  23. self, id_: int, ori_status: int, new_status: int
  24. ) -> int:
  25. """更新卡片解构状态(乐观锁)"""
  26. query = f"""
  27. UPDATE {TABLE_SOURCE}
  28. SET status = %s
  29. WHERE id = %s AND status = %s
  30. """
  31. return await self.pool.async_save(
  32. query=query, params=(new_status, id_, ori_status)
  33. )
  34. async def set_card_cover_id(self, id_: int, cover_id: str) -> int:
  35. """回填 card_cover_id"""
  36. query = f"""
  37. UPDATE {TABLE_SOURCE}
  38. SET card_cover_id = %s
  39. WHERE id = %s
  40. """
  41. return await self.pool.async_save(query=query, params=(cover_id, id_))
  42. # ——— long_articles_decode_tasks ———
  43. async def insert_decode_task(
  44. self,
  45. source_id: str,
  46. config_id: int,
  47. source: int,
  48. payload: str,
  49. remark: str = None,
  50. status: int = None,
  51. ) -> int:
  52. if status is not None:
  53. query = f"""
  54. INSERT IGNORE INTO {TABLE_TASK}
  55. (source_id, config_id, source, channel, payload, remark, status)
  56. VALUES (%s, %s, %s, %s, %s, %s, %s)
  57. """
  58. params = (
  59. source_id, config_id, source,
  60. self.TaskChannel.PARTNER_CARD, payload, remark, status,
  61. )
  62. else:
  63. query = f"""
  64. INSERT IGNORE INTO {TABLE_TASK}
  65. (source_id, config_id, source, channel, payload, remark)
  66. VALUES (%s, %s, %s, %s, %s, %s)
  67. """
  68. params = (
  69. source_id, config_id, source,
  70. self.TaskChannel.PARTNER_CARD, payload, remark,
  71. )
  72. return await self.pool.async_save(query=query, params=params)
  73. async def set_decode_result(
  74. self,
  75. source_id: str,
  76. config_id: int,
  77. result: str,
  78. remark: str = None,
  79. ) -> int:
  80. query = f"""
  81. UPDATE {TABLE_TASK}
  82. SET status = %s, result = %s, remark = %s
  83. WHERE source_id = %s AND status IN (%s, %s) AND config_id = %s
  84. """
  85. return await self.pool.async_save(
  86. query=query,
  87. params=(
  88. self.TaskStatus.SUCCESS,
  89. result,
  90. remark,
  91. source_id,
  92. self.TaskStatus.INIT,
  93. self.TaskStatus.PROCESSING,
  94. config_id,
  95. ),
  96. )
  97. async def fetch_pending_tasks(self) -> List[Dict]:
  98. query = f"""
  99. SELECT source_id, config_id
  100. FROM {TABLE_TASK}
  101. WHERE status IN (%s, %s) AND source IN (%s, %s)
  102. ORDER BY config_id
  103. LIMIT %s
  104. """
  105. return await self.pool.async_fetch(
  106. query=query,
  107. params=(
  108. self.TaskStatus.INIT,
  109. self.TaskStatus.PROCESSING,
  110. self.SourceType.PARTNER_CARD_TOULIU,
  111. self.SourceType.PARTNER_CARD_COOPERATE,
  112. self.TASK_BATCH,
  113. ),
  114. )
  115. async def update_task_status_by_source_id(
  116. self,
  117. source_id: str,
  118. config_id: int,
  119. new_status: int,
  120. remark: str = None,
  121. ) -> int:
  122. query = f"""
  123. UPDATE {TABLE_TASK}
  124. SET status = %s, remark = %s
  125. WHERE source_id = %s AND status IN (%s, %s) AND config_id = %s
  126. """
  127. return await self.pool.async_save(
  128. query=query,
  129. params=(
  130. new_status, remark, source_id,
  131. self.TaskStatus.INIT, self.TaskStatus.PROCESSING,
  132. config_id,
  133. ),
  134. )
  135. async def fetch_existing_source_ids(
  136. self, source_ids: List[str], config_id: int, source: int
  137. ) -> set:
  138. """批量查询已有任务记录的 source_id,用于去重跳过"""
  139. if not source_ids:
  140. return set()
  141. placeholders = ",".join(["%s"] * len(source_ids))
  142. query = f"""
  143. SELECT source_id FROM {TABLE_TASK}
  144. WHERE source_id IN ({placeholders})
  145. AND config_id = %s
  146. AND source = %s
  147. AND status IN (%s, %s, %s, %s)
  148. """
  149. return {
  150. r["source_id"]
  151. for r in await self.pool.async_fetch(
  152. query=query,
  153. params=(
  154. *source_ids,
  155. config_id,
  156. source,
  157. self.TaskStatus.INIT,
  158. self.TaskStatus.PROCESSING,
  159. self.TaskStatus.SUCCESS,
  160. self.TaskStatus.FAILED,
  161. ),
  162. )
  163. }
# Public API of this module: only the mapper class is exported.
__all__ = ["CardDecodeTaskMapper"]