Browse Source

first commit

xueyiming 3 weeks ago
commit
bc80fd78ed

+ 20 - 0
.dockerignore

@@ -0,0 +1,20 @@
+.venv
+.git
+.idea
+__pycache__
+*.py[cod]
+*.pyo
+*.pyd
+.Python
+*.egg-info
+dist
+build
+.pytest_cache
+.mypy_cache
+.ruff_cache
+*.html
+.DS_Store
+.env
+.env.*
+*.log
+terminals

+ 58 - 0
.gitignore

@@ -0,0 +1,58 @@
+# Python
+__pycache__/
+*.py[cod]
+*$py.class
+*.so
+.Python
+*.egg
+*.egg-info/
+.eggs/
+dist/
+build/
+pip-wheel-metadata/
+*.manifest
+*.spec
+
+# Virtual environments
+.venv/
+venv/
+ENV/
+env/
+
+# Environment & secrets
+.env
+.env.*
+!.env.example
+
+# IDE
+.idea/
+.vscode/
+*.swp
+*.swo
+*~
+
+# OS
+.DS_Store
+Thumbs.db
+
+# Logs & runtime
+*.log
+logs/
+
+# Test & coverage
+.pytest_cache/
+.mypy_cache/
+.ruff_cache/
+.coverage
+htmlcov/
+.tox/
+.nox/
+
+# Jupyter
+.ipynb_checkpoints/
+
+# Generated outputs
+*.html
+
+# Docker local overrides
+docker-compose.override.yml

+ 27 - 0
Dockerfile

