# _mapper.py (6.0 KB)

from typing import Dict, List, Optional

from app.core.database import DatabaseManager

from ._const import DecodeTaskConst
  4. class ArticlesDecodeTaskMapper(DecodeTaskConst):
  5. def __init__(self, pool: DatabaseManager):
  6. self.pool = pool
  7. # 存储解构任务
  8. async def record_decode_task(
  9. self, task_id: str, wx_sn: str, remark: str = None
  10. ) -> int:
  11. query = """
  12. INSERT INTO long_articles_decode_tasks (task_id, wx_sn, remark)
  13. VALUES (%s, %s, %s)
  14. """
  15. return await self.pool.async_save(query=query, params=(task_id, wx_sn, remark))
  16. # 更新解构任务状态
  17. async def update_decode_task_status(
  18. self, task_id: str, ori_status: int, new_status: int, remark: str = None
  19. ) -> int:
  20. query = """
  21. UPDATE long_articles_decode_tasks
  22. SET status = %s, remark = %s
  23. WHERE task_id = %s AND status = %s;
  24. """
  25. return await self.pool.async_save(
  26. query=query, params=(new_status, remark, task_id, ori_status)
  27. )
  28. # 设置解构结果
  29. async def set_decode_result(
  30. self, task_id: str, result: str, remark: str = None
  31. ) -> int:
  32. query = """
  33. UPDATE long_articles_decode_tasks
  34. SET status = %s, remark = %s, result = %s
  35. WHERE task_id = %s AND status = %s;
  36. """
  37. return await self.pool.async_save(
  38. query=query,
  39. params=(
  40. self.TaskStatus.SUCCESS,
  41. remark,
  42. result,
  43. task_id,
  44. self.TaskStatus.PROCESSING,
  45. ),
  46. )
  47. # 获取待拉取结果的解构任务(status=INIT,尚未拿到解构结果)
  48. async def fetch_decoding_tasks(self) -> List[Dict]:
  49. query = """
  50. SELECT task_id FROM long_articles_decode_tasks WHERE status = %s LIMIT %s;
  51. """
  52. return await self.pool.async_fetch(
  53. query=query, params=(self.TaskStatus.INIT, self.TASK_BATCH)
  54. )
  55. # 获取待解析的任务(获取处理成功的任务)
  56. async def fetch_extract_tasks(self):
  57. query = """
  58. SELECT id, result FROM long_articles_decode_tasks
  59. WHERE extract_status = %s AND status = %s;
  60. """
  61. return await self.pool.async_fetch(
  62. query=query, params=(self.ExtractStatus.INIT, self.TaskStatus.SUCCESS)
  63. )
  64. # 修改解析状态(用于加锁与状态流转)
  65. async def update_extract_status(self, task_id, ori_status, new_status):
  66. query = """
  67. UPDATE long_articles_decode_tasks
  68. SET extract_status = %s WHERE extract_status = %s AND id = %s;
  69. """
  70. return await self.pool.async_save(
  71. query=query,
  72. params=(
  73. new_status,
  74. ori_status,
  75. task_id,
  76. ),
  77. )
  78. # 记录解析结果明细到 long_articles_decode_task_detail
  79. async def record_extract_detail(self, decode_task_id: int, detail: Dict) -> int:
  80. query = """
  81. INSERT INTO long_articles_decode_task_detail
  82. (decode_task_id, inspiration, purpose, key_point, topic)
  83. VALUES (%s, %s, %s, %s, %s);
  84. """
  85. return await self.pool.async_save(
  86. query=query,
  87. params=(
  88. decode_task_id,
  89. detail.get("inspiration", ""),
  90. detail.get("purpose", ""),
  91. detail.get("key_point", ""),
  92. detail.get("topic", ""),
  93. ),
  94. )
  95. class AdPlatformArticlesDecodeTaskMapper(ArticlesDecodeTaskMapper):
  96. def __init__(self, pool: DatabaseManager):
  97. super().__init__(pool)
  98. # 修改文章解构状态
  99. async def update_article_decode_status(
  100. self, id_: int, ori_status: int, new_status: int
  101. ) -> int:
  102. query = """
  103. UPDATE ad_platform_accounts_daily_detail
  104. SET decode_status = %s
  105. WHERE id = %s AND decode_status = %s;
  106. """
  107. return await self.pool.async_save(
  108. query=query, params=(new_status, id_, ori_status)
  109. )
  110. # 获取待解构文章
  111. async def fetch_decode_articles(self) -> List[Dict]:
  112. query = """
  113. SELECT id, account_name, gh_id, article_title, article_cover,
  114. article_text, article_images, wx_sn
  115. FROM ad_platform_accounts_daily_detail WHERE fetch_status = %s AND decode_status = %s
  116. LIMIT %s;
  117. """
  118. return await self.pool.async_fetch(
  119. query=query,
  120. params=(self.TaskStatus.SUCCESS, self.TaskStatus.INIT, self.TASK_BATCH),
  121. )
  122. class InnerArticlesDecodeTaskMapper(ArticlesDecodeTaskMapper):
  123. def __init__(self, pool: DatabaseManager):
  124. super().__init__(pool)
  125. # 获取内部文章
  126. async def fetch_inner_articles(self):
  127. query = """
  128. SELECT title
  129. ,SUM(fans) AS total_fans
  130. ,SUM(view_count) AS total_view
  131. ,SUM(view_count) / SUM(fans) AS avg_read_rate
  132. ,SUM(first_level) AS total_first_level
  133. ,MAX(source_id) as source_id
  134. ,MAX(wx_sn) as wx_sn
  135. FROM datastat_sort_strategy
  136. WHERE date_str >= '20250101'
  137. GROUP BY title
  138. HAVING total_fans > 100000
  139. AND avg_read_rate > 0.008
  140. AND total_first_level > 0;
  141. """
  142. return await self.pool.async_fetch(query=query)
  143. # 获取内部文章生成信息
  144. async def fetch_inner_articles_produce_detail(self, source_id) -> List[Dict]:
  145. query = """
  146. SELECT produce_module_type, output
  147. FROM produce_plan_module_output WHERE plan_exe_id = %s
  148. AND produce_module_type in (1,2,3,4,18);
  149. """
  150. return await self.pool.async_fetch(
  151. query=query, db_name="aigc", params=(source_id,)
  152. )
  153. __all__ = [
  154. "ArticlesDecodeTaskMapper",
  155. "AdPlatformArticlesDecodeTaskMapper",
  156. "InnerArticlesDecodeTaskMapper",
  157. ]