"""热点内容流程服务。""" from __future__ import annotations from datetime import datetime import json import time from typing import Any from app.hot_content.category_filter import llm_filter_record_content from app.hot_content.client import ( JsonApiClient, build_url, extract_decode_item_map, extract_keyword_items, extract_rank_items, pick_first_valid_content, render_template, ) from app.hot_content.config import load_flow_config from app.hot_content.exceptions import HotContentFlowError from app.hot_content.repository import HotContentRepository, unique_title_key from app.hot_content.status import ExecutionStatus, decode_api_status_to_code from app.hot_content.timezone import SHANGHAI_TZ from app.hot_content.types import FlowConfig HOT_RANK_PAYLOAD = {"sort_type": "最热", "category": "news", "cursor": 0} KEYWORD_PAYLOAD_TEMPLATE = {"keyword": "{title}", "cursor": 0} class HotContentFlowService: def __init__( self, config: FlowConfig, repository: HotContentRepository, api_client: JsonApiClient, ): self.config = config self.repository = repository self.api_client = api_client def run(self) -> dict[str, Any]: hot_titles = self.fetch_and_save_hot_titles() selected_contents = self.search_and_save_contents(hot_titles) selected_contents = self.filter_contents_by_category(selected_contents) decode_resp = self.submit_decode_tasks(selected_contents) return self.build_summary(hot_titles, selected_contents, decode_resp) def fetch_and_save_hot_titles(self) -> list[dict[str, Any]]: saved_titles: list[dict[str, Any]] = [] seen_keys: set[str] = set() response_cache: dict[str, dict[str, Any]] = {} for source_config in self.config.sources: hot_rank_base_url = ( source_config.hot_rank_base_url or self.config.crawapi_base_url ) hot_rank_path = source_config.hot_rank_path or self.config.hot_rank_path hot_rank_payload = ( source_config.hot_rank_payload if source_config.hot_rank_payload is not None else HOT_RANK_PAYLOAD ) cache_key = json.dumps( { "base_url": hot_rank_base_url, "path": hot_rank_path, "payload": hot_rank_payload, }, ensure_ascii=False, sort_keys=True, ) if cache_key not in response_cache: hot_url = build_url(hot_rank_base_url, hot_rank_path) response_cache[cache_key] = self.api_client.post_json( hot_url, hot_rank_payload, ) resp = response_cache[cache_key] rank_items = extract_rank_items(resp, source_config.source) for rank_item in rank_items: title = str(rank_item.get("title") or rank_item.get("name") or "").strip() if not title: continue unique_key = unique_title_key(source_config.source, title) if unique_key in seen_keys: continue seen_keys.add(unique_key) record = self.repository.upsert_record( source=source_config.source, title=title, rank=_to_int_or_none(rank_item.get("rank")), ) saved_titles.append( { "id": record["id"], "unique_key": record["unique_key"], "execution_status": record["execution_status"], "article_title": record["article_title"], "article_body": record["article_body"], "article_url": record["article_url"], "has_decode_request": record["has_decode_request"], "has_contribution_points": record["has_contribution_points"], "source": source_config.source, "title": title, "rank": _to_int_or_none(rank_item.get("rank")), "source_config": source_config, } ) return saved_titles def search_and_save_contents(self, hot_titles: list[dict[str, Any]]) -> list[dict[str, Any]]: selected_contents: list[dict[str, Any]] = [] for hot in hot_titles: execution_status = int(hot.get("execution_status") or 0) if execution_status >= ExecutionStatus.DECODE_SUBMITTED or hot.get( "has_contribution_points" ): continue if execution_status == ExecutionStatus.CATEGORY_FILTER_REJECTED: continue existing_body = str(hot.get("article_body") or "").strip() existing_title = str(hot.get("article_title") or "").strip() if ( execution_status in { ExecutionStatus.CONTENT_OK, ExecutionStatus.CATEGORY_FILTER_PASSED, } and existing_title and existing_body ): selected_contents.append( { "record_id": hot["id"], "unique_key": hot["unique_key"], "hot_title": hot["title"], "status": "ok", "content_title": existing_title, "body_text": existing_body, "url": str(hot.get("article_url") or ""), "category_filter_passed": ( execution_status == ExecutionStatus.CATEGORY_FILTER_PASSED ), } ) continue keyword_url = build_url(self.config.crawapi_base_url, self.config.keyword_search_path) payload = render_template( KEYWORD_PAYLOAD_TEMPLATE, { "title": hot["title"], "source": hot["source"], "record_id": str(hot["id"]), "unique_key": str(hot["unique_key"]), "hot_title_id": str(hot["id"]), }, ) for attempt in range(1, 4): # max 3 retries try: resp = self.api_client.post_json(keyword_url, payload) selected = pick_first_valid_content(extract_keyword_items(resp)) if selected is None: self.repository.mark_no_valid_content( record_id=hot["id"], reason="no valid content", ) selected_contents.append( { "record_id": hot["id"], "unique_key": hot["unique_key"], "hot_title": hot["title"], "status": "no_valid_content", } ) break self.repository.update_article( record_id=hot["id"], article_title=selected["content_title"], article_body=selected["body_text"], url=selected["url"], ) selected_contents.append( { "record_id": hot["id"], "unique_key": hot["unique_key"], "hot_title": hot["title"], "status": "ok", **selected, } ) break except HotContentFlowError as exc: if attempt < 3: # 简单退避,避免对不稳定接口造成瞬时压力 time.sleep(attempt) continue self.repository.update_status( record_id=hot["id"], status=ExecutionStatus.CONTENT_REQUEST_FAILED, error_message=str(exc), ) selected_contents.append( { "record_id": hot["id"], "unique_key": hot["unique_key"], "hot_title": hot["title"], "status": "content_request_failed", "error": str(exc), } ) return selected_contents def filter_contents_by_category( self, selected_contents: list[dict[str, Any]], ) -> list[dict[str, Any]]: filtered: list[dict[str, Any]] = [] category_list = list(self.config.category_filter_categories) total = sum(1 for item in selected_contents if item.get("status") == "ok") processed = 0 for item in selected_contents: if item.get("status") != "ok": filtered.append(item) continue if item.get("category_filter_passed"): filtered.append(item) continue record_id = int(item["record_id"]) record = self.repository.get_record_for_category_filter(record_id) if not record: self.repository.mark_no_valid_content( record_id=record_id, reason="record not found", ) filtered.append( { "record_id": record_id, "unique_key": item.get("unique_key"), "status": "no_valid_content", "reason": "record not found", } ) continue hot_title = str(record.get("title") or "").strip() content_title = str(record.get("article_title") or "").strip() body_text = str(record.get("article_body") or "").strip() if not hot_title or not content_title or not body_text: self.repository.mark_no_valid_content( record_id=record_id, reason="missing title or body", ) filtered.append( { "record_id": record_id, "unique_key": item.get("unique_key"), "status": "no_valid_content", "reason": "missing title or body", } ) continue processed += 1 result = llm_filter_record_content( record_id=record_id, hot_title=hot_title, content_title=content_title, body_text=body_text, category_list=category_list, model=self.config.category_filter_llm_model, max_attempts=self.config.category_filter_llm_max_attempts, retry_sleep_seconds=self.config.category_filter_llm_retry_sleep_seconds, max_tokens=self.config.category_filter_llm_max_tokens, body_max_chars=self.config.category_filter_body_max_chars, ) passed = bool(result.get("passed")) self.repository.update_category_filter_result( record_id=record_id, passed=passed, result_json=result, ) if passed: filtered.append( { **item, "category_filter_passed": True, "matched_category": result.get("matched_category"), } ) else: filtered.append( { "record_id": record_id, "unique_key": item.get("unique_key"), "status": "category_rejected", } ) if ( processed < total and self.config.category_filter_item_sleep_seconds > 0 ): time.sleep(self.config.category_filter_item_sleep_seconds) return filtered def submit_decode_tasks(self, selected_contents: list[dict[str, Any]]) -> dict[str, Any]: decode_items: list[dict[str, Any]] = [] request_count = 0 failed_request_count = 0 for item in selected_contents: if item.get("status") != "ok": continue record_id = int(item["record_id"]) channel_content_id = str(item["unique_key"]) title = str(item.get("content_title") or "").strip() body_text = str(item.get("body_text") or "").strip() if not title or not body_text: continue post = { "channelContentId": channel_content_id, "title": title, "bodyText": body_text, "contentModal": 3, } decode_payload = { "params": { "configId": self.config.decode_config_id, "skipCompleted": False, "posts": [post], } } request_count += 1 try: decode_resp = self.api_client.post_json(self.config.decode_api_url, decode_payload) except HotContentFlowError as exc: decode_resp = {"code": -1, "data": [], "msg": str(exc)} failed_request_count += 1 decode_item_map = extract_decode_item_map(decode_resp) decode_item = decode_item_map.get(channel_content_id, {}) status = decode_api_status_to_code( str(decode_item.get("status") or ""), has_success_response=int(decode_resp.get("code") or 0) == 0, ) error_message = None if int(decode_resp.get("code") or 0) != 0: error_message = str(decode_resp.get("msg") or decode_resp) self.repository.update_decode_result( record_id=record_id, status=status, request_json=decode_payload, response_json=decode_item or decode_resp, error_message=error_message, ) if decode_item: decode_items.append(decode_item) if request_count == 0: return {"code": 0, "data": [], "msg": "no_valid_posts", "request_count": 0} return { "code": 0 if failed_request_count == 0 else -1, "data": decode_items, "request_count": request_count, "failed_request_count": failed_request_count, } def build_summary( self, hot_titles: list[dict[str, Any]], selected_contents: list[dict[str, Any]], decode_resp: dict[str, Any], ) -> dict[str, Any]: decode_items = decode_resp.get("data") if isinstance(decode_resp, dict) else [] decode_items = decode_items if isinstance(decode_items, list) else [] return { "run_at": datetime.now(SHANGHAI_TZ).isoformat(), "source_count": len(self.config.sources), "hot_title_count": len(hot_titles), "selected_ok_count": sum(1 for item in selected_contents if item.get("status") == "ok"), "selected_failed_count": sum( 1 for item in selected_contents if item.get("status") != "ok" ), "category_rejected_count": sum( 1 for item in selected_contents if item.get("status") == "category_rejected" ), "no_valid_content_count": sum( 1 for item in selected_contents if item.get("status") == "no_valid_content" ), "decode_post_count": sum(1 for item in selected_contents if item.get("status") == "ok"), "decode_api_code": decode_resp.get("code") if isinstance(decode_resp, dict) else None, "decode_success_count": sum( 1 for item in decode_items if str(item.get("status") or "") == "SUCCESS" ), "decode_pending_count": sum( 1 for item in decode_items if str(item.get("status") or "") == "PENDING" ), "decode_failed_count": sum( 1 for item in decode_items if str(item.get("status") or "") == "FAILED" ), } def run_once(config: FlowConfig | None = None) -> dict[str, Any]: flow_config = config or load_flow_config() repository = HotContentRepository(flow_config.mysql) try: api_client = JsonApiClient( timeout_seconds=flow_config.request_timeout_seconds, verify_ssl=flow_config.https_verify_ssl, ) service = HotContentFlowService(flow_config, repository, api_client) return service.run() finally: repository.close() def _to_int_or_none(value: Any) -> int | None: try: return int(value) except (TypeError, ValueError): return None