"""Hot content flow service.

Pipeline run by ``HotContentFlowService.run``: fetch hot-rank titles, search an
article for each title, submit decode tasks, then build a run summary.
"""

from __future__ import annotations

from collections import Counter
from datetime import datetime
import time
from typing import Any

from app.hot_content.client import (
    JsonApiClient,
    build_url,
    extract_decode_item_map,
    extract_keyword_items,
    extract_rank_items,
    pick_first_valid_content,
    render_template,
)
from app.hot_content.config import load_flow_config
from app.hot_content.exceptions import HotContentFlowError
from app.hot_content.repository import HotContentRepository, unique_title_key
from app.hot_content.status import ExecutionStatus, decode_api_status_to_code
from app.hot_content.timezone import SHANGHAI_TZ
from app.hot_content.types import FlowConfig

HOT_RANK_PAYLOAD = {"sort_type": "最热", "category": "news", "cursor": 0}
KEYWORD_PAYLOAD_TEMPLATE = {"keyword": "{title}", "cursor": 0}

# Total keyword-search attempts per title (1 initial request + 2 retries).
KEYWORD_SEARCH_MAX_ATTEMPTS = 3


class HotContentFlowService:
    """Run the hot-content pipeline with a given config, repository and API client."""

    def __init__(
        self,
        config: FlowConfig,
        repository: HotContentRepository,
        api_client: JsonApiClient,
    ):
        self.config = config
        self.repository = repository
        self.api_client = api_client

    def run(self) -> dict[str, Any]:
        """Execute every pipeline stage once and return the run summary."""
        hot_titles = self.fetch_and_save_hot_titles()
        selected_contents = self.search_and_save_contents(hot_titles)
        decode_resp = self.submit_decode_tasks(selected_contents)
        return self.build_summary(hot_titles, selected_contents, decode_resp)

    def fetch_and_save_hot_titles(self) -> list[dict[str, Any]]:
        """Fetch the hot-rank list once and upsert the top titles of each source.

        For each configured source, at most ``source_config.count`` rank items
        are considered. Blank titles and titles whose ``unique_title_key`` was
        already seen in this run are skipped.

        Returns:
            One dict per saved title, combining fields of the upserted
            repository record with the source, title, rank and source config.
        """
        hot_url = build_url(self.config.crawapi_base_url, self.config.hot_rank_path)
        resp = self.api_client.post_json(hot_url, HOT_RANK_PAYLOAD)

        saved_titles: list[dict[str, Any]] = []
        seen_keys: set[str] = set()
        for source_config in self.config.sources:
            rank_items = extract_rank_items(resp, source_config.source)
            for rank_item in rank_items[: source_config.count]:
                title = str(rank_item.get("title") or "").strip()
                if not title:
                    continue
                unique_key = unique_title_key(source_config.source, title)
                if unique_key in seen_keys:
                    continue
                seen_keys.add(unique_key)

                rank = _to_int_or_none(rank_item.get("rank"))
                record = self.repository.upsert_record(
                    source=source_config.source,
                    title=title,
                    rank=rank,
                )
                saved_titles.append(
                    {
                        "id": record["id"],
                        "unique_key": record["unique_key"],
                        "execution_status": record["execution_status"],
                        "article_title": record["article_title"],
                        "article_body": record["article_body"],
                        "article_url": record["article_url"],
                        "has_decode_request": record["has_decode_request"],
                        "has_contribution_points": record["has_contribution_points"],
                        "source": source_config.source,
                        "title": title,
                        "rank": rank,
                        "source_config": source_config,
                    }
                )
        return saved_titles

    def search_and_save_contents(self, hot_titles: list[dict[str, Any]]) -> list[dict[str, Any]]:
        """Resolve article content for each hot title and persist the outcome.

        The following titles are skipped entirely:
        - titles whose execution status is at or beyond ``DECODE_SUBMITTED``;
        - titles flagged with ``has_contribution_points``.

        Titles already in ``CONTENT_OK`` with a stored article title and body
        reuse that content without any request. All others are searched via the
        keyword API, which is retried on ``HotContentFlowError``.

        Returns:
            One dict per processed title. ``status`` is ``"ok"``,
            ``"no_valid_content"`` or ``"content_request_failed"``.
        """
        keyword_url = build_url(self.config.crawapi_base_url, self.config.keyword_search_path)
        selected_contents: list[dict[str, Any]] = []
        for hot in hot_titles:
            execution_status = int(hot.get("execution_status") or 0)
            if execution_status >= ExecutionStatus.DECODE_SUBMITTED or hot.get(
                "has_contribution_points"
            ):
                continue

            base = {
                "record_id": hot["id"],
                "unique_key": hot["unique_key"],
                "hot_title": hot["title"],
            }

            existing_body = str(hot.get("article_body") or "").strip()
            existing_title = str(hot.get("article_title") or "").strip()
            if execution_status == ExecutionStatus.CONTENT_OK and existing_title and existing_body:
                selected_contents.append(
                    {
                        **base,
                        "status": "ok",
                        "content_title": existing_title,
                        "body_text": existing_body,
                        "url": str(hot.get("article_url") or ""),
                    }
                )
                continue

            payload = render_template(
                KEYWORD_PAYLOAD_TEMPLATE,
                {
                    "title": hot["title"],
                    "source": hot["source"],
                    "record_id": str(hot["id"]),
                    "unique_key": str(hot["unique_key"]),
                    "hot_title_id": str(hot["id"]),
                },
            )

            try:
                selected = self._search_keyword_content(keyword_url, payload)
            except HotContentFlowError as exc:
                # All attempts failed: record the last error and move on.
                self.repository.update_status(
                    record_id=hot["id"],
                    status=ExecutionStatus.CONTENT_REQUEST_FAILED,
                    error_message=str(exc),
                )
                selected_contents.append(
                    {**base, "status": "content_request_failed", "error": str(exc)}
                )
                continue

            if selected is None:
                self.repository.update_status(
                    record_id=hot["id"],
                    status=ExecutionStatus.NO_VALID_CONTENT,
                )
                selected_contents.append({**base, "status": "no_valid_content"})
                continue

            self.repository.update_article(
                record_id=hot["id"],
                article_title=selected["content_title"],
                article_body=selected["body_text"],
                url=selected["url"],
            )
            selected_contents.append({**base, "status": "ok", **selected})
        return selected_contents

    def _search_keyword_content(self, keyword_url: str, payload: Any) -> dict[str, Any] | None:
        """Query the keyword API and return the first valid content item.

        Only the request and response parsing are retried. Repository writes are
        deliberately outside this helper, so a failed write never triggers
        another API call. The helper makes up to KEYWORD_SEARCH_MAX_ATTEMPTS
        attempts, sleeping 1s, 2s, ... between them.

        Returns:
            The result of ``pick_first_valid_content``, or ``None`` when it
            finds nothing. The caller reads ``content_title``, ``body_text`` and
            ``url`` from a non-None result.

        Raises:
            HotContentFlowError: the error from the last attempt, once all
            attempts are exhausted.
        """
        attempt = 1
        while True:
            try:
                resp = self.api_client.post_json(keyword_url, payload)
                return pick_first_valid_content(extract_keyword_items(resp))
            except HotContentFlowError:
                if attempt >= KEYWORD_SEARCH_MAX_ATTEMPTS:
                    raise
            # Simple linear backoff to avoid putting burst load on an unstable endpoint.
            time.sleep(attempt)
            attempt += 1

    def submit_decode_tasks(self, selected_contents: list[dict[str, Any]]) -> dict[str, Any]:
        """Submit one decode request per ``"ok"`` item and persist each result.

        Items with an empty content title or body are skipped and not counted.
        A ``HotContentFlowError`` from the decode API does not abort the run.
        It is recorded as a synthetic ``{"code": -1, "data": [], "msg": ...}``
        response.

        Returns:
            When no request was sent:
            ``{"code": 0, "data": [], "msg": "no_valid_posts", "request_count": 0}``.

            Otherwise a dict with these keys:
            - ``code``: 0 only if no request raised;
            - ``data``: the non-empty decode items;
            - ``request_count``;
            - ``failed_request_count``.
        """
        decode_items: list[dict[str, Any]] = []
        request_count = 0
        failed_request_count = 0
        for item in selected_contents:
            if item.get("status") != "ok":
                continue
            record_id = int(item["record_id"])
            channel_content_id = str(item["unique_key"])
            title = str(item.get("content_title") or "").strip()
            body_text = str(item.get("body_text") or "").strip()
            if not title or not body_text:
                continue

            post = {
                "channelContentId": channel_content_id,
                "title": title,
                "bodyText": body_text,
                "contentModal": 3,
            }
            decode_payload = {
                "params": {
                    "configId": self.config.decode_config_id,
                    "skipCompleted": False,
                    "posts": [post],
                }
            }
            request_count += 1
            try:
                decode_resp = self.api_client.post_json(self.config.decode_api_url, decode_payload)
            except HotContentFlowError as exc:
                decode_resp = {"code": -1, "data": [], "msg": str(exc)}
                failed_request_count += 1

            decode_item = extract_decode_item_map(decode_resp).get(channel_content_id, {})
            # A missing/empty "code" is treated as success (0), as before.
            response_ok = int(decode_resp.get("code") or 0) == 0
            status = decode_api_status_to_code(
                str(decode_item.get("status") or ""),
                has_success_response=response_ok,
            )
            error_message = None if response_ok else str(decode_resp.get("msg") or decode_resp)
            self.repository.update_decode_result(
                record_id=record_id,
                status=status,
                request_json=decode_payload,
                response_json=decode_item or decode_resp,
                error_message=error_message,
            )
            if decode_item:
                decode_items.append(decode_item)

        if request_count == 0:
            return {"code": 0, "data": [], "msg": "no_valid_posts", "request_count": 0}
        return {
            "code": 0 if failed_request_count == 0 else -1,
            "data": decode_items,
            "request_count": request_count,
            "failed_request_count": failed_request_count,
        }

    def build_summary(
        self,
        hot_titles: list[dict[str, Any]],
        selected_contents: list[dict[str, Any]],
        decode_resp: dict[str, Any],
    ) -> dict[str, Any]:
        """Aggregate counts for one run into a flat summary dict.

        ``decode_post_count`` is the number of decode requests actually sent,
        taken from ``decode_resp["request_count"]``. It excludes ``"ok"`` items
        skipped for an empty title or body. When that key is absent or not an
        integer, it falls back to the ``"ok"`` count. Decode status counts are
        taken from the items in ``decode_resp["data"]``.
        """
        is_dict = isinstance(decode_resp, dict)
        decode_items = decode_resp.get("data") if is_dict else []
        if not isinstance(decode_items, list):
            decode_items = []

        ok_count = sum(1 for item in selected_contents if item.get("status") == "ok")
        post_count = _to_int_or_none(decode_resp.get("request_count")) if is_dict else None
        if post_count is None:
            post_count = ok_count
        status_counts = Counter(str(item.get("status") or "") for item in decode_items)

        return {
            "run_at": datetime.now(SHANGHAI_TZ).isoformat(),
            "source_count": len(self.config.sources),
            "hot_title_count": len(hot_titles),
            "selected_ok_count": ok_count,
            "selected_failed_count": len(selected_contents) - ok_count,
            "decode_post_count": post_count,
            "decode_api_code": decode_resp.get("code") if is_dict else None,
            "decode_success_count": status_counts["SUCCESS"],
            "decode_pending_count": status_counts["PENDING"],
            "decode_failed_count": status_counts["FAILED"],
        }


def run_once(config: FlowConfig | None = None) -> dict[str, Any]:
    """Run the whole flow once and return its summary.

    Args:
        config: Flow configuration; loaded via ``load_flow_config()`` when None.

    The repository is always closed, even if the run raises.
    """
    flow_config = config if config is not None else load_flow_config()
    repository = HotContentRepository(flow_config.mysql)
    try:
        api_client = JsonApiClient(
            timeout_seconds=flow_config.request_timeout_seconds,
            verify_ssl=flow_config.https_verify_ssl,
        )
        service = HotContentFlowService(flow_config, repository, api_client)
        return service.run()
    finally:
        repository.close()


def _to_int_or_none(value: Any) -> int | None:
    """Return ``int(value)``, or None when the value cannot be converted."""
    try:
        return int(value)
    except (TypeError, ValueError):
        return None