import json
from typing import Any, Dict, List, Optional, Set

from app.core.database import DatabaseManager
from app.core.observability import LogService

from ._const import DecodeMaterialConst
from ._mapper import MaterialDecodeTaskMapper
from ._utils import MaterialDecodeUtils

# Value of the "task" field on every log entry written by this module.
_TASK_NAME = "create_material_decode_task"


class CreateMaterialsDecodeTask(DecodeMaterialConst):
    """Create decode tasks for materials that are waiting in the INIT state.

    One run of :meth:`deal`:

    1. locks candidate materials (status INIT -> PROCESSING);
    2. marks materials without ``material_cover`` as FAILED;
    3. marks materials whose ``material_id`` already has a decode task, or
       repeats an earlier material in the same batch, as SUCCESS;
    4. submits the remaining materials in one batch and records each result.

    If a mapper/tool call raises, materials locked by this run that were not
    yet moved out of PROCESSING are rolled back to INIT (best effort) before
    the exception is re-raised, so they do not stay locked.
    """

    def __init__(self, pool: DatabaseManager, log_service: LogService):
        self.pool = pool
        self.log_service = log_service
        self.mapper = MaterialDecodeTaskMapper(self.pool)
        self.tool = MaterialDecodeUtils()

    # ------------------------------------------------------------------
    # Status helpers
    # ------------------------------------------------------------------
    async def _settle(self, material_id: Any, target: Any, settled: Set[Any]) -> None:
        """Move one material from PROCESSING to ``target`` and record it in ``settled``."""
        await self.mapper.update_material_status(
            material_id, self.TaskStatus.PROCESSING, target
        )
        settled.add(material_id)

    async def _release_locks(self, materials: List[Dict], reason: str) -> None:
        """Best-effort rollback of ``materials`` from PROCESSING back to INIT.

        Only used on error paths, right before the caller re-raises. A failure
        while releasing one material does not stop the others and is not
        raised, so the caller's original exception is the one that propagates.
        """
        if not materials:
            return
        for m in materials:
            try:
                await self.mapper.update_material_status(
                    m["id"], self.TaskStatus.PROCESSING, self.TaskStatus.INIT
                )
            except Exception:  # best effort: the caller re-raises the original error
                continue
        try:
            await self.log_service.log(
                contents={
                    "task": _TASK_NAME,
                    "status": "fail",
                    "material_ids": [m.get("id") for m in materials],
                    "message": (
                        f"{reason}; rolling back {len(materials)} "
                        "unsettled materials to INIT"
                    ),
                }
            )
        except Exception:  # logging must not mask the original error
            pass

    @staticmethod
    def _payload(posts_by_cid: Dict[str, Dict], source_id: str) -> str:
        """JSON-encode the post submitted for ``source_id`` (``"{}"`` if none)."""
        return json.dumps(posts_by_cid.get(source_id, {}), ensure_ascii=False)

    # ------------------------------------------------------------------
    # Step 1: acquire
    # ------------------------------------------------------------------
    async def _acquire_materials(self) -> List[Dict]:
        """Fetch candidate materials and lock them (status INIT -> PROCESSING).

        Returns the materials whose status update reported success; the others
        are logged as skipped. If a call raises mid-way, the materials already
        locked by this call are rolled back to INIT and the exception re-raised.
        """
        materials = await self.mapper.fetch_materials() or []
        locked: List[Dict] = []
        try:
            for m in materials:
                mid = m["id"]
                acquired = await self.mapper.update_material_status(
                    mid, self.TaskStatus.INIT, self.TaskStatus.PROCESSING
                )
                if acquired:
                    locked.append(m)
                    continue
                await self.log_service.log(
                    contents={
                        "material_id": mid,
                        "task": _TASK_NAME,
                        "status": "skip",
                        "message": "acquire lock failed",
                    }
                )
        except Exception:
            await self._release_locks(locked, "acquire failed")
            raise
        return locked

    # ------------------------------------------------------------------
    # Steps 2-4: filter, de-duplicate, submit, record
    # ------------------------------------------------------------------
    async def _submit_and_record(self, materials: List[Dict]):
        """Submit locked materials for decoding and record each outcome.

        ``materials`` are expected to be PROCESSING (as returned by
        ``_acquire_materials``). If any mapper/tool call raises, every material
        not yet moved out of PROCESSING is rolled back to INIT and the exception
        is re-raised. Without this they would stay PROCESSING, and
        ``_acquire_materials`` only locks materials that are in INIT.
        """
        if not materials:
            return
        settled: Set[Any] = set()
        try:
            await self._process_locked(materials, settled)
        except Exception:
            unsettled = [m for m in materials if m.get("id") not in settled]
            await self._release_locks(unsettled, "submit/record failed")
            raise

    async def _process_locked(self, materials: List[Dict], settled: Set[Any]) -> None:
        """Filter, de-duplicate, submit and record ``materials`` (see ``_submit_and_record``)."""
        valid = await self._reject_missing_cover(materials, settled)
        deduped = await self._skip_duplicates(valid, settled)
        if not deduped:
            return

        posts = self.tool.prepare_posts(deduped)
        submit_results = await self.tool.submit_decode_batch(posts) or {}
        # Keys normalised to str so they match the str(material_id) lookups below.
        posts_by_cid = {str(p["channelContentId"]): p for p in posts}

        for m in deduped:
            source_id = str(m["material_id"])
            await self._record_submit_result(
                m["id"], source_id, submit_results.get(source_id), posts_by_cid, settled
            )

    async def _reject_missing_cover(
        self, materials: List[Dict], settled: Set[Any]
    ) -> List[Dict]:
        """Mark materials without ``material_cover`` as FAILED; return the rest in order."""
        valid: List[Dict] = []
        for m in materials:
            if m.get("material_cover"):
                valid.append(m)
                continue
            await self._settle(m["id"], self.TaskStatus.FAILED, settled)
            await self.log_service.log(
                contents={
                    "material_id": m["id"],
                    "task": _TASK_NAME,
                    "status": "fail",
                    "message": "material_cover is empty, marked as FAILED",
                }
            )
        return valid

    async def _skip_duplicates(
        self, materials: List[Dict], settled: Set[Any]
    ) -> List[Dict]:
        """Mark duplicate materials as SUCCESS and return the ones to submit.

        A material counts as a duplicate when its ``material_id`` is among the
        source ids returned by ``fetch_existing_source_ids`` (cross-batch), or
        when it appeared earlier in ``materials`` (same batch; the first
        occurrence is kept).
        """
        if not materials:
            return []

        existing = set(
            await self.mapper.fetch_existing_source_ids(
                [str(m["material_id"]) for m in materials]
            )
            or ()
        )
        seen: Set[str] = set()
        deduped: List[Dict] = []
        dups: List[Dict] = []
        for m in materials:
            sid = str(m["material_id"])
            if sid in existing or sid in seen:
                dups.append(m)
            else:
                seen.add(sid)
                deduped.append(m)

        if dups:
            await self.log_service.log(
                contents={
                    "task": _TASK_NAME,
                    "message": f"Skipped {len(dups)} duplicate materials (already submitted or same-batch)",
                }
            )
            for m in dups:
                await self._settle(m["id"], self.TaskStatus.SUCCESS, settled)
        return deduped

    async def _record_submit_result(
        self,
        mid: Any,
        source_id: str,
        result: Optional[Dict],
        posts_by_cid: Dict[str, Dict],
        settled: Set[Any],
    ) -> None:
        """Settle one submitted material according to its submit ``result``.

        - missing result, FAILED, or an unexpected status: material rolled back to INIT;
        - SUCCESS: handled by ``_record_submit_success``;
        - PENDING: a PROCESSING decode task is inserted and the material marked SUCCESS.
        """
        if not result:
            await self._settle(mid, self.TaskStatus.INIT, settled)
            await self.log_service.log(
                contents={
                    "material_id": mid,
                    "source_id": source_id,
                    "task": _TASK_NAME,
                    "status": "fail",
                    "message": "no response for source_id, rolled back to INIT",
                }
            )
            return

        status = result.get("status")
        if status == self.SubmitStatus.FAILED:
            await self._settle(mid, self.TaskStatus.INIT, settled)
            await self.log_service.log(
                contents={
                    "material_id": mid,
                    "source_id": source_id,
                    "task": _TASK_NAME,
                    "status": "fail",
                    "data": result,
                }
            )
            return

        if status == self.SubmitStatus.SUCCESS:
            await self._record_submit_success(mid, source_id, posts_by_cid, settled)
        elif status == self.SubmitStatus.PENDING:
            await self._queue_for_polling(
                mid,
                source_id,
                posts_by_cid,
                remark="素材解构任务已提交,等待轮询",
                message="task submitted, waiting for polling",
                settled=settled,
            )
        else:
            await self._settle(mid, self.TaskStatus.INIT, settled)
            await self.log_service.log(
                contents={
                    "material_id": mid,
                    "source_id": source_id,
                    "task": _TASK_NAME,
                    "status": "fail",
                    "message": f"unexpected submit status: {status}, rolled back to INIT",
                    "data": result,
                }
            )

    async def _record_submit_success(
        self,
        mid: Any,
        source_id: str,
        posts_by_cid: Dict[str, Dict],
        settled: Set[Any],
    ) -> None:
        """Handle a SUCCESS submit: store the decode result now if it is already available.

        Queries the result once. If the query reports SUCCESS, the decode task
        and its result are written immediately; otherwise the task is inserted
        for polling. Either way the material ends up SUCCESS.
        """
        query_results = await self.tool.query_decode_results_batch([source_id]) or {}
        result_data = query_results.get(source_id)
        if not (result_data and result_data.get("status") == self.QueryStatus.SUCCESS):
            await self._queue_for_polling(
                mid,
                source_id,
                posts_by_cid,
                remark="提交返回SUCCESS,查询未果,等待轮询",
                message="submit SUCCESS but query not ready, inserted for polling",
                settled=settled,
            )
            return

        data_content = result_data.get("dataContent") or "{}"
        html = result_data.get("html")
        await self.mapper.insert_decode_task(
            source_id=source_id,
            payload=self._payload(posts_by_cid, source_id),
            remark="提交时已有解构结果,直接落库",
        )
        await self.mapper.set_decode_result(
            source_id=source_id,
            result=json.dumps(
                {"dataContent": data_content, "html": html},
                ensure_ascii=False,
            ),
            remark="提交时已返回 SUCCESS,结果已落库",
        )
        await self._settle(mid, self.TaskStatus.SUCCESS, settled)
        await self.log_service.log(
            contents={
                "material_id": mid,
                "source_id": source_id,
                "task": _TASK_NAME,
                "status": "success",
                "message": "decode result already available on submit",
            }
        )

    async def _queue_for_polling(
        self,
        mid: Any,
        source_id: str,
        posts_by_cid: Dict[str, Dict],
        remark: str,
        message: str,
        settled: Set[Any],
    ) -> None:
        """Insert a PROCESSING decode task to be polled later and mark the material SUCCESS."""
        await self.mapper.insert_decode_task(
            source_id=source_id,
            payload=self._payload(posts_by_cid, source_id),
            remark=remark,
            status=self.TaskStatus.PROCESSING,
        )
        await self._settle(mid, self.TaskStatus.SUCCESS, settled)
        await self.log_service.log(
            contents={
                "material_id": mid,
                "source_id": source_id,
                "task": _TASK_NAME,
                "status": "pending",
                "message": message,
            }
        )

    # ------------------------------------------------------------------
    # Entry point
    # ------------------------------------------------------------------
    async def deal(self):
        """Run one pass: lock pending materials, submit them, and log a summary."""
        materials = await self._acquire_materials()
        if not materials:
            await self.log_service.log(
                contents={
                    "task": _TASK_NAME,
                    "message": "No more materials to decode",
                }
            )
            return

        await self._submit_and_record(materials)
        await self.log_service.log(
            contents={
                "task": _TASK_NAME,
                "message": f"Processed {len(materials)} materials",
            }
        )


__all__ = ["CreateMaterialsDecodeTask"]