"""Service that fetches decode results and extracts contribution points."""

from __future__ import annotations

from datetime import datetime
from typing import Any

from app.hot_content.client import JsonApiClient, parse_decode_data_content
from app.hot_content.config import load_flow_config
from app.hot_content.contribution import build_contribution_points
from app.hot_content.exceptions import HotContentFlowError
from app.hot_content.repository import HotContentRepository
from app.hot_content.status import ExecutionStatus, decode_api_status_to_code
from app.hot_content.timezone import SHANGHAI_TZ
from app.hot_content.types import FlowConfig


class DecodeResultFlowService:
    """Pull decode results for candidate records and persist the outcome.

    For every candidate returned by the repository, the service asks the
    decode-result API for its current state, stores successful results
    together with their contribution points, and records the status of
    everything else.
    """

    def __init__(
        self,
        config: FlowConfig,
        repository: HotContentRepository,
        api_client: JsonApiClient,
    ):
        """Store the collaborators used by :meth:`run`."""
        self.config = config
        self.repository = repository
        self.api_client = api_client

    def run(self) -> dict[str, Any]:
        """Process one batch of decode-result candidates.

        Returns:
            The summary dict produced by :meth:`_build_summary`.

        Raises:
            HotContentFlowError: If the API response's ``data`` field is
                present, truthy, and not a list.
        """
        # Always request at least one candidate, even if the configured
        # batch size is zero or negative.
        batch_limit = max(self.config.decode_result_batch_size, 1)
        records = self.repository.list_decode_result_candidates(limit=batch_limit)
        if not records:
            return self._build_summary([], 0, 0, 0, 0)

        # Map the API-facing identifier (unique_key) back to the row id.
        id_lookup: dict[str, int] = {}
        for row in records:
            lookup_key = str(row["unique_key"])
            id_lookup[lookup_key] = int(row["id"])

        payload = {
            "params": {
                "configId": self.config.decode_config_id,
                "channelContentIds": list(id_lookup),
            }
        }
        response = self.api_client.post_json(
            self.config.decode_result_api_url, payload
        )

        data_items = response.get("data") or []
        if not isinstance(data_items, list):
            raise HotContentFlowError(
                "invalid decode result response: data is not list"
            )

        tally = {"saved": 0, "failed": 0, "pending": 0}
        for entry in data_items:
            outcome = self._process_item(entry, id_lookup)
            if outcome is not None:
                tally[outcome] += 1

        return self._build_summary(
            records,
            len(data_items),
            tally["saved"],
            tally["failed"],
            tally["pending"],
        )

    def _process_item(self, item: Any, id_lookup: dict[str, int]) -> str | None:
        """Handle a single response item and report how it should be counted.

        Returns:
            ``"saved"``, ``"failed"`` or ``"pending"`` when the item affects
            the corresponding counter, or ``None`` when it is not counted
            (non-dict items, items without a ``channelContentId``, and
            non-success statuses other than pending/failed).
        """
        if not isinstance(item, dict):
            return None

        content_id = str(item.get("channelContentId") or "").strip()
        if not content_id:
            return None

        record_id = id_lookup.get(content_id)
        if record_id is None:
            # The API returned an id that was not part of this batch.
            return "failed"

        try:
            raw_status = str(item.get("status") or "").strip()
            status_code = decode_api_status_to_code(
                raw_status,
                has_success_response=True,
            )

            if status_code != ExecutionStatus.DECODE_SUCCESS:
                # An empty error message is stored as None.
                message = str(item.get("errorMessage") or "") or None
                self.repository.update_status(
                    record_id=record_id,
                    status=status_code,
                    error_message=message,
                )
                if status_code == ExecutionStatus.DECODE_PENDING:
                    return "pending"
                if status_code == ExecutionStatus.DECODE_FAILED:
                    return "failed"
                return None

            decode_result = parse_decode_data_content(item)
            points = build_contribution_points(
                decode_result,
                score_threshold=self.config.contribution_score_threshold,
            )
            points["channelContentId"] = content_id
            self.repository.save_decode_result_export(
                record_id=record_id,
                decode_result_json=decode_result,
                contribution_points_json=points,
            )
            return "saved"
        except HotContentFlowError as exc:
            # Any flow error while handling this item marks only this
            # record as failed; remaining items are still processed.
            self.repository.update_status(
                record_id=record_id,
                status=ExecutionStatus.DECODE_RESULT_FAILED,
                error_message=str(exc),
            )
            return "failed"

    def _build_summary(
        self,
        records: list[dict[str, Any]],
        response_item_count: int,
        saved_count: int,
        failed_count: int,
        pending_count: int,
    ) -> dict[str, Any]:
        """Assemble the run summary.

        The summary carries the current time in ``SHANGHAI_TZ`` (ISO 8601),
        the number of requested records, the number of items in the API
        response, the per-outcome counters, and the configured contribution
        score threshold.
        """
        timestamp = datetime.now(SHANGHAI_TZ).isoformat()
        summary: dict[str, Any] = {
            "run_at": timestamp,
            "requested_id_count": len(records),
            "decode_result_item_count": response_item_count,
            "saved_count": saved_count,
            "failed_count": failed_count,
            "pending_count": pending_count,
            "score_threshold": self.config.contribution_score_threshold,
        }
        return summary


def run_once(config: FlowConfig | None = None) -> dict[str, Any]:
    """Run the decode-result flow a single time and return its summary.

    Args:
        config: Flow configuration to use. When falsy, the configuration is
            obtained from ``load_flow_config()``.

    Returns:
        The summary dict returned by ``DecodeResultFlowService.run``.
    """
    flow_config = config if config else load_flow_config()
    repository = HotContentRepository(flow_config.mysql)
    try:
        client = JsonApiClient(
            timeout_seconds=flow_config.request_timeout_seconds,
            verify_ssl=flow_config.https_verify_ssl,
        )
        return DecodeResultFlowService(flow_config, repository, client).run()
    finally:
        # Close the repository whether the run succeeded or raised.
        repository.close()