# _mapper.py (7.5 KB)

from typing import Dict, List, Optional, Set

from app.core.database import DatabaseManager

from ._const import DecodeArticleConst
# Decode-task table targeted by the ArticlesDecodeTaskMapper task queries.
TABLE = "long_articles_decode_tasks_v2"
  5. class ArticlesDecodeTaskMapper(DecodeArticleConst):
  6. def __init__(self, pool: DatabaseManager):
  7. self.pool = pool
  8. async def insert_decode_task(
  9. self,
  10. channel_content_id: str,
  11. content_id: str,
  12. source: int,
  13. payload: str,
  14. remark: str = None,
  15. ) -> int:
  16. query = f"""
  17. INSERT IGNORE INTO {TABLE}
  18. (channel_content_id, config_id, content_id, source, payload, remark)
  19. VALUES (%s, %s, %s, %s, %s, %s)
  20. """
  21. return await self.pool.async_save(
  22. query=query,
  23. params=(
  24. channel_content_id,
  25. self.CONFIG_ID,
  26. content_id,
  27. source,
  28. payload,
  29. remark,
  30. ),
  31. )
  32. async def update_task_status_by_channel(
  33. self,
  34. channel_content_id: str,
  35. ori_status: int,
  36. new_status: int,
  37. remark: str = None,
  38. ) -> int:
  39. query = f"""
  40. UPDATE {TABLE}
  41. SET status = %s, remark = %s
  42. WHERE channel_content_id = %s AND status = %s AND config_id = %s
  43. """
  44. return await self.pool.async_save(
  45. query=query,
  46. params=(new_status, remark, channel_content_id, ori_status, self.CONFIG_ID),
  47. )
  48. async def set_decode_result(
  49. self,
  50. channel_content_id: str,
  51. result: str,
  52. remark: str = None,
  53. ) -> int:
  54. query = f"""
  55. UPDATE {TABLE}
  56. SET status = %s, result = %s, remark = %s
  57. WHERE channel_content_id = %s AND status IN (%s, %s) AND config_id = %s
  58. """
  59. return await self.pool.async_save(
  60. query=query,
  61. params=(
  62. self.TaskStatus.SUCCESS,
  63. result,
  64. remark,
  65. channel_content_id,
  66. self.TaskStatus.INIT,
  67. self.TaskStatus.PROCESSING,
  68. self.CONFIG_ID,
  69. ),
  70. )
  71. async def fetch_pending_tasks(
  72. self, source: int = None
  73. ) -> List[Dict]:
  74. if source is not None:
  75. query = f"""
  76. SELECT channel_content_id, content_id
  77. FROM {TABLE}
  78. WHERE status = %s AND source = %s AND config_id = %s
  79. LIMIT %s
  80. """
  81. params = (self.TaskStatus.INIT, source, self.CONFIG_ID, self.TASK_BATCH)
  82. else:
  83. query = f"""
  84. SELECT channel_content_id, content_id
  85. FROM {TABLE}
  86. WHERE status = %s AND config_id = %s
  87. LIMIT %s
  88. """
  89. params = (self.TaskStatus.INIT, self.CONFIG_ID, self.TASK_BATCH)
  90. return await self.pool.async_fetch(query=query, params=params)
  91. async def fetch_existing_channel_content_ids(
  92. self, channel_content_ids: List[str]
  93. ) -> set:
  94. """批量查询哪些 channel_content_id 已有成功解构结果,用于去重跳过"""
  95. if not channel_content_ids:
  96. return set()
  97. placeholders = ",".join(["%s"] * len(channel_content_ids))
  98. query = f"""
  99. SELECT channel_content_id FROM {TABLE}
  100. WHERE channel_content_id IN ({placeholders})
  101. AND config_id = %s
  102. AND status = %s
  103. """
  104. rows = await self.pool.async_fetch(
  105. query=query,
  106. params=(*channel_content_ids, self.CONFIG_ID, self.TaskStatus.SUCCESS),
  107. )
  108. return {r["channel_content_id"] for r in rows}
  109. async def fetch_extract_tasks(self) -> List[Dict]:
  110. query = f"""
  111. SELECT id, result FROM {TABLE}
  112. WHERE extract_status = %s AND status = %s AND config_id = %s
  113. """
  114. return await self.pool.async_fetch(
  115. query=query,
  116. params=(self.ExtractStatus.INIT, self.TaskStatus.SUCCESS, self.CONFIG_ID),
  117. )
  118. async def update_extract_status(
  119. self, task_id: int, ori_status: int, new_status: int
  120. ) -> int:
  121. query = f"""
  122. UPDATE {TABLE}
  123. SET extract_status = %s
  124. WHERE extract_status = %s AND id = %s
  125. """
  126. return await self.pool.async_save(
  127. query=query, params=(new_status, ori_status, task_id)
  128. )
  129. async def record_extract_detail(
  130. self, decode_task_id: int, detail: Dict
  131. ) -> int:
  132. query = """
  133. INSERT INTO long_articles_decode_task_detail_v2
  134. (decode_task_id, inspiration, purpose, key_point, topic)
  135. VALUES (%s, %s, %s, %s, %s)
  136. """
  137. return await self.pool.async_save(
  138. query=query,
  139. params=(
  140. decode_task_id,
  141. detail.get("inspiration", ""),
  142. detail.get("purpose", ""),
  143. detail.get("key_point", ""),
  144. detail.get("topic", ""),
  145. ),
  146. )
  147. class AdPlatformArticlesDecodeTaskMapper(ArticlesDecodeTaskMapper):
  148. def __init__(self, pool: DatabaseManager):
  149. super().__init__(pool)
  150. async def update_article_decode_status(
  151. self, id_: int, ori_status: int, new_status: int
  152. ) -> int:
  153. query = """
  154. UPDATE ad_platform_accounts_daily_detail
  155. SET decode_status = %s
  156. WHERE id = %s AND decode_status = %s
  157. """
  158. return await self.pool.async_save(
  159. query=query, params=(new_status, id_, ori_status)
  160. )
  161. async def fetch_decode_articles(self) -> List[Dict]:
  162. query = """
  163. SELECT id, account_name, gh_id, article_title, article_cover,
  164. article_text, article_images, wx_sn
  165. FROM ad_platform_accounts_daily_detail
  166. WHERE fetch_status = %s AND decode_status = %s
  167. LIMIT %s
  168. """
  169. return await self.pool.async_fetch(
  170. query=query,
  171. params=(self.TaskStatus.SUCCESS, self.TaskStatus.INIT, self.TASK_BATCH),
  172. )
  173. class InnerArticlesDecodeTaskMapper(ArticlesDecodeTaskMapper):
  174. TABLE_INNER = "long_articles_decode_articles"
  175. def __init__(self, pool: DatabaseManager):
  176. super().__init__(pool)
  177. async def fetch_inner_articles(self) -> List[Dict]:
  178. query = f"""
  179. SELECT id, title, source_id, wx_sn, coverimgurl, article_text, summary, card_title
  180. FROM {self.TABLE_INNER}
  181. WHERE status = %s
  182. ORDER BY max_read_rate DESC
  183. LIMIT %s
  184. """
  185. return await self.pool.async_fetch(
  186. query=query, params=(self.TaskStatus.INIT, self.TASK_BATCH)
  187. )
  188. async def update_inner_article_status(
  189. self, id_: int, ori_status: int, new_status: int
  190. ) -> int:
  191. query = f"""
  192. UPDATE {self.TABLE_INNER}
  193. SET status = %s
  194. WHERE id = %s AND status = %s
  195. """
  196. return await self.pool.async_save(
  197. query=query, params=(new_status, id_, ori_status)
  198. )
  199. async def fetch_inner_articles_produce_detail(
  200. self, source_id
  201. ) -> List[Dict]:
  202. query = """
  203. SELECT produce_module_type, output
  204. FROM produce_plan_module_output
  205. WHERE plan_exe_id = %s
  206. AND produce_module_type IN (1, 2, 4)
  207. """
  208. return await self.pool.async_fetch(
  209. query=query, db_name="aigc", params=(source_id,)
  210. )
# Public API of this module: the three decode-task mapper classes.
__all__ = [
    "ArticlesDecodeTaskMapper",
    "AdPlatformArticlesDecodeTaskMapper",
    "InnerArticlesDecodeTaskMapper",
]