| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135 |
- """解构结果拉取与贡献点提取服务。"""
- from __future__ import annotations
- from datetime import datetime
- from typing import Any
- from app.hot_content.client import JsonApiClient, parse_decode_data_content
- from app.hot_content.config import load_flow_config
- from app.hot_content.contribution import build_contribution_points
- from app.hot_content.exceptions import HotContentFlowError
- from app.hot_content.repository import HotContentRepository
- from app.hot_content.status import ExecutionStatus, decode_api_status_to_code
- from app.hot_content.timezone import SHANGHAI_TZ
- from app.hot_content.types import FlowConfig
class DecodeResultFlowService:
    """Pull decode results for pending records and store their contribution points.

    A single ``run`` call:

    1. loads a batch of candidate records from the repository;
    2. requests the decode results for all of them in one API call;
    3. for each returned item, either stores the decoded result together with
       the contribution points extracted from it, or records the item's
       non-success status on the matching record.
    """

    def __init__(
        self,
        config: FlowConfig,
        repository: HotContentRepository,
        api_client: JsonApiClient,
    ):
        """Store collaborators.

        Args:
            config: Flow settings. This class reads the batch size, decode config
                id, decode-result API URL and contribution score threshold.
            repository: Source of candidate records and sink for results/statuses.
            api_client: Client used to POST the decode-result request.
        """
        self.config = config
        self.repository = repository
        self.api_client = api_client

    def run(self) -> dict[str, Any]:
        """Process one batch of decode-result candidates.

        Returns:
            The summary produced by ``_build_summary`` for this batch.

        Raises:
            HotContentFlowError: If the API response body is not a JSON object
                or its ``data`` field is not a list. A ``HotContentFlowError``
                raised while handling an individual item is not propagated.
                Instead it is recorded on that item's record as
                ``DECODE_RESULT_FAILED``.
        """
        # Always request at least one record, even if the configured batch size is <= 0.
        records = self.repository.list_decode_result_candidates(
            limit=max(self.config.decode_result_batch_size, 1)
        )
        if not records:
            return self._build_summary([], 0, 0, 0, 0)

        # The API addresses items by channelContentId. We send each record's
        # unique_key as that id, and later map returned ids back to record ids.
        record_id_by_unique_key = {
            str(record["unique_key"]): int(record["id"]) for record in records
        }
        data_items = self._fetch_decode_items(list(record_id_by_unique_key))

        tally = {"saved": 0, "failed": 0, "pending": 0}
        for item in data_items:
            outcome = self._process_item(item, record_id_by_unique_key)
            if outcome is not None:
                tally[outcome] += 1

        return self._build_summary(
            records,
            len(data_items),
            tally["saved"],
            tally["failed"],
            tally["pending"],
        )

    def _fetch_decode_items(self, channel_content_ids: list[str]) -> list[Any]:
        """POST the decode-result request and return the validated ``data`` list.

        A missing or falsy ``data`` field is treated as an empty list.

        Raises:
            HotContentFlowError: If the body is not a JSON object or ``data``
                is not a list.
        """
        request_payload = {
            "params": {
                "configId": self.config.decode_config_id,
                "channelContentIds": channel_content_ids,
            }
        }
        response = self.api_client.post_json(
            self.config.decode_result_api_url, request_payload
        )
        # Guard before calling .get(): a top-level JSON array or null would
        # otherwise surface as AttributeError instead of the flow's own error type.
        if not isinstance(response, dict):
            raise HotContentFlowError("invalid decode result response: body is not object")
        data_items = response.get("data") or []
        if not isinstance(data_items, list):
            raise HotContentFlowError("invalid decode result response: data is not list")
        return data_items

    def _process_item(
        self,
        item: Any,
        record_id_by_unique_key: dict[str, int],
    ) -> str | None:
        """Handle one response item.

        Returns:
            ``"saved"``, ``"failed"`` or ``"pending"``, naming the counter to
            increment, or ``None`` when the item should not be counted.
        """
        # Malformed items, and items without an id, are skipped without counting.
        if not isinstance(item, dict):
            return None
        channel_content_id = str(item.get("channelContentId") or "").strip()
        if not channel_content_id:
            return None

        record_id = record_id_by_unique_key.get(channel_content_id)
        if record_id is None:
            # The id was not part of this batch. Count it as a failure, but
            # there is no record to update.
            return "failed"

        try:
            return self._apply_item_result(record_id, channel_content_id, item)
        except HotContentFlowError as exc:
            self.repository.update_status(
                record_id=record_id,
                status=ExecutionStatus.DECODE_RESULT_FAILED,
                error_message=str(exc),
            )
            return "failed"

    def _apply_item_result(
        self,
        record_id: int,
        channel_content_id: str,
        item: dict[str, Any],
    ) -> str | None:
        """Persist either the item's decoded result or its non-success status.

        Any ``HotContentFlowError`` is left for the caller to record.
        """
        result_status = str(item.get("status") or "").strip()
        # NOTE(review): has_success_response=True presumably tells the mapper
        # that the HTTP call itself succeeded. Confirm in app.hot_content.status.
        status_code = decode_api_status_to_code(
            result_status,
            has_success_response=True,
        )

        if status_code != ExecutionStatus.DECODE_SUCCESS:
            # An empty or missing errorMessage is stored as None rather than "".
            self.repository.update_status(
                record_id=record_id,
                status=status_code,
                error_message=str(item.get("errorMessage") or "") or None,
            )
            if status_code == ExecutionStatus.DECODE_PENDING:
                return "pending"
            if status_code == ExecutionStatus.DECODE_FAILED:
                return "failed"
            # Any other mapped status is recorded on the row but not counted.
            return None

        decode_result = parse_decode_data_content(item)
        contribution_points = build_contribution_points(
            decode_result,
            score_threshold=self.config.contribution_score_threshold,
        )
        contribution_points["channelContentId"] = channel_content_id
        self.repository.save_decode_result_export(
            record_id=record_id,
            decode_result_json=decode_result,
            contribution_points_json=contribution_points,
        )
        return "saved"

    def _build_summary(
        self,
        records: list[dict[str, Any]],
        response_item_count: int,
        saved_count: int,
        failed_count: int,
        pending_count: int,
    ) -> dict[str, Any]:
        """Build the per-run summary dict.

        Keys:
            ``run_at``: ISO-8601 timestamp taken in ``SHANGHAI_TZ``.
            ``requested_id_count``: number of candidate records loaded.
            ``decode_result_item_count``: length of the response ``data`` list.
            ``saved_count`` / ``failed_count`` / ``pending_count``: item tallies.
            ``score_threshold``: the configured contribution score threshold.
        """
        return {
            "run_at": datetime.now(SHANGHAI_TZ).isoformat(),
            "requested_id_count": len(records),
            "decode_result_item_count": response_item_count,
            "saved_count": saved_count,
            "failed_count": failed_count,
            "pending_count": pending_count,
            "score_threshold": self.config.contribution_score_threshold,
        }
def run_once(config: FlowConfig | None = None) -> dict[str, Any]:
    """Execute one decode-result pass and return its summary.

    Args:
        config: Flow settings to use. When omitted (or falsy), the settings
            are obtained from ``load_flow_config()``.

    Returns:
        The summary dict returned by ``DecodeResultFlowService.run``.

    The repository opened here is always closed, whether the run succeeds
    or raises.
    """
    effective_config = config or load_flow_config()
    repo = HotContentRepository(effective_config.mysql)
    try:
        client = JsonApiClient(
            timeout_seconds=effective_config.request_timeout_seconds,
            verify_ssl=effective_config.https_verify_ssl,
        )
        summary = DecodeResultFlowService(effective_config, repo, client).run()
    finally:
        repo.close()
    return summary
|