# _mapper.py - SQL mappers for the article decode task tables.
from typing import Dict, List, Optional

from app.core.database import DatabaseManager

from ._const import DecodeArticleConst
  4. TABLE = "long_articles_decode_tasks"
  5. class ArticlesDecodeTaskMapper(DecodeArticleConst):
  6. def __init__(self, pool: DatabaseManager):
  7. self.pool = pool
  8. async def insert_decode_task(
  9. self,
  10. source_id: str,
  11. source: int,
  12. payload: str,
  13. remark: str = None,
  14. status: int = None,
  15. channel: int = None,
  16. ) -> int:
  17. ch = channel if channel is not None else self.TaskChannel.ARTICLE
  18. if status is not None:
  19. query = f"""
  20. INSERT IGNORE INTO {TABLE}
  21. (source_id, config_id, source, channel, payload, remark, status)
  22. VALUES (%s, %s, %s, %s, %s, %s, %s)
  23. """
  24. params = (
  25. source_id,
  26. self.CONFIG_ID,
  27. source,
  28. ch,
  29. payload,
  30. remark,
  31. status,
  32. )
  33. else:
  34. query = f"""
  35. INSERT IGNORE INTO {TABLE}
  36. (source_id, config_id, source, channel, payload, remark)
  37. VALUES (%s, %s, %s, %s, %s, %s)
  38. """
  39. params = (
  40. source_id,
  41. self.CONFIG_ID,
  42. source,
  43. ch,
  44. payload,
  45. remark,
  46. )
  47. return await self.pool.async_save(query=query, params=params)
  48. async def update_task_status_by_source_id(
  49. self,
  50. source_id: str,
  51. ori_status: int,
  52. new_status: int,
  53. remark: str = None,
  54. ) -> int:
  55. query = f"""
  56. UPDATE {TABLE}
  57. SET status = %s, remark = %s
  58. WHERE source_id = %s AND status = %s AND config_id = %s
  59. """
  60. return await self.pool.async_save(
  61. query=query,
  62. params=(new_status, remark, source_id, ori_status, self.CONFIG_ID),
  63. )
  64. async def set_decode_result(
  65. self,
  66. source_id: str,
  67. result: str,
  68. remark: str = None,
  69. ) -> int:
  70. query = f"""
  71. UPDATE {TABLE}
  72. SET status = %s, result = %s, remark = %s
  73. WHERE source_id = %s AND status IN (%s, %s) AND config_id = %s
  74. """
  75. return await self.pool.async_save(
  76. query=query,
  77. params=(
  78. self.TaskStatus.SUCCESS,
  79. result,
  80. remark,
  81. source_id,
  82. self.TaskStatus.INIT,
  83. self.TaskStatus.PROCESSING,
  84. self.CONFIG_ID,
  85. ),
  86. )
  87. async def fetch_pending_tasks(self, source: int = None) -> List[Dict]:
  88. if source is not None:
  89. query = f"""
  90. SELECT source_id
  91. FROM {TABLE}
  92. WHERE status IN (%s, %s) AND source = %s AND config_id = %s
  93. LIMIT %s
  94. """
  95. params = (
  96. self.TaskStatus.INIT,
  97. self.TaskStatus.PROCESSING,
  98. source,
  99. self.CONFIG_ID,
  100. self.TASK_BATCH,
  101. )
  102. else:
  103. query = f"""
  104. SELECT source_id
  105. FROM {TABLE}
  106. WHERE status IN (%s, %s) AND config_id = %s
  107. LIMIT %s
  108. """
  109. params = (
  110. self.TaskStatus.INIT,
  111. self.TaskStatus.PROCESSING,
  112. self.CONFIG_ID,
  113. self.TASK_BATCH,
  114. )
  115. return await self.pool.async_fetch(query=query, params=params)
  116. async def fetch_existing_source_ids(self, source_ids: List[str]) -> set:
  117. """批量查询哪些 source_id 已有进行中或成功的解构任务,用于去重跳过"""
  118. if not source_ids:
  119. return set()
  120. placeholders = ",".join(["%s"] * len(source_ids))
  121. query = f"""
  122. SELECT source_id FROM {TABLE}
  123. WHERE source_id IN ({placeholders})
  124. AND config_id = %s
  125. AND status IN (%s, %s, %s)
  126. """
  127. rows = await self.pool.async_fetch(
  128. query=query,
  129. params=(
  130. *source_ids,
  131. self.CONFIG_ID,
  132. self.TaskStatus.INIT,
  133. self.TaskStatus.PROCESSING,
  134. self.TaskStatus.SUCCESS,
  135. ),
  136. )
  137. return {r["source_id"] for r in rows}
  138. async def fetch_extract_tasks(self) -> List[Dict]:
  139. query = f"""
  140. SELECT id, result FROM {TABLE}
  141. WHERE extract_status = %s AND status = %s AND config_id = %s
  142. """
  143. return await self.pool.async_fetch(
  144. query=query,
  145. params=(self.ExtractStatus.INIT, self.TaskStatus.SUCCESS, self.CONFIG_ID),
  146. )
  147. async def update_extract_status(
  148. self, task_id: int, ori_status: int, new_status: int
  149. ) -> int:
  150. query = f"""
  151. UPDATE {TABLE}
  152. SET extract_status = %s
  153. WHERE extract_status = %s AND id = %s
  154. """
  155. return await self.pool.async_save(
  156. query=query, params=(new_status, ori_status, task_id)
  157. )
  158. async def record_extract_detail(self, decode_task_id: int, detail: Dict) -> int:
  159. query = """
  160. INSERT INTO long_articles_decode_task_detail
  161. (decode_task_id, inspiration, purpose, key_point, topic)
  162. VALUES (%s, %s, %s, %s, %s)
  163. """
  164. return await self.pool.async_save(
  165. query=query,
  166. params=(
  167. decode_task_id,
  168. detail.get("inspiration", ""),
  169. detail.get("purpose", ""),
  170. detail.get("key_point", ""),
  171. detail.get("topic", ""),
  172. ),
  173. )
  174. class AdPlatformArticlesDecodeTaskMapper(ArticlesDecodeTaskMapper):
  175. def __init__(self, pool: DatabaseManager):
  176. super().__init__(pool)
  177. async def update_article_decode_status(
  178. self, id_: int, ori_status: int, new_status: int
  179. ) -> int:
  180. query = """
  181. UPDATE ad_platform_accounts_daily_detail
  182. SET decode_status = %s
  183. WHERE id = %s AND decode_status = %s
  184. """
  185. return await self.pool.async_save(
  186. query=query, params=(new_status, id_, ori_status)
  187. )
  188. async def fetch_decode_articles(self) -> List[Dict]:
  189. query = """
  190. SELECT id, account_name, gh_id, article_title, article_cover,
  191. article_text, article_images, wx_sn
  192. FROM ad_platform_accounts_daily_detail
  193. WHERE fetch_status = %s AND decode_status = %s
  194. LIMIT %s
  195. """
  196. return await self.pool.async_fetch(
  197. query=query,
  198. params=(self.TaskStatus.SUCCESS, self.TaskStatus.INIT, self.TASK_BATCH),
  199. )
  200. class InnerArticlesDecodeTaskMapper(ArticlesDecodeTaskMapper):
  201. TABLE_INNER = "long_articles_decode_articles"
  202. def __init__(self, pool: DatabaseManager):
  203. super().__init__(pool)
  204. async def fetch_inner_articles(self) -> List[Dict]:
  205. query = f"""
  206. SELECT id, title, source_id, coverimgurl, article_text, summary, card_title
  207. FROM {self.TABLE_INNER}
  208. WHERE status = %s
  209. LIMIT %s
  210. """
  211. return await self.pool.async_fetch(
  212. query=query, params=(self.TaskStatus.INIT, self.TASK_BATCH)
  213. )
  214. async def update_inner_article_status(
  215. self, id_: int, ori_status: int, new_status: int
  216. ) -> int:
  217. query = f"""
  218. UPDATE {self.TABLE_INNER}
  219. SET status = %s
  220. WHERE id = %s AND status = %s
  221. """
  222. return await self.pool.async_save(
  223. query=query, params=(new_status, id_, ori_status)
  224. )
  225. async def fetch_inner_articles_produce_detail(self, source_id) -> List[Dict]:
  226. query = """
  227. SELECT produce_module_type, output
  228. FROM produce_plan_module_output
  229. WHERE plan_exe_id = %s
  230. AND produce_module_type IN (1, 2, 4)
  231. """
  232. return await self.pool.async_fetch(
  233. query=query, db_name="aigc", params=(source_id,)
  234. )
# Public API of this module: the names exported by a star-import.
__all__ = [
    "ArticlesDecodeTaskMapper",
    "AdPlatformArticlesDecodeTaskMapper",
    "InnerArticlesDecodeTaskMapper",
]