| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240 |
from typing import Dict, List, Optional, Sequence, Set, Tuple

from app.core.database import DatabaseManager

from ._const import DecodeArticleConst
# Decode-task table read and written by ArticlesDecodeTaskMapper and its
# subclasses (all of which inherit the methods that reference it).
TABLE = "long_articles_decode_tasks_v2"


class ArticlesDecodeTaskMapper(DecodeArticleConst):
    """Async data-access helpers for the article decode-task table (``TABLE``).

    Status codes (``TaskStatus``, ``ExtractStatus``), ``CONFIG_ID`` and
    ``TASK_BATCH`` are inherited from ``DecodeArticleConst``. All SQL is
    executed through the shared ``DatabaseManager`` passed as ``pool``
    (``async_save`` for writes, ``async_fetch`` for reads).
    """

    def __init__(self, pool: DatabaseManager):
        self.pool = pool

    async def insert_decode_task(
        self,
        channel_content_id: str,
        content_id: str,
        source: int,
        payload: str,
        remark: Optional[str] = None,
    ) -> int:
        """Insert a new decode task under the current ``CONFIG_ID``.

        ``INSERT IGNORE`` is used, so a row that would violate a unique key is
        skipped rather than raising an error.

        Returns:
            The value of ``pool.async_save`` — presumably the affected-row
            count (0 when the insert was ignored); confirm in DatabaseManager.
        """
        query = f"""
            INSERT IGNORE INTO {TABLE}
                (channel_content_id, config_id, content_id, source, payload, remark)
            VALUES (%s, %s, %s, %s, %s, %s)
        """
        return await self.pool.async_save(
            query=query,
            params=(
                channel_content_id,
                self.CONFIG_ID,
                content_id,
                source,
                payload,
                remark,
            ),
        )

    async def update_task_status_by_channel(
        self,
        channel_content_id: str,
        ori_status: int,
        new_status: int,
        remark: Optional[str] = None,
    ) -> int:
        """Move a task from ``ori_status`` to ``new_status`` (compare-and-set).

        Only rows of the current ``CONFIG_ID`` still in ``ori_status`` are
        touched. ``remark`` is always written, so passing ``None`` clears it.
        """
        query = f"""
            UPDATE {TABLE}
            SET status = %s, remark = %s
            WHERE channel_content_id = %s AND status = %s AND config_id = %s
        """
        return await self.pool.async_save(
            query=query,
            params=(new_status, remark, channel_content_id, ori_status, self.CONFIG_ID),
        )

    async def set_decode_result(
        self,
        channel_content_id: str,
        result: str,
        remark: Optional[str] = None,
    ) -> int:
        """Store ``result`` and mark the task ``TaskStatus.SUCCESS``.

        Applies only while the task is still ``INIT`` or ``PROCESSING`` under
        the current ``CONFIG_ID``; tasks in any other status are left alone.
        """
        query = f"""
            UPDATE {TABLE}
            SET status = %s, result = %s, remark = %s
            WHERE channel_content_id = %s AND status IN (%s, %s) AND config_id = %s
        """
        return await self.pool.async_save(
            query=query,
            params=(
                self.TaskStatus.SUCCESS,
                result,
                remark,
                channel_content_id,
                self.TaskStatus.INIT,
                self.TaskStatus.PROCESSING,
                self.CONFIG_ID,
            ),
        )

    async def fetch_pending_tasks(self, source: Optional[int] = None) -> List[Dict]:
        """Fetch up to ``TASK_BATCH`` tasks still in ``TaskStatus.INIT``.

        Args:
            source: When given, only tasks with this ``source`` are returned;
                ``None`` means any source.

        Returns:
            Rows with ``channel_content_id`` and ``content_id``.
        """
        # Build the WHERE clause once; the optional source filter is the only
        # difference between the two query shapes.
        conditions = ["status = %s"]
        params: list = [self.TaskStatus.INIT]
        if source is not None:
            conditions.append("source = %s")
            params.append(source)
        conditions.append("config_id = %s")
        params.extend((self.CONFIG_ID, self.TASK_BATCH))
        where_clause = " AND ".join(conditions)
        query = f"""
            SELECT channel_content_id, content_id
            FROM {TABLE}
            WHERE {where_clause}
            LIMIT %s
        """
        return await self.pool.async_fetch(query=query, params=tuple(params))

    async def fetch_existing_channel_content_ids(
        self, channel_content_ids: List[str]
    ) -> Set[str]:
        """Return the subset of ids that already have a successful decode result.

        Used for de-duplication: callers can skip content that was already
        decoded under the current ``CONFIG_ID``.

        Args:
            channel_content_ids: Ids to check. Any iterable is accepted;
                duplicates are collapsed before querying. ``None`` or an empty
                input returns an empty set without touching the database.
        """
        # dict.fromkeys de-duplicates while preserving order, keeping the
        # IN (...) list no longer than necessary.
        unique_ids = list(dict.fromkeys(channel_content_ids or ()))
        if not unique_ids:
            return set()
        placeholders = ",".join(["%s"] * len(unique_ids))
        query = f"""
            SELECT channel_content_id FROM {TABLE}
            WHERE channel_content_id IN ({placeholders})
              AND config_id = %s
              AND status = %s
        """
        rows = await self.pool.async_fetch(
            query=query,
            params=(*unique_ids, self.CONFIG_ID, self.TaskStatus.SUCCESS),
        )
        return {row["channel_content_id"] for row in rows}

    async def fetch_extract_tasks(self, limit: Optional[int] = None) -> List[Dict]:
        """Fetch successfully decoded tasks whose extraction has not started.

        Args:
            limit: Optional cap on the number of rows returned. ``None`` (the
                default) keeps the original unbounded behaviour.

        Returns:
            Rows with ``id`` and ``result``.
        """
        query = f"""
            SELECT id, result FROM {TABLE}
            WHERE extract_status = %s AND status = %s AND config_id = %s
        """
        params = [self.ExtractStatus.INIT, self.TaskStatus.SUCCESS, self.CONFIG_ID]
        if limit is not None:
            query += " LIMIT %s"
            params.append(limit)
        return await self.pool.async_fetch(query=query, params=tuple(params))

    async def update_extract_status(
        self, task_id: int, ori_status: int, new_status: int
    ) -> int:
        """Compare-and-set ``extract_status`` for the row with ``id = task_id``.

        Unlike the other updates, this one is not filtered by ``config_id``.
        """
        query = f"""
            UPDATE {TABLE}
            SET extract_status = %s
            WHERE extract_status = %s AND id = %s
        """
        return await self.pool.async_save(
            query=query, params=(new_status, ori_status, task_id)
        )

    async def record_extract_detail(self, decode_task_id: int, detail: Dict) -> int:
        """Insert one extraction-detail row linked to ``decode_task_id``.

        Reads ``inspiration``, ``purpose``, ``key_point`` and ``topic`` from
        ``detail``; a missing key is stored as an empty string.
        """
        query = """
            INSERT INTO long_articles_decode_task_detail_v2
                (decode_task_id, inspiration, purpose, key_point, topic)
            VALUES (%s, %s, %s, %s, %s)
        """
        return await self.pool.async_save(
            query=query,
            params=(
                decode_task_id,
                detail.get("inspiration", ""),
                detail.get("purpose", ""),
                detail.get("key_point", ""),
                detail.get("topic", ""),
            ),
        )
