_mapper.py 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292
  from typing import Dict, List, Optional, Set

  from app.core.database import DatabaseManager
  from ._const import DecodeArticleConst
  # Decode-task table used by the mappers in this module. It is interpolated
  # directly into SQL via f-strings, so it must stay a trusted constant and
  # never be derived from user input.
  TABLE: str = "long_articles_decode_tasks"
  class ArticlesDecodeTaskMapper(DecodeArticleConst):
      """Data-access helpers for the decode-task table named by ``TABLE``.

      Every query on the task table is filtered or stamped with
      ``self.CONFIG_ID`` except ``update_extract_status``, which is keyed only
      by ``id``. Status/channel codes, ``CONFIG_ID`` and ``TASK_BATCH`` are
      inherited from ``DecodeArticleConst``.
      """

      def __init__(self, pool: DatabaseManager):
          # Database manager whose async_fetch / async_save run every query.
          self.pool = pool

      async def insert_decode_task(
          self,
          source_id: str,
          source: int,
          payload: str,
          remark: Optional[str] = None,
          status: Optional[int] = None,
          channel: Optional[int] = None,
      ) -> int:
          """Insert one decode task using ``INSERT IGNORE``.

          Rows that would violate a unique key are skipped instead of raising
          (which key applies is defined by the table schema, not here).

          Args:
              source_id: Identifier of the item to decode.
              source: Source code stored in the ``source`` column.
              payload: Task payload, stored as-is.
              remark: Optional remark.
              status: Initial status. When ``None`` the ``status`` column is
                  left out of the INSERT so the column default applies.
              channel: Task channel; defaults to ``self.TaskChannel.ARTICLE``.

          Returns:
              The result of ``pool.async_save`` (presumably the affected-row
              count -- confirm against ``DatabaseManager``).
          """
          ch = self.TaskChannel.ARTICLE if channel is None else channel
          columns = [
              "source_id",
              "config_id",
              "source",
              "channel",
              "payload",
              "remark",
          ]
          params = [source_id, self.CONFIG_ID, source, ch, payload, remark]
          if status is not None:
              # Only write status when given; otherwise the DB default wins.
              columns.append("status")
              params.append(status)
          column_sql = ", ".join(columns)
          placeholder_sql = ", ".join(["%s"] * len(columns))
          query = f"""
              INSERT IGNORE INTO {TABLE}
              ({column_sql})
              VALUES ({placeholder_sql})
          """
          return await self.pool.async_save(query=query, params=tuple(params))

      async def update_task_status_by_source_id(
          self,
          source_id: str,
          ori_status: int,
          new_status: int,
          remark: Optional[str] = None,
      ) -> int:
          """Move a task from ``ori_status`` to ``new_status``.

          The ``status = ori_status`` predicate means only a row still in the
          expected state is changed. ``remark`` is always written, so passing
          ``None`` clears it.

          Returns:
              The result of ``pool.async_save``.
          """
          query = f"""
              UPDATE {TABLE}
              SET status = %s, remark = %s
              WHERE source_id = %s AND status = %s AND config_id = %s
          """
          return await self.pool.async_save(
              query=query,
              params=(new_status, remark, source_id, ori_status, self.CONFIG_ID),
          )

      async def set_decode_result(
          self,
          source_id: str,
          result: str,
          remark: Optional[str] = None,
      ) -> int:
          """Store ``result`` and mark the task SUCCESS.

          Only tasks currently INIT or PROCESSING are updated, so a task that
          already reached another status is left untouched.

          Returns:
              The result of ``pool.async_save``.
          """
          query = f"""
              UPDATE {TABLE}
              SET status = %s, result = %s, remark = %s
              WHERE source_id = %s AND status IN (%s, %s) AND config_id = %s
          """
          return await self.pool.async_save(
              query=query,
              params=(
                  self.TaskStatus.SUCCESS,
                  result,
                  remark,
                  source_id,
                  self.TaskStatus.INIT,
                  self.TaskStatus.PROCESSING,
                  self.CONFIG_ID,
              ),
          )

      def _pending_filter(self, source: Optional[int] = None) -> tuple:
          """Build ``(where_sql, params)`` selecting INIT/PROCESSING tasks of this config.

          ``params`` is a list whose order matches the ``%s`` placeholders in
          ``where_sql``; ``source`` adds a ``source = %s`` filter when given.
          """
          clauses = ["status IN (%s, %s)"]
          params = [self.TaskStatus.INIT, self.TaskStatus.PROCESSING]
          if source is not None:
              clauses.append("source = %s")
              params.append(source)
          clauses.append("config_id = %s")
          params.append(self.CONFIG_ID)
          return " AND ".join(clauses), params

      async def count_pending_tasks(self, source: Optional[int] = None) -> int:
          """Count in-flight (INIT + PROCESSING) tasks, used to throttle consumer load.

          Args:
              source: When given, only tasks with this ``source`` are counted.

          Returns:
              The count, or 0 when the query returns no rows.
          """
          where_sql, params = self._pending_filter(source)
          query = f"""
              SELECT COUNT(*) AS cnt
              FROM {TABLE}
              WHERE {where_sql}
          """
          rows = await self.pool.async_fetch(query=query, params=tuple(params))
          return rows[0]["cnt"] if rows else 0

      async def fetch_pending_tasks(self, source: Optional[int] = None) -> List[Dict]:
          """Fetch up to ``TASK_BATCH`` INIT/PROCESSING tasks (``source_id`` only).

          Args:
              source: When given, only tasks with this ``source`` are returned.
          """
          where_sql, params = self._pending_filter(source)
          params.append(self.TASK_BATCH)
          query = f"""
              SELECT source_id
              FROM {TABLE}
              WHERE {where_sql}
              LIMIT %s
          """
          return await self.pool.async_fetch(query=query, params=tuple(params))

      async def fetch_existing_source_ids(self, source_ids: List[str]) -> Set[str]:
          """Return which ``source_ids`` already have an INIT, PROCESSING or SUCCESS task.

          Callers use this to skip duplicates in bulk. An empty input returns
          an empty set without touching the database.
          """
          if not source_ids:
              return set()
          # Order-preserving de-duplication keeps the IN list (and params) minimal.
          unique_ids = list(dict.fromkeys(source_ids))
          placeholders = ",".join(["%s"] * len(unique_ids))
          query = f"""
              SELECT source_id FROM {TABLE}
              WHERE source_id IN ({placeholders})
              AND config_id = %s
              AND status IN (%s, %s, %s)
          """
          rows = await self.pool.async_fetch(
              query=query,
              params=(
                  *unique_ids,
                  self.CONFIG_ID,
                  self.TaskStatus.INIT,
                  self.TaskStatus.PROCESSING,
                  self.TaskStatus.SUCCESS,
              ),
          )
          return {r["source_id"] for r in rows}

      async def fetch_extract_tasks(self) -> List[Dict]:
          """Fetch ``id`` and ``result`` of SUCCESS tasks whose extract_status is INIT.

          NOTE(review): no LIMIT is applied, so every matching row is loaded.
          """
          query = f"""
              SELECT id, result FROM {TABLE}
              WHERE extract_status = %s AND status = %s AND config_id = %s
          """
          return await self.pool.async_fetch(
              query=query,
              params=(self.ExtractStatus.INIT, self.TaskStatus.SUCCESS, self.CONFIG_ID),
          )

      async def update_extract_status(
          self, task_id: int, ori_status: int, new_status: int
      ) -> int:
          """Move a task's ``extract_status`` from ``ori_status`` to ``new_status``.

          Keyed only by ``id`` (not scoped by ``config_id``). The row is only
          changed if it is still in ``ori_status``.
          """
          query = f"""
              UPDATE {TABLE}
              SET extract_status = %s
              WHERE extract_status = %s AND id = %s
          """
          return await self.pool.async_save(
              query=query, params=(new_status, ori_status, task_id)
          )

      async def record_extract_detail(self, decode_task_id: int, detail: Dict) -> int:
          """Insert one extracted detail row linked to ``decode_task_id``.

          Missing keys in ``detail`` are stored as empty strings.
          """
          query = """
              INSERT INTO long_articles_decode_task_detail
              (decode_task_id, inspiration, purpose, key_point, topic)
              VALUES (%s, %s, %s, %s, %s)
          """
          return await self.pool.async_save(
              query=query,
              params=(
                  decode_task_id,
                  detail.get("inspiration", ""),
                  detail.get("purpose", ""),
                  detail.get("key_point", ""),
                  detail.get("topic", ""),
              ),
          )
  class AdPlatformArticlesDecodeTaskMapper(ArticlesDecodeTaskMapper):
      """Decode-task mapper that also reads/updates ``ad_platform_accounts_daily_detail``.

      The constructor (storing ``pool``) and all task-table helpers are
      inherited unchanged from ``ArticlesDecodeTaskMapper``.
      """

      async def update_article_decode_status(
          self, id_: int, ori_status: int, new_status: int
      ) -> int:
          """Move one ad-platform article's ``decode_status`` from ``ori_status`` to ``new_status``.

          The row is only changed if it is still in ``ori_status``.

          Returns:
              The result of ``pool.async_save``.
          """
          query = """
              UPDATE ad_platform_accounts_daily_detail
              SET decode_status = %s
              WHERE id = %s AND decode_status = %s
          """
          return await self.pool.async_save(
              query=query, params=(new_status, id_, ori_status)
          )

      async def fetch_decode_articles(self, limit: Optional[int] = None) -> List[Dict]:
          """Fetch ad-platform articles whose fetch succeeded and decode is still INIT.

          NOTE(review): ``fetch_status`` and ``decode_status`` are compared
          against ``TaskStatus`` codes -- presumably both columns share that
          encoding; confirm.

          Args:
              limit: Maximum rows to return; defaults to ``self.TASK_BATCH``.
          """
          batch_size = self.TASK_BATCH if limit is None else limit
          query = """
              SELECT id, account_name, gh_id, article_title, article_cover,
              article_text, article_images, wx_sn
              FROM ad_platform_accounts_daily_detail
              WHERE fetch_status = %s AND decode_status = %s
              LIMIT %s
          """
          return await self.pool.async_fetch(
              query=query,
              params=(self.TaskStatus.SUCCESS, self.TaskStatus.INIT, batch_size),
          )
  class InnerArticlesDecodeTaskMapper(ArticlesDecodeTaskMapper):
      """Decode-task mapper that also reads/updates the inner-article table.

      The constructor (storing ``pool``) and all task-table helpers are
      inherited unchanged from ``ArticlesDecodeTaskMapper``.
      """

      # Table holding inner articles that feed decode tasks.
      TABLE_INNER = "long_articles_decode_articles"
      # produce_module_type codes read by fetch_inner_articles_produce_detail.
      # Their meaning is defined by the aigc schema (not visible here);
      # subclasses may override this tuple.
      PRODUCE_MODULE_TYPES = (1, 2, 4)

      async def fetch_inner_articles(self, limit: Optional[int] = None) -> List[Dict]:
          """Fetch inner articles whose ``status`` is INIT.

          Args:
              limit: Maximum rows to return; defaults to ``self.TASK_BATCH``.
          """
          batch_size = self.TASK_BATCH if limit is None else limit
          query = f"""
              SELECT id, title, source_id, coverimgurl, article_text, summary, card_title
              FROM {self.TABLE_INNER}
              WHERE status = %s
              LIMIT %s
          """
          return await self.pool.async_fetch(
              query=query, params=(self.TaskStatus.INIT, batch_size)
          )

      async def update_inner_article_status(
          self, id_: int, ori_status: int, new_status: int
      ) -> int:
          """Move one inner article's ``status`` from ``ori_status`` to ``new_status``.

          The row is only changed if it is still in ``ori_status``.

          Returns:
              The result of ``pool.async_save``.
          """
          query = f"""
              UPDATE {self.TABLE_INNER}
              SET status = %s
              WHERE id = %s AND status = %s
          """
          return await self.pool.async_save(
              query=query, params=(new_status, id_, ori_status)
          )

      async def fetch_inner_articles_produce_detail(self, source_id) -> List[Dict]:
          """Fetch ``produce_module_type`` / ``output`` rows for one plan execution.

          Queries ``produce_plan_module_output`` in the ``aigc`` database (via
          ``db_name``), restricted to ``PRODUCE_MODULE_TYPES``.

          Args:
              source_id: Value matched against ``plan_exe_id``.

          Returns:
              The fetched rows, or ``[]`` when ``PRODUCE_MODULE_TYPES`` is empty
              (an empty ``IN ()`` would be invalid SQL).
          """
          module_types = tuple(self.PRODUCE_MODULE_TYPES)
          if not module_types:
              return []
          placeholders = ", ".join(["%s"] * len(module_types))
          query = f"""
              SELECT produce_module_type, output
              FROM produce_plan_module_output
              WHERE plan_exe_id = %s
              AND produce_module_type IN ({placeholders})
          """
          return await self.pool.async_fetch(
              query=query, db_name="aigc", params=(source_id, *module_types)
          )
  # Public API: only the three mapper classes are exported by `import *`;
  # the TABLE constant stays module-internal.
  __all__ = [
      "ArticlesDecodeTaskMapper",  # base decode-task table mapper
      "AdPlatformArticlesDecodeTaskMapper",  # + ad-platform article source
      "InnerArticlesDecodeTaskMapper",  # + inner article source
  ]