| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133 |
- from typing import List, Dict
- from app.core.database import DatabaseManager
- from ._const import InnerArticlesDecodeConst
class InnerArticlesDecodeMapper(InnerArticlesDecodeConst):
    """Data-access layer for the inner-article decode pipeline.

    Wraps the SQL used against these tables:

    * ``long_articles_decode_tasks`` and ``long_articles_decode_task_detail``
      (default database of ``pool``);
    * ``produce_plan_module_output`` (the ``aigc`` database);
    * ``datastat_sort_strategy``.

    ``SourceType``, ``TaskStatus``, ``ExtractStatus`` and ``TASK_BATCH`` are
    read from ``self``. They are presumably defined on
    ``InnerArticlesDecodeConst`` — confirm there.
    """

    def __init__(self, pool: DatabaseManager):
        """Keep the database manager used by every query method."""
        self.pool = pool

    async def record_decode_task(
        self, task_id: str, wx_sn: str, remark: str = None
    ) -> int:
        """Store a new decode task tagged with the INNER source type.

        ``INSERT IGNORE`` makes a row that violates a unique key be skipped
        rather than raise. The key is presumably ``task_id`` and/or
        ``wx_sn`` — confirm against the table's indexes.

        Returns:
            The value of ``pool.async_save``. This is presumably the
            affected-row count (0 when the insert was ignored) — TODO confirm.
        """
        query = """
            INSERT IGNORE INTO long_articles_decode_tasks (task_id, wx_sn, remark, source)
            VALUES (%s, %s, %s, %s)
        """
        return await self.pool.async_save(
            query=query, params=(task_id, wx_sn, remark, self.SourceType.INNER)
        )

    async def update_decode_task_status(
        self, task_id: str, ori_status: int, new_status: int, remark: str = None
    ) -> int:
        """Move decode task ``task_id`` from ``ori_status`` to ``new_status``.

        The ``status = ori_status`` condition makes this a compare-and-set.
        If the task is no longer in ``ori_status``, no row matches and
        nothing changes.

        NOTE: ``remark`` is always written. Calling without it overwrites
        any stored remark with NULL.

        Returns:
            The value of ``pool.async_save``. This is presumably the
            affected-row count — TODO confirm.
        """
        query = """
            UPDATE long_articles_decode_tasks
            SET status = %s, remark = %s
            WHERE task_id = %s AND status = %s;
        """
        return await self.pool.async_save(
            query=query, params=(new_status, remark, task_id, ori_status)
        )

    async def set_decode_result(
        self, task_id: str, result: str, remark: str = None
    ) -> int:
        """Store the decode ``result`` and mark the task SUCCESS.

        Only a task that is currently PROCESSING is updated (compare-and-set
        on ``status``). As in ``update_decode_task_status``, ``remark`` is
        always written and becomes NULL when omitted.
        """
        query = """
            UPDATE long_articles_decode_tasks
            SET status = %s, remark = %s, result = %s
            WHERE task_id = %s AND status = %s;
        """
        return await self.pool.async_save(
            query=query,
            params=(
                self.TaskStatus.SUCCESS,
                remark,
                result,
                task_id,
                self.TaskStatus.PROCESSING,
            ),
        )

    async def fetch_inner_articles_produce_detail(self, source_id) -> List[Dict]:
        """Fetch generation-module outputs for one produce-plan execution.

        Reads ``produce_plan_module_output`` in the ``aigc`` database for
        ``plan_exe_id = source_id``. Only module types 1, 2, 3, 4 and 18 are
        returned. What those ids mean is not visible here; presumably they
        are the modules relevant to decoding — confirm.

        Returns:
            Rows with ``produce_module_type`` and ``output`` columns.
        """
        query = """
            SELECT produce_module_type, output
            FROM produce_plan_module_output WHERE plan_exe_id = %s
            AND produce_module_type in (1,2,3,4,18);
        """
        return await self.pool.async_fetch(
            query=query, db_name="aigc", params=(source_id,)
        )

    async def fetch_decoding_tasks(self) -> List[Dict]:
        """Fetch up to ``TASK_BATCH`` INNER tasks whose decode result is pending.

        These are tasks still in status INIT, i.e. no decode result has been
        retrieved for them yet.

        Returns:
            Rows containing only ``task_id``.
        """
        query = """
            SELECT task_id FROM long_articles_decode_tasks WHERE status = %s AND source = %s LIMIT %s;
        """
        return await self.pool.async_fetch(
            query=query,
            params=(self.TaskStatus.INIT, self.SourceType.INNER, self.TASK_BATCH),
        )

    async def fetch_extract_tasks(self) -> List[Dict]:
        """Fetch successfully decoded tasks whose result is not yet parsed.

        Matches rows with ``status = SUCCESS`` and ``extract_status = INIT``.

        NOTE: unlike ``fetch_decoding_tasks``, this query has no ``source``
        filter and no ``LIMIT``. It therefore returns matching tasks of every
        source, including their full ``result`` payloads, in a single fetch.

        Returns:
            Rows with the primary key ``id`` and the ``result`` text.
        """
        query = """
            SELECT id, result FROM long_articles_decode_tasks
            WHERE extract_status = %s AND status = %s;
        """
        return await self.pool.async_fetch(
            query=query, params=(self.ExtractStatus.INIT, self.TaskStatus.SUCCESS)
        )

    async def update_extract_status(self, task_id, ori_status, new_status):
        """Move a task's ``extract_status`` from ``ori_status`` to ``new_status``.

        The ``extract_status = ori_status`` condition lets this act as a
        lock. Only a caller that still sees the original status changes the
        row.

        NOTE: despite its name, ``task_id`` is matched against the
        primary-key ``id`` column (as returned by ``fetch_extract_tasks``),
        not against the ``task_id`` column.
        """
        query = """
            UPDATE long_articles_decode_tasks
            SET extract_status = %s WHERE extract_status = %s AND id = %s;
        """
        return await self.pool.async_save(
            query=query,
            params=(
                new_status,
                ori_status,
                task_id,
            ),
        )

    async def record_extract_detail(self, decode_task_id: int, detail: Dict) -> int:
        """Insert one parsed result row into ``long_articles_decode_task_detail``.

        ``detail`` supplies the ``inspiration``, ``purpose``, ``key_point``
        and ``topic`` values. Any missing key is stored as an empty string.
        """
        query = """
            INSERT INTO long_articles_decode_task_detail
                (decode_task_id, inspiration, purpose, key_point, topic)
            VALUES (%s, %s, %s, %s, %s);
        """
        return await self.pool.async_save(
            query=query,
            params=(
                decode_task_id,
                detail.get("inspiration", ""),
                detail.get("purpose", ""),
                detail.get("key_point", ""),
                detail.get("topic", ""),
            ),
        )

    async def fetch_inner_articles(self, start_date: str = "20260101") -> List[Dict]:
        """Fetch well-performing inner articles to be decoded.

        Rows of ``datastat_sort_strategy`` are filtered as follows:

        * ``date_str >= start_date``;
        * ``account_type`` is not '服务号' (service account);
        * ``position = 1``;
        * ``source_id`` is not NULL.

        The remaining rows are grouped per ``source_id``. A group is kept
        when both of these hold:

        * the sum of ``view_count`` is at least 1.2x the sum of
          ``avg_view_count``;
        * the smallest ``read_rate`` is at least 0.2.

        Args:
            start_date: Inclusive lower bound compared with ``date_str``,
                written as ``YYYYMMDD``. Defaults to ``"20260101"``.

        Returns:
            One row per qualifying ``source_id``, with the keys
            ``source_id``, ``title`` and ``wx_sn``.
        """
        # title and wx_sn are not in the GROUP BY, so they must be
        # aggregated. A bare column is rejected under ONLY_FULL_GROUP_BY, and
        # otherwise its value comes from an indeterminate row of the group.
        # MAX() makes the choice deterministic, and the aliases keep the
        # result keys unchanged.
        # NOTE(review): the two MAX() calls are independent. If a source_id
        # has rows with differing title or wx_sn, the returned pair may come
        # from different rows.
        query = """
            SELECT source_id, MAX(title) AS title, MAX(wx_sn) AS wx_sn
            FROM datastat_sort_strategy
            WHERE date_str >= %s
              AND account_type != '服务号'
              AND position = 1
              AND source_id IS NOT NULL
            GROUP BY source_id
            HAVING SUM(view_count) / SUM(avg_view_count) >= 1.2
               AND MIN(read_rate) >= 0.2;
        """
        return await self.pool.async_fetch(query=query, params=(start_date,))
|