@@ -0,0 +1,27 @@
+FROM registry.cn-hangzhou.aliyuncs.com/stuuudy/python:3.11-slim AS base
+
+WORKDIR /app
+
+ENV PYTHONDONTWRITEBYTECODE=1 \
+    PYTHONUNBUFFERED=1 \
+    TZ=Asia/Shanghai \
+    PYTHONPATH=/app
+
+# 安装编译工具链(pyodps 等依赖构建 wheel 可能需要 gcc)
+RUN sed -i 's/deb.debian.org/mirrors.aliyun.com/g' /etc/apt/sources.list.d/debian.sources && \
+    apt-get update && \
+    apt-get install -y --no-install-recommends gcc g++ libc6-dev tzdata && \
+    ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && \
+    echo $TZ > /etc/timezone && \
+    rm -rf /var/lib/apt/lists/*
+
+# 1) 安装依赖(利用 Docker layer 缓存)
+COPY requirements.txt /app/requirements.txt
+RUN pip install --no-cache-dir --upgrade pip -i https://pypi.tuna.tsinghua.edu.cn/simple/ && \
+    pip install --no-cache-dir -r requirements.txt -i https://mirrors.aliyun.com/pypi/simple/
+
+# 2) 复制源码
+COPY . /app
+
+# 定时任务调度:热点搜索(6/12/18) + 解构后处理(30分钟) + ODPS 需求表写入
+CMD ["python", "-m", "app.scheduler"]

+ 1 - 0
app/__init__.py

@@ -0,0 +1 @@
+"""External demand application package."""

+ 1 - 0
app/aliyun_odps/__init__.py

@@ -0,0 +1 @@
+"""Aliyun ODPS utilities."""

+ 32 - 0
app/aliyun_odps/client.py

@@ -0,0 +1,32 @@
+"""ODPS client factory."""
+
+from __future__ import annotations
+
+from typing import Any
+
+from app.core.config import settings
+from app.hot_content.exceptions import HotContentFlowError
+
+
+def get_odps_client() -> Any:
+    """Create a PyODPS client from environment-backed settings."""
+    if not settings.odps_access_id.strip():
+        raise HotContentFlowError("ODPS_ACCESS_ID is not configured")
+    if not settings.odps_access_key.strip():
+        raise HotContentFlowError("ODPS_ACCESS_KEY is not configured")
+    if not settings.odps_project.strip():
+        raise HotContentFlowError("ODPS_PROJECT is not configured")
+    if not settings.odps_endpoint.strip():
+        raise HotContentFlowError("ODPS_ENDPOINT is not configured")
+
+    from odps import ODPS
+
+    kwargs: dict[str, Any] = {
+        "access_id": settings.odps_access_id.strip(),
+        "secret_access_key": settings.odps_access_key.strip(),
+        "project": settings.odps_project.strip(),
+        "endpoint": settings.odps_endpoint.strip(),
+    }
+    if settings.odps_tunnel_endpoint.strip():
+        kwargs["tunnel_endpoint"] = settings.odps_tunnel_endpoint.strip()
+    return ODPS(**kwargs)

+ 9 - 0
app/core/__init__.py

@@ -0,0 +1,9 @@
+"""Core application utilities."""
+
+from app.core.open_router_llm import (
+    OpenRouterCallError,
+    chat_text,
+    create_chat_completion,
+)
+
+__all__ = ["OpenRouterCallError", "chat_text", "create_chat_completion"]

+ 328 - 0
app/core/config.py

@@ -0,0 +1,328 @@
+"""应用配置。
+
+配置优先级:环境变量 > 这里的默认值。
+"""
+
+from __future__ import annotations
+
+import json
+import os
+from dataclasses import dataclass, field
+from pathlib import Path
+from typing import Any
+
+
+PROJECT_ROOT = Path(__file__).resolve().parents[2]
+
+
+def _env(name: str, default: str) -> str:
+    value = os.getenv(name)
+    if value is None or value == "":
+        return default
+    return value
+
+
+def _env_int(name: str, default: int) -> int:
+    value = os.getenv(name)
+    if value is None or value == "":
+        return default
+    return int(value)
+
+
+def _env_int_optional(name: str, default: int | None = None) -> int | None:
+    value = os.getenv(name)
+    if value is None or value == "":
+        return default
+    return int(value)
+
+
+def _env_float(name: str, default: float) -> float:
+    value = os.getenv(name)
+    if value is None or value == "":
+        return default
+    return float(value)
+
+
+def _env_float_optional(name: str, default: float | None = None) -> float | None:
+    value = os.getenv(name)
+    if value is None or value == "":
+        return default
+    return float(value)
+
+
+def _env_first(names: tuple[str, ...], default: str) -> str:
+    for name in names:
+        value = os.getenv(name)
+        if value is not None and value != "":
+            return value
+    return default
+
+
+def _env_bool(name: str, default: bool) -> bool:
+    value = os.getenv(name)
+    if value is None or value == "":
+        return default
+    return value.strip().lower() in {"1", "true", "yes", "y", "on"}
+
+
+def _load_json_file(path_value: str) -> Any:
+    path = Path(path_value).expanduser()
+    if not path.is_absolute():
+        path = PROJECT_ROOT / path
+    return json.loads(path.read_text(encoding="utf-8"))
+
+
+def _env_json(name: str, default: Any, file_env_name: str | None = None) -> Any:
+    if file_env_name:
+        file_path = os.getenv(file_env_name)
+        if file_path:
+            return _load_json_file(file_path)
+
+    value = os.getenv(name)
+    if value is None or value == "":
+        return default
+    return json.loads(value)
+
+
+@dataclass(frozen=True)
+class Settings:
+    crawapi_base_url: str = "http://crawapi.piaoquantv.com"
+    crawapi_hot_content_rank_path: str = "/crawler/jin_ri_re_bang/content_rank"
+    crawapi_keyword_search_path: str = "/crawler/bai_du/keyword"
+
+    decode_api_url: str = "https://aigc-api.aiddit.com/aigc/api/task/decode"
+    decode_result_api_url: str = "https://aigc-api.aiddit.com/aigc/api/task/decode/result"
+    decode_config_id: int = 70
+    request_timeout_seconds: int = 180
+    https_verify_ssl: bool = False
+
+    hot_flow_cron_hours: str = "6,12,18"
+    hot_flow_cron_minute: int = 0
+    hot_flow_interval_seconds: int = 1800
+    decode_result_flow_interval_seconds: int = 1800
+    decode_result_batch_size: int = 50
+    contribution_score_threshold: float = 0.6
+    hot_flow_sources: list[dict[str, Any]] = field(
+        default_factory=lambda: [
+            {
+                "source": "百度",
+                "count": 10,
+            },
+            {
+                "source": "微博",
+                "count": 10,
+            },
+            {
+                "source": "微信",
+                "count": 10,
+            }
+        ]
+    )
+
+    mysql_host: str = "rm-t4nh1xx6o2a6vj8qu3o.mysql.singapore.rds.aliyuncs.com"
+    mysql_port: int = 3306
+    mysql_user: str = "content_rw"
+    mysql_password: str = "bC1aH4bA1lB0"
+    mysql_database: str = "external_demand"
+    mysql_charset: str = "utf8mb4"
+
+    open_router_api_key: str = "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"
+    open_router_default_model: str = "anthropic/claude-haiku-4-5"
+    open_router_timeout_seconds: int = 60
+    open_router_http_referer: str = ""
+    open_router_app_title: str = "external_demand"
+    open_router_base_url: str = "https://openrouter.ai/api/v1"
+    open_router_temperature: float | None = 0.7
+    open_router_max_tokens: int | None = 20000
+
+    odps_access_id: str = "LTAI9EBa0bd5PrDa"
+    odps_access_key: str = "vAalxds7YxhfOA2yVv8GziCg3Y87v5"
+    odps_project: str = "loghubods"
+    odps_endpoint: str = "http://service.odps.aliyun.com/api"
+    odps_tunnel_endpoint: str = ""
+
+    demand_pool_source_table: str = "dwd_multi_demand_pool_di"
+    demand_pool_excluded_strategy: str = "当下供需gap-分词"
+    demand_pool_top_n: int = 200
+    hot_demand_pool_strategy: str = "近期热点"
+    wxindex_score_threshold: float = 1_000_000.0
+
+    postprocess_batch_size: int = 20
+    contribution_match_llm_model: str = ""
+    contribution_match_llm_max_attempts: int = 3
+    contribution_match_llm_retry_sleep_seconds: float = 1.0
+    contribution_match_llm_max_tokens: int = 4000
+    wxindex_llm_model: str = ""
+    wxindex_llm_max_attempts: int = 3
+    wxindex_llm_max_tokens: int = 1200
+    wxindex_api_url: str = "http://crawapi.piaoquantv.com/crawler/wei_xin/wxindex"
+    wxindex_lookback_days: int = 7
+
+    @classmethod
+    def from_env(cls) -> "Settings":
+        defaults = cls()
+        return cls(
+            crawapi_base_url=_env("CRAWAPI_BASE_URL", defaults.crawapi_base_url),
+            crawapi_hot_content_rank_path=_env(
+                "CRAWAPI_HOT_CONTENT_RANK_PATH",
+                defaults.crawapi_hot_content_rank_path,
+            ),
+            crawapi_keyword_search_path=_env(
+                "CRAWAPI_KEYWORD_SEARCH_PATH",
+                defaults.crawapi_keyword_search_path,
+            ),
+            decode_api_url=_env("DECODE_API_URL", defaults.decode_api_url),
+            decode_result_api_url=_env(
+                "DECODE_RESULT_API_URL",
+                defaults.decode_result_api_url,
+            ),
+            decode_config_id=_env_int("DECODE_CONFIG_ID", defaults.decode_config_id),
+            request_timeout_seconds=_env_int(
+                "REQUEST_TIMEOUT_SECONDS",
+                defaults.request_timeout_seconds,
+            ),
+            https_verify_ssl=_env_bool("HTTPS_VERIFY_SSL", defaults.https_verify_ssl),
+            hot_flow_cron_hours=_env(
+                "HOT_FLOW_CRON_HOURS",
+                defaults.hot_flow_cron_hours,
+            ),
+            hot_flow_cron_minute=_env_int(
+                "HOT_FLOW_CRON_MINUTE",
+                defaults.hot_flow_cron_minute,
+            ),
+            hot_flow_interval_seconds=_env_int(
+                "HOT_FLOW_INTERVAL_SECONDS",
+                defaults.hot_flow_interval_seconds,
+            ),
+            decode_result_flow_interval_seconds=_env_int(
+                "DECODE_RESULT_FLOW_INTERVAL_SECONDS",
+                defaults.decode_result_flow_interval_seconds,
+            ),
+            decode_result_batch_size=_env_int(
+                "DECODE_RESULT_BATCH_SIZE",
+                defaults.decode_result_batch_size,
+            ),
+            contribution_score_threshold=float(
+                _env(
+                    "CONTRIBUTION_SCORE_THRESHOLD",
+                    str(defaults.contribution_score_threshold),
+                )
+            ),
+            hot_flow_sources=_env_json(
+                "HOT_FLOW_SOURCES_JSON",
+                defaults.hot_flow_sources,
+                "HOT_FLOW_SOURCES_FILE",
+            ),
+            mysql_host=_env("MYSQL_HOST", defaults.mysql_host),
+            mysql_port=_env_int("MYSQL_PORT", defaults.mysql_port),
+            mysql_user=_env("MYSQL_USER", defaults.mysql_user),
+            mysql_password=_env("MYSQL_PASSWORD", defaults.mysql_password),
+            mysql_database=_env("MYSQL_DATABASE", defaults.mysql_database),
+            mysql_charset=_env("MYSQL_CHARSET", defaults.mysql_charset),
+            open_router_api_key=_env_first(
+                ("OPEN_ROUTER_API_KEY", "OPENROUTER_API_KEY"),
+                defaults.open_router_api_key,
+            ),
+            open_router_default_model=_env(
+                "OPEN_ROUTER_DEFAULT_MODEL",
+                defaults.open_router_default_model,
+            ),
+            open_router_timeout_seconds=_env_int(
+                "OPEN_ROUTER_TIMEOUT_SECONDS",
+                defaults.open_router_timeout_seconds,
+            ),
+            open_router_http_referer=_env_first(
+                ("OPEN_ROUTER_HTTP_REFERER", "OPENROUTER_HTTP_REFERER"),
+                defaults.open_router_http_referer,
+            ),
+            open_router_app_title=_env_first(
+                ("OPEN_ROUTER_APP_TITLE", "OPENROUTER_X_OPEN_ROUTER_TITLE"),
+                defaults.open_router_app_title,
+            ),
+            open_router_base_url=_env(
+                "OPEN_ROUTER_BASE_URL",
+                defaults.open_router_base_url,
+            ),
+            open_router_temperature=_env_float_optional(
+                "OPEN_ROUTER_TEMPERATURE",
+                defaults.open_router_temperature,
+            ),
+            open_router_max_tokens=_env_int_optional(
+                "OPEN_ROUTER_MAX_TOKENS",
+                defaults.open_router_max_tokens,
+            ),
+            odps_access_id=_env("ODPS_ACCESS_ID", defaults.odps_access_id),
+            odps_access_key=_env("ODPS_ACCESS_KEY", defaults.odps_access_key),
+            odps_project=_env("ODPS_PROJECT", defaults.odps_project),
+            odps_endpoint=_env("ODPS_ENDPOINT", defaults.odps_endpoint),
+            odps_tunnel_endpoint=_env(
+                "ODPS_TUNNEL_ENDPOINT",
+                defaults.odps_tunnel_endpoint,
+            ),
+            demand_pool_source_table=_env(
+                "DEMAND_POOL_SOURCE_TABLE",
+                defaults.demand_pool_source_table,
+            ),
+            demand_pool_excluded_strategy=_env(
+                "DEMAND_POOL_EXCLUDED_STRATEGY",
+                defaults.demand_pool_excluded_strategy,
+            ),
+            demand_pool_top_n=_env_int(
+                "DEMAND_POOL_TOP_N",
+                defaults.demand_pool_top_n,
+            ),
+            hot_demand_pool_strategy=_env(
+                "HOT_DEMAND_POOL_STRATEGY",
+                defaults.hot_demand_pool_strategy,
+            ),
+            wxindex_score_threshold=_env_float(
+                "WXINDEX_SCORE_THRESHOLD",
+                _env_float(
+                    "HOT_DEMAND_POOL_WXINDEX_THRESHOLD",
+                    _env_float(
+                        "WXINDEX_LATEST_SCORE_THRESHOLD",
+                        defaults.wxindex_score_threshold,
+                    ),
+                ),
+            ),
+            postprocess_batch_size=_env_int(
+                "POSTPROCESS_BATCH_SIZE",
+                defaults.postprocess_batch_size,
+            ),
+            contribution_match_llm_model=_env(
+                "CONTRIBUTION_MATCH_LLM_MODEL",
+                defaults.contribution_match_llm_model,
+            ),
+            contribution_match_llm_max_attempts=_env_int(
+                "CONTRIBUTION_MATCH_LLM_MAX_ATTEMPTS",
+                defaults.contribution_match_llm_max_attempts,
+            ),
+            contribution_match_llm_retry_sleep_seconds=_env_float(
+                "CONTRIBUTION_MATCH_LLM_RETRY_SLEEP_SECONDS",
+                defaults.contribution_match_llm_retry_sleep_seconds,
+            ),
+            contribution_match_llm_max_tokens=_env_int(
+                "CONTRIBUTION_MATCH_LLM_MAX_TOKENS",
+                defaults.contribution_match_llm_max_tokens,
+            ),
+            wxindex_llm_model=_env(
+                "WXINDEX_LLM_MODEL",
+                defaults.wxindex_llm_model,
+            ),
+            wxindex_llm_max_attempts=_env_int(
+                "WXINDEX_LLM_MAX_ATTEMPTS",
+                defaults.wxindex_llm_max_attempts,
+            ),
+            wxindex_llm_max_tokens=_env_int(
+                "WXINDEX_LLM_MAX_TOKENS",
+                defaults.wxindex_llm_max_tokens,
+            ),
+            wxindex_api_url=_env("WXINDEX_API_URL", defaults.wxindex_api_url),
+            wxindex_lookback_days=_env_int(
+                "WXINDEX_LOOKBACK_DAYS",
+                defaults.wxindex_lookback_days,
+            ),
+        )
+
+
+settings = Settings.from_env()

+ 138 - 0
app/core/open_router_llm.py

@@ -0,0 +1,138 @@
+"""OpenRouter LLM 调用工具。"""
+
+from __future__ import annotations
+
+from typing import Any
+
+from openrouter import OpenRouter
+from openrouter.errors.openroutererror import OpenRouterError as SdkOpenRouterError
+
+from app.core.config import settings
+
+_ALLOWED_ROLES = frozenset({"system", "user", "assistant"})
+
+
+class OpenRouterCallError(Exception):
+    """项目内 OpenRouter 调用失败。"""
+
+    def __init__(self, message: str, *, cause: Exception | None = None):
+        super().__init__(message)
+        self.cause = cause
+
+
+def _build_client() -> OpenRouter:
+    kwargs: dict[str, Any] = {
+        "api_key": settings.open_router_api_key.strip(),
+        "timeout_ms": max(settings.open_router_timeout_seconds, 1) * 1000,
+    }
+    if settings.open_router_http_referer.strip():
+        kwargs["http_referer"] = settings.open_router_http_referer.strip()
+    if settings.open_router_app_title.strip():
+        kwargs["x_open_router_title"] = settings.open_router_app_title.strip()
+    if settings.open_router_base_url.strip():
+        kwargs["server_url"] = settings.open_router_base_url.strip()
+    return OpenRouter(**kwargs)
+
+
+def _normalize_messages(messages: list[dict[str, str]]) -> list[dict[str, str]]:
+    if not messages:
+        raise OpenRouterCallError("messages must not be empty")
+
+    normalized: list[dict[str, str]] = []
+    for index, item in enumerate(messages):
+        role = (item.get("role") or "").strip()
+        content = item.get("content")
+        if role not in _ALLOWED_ROLES:
+            raise OpenRouterCallError(
+                f"messages[{index}].role must be one of: system, user, assistant"
+            )
+        if content is None or not str(content).strip():
+            raise OpenRouterCallError(f"messages[{index}].content must not be empty")
+        normalized.append({"role": role, "content": str(content)})
+    return normalized
+
+
+def _normalize_chat_response(response: Any) -> dict[str, Any]:
+    choices = getattr(response, "choices", None) or []
+    first_choice = choices[0] if choices else None
+    message = getattr(first_choice, "message", None) if first_choice else None
+    content = getattr(message, "content", None) or ""
+
+    usage = getattr(response, "usage", None)
+    if hasattr(usage, "model_dump"):
+        usage = usage.model_dump()
+    elif usage is not None and not isinstance(usage, dict):
+        usage = dict(usage) if hasattr(usage, "__iter__") else usage
+
+    return {
+        "id": getattr(response, "id", None),
+        "model": getattr(response, "model", None),
+        "content": content,
+        "usage": usage,
+        "finish_reason": getattr(first_choice, "finish_reason", None) if first_choice else None,
+    }
+
+
+def create_chat_completion(
+    messages: list[dict[str, str]],
+    *,
+    model: str | None = None,
+    temperature: float | None = None,
+    max_tokens: int | None = None,
+) -> dict[str, Any]:
+    """通过 OpenRouter SDK 调用对话补全,返回结构化结果。"""
+    if not settings.open_router_api_key.strip():
+        raise OpenRouterCallError("OPEN_ROUTER_API_KEY or OPENROUTER_API_KEY is not configured")
+
+    normalized_messages = _normalize_messages(messages)
+    model_name = (model or settings.open_router_default_model).strip()
+    if not model_name:
+        raise OpenRouterCallError(
+            "model is required (set OPEN_ROUTER_DEFAULT_MODEL or pass model=...)"
+        )
+
+    request_kwargs: dict[str, Any] = {
+        "model": model_name,
+        "messages": normalized_messages,
+    }
+    temperature_value = (
+        temperature if temperature is not None else settings.open_router_temperature
+    )
+    max_tokens_value = max_tokens if max_tokens is not None else settings.open_router_max_tokens
+    if temperature_value is not None:
+        request_kwargs["temperature"] = temperature_value
+    if max_tokens_value is not None:
+        request_kwargs["max_tokens"] = max_tokens_value
+
+    try:
+        with _build_client() as client:
+            response = client.chat.send(**request_kwargs)
+    except SdkOpenRouterError as exc:
+        raise OpenRouterCallError(str(exc), cause=exc) from exc
+    except Exception as exc:
+        raise OpenRouterCallError(f"OpenRouter SDK error: {exc}", cause=exc) from exc
+
+    return _normalize_chat_response(response)
+
+
+def chat_text(
+    user_prompt: str,
+    *,
+    system_prompt: str | None = None,
+    model: str | None = None,
+    temperature: float | None = None,
+    max_tokens: int | None = None,
+) -> str:
+    """单轮对话便捷方法,直接返回模型回复文本。"""
+    messages: list[dict[str, str]] = []
+    if system_prompt and system_prompt.strip():
+        messages.append({"role": "system", "content": system_prompt.strip()})
+    messages.append({"role": "user", "content": user_prompt.strip()})
+
+    result = create_chat_completion(
+        messages,
+        model=model,
+        temperature=temperature,
+        max_tokens=max_tokens,
+    )
+    return str(result.get("content") or "")

+ 1 - 0
app/hot_content/__init__.py

@@ -0,0 +1 @@
+"""热点内容服务包。"""

+ 32 - 0
app/hot_content/cli.py

@@ -0,0 +1,32 @@
+"""热点内容流程命令行入口。"""
+
+from __future__ import annotations
+
+import argparse
+import json
+
+from app.hot_content.config import load_flow_config
+from app.hot_content.service import run_once
+
+
+def parse_args() -> argparse.Namespace:
+    parser = argparse.ArgumentParser(description="热点内容 MySQL 入库与定时调度流程")
+    parser.add_argument("--once", action="store_true", help="只执行一次,不进入循环调度")
+    return parser.parse_args()
+
+
+def main() -> None:
+    args = parse_args()
+    config = load_flow_config()
+    if args.once:
+        summary = run_once(config)
+        print(json.dumps(summary, ensure_ascii=False, indent=2))
+        return
+
+    from app.scheduler import start_scheduler
+
+    start_scheduler()
+
+
+if __name__ == "__main__":
+    main()

+ 163 - 0
app/hot_content/client.py

@@ -0,0 +1,163 @@
+"""热点内容流程外部 API 客户端。"""
+
+from __future__ import annotations
+
+import json
+import socket
+import ssl
+import urllib.error
+import urllib.request
+from typing import Any
+
+from app.hot_content.exceptions import HotContentFlowError
+
+
+def build_url(base_url: str, path: str) -> str:
+    return f"{base_url.rstrip('/')}/{path.lstrip('/')}"
+
+
+def render_template(value: Any, variables: dict[str, str]) -> Any:
+    if isinstance(value, str):
+        return value.format(**variables)
+    if isinstance(value, list):
+        return [render_template(item, variables) for item in value]
+    if isinstance(value, dict):
+        return {key: render_template(item, variables) for key, item in value.items()}
+    return value
+
+
+class JsonApiClient:
+    def __init__(self, timeout_seconds: int, verify_ssl: bool):
+        self.timeout_seconds = timeout_seconds
+        self.verify_ssl = verify_ssl
+
+    def post_json(self, url: str, payload: dict[str, Any]) -> dict[str, Any]:
+        body_bytes = json.dumps(payload, ensure_ascii=False).encode("utf-8")
+        req = urllib.request.Request(
+            url,
+            data=body_bytes,
+            headers={"Content-Type": "application/json"},
+            method="POST",
+        )
+        try:
+            with urllib.request.urlopen(
+                req,
+                timeout=self.timeout_seconds,
+                context=self._https_context(),
+            ) as resp:
+                raw = resp.read().decode("utf-8")
+        except urllib.error.HTTPError as exc:
+            detail = exc.read().decode("utf-8", errors="replace")
+            raise HotContentFlowError(f"api http error: {exc.code} {detail}") from exc
+        except (urllib.error.URLError, TimeoutError, socket.timeout) as exc:
+            raise HotContentFlowError(f"api timeout/url error: {exc}") from exc
+
+        try:
+            data = json.loads(raw)
+        except json.JSONDecodeError as exc:
+            raise HotContentFlowError(f"api invalid json: {exc}") from exc
+
+        code = data.get("code")
+        if code is not None and int(code) != 0:
+            raise HotContentFlowError(f"api business error: {data}")
+        return data
+
+    def _https_context(self) -> ssl.SSLContext | None:
+        if self.verify_ssl:
+            return None
+        context = ssl.create_default_context()
+        context.check_hostname = False
+        context.verify_mode = ssl.CERT_NONE
+        return context
+
+
+def extract_rank_items(resp: dict[str, Any], source: str) -> list[dict[str, Any]]:
+    data = resp.get("data") or {}
+    rows = data.get("data") if isinstance(data, dict) else data
+    if not isinstance(rows, list):
+        return []
+
+    result: list[dict[str, Any]] = []
+    for row in rows:
+        if not isinstance(row, dict):
+            continue
+        rank_list = row.get("rankList")
+        if isinstance(rank_list, list):
+            row_source = str(row.get("source") or "").strip()
+            if row_source and row_source != source:
+                continue
+            result.extend(item for item in rank_list if isinstance(item, dict))
+            continue
+
+        item_source = str(row.get("source") or source).strip()
+        if item_source == source:
+            result.append(row)
+    return result
+
+
+def extract_keyword_items(resp: dict[str, Any]) -> list[dict[str, Any]]:
+    data = resp.get("data") or {}
+    rows = data.get("data") if isinstance(data, dict) else data
+    if not isinstance(rows, list):
+        return []
+    return [item for item in rows if isinstance(item, dict)]
+
+
+def pick_first_valid_content(items: list[dict[str, Any]]) -> dict[str, Any] | None:
+    for idx, item in enumerate(items):
+        content_title = str(item.get("title") or "").strip()
+        body_text = str(item.get("content") or "").strip()
+        if not body_text:
+            body_text = str(item.get("description") or "").strip()
+        if not content_title or not body_text:
+            continue
+        return {
+            "selected_index": idx,
+            "content_title": content_title,
+            "body_text": body_text,
+            "url": str(item.get("url") or "").strip(),
+            "content_source": str(item.get("source") or "").strip(),
+            "raw_json": item,
+        }
+    return None
+
+
+def extract_decode_item_map(resp: dict[str, Any]) -> dict[str, dict[str, Any]]:
+    rows = resp.get("data") if isinstance(resp, dict) else []
+    if not isinstance(rows, list):
+        return {}
+
+    result: dict[str, dict[str, Any]] = {}
+    for row in rows:
+        if not isinstance(row, dict):
+            continue
+        channel_content_id = str(row.get("channelContentId") or "").strip()
+        if channel_content_id:
+            result[channel_content_id] = row
+    return result
+
+
+def parse_decode_data_content(item: dict[str, Any]) -> dict[str, Any]:
+    channel_content_id = str(item.get("channelContentId") or "").strip()
+    raw_data_content = str(item.get("dataContent") or "")
+    if not raw_data_content.strip():
+        return {
+            "channelContentId": channel_content_id,
+            "status": item.get("status"),
+            "errorMessage": item.get("errorMessage"),
+            "html": item.get("html"),
+            "dataContent": None,
+        }
+
+    try:
+        parsed = json.loads(raw_data_content)
+    except json.JSONDecodeError as exc:
+        raise HotContentFlowError(
+            f"invalid dataContent json for channelContentId={channel_content_id}: {exc}"
+        ) from exc
+    if not isinstance(parsed, dict):
+        raise HotContentFlowError(
+            f"dataContent is not json object for channelContentId={channel_content_id}"
+        )
+    parsed["html"] = item.get("html")
+    return parsed

+ 244 - 0
app/hot_content/config.py

@@ -0,0 +1,244 @@
+"""热点内容流程配置加载。"""
+
+from __future__ import annotations
+
+import json
+import os
+from pathlib import Path
+from typing import Any
+
+from app.core.config import PROJECT_ROOT, settings
+from app.hot_content.exceptions import HotContentFlowError
+from app.hot_content.types import FlowConfig, HotSourceConfig, MysqlConfig
+
+
+def _get_env(name: str, default: str = "") -> str:
+    value = os.getenv(name)
+    if value is None or value == "":
+        return default
+    return value
+
+
+def _get_env_int(name: str, default: int) -> int:
+    raw = os.getenv(name)
+    if raw is None or raw == "":
+        return default
+    try:
+        return int(raw)
+    except ValueError as exc:
+        raise HotContentFlowError(f"invalid integer env {name}={raw!r}") from exc
+
+
+def _get_env_float(name: str, default: float) -> float:
+    raw = os.getenv(name)
+    if raw is None or raw == "":
+        return default
+    try:
+        return float(raw)
+    except ValueError as exc:
+        raise HotContentFlowError(f"invalid float env {name}={raw!r}") from exc
+
+
+def _get_env_bool(name: str, default: bool) -> bool:
+    raw = os.getenv(name)
+    if raw is None or raw == "":
+        return default
+    return raw.strip().lower() in {"1", "true", "yes", "y", "on"}
+
+
+def _load_json_from_env_or_file(env_name: str, file_env_name: str) -> Any | None:
+    file_path = os.getenv(file_env_name)
+    if file_path:
+        path = Path(file_path).expanduser()
+        if not path.is_absolute():
+            path = PROJECT_ROOT / path
+        try:
+            return json.loads(path.read_text(encoding="utf-8"))
+        except json.JSONDecodeError as exc:
+            raise HotContentFlowError(f"invalid json file {path}") from exc
+
+    raw = os.getenv(env_name)
+    if not raw:
+        return None
+    try:
+        return json.loads(raw)
+    except json.JSONDecodeError as exc:
+        raise HotContentFlowError(f"invalid json env {env_name}") from exc
+
+
+def _normalize_source_config(item: Any) -> HotSourceConfig:
+    if isinstance(item, str):
+        source = item.strip()
+        if not source:
+            raise HotContentFlowError("hot source cannot be empty")
+        return HotSourceConfig(source=source)
+    if not isinstance(item, dict):
+        raise HotContentFlowError(f"invalid hot source config: {item!r}")
+
+    source = str(item.get("source") or item.get("source_name") or "").strip()
+    if not source:
+        raise HotContentFlowError(f"hot source missing source: {item!r}")
+
+    return HotSourceConfig(
+        source=source,
+        count=int(item.get("count") or item.get("limit") or item.get("rank_limit") or 10),
+    )
+
+
+def _load_sources() -> list[HotSourceConfig]:
+    raw_sources = _load_json_from_env_or_file("HOT_FLOW_SOURCES_JSON", "HOT_FLOW_SOURCES_FILE")
+    if raw_sources is None:
+        raw_sources = settings.hot_flow_sources
+    if not isinstance(raw_sources, list):
+        raise HotContentFlowError("HOT_FLOW_SOURCES_JSON/HOT_FLOW_SOURCES_FILE must be a list")
+
+    sources = [_normalize_source_config(item) for item in raw_sources]
+    if not sources:
+        raise HotContentFlowError("hot sources cannot be empty")
+    return sources
+
+
+def _parse_cron_hours(value: str) -> str:
+    hours = [item.strip() for item in value.split(",") if item.strip()]
+    if not hours:
+        raise HotContentFlowError("hot flow cron hours cannot be empty")
+    normalized: list[str] = []
+    for hour in hours:
+        try:
+            hour_num = int(hour)
+        except ValueError as exc:
+            raise HotContentFlowError(f"invalid hot flow cron hour: {hour!r}") from exc
+        if not 0 <= hour_num <= 23:
+            raise HotContentFlowError(f"hot flow cron hour out of range: {hour_num}")
+        normalized.append(str(hour_num))
+    return ",".join(normalized)
+
+
+def load_flow_config(interval_override: int | None = None) -> FlowConfig:
+    crawapi_base_url = _get_env("CRAWAPI_BASE_URL", settings.crawapi_base_url).rstrip("/")
+    hot_rank_path = _get_env(
+        "CRAWAPI_HOT_CONTENT_RANK_PATH",
+        settings.crawapi_hot_content_rank_path,
+    )
+    if not crawapi_base_url:
+        raise HotContentFlowError("missing CRAWAPI_BASE_URL or settings.crawapi_base_url")
+    if not hot_rank_path:
+        raise HotContentFlowError(
+            "missing CRAWAPI_HOT_CONTENT_RANK_PATH or settings.crawapi_hot_content_rank_path"
+        )
+
+    interval_seconds = (
+        interval_override
+        if interval_override is not None
+        else _get_env_int("HOT_FLOW_INTERVAL_SECONDS", settings.hot_flow_interval_seconds)
+    )
+    return FlowConfig(
+        crawapi_base_url=crawapi_base_url,
+        hot_rank_path=hot_rank_path,
+        keyword_search_path=_get_env(
+            "CRAWAPI_KEYWORD_SEARCH_PATH",
+            settings.crawapi_keyword_search_path,
+        ),
+        decode_api_url=_get_env("DECODE_API_URL", settings.decode_api_url),
+        decode_result_api_url=_get_env(
+            "DECODE_RESULT_API_URL",
+            settings.decode_result_api_url,
+        ),
+        decode_config_id=_get_env_int("DECODE_CONFIG_ID", settings.decode_config_id),
+        request_timeout_seconds=_get_env_int(
+            "REQUEST_TIMEOUT_SECONDS",
+            settings.request_timeout_seconds,
+        ),
+        https_verify_ssl=_get_env_bool("HTTPS_VERIFY_SSL", settings.https_verify_ssl),
+        hot_flow_cron_hours=_parse_cron_hours(
+            _get_env("HOT_FLOW_CRON_HOURS", settings.hot_flow_cron_hours)
+        ),
+        hot_flow_cron_minute=_get_env_int(
+            "HOT_FLOW_CRON_MINUTE",
+            settings.hot_flow_cron_minute,
+        ),
+        schedule_interval_seconds=interval_seconds,
+        decode_result_interval_seconds=_get_env_int(
+            "DECODE_RESULT_FLOW_INTERVAL_SECONDS",
+            settings.decode_result_flow_interval_seconds,
+        ),
+        decode_result_batch_size=_get_env_int(
+            "DECODE_RESULT_BATCH_SIZE",
+            settings.decode_result_batch_size,
+        ),
+        contribution_score_threshold=float(
+            _get_env(
+                "CONTRIBUTION_SCORE_THRESHOLD",
+                str(settings.contribution_score_threshold),
+            )
+        ),
+        demand_pool_source_table=_get_env(
+            "DEMAND_POOL_SOURCE_TABLE",
+            settings.demand_pool_source_table,
+        ),
+        demand_pool_excluded_strategy=_get_env(
+            "DEMAND_POOL_EXCLUDED_STRATEGY",
+            settings.demand_pool_excluded_strategy,
+        ),
+        demand_pool_top_n=_get_env_int(
+            "DEMAND_POOL_TOP_N",
+            settings.demand_pool_top_n,
+        ),
+        hot_demand_pool_strategy=_get_env(
+            "HOT_DEMAND_POOL_STRATEGY",
+            settings.hot_demand_pool_strategy,
+        ),
+        wxindex_score_threshold=_get_env_float(
+            "WXINDEX_SCORE_THRESHOLD",
+            _get_env_float(
+                "HOT_DEMAND_POOL_WXINDEX_THRESHOLD",
+                _get_env_float(
+                    "WXINDEX_LATEST_SCORE_THRESHOLD",
+                    settings.wxindex_score_threshold,
+                ),
+            ),
+        ),
+        postprocess_batch_size=_get_env_int(
+            "POSTPROCESS_BATCH_SIZE",
+            settings.postprocess_batch_size,
+        ),
+        contribution_match_llm_model=_get_env(
+            "CONTRIBUTION_MATCH_LLM_MODEL",
+            settings.contribution_match_llm_model,
+        ),
+        contribution_match_llm_max_attempts=_get_env_int(
+            "CONTRIBUTION_MATCH_LLM_MAX_ATTEMPTS",
+            settings.contribution_match_llm_max_attempts,
+        ),
+        contribution_match_llm_retry_sleep_seconds=_get_env_float(
+            "CONTRIBUTION_MATCH_LLM_RETRY_SLEEP_SECONDS",
+            settings.contribution_match_llm_retry_sleep_seconds,
+        ),
+        contribution_match_llm_max_tokens=_get_env_int(
+            "CONTRIBUTION_MATCH_LLM_MAX_TOKENS",
+            settings.contribution_match_llm_max_tokens,
+        ),
+        wxindex_llm_model=_get_env("WXINDEX_LLM_MODEL", settings.wxindex_llm_model),
+        wxindex_llm_max_attempts=_get_env_int(
+            "WXINDEX_LLM_MAX_ATTEMPTS",
+            settings.wxindex_llm_max_attempts,
+        ),
+        wxindex_llm_max_tokens=_get_env_int(
+            "WXINDEX_LLM_MAX_TOKENS",
+            settings.wxindex_llm_max_tokens,
+        ),
+        wxindex_api_url=_get_env("WXINDEX_API_URL", settings.wxindex_api_url),
+        wxindex_lookback_days=_get_env_int(
+            "WXINDEX_LOOKBACK_DAYS",
+            settings.wxindex_lookback_days,
+        ),
+        sources=_load_sources(),
+        mysql=MysqlConfig(
+            host=_get_env("MYSQL_HOST", settings.mysql_host),
+            port=_get_env_int("MYSQL_PORT", settings.mysql_port),
+            user=_get_env("MYSQL_USER", settings.mysql_user),
+            password=_get_env("MYSQL_PASSWORD", settings.mysql_password),
+            database=_get_env("MYSQL_DATABASE", settings.mysql_database),
+            charset=_get_env("MYSQL_CHARSET", settings.mysql_charset),
+        ),
+    )

+ 125 - 0
app/hot_content/contribution.py

@@ -0,0 +1,125 @@
+"""解构结果中的高贡献词和点提取。"""
+
+from __future__ import annotations
+
+from typing import Any
+
+
+POINT_SOURCE_KEYS = ("关键点", "灵感点", "目的点")
+
+
+def build_contribution_points(
+    decode_result: dict[str, Any],
+    *,
+    score_threshold: float,
+) -> dict[str, Any]:
+    high_words = extract_high_contribution_words(decode_result, score_threshold=score_threshold)
+    return {
+        "channelContentId": str(
+            decode_result.get("帖子ID")
+            or (decode_result.get("target_post") or {}).get("channel_content_id")
+            or ""
+        ),
+        "高贡献词列表": high_words,
+        "点列表": extract_matched_points(decode_result, high_words),
+    }
+
+
+def extract_high_contribution_words(
+    decode_result: dict[str, Any],
+    *,
+    score_threshold: float,
+) -> list[dict[str, Any]]:
+    rows = decode_result.get("contribution_results") or []
+    if not isinstance(rows, list):
+        return []
+
+    words: list[dict[str, Any]] = []
+    seen: set[str] = set()
+    for row in rows:
+        if not isinstance(row, dict):
+            continue
+        word = str(row.get("词") or "").strip()
+        score = _to_float(row.get("贡献度"))
+        if not word or score is None or score < score_threshold:
+            continue
+        if word in seen:
+            continue
+        seen.add(word)
+        words.append({"词": word, "贡献度": score})
+    return words
+
+
+def extract_matched_points(
+    decode_result: dict[str, Any],
+    high_words: list[dict[str, Any]],
+) -> list[dict[str, Any]]:
+    matched_points: list[dict[str, Any]] = []
+    seen: set[tuple[str, str]] = set()
+    for source_key in POINT_SOURCE_KEYS:
+        points = decode_result.get(source_key) or []
+        if not isinstance(points, list):
+            continue
+        for point_obj in points:
+            if not isinstance(point_obj, dict):
+                continue
+            point_name = str(point_obj.get("点") or "").strip()
+            if not point_name:
+                continue
+
+            token_words = collect_token_words(point_obj)
+            if not token_words:
+                continue
+
+            hit_words = [
+                {"词": word_item["词"], "贡献度": word_item["贡献度"]}
+                for word_item in high_words
+                if word_matches_tokens(word_item["词"], token_words)
+            ]
+            if not hit_words:
+                continue
+
+            dedup_key = (source_key, point_name)
+            if dedup_key in seen:
+                continue
+            seen.add(dedup_key)
+            matched_points.append(
+                {
+                    "来源": source_key,
+                    "点": point_name,
+                    "点描述": str(point_obj.get("点描述") or ""),
+                    "匹配词列表": hit_words,
+                    "分词结果": token_words,
+                }
+            )
+    return matched_points
+
+
+def collect_token_words(point_obj: dict[str, Any]) -> list[str]:
+    token_rows = point_obj.get("分词结果") or []
+    if not isinstance(token_rows, list):
+        return []
+
+    token_words: list[str] = []
+    for token in token_rows:
+        if isinstance(token, dict):
+            word = str(token.get("词") or "").strip()
+        else:
+            word = str(token or "").strip()
+        if word:
+            token_words.append(word)
+    return token_words
+
+
+def word_matches_tokens(word: str, token_words: list[str]) -> bool:
+    for token in token_words:
+        if word in token or token in word:
+            return True
+    return False
+
+
+def _to_float(value: Any) -> float | None:
+    try:
+        return float(value)
+    except (TypeError, ValueError):
+        return None

+ 135 - 0
app/hot_content/decode_result_service.py

@@ -0,0 +1,135 @@
+"""解构结果拉取与贡献点提取服务。"""
+
+from __future__ import annotations
+
+from datetime import datetime
+from typing import Any
+
+from app.hot_content.client import JsonApiClient, parse_decode_data_content
+from app.hot_content.config import load_flow_config
+from app.hot_content.contribution import build_contribution_points
+from app.hot_content.exceptions import HotContentFlowError
+from app.hot_content.repository import HotContentRepository
+from app.hot_content.status import ExecutionStatus, decode_api_status_to_code
+from app.hot_content.timezone import SHANGHAI_TZ
+from app.hot_content.types import FlowConfig
+
+
+class DecodeResultFlowService:
+    def __init__(
+        self,
+        config: FlowConfig,
+        repository: HotContentRepository,
+        api_client: JsonApiClient,
+    ):
+        self.config = config
+        self.repository = repository
+        self.api_client = api_client
+
+    def run(self) -> dict[str, Any]:
+        records = self.repository.list_decode_result_candidates(
+            limit=max(self.config.decode_result_batch_size, 1)
+        )
+        if not records:
+            return self._build_summary([], 0, 0, 0, 0)
+
+        record_id_by_unique_key = {
+            str(record["unique_key"]): int(record["id"])
+            for record in records
+        }
+        request_payload = {
+            "params": {
+                "configId": self.config.decode_config_id,
+                "channelContentIds": list(record_id_by_unique_key.keys()),
+            }
+        }
+        response = self.api_client.post_json(self.config.decode_result_api_url, request_payload)
+        data_items = response.get("data") or []
+        if not isinstance(data_items, list):
+            raise HotContentFlowError("invalid decode result response: data is not list")
+
+        saved_count = 0
+        failed_count = 0
+        pending_count = 0
+        for item in data_items:
+            if not isinstance(item, dict):
+                continue
+            channel_content_id = str(item.get("channelContentId") or "").strip()
+            if not channel_content_id:
+                continue
+            record_id = record_id_by_unique_key.get(channel_content_id)
+            if record_id is None:
+                failed_count += 1
+                continue
+
+            try:
+                result_status = str(item.get("status") or "").strip()
+                status_code = decode_api_status_to_code(
+                    result_status,
+                    has_success_response=True,
+                )
+                if status_code != ExecutionStatus.DECODE_SUCCESS:
+                    self.repository.update_status(
+                        record_id=record_id,
+                        status=status_code,
+                        error_message=str(item.get("errorMessage") or "") or None,
+                    )
+                    if status_code == ExecutionStatus.DECODE_PENDING:
+                        pending_count += 1
+                    elif status_code == ExecutionStatus.DECODE_FAILED:
+                        failed_count += 1
+                    continue
+
+                decode_result = parse_decode_data_content(item)
+                contribution_points = build_contribution_points(
+                    decode_result,
+                    score_threshold=self.config.contribution_score_threshold,
+                )
+                contribution_points["channelContentId"] = channel_content_id
+                self.repository.save_decode_result_export(
+                    record_id=record_id,
+                    decode_result_json=decode_result,
+                    contribution_points_json=contribution_points,
+                )
+                saved_count += 1
+            except HotContentFlowError as exc:
+                self.repository.update_status(
+                    record_id=record_id,
+                    status=ExecutionStatus.DECODE_RESULT_FAILED,
+                    error_message=str(exc),
+                )
+                failed_count += 1
+
+        return self._build_summary(records, len(data_items), saved_count, failed_count, pending_count)
+
+    def _build_summary(
+        self,
+        records: list[dict[str, Any]],
+        response_item_count: int,
+        saved_count: int,
+        failed_count: int,
+        pending_count: int,
+    ) -> dict[str, Any]:
+        return {
+            "run_at": datetime.now(SHANGHAI_TZ).isoformat(),
+            "requested_id_count": len(records),
+            "decode_result_item_count": response_item_count,
+            "saved_count": saved_count,
+            "failed_count": failed_count,
+            "pending_count": pending_count,
+            "score_threshold": self.config.contribution_score_threshold,
+        }
+
+
+def run_once(config: FlowConfig | None = None) -> dict[str, Any]:
+    flow_config = config or load_flow_config()
+    repository = HotContentRepository(flow_config.mysql)
+    try:
+        api_client = JsonApiClient(
+            timeout_seconds=flow_config.request_timeout_seconds,
+            verify_ssl=flow_config.https_verify_ssl,
+        )
+        service = DecodeResultFlowService(flow_config, repository, api_client)
+        return service.run()
+    finally:
+        repository.close()

+ 226 - 0
app/hot_content/demand_cache_service.py

@@ -0,0 +1,226 @@
+"""需求池 MySQL 缓存服务。"""
+
+from __future__ import annotations
+
+import re
+from datetime import datetime, timedelta
+from typing import Any
+
+from app.hot_content.config import load_flow_config
+from app.hot_content.exceptions import HotContentFlowError
+from app.hot_content.repository import HotContentRepository
+from app.hot_content.timezone import SHANGHAI_TZ
+from app.hot_content.types import FlowConfig
+from app.aliyun_odps.client import get_odps_client
+
+IDENTIFIER_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*(?:\.[A-Za-z_][A-Za-z0-9_]*)?$")
+
+
+def _safe_identifier(name: str) -> str:
+    value = name.strip()
+    if not IDENTIFIER_RE.match(value):
+        raise HotContentFlowError(f"invalid sql identifier: {name}")
+    return value
+
+
+def _escape_sql_string(value: str) -> str:
+    return value.replace("'", "''")
+
+
+class DemandCacheService:
+    def __init__(
+        self,
+        config: FlowConfig,
+        repository: HotContentRepository,
+    ):
+        self.config = config
+        self.repository = repository
+
+    def run(self, *, partition_dt: str | None = None) -> dict[str, Any]:
+        cache = self.get_or_create_current_hour_cache(partition_dt=partition_dt)
+        return {
+            "run_at": datetime.now(SHANGHAI_TZ).isoformat(),
+            "status": "success",
+            "cache_id": cache["id"],
+            "cache_hour": _format_cache_hour(cache["cache_hour"]),
+            "source": cache["source"],
+            "source_table": cache["source_table"],
+            "partition_dt": cache.get("partition_dt"),
+            "item_count": len(cache["demand_name_set"]),
+        }
+
+    def get_or_create_current_hour_cache(
+        self,
+        *,
+        partition_dt: str | None = None,
+    ) -> dict[str, Any]:
+        source_table = self.config.demand_pool_source_table.strip()
+        if not source_table:
+            raise HotContentFlowError("DEMAND_POOL_SOURCE_TABLE is not configured")
+        if self.config.demand_pool_top_n <= 0:
+            raise HotContentFlowError("DEMAND_POOL_TOP_N must be positive")
+
+        cache_hour = _current_cache_hour()
+        cached = self.repository.get_demand_cache_by_hour(cache_hour=cache_hour)
+        if cached is not None:
+            cached["source"] = "mysql_cache"
+            return cached
+
+        demand_name_set, resolved_partition_dt = self.fetch_demand_name_set(
+            partition_dt=partition_dt
+        )
+        cache_id = self.repository.save_demand_cache_set(
+            cache_hour=cache_hour,
+            source_table=source_table,
+            partition_dt=resolved_partition_dt,
+            excluded_strategy=self.config.demand_pool_excluded_strategy,
+            top_n=self.config.demand_pool_top_n,
+            demand_name_set=demand_name_set,
+        )
+        return {
+            "id": cache_id,
+            "cache_hour": cache_hour,
+            "source": "hive",
+            "source_table": source_table,
+            "partition_dt": resolved_partition_dt,
+            "demand_name_set": demand_name_set,
+            "item_count": len(demand_name_set),
+        }
+
+    def fetch_demand_name_set(
+        self,
+        *,
+        partition_dt: str | None = None,
+    ) -> tuple[list[str], str | None]:
+        table = _safe_identifier(self.config.demand_pool_source_table)
+        excluded = _escape_sql_string(self.config.demand_pool_excluded_strategy)
+        partition_dts = _resolve_partition_dts(partition_dt)
+        dt_clause = _build_dt_clause(partition_dts)
+
+        sql = f"""
+        WITH filtered AS (
+            SELECT
+                dt,
+                strategy,
+                TRIM(demand_name) AS demand_name,
+                weight
+            FROM {table}
+            WHERE {dt_clause}
+              AND strategy IS NOT NULL
+              AND strategy <> '{excluded}'
+              AND demand_name IS NOT NULL
+              AND TRIM(demand_name) <> ''
+        ),
+        deduped AS (
+            SELECT
+                dt,
+                strategy,
+                demand_name,
+                weight,
+                ROW_NUMBER() OVER (
+                    PARTITION BY strategy, demand_name
+                    ORDER BY weight DESC, dt DESC
+                ) AS dup_rn
+            FROM filtered
+        ),
+        ranked AS (
+            SELECT
+                dt,
+                strategy,
+                demand_name,
+                weight,
+                ROW_NUMBER() OVER (
+                    PARTITION BY strategy
+                    ORDER BY weight DESC, dt DESC, demand_name ASC
+                ) AS rn
+            FROM deduped
+            WHERE dup_rn = 1
+        )
+        SELECT dt, strategy, demand_name, weight, rn
+        FROM ranked
+        WHERE rn <= {int(self.config.demand_pool_top_n)}
+        ORDER BY strategy ASC, rn ASC
+        """
+
+        odps_client = get_odps_client()
+        instance = odps_client.execute_sql(sql)
+        raw_demand_names: list[str] = []
+        with instance.open_reader(tunnel=True) as reader:
+            for record in reader:
+                demand_name = str(record["demand_name"] or "").strip()
+                if demand_name:
+                    raw_demand_names.append(demand_name)
+        demand_name_set = _dedupe_demand_names(raw_demand_names)
+        resolved_partition_dt = ",".join(partition_dts) if partition_dts else None
+        return demand_name_set, resolved_partition_dt
+
+
+def _normalize_demand_key(value: str) -> str:
+    return "".join(value.split())
+
+
+def _dedupe_demand_names(demand_names: list[str]) -> list[str]:
+    deduped: list[str] = []
+    seen: set[str] = set()
+    for raw_name in demand_names:
+        demand_name = str(raw_name).strip()
+        if not demand_name:
+            continue
+        keys = {demand_name, _normalize_demand_key(demand_name)}
+        if keys & seen:
+            continue
+        seen.update(keys)
+        deduped.append(demand_name)
+    return deduped
+
+
+def _resolve_partition_dts(partition_dt: str | None) -> list[str]:
+    if partition_dt:
+        value = partition_dt.strip()
+        return [value] if value else []
+    today = datetime.now(SHANGHAI_TZ).date()
+    yesterday = today - timedelta(days=1)
+    return [
+        yesterday.strftime("%Y%m%d"),
+        today.strftime("%Y%m%d"),
+    ]
+
+
+def _build_dt_clause(partition_dts: list[str]) -> str:
+    if not partition_dts:
+        raise HotContentFlowError("partition dt list is empty")
+    if len(partition_dts) == 1:
+        return f"dt = '{_escape_sql_string(partition_dts[0])}'"
+    dt_values = ", ".join(
+        f"'{_escape_sql_string(dt)}'" for dt in partition_dts
+    )
+    return f"dt IN ({dt_values})"
+
+
+def _current_cache_hour() -> datetime:
+    return datetime.now(SHANGHAI_TZ).replace(
+        minute=0,
+        second=0,
+        microsecond=0,
+        tzinfo=None,
+    )
+
+
+def _format_cache_hour(value: Any) -> str:
+    if hasattr(value, "isoformat"):
+        return value.isoformat()
+    return str(value)
+
+
+def run_once(
+    config: FlowConfig | None = None,
+    *,
+    partition_dt: str | None = None,
+) -> dict[str, Any]:
+    flow_config = config or load_flow_config()
+    repository = HotContentRepository(flow_config.mysql)
+    try:
+        service = DemandCacheService(flow_config, repository)
+        return service.run(partition_dt=partition_dt)
+    finally:
+        repository.close()

+ 474 - 0
app/hot_content/demand_export.py

@@ -0,0 +1,474 @@
+"""微信指数达标后的需求元素/短语导出逻辑。"""
+
+from __future__ import annotations
+
+import argparse
+import json
+from typing import Any
+
+from app.hot_content.config import load_flow_config
+from app.hot_content.repository import HotContentRepository
+
+WXINDEX_EXPORT_THRESHOLD = 1_000_000.0  # 与 WXINDEX_SCORE_THRESHOLD 默认值一致
+
+
+POINT_CATEGORIES = ("灵感点", "目的点", "关键点")
+ITEM_TYPE_ELEMENT = "元素"
+ITEM_TYPE_PHRASE = "短语"
+
+
+def get_latest_wxindex_score(trend_json: dict[str, Any]) -> float | None:
+    wxindex = trend_json.get("wxindex")
+    if not isinstance(wxindex, dict):
+        return None
+    try:
+        return float(wxindex.get("latest_total_score"))
+    except (TypeError, ValueError):
+        return None
+
+
+def get_wxindex_keyword(trend_json: dict[str, Any] | None) -> str:
+    if not isinstance(trend_json, dict):
+        return ""
+    wxindex = trend_json.get("wxindex")
+    if isinstance(wxindex, dict):
+        keyword = str(wxindex.get("keyword") or "").strip()
+        if keyword:
+            return keyword
+    return str(trend_json.get("llm_selected_word") or "").strip()
+
+
+def get_wxindex_trend(trend_json: dict[str, Any]) -> str:
+    wxindex = trend_json.get("wxindex")
+    if not isinstance(wxindex, dict):
+        return ""
+    return str(wxindex.get("trend") or "").strip()
+
+
+def _to_contribution_score(value: Any) -> float | None:
+    try:
+        if value is None:
+            return None
+        return float(value)
+    except (TypeError, ValueError):
+        return None
+
+
+def extract_matched_demand_name_list(word_row: dict[str, Any]) -> list[str]:
+    match_rows = word_row.get("匹配需求列表") or []
+    if not isinstance(match_rows, list):
+        return []
+
+    names: list[str] = []
+    seen: set[str] = set()
+    for match in match_rows:
+        if not isinstance(match, dict):
+            continue
+        demand_name = str(match.get("demand_name") or "").strip()
+        if not demand_name or demand_name in seen:
+            continue
+        seen.add(demand_name)
+        names.append(demand_name)
+    return names
+
+
+def extract_matched_demand_names(word_row: dict[str, Any]) -> str:
+    return " ".join(extract_matched_demand_name_list(word_row))
+
+
+def build_word_lookup(words_rows: list[Any]) -> dict[str, dict[str, Any]]:
+    lookup: dict[str, dict[str, Any]] = {}
+    for word_row in words_rows:
+        if not isinstance(word_row, dict):
+            continue
+        word_text = str(word_row.get("词") or "").strip()
+        if word_text:
+            lookup[word_text] = word_row
+    return lookup
+
+
+def build_word_to_categories(points: list[Any]) -> dict[str, set[str]]:
+    word_categories: dict[str, set[str]] = {}
+    if not isinstance(points, list):
+        return word_categories
+
+    for point_item in points:
+        if not isinstance(point_item, dict):
+            continue
+        category = str(point_item.get("来源") or "").strip()
+        if category not in POINT_CATEGORIES:
+            continue
+        match_words = point_item.get("匹配词列表") or []
+        if not isinstance(match_words, list):
+            continue
+        for hit in match_words:
+            if not isinstance(hit, dict):
+                continue
+            word_text = str(hit.get("词") or "").strip()
+            if not word_text:
+                continue
+            word_categories.setdefault(word_text, set()).add(category)
+    return word_categories
+
+
+def ordered_point_categories(categories: set[str]) -> list[str]:
+    return [category for category in POINT_CATEGORIES if category in categories]
+
+
+def extract_point_matched_demand_names(
+    point_item: dict[str, Any],
+    word_lookup: dict[str, dict[str, Any]],
+) -> str:
+    match_words = point_item.get("匹配词列表") or []
+    if not isinstance(match_words, list):
+        return ""
+
+    names: list[str] = []
+    seen: set[str] = set()
+    for hit in match_words:
+        if not isinstance(hit, dict):
+            continue
+        word_text = str(hit.get("词") or "").strip()
+        word_row = word_lookup.get(word_text)
+        if not word_row:
+            continue
+        for demand_name in extract_matched_demand_name_list(word_row):
+            if demand_name in seen:
+                continue
+            seen.add(demand_name)
+            names.append(demand_name)
+    return " ".join(names)
+
+
+def _build_word_export_row(
+    word_text: str,
+    word_row: dict[str, Any],
+    category: str,
+) -> dict[str, Any]:
+    return {
+        "item_type": ITEM_TYPE_ELEMENT,
+        "item_text": word_text,
+        "point_category": category,
+        "matched_demand": extract_matched_demand_names(word_row),
+        "contribution_score": _to_contribution_score(word_row.get("贡献度")),
+    }
+
+
+def _resolve_word_row(
+    word_text: str,
+    *,
+    word_lookup: dict[str, dict[str, Any]],
+    match_result: dict[str, Any],
+) -> dict[str, Any]:
+    word_row = word_lookup.get(word_text)
+    if isinstance(word_row, dict):
+        return word_row
+
+    for row in match_result.get("匹配到需求的词列表") or []:
+        if isinstance(row, dict) and str(row.get("词") or "").strip() == word_text:
+            return row
+    for row in match_result.get("高贡献词列表") or []:
+        if isinstance(row, dict) and str(row.get("词") or "").strip() == word_text:
+            return row
+    return {"词": word_text}
+
+
+def append_wxindex_keyword_rows(
+    export_rows: list[dict[str, Any]],
+    *,
+    trend_json: dict[str, Any] | None,
+    match_result: dict[str, Any],
+    word_lookup: dict[str, dict[str, Any]],
+    word_to_categories: dict[str, set[str]],
+) -> None:
+    keyword = get_wxindex_keyword(trend_json)
+    if not keyword:
+        return
+
+    has_keyword_row = any(
+        row.get("item_type") == ITEM_TYPE_ELEMENT and str(row.get("item_text") or "").strip() == keyword
+        for row in export_rows
+    )
+    if has_keyword_row:
+        return
+
+    word_row = _resolve_word_row(keyword, word_lookup=word_lookup, match_result=match_result)
+    categories = ordered_point_categories(word_to_categories.get(keyword, set()))
+    if categories:
+        for category in categories:
+            export_rows.append(_build_word_export_row(keyword, word_row, category))
+        return
+
+    export_rows.append(_build_word_export_row(keyword, word_row, ""))
+
+
+def build_demand_export_rows(
+    match_result: dict[str, Any],
+    *,
+    contribution_points: dict[str, Any] | None = None,
+    trend_json: dict[str, Any] | None = None,
+) -> list[dict[str, Any]]:
+    export_rows: list[dict[str, Any]] = []
+    words_rows = match_result.get("高贡献词列表") or []
+    if not isinstance(words_rows, list):
+        words_rows = []
+
+    contribution_source = contribution_points if isinstance(contribution_points, dict) else match_result
+    points = contribution_source.get("点列表") or []
+    if not isinstance(points, list):
+        points = []
+
+    word_lookup = build_word_lookup(words_rows)
+    word_to_categories = build_word_to_categories(points)
+
+    for word_text, word_row in word_lookup.items():
+        categories = ordered_point_categories(word_to_categories.get(word_text, set()))
+        if not categories:
+            continue
+        for category in categories:
+            export_rows.append(_build_word_export_row(word_text, word_row, category))
+
+    for point_item in points:
+        if not isinstance(point_item, dict):
+            continue
+        point_text = str(point_item.get("点") or "").strip()
+        category = str(point_item.get("来源") or "").strip()
+        if not point_text or category not in POINT_CATEGORIES:
+            continue
+        export_rows.append(
+            {
+                "item_type": ITEM_TYPE_PHRASE,
+                "item_text": point_text,
+                "point_category": category,
+                "matched_demand": extract_point_matched_demand_names(point_item, word_lookup),
+                "contribution_score": None,
+            }
+        )
+
+    append_wxindex_keyword_rows(
+        export_rows,
+        trend_json=trend_json,
+        match_result=match_result,
+        word_lookup=word_lookup,
+        word_to_categories=word_to_categories,
+    )
+
+    deduped_rows: list[dict[str, Any]] = []
+    seen: set[tuple[str, str, str]] = set()
+    for row in export_rows:
+        key = (
+            row["item_type"],
+            row["item_text"],
+            str(row.get("point_category") or ""),
+        )
+        if key in seen:
+            continue
+        seen.add(key)
+        deduped_rows.append(row)
+    return deduped_rows
+
+
+def attach_wxindex_metadata(
+    export_rows: list[dict[str, Any]],
+    trend_json: dict[str, Any] | None,
+) -> list[dict[str, Any]]:
+    latest_score = (
+        get_latest_wxindex_score(trend_json)
+        if isinstance(trend_json, dict)
+        else None
+    )
+    trend = get_wxindex_trend(trend_json) if isinstance(trend_json, dict) else ""
+    wxindex_keyword = get_wxindex_keyword(trend_json)
+    rows: list[dict[str, Any]] = []
+    for row in export_rows:
+        item_type = str(row.get("item_type") or "")
+        item_text = str(row.get("item_text") or "").strip()
+        matched_demand = str(row.get("matched_demand") or "").strip()
+        has_record_wxindex = latest_score is not None
+        is_wxindex_keyword = (
+            item_type == ITEM_TYPE_ELEMENT and wxindex_keyword and item_text == wxindex_keyword
+        )
+
+        if has_record_wxindex and (
+            matched_demand or item_type == ITEM_TYPE_PHRASE or is_wxindex_keyword
+        ):
+            wxindex_score = float(latest_score)
+            wxindex_trend_value = trend
+        else:
+            wxindex_score = 0.0
+            wxindex_trend_value = ""
+
+        rows.append(
+            {
+                **row,
+                "wxindex_keyword": wxindex_keyword,
+                "wxindex_latest_score": wxindex_score,
+                "wxindex_trend": wxindex_trend_value,
+            }
+        )
+    return rows
+
+
+def _json_loads(value: Any) -> Any:
+    if value is None:
+        return None
+    if isinstance(value, (dict, list)):
+        return value
+    if isinstance(value, (bytes, bytearray)):
+        value = value.decode("utf-8")
+    if isinstance(value, str):
+        return json.loads(value)
+    return value
+
+
+def fetch_export_candidate_records(cursor: Any, limit: int) -> list[dict[str, Any]]:
+    limit_sql = "" if limit <= 0 else "LIMIT %s"
+    params: tuple[Any, ...] = () if limit <= 0 else (limit,)
+    cursor.execute(
+        f"""
+        SELECT
+            id,
+            source,
+            title,
+            article_title,
+            contribution_points_json,
+            contribution_demand_match_json,
+            wxindex_trend_json
+        FROM hot_content_records
+        WHERE contribution_demand_match_json IS NOT NULL
+          AND TRIM(CAST(contribution_demand_match_json AS CHAR)) <> ''
+        ORDER BY id ASC
+        {limit_sql}
+        """,
+        params,
+    )
+    return list(cursor.fetchall())
+
+
+def export_existing_records(
+    repository: HotContentRepository,
+    records: list[dict[str, Any]],
+    *,
+    dry_run: bool,
+    verbose: bool,
+) -> dict[str, int]:
+    summary = {
+        "scanned": 0,
+        "exported_records": 0,
+        "exported_rows": 0,
+        "no_export_rows": 0,
+        "invalid_json": 0,
+        "skipped": 0,
+    }
+
+    for row in records:
+        summary["scanned"] += 1
+        record_id = int(row["id"])
+        try:
+            match_json = _json_loads(row.get("contribution_demand_match_json"))
+            contribution_points = _json_loads(row.get("contribution_points_json"))
+            trend_json = _json_loads(row.get("wxindex_trend_json"))
+        except json.JSONDecodeError:
+            summary["invalid_json"] += 1
+            if verbose:
+                print(f"id={record_id}: JSON 解析失败,已跳过")
+            continue
+
+        if not isinstance(match_json, dict):
+            summary["skipped"] += 1
+            continue
+
+        latest_score = (
+            get_latest_wxindex_score(trend_json)
+            if isinstance(trend_json, dict)
+            else None
+        )
+        export_rows = attach_wxindex_metadata(
+            build_demand_export_rows(
+                match_json,
+                contribution_points=(
+                    contribution_points if isinstance(contribution_points, dict) else None
+                ),
+                trend_json=trend_json if isinstance(trend_json, dict) else None,
+            ),
+            trend_json if isinstance(trend_json, dict) else None,
+        )
+        if not export_rows:
+            summary["no_export_rows"] += 1
+            continue
+
+        if verbose or dry_run:
+            matched_rows = sum(
+                1 for item in export_rows if str(item.get("matched_demand") or "").strip()
+            )
+            print(
+                f"id={record_id} rows={len(export_rows)} matched_rows={matched_rows} "
+                f"title={str(row.get('title') or '')[:40]}"
+            )
+
+        if not dry_run:
+            repository.replace_demand_export_rows(
+                record_id=record_id,
+                source=str(row.get("source") or ""),
+                hot_title=str(row.get("title") or ""),
+                article_title=str(row.get("article_title") or ""),
+                rows=export_rows,
+            )
+
+        summary["exported_records"] += 1
+        summary["exported_rows"] += len(export_rows)
+
+    return summary
+
+
+def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
+    parser = argparse.ArgumentParser(
+        description=(
+            "扫描已有 contribution_demand_match_json 记录,"
+            "导出全部元素/短语到 hot_content_demand_exports;"
+            "元素/短语按灵感点/目的点/关键点展开为多行,无点类型数据过滤;"
+            "并补充获取微信指数的词、微信指数及趋势。"
+        ),
+    )
+    parser.add_argument(
+        "--limit",
+        type=int,
+        default=0,
+        help="最多处理多少条记录,默认 0 表示不限制。",
+    )
+    parser.add_argument(
+        "--dry-run",
+        action="store_true",
+        help="只统计/打印,不写入数据库。",
+    )
+    parser.add_argument(
+        "--verbose",
+        action="store_true",
+        help="打印每条成功导出的记录。",
+    )
+    return parser.parse_args(argv)
+
+
+def main(argv: list[str] | None = None) -> dict[str, int]:
+    args = parse_args(argv)
+    config = load_flow_config()
+    repository = HotContentRepository(config.mysql)
+    try:
+        with repository.conn.cursor() as cursor:
+            records = fetch_export_candidate_records(cursor, args.limit)
+        summary = export_existing_records(
+            repository,
+            records,
+            dry_run=args.dry_run,
+            verbose=args.verbose,
+        )
+    finally:
+        repository.close()
+
+    action = "预览完成" if args.dry_run else "导出完成"
+    print(f"{action}:{json.dumps(summary, ensure_ascii=False)}")
+    return summary
+
+
+if __name__ == "__main__":
+    main()

+ 175 - 0
app/hot_content/demand_hive_export.py

@@ -0,0 +1,175 @@
+"""近期热点需求写入 Hive 的行构建逻辑。"""
+
+from __future__ import annotations
+
+import hashlib
+from typing import Any
+
+from app.hot_content.demand_export import ITEM_TYPE_ELEMENT, ITEM_TYPE_PHRASE
+
+TITLE_RETAIN_POINT_CATEGORIES = frozenset({"灵感点", "目的点"})
+TYPE_FEATURE_POINT = "特征点"
+TYPE_PHRASE = "短语"
+WEIGHT_DIVISOR = 1_000_000.0
+
+
+def _normalize_demand_key(value: str) -> str:
+    return "".join(value.split())
+
+
+def build_demand_id(*, strategy: str, demand_name: str, partition_dt: str) -> str:
+    raw = f"{strategy}{demand_name}{partition_dt}"
+    return hashlib.md5(raw.encode("utf-8")).hexdigest()
+
+
+def _dedupe_texts(texts: list[str]) -> list[str]:
+    deduped: list[str] = []
+    seen: set[str] = set()
+    for raw in texts:
+        text = str(raw).strip()
+        if not text:
+            continue
+        keys = {text, _normalize_demand_key(text)}
+        if keys & seen:
+            continue
+        seen.update(keys)
+        deduped.append(text)
+    return deduped
+
+
+def _record_wxindex_score(export_rows: list[dict[str, Any]]) -> float:
+    scores: list[float] = []
+    for row in export_rows:
+        try:
+            scores.append(float(row.get("wxindex_latest_score") or 0))
+        except (TypeError, ValueError):
+            continue
+    return max(scores) if scores else 0.0
+
+
+def _should_retain_title(
+    export_rows: list[dict[str, Any]],
+    *,
+    wxindex_threshold: float,
+) -> bool:
+    if _record_wxindex_score(export_rows) < wxindex_threshold:
+        return False
+    return any(_has_inspiration_or_goal_demand_match(row) for row in export_rows)
+
+
+def _has_inspiration_or_goal_demand_match(row: dict[str, Any]) -> bool:
+    point_category = str(row.get("point_category") or "").strip()
+    if point_category not in TITLE_RETAIN_POINT_CATEGORIES:
+        return False
+    return bool(str(row.get("matched_demand") or "").strip())
+
+
+def _has_matched_demand(row: dict[str, Any]) -> bool:
+    return bool(str(row.get("matched_demand") or "").strip())
+
+
+def build_hive_rows_for_record(
+    export_rows: list[dict[str, Any]],
+    *,
+    strategy: str,
+    partition_dt: str,
+    wxindex_threshold: float,
+) -> list[dict[str, Any]]:
+    if not _should_retain_title(export_rows, wxindex_threshold=wxindex_threshold):
+        return []
+
+    weight = _record_wxindex_score(export_rows) / WEIGHT_DIVISOR
+
+    element_texts = _dedupe_texts(
+        [
+            str(row.get("item_text") or "").strip()
+            for row in export_rows
+            if str(row.get("item_type") or "") == ITEM_TYPE_ELEMENT and _has_matched_demand(row)
+        ]
+    )
+    phrase_texts = _dedupe_texts(
+        [
+            str(row.get("item_text") or "").strip()
+            for row in export_rows
+            if str(row.get("item_type") or "") == ITEM_TYPE_PHRASE
+            and str(row.get("item_text") or "").strip()
+        ]
+    )
+
+    hive_rows: list[dict[str, Any]] = []
+    if element_texts:
+        demand_name = " ".join(element_texts)
+        hive_rows.append(
+            _build_hive_row(
+                strategy=strategy,
+                demand_name=demand_name,
+                weight=weight,
+                demand_type=TYPE_FEATURE_POINT,
+                partition_dt=partition_dt,
+            )
+        )
+
+    for phrase_text in phrase_texts:
+        hive_rows.append(
+            _build_hive_row(
+                strategy=strategy,
+                demand_name=phrase_text,
+                weight=weight,
+                demand_type=TYPE_PHRASE,
+                partition_dt=partition_dt,
+            )
+        )
+    return hive_rows
+
+
+def build_hive_rows_from_export_groups(
+    export_groups: list[dict[str, Any]],
+    *,
+    strategy: str,
+    partition_dt: str,
+    wxindex_threshold: float,
+) -> list[dict[str, Any]]:
+    rows: list[dict[str, Any]] = []
+    seen_demand_ids: set[str] = set()
+    for group in export_groups:
+        export_rows = group.get("export_rows") or []
+        if not isinstance(export_rows, list):
+            continue
+        for hive_row in build_hive_rows_for_record(
+            export_rows,
+            strategy=strategy,
+            partition_dt=partition_dt,
+            wxindex_threshold=wxindex_threshold,
+        ):
+            demand_id = str(hive_row["demand_id"])
+            if demand_id in seen_demand_ids:
+                continue
+            seen_demand_ids.add(demand_id)
+            rows.append(hive_row)
+    return rows
+
+
+def _build_hive_row(
+    *,
+    strategy: str,
+    demand_name: str,
+    weight: float,
+    demand_type: str,
+    partition_dt: str,
+) -> dict[str, Any]:
+    normalized_name = demand_name.strip()
+    return {
+        "strategy": strategy,
+        "demand_id": build_demand_id(
+            strategy=strategy,
+            demand_name=normalized_name,
+            partition_dt=partition_dt,
+        ),
+        "demand_name": normalized_name,
+        "weight": weight,
+        "type": demand_type,
+        "video_count": None,
+        "video_list": [],
+        "extend": "{}",
+        "dt": partition_dt,
+    }

+ 155 - 0
app/hot_content/demand_pool_writer.py

@@ -0,0 +1,155 @@
+"""近期热点需求写入 Hive 需求池表。"""
+
+from __future__ import annotations
+
+import re
+from datetime import datetime
+from typing import Any
+
+from app.aliyun_odps.client import get_odps_client
+from app.hot_content.demand_hive_export import build_hive_rows_from_export_groups
+from app.hot_content.exceptions import HotContentFlowError
+from app.hot_content.repository import HotContentRepository
+from app.hot_content.timezone import SHANGHAI_TZ
+from app.hot_content.types import FlowConfig
+
+IDENTIFIER_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*(?:\.[A-Za-z_][A-Za-z0-9_]*)?$")
+HIVE_COLUMNS = (
+    "strategy",
+    "demand_id",
+    "demand_name",
+    "weight",
+    "type",
+    "video_count",
+    "video_list",
+    "extend",
+)
+
+
+def _safe_identifier(name: str) -> str:
+    value = name.strip()
+    if not IDENTIFIER_RE.match(value):
+        raise HotContentFlowError(f"invalid sql identifier: {name}")
+    return value
+
+
+def _escape_sql_string(value: str) -> str:
+    return value.replace("'", "''")
+
+
+class HotDemandPoolWriter:
+    def __init__(self, config: FlowConfig, repository: HotContentRepository):
+        self.config = config
+        self.repository = repository
+
+    def sync_today(self) -> dict[str, Any]:
+        partition_dt = datetime.now(SHANGHAI_TZ).date().strftime("%Y%m%d")
+        export_groups = self.repository.list_demand_export_groups()
+        hive_rows = build_hive_rows_from_export_groups(
+            export_groups,
+            strategy=self.config.hot_demand_pool_strategy,
+            partition_dt=partition_dt,
+            wxindex_threshold=self.config.wxindex_score_threshold,
+        )
+        written_count = self._write_partition(
+            hive_rows=hive_rows,
+            partition_dt=partition_dt,
+            strategy=self.config.hot_demand_pool_strategy,
+        )
+        return {
+            "partition_dt": partition_dt,
+            "strategy": self.config.hot_demand_pool_strategy,
+            "source_record_count": len(export_groups),
+            "hive_row_count": len(hive_rows),
+            "written_count": written_count,
+            "target_table": self.config.demand_pool_source_table,
+        }
+
+    def _write_partition(
+        self,
+        *,
+        hive_rows: list[dict[str, Any]],
+        partition_dt: str,
+        strategy: str,
+    ) -> int:
+        table_name = _safe_identifier(self.config.demand_pool_source_table)
+        odps_client = get_odps_client()
+        table = odps_client.get_table(table_name)
+        partition_spec = f"dt={partition_dt}"
+
+        preserved_rows = self._read_preserved_rows(
+            table=table,
+            partition_spec=partition_spec,
+            strategy=strategy,
+        )
+        payload_rows = preserved_rows + [
+            self._to_write_row(row) for row in hive_rows
+        ]
+        if not payload_rows and table.exist_partition(partition_spec):
+            odps_client.write_table(
+                table_name,
+                [],
+                partition=partition_spec,
+                create_partition=True,
+                overwrite=True,
+            )
+            return 0
+
+        odps_client.write_table(
+            table_name,
+            payload_rows,
+            partition=partition_spec,
+            create_partition=True,
+            overwrite=True,
+        )
+        return len(hive_rows)
+
+    @staticmethod
+    def _read_preserved_rows(
+        *,
+        table: Any,
+        partition_spec: str,
+        strategy: str,
+    ) -> list[list[Any]]:
+        if not table.exist_partition(partition_spec):
+            return []
+
+        preserved_rows: list[list[Any]] = []
+        with table.open_reader(partition=partition_spec) as reader:
+            for record in reader:
+                if str(record["strategy"] or "") == strategy:
+                    continue
+                preserved_rows.append(
+                    [
+                        record["strategy"],
+                        record["demand_id"],
+                        record["demand_name"],
+                        record["weight"],
+                        record["type"],
+                        record["video_count"],
+                        record["video_list"],
+                        record["extend"],
+                    ]
+                )
+        return preserved_rows
+
+    @staticmethod
+    def _to_write_row(row: dict[str, Any]) -> list[Any]:
+        return [
+            row["strategy"],
+            row["demand_id"],
+            row["demand_name"],
+            float(row["weight"]),
+            row["type"],
+            row["video_count"],
+            row["video_list"],
+            row["extend"],
+        ]
+
+
+def sync_hot_demands_to_hive(
+    config: FlowConfig,
+    repository: HotContentRepository,
+) -> dict[str, Any]:
+    writer = HotDemandPoolWriter(config, repository)
+    return writer.sync_today()

+ 5 - 0
app/hot_content/exceptions.py

@@ -0,0 +1,5 @@
+"""热点内容流程异常。"""
+
+
+class HotContentFlowError(Exception):
+    """热点内容流程执行异常。"""

+ 667 - 0
app/hot_content/postprocess_service.py

@@ -0,0 +1,667 @@
+"""贡献点需求匹配与微信指数趋势后处理服务。"""
+
+from __future__ import annotations
+
+import json
+import re
+import time
+from datetime import datetime, timedelta
+from typing import Any
+
+from app.core.open_router_llm import OpenRouterCallError, create_chat_completion
+from app.hot_content.client import JsonApiClient
+from app.hot_content.config import load_flow_config
+from app.hot_content.demand_cache_service import DemandCacheService
+from app.hot_content.exceptions import HotContentFlowError
+from app.hot_content.repository import HotContentRepository
+from app.hot_content.status import PostprocessStatus
+from app.hot_content.timezone import SHANGHAI_TZ
+from app.hot_content.types import FlowConfig
+from app.hot_content.demand_export import (
+    attach_wxindex_metadata,
+    build_demand_export_rows,
+)
+from app.hot_content.demand_pool_writer import sync_hot_demands_to_hive
+from app.hot_content.wxindex_trend import calc_wxindex_trend
+
+
+class WxindexSelectionSkipped(Exception):
+    """微信指数选词无效时跳过后续查询。"""
+
+
+def _extract_json_object(text: str) -> dict[str, Any]:
+    raw = text.strip()
+    if raw.startswith("```"):
+        raw = re.sub(r"^```(?:json)?\s*", "", raw)
+        raw = re.sub(r"\s*```$", "", raw)
+    try:
+        parsed = json.loads(raw)
+        if isinstance(parsed, dict):
+            return parsed
+    except json.JSONDecodeError:
+        pass
+    match = re.search(r"\{[\s\S]*\}", raw)
+    if not match:
+        raise HotContentFlowError("llm output is not json object")
+    try:
+        parsed = json.loads(match.group(0))
+    except json.JSONDecodeError as exc:
+        raise HotContentFlowError(f"llm output invalid json: {exc}") from exc
+    if not isinstance(parsed, dict):
+        raise HotContentFlowError("llm output is not json object")
+    return parsed
+
+
+def _get_recent_range(lookback_days: int) -> tuple[str, str]:
+    today = datetime.now(SHANGHAI_TZ).date()
+    end_date = today - timedelta(days=1)
+    start_date = end_date - timedelta(days=max(lookback_days, 1))
+    return start_date.strftime("%Y%m%d"), end_date.strftime("%Y%m%d")
+
+
+def _parse_total_scores(wx_resp: dict[str, Any]) -> list[dict[str, Any]]:
+    rows = ((wx_resp.get("data") or {}).get("data") or [])
+    if not isinstance(rows, list):
+        return []
+
+    series: list[dict[str, Any]] = []
+    for row in rows:
+        if not isinstance(row, dict):
+            continue
+        ymd = str(row.get("ymd") or "").strip()
+        total_score = ((row.get("channel_score") or {}).get("total_score"))
+        try:
+            score_num = float(total_score) if total_score is not None else None
+        except (TypeError, ValueError):
+            score_num = None
+        if ymd and score_num is not None:
+            series.append({"ymd": ymd, "total_score": score_num})
+    series.sort(key=lambda x: x["ymd"])
+    return series
+
+
+def _normalize_demand_key(value: str) -> str:
+    return "".join(value.split())
+
+
+def _build_demand_lookup(demand_name_set: list[str]) -> dict[str, str]:
+    lookup: dict[str, str] = {}
+    for item in demand_name_set:
+        demand_name = str(item).strip()
+        if not demand_name:
+            continue
+        lookup.setdefault(demand_name, demand_name)
+        compact_key = _normalize_demand_key(demand_name)
+        if compact_key:
+            lookup.setdefault(compact_key, demand_name)
+    return lookup
+
+
+def _resolve_demand_name(
+    demand_name: str,
+    demand_lookup: dict[str, str],
+) -> str | None:
+    value = demand_name.strip()
+    if not value:
+        return None
+    return demand_lookup.get(value) or demand_lookup.get(_normalize_demand_key(value))
+
+
+class ContributionPostprocessService:
+    def __init__(
+        self,
+        config: FlowConfig,
+        repository: HotContentRepository,
+        api_client: JsonApiClient,
+    ):
+        self.config = config
+        self.repository = repository
+        self.api_client = api_client
+
+    def run(self) -> dict[str, Any]:
+        records = self.repository.list_postprocess_candidates(
+            limit=max(self.config.postprocess_batch_size, 1)
+        )
+        if not records:
+            return self._finalize_run_result(
+                {
+                    "run_at": datetime.now(SHANGHAI_TZ).isoformat(),
+                    "status": "success",
+                    "candidate_count": 0,
+                    "matched_count": 0,
+                    "wxindex_count": 0,
+                    "skipped_count": 0,
+                    "failed_count": 0,
+                }
+            )
+
+        cache = DemandCacheService(
+            self.config,
+            self.repository,
+        ).get_or_create_current_hour_cache()
+        demand_cache_run_id = int(cache["id"])
+        demand_name_set = cache["demand_name_set"]
+        if not demand_name_set:
+            return self._finalize_run_result(
+                {
+                    "run_at": datetime.now(SHANGHAI_TZ).isoformat(),
+                    "status": "skipped",
+                    "reason": "empty_demand_cache",
+                    "demand_cache_run_id": demand_cache_run_id,
+                    "cache_source": cache.get("source"),
+                    "processed_count": 0,
+                }
+            )
+
+        matched_count = 0
+        wxindex_count = 0
+        exported_count = 0
+        skipped_count = 0
+        failed_count = 0
+        for record in records:
+            record_id = int(record["id"])
+            try:
+                match_result = record.get("contribution_demand_match_json")
+                if not isinstance(match_result, dict):
+                    match_result = self.match_record(
+                        record=record,
+                        demand_name_set=demand_name_set,
+                        demand_cache_run_id=demand_cache_run_id,
+                    )
+                    self.repository.save_contribution_demand_match(
+                        record_id=record_id,
+                        demand_cache_run_id=demand_cache_run_id,
+                        match_json=match_result,
+                    )
+                    matched_count += 1
+
+                sanitized_match_result = self.sanitize_match_result(
+                    match_result,
+                    demand_name_set=demand_name_set,
+                    demand_cache_run_id=demand_cache_run_id,
+                )
+                if sanitized_match_result != match_result:
+                    match_result = sanitized_match_result
+                    self.repository.save_contribution_demand_match(
+                        record_id=record_id,
+                        demand_cache_run_id=demand_cache_run_id,
+                        match_json=match_result,
+                    )
+
+                trend_result = self.build_wxindex_trend(record, match_result)
+                if trend_result is None:
+                    self.repository.update_postprocess_status(
+                        record_id=record_id,
+                        status=PostprocessStatus.SKIPPED,
+                        error_message="no matched demand words",
+                    )
+                    exported_count += self.export_demand_terms_if_needed(
+                        record=record,
+                        match_result=match_result,
+                        trend_result=None,
+                    )
+                    skipped_count += 1
+                    continue
+
+                self.repository.save_wxindex_trend(
+                    record_id=record_id,
+                    trend_json=trend_result,
+                )
+                exported_count += self.export_demand_terms_if_needed(
+                    record=record,
+                    match_result=match_result,
+                    trend_result=trend_result,
+                )
+                wxindex_count += 1
+            except WxindexSelectionSkipped as exc:
+                self.repository.update_postprocess_status(
+                    record_id=record_id,
+                    status=PostprocessStatus.SKIPPED,
+                    error_message=str(exc),
+                )
+                if isinstance(match_result, dict):
+                    exported_count += self.export_demand_terms_if_needed(
+                        record=record,
+                        match_result=match_result,
+                        trend_result=None,
+                    )
+                skipped_count += 1
+            except Exception as exc:
+                self.repository.update_postprocess_status(
+                    record_id=record_id,
+                    status=PostprocessStatus.FAILED,
+                    error_message=str(exc),
+                )
+                failed_count += 1
+
+        return self._finalize_run_result(
+            {
+                "run_at": datetime.now(SHANGHAI_TZ).isoformat(),
+                "status": "success",
+                "demand_cache_run_id": demand_cache_run_id,
+                "cache_source": cache.get("source"),
+                "cache_hour": str(cache.get("cache_hour") or ""),
+                "demand_name_count": len(demand_name_set),
+                "candidate_count": len(records),
+                "matched_count": matched_count,
+                "wxindex_count": wxindex_count,
+                "exported_count": exported_count,
+                "skipped_count": skipped_count,
+                "failed_count": failed_count,
+            }
+        )
+
+    def _finalize_run_result(self, result: dict[str, Any]) -> dict[str, Any]:
+        try:
+            result["hive_sync"] = sync_hot_demands_to_hive(self.config, self.repository)
+        except Exception as exc:
+            result["hive_sync_error"] = str(exc)
+        return result
+
+    def export_demand_terms_if_needed(
+        self,
+        *,
+        record: dict[str, Any],
+        match_result: dict[str, Any],
+        trend_result: dict[str, Any] | None,
+    ) -> int:
+        export_rows = attach_wxindex_metadata(
+            build_demand_export_rows(
+                match_result,
+                contribution_points=(
+                    record.get("contribution_points_json")
+                    if isinstance(record.get("contribution_points_json"), dict)
+                    else None
+                ),
+                trend_json=trend_result if isinstance(trend_result, dict) else None,
+            ),
+            trend_result if isinstance(trend_result, dict) else None,
+        )
+        if not export_rows:
+            return 0
+
+        self.repository.replace_demand_export_rows(
+            record_id=int(record["id"]),
+            source=str(record.get("source") or ""),
+            hot_title=str(record.get("title") or ""),
+            article_title=str(record.get("article_title") or ""),
+            rows=export_rows,
+        )
+        return len(export_rows)
+
+    def match_record(
+        self,
+        *,
+        record: dict[str, Any],
+        demand_name_set: list[str],
+        demand_cache_run_id: int,
+    ) -> dict[str, Any]:
+        contribution_points = record.get("contribution_points_json")
+        if not isinstance(contribution_points, dict):
+            raise HotContentFlowError(f"invalid contribution_points_json id={record['id']}")
+
+        channel_content_id = str(
+            contribution_points.get("channelContentId") or record.get("unique_key") or ""
+        ).strip()
+        words_rows = contribution_points.get("高贡献词列表") or []
+        if not isinstance(words_rows, list):
+            words_rows = []
+        word_list = [
+            str(item.get("词") or "").strip()
+            for item in words_rows
+            if isinstance(item, dict) and str(item.get("词") or "").strip()
+        ]
+
+        llm_result = self.llm_match_single_article(
+            channel_content_id=channel_content_id or str(record["unique_key"]),
+            words=word_list,
+            demand_name_set=demand_name_set,
+        )
+        demand_lookup = _build_demand_lookup(demand_name_set)
+        matched_map: dict[str, list[dict[str, str]]] = {}
+        for item in llm_result.get("matched") or []:
+            if not isinstance(item, dict):
+                continue
+            word = str(item.get("title") or item.get("word") or item.get("词") or "").strip()
+            demand_name = str(item.get("demand_name") or "").strip()
+            reason = str(item.get("reason") or "").strip()
+            if not word or word not in word_list:
+                continue
+            canonical_demand_name = _resolve_demand_name(demand_name, demand_lookup)
+            if canonical_demand_name is None:
+                continue
+            matched_map.setdefault(word, []).append(
+                {
+                    "demand_name": canonical_demand_name,
+                    "reason": reason,
+                }
+            )
+
+        output_words: list[dict[str, Any]] = []
+        matched_word_rows: list[dict[str, Any]] = []
+        for row in words_rows:
+            if not isinstance(row, dict):
+                continue
+            word = str(row.get("词") or "").strip()
+            item = {"词": word, "贡献度": row.get("贡献度")}
+            if word in matched_map:
+                item["匹配需求列表"] = matched_map[word]
+                matched_word_rows.append(item)
+            output_words.append(item)
+
+        matched_points = self.filter_matched_points(
+            contribution_points.get("点列表"),
+            set(matched_map.keys()),
+        )
+        return {
+            "channelContentId": channel_content_id,
+            "demand_cache_run_id": demand_cache_run_id,
+            "高贡献词列表": output_words,
+            "匹配到需求的词列表": matched_word_rows,
+            "点列表": matched_points,
+        }
+
+    def sanitize_match_result(
+        self,
+        match_result: dict[str, Any],
+        *,
+        demand_name_set: list[str],
+        demand_cache_run_id: int,
+    ) -> dict[str, Any]:
+        demand_lookup = _build_demand_lookup(demand_name_set)
+        words_rows = match_result.get("高贡献词列表") or []
+        if not isinstance(words_rows, list):
+            words_rows = []
+
+        output_words: list[dict[str, Any]] = []
+        matched_word_rows: list[dict[str, Any]] = []
+        valid_words: set[str] = set()
+        for row in words_rows:
+            if not isinstance(row, dict):
+                continue
+            word = str(row.get("词") or "").strip()
+            item = {"词": word, "贡献度": row.get("贡献度")}
+            match_rows = row.get("匹配需求列表") or []
+            if not isinstance(match_rows, list):
+                match_rows = []
+            valid_match_rows: list[dict[str, str]] = []
+            for match in match_rows:
+                if not isinstance(match, dict):
+                    continue
+                demand_name = str(match.get("demand_name") or "").strip()
+                canonical_demand_name = _resolve_demand_name(demand_name, demand_lookup)
+                if canonical_demand_name is None:
+                    continue
+                valid_match_rows.append(
+                    {
+                        "demand_name": canonical_demand_name,
+                        "reason": str(match.get("reason") or "").strip(),
+                    }
+                )
+            if word and valid_match_rows:
+                item["匹配需求列表"] = valid_match_rows
+                matched_word_rows.append(item)
+                valid_words.add(word)
+            output_words.append(item)
+
+        return {
+            "channelContentId": str(match_result.get("channelContentId") or ""),
+            "demand_cache_run_id": demand_cache_run_id,
+            "高贡献词列表": output_words,
+            "匹配到需求的词列表": matched_word_rows,
+            "点列表": self.filter_matched_points(
+                match_result.get("点列表"),
+                valid_words,
+            ),
+        }
+
+    def llm_match_single_article(
+        self,
+        *,
+        channel_content_id: str,
+        words: list[str],
+        demand_name_set: list[str],
+    ) -> dict[str, Any]:
+        if not words:
+            return {"source": channel_content_id, "matched": []}
+
+        system_prompt = """
+        #角色
+        你是一个专业的语义匹配分析专家,擅长判断词语之间的语义关联性。
+        # 任务
+        我会提供两组数据:
+        1. 热点词列表:一组待匹配的热点词语
+        2. 需求词库:一组已有的需求词语
+        请你逐一分析每个热点词,判断它是否能与需求词库中的某个/某些需求词匹配上。
+        热点词要等于需求词,或者属于需求词,或者表达了相同含义。
+        # 输出规则
+        1. 严格输出 JSON 对象,禁止输出 JSON 之外的任何内容。
+        # 约束
+        1. 热点词和需求词必须来自给定词语,不能创造给定词语之外的词。
+        """
+        user_payload = {
+            "source": channel_content_id,
+            "words": words,
+            "demand_name_set": demand_name_set,
+            "output_schema": {
+                "source": "string",
+                "matched": [
+                    {
+                        "title": "string, must be selected from words",
+                        "demand_name": "string, must be selected from demand_name_set",
+                        "reason": "string",
+                    }
+                ],
+            },
+        }
+
+        last_error: Exception | None = None
+        for attempt in range(1, max(self.config.contribution_match_llm_max_attempts, 1) + 1):
+            try:
+                resp = create_chat_completion(
+                    [
+                        {"role": "system", "content": system_prompt},
+                        {
+                            "role": "user",
+                            "content": json.dumps(user_payload, ensure_ascii=False),
+                        },
+                    ],
+                    model=self.config.contribution_match_llm_model or None,
+                    temperature=0,
+                    max_tokens=max(self.config.contribution_match_llm_max_tokens, 1),
+                )
+                parsed = _extract_json_object(str(resp.get("content") or ""))
+                parsed.setdefault("source", channel_content_id)
+                parsed.setdefault("matched", [])
+                return parsed
+            except (OpenRouterCallError, HotContentFlowError) as exc:
+                last_error = exc
+                if attempt < max(self.config.contribution_match_llm_max_attempts, 1):
+                    time.sleep(max(self.config.contribution_match_llm_retry_sleep_seconds, 0))
+        raise HotContentFlowError(
+            f"llm match failed for channelContentId={channel_content_id}: {last_error}"
+        ) from last_error
+
+    def build_wxindex_trend(
+        self,
+        record: dict[str, Any],
+        match_result: dict[str, Any],
+    ) -> dict[str, Any] | None:
+        matched_word_rows = match_result.get("匹配到需求的词列表") or []
+        if not isinstance(matched_word_rows, list) or not matched_word_rows:
+            return None
+
+        candidate_words = [
+            str(row.get("词") or "").strip()
+            for row in matched_word_rows
+            if isinstance(row, dict) and str(row.get("词") or "").strip()
+        ]
+        if not candidate_words:
+            return None
+
+        channel_content_id = str(
+            match_result.get("channelContentId") or record.get("unique_key") or ""
+        )
+        article_title, body_text = self.extract_article_text(record)
+        if len(candidate_words) == 1:
+            pick = {
+                "selected_word": candidate_words[0],
+                "reason": "only one candidate word",
+            }
+        else:
+            pick = self.llm_pick_best_word(
+                channel_content_id=channel_content_id,
+                article_title=article_title,
+                body_text=body_text,
+                candidate_words=candidate_words,
+            )
+        selected_word = pick["selected_word"]
+        start_ymd, end_ymd = _get_recent_range(self.config.wxindex_lookback_days)
+        wx_payload = {
+            "keyword": selected_word,
+            "start_ymd": start_ymd,
+            "end_ymd": end_ymd,
+        }
+        wx_resp = self.api_client.post_json(self.config.wxindex_api_url, wx_payload)
+        series = _parse_total_scores(wx_resp)
+        latest_score = series[-1]["total_score"] if series else None
+        threshold = float(self.config.wxindex_score_threshold)
+        return {
+            "channelContentId": channel_content_id,
+            "article_title": article_title,
+            "llm_selected_word": selected_word,
+            "llm_reason": pick["reason"],
+            "wxindex": {
+                "keyword": selected_word,
+                "start_ymd": start_ymd,
+                "end_ymd": end_ymd,
+                "total_score_7d": series,
+                "latest_total_score": latest_score,
+                "threshold": threshold,
+                "latest_gt_threshold": (
+                    False if latest_score is None else latest_score > threshold
+                ),
+                "trend": calc_wxindex_trend(series),
+            },
+        }
+
+    def llm_pick_best_word(
+        self,
+        *,
+        channel_content_id: str,
+        article_title: str,
+        body_text: str,
+        candidate_words: list[str],
+    ) -> dict[str, str]:
+        system_prompt = """
+        #角色
+        你是一个专业的语义分析专家,擅长精准概括整篇文章。
+        # 任务
+        我会提供一篇文章的标题、正文和候选词列表,请你选择一个最能代表文章内容的词。
+        # 输出规则
+        1. 严格输出 JSON 对象,禁止输出 JSON 之外的任何内容。
+        """
+        user_payload = {
+            "source": channel_content_id,
+            "article_title": article_title,
+            "article_body_text": body_text,
+            "candidate_words": candidate_words,
+            "output_schema": {
+                "source": "string",
+                "selected_word": "string, must be selected from candidate_words",
+                "reason": "string",
+            },
+            "constraints": [
+                "selected_word 必须来自 candidate_words",
+                "reason 简洁说明,不超过40字",
+                "仅输出 JSON 对象,不要 markdown 代码块",
+            ],
+        }
+
+        last_error: Exception | None = None
+        for attempt in range(1, max(self.config.wxindex_llm_max_attempts, 1) + 1):
+            try:
+                resp = create_chat_completion(
+                    [
+                        {"role": "system", "content": system_prompt},
+                        {
+                            "role": "user",
+                            "content": json.dumps(user_payload, ensure_ascii=False),
+                        },
+                    ],
+                    model=self.config.wxindex_llm_model or None,
+                    temperature=0,
+                    max_tokens=max(self.config.wxindex_llm_max_tokens, 1),
+                )
+                parsed = _extract_json_object(str(resp.get("content") or ""))
+                selected_word = str(parsed.get("selected_word") or "").strip()
+                reason = str(parsed.get("reason") or "").strip()
+                if selected_word not in candidate_words:
+                    raise WxindexSelectionSkipped(
+                        f"selected_word not in candidates for {channel_content_id}: "
+                        f"{selected_word}"
+                    )
+                return {"selected_word": selected_word, "reason": reason}
+            except (OpenRouterCallError, HotContentFlowError) as exc:
+                last_error = exc
+                if attempt < max(self.config.wxindex_llm_max_attempts, 1):
+                    continue
+        raise HotContentFlowError(
+            f"llm pick word failed for {channel_content_id}: {last_error}"
+        ) from last_error
+
+    @staticmethod
+    def filter_matched_points(raw_points: Any, matched_words: set[str]) -> list[dict[str, Any]]:
+        if not matched_words or not isinstance(raw_points, list):
+            return []
+
+        matched_points: list[dict[str, Any]] = []
+        for point in raw_points:
+            if not isinstance(point, dict):
+                continue
+            point_match_rows = point.get("匹配词列表") or []
+            if not isinstance(point_match_rows, list):
+                point_match_rows = []
+            keep_rows = [
+                row
+                for row in point_match_rows
+                if isinstance(row, dict) and str(row.get("词") or "").strip() in matched_words
+            ]
+            if keep_rows:
+                matched_points.append(
+                    {
+                        "来源": str(point.get("来源") or ""),
+                        "点": str(point.get("点") or ""),
+                        "点描述": str(point.get("点描述") or ""),
+                        "匹配词列表": keep_rows,
+                    }
+                )
+        return matched_points
+
+    @staticmethod
+    def extract_article_text(record: dict[str, Any]) -> tuple[str, str]:
+        decode_result = record.get("decode_result_json")
+        target_post = decode_result.get("target_post") if isinstance(decode_result, dict) else {}
+        if not isinstance(target_post, dict):
+            target_post = {}
+        article_title = str(
+            target_post.get("title") or record.get("article_title") or ""
+        ).strip()
+        body_text = str(
+            target_post.get("body_text") or record.get("article_body") or ""
+        ).strip()
+        return article_title, body_text
+
+
+def run_once(config: FlowConfig | None = None) -> dict[str, Any]:
+    flow_config = config or load_flow_config()
+    repository = HotContentRepository(flow_config.mysql)
+    try:
+        api_client = JsonApiClient(
+            timeout_seconds=flow_config.request_timeout_seconds,
+            verify_ssl=flow_config.https_verify_ssl,
+        )
+        service = ContributionPostprocessService(flow_config, repository, api_client)
+        return service.run()
+    finally:
+        repository.close()

+ 640 - 0
app/hot_content/repository.py

@@ -0,0 +1,640 @@
+"""热点内容 MySQL 仓储。"""
+
+from __future__ import annotations
+
+import hashlib
+import json
+from datetime import datetime
+from typing import Any
+
+try:
+    import pymysql
+    from pymysql.cursors import DictCursor
+except ImportError:  # pragma: no cover - runtime dependency check
+    pymysql = None
+    DictCursor = None
+
+from app.hot_content.exceptions import HotContentFlowError
+from app.hot_content.status import ExecutionStatus, PostprocessStatus
+from app.hot_content.types import MysqlConfig
+
+
+def _json_dumps(data: Any) -> str:
+    return json.dumps(data, ensure_ascii=False, separators=(",", ":"))
+
+
+def _json_loads(value: Any) -> Any:
+    if value is None:
+        return None
+    if isinstance(value, (dict, list)):
+        return value
+    if isinstance(value, (bytes, bytearray)):
+        value = value.decode("utf-8")
+    if isinstance(value, str):
+        return json.loads(value)
+    return value
+
+
+def _normalize_demand_names(demand_name_set: list[str]) -> list[str]:
+    names: list[str] = []
+    seen: set[str] = set()
+    for item in demand_name_set:
+        name = str(item).strip()
+        if not name or name in seen:
+            continue
+        seen.add(name)
+        names.append(name)
+    return names
+
+
+def unique_title_key(source: str, title: str) -> str:
+    return hashlib.sha256(f"{source}\n{title}".encode("utf-8")).hexdigest()
+
+
+class HotContentRepository:
+    def __init__(self, config: MysqlConfig):
+        if pymysql is None or DictCursor is None:
+            raise HotContentFlowError("missing dependency: pip install pymysql")
+        self.conn = pymysql.connect(
+            host=config.host,
+            port=config.port,
+            user=config.user,
+            password=config.password,
+            database=config.database,
+            charset=config.charset,
+            autocommit=True,
+            cursorclass=DictCursor,
+        )
+
+    def close(self) -> None:
+        self.conn.close()
+
+    def upsert_record(self, *, source: str, title: str, rank: int | None) -> dict[str, Any]:
+        key = unique_title_key(source, title)
+        sql = """
+            INSERT INTO hot_content_records (
+                unique_key, source, title, hot_rank, execution_status, created_at, updated_at
+            )
+            VALUES (%s, %s, %s, %s, %s, NOW(), NOW())
+            ON DUPLICATE KEY UPDATE
+                id=LAST_INSERT_ID(id),
+                hot_rank=VALUES(hot_rank),
+                updated_at=NOW()
+        """
+        with self.conn.cursor() as cursor:
+            cursor.execute(
+                sql,
+                (
+                    key,
+                    source,
+                    title,
+                    rank,
+                    ExecutionStatus.HOT_SAVED,
+                ),
+            )
+            record_id = int(cursor.lastrowid)
+            cursor.execute(
+                """
+                SELECT
+                    id,
+                    unique_key,
+                    execution_status,
+                    article_title,
+                    article_body,
+                    article_url,
+                    decode_request_result IS NOT NULL AS has_decode_request,
+                    contribution_points_json IS NOT NULL AS has_contribution_points
+                FROM hot_content_records
+                WHERE id = %s
+                """,
+                (record_id,),
+            )
+            row = cursor.fetchone()
+        if not row:
+            raise HotContentFlowError(f"missing hot_content_records id={record_id}")
+        return {
+            "id": int(row["id"]),
+            "unique_key": str(row["unique_key"]),
+            "execution_status": int(row["execution_status"]),
+            "article_title": row.get("article_title"),
+            "article_body": row.get("article_body"),
+            "article_url": row.get("article_url"),
+            "has_decode_request": bool(row.get("has_decode_request")),
+            "has_contribution_points": bool(row.get("has_contribution_points")),
+        }
+
+    def update_status(
+        self,
+        *,
+        record_id: int,
+        status: int,
+        error_message: str | None = None,
+    ) -> None:
+        sql = """
+            UPDATE hot_content_records
+            SET execution_status=%s, error_reason=%s, updated_at=NOW()
+            WHERE id=%s
+        """
+        with self.conn.cursor() as cursor:
+            cursor.execute(sql, (status, error_message, record_id))
+
+    def update_article(
+        self,
+        *,
+        record_id: int,
+        article_title: str,
+        article_body: str,
+        url: str,
+    ) -> None:
+        sql = """
+            UPDATE hot_content_records
+            SET article_title=%s,
+                article_body=%s,
+                article_url=%s,
+                execution_status=%s,
+                error_reason=NULL,
+                updated_at=NOW()
+            WHERE id=%s
+        """
+        with self.conn.cursor() as cursor:
+            cursor.execute(
+                sql,
+                (
+                    article_title,
+                    article_body,
+                    url,
+                    ExecutionStatus.CONTENT_OK,
+                    record_id,
+                ),
+            )
+
+    def update_decode_result(
+        self,
+        *,
+        record_id: int,
+        status: int,
+        request_json: dict[str, Any],
+        response_json: dict[str, Any] | None,
+        error_message: str | None = None,
+    ) -> None:
+        decode_request_result = {
+            "request": request_json,
+            "response": response_json,
+        }
+        sql = """
+            UPDATE hot_content_records
+            SET decode_request_result=%s,
+                execution_status=%s,
+                error_reason=%s,
+                updated_at=NOW()
+            WHERE id=%s
+        """
+        with self.conn.cursor() as cursor:
+            cursor.execute(
+                sql,
+                (
+                    _json_dumps(decode_request_result),
+                    status,
+                    error_message,
+                    record_id,
+                ),
+            )
+
+    def list_decode_result_candidates(self, *, limit: int) -> list[dict[str, Any]]:
+        sql = """
+            SELECT id, unique_key
+            FROM hot_content_records
+            WHERE execution_status IN (%s, %s, %s)
+              AND contribution_points_json IS NULL
+            ORDER BY updated_at ASC, id ASC
+            LIMIT %s
+        """
+        with self.conn.cursor() as cursor:
+            cursor.execute(
+                sql,
+                (
+                    ExecutionStatus.DECODE_SUBMITTED,
+                    ExecutionStatus.DECODE_SUCCESS,
+                    ExecutionStatus.DECODE_PENDING,
+                    limit,
+                ),
+            )
+            rows = cursor.fetchall()
+        return [
+            {
+                "id": int(row["id"]),
+                "unique_key": str(row["unique_key"]),
+            }
+            for row in rows
+        ]
+
+    def save_decode_result_export(
+        self,
+        *,
+        record_id: int,
+        decode_result_json: dict[str, Any],
+        contribution_points_json: dict[str, Any],
+    ) -> None:
+        sql = """
+            UPDATE hot_content_records
+            SET decode_result_json=%s,
+                contribution_points_json=%s,
+                execution_status=%s,
+                error_reason=NULL,
+                updated_at=NOW()
+            WHERE id=%s
+        """
+        with self.conn.cursor() as cursor:
+            cursor.execute(
+                sql,
+                (
+                    _json_dumps(decode_result_json),
+                    _json_dumps(contribution_points_json),
+                    ExecutionStatus.CONTRIBUTION_EXTRACTED,
+                    record_id,
+                ),
+            )
+
+    def get_demand_cache_by_hour(self, *, cache_hour: datetime) -> dict[str, Any] | None:
+        sql = """
+            SELECT
+                id,
+                cache_hour,
+                source_table,
+                partition_dt,
+                demand_name_set_json,
+                item_count,
+                updated_at
+            FROM demand_pool_hourly_cache
+            WHERE cache_hour=%s
+            LIMIT 1
+        """
+        with self.conn.cursor() as cursor:
+            cursor.execute(sql, (cache_hour,))
+            row = cursor.fetchone()
+        if not row:
+            return None
+        demand_name_set = _json_loads(row.get("demand_name_set_json")) or []
+        if not isinstance(demand_name_set, list):
+            demand_name_set = []
+        return {
+            "id": int(row["id"]),
+            "cache_hour": row.get("cache_hour"),
+            "source_table": str(row["source_table"]),
+            "partition_dt": row.get("partition_dt"),
+            "demand_name_set": [
+                str(name).strip()
+                for name in demand_name_set
+                if str(name).strip()
+            ],
+            "item_count": int(row.get("item_count") or 0),
+            "updated_at": row.get("updated_at"),
+        }
+
+    def save_demand_cache_set(
+        self,
+        *,
+        cache_hour: datetime,
+        source_table: str,
+        partition_dt: str | None,
+        excluded_strategy: str,
+        top_n: int,
+        demand_name_set: list[str],
+    ) -> int:
+        sql = """
+            INSERT INTO demand_pool_hourly_cache (
+                cache_hour,
+                source_table,
+                partition_dt,
+                excluded_strategy,
+                top_n,
+                demand_name_set_json,
+                item_count,
+                created_at,
+                updated_at
+            )
+            VALUES (%s, %s, %s, %s, %s, %s, %s, NOW(), NOW())
+            ON DUPLICATE KEY UPDATE
+                id=LAST_INSERT_ID(id),
+                source_table=VALUES(source_table),
+                partition_dt=VALUES(partition_dt),
+                excluded_strategy=VALUES(excluded_strategy),
+                top_n=VALUES(top_n),
+                demand_name_set_json=VALUES(demand_name_set_json),
+                item_count=VALUES(item_count),
+                updated_at=NOW()
+        """
+        normalized_names = _normalize_demand_names(demand_name_set)
+        with self.conn.cursor() as cursor:
+            cursor.execute(
+                sql,
+                (
+                    cache_hour,
+                    source_table,
+                    partition_dt,
+                    excluded_strategy,
+                    top_n,
+                    _json_dumps(normalized_names),
+                    len(normalized_names),
+                ),
+            )
+            return int(cursor.lastrowid)
+
+    def list_postprocess_candidates(self, *, limit: int) -> list[dict[str, Any]]:
+        sql = """
+            SELECT
+                id,
+                unique_key,
+                source,
+                title,
+                article_title,
+                article_body,
+                demand_cache_run_id,
+                decode_result_json,
+                contribution_points_json,
+                contribution_demand_match_json
+            FROM hot_content_records
+            WHERE contribution_points_json IS NOT NULL
+              AND postprocess_status IN (%s, %s, %s)
+            ORDER BY updated_at ASC, id ASC
+            LIMIT %s
+        """
+        with self.conn.cursor() as cursor:
+            cursor.execute(
+                sql,
+                (
+                    PostprocessStatus.PENDING,
+                    PostprocessStatus.DEMAND_MATCHED,
+                    PostprocessStatus.FAILED,
+                    limit,
+                ),
+            )
+            rows = cursor.fetchall()
+        return [
+            {
+                "id": int(row["id"]),
+                "unique_key": str(row["unique_key"]),
+                "source": str(row.get("source") or ""),
+                "title": str(row.get("title") or ""),
+                "article_title": row.get("article_title"),
+                "article_body": row.get("article_body"),
+                "demand_cache_run_id": row.get("demand_cache_run_id"),
+                "decode_result_json": _json_loads(row.get("decode_result_json")),
+                "contribution_points_json": _json_loads(row.get("contribution_points_json")),
+                "contribution_demand_match_json": _json_loads(
+                    row.get("contribution_demand_match_json")
+                ),
+            }
+            for row in rows
+        ]
+
+    def save_contribution_demand_match(
+        self,
+        *,
+        record_id: int,
+        demand_cache_run_id: int,
+        match_json: dict[str, Any],
+    ) -> None:
+        sql = """
+            UPDATE hot_content_records
+            SET demand_cache_run_id=%s,
+                contribution_demand_match_json=%s,
+                postprocess_status=%s,
+                postprocess_error_reason=NULL,
+                updated_at=NOW()
+            WHERE id=%s
+        """
+        with self.conn.cursor() as cursor:
+            cursor.execute(
+                sql,
+                (
+                    demand_cache_run_id,
+                    _json_dumps(match_json),
+                    PostprocessStatus.DEMAND_MATCHED,
+                    record_id,
+                ),
+            )
+
+    def save_wxindex_trend(
+        self,
+        *,
+        record_id: int,
+        trend_json: dict[str, Any],
+    ) -> None:
+        sql = """
+            UPDATE hot_content_records
+            SET wxindex_trend_json=%s,
+                postprocess_status=%s,
+                postprocess_error_reason=NULL,
+                updated_at=NOW()
+            WHERE id=%s
+        """
+        with self.conn.cursor() as cursor:
+            cursor.execute(
+                sql,
+                (
+                    _json_dumps(trend_json),
+                    PostprocessStatus.WXINDEX_DONE,
+                    record_id,
+                ),
+            )
+
+    def update_postprocess_status(
+        self,
+        *,
+        record_id: int,
+        status: int,
+        error_message: str | None = None,
+    ) -> None:
+        sql = """
+            UPDATE hot_content_records
+            SET postprocess_status=%s,
+                postprocess_error_reason=%s,
+                updated_at=NOW()
+            WHERE id=%s
+        """
+        with self.conn.cursor() as cursor:
+            cursor.execute(sql, (status, error_message, record_id))
+
+    def replace_demand_export_rows(
+        self,
+        *,
+        record_id: int,
+        source: str,
+        hot_title: str,
+        article_title: str,
+        rows: list[dict[str, Any]],
+    ) -> None:
+        self._ensure_demand_export_table()
+        delete_sql = "DELETE FROM hot_content_demand_exports WHERE record_id=%s"
+        insert_sql = """
+            INSERT INTO hot_content_demand_exports (
+                record_id,
+                source,
+                hot_title,
+                article_title,
+                item_type,
+                item_text,
+                point_category,
+                matched_demand,
+                contribution_score,
+                wxindex_keyword,
+                wxindex_latest_score,
+                wxindex_trend,
+                created_at,
+                updated_at
+            )
+            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW(), NOW())
+        """
+        with self.conn.cursor() as cursor:
+            cursor.execute(delete_sql, (record_id,))
+            insert_rows = [
+                (
+                    record_id,
+                    source,
+                    hot_title,
+                    article_title,
+                    str(item.get("item_type") or ""),
+                    str(item.get("item_text") or ""),
+                    str(item.get("point_category") or ""),
+                    str(item.get("matched_demand") or ""),
+                    item.get("contribution_score"),
+                    str(item.get("wxindex_keyword") or ""),
+                    float(item.get("wxindex_latest_score") or 0),
+                    str(item.get("wxindex_trend") or ""),
+                )
+                for item in rows
+                if str(item.get("item_type") or "").strip()
+                and str(item.get("item_text") or "").strip()
+            ]
+            if insert_rows:
+                cursor.executemany(insert_sql, insert_rows)
+
+    def list_demand_export_groups(self) -> list[dict[str, Any]]:
+        self._ensure_demand_export_table()
+        sql = """
+            SELECT
+                record_id,
+                item_type,
+                item_text,
+                point_category,
+                matched_demand,
+                wxindex_latest_score
+            FROM hot_content_demand_exports
+            ORDER BY record_id ASC, id ASC
+        """
+        with self.conn.cursor() as cursor:
+            cursor.execute(sql)
+            rows = cursor.fetchall()
+
+        grouped: dict[int, list[dict[str, Any]]] = {}
+        for row in rows:
+            record_id = int(row["record_id"])
+            grouped.setdefault(record_id, []).append(
+                {
+                    "item_type": str(row.get("item_type") or ""),
+                    "item_text": str(row.get("item_text") or ""),
+                    "point_category": str(row.get("point_category") or ""),
+                    "matched_demand": str(row.get("matched_demand") or ""),
+                    "wxindex_latest_score": float(row.get("wxindex_latest_score") or 0),
+                }
+            )
+        return [
+            {"record_id": record_id, "export_rows": export_rows}
+            for record_id, export_rows in grouped.items()
+        ]
+
+    def _ensure_demand_export_table(self) -> None:
+        sql = """
+            CREATE TABLE IF NOT EXISTS hot_content_demand_exports (
+                id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,
+                record_id BIGINT UNSIGNED NOT NULL,
+                source VARCHAR(64) NOT NULL DEFAULT '',
+                hot_title VARCHAR(1024) NOT NULL DEFAULT '',
+                article_title VARCHAR(1024) NOT NULL DEFAULT '',
+                item_type VARCHAR(32) NOT NULL COMMENT '元素/短语',
+                item_text VARCHAR(1024) NOT NULL,
+                point_category VARCHAR(32) NOT NULL DEFAULT '' COMMENT '点类型:灵感点/目的点/关键点',
+                matched_demand VARCHAR(1024) NOT NULL DEFAULT '' COMMENT '匹配到的需求',
+                contribution_score DOUBLE NULL COMMENT '贡献分,仅元素有值',
+                wxindex_keyword VARCHAR(1024) NOT NULL DEFAULT '' COMMENT '获取微信指数的元素',
+                wxindex_latest_score DOUBLE NOT NULL DEFAULT 0,
+                wxindex_trend VARCHAR(32) NOT NULL DEFAULT '' COMMENT '微信指数趋势',
+                created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
+                updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
+                PRIMARY KEY (id),
+                KEY idx_record_id (record_id),
+                KEY idx_source_type (source, item_type)
+            ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
+        """
+        with self.conn.cursor() as cursor:
+            cursor.execute(sql)
+            self._ensure_demand_export_column(
+                cursor,
+                "matched_demand",
+                """
+                ALTER TABLE hot_content_demand_exports
+                ADD COLUMN matched_demand VARCHAR(1024) NOT NULL DEFAULT ''
+                COMMENT '匹配到的需求'
+                AFTER item_text
+                """,
+            )
+            self._ensure_demand_export_column(
+                cursor,
+                "point_category",
+                """
+                ALTER TABLE hot_content_demand_exports
+                ADD COLUMN point_category VARCHAR(32) NOT NULL DEFAULT ''
+                COMMENT '点类型:灵感点/目的点/关键点'
+                AFTER item_text
+                """,
+            )
+            self._ensure_demand_export_column(
+                cursor,
+                "contribution_score",
+                """
+                ALTER TABLE hot_content_demand_exports
+                ADD COLUMN contribution_score DOUBLE NULL
+                COMMENT '贡献分,仅词有值'
+                AFTER matched_demand
+                """,
+            )
+            self._ensure_demand_export_column(
+                cursor,
+                "wxindex_trend",
+                """
+                ALTER TABLE hot_content_demand_exports
+                ADD COLUMN wxindex_trend VARCHAR(32) NOT NULL DEFAULT ''
+                COMMENT '微信指数趋势'
+                AFTER wxindex_latest_score
+                """,
+            )
+            self._ensure_demand_export_column(
+                cursor,
+                "wxindex_keyword",
+                """
+                ALTER TABLE hot_content_demand_exports
+                ADD COLUMN wxindex_keyword VARCHAR(1024) NOT NULL DEFAULT ''
+                COMMENT '获取微信指数的词'
+                AFTER contribution_score
+                """,
+            )
+
+    def _ensure_demand_export_column(
+        self,
+        cursor: Any,
+        column_name: str,
+        alter_sql: str,
+    ) -> None:
+        cursor.execute(
+            """
+            SELECT COUNT(*) AS cnt
+            FROM information_schema.COLUMNS
+            WHERE TABLE_SCHEMA = DATABASE()
+              AND TABLE_NAME = 'hot_content_demand_exports'
+              AND COLUMN_NAME = %s
+            """,
+            (column_name,),
+        )
+        if int((cursor.fetchone() or {}).get("cnt") or 0) == 0:
+            cursor.execute(alter_sql)

+ 17 - 0
app/hot_content/scheduler.py

@@ -0,0 +1,17 @@
+"""热点内容定时任务启动入口。
+
+运行:
+    python -m app.hot_content.scheduler
+    python -m app.hot_content.scheduler --once
+
+推荐使用统一调度入口:
+    python -m app.scheduler
+"""
+
+from __future__ import annotations
+
+from app.scheduler import main
+
+
+if __name__ == "__main__":
+    main()

+ 292 - 0
app/hot_content/service.py

@@ -0,0 +1,292 @@
+"""热点内容流程服务。"""
+
+from __future__ import annotations
+
+from datetime import datetime
+import time
+from typing import Any
+
+from app.hot_content.client import (
+    JsonApiClient,
+    build_url,
+    extract_decode_item_map,
+    extract_keyword_items,
+    extract_rank_items,
+    pick_first_valid_content,
+    render_template,
+)
+from app.hot_content.config import load_flow_config
+from app.hot_content.exceptions import HotContentFlowError
+from app.hot_content.repository import HotContentRepository, unique_title_key
+from app.hot_content.status import ExecutionStatus, decode_api_status_to_code
+from app.hot_content.timezone import SHANGHAI_TZ
+from app.hot_content.types import FlowConfig
+
+
+HOT_RANK_PAYLOAD = {"sort_type": "最热", "category": "news", "cursor": 0}
+KEYWORD_PAYLOAD_TEMPLATE = {"keyword": "{title}", "cursor": 0}
+
+
+class HotContentFlowService:
+    def __init__(
+        self,
+        config: FlowConfig,
+        repository: HotContentRepository,
+        api_client: JsonApiClient,
+    ):
+        self.config = config
+        self.repository = repository
+        self.api_client = api_client
+
+    def run(self) -> dict[str, Any]:
+        hot_titles = self.fetch_and_save_hot_titles()
+        selected_contents = self.search_and_save_contents(hot_titles)
+        decode_resp = self.submit_decode_tasks(selected_contents)
+        return self.build_summary(hot_titles, selected_contents, decode_resp)
+
+    def fetch_and_save_hot_titles(self) -> list[dict[str, Any]]:
+        hot_url = build_url(self.config.crawapi_base_url, self.config.hot_rank_path)
+        saved_titles: list[dict[str, Any]] = []
+        seen_keys: set[str] = set()
+        resp = self.api_client.post_json(hot_url, HOT_RANK_PAYLOAD)
+
+        for source_config in self.config.sources:
+            rank_items = extract_rank_items(resp, source_config.source)
+            for rank_item in rank_items[: source_config.count]:
+                title = str(rank_item.get("title") or "").strip()
+                if not title:
+                    continue
+                unique_key = unique_title_key(source_config.source, title)
+                if unique_key in seen_keys:
+                    continue
+                seen_keys.add(unique_key)
+
+                record = self.repository.upsert_record(
+                    source=source_config.source,
+                    title=title,
+                    rank=_to_int_or_none(rank_item.get("rank")),
+                )
+                saved_titles.append(
+                    {
+                        "id": record["id"],
+                        "unique_key": record["unique_key"],
+                        "execution_status": record["execution_status"],
+                        "article_title": record["article_title"],
+                        "article_body": record["article_body"],
+                        "article_url": record["article_url"],
+                        "has_decode_request": record["has_decode_request"],
+                        "has_contribution_points": record["has_contribution_points"],
+                        "source": source_config.source,
+                        "title": title,
+                        "rank": _to_int_or_none(rank_item.get("rank")),
+                        "source_config": source_config,
+                    }
+                )
+        return saved_titles
+
+    def search_and_save_contents(self, hot_titles: list[dict[str, Any]]) -> list[dict[str, Any]]:
+        selected_contents: list[dict[str, Any]] = []
+        for hot in hot_titles:
+            execution_status = int(hot.get("execution_status") or 0)
+            if execution_status >= ExecutionStatus.DECODE_SUBMITTED or hot.get(
+                "has_contribution_points"
+            ):
+                continue
+
+            existing_body = str(hot.get("article_body") or "").strip()
+            existing_title = str(hot.get("article_title") or "").strip()
+            if execution_status == ExecutionStatus.CONTENT_OK and existing_title and existing_body:
+                selected_contents.append(
+                    {
+                        "record_id": hot["id"],
+                        "unique_key": hot["unique_key"],
+                        "hot_title": hot["title"],
+                        "status": "ok",
+                        "content_title": existing_title,
+                        "body_text": existing_body,
+                        "url": str(hot.get("article_url") or ""),
+                    }
+                )
+                continue
+
+            keyword_url = build_url(self.config.crawapi_base_url, self.config.keyword_search_path)
+            payload = render_template(
+                KEYWORD_PAYLOAD_TEMPLATE,
+                {
+                    "title": hot["title"],
+                    "source": hot["source"],
+                    "record_id": str(hot["id"]),
+                    "unique_key": str(hot["unique_key"]),
+                    "hot_title_id": str(hot["id"]),
+                },
+            )
+            for attempt in range(1, 4):  # max 3 retries
+                try:
+                    resp = self.api_client.post_json(keyword_url, payload)
+                    selected = pick_first_valid_content(extract_keyword_items(resp))
+                    if selected is None:
+                        self.repository.update_status(
+                            record_id=hot["id"],
+                            status=ExecutionStatus.NO_VALID_CONTENT,
+                        )
+                        selected_contents.append(
+                            {
+                                "record_id": hot["id"],
+                                "unique_key": hot["unique_key"],
+                                "hot_title": hot["title"],
+                                "status": "no_valid_content",
+                            }
+                        )
+                        break
+
+                    self.repository.update_article(
+                        record_id=hot["id"],
+                        article_title=selected["content_title"],
+                        article_body=selected["body_text"],
+                        url=selected["url"],
+                    )
+                    selected_contents.append(
+                        {
+                            "record_id": hot["id"],
+                            "unique_key": hot["unique_key"],
+                            "hot_title": hot["title"],
+                            "status": "ok",
+                            **selected,
+                        }
+                    )
+                    break
+                except HotContentFlowError as exc:
+                    if attempt < 3:
+                        # 简单退避,避免对不稳定接口造成瞬时压力
+                        time.sleep(attempt)
+                        continue
+                    self.repository.update_status(
+                        record_id=hot["id"],
+                        status=ExecutionStatus.CONTENT_REQUEST_FAILED,
+                        error_message=str(exc),
+                    )
+                    selected_contents.append(
+                        {
+                            "record_id": hot["id"],
+                            "unique_key": hot["unique_key"],
+                            "hot_title": hot["title"],
+                            "status": "content_request_failed",
+                            "error": str(exc),
+                        }
+                    )
+        return selected_contents
+
+    def submit_decode_tasks(self, selected_contents: list[dict[str, Any]]) -> dict[str, Any]:
+        decode_items: list[dict[str, Any]] = []
+        request_count = 0
+        failed_request_count = 0
+        for item in selected_contents:
+            if item.get("status") != "ok":
+                continue
+            record_id = int(item["record_id"])
+            channel_content_id = str(item["unique_key"])
+            title = str(item.get("content_title") or "").strip()
+            body_text = str(item.get("body_text") or "").strip()
+            if not title or not body_text:
+                continue
+
+            post = {
+                "channelContentId": channel_content_id,
+                "title": title,
+                "bodyText": body_text,
+                "contentModal": 3,
+            }
+            decode_payload = {
+                "params": {
+                    "configId": self.config.decode_config_id,
+                    "skipCompleted": False,
+                    "posts": [post],
+                }
+            }
+
+            request_count += 1
+            try:
+                decode_resp = self.api_client.post_json(self.config.decode_api_url, decode_payload)
+            except HotContentFlowError as exc:
+                decode_resp = {"code": -1, "data": [], "msg": str(exc)}
+                failed_request_count += 1
+
+            decode_item_map = extract_decode_item_map(decode_resp)
+            decode_item = decode_item_map.get(channel_content_id, {})
+            status = decode_api_status_to_code(
+                str(decode_item.get("status") or ""),
+                has_success_response=int(decode_resp.get("code") or 0) == 0,
+            )
+
+            error_message = None
+            if int(decode_resp.get("code") or 0) != 0:
+                error_message = str(decode_resp.get("msg") or decode_resp)
+
+            self.repository.update_decode_result(
+                record_id=record_id,
+                status=status,
+                request_json=decode_payload,
+                response_json=decode_item or decode_resp,
+                error_message=error_message,
+            )
+            if decode_item:
+                decode_items.append(decode_item)
+
+        if request_count == 0:
+            return {"code": 0, "data": [], "msg": "no_valid_posts", "request_count": 0}
+        return {
+            "code": 0 if failed_request_count == 0 else -1,
+            "data": decode_items,
+            "request_count": request_count,
+            "failed_request_count": failed_request_count,
+        }
+
+    def build_summary(
+        self,
+        hot_titles: list[dict[str, Any]],
+        selected_contents: list[dict[str, Any]],
+        decode_resp: dict[str, Any],
+    ) -> dict[str, Any]:
+        decode_items = decode_resp.get("data") if isinstance(decode_resp, dict) else []
+        decode_items = decode_items if isinstance(decode_items, list) else []
+        return {
+            "run_at": datetime.now(SHANGHAI_TZ).isoformat(),
+            "source_count": len(self.config.sources),
+            "hot_title_count": len(hot_titles),
+            "selected_ok_count": sum(1 for item in selected_contents if item.get("status") == "ok"),
+            "selected_failed_count": sum(
+                1 for item in selected_contents if item.get("status") != "ok"
+            ),
+            "decode_post_count": sum(1 for item in selected_contents if item.get("status") == "ok"),
+            "decode_api_code": decode_resp.get("code") if isinstance(decode_resp, dict) else None,
+            "decode_success_count": sum(
+                1 for item in decode_items if str(item.get("status") or "") == "SUCCESS"
+            ),
+            "decode_pending_count": sum(
+                1 for item in decode_items if str(item.get("status") or "") == "PENDING"
+            ),
+            "decode_failed_count": sum(
+                1 for item in decode_items if str(item.get("status") or "") == "FAILED"
+            ),
+        }
+
+
+def run_once(config: FlowConfig | None = None) -> dict[str, Any]:
+    flow_config = config or load_flow_config()
+    repository = HotContentRepository(flow_config.mysql)
+    try:
+        api_client = JsonApiClient(
+            timeout_seconds=flow_config.request_timeout_seconds,
+            verify_ssl=flow_config.https_verify_ssl,
+        )
+        service = HotContentFlowService(flow_config, repository, api_client)
+        return service.run()
+    finally:
+        repository.close()
+
+
+def _to_int_or_none(value: Any) -> int | None:
+    try:
+        return int(value)
+    except (TypeError, ValueError):
+        return None

+ 42 - 0
app/hot_content/status.py

@@ -0,0 +1,42 @@
+"""热点内容流程状态码。"""
+
+from __future__ import annotations
+
+
+class ExecutionStatus:
+    HOT_SAVED = 10
+    NO_VALID_CONTENT = 15
+    CONTENT_OK = 20
+    CONTENT_REQUEST_FAILED = 25
+    DECODE_SUBMITTED = 30
+    DECODE_SUCCESS = 40
+    DECODE_PENDING = 41
+    DECODE_FAILED = 42
+    DECODE_REQUEST_FAILED = 45
+    DECODE_RESULT_FAILED = 50
+    CONTRIBUTION_EXTRACTED = 60
+
+
+class PostprocessStatus:
+    PENDING = 0
+    DEMAND_MATCHED = 10
+    WXINDEX_DONE = 20
+    SKIPPED = 90
+    FAILED = 99
+
+
+DECODE_API_STATUS_MAP = {
+    "SUCCESS": ExecutionStatus.DECODE_SUCCESS,
+    "PENDING": ExecutionStatus.DECODE_PENDING,
+    "FAILED": ExecutionStatus.DECODE_FAILED,
+}
+
+
+def decode_api_status_to_code(status: str | None, *, has_success_response: bool) -> int:
+    if status:
+        mapped = DECODE_API_STATUS_MAP.get(status.strip().upper())
+        if mapped is not None:
+            return mapped
+    if has_success_response:
+        return ExecutionStatus.DECODE_SUBMITTED
+    return ExecutionStatus.DECODE_REQUEST_FAILED

+ 17 - 0
app/hot_content/timezone.py

@@ -0,0 +1,17 @@
+"""时区工具。"""
+
+from __future__ import annotations
+
+from datetime import timedelta, timezone
+
+try:
+    from zoneinfo import ZoneInfo
+except ImportError:  # pragma: no cover - Python < 3.9 compatibility
+    ZoneInfo = None
+
+
+SHANGHAI_TZ = (
+    ZoneInfo("Asia/Shanghai")
+    if ZoneInfo is not None
+    else timezone(timedelta(hours=8), "Asia/Shanghai")
+)

+ 57 - 0
app/hot_content/types.py

@@ -0,0 +1,57 @@
+"""热点内容流程类型定义。"""
+
+from __future__ import annotations
+
+from dataclasses import dataclass
+
+
+@dataclass(frozen=True)
+class HotSourceConfig:
+    source: str
+    count: int = 10
+
+
+@dataclass(frozen=True)
+class MysqlConfig:
+    host: str
+    port: int
+    user: str
+    password: str
+    database: str
+    charset: str = "utf8mb4"
+
+
+@dataclass(frozen=True)
+class FlowConfig:
+    crawapi_base_url: str
+    hot_rank_path: str
+    keyword_search_path: str
+    decode_api_url: str
+    decode_result_api_url: str
+    decode_config_id: int
+    request_timeout_seconds: int
+    https_verify_ssl: bool
+    hot_flow_cron_hours: str
+    hot_flow_cron_minute: int
+    schedule_interval_seconds: int
+    decode_result_interval_seconds: int
+    decode_result_batch_size: int
+    contribution_score_threshold: float
+    demand_pool_source_table: str
+    demand_pool_excluded_strategy: str
+    demand_pool_top_n: int
+    hot_demand_pool_strategy: str
+    wxindex_score_threshold: float
+    postprocess_batch_size: int
+    contribution_match_llm_model: str
+    contribution_match_llm_max_attempts: int
+    contribution_match_llm_retry_sleep_seconds: float
+    contribution_match_llm_max_tokens: int
+    wxindex_llm_model: str
+    wxindex_llm_max_attempts: int
+    wxindex_llm_max_tokens: int
+    wxindex_api_url: str
+    wxindex_lookback_days: int
+    wxindex_score_threshold: float
+    sources: list[HotSourceConfig]
+    mysql: MysqlConfig

+ 74 - 0
app/hot_content/wxindex_trend.py

@@ -0,0 +1,74 @@
+"""微信指数趋势计算工具。"""
+
+from __future__ import annotations
+
+import math
+from typing import Any
+
+
+MIN_TREND_POINTS = 4
+MAX_TREND_POINTS = 7
+UP_FIT_CHANGE_RATE = 0.04
+UP_WINDOW_CHANGE_RATE = 0.02
+DOWN_FIT_CHANGE_RATE = -0.04
+DOWN_WINDOW_CHANGE_RATE = -0.02
+
+
+def _median(values: list[float]) -> float:
+    if not values:
+        return 0.0
+    ordered = sorted(values)
+    mid = len(ordered) // 2
+    if len(ordered) % 2:
+        return ordered[mid]
+    return (ordered[mid - 1] + ordered[mid]) / 2
+
+
+def _extract_recent_scores(series: list[dict[str, Any]]) -> list[float]:
+    scored_rows: list[tuple[str, int, float]] = []
+    for index, row in enumerate(series):
+        if not isinstance(row, dict):
+            continue
+        try:
+            score = float(row.get("total_score"))
+        except (TypeError, ValueError):
+            continue
+        if math.isnan(score) or score < 0:
+            continue
+        ymd = str(row.get("ymd") or "").strip()
+        scored_rows.append((ymd, index, score))
+
+    scored_rows.sort(key=lambda item: (item[0], item[1]))
+    return [score for _, _, score in scored_rows[-MAX_TREND_POINTS:]]
+
+
+def _theil_sen_slope(values: list[float]) -> float:
+    slopes: list[float] = []
+    for start_index, start_value in enumerate(values):
+        for end_index in range(start_index + 1, len(values)):
+            slopes.append((values[end_index] - start_value) / (end_index - start_index))
+    return _median(slopes)
+
+
+def calc_wxindex_trend(series: list[dict[str, Any]]) -> str:
+    """按最近 7 天整体走势计算趋势,避免被最后一天波动误导。"""
+    scores = _extract_recent_scores(series)
+    if len(scores) < MIN_TREND_POINTS:
+        return "未知"
+
+    log_scores = [math.log1p(score) for score in scores]
+    slope = _theil_sen_slope(log_scores)
+    fit_change_rate = math.expm1(slope * (len(log_scores) - 1))
+
+    early_avg = sum(scores[:3]) / 3
+    late_avg = sum(scores[-3:]) / 3
+    window_change_rate = (late_avg - early_avg) / max(early_avg, 1.0)
+
+    if fit_change_rate >= UP_FIT_CHANGE_RATE and window_change_rate >= UP_WINDOW_CHANGE_RATE:
+        return "上升"
+    if (
+        fit_change_rate <= DOWN_FIT_CHANGE_RATE
+        and window_change_rate <= DOWN_WINDOW_CHANGE_RATE
+    ):
+        return "下降"
+    return "平稳"

+ 161 - 0
app/scheduler.py

@@ -0,0 +1,161 @@
+"""统一定时任务调度入口。"""
+
+from __future__ import annotations
+
+import argparse
+import json
+import sys
+from datetime import datetime, timedelta
+from pathlib import Path
+from typing import Any
+
+PROJECT_ROOT = Path(__file__).resolve().parents[1]
+if str(PROJECT_ROOT) not in sys.path:
+    sys.path.insert(0, str(PROJECT_ROOT))
+
+from app.hot_content.decode_result_service import run_once as run_decode_result_once
+from app.hot_content.config import load_flow_config
+from app.hot_content.postprocess_service import run_once as run_postprocess_once
+from app.hot_content.service import run_once
+from app.hot_content.timezone import SHANGHAI_TZ
+from app.hot_content.types import FlowConfig
+
+
+def _import_blocking_scheduler() -> Any:
+    try:
+        from apscheduler.schedulers.blocking import BlockingScheduler
+    except ImportError as exc:
+        raise RuntimeError("缺少依赖:请先执行 pip install -r requirements.txt") from exc
+    return BlockingScheduler
+
+
+def run_hot_content_job(config: FlowConfig) -> None:
+    try:
+        summary = run_once(config)
+        print(json.dumps(summary, ensure_ascii=False, indent=2))
+    except Exception as exc:
+        print(f"hot content flow failed: {exc}", file=sys.stderr)
+
+
+def run_decode_result_job(config: FlowConfig) -> None:
+    """解构结果拉取 -> 后处理 -> 写入 ODPS 需求表。"""
+    try:
+        summary: dict[str, Any] = {"decode_result": run_decode_result_once(config)}
+        try:
+            summary["postprocess"] = run_postprocess_once(config)
+        except Exception as exc:
+            summary["postprocess_error"] = str(exc)
+            print(f"postprocess flow failed: {exc}", file=sys.stderr)
+        print(json.dumps(summary, ensure_ascii=False, indent=2))
+    except Exception as exc:
+        print(f"decode result flow failed: {exc}", file=sys.stderr)
+
+
+def run_postprocess_job(config: FlowConfig) -> None:
+    try:
+        summary = run_postprocess_once(config)
+        print(json.dumps({"job": "postprocess", "summary": summary}, ensure_ascii=False, indent=2))
+    except Exception as exc:
+        print(f"postprocess flow failed: {exc}", file=sys.stderr)
+
+
+def register_hot_content_job(scheduler: Any, config: FlowConfig) -> None:
+    scheduler.add_job(
+        run_hot_content_job,
+        trigger="cron",
+        hour=config.hot_flow_cron_hours,
+        minute=config.hot_flow_cron_minute,
+        timezone=SHANGHAI_TZ,
+        args=[config],
+        id="hot_content_flow",
+        name="热点内容抓取搜索解构流程",
+        replace_existing=True,
+        coalesce=True,
+        max_instances=1,
+    )
+
+
+def register_decode_result_job(scheduler: Any, config: FlowConfig) -> None:
+    interval_seconds = max(config.decode_result_interval_seconds, 60)
+    scheduler.add_job(
+        run_decode_result_job,
+        trigger="interval",
+        seconds=interval_seconds,
+        args=[config],
+        id="decode_result_flow",
+        name="解构结果拉取、后处理与 ODPS 需求表写入",
+        replace_existing=True,
+        coalesce=True,
+        max_instances=1,
+        next_run_time=datetime.now(SHANGHAI_TZ) + timedelta(seconds=interval_seconds),
+    )
+
+
+def start_scheduler() -> None:
+    BlockingScheduler = _import_blocking_scheduler()
+    scheduler = BlockingScheduler(timezone=SHANGHAI_TZ)
+    config = load_flow_config()
+    register_hot_content_job(scheduler, config)
+    register_decode_result_job(scheduler, config)
+    print(
+        "scheduler started, timezone=Asia/Shanghai, "
+        "jobs=['hot_content_flow', 'decode_result_flow'], "
+        f"hot_cron={config.hot_flow_cron_hours}:{config.hot_flow_cron_minute:02d}, "
+        f"decode_result_interval={config.decode_result_interval_seconds}s"
+    )
+    scheduler.start()
+
+
+def parse_args() -> argparse.Namespace:
+    parser = argparse.ArgumentParser(description="统一定时任务调度入口")
+    parser.add_argument("--once", action="store_true", help="执行一次,不启动调度器")
+    parser.add_argument(
+        "--job",
+        choices=("all", "hot-content", "decode-result", "postprocess"),
+        default="all",
+        help="--once 时选择执行哪个任务",
+    )
+    return parser.parse_args()
+
+
+def main() -> None:
+    args = parse_args()
+    if args.once:
+        config = load_flow_config()
+        if args.job in {"all", "hot-content"}:
+            summary = run_once(config)
+            print(
+                json.dumps(
+                    {"job": "hot_content_flow", "summary": summary},
+                    ensure_ascii=False,
+                    indent=2,
+                )
+            )
+        if args.job in {"all", "decode-result"}:
+            summary = {"decode_result": run_decode_result_once(config)}
+            try:
+                summary["postprocess"] = run_postprocess_once(config)
+            except Exception as exc:
+                summary["postprocess_error"] = str(exc)
+            print(
+                json.dumps(
+                    {"job": "decode_result_flow", "summary": summary},
+                    ensure_ascii=False,
+                    indent=2,
+                )
+            )
+        if args.job in {"postprocess"}:
+            summary = run_postprocess_once(config)
+            print(
+                json.dumps(
+                    {"job": "postprocess", "summary": summary},
+                    ensure_ascii=False,
+                    indent=2,
+                )
+            )
+        return
+    start_scheduler()
+
+
+if __name__ == "__main__":
+    main()

+ 35 - 0
docker-compose.yml

@@ -0,0 +1,35 @@
+services:
+  external-demand-scheduler:
+    build:
+      context: .
+      dockerfile: Dockerfile
+    container_name: external-demand-scheduler
+    restart: unless-stopped
+    environment:
+      TZ: Asia/Shanghai
+      # MySQL
+      MYSQL_HOST: ${MYSQL_HOST:-127.0.0.1}
+      MYSQL_PORT: ${MYSQL_PORT:-3306}
+      MYSQL_USER: ${MYSQL_USER:-root}
+      MYSQL_PASSWORD: ${MYSQL_PASSWORD}
+      MYSQL_DATABASE: ${MYSQL_DATABASE:-external_demand}
+      # ODPS
+      ODPS_ACCESS_ID: ${ODPS_ACCESS_ID}
+      ODPS_ACCESS_KEY: ${ODPS_ACCESS_KEY}
+      ODPS_PROJECT: ${ODPS_PROJECT}
+      ODPS_ENDPOINT: ${ODPS_ENDPOINT:-http://service.odps.aliyun.com/api}
+      ODPS_TUNNEL_ENDPOINT: ${ODPS_TUNNEL_ENDPOINT:-}
+      # LLM
+      OPEN_ROUTER_API_KEY: ${OPEN_ROUTER_API_KEY}
+      OPEN_ROUTER_DEFAULT_MODEL: ${OPEN_ROUTER_DEFAULT_MODEL:-}
+      # 调度
+      HOT_FLOW_CRON_HOURS: ${HOT_FLOW_CRON_HOURS:-6,12,18}
+      HOT_FLOW_CRON_MINUTE: ${HOT_FLOW_CRON_MINUTE:-0}
+      DECODE_RESULT_FLOW_INTERVAL_SECONDS: ${DECODE_RESULT_FLOW_INTERVAL_SECONDS:-1800}
+      # 业务阈值
+      WXINDEX_SCORE_THRESHOLD: ${WXINDEX_SCORE_THRESHOLD:-1000000}
+      DEMAND_POOL_SOURCE_TABLE: ${DEMAND_POOL_SOURCE_TABLE:-dwd_multi_demand_pool_di}
+      HOT_DEMAND_POOL_STRATEGY: ${HOT_DEMAND_POOL_STRATEGY:-近期热点}
+    # 如需挂载本地 .env,可取消注释:
+    # env_file:
+    #   - .env

+ 7 - 0
requirements.txt

@@ -0,0 +1,7 @@
+# Top-level scripts mostly use Python stdlib.
+# These packages support the local app.* modules used by the flow.
+openrouter>=0.9.0,<1.0
+pymysql>=1.1.0
+apscheduler
+pyodps==0.12.6
+