"""热点内容流程配置加载。""" from __future__ import annotations import json import os from pathlib import Path from typing import Any from app.core.config import PROJECT_ROOT, settings from app.hot_content.exceptions import HotContentFlowError from app.hot_content.types import FlowConfig, HotSourceConfig, MysqlConfig def _get_env(name: str, default: str = "") -> str: value = os.getenv(name) if value is None or value == "": return default return value def _get_env_int(name: str, default: int) -> int: raw = os.getenv(name) if raw is None or raw == "": return default try: return int(raw) except ValueError as exc: raise HotContentFlowError(f"invalid integer env {name}={raw!r}") from exc def _get_env_float(name: str, default: float) -> float: raw = os.getenv(name) if raw is None or raw == "": return default try: return float(raw) except ValueError as exc: raise HotContentFlowError(f"invalid float env {name}={raw!r}") from exc def _get_env_bool(name: str, default: bool) -> bool: raw = os.getenv(name) if raw is None or raw == "": return default return raw.strip().lower() in {"1", "true", "yes", "y", "on"} def _load_json_from_env_or_file(env_name: str, file_env_name: str) -> Any | None: file_path = os.getenv(file_env_name) if file_path: path = Path(file_path).expanduser() if not path.is_absolute(): path = PROJECT_ROOT / path try: return json.loads(path.read_text(encoding="utf-8")) except json.JSONDecodeError as exc: raise HotContentFlowError(f"invalid json file {path}") from exc raw = os.getenv(env_name) if not raw: return None try: return json.loads(raw) except json.JSONDecodeError as exc: raise HotContentFlowError(f"invalid json env {env_name}") from exc def _normalize_source_config(item: Any) -> HotSourceConfig: if isinstance(item, str): source = item.strip() if not source: raise HotContentFlowError("hot source cannot be empty") return HotSourceConfig(source=source) if not isinstance(item, dict): raise HotContentFlowError(f"invalid hot source config: {item!r}") source = str(item.get("source") or item.get("source_name") or "").strip() if not source: raise HotContentFlowError(f"hot source missing source: {item!r}") return HotSourceConfig( source=source, count=int(item.get("count") or item.get("limit") or item.get("rank_limit") or 10), ) def _load_sources() -> list[HotSourceConfig]: raw_sources = _load_json_from_env_or_file("HOT_FLOW_SOURCES_JSON", "HOT_FLOW_SOURCES_FILE") if raw_sources is None: raw_sources = settings.hot_flow_sources if not isinstance(raw_sources, list): raise HotContentFlowError("HOT_FLOW_SOURCES_JSON/HOT_FLOW_SOURCES_FILE must be a list") sources = [_normalize_source_config(item) for item in raw_sources] if not sources: raise HotContentFlowError("hot sources cannot be empty") return sources def _parse_cron_hours(value: str) -> str: hours = [item.strip() for item in value.split(",") if item.strip()] if not hours: raise HotContentFlowError("hot flow cron hours cannot be empty") normalized: list[str] = [] for hour in hours: try: hour_num = int(hour) except ValueError as exc: raise HotContentFlowError(f"invalid hot flow cron hour: {hour!r}") from exc if not 0 <= hour_num <= 23: raise HotContentFlowError(f"hot flow cron hour out of range: {hour_num}") normalized.append(str(hour_num)) return ",".join(normalized) def load_flow_config(interval_override: int | None = None) -> FlowConfig: crawapi_base_url = _get_env("CRAWAPI_BASE_URL", settings.crawapi_base_url).rstrip("/") hot_rank_path = _get_env( "CRAWAPI_HOT_CONTENT_RANK_PATH", settings.crawapi_hot_content_rank_path, ) if not crawapi_base_url: raise HotContentFlowError("missing CRAWAPI_BASE_URL or settings.crawapi_base_url") if not hot_rank_path: raise HotContentFlowError( "missing CRAWAPI_HOT_CONTENT_RANK_PATH or settings.crawapi_hot_content_rank_path" ) interval_seconds = ( interval_override if interval_override is not None else _get_env_int("HOT_FLOW_INTERVAL_SECONDS", settings.hot_flow_interval_seconds) ) return FlowConfig( crawapi_base_url=crawapi_base_url, hot_rank_path=hot_rank_path, keyword_search_path=_get_env( "CRAWAPI_KEYWORD_SEARCH_PATH", settings.crawapi_keyword_search_path, ), decode_api_url=_get_env("DECODE_API_URL", settings.decode_api_url), decode_result_api_url=_get_env( "DECODE_RESULT_API_URL", settings.decode_result_api_url, ), decode_config_id=_get_env_int("DECODE_CONFIG_ID", settings.decode_config_id), request_timeout_seconds=_get_env_int( "REQUEST_TIMEOUT_SECONDS", settings.request_timeout_seconds, ), https_verify_ssl=_get_env_bool("HTTPS_VERIFY_SSL", settings.https_verify_ssl), hot_flow_cron_hours=_parse_cron_hours( _get_env("HOT_FLOW_CRON_HOURS", settings.hot_flow_cron_hours) ), hot_flow_cron_minute=_get_env_int( "HOT_FLOW_CRON_MINUTE", settings.hot_flow_cron_minute, ), schedule_interval_seconds=interval_seconds, decode_result_interval_seconds=_get_env_int( "DECODE_RESULT_FLOW_INTERVAL_SECONDS", settings.decode_result_flow_interval_seconds, ), decode_result_batch_size=_get_env_int( "DECODE_RESULT_BATCH_SIZE", settings.decode_result_batch_size, ), contribution_score_threshold=float( _get_env( "CONTRIBUTION_SCORE_THRESHOLD", str(settings.contribution_score_threshold), ) ), demand_pool_source_table=_get_env( "DEMAND_POOL_SOURCE_TABLE", settings.demand_pool_source_table, ), demand_pool_excluded_strategy=_get_env( "DEMAND_POOL_EXCLUDED_STRATEGY", settings.demand_pool_excluded_strategy, ), demand_pool_top_n=_get_env_int( "DEMAND_POOL_TOP_N", settings.demand_pool_top_n, ), hot_demand_pool_strategy=_get_env( "HOT_DEMAND_POOL_STRATEGY", settings.hot_demand_pool_strategy, ), wxindex_score_threshold=_get_env_float( "WXINDEX_SCORE_THRESHOLD", _get_env_float( "HOT_DEMAND_POOL_WXINDEX_THRESHOLD", _get_env_float( "WXINDEX_LATEST_SCORE_THRESHOLD", settings.wxindex_score_threshold, ), ), ), postprocess_batch_size=_get_env_int( "POSTPROCESS_BATCH_SIZE", settings.postprocess_batch_size, ), contribution_match_llm_model=_get_env( "CONTRIBUTION_MATCH_LLM_MODEL", settings.contribution_match_llm_model, ), contribution_match_llm_max_attempts=_get_env_int( "CONTRIBUTION_MATCH_LLM_MAX_ATTEMPTS", settings.contribution_match_llm_max_attempts, ), contribution_match_llm_retry_sleep_seconds=_get_env_float( "CONTRIBUTION_MATCH_LLM_RETRY_SLEEP_SECONDS", settings.contribution_match_llm_retry_sleep_seconds, ), contribution_match_llm_max_tokens=_get_env_int( "CONTRIBUTION_MATCH_LLM_MAX_TOKENS", settings.contribution_match_llm_max_tokens, ), wxindex_llm_model=_get_env("WXINDEX_LLM_MODEL", settings.wxindex_llm_model), wxindex_llm_max_attempts=_get_env_int( "WXINDEX_LLM_MAX_ATTEMPTS", settings.wxindex_llm_max_attempts, ), wxindex_llm_max_tokens=_get_env_int( "WXINDEX_LLM_MAX_TOKENS", settings.wxindex_llm_max_tokens, ), wxindex_api_url=_get_env("WXINDEX_API_URL", settings.wxindex_api_url), wxindex_lookback_days=_get_env_int( "WXINDEX_LOOKBACK_DAYS", settings.wxindex_lookback_days, ), sources=_load_sources(), mysql=MysqlConfig( host=_get_env("MYSQL_HOST", settings.mysql_host), port=_get_env_int("MYSQL_PORT", settings.mysql_port), user=_get_env("MYSQL_USER", settings.mysql_user), password=_get_env("MYSQL_PASSWORD", settings.mysql_password), database=_get_env("MYSQL_DATABASE", settings.mysql_database), charset=_get_env("MYSQL_CHARSET", settings.mysql_charset), ), )