# _mapper.py
from typing import Dict, List, Optional

from app.core.database import DatabaseManager

from ._const import InnerArticlesDecodeConst
  4. class InnerArticlesDecodeMapper(InnerArticlesDecodeConst):
  5. def __init__(self, pool: DatabaseManager):
  6. self.pool = pool
  7. # 存储解构任务
  8. async def record_decode_task(
  9. self, task_id: str, wx_sn: str, remark: str = None
  10. ) -> int:
  11. query = """
  12. INSERT IGNORE INTO long_articles_decode_tasks (task_id, wx_sn, remark, source)
  13. VALUES (%s, %s, %s, %s)
  14. """
  15. return await self.pool.async_save(
  16. query=query, params=(task_id, wx_sn, remark, self.SourceType.INNER)
  17. )
  18. # 更新解构任务状态
  19. async def update_decode_task_status(
  20. self, task_id: str, ori_status: int, new_status: int, remark: str = None
  21. ) -> int:
  22. query = """
  23. UPDATE long_articles_decode_tasks
  24. SET status = %s, remark = %s
  25. WHERE task_id = %s AND status = %s;
  26. """
  27. return await self.pool.async_save(
  28. query=query, params=(new_status, remark, task_id, ori_status)
  29. )
  30. # 设置解构结果
  31. async def set_decode_result(
  32. self, task_id: str, result: str, remark: str = None
  33. ) -> int:
  34. query = """
  35. UPDATE long_articles_decode_tasks
  36. SET status = %s, remark = %s, result = %s
  37. WHERE task_id = %s AND status = %s;
  38. """
  39. return await self.pool.async_save(
  40. query=query,
  41. params=(
  42. self.TaskStatus.SUCCESS,
  43. remark,
  44. result,
  45. task_id,
  46. self.TaskStatus.PROCESSING,
  47. ),
  48. )
  49. # 获取内部文章生成信息
  50. async def fetch_inner_articles_produce_detail(self, source_id) -> List[Dict]:
  51. query = """
  52. SELECT produce_module_type, output
  53. FROM produce_plan_module_output WHERE plan_exe_id = %s
  54. AND produce_module_type in (1,2,3,4,18);
  55. """
  56. return await self.pool.async_fetch(
  57. query=query, db_name="aigc", params=(source_id,)
  58. )
  59. # 获取待拉取结果的解构任务(status=INIT,尚未拿到解构结果)
  60. async def fetch_decoding_tasks(self) -> List[Dict]:
  61. query = """
  62. SELECT task_id FROM long_articles_decode_tasks WHERE status = %s AND source = %s LIMIT %s;
  63. """
  64. return await self.pool.async_fetch(
  65. query=query, params=(self.TaskStatus.INIT, self.SourceType.INNER, self.TASK_BATCH)
  66. )
  67. # 获取待解析的任务(获取处理成功的任务)
  68. async def fetch_extract_tasks(self):
  69. query = """
  70. SELECT id, result FROM long_articles_decode_tasks
  71. WHERE extract_status = %s AND status = %s;
  72. """
  73. return await self.pool.async_fetch(
  74. query=query, params=(self.ExtractStatus.INIT, self.TaskStatus.SUCCESS)
  75. )
  76. # 修改解析状态(用于加锁与状态流转)
  77. async def update_extract_status(self, task_id, ori_status, new_status):
  78. query = """
  79. UPDATE long_articles_decode_tasks
  80. SET extract_status = %s WHERE extract_status = %s AND id = %s;
  81. """
  82. return await self.pool.async_save(
  83. query=query,
  84. params=(
  85. new_status,
  86. ori_status,
  87. task_id,
  88. ),
  89. )
  90. # 记录解析结果明细到 long_articles_decode_task_detail
  91. async def record_extract_detail(self, decode_task_id: int, detail: Dict) -> int:
  92. query = """
  93. INSERT INTO long_articles_decode_task_detail
  94. (decode_task_id, inspiration, purpose, key_point, topic)
  95. VALUES (%s, %s, %s, %s, %s);
  96. """
  97. return await self.pool.async_save(
  98. query=query,
  99. params=(
  100. decode_task_id,
  101. detail.get("inspiration", ""),
  102. detail.get("purpose", ""),
  103. detail.get("key_point", ""),
  104. detail.get("topic", ""),
  105. ),
  106. )
  107. # 获取内部文章
  108. async def fetch_inner_articles(self):
  109. query = """
  110. SELECT source_id, title, wx_sn
  111. FROM datastat_sort_strategy
  112. WHERE date_str >= '20260101'
  113. AND account_type != '服务号'
  114. AND position = 1
  115. AND source_id is not null
  116. GROUP BY source_id
  117. HAVING sum(view_count) / sum(avg_view_count) >= 1.2
  118. AND min(read_rate) >= 0.2;
  119. """
  120. return await self.pool.async_fetch(query=query)