class AdPlatformArticlesDecodeTaskMapper(ArticlesDecodeTaskMapper):
    """Decode-task mapper that also reads and updates ``ad_platform_accounts_daily_detail``.

    Constructed exactly like ``ArticlesDecodeTaskMapper`` (takes the shared
    ``pool``); no extra ``__init__`` is needed because nothing else is set up.
    """

    async def update_article_decode_status(
        self, id_: int, ori_status: int, new_status: int
    ) -> int:
        """Compare-and-set ``decode_status`` of one ad-platform article row."""
        query = """
            UPDATE ad_platform_accounts_daily_detail
            SET decode_status = %s
            WHERE id = %s AND decode_status = %s
        """
        return await self.pool.async_save(
            query=query, params=(new_status, id_, ori_status)
        )

    async def fetch_decode_articles(self) -> List[Dict]:
        """Fetch up to ``TASK_BATCH`` fetched-but-not-yet-decoded ad-platform articles.

        Selects rows whose ``fetch_status`` equals ``TaskStatus.SUCCESS`` and
        whose ``decode_status`` equals ``TaskStatus.INIT``.
        """
        # NOTE(review): this reuses the decode-task TaskStatus codes for the
        # ad-platform table's status columns — confirm both share one encoding.
        query = """
            SELECT id, account_name, gh_id, article_title, article_cover,
                   article_text, article_images, wx_sn
            FROM ad_platform_accounts_daily_detail
            WHERE fetch_status = %s AND decode_status = %s
            LIMIT %s
        """
        return await self.pool.async_fetch(
            query=query,
            params=(self.TaskStatus.SUCCESS, self.TaskStatus.INIT, self.TASK_BATCH),
        )
