decode_result_service.py 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135
  1. """Service that pulls decode results and extracts contribution points."""
  2. from __future__ import annotations
  3. from datetime import datetime
  4. from typing import Any
  5. from app.hot_content.client import JsonApiClient, parse_decode_data_content
  6. from app.hot_content.config import load_flow_config
  7. from app.hot_content.contribution import build_contribution_points
  8. from app.hot_content.exceptions import HotContentFlowError
  9. from app.hot_content.repository import HotContentRepository
  10. from app.hot_content.status import ExecutionStatus, decode_api_status_to_code
  11. from app.hot_content.timezone import SHANGHAI_TZ
  12. from app.hot_content.types import FlowConfig
  13. class DecodeResultFlowService:
  14. def __init__(
  15. self,
  16. config: FlowConfig,
  17. repository: HotContentRepository,
  18. api_client: JsonApiClient,
  19. ):
  20. self.config = config
  21. self.repository = repository
  22. self.api_client = api_client
  23. def run(self) -> dict[str, Any]:
  24. records = self.repository.list_decode_result_candidates(
  25. limit=max(self.config.decode_result_batch_size, 1)
  26. )
  27. if not records:
  28. return self._build_summary([], 0, 0, 0, 0)
  29. record_id_by_unique_key = {
  30. str(record["unique_key"]): int(record["id"])
  31. for record in records
  32. }
  33. request_payload = {
  34. "params": {
  35. "configId": self.config.decode_config_id,
  36. "channelContentIds": list(record_id_by_unique_key.keys()),
  37. }
  38. }
  39. response = self.api_client.post_json(self.config.decode_result_api_url, request_payload)
  40. data_items = response.get("data") or []
  41. if not isinstance(data_items, list):
  42. raise HotContentFlowError("invalid decode result response: data is not list")
  43. saved_count = 0
  44. failed_count = 0
  45. pending_count = 0
  46. for item in data_items:
  47. if not isinstance(item, dict):
  48. continue
  49. channel_content_id = str(item.get("channelContentId") or "").strip()
  50. if not channel_content_id:
  51. continue
  52. record_id = record_id_by_unique_key.get(channel_content_id)
  53. if record_id is None:
  54. failed_count += 1
  55. continue
  56. try:
  57. result_status = str(item.get("status") or "").strip()
  58. status_code = decode_api_status_to_code(
  59. result_status,
  60. has_success_response=True,
  61. )
  62. if status_code != ExecutionStatus.DECODE_SUCCESS:
  63. self.repository.update_status(
  64. record_id=record_id,
  65. status=status_code,
  66. error_message=str(item.get("errorMessage") or "") or None,
  67. )
  68. if status_code == ExecutionStatus.DECODE_PENDING:
  69. pending_count += 1
  70. elif status_code == ExecutionStatus.DECODE_FAILED:
  71. failed_count += 1
  72. continue
  73. decode_result = parse_decode_data_content(item)
  74. contribution_points = build_contribution_points(
  75. decode_result,
  76. score_threshold=self.config.contribution_score_threshold,
  77. )
  78. contribution_points["channelContentId"] = channel_content_id
  79. self.repository.save_decode_result_export(
  80. record_id=record_id,
  81. decode_result_json=decode_result,
  82. contribution_points_json=contribution_points,
  83. )
  84. saved_count += 1
  85. except HotContentFlowError as exc:
  86. self.repository.update_status(
  87. record_id=record_id,
  88. status=ExecutionStatus.DECODE_RESULT_FAILED,
  89. error_message=str(exc),
  90. )
  91. failed_count += 1
  92. return self._build_summary(records, len(data_items), saved_count, failed_count, pending_count)
  93. def _build_summary(
  94. self,
  95. records: list[dict[str, Any]],
  96. response_item_count: int,
  97. saved_count: int,
  98. failed_count: int,
  99. pending_count: int,
  100. ) -> dict[str, Any]:
  101. return {
  102. "run_at": datetime.now(SHANGHAI_TZ).isoformat(),
  103. "requested_id_count": len(records),
  104. "decode_result_item_count": response_item_count,
  105. "saved_count": saved_count,
  106. "failed_count": failed_count,
  107. "pending_count": pending_count,
  108. "score_threshold": self.config.contribution_score_threshold,
  109. }
  110. def run_once(config: FlowConfig | None = None) -> dict[str, Any]:
  111. flow_config = config or load_flow_config()
  112. repository = HotContentRepository(flow_config.mysql)
  113. try:
  114. api_client = JsonApiClient(
  115. timeout_seconds=flow_config.request_timeout_seconds,
  116. verify_ssl=flow_config.https_verify_ssl,
  117. )
  118. service = DecodeResultFlowService(flow_config, repository, api_client)
  119. return service.run()
  120. finally:
  121. repository.close()