_mapper.py 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180
from typing import Dict, List, Optional, Set

from app.core.database import DatabaseManager

from ._const import DecodeCardConst
  4. TABLE_SOURCE = "auto_reply_top_cards_daily"
  5. TABLE_TASK = "long_articles_decode_tasks"
  6. class CardDecodeTaskMapper(DecodeCardConst):
  7. """卡片解构 Mapper — 操作 auto_reply_top_cards_daily 与 long_articles_decode_tasks"""
  8. def __init__(self, pool: DatabaseManager):
  9. self.pool = pool
  10. # ——— auto_reply_top_cards_daily ———
  11. async def fetch_cards(self) -> List[Dict]:
  12. """获取待解构卡片:status=INIT"""
  13. query = f"""
  14. SELECT id, channel, share_cover, share_title, card_cover_id
  15. FROM {TABLE_SOURCE}
  16. WHERE status = %s
  17. LIMIT %s
  18. """
  19. return await self.pool.async_fetch(
  20. query=query, params=(self.TaskStatus.INIT, self.TASK_BATCH)
  21. )
  22. async def update_card_status(
  23. self, id_: int, ori_status: int, new_status: int
  24. ) -> int:
  25. """更新卡片解构状态(乐观锁)"""
  26. query = f"""
  27. UPDATE {TABLE_SOURCE}
  28. SET status = %s
  29. WHERE id = %s AND status = %s
  30. """
  31. return await self.pool.async_save(
  32. query=query, params=(new_status, id_, ori_status)
  33. )
  34. async def set_card_cover_id(self, id_: int, cover_id: str) -> int:
  35. """回填 card_cover_id"""
  36. query = f"""
  37. UPDATE {TABLE_SOURCE}
  38. SET card_cover_id = %s
  39. WHERE id = %s
  40. """
  41. return await self.pool.async_save(query=query, params=(cover_id, id_))
  42. # ——— long_articles_decode_tasks ———
  43. async def insert_decode_task(
  44. self,
  45. source_id: str,
  46. config_id: int,
  47. payload: str,
  48. remark: str = None,
  49. status: int = None,
  50. ) -> int:
  51. if status is not None:
  52. query = f"""
  53. INSERT IGNORE INTO {TABLE_TASK}
  54. (source_id, config_id, source, channel, payload, remark, status)
  55. VALUES (%s, %s, %s, %s, %s, %s, %s)
  56. """
  57. params = (
  58. source_id, config_id, self.SourceType.PARTNER_CARD,
  59. self.TaskChannel.PARTNER_CARD, payload, remark, status,
  60. )
  61. else:
  62. query = f"""
  63. INSERT IGNORE INTO {TABLE_TASK}
  64. (source_id, config_id, source, channel, payload, remark)
  65. VALUES (%s, %s, %s, %s, %s, %s)
  66. """
  67. params = (
  68. source_id, config_id, self.SourceType.PARTNER_CARD,
  69. self.TaskChannel.PARTNER_CARD, payload, remark,
  70. )
  71. return await self.pool.async_save(query=query, params=params)
  72. async def set_decode_result(
  73. self,
  74. source_id: str,
  75. config_id: int,
  76. result: str,
  77. remark: str = None,
  78. ) -> int:
  79. query = f"""
  80. UPDATE {TABLE_TASK}
  81. SET status = %s, result = %s, remark = %s
  82. WHERE source_id = %s AND status IN (%s, %s) AND config_id = %s
  83. """
  84. return await self.pool.async_save(
  85. query=query,
  86. params=(
  87. self.TaskStatus.SUCCESS,
  88. result,
  89. remark,
  90. source_id,
  91. self.TaskStatus.INIT,
  92. self.TaskStatus.PROCESSING,
  93. config_id,
  94. ),
  95. )
  96. async def fetch_pending_tasks(self) -> List[Dict]:
  97. query = f"""
  98. SELECT source_id, config_id
  99. FROM {TABLE_TASK}
  100. WHERE status IN (%s, %s) AND source = %s
  101. ORDER BY config_id
  102. LIMIT %s
  103. """
  104. return await self.pool.async_fetch(
  105. query=query,
  106. params=(
  107. self.TaskStatus.INIT,
  108. self.TaskStatus.PROCESSING,
  109. self.SourceType.PARTNER_CARD,
  110. self.TASK_BATCH,
  111. ),
  112. )
  113. async def update_task_status_by_source_id(
  114. self,
  115. source_id: str,
  116. config_id: int,
  117. new_status: int,
  118. remark: str = None,
  119. ) -> int:
  120. query = f"""
  121. UPDATE {TABLE_TASK}
  122. SET status = %s, remark = %s
  123. WHERE source_id = %s AND status IN (%s, %s) AND config_id = %s
  124. """
  125. return await self.pool.async_save(
  126. query=query,
  127. params=(
  128. new_status, remark, source_id,
  129. self.TaskStatus.INIT, self.TaskStatus.PROCESSING,
  130. config_id,
  131. ),
  132. )
  133. async def fetch_existing_source_ids(
  134. self, source_ids: List[str], config_id: int
  135. ) -> set:
  136. """批量查询已有任务记录的 source_id,用于去重跳过"""
  137. if not source_ids:
  138. return set()
  139. placeholders = ",".join(["%s"] * len(source_ids))
  140. query = f"""
  141. SELECT source_id FROM {TABLE_TASK}
  142. WHERE source_id IN ({placeholders})
  143. AND config_id = %s
  144. AND source = %s
  145. AND status IN (%s, %s, %s, %s)
  146. """
  147. return {
  148. r["source_id"]
  149. for r in await self.pool.async_fetch(
  150. query=query,
  151. params=(
  152. *source_ids,
  153. config_id,
  154. self.SourceType.PARTNER_CARD,
  155. self.TaskStatus.INIT,
  156. self.TaskStatus.PROCESSING,
  157. self.TaskStatus.SUCCESS,
  158. self.TaskStatus.FAILED,
  159. ),
  160. )
  161. }
  162. __all__ = ["CardDecodeTaskMapper"]