class InnerArticlesDecodeTaskMapper(ArticlesDecodeTaskMapper):
    """Decode-task mapper for internally produced articles.

    Reads candidates from ``TABLE_INNER`` and their production output from
    the ``aigc`` database. Constructed exactly like ``ArticlesDecodeTaskMapper``.
    """

    TABLE_INNER = "long_articles_decode_articles"

    # produce_module_type values fetched by default.
    # NOTE(review): the meaning of 1/2/4 is defined by the aigc
    # produce_plan_module_output schema — confirm before changing.
    DEFAULT_PRODUCE_MODULE_TYPES: Tuple[int, ...] = (1, 2, 4)

    async def fetch_inner_articles(self) -> List[Dict]:
        """Fetch up to ``TASK_BATCH`` INIT-status inner articles, highest ``max_read_rate`` first."""
        query = f"""
            SELECT id, title, source_id, wx_sn, coverimgurl, article_text, summary, card_title
            FROM {self.TABLE_INNER}
            WHERE status = %s
            ORDER BY max_read_rate DESC
            LIMIT %s
        """
        return await self.pool.async_fetch(
            query=query, params=(self.TaskStatus.INIT, self.TASK_BATCH)
        )

    async def update_inner_article_status(
        self, id_: int, ori_status: int, new_status: int
    ) -> int:
        """Compare-and-set ``status`` of one row in ``TABLE_INNER``."""
        query = f"""
            UPDATE {self.TABLE_INNER}
            SET status = %s
            WHERE id = %s AND status = %s
        """
        return await self.pool.async_save(
            query=query, params=(new_status, id_, ori_status)
        )

    async def fetch_inner_articles_produce_detail(
        self,
        source_id,
        module_types: Sequence[int] = DEFAULT_PRODUCE_MODULE_TYPES,
    ) -> List[Dict]:
        """Fetch production-module output rows for one plan execution.

        Args:
            source_id: Matched against ``plan_exe_id`` in the aigc
                ``produce_plan_module_output`` table.
            module_types: ``produce_module_type`` values to include; defaults
                to ``DEFAULT_PRODUCE_MODULE_TYPES`` (the original hard-coded set).

        Returns:
            Rows with ``produce_module_type`` and ``output``; an empty list when
            ``module_types`` is empty (``IN ()`` would be invalid SQL).
        """
        types = tuple(module_types)
        if not types:
            return []
        placeholders = ", ".join(["%s"] * len(types))
        query = f"""
            SELECT produce_module_type, output
            FROM produce_plan_module_output
            WHERE plan_exe_id = %s
              AND produce_module_type IN ({placeholders})
        """
        return await self.pool.async_fetch(
            query=query, db_name="aigc", params=(source_id, *types)
        )
# Public names exported by `from <this module> import *`: the base
# decode-task mapper and its two source-specific subclasses.
__all__ = [
    "ArticlesDecodeTaskMapper",
    "AdPlatformArticlesDecodeTaskMapper",
    "InnerArticlesDecodeTaskMapper",
]
|