_mapper.py 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260
from typing import Dict, List, Optional

from app.core.database import DatabaseManager

from ._const import DecodeArticleConst
  4. TABLE = "long_articles_decode_tasks"
  5. class ArticlesDecodeTaskMapper(DecodeArticleConst):
  6. def __init__(self, pool: DatabaseManager):
  7. self.pool = pool
  8. async def insert_decode_task(
  9. self,
  10. source_id: str,
  11. source: int,
  12. payload: str,
  13. remark: str = None,
  14. status: int = None,
  15. ) -> int:
  16. if status is not None:
  17. query = f"""
  18. INSERT IGNORE INTO {TABLE}
  19. (source_id, config_id, source, payload, remark, status)
  20. VALUES (%s, %s, %s, %s, %s, %s)
  21. """
  22. params = (
  23. source_id,
  24. self.CONFIG_ID,
  25. source,
  26. payload,
  27. remark,
  28. status,
  29. )
  30. else:
  31. query = f"""
  32. INSERT IGNORE INTO {TABLE}
  33. (source_id, config_id, source, payload, remark)
  34. VALUES (%s, %s, %s, %s, %s)
  35. """
  36. params = (
  37. source_id,
  38. self.CONFIG_ID,
  39. source,
  40. payload,
  41. remark,
  42. )
  43. return await self.pool.async_save(query=query, params=params)
  44. async def update_task_status_by_source_id(
  45. self,
  46. source_id: str,
  47. ori_status: int,
  48. new_status: int,
  49. remark: str = None,
  50. ) -> int:
  51. query = f"""
  52. UPDATE {TABLE}
  53. SET status = %s, remark = %s
  54. WHERE source_id = %s AND status = %s AND config_id = %s
  55. """
  56. return await self.pool.async_save(
  57. query=query,
  58. params=(new_status, remark, source_id, ori_status, self.CONFIG_ID),
  59. )
  60. async def set_decode_result(
  61. self,
  62. source_id: str,
  63. result: str,
  64. remark: str = None,
  65. ) -> int:
  66. query = f"""
  67. UPDATE {TABLE}
  68. SET status = %s, result = %s, remark = %s
  69. WHERE source_id = %s AND status IN (%s, %s) AND config_id = %s
  70. """
  71. return await self.pool.async_save(
  72. query=query,
  73. params=(
  74. self.TaskStatus.SUCCESS,
  75. result,
  76. remark,
  77. source_id,
  78. self.TaskStatus.INIT,
  79. self.TaskStatus.PROCESSING,
  80. self.CONFIG_ID,
  81. ),
  82. )
  83. async def fetch_pending_tasks(self, source: int = None) -> List[Dict]:
  84. if source is not None:
  85. query = f"""
  86. SELECT source_id
  87. FROM {TABLE}
  88. WHERE status IN (%s, %s) AND source = %s AND config_id = %s
  89. LIMIT %s
  90. """
  91. params = (
  92. self.TaskStatus.INIT,
  93. self.TaskStatus.PROCESSING,
  94. source,
  95. self.CONFIG_ID,
  96. self.TASK_BATCH,
  97. )
  98. else:
  99. query = f"""
  100. SELECT source_id
  101. FROM {TABLE}
  102. WHERE status IN (%s, %s) AND config_id = %s
  103. LIMIT %s
  104. """
  105. params = (
  106. self.TaskStatus.INIT,
  107. self.TaskStatus.PROCESSING,
  108. self.CONFIG_ID,
  109. self.TASK_BATCH,
  110. )
  111. return await self.pool.async_fetch(query=query, params=params)
  112. async def fetch_existing_source_ids(self, source_ids: List[str]) -> set:
  113. """批量查询哪些 source_id 已有进行中或成功的解构任务,用于去重跳过"""
  114. if not source_ids:
  115. return set()
  116. placeholders = ",".join(["%s"] * len(source_ids))
  117. query = f"""
  118. SELECT source_id FROM {TABLE}
  119. WHERE source_id IN ({placeholders})
  120. AND config_id = %s
  121. AND status IN (%s, %s, %s)
  122. """
  123. rows = await self.pool.async_fetch(
  124. query=query,
  125. params=(
  126. *source_ids,
  127. self.CONFIG_ID,
  128. self.TaskStatus.INIT,
  129. self.TaskStatus.PROCESSING,
  130. self.TaskStatus.SUCCESS,
  131. ),
  132. )
  133. return {r["source_id"] for r in rows}
  134. async def fetch_extract_tasks(self) -> List[Dict]:
  135. query = f"""
  136. SELECT id, result FROM {TABLE}
  137. WHERE extract_status = %s AND status = %s AND config_id = %s
  138. """
  139. return await self.pool.async_fetch(
  140. query=query,
  141. params=(self.ExtractStatus.INIT, self.TaskStatus.SUCCESS, self.CONFIG_ID),
  142. )
  143. async def update_extract_status(
  144. self, task_id: int, ori_status: int, new_status: int
  145. ) -> int:
  146. query = f"""
  147. UPDATE {TABLE}
  148. SET extract_status = %s
  149. WHERE extract_status = %s AND id = %s
  150. """
  151. return await self.pool.async_save(
  152. query=query, params=(new_status, ori_status, task_id)
  153. )
  154. async def record_extract_detail(self, decode_task_id: int, detail: Dict) -> int:
  155. query = """
  156. INSERT INTO long_articles_decode_task_detail
  157. (decode_task_id, inspiration, purpose, key_point, topic)
  158. VALUES (%s, %s, %s, %s, %s)
  159. """
  160. return await self.pool.async_save(
  161. query=query,
  162. params=(
  163. decode_task_id,
  164. detail.get("inspiration", ""),
  165. detail.get("purpose", ""),
  166. detail.get("key_point", ""),
  167. detail.get("topic", ""),
  168. ),
  169. )
  170. class AdPlatformArticlesDecodeTaskMapper(ArticlesDecodeTaskMapper):
  171. def __init__(self, pool: DatabaseManager):
  172. super().__init__(pool)
  173. async def update_article_decode_status(
  174. self, id_: int, ori_status: int, new_status: int
  175. ) -> int:
  176. query = """
  177. UPDATE ad_platform_accounts_daily_detail
  178. SET decode_status = %s
  179. WHERE id = %s AND decode_status = %s
  180. """
  181. return await self.pool.async_save(
  182. query=query, params=(new_status, id_, ori_status)
  183. )
  184. async def fetch_decode_articles(self) -> List[Dict]:
  185. query = """
  186. SELECT id, account_name, gh_id, article_title, article_cover,
  187. article_text, article_images, wx_sn
  188. FROM ad_platform_accounts_daily_detail
  189. WHERE fetch_status = %s AND decode_status = %s
  190. LIMIT %s
  191. """
  192. return await self.pool.async_fetch(
  193. query=query,
  194. params=(self.TaskStatus.SUCCESS, self.TaskStatus.INIT, self.TASK_BATCH),
  195. )
  196. class InnerArticlesDecodeTaskMapper(ArticlesDecodeTaskMapper):
  197. TABLE_INNER = "long_articles_decode_articles"
  198. def __init__(self, pool: DatabaseManager):
  199. super().__init__(pool)
  200. async def fetch_inner_articles(self) -> List[Dict]:
  201. query = f"""
  202. SELECT id, title, source_id, coverimgurl, article_text, summary, card_title
  203. FROM {self.TABLE_INNER}
  204. WHERE status = %s
  205. LIMIT %s
  206. """
  207. return await self.pool.async_fetch(
  208. query=query, params=(self.TaskStatus.INIT, self.TASK_BATCH)
  209. )
  210. async def update_inner_article_status(
  211. self, id_: int, ori_status: int, new_status: int
  212. ) -> int:
  213. query = f"""
  214. UPDATE {self.TABLE_INNER}
  215. SET status = %s
  216. WHERE id = %s AND status = %s
  217. """
  218. return await self.pool.async_save(
  219. query=query, params=(new_status, id_, ori_status)
  220. )
  221. async def fetch_inner_articles_produce_detail(self, source_id) -> List[Dict]:
  222. query = """
  223. SELECT produce_module_type, output
  224. FROM produce_plan_module_output
  225. WHERE plan_exe_id = %s
  226. AND produce_module_type IN (1, 2, 4)
  227. """
  228. return await self.pool.async_fetch(
  229. query=query, db_name="aigc", params=(source_id,)
  230. )
  231. __all__ = [
  232. "ArticlesDecodeTaskMapper",
  233. "AdPlatformArticlesDecodeTaskMapper",
  234. "InnerArticlesDecodeTaskMapper",
  235. ]