| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356 |
- import json
- from collections import defaultdict
- from typing import Dict, List
- from app.core.database import DatabaseManager
- from app.core.observability import LogService
- from ._const import DecodeCardConst
- from ._mapper import CardDecodeTaskMapper
- from ._utils import CardDecodeUtils
class CreateCardsDecodeTask(DecodeCardConst):
    """Submit claimed cards to the decode service and record the outcome.

    One run (:meth:`deal`) does the following:

    1. claims cards by moving their status ``INIT -> PROCESSING``;
    2. makes sure every claimed card has a ``card_cover_id``;
    3. groups the cards by the ``config_id`` mapped from their ``channel``;
    4. submits each group and, per card, either stores a decode task and
       marks the card ``SUCCESS``, or moves it to ``FAILED`` / back to
       ``INIT``.
    """

    # Value of the "task" field in every log entry written by this class.
    _TASK_NAME = "create_card_decode_task"

    def __init__(self, pool: DatabaseManager, log_service: LogService):
        """Store the collaborators and build the mapper and utility helpers."""
        self.pool = pool
        self.log_service = log_service
        self.mapper = CardDecodeTaskMapper(self.pool)
        self.tool = CardDecodeUtils()

    # ------------------------------------------------------------------
    # small shared helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _to_json(value) -> str:
        """Serialize ``value`` to JSON, keeping non-ASCII characters as-is."""
        return json.dumps(value, ensure_ascii=False)

    async def _log_card(
        self, card_id, source_id: str, config_id: int, status: str, **extra
    ) -> None:
        """Write one per-card log entry.

        ``extra`` (e.g. ``message=...``, ``data=...``) is appended after the
        fixed fields, in the order given by the caller.
        """
        await self.log_service.log(
            contents={
                "card_id": card_id,
                "source_id": source_id,
                "config_id": config_id,
                "task": self._TASK_NAME,
                "status": status,
                **extra,
            }
        )

    async def _rollback_to_init(
        self, card_id, source_id: str, config_id: int, **extra
    ) -> None:
        """Move a card ``PROCESSING -> INIT`` and log it with status ``fail``."""
        await self.mapper.update_card_status(
            card_id, self.TaskStatus.PROCESSING, self.TaskStatus.INIT
        )
        await self._log_card(card_id, source_id, config_id, "fail", **extra)

    async def _queue_for_polling(
        self,
        card_id,
        source_id: str,
        config_id: int,
        payload: str,
        remark: str,
        message: str,
    ) -> None:
        """Insert a ``PROCESSING`` decode task, mark the card ``SUCCESS``, log it."""
        await self.mapper.insert_decode_task(
            source_id=source_id,
            config_id=config_id,
            payload=payload,
            remark=remark,
            status=self.TaskStatus.PROCESSING,
        )
        await self.mapper.update_card_status(
            card_id, self.TaskStatus.PROCESSING, self.TaskStatus.SUCCESS
        )
        await self._log_card(
            card_id, source_id, config_id, "pending", message=message
        )

    async def _release_locks(self, cards: List[Dict]) -> None:
        """Best-effort ``PROCESSING -> INIT`` rollback for the given cards.

        Used only on the error path of :meth:`deal`, so that an unexpected
        exception does not leave claimed cards stuck in ``PROCESSING``.

        NOTE(review): this relies on ``update_card_status`` changing a row
        only when its current status equals the "from" status, which is the
        same assumption ``_acquire_cards`` makes when it uses the return
        value as a lock flag. Confirm against the mapper. Under that
        assumption, cards already moved to SUCCESS / FAILED / INIT are left
        untouched.
        """
        for card in cards:
            try:
                await self.mapper.update_card_status(
                    card["id"],
                    self.TaskStatus.PROCESSING,
                    self.TaskStatus.INIT,
                )
            except Exception:
                # Deliberately ignored: the caller re-raises the original
                # error, which must not be masked by a failing rollback.
                continue

    # ------------------------------------------------------------------
    # pipeline steps
    # ------------------------------------------------------------------
    async def _acquire_cards(self) -> List[Dict]:
        """Fetch candidate cards and claim them (status INIT -> PROCESSING).

        Returns:
            The cards whose status update reported success. Cards that could
            not be claimed are logged with status ``skip`` and left out.
        """
        cards = await self.mapper.fetch_cards()
        locked = []
        for card in cards:
            card_id = card["id"]
            acquired = await self.mapper.update_card_status(
                card_id, self.TaskStatus.INIT, self.TaskStatus.PROCESSING
            )
            if acquired:
                locked.append(card)
                continue
            await self.log_service.log(
                contents={
                    "card_id": card_id,
                    "task": self._TASK_NAME,
                    "status": "skip",
                    "message": "acquire lock failed",
                }
            )
        return locked

    async def _ensure_cover_ids(self, cards: List[Dict]) -> None:
        """Fill in ``card_cover_id`` for cards that have a cover but no id.

        The id comes from ``CardDecodeUtils.compute_cover_md5`` applied to the
        card's ``share_cover`` (per the original note, this downloads the
        cover and hashes it). On success the id is persisted and written back
        into the card dict; on any error the card is moved to ``FAILED``.
        """
        for card in cards:
            if card.get("card_cover_id"):
                continue
            if not card.get("share_cover"):
                # Empty cover: left for _submit_and_record to mark FAILED.
                continue
            try:
                cover_id = self.tool.compute_cover_md5(card["share_cover"])
                await self.mapper.set_card_cover_id(card["id"], cover_id)
                card["card_cover_id"] = cover_id
                await self.log_service.log(
                    contents={
                        "card_id": card["id"],
                        "task": self._TASK_NAME,
                        "status": "info",
                        "message": f"computed cover_id={cover_id}",
                    }
                )
            except Exception as e:
                await self.mapper.update_card_status(
                    card["id"],
                    self.TaskStatus.PROCESSING,
                    self.TaskStatus.FAILED,
                )
                await self.log_service.log(
                    contents={
                        "card_id": card["id"],
                        "share_cover": card.get("share_cover"),
                        "task": self._TASK_NAME,
                        "status": "fail",
                        "message": f"compute cover md5 failed, marked as FAILED: {e}",
                    }
                )

    @staticmethod
    def _group_cards_by_channel(cards: List[Dict]) -> Dict[int, List[Dict]]:
        """Group cards by the ``config_id`` mapped from their ``channel``.

        Cards whose channel has no (truthy) entry in ``CHANNEL_CONFIG_MAP``
        are not part of the result.
        """
        grouped = defaultdict(list)
        for card in cards:
            source_channel = card.get("channel", "")
            config_id = DecodeCardConst.CHANNEL_CONFIG_MAP.get(source_channel)
            # Truthiness check: a missing channel and a falsy mapped id are
            # both treated as "unmapped".
            if config_id:
                grouped[config_id].append(card)
        return dict(grouped)

    async def _record_submit_success(
        self, card_id, source_id: str, config_id: int, post: Dict
    ) -> None:
        """Handle a card whose submit status was ``SUCCESS``.

        Queries the decode result right away. If it is available, the task
        and its result are stored; otherwise the task is queued for polling.
        Either way the card ends up ``SUCCESS``.
        """
        query_results = await self.tool.query_decode_results_batch(
            [source_id], config_id=config_id
        )
        result_data = query_results.get(source_id)
        if not (
            result_data
            and result_data.get("status") == self.QueryStatus.SUCCESS
        ):
            await self._queue_for_polling(
                card_id,
                source_id,
                config_id,
                payload=self._to_json(post),
                remark="提交返回SUCCESS,查询未果,等待轮询",
                message="submit SUCCESS but query not ready, inserted for polling",
            )
            return

        data_content = result_data.get("dataContent") or "{}"
        html = result_data.get("html")
        await self.mapper.insert_decode_task(
            source_id=source_id,
            config_id=config_id,
            payload=self._to_json(post),
            remark="提交时已有解构结果,直接落库",
        )
        await self.mapper.set_decode_result(
            source_id=source_id,
            config_id=config_id,
            result=self._to_json({"dataContent": data_content, "html": html}),
            remark="提交时已返回 SUCCESS,结果已落库",
        )
        await self.mapper.update_card_status(
            card_id, self.TaskStatus.PROCESSING, self.TaskStatus.SUCCESS
        )
        await self._log_card(
            card_id,
            source_id,
            config_id,
            "success",
            message="decode result already available on submit",
        )

    async def _record_submit_result(
        self, card_id, source_id: str, config_id: int, result, post: Dict
    ) -> None:
        """Apply the submit response of a single card.

        Missing response, ``FAILED`` and unknown statuses roll the card back
        to ``INIT``; ``SUCCESS`` and ``PENDING`` store a decode task and mark
        the card ``SUCCESS``.
        """
        if not result:
            await self._rollback_to_init(
                card_id,
                source_id,
                config_id,
                message="no response for source_id, rolled back to INIT",
            )
            return

        status = result.get("status")
        if status == self.SubmitStatus.FAILED:
            await self._rollback_to_init(
                card_id, source_id, config_id, data=result
            )
            return
        if status == self.SubmitStatus.SUCCESS:
            await self._record_submit_success(
                card_id, source_id, config_id, post
            )
            return
        if status == self.SubmitStatus.PENDING:
            await self._queue_for_polling(
                card_id,
                source_id,
                config_id,
                payload=self._to_json(post),
                remark="卡片解构任务已提交,等待轮询",
                message="task submitted, waiting for polling",
            )
            return

        await self._rollback_to_init(
            card_id,
            source_id,
            config_id,
            message=f"unexpected submit status: {status}, rolled back to INIT",
            data=result,
        )

    async def _submit_and_record_for_config(
        self, cards: List[Dict], config_id: int
    ) -> None:
        """Submit the cards of one ``config_id`` and persist the results.

        Cards without a ``card_cover_id`` are ignored here. Cards whose cover
        id already has a task for this config, or repeats an earlier card of
        the same batch, are marked ``SUCCESS`` without being submitted.
        """
        if not cards:
            return

        cards_with_cid = [c for c in cards if c.get("card_cover_id")]
        if not cards_with_cid:
            return

        # Cross-batch de-duplication: look up tasks that already exist.
        all_source_ids = [str(c["card_cover_id"]) for c in cards_with_cid]
        existing = await self.mapper.fetch_existing_source_ids(
            all_source_ids, config_id
        )

        # In-batch de-duplication: keep only the first card per cover id.
        seen = set()
        new_cards = []
        duplicates = []
        for c in cards_with_cid:
            cid = str(c["card_cover_id"])
            if cid in existing or cid in seen:
                duplicates.append(c)
            else:
                seen.add(cid)
                new_cards.append(c)

        if duplicates:
            await self.log_service.log(
                contents={
                    "task": self._TASK_NAME,
                    "config_id": config_id,
                    "message": f"Skipped {len(duplicates)} duplicate cards (already submitted or same-batch)",
                }
            )
            for card in duplicates:
                await self.mapper.update_card_status(
                    card["id"],
                    self.TaskStatus.PROCESSING,
                    self.TaskStatus.SUCCESS,
                )

        if not new_cards:
            return

        posts = self.tool.prepare_posts(new_cards)
        submit_results = await self.tool.submit_decode_batch(
            posts, config_id=config_id
        )
        # Index the submitted posts by content id; each one is stored as the
        # task payload of the matching card.
        posts_by_cid = {p["channelContentId"]: p for p in posts}

        for card in new_cards:
            source_id = str(card["card_cover_id"])
            await self._record_submit_result(
                card["id"],
                source_id,
                config_id,
                submit_results.get(source_id),
                posts_by_cid.get(source_id, {}),
            )

    async def _submit_and_record(self, cards: List[Dict]) -> None:
        """Route claimed cards to their config group and record the results.

        Cards without ``share_cover`` are marked ``FAILED``. The remaining
        cards are grouped by channel and submitted group by group; cards
        whose channel maps to no config are rolled back to ``INIT``.
        """
        if not cards:
            return

        # Cards without share_cover cannot be submitted: mark them FAILED.
        valid_cards = []
        for card in cards:
            if card.get("share_cover"):
                valid_cards.append(card)
                continue
            await self.mapper.update_card_status(
                card["id"],
                self.TaskStatus.PROCESSING,
                self.TaskStatus.FAILED,
            )
            await self.log_service.log(
                contents={
                    "card_id": card["id"],
                    "task": self._TASK_NAME,
                    "status": "fail",
                    "message": "share_cover is empty, marked as FAILED",
                }
            )
        if not valid_cards:
            return

        grouped = self._group_cards_by_channel(valid_cards)
        for config_id, group_cards in grouped.items():
            await self._submit_and_record_for_config(group_cards, config_id)

        # Cards that landed in no group (unmapped channel) are released.
        # Object identity is used because the card dicts are not hashable.
        mapped_ids = {id(c) for g in grouped.values() for c in g}
        for card in valid_cards:
            if id(card) in mapped_ids:
                continue
            await self.mapper.update_card_status(
                card["id"],
                self.TaskStatus.PROCESSING,
                self.TaskStatus.INIT,
            )
            await self.log_service.log(
                contents={
                    "card_id": card["id"],
                    "channel": card.get("channel"),
                    "task": self._TASK_NAME,
                    "status": "fail",
                    "message": "unknown channel, rolled back to INIT",
                }
            )

    async def deal(self) -> None:
        """Run one pass: claim cards, ensure cover ids, submit and record.

        Raises:
            Exception: whatever the processing steps raise. Before the error
                is re-raised, the claimed cards are released with a
                best-effort ``PROCESSING -> INIT`` rollback so they are not
                left stuck in ``PROCESSING``.
        """
        cards = await self._acquire_cards()
        if not cards:
            await self.log_service.log(
                contents={
                    "task": self._TASK_NAME,
                    "message": "No more cards to decode",
                }
            )
            return

        try:
            await self._ensure_cover_ids(cards)
            await self._submit_and_record(cards)
        except Exception:
            await self._release_locks(cards)
            raise

        await self.log_service.log(
            contents={
                "task": self._TASK_NAME,
                "message": f"Processed {len(cards)} cards",
            }
        )
# Public API: the only name exported by a star-import of this module.
__all__ = [
    "CreateCardsDecodeTask",
]
|