| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194 |
- from typing import Dict, List
- from app.core.database import DatabaseManager
- from ._const import DecodeCardConst
- TABLE_SOURCE = "auto_reply_top_cards_daily"
- TABLE_TASK = "long_articles_decode_tasks"
- class CardDecodeTaskMapper(DecodeCardConst):
- """卡片解构 Mapper — 操作 auto_reply_top_cards_daily 与 long_articles_decode_tasks"""
- def __init__(self, pool: DatabaseManager):
- self.pool = pool
- # ——— auto_reply_top_cards_daily ———
- async def fetch_cards(self) -> List[Dict]:
- """获取待解构卡片:status=INIT"""
- query = f"""
- SELECT id, channel, share_cover, share_title, card_cover_id
- FROM {TABLE_SOURCE}
- WHERE status = %s
- LIMIT %s
- """
- return await self.pool.async_fetch(
- query=query, params=(self.TaskStatus.INIT, self.TASK_BATCH)
- )
- async def update_card_status(
- self, id_: int, ori_status: int, new_status: int
- ) -> int:
- """更新卡片解构状态(乐观锁)"""
- query = f"""
- UPDATE {TABLE_SOURCE}
- SET status = %s
- WHERE id = %s AND status = %s
- """
- return await self.pool.async_save(
- query=query, params=(new_status, id_, ori_status)
- )
- async def set_card_cover_id(self, id_: int, cover_id: str) -> int:
- """回填 card_cover_id"""
- query = f"""
- UPDATE {TABLE_SOURCE}
- SET card_cover_id = %s
- WHERE id = %s
- """
- return await self.pool.async_save(query=query, params=(cover_id, id_))
- # ——— long_articles_decode_tasks ———
- async def insert_decode_task(
- self,
- source_id: str,
- config_id: int,
- source: int,
- payload: str,
- remark: str = None,
- status: int = None,
- ) -> int:
- if status is not None:
- query = f"""
- INSERT IGNORE INTO {TABLE_TASK}
- (source_id, config_id, source, channel, payload, remark, status)
- VALUES (%s, %s, %s, %s, %s, %s, %s)
- """
- params = (
- source_id,
- config_id,
- source,
- self.TaskChannel.PARTNER_CARD,
- payload,
- remark,
- status,
- )
- else:
- query = f"""
- INSERT IGNORE INTO {TABLE_TASK}
- (source_id, config_id, source, channel, payload, remark)
- VALUES (%s, %s, %s, %s, %s, %s)
- """
- params = (
- source_id,
- config_id,
- source,
- self.TaskChannel.PARTNER_CARD,
- payload,
- remark,
- )
- return await self.pool.async_save(query=query, params=params)
- async def set_decode_result(
- self,
- source_id: str,
- config_id: int,
- result: str,
- remark: str = None,
- ) -> int:
- query = f"""
- UPDATE {TABLE_TASK}
- SET status = %s, result = %s, remark = %s
- WHERE source_id = %s AND status IN (%s, %s) AND config_id = %s
- """
- return await self.pool.async_save(
- query=query,
- params=(
- self.TaskStatus.SUCCESS,
- result,
- remark,
- source_id,
- self.TaskStatus.INIT,
- self.TaskStatus.PROCESSING,
- config_id,
- ),
- )
- async def fetch_pending_tasks(self) -> List[Dict]:
- query = f"""
- SELECT source_id, config_id
- FROM {TABLE_TASK}
- WHERE status IN (%s, %s) AND source IN (%s, %s)
- ORDER BY config_id
- LIMIT %s
- """
- return await self.pool.async_fetch(
- query=query,
- params=(
- self.TaskStatus.INIT,
- self.TaskStatus.PROCESSING,
- self.SourceType.PARTNER_CARD_TOULIU,
- self.SourceType.PARTNER_CARD_COOPERATE,
- self.TASK_BATCH,
- ),
- )
- async def update_task_status_by_source_id(
- self,
- source_id: str,
- config_id: int,
- new_status: int,
- remark: str = None,
- ) -> int:
- query = f"""
- UPDATE {TABLE_TASK}
- SET status = %s, remark = %s
- WHERE source_id = %s AND status IN (%s, %s) AND config_id = %s
- """
- return await self.pool.async_save(
- query=query,
- params=(
- new_status,
- remark,
- source_id,
- self.TaskStatus.INIT,
- self.TaskStatus.PROCESSING,
- config_id,
- ),
- )
- async def fetch_existing_source_ids(
- self, source_ids: List[str], config_id: int, source: int
- ) -> set:
- """批量查询已有任务记录的 source_id,用于去重跳过"""
- if not source_ids:
- return set()
- placeholders = ",".join(["%s"] * len(source_ids))
- query = f"""
- SELECT source_id FROM {TABLE_TASK}
- WHERE source_id IN ({placeholders})
- AND config_id = %s
- AND source = %s
- AND status IN (%s, %s, %s, %s)
- """
- return {
- r["source_id"]
- for r in await self.pool.async_fetch(
- query=query,
- params=(
- *source_ids,
- config_id,
- source,
- self.TaskStatus.INIT,
- self.TaskStatus.PROCESSING,
- self.TaskStatus.SUCCESS,
- self.TaskStatus.FAILED,
- ),
- )
- }
- __all__ = ["CardDecodeTaskMapper"]
|