Quellcode durchsuchen

修改新热事件流程

xueyiming vor 1 Woche
Ursprung
Commit
fd1f114c6a

+ 32 - 1
app/core/config.py

@@ -145,7 +145,7 @@ class Settings:
     demand_pool_excluded_strategy: str = "当下供需gap-分词"
     demand_pool_top_n: int = 200
     hot_demand_pool_strategy: str = "新热事件"
-    wxindex_score_threshold: float = 1_000_000.0
+    wxindex_score_threshold: float = 100_000.0
     odps_daily_write_limit: int = 100
 
     postprocess_batch_size: int = 20
@@ -165,6 +165,13 @@ class Settings:
     demand_quality_llm_retry_sleep_seconds: float = 1.0
     demand_quality_llm_max_tokens: int = 4000
 
+    category_filter_llm_model: str = "anthropic/claude-haiku-4-5"
+    category_filter_llm_max_attempts: int = 3
+    category_filter_llm_retry_sleep_seconds: float = 1.0
+    category_filter_llm_max_tokens: int = 1024
+    category_filter_body_max_chars: int = 2000
+    category_filter_item_sleep_seconds: float = 0.0
+
     @classmethod
     def from_env(cls) -> "Settings":
         defaults = cls()
@@ -357,6 +364,30 @@ class Settings:
                 "DEMAND_QUALITY_LLM_MAX_TOKENS",
                 defaults.demand_quality_llm_max_tokens,
             ),
+            category_filter_llm_model=_env(
+                "CATEGORY_FILTER_LLM_MODEL",
+                defaults.category_filter_llm_model,
+            ),
+            category_filter_llm_max_attempts=_env_int(
+                "CATEGORY_FILTER_LLM_MAX_ATTEMPTS",
+                defaults.category_filter_llm_max_attempts,
+            ),
+            category_filter_llm_retry_sleep_seconds=_env_float(
+                "CATEGORY_FILTER_LLM_RETRY_SLEEP_SECONDS",
+                defaults.category_filter_llm_retry_sleep_seconds,
+            ),
+            category_filter_llm_max_tokens=_env_int(
+                "CATEGORY_FILTER_LLM_MAX_TOKENS",
+                defaults.category_filter_llm_max_tokens,
+            ),
+            category_filter_body_max_chars=_env_int(
+                "CATEGORY_FILTER_BODY_MAX_CHARS",
+                defaults.category_filter_body_max_chars,
+            ),
+            category_filter_item_sleep_seconds=_env_float(
+                "CATEGORY_FILTER_ITEM_SLEEP_SECONDS",
+                defaults.category_filter_item_sleep_seconds,
+            ),
         )
 
 

+ 432 - 0
app/hot_content/category_filter.py

@@ -0,0 +1,432 @@
+"""老年人兴趣分类筛选:标题+文章与分类词库整体匹配。"""
+
+from __future__ import annotations
+
+import json
+import re
+import time
+from typing import Any
+
+from app.core.open_router_llm import OpenRouterCallError, create_chat_completion
+from app.hot_content.exceptions import HotContentFlowError
+
+# Built-in allow-list of elderly-interest categories.
+# Each entry is one category path; its levels are separated by single spaces.
+# app/hot_content/config.py returns a copy of this list when no override is
+# supplied via CATEGORY_FILTER_CATEGORIES_JSON / CATEGORY_FILTER_CATEGORIES_FILE.
+DEFAULT_ELDERLY_CATEGORY_LIST: list[str] = [
+    "花卉风格 高饱和花卉",
+    "地标景观 交通枢纽",
+    "政治事件 领袖纪念",
+    "外交事件 博弈手段",
+    "外交事件 外交访问",
+    "国际政治 双边关系(中美关系)",
+    "国际政治 外交立场(外交理念)",
+    "政策制度 国家统一",
+    "民生政策 惠民政策",
+    "民生政策 免费福利政策",
+    "国家实力 国际地位",
+    "政治运作 政治博弈",
+    "军事安全 能源安全",
+    "文化概念 文化传承",
+    "健康养生 身体健康 老年健康",
+    "公共管理 医疗卫生 医保报销",
+    "公共管理 补贴福利 老年群体补贴",
+    "公共管理 补贴福利 生活服务补贴",
+    "公共管理 治理监督 反腐",
+    "公共管理 政策法规 行业规则",
+    "时政评议 国际关系 中美关系",
+    "时政评议 社会评议 社会公正",
+    "处世智慧 生存策略 生活指导",
+    "处世智慧 生存策略 经验总结",
+    "处世智慧 价值取向 处世哲学",
+    "社会问题 国际问题 两岸议题",
+    "社会问题 国家安全事件",
+    "社会问题 经济形势 农村农业",
+    "民生生活 教育议题",
+    "生活技巧 安全防护 反诈防骗",
+    "军事谋略 战略运筹 战略方案",
+    "爱国情感 民族情感",
+]
+
+TEXT_SYSTEM_PROMPT = """
+你是一个专业的内容分类语义匹配专家。
+
+# 任务
+我会提供一个待筛选项和一组分类词库。
+请判断该待筛选项是否能与分类词库中的某一条目整体匹配。
+- 能整体匹配 → 保留(keep)
+- 不能整体匹配任何条目 → 移除(remove)
+
+# 分类条目结构
+分类词库中每一行是一个不可分割的整体,由若干层级词语组成(以空格分隔,如「一级 二级 三级」)。
+待筛选项必须能够覆盖该分类条目的完整语义,才算匹配成功。
+
+# 匹配标准
+满足以下任意一条,视为整体匹配成功:
+- 待筛选项与分类条目含义相同或高度相近
+- 待筛选项是分类条目的下位概念(待筛选项所指属于该分类条目描述的范畴)
+- 待筛选项与分类条目在用户检索意图上高度一致
+
+# 禁止视为匹配的情况
+- 待筛选项仅与分类条目中的某一个词/层级相关,但未覆盖该条目的完整含义
+- 待筛选项与分类条目只有表面字符重叠,语义方向不同
+- 待筛选项是分类条目的上位概念(范围过宽,不够精确)
+- 两者只是同属某个大类,但具体含义差异明显
+
+# 多词分类条目处理
+若分类条目由多个层级词语组成,待筛选项必须能同时覆盖该条目的所有关键语义成分;
+缺少任意一个关键成分,则不视为匹配。
+
+# 输出规则
+严格输出 JSON 对象,禁止输出 JSON 之外的任何内容。
+仅输出 action、matched_category、reason 三个字段,不要回显 filter_text。
+reason 字段请用中文表述,不要使用英文双引号 "。
+
+# 约束
+待筛选项和匹配到的分类条目必须来自给定输入,不能创造列表之外的词。
+若可匹配多条分类,选择语义最接近的一条。
+"""
+
+ARTICLE_SYSTEM_PROMPT = """
+你是一个专业的中老年内容适老性分类专家。
+
+# 任务
+我会提供一条热榜标题及其对应的文章标题、正文摘要,以及一组老年人感兴趣的内容分类词库。
+请综合标题与文章内容,判断该内容是否属于老年人感兴趣的话题范畴。
+- 能与分类词库中某一条目整体匹配 → 保留(keep)
+- 不能整体匹配任何条目 → 移除(remove)
+
+# 分类条目结构
+分类词库中每一行是一个不可分割的整体,由若干层级词语组成(以空格分隔)。
+待筛内容必须能够覆盖该分类条目的完整语义,才算匹配成功。
+
+# 匹配标准
+满足以下任意一条,视为整体匹配成功:
+- 内容主题与分类条目含义相同或高度相近
+- 内容是分类条目的下位概念(内容所指属于该分类条目描述的范畴)
+- 内容与分类条目在中老年用户的阅读兴趣上高度一致
+
+# 禁止视为匹配的情况
+- 内容仅与分类条目中的某一个词/层级相关,但未覆盖该条目的完整含义
+- 内容与分类条目只有表面字符重叠,语义方向不同
+- 内容明显面向年轻群体(高考送考、追星、职场、游戏等),中老年专属属性弱
+- 两者只是同属某个大类,但具体含义差异明显
+
+# 输出规则
+严格输出 JSON 对象,禁止输出 JSON 之外的任何内容。
+仅输出 id、action、matched_category、reason 四个字段。
+不要输出 filter_text、标题、正文或任何输入内容的回显。
+reason 字段请用中文表述,不要使用英文双引号 "。
+id 必须与输入 record_id 完全一致。
+
+# 约束
+匹配到的分类条目必须来自给定 category_list,不能创造列表之外的词。
+若可匹配多条分类,选择语义最接近的一条。
+"""
+
+
+def _extract_filter_fields_fallback(
+    raw: str,
+    *,
+    record_id: int | None = None,
+) -> dict[str, Any] | None:
+    """标准 json.loads 失败时,按固定 schema 宽松提取字段。"""
+    action_match = re.search(r'"action"\s*:\s*"(keep|remove)"', raw, re.IGNORECASE)
+    if not action_match:
+        return None
+
+    parsed: dict[str, Any] = {"action": action_match.group(1).lower()}
+    if record_id is not None:
+        id_match = re.search(r'"id"\s*:\s*(\d+)', raw)
+        if id_match:
+            parsed["id"] = int(id_match.group(1))
+        else:
+            parsed["id"] = record_id
+    for key in ("matched_category", "reason"):
+        match = re.search(rf'"{key}"\s*:\s*"((?:[^"\\]|\\.)*)"', raw, re.DOTALL)
+        if match:
+            parsed[key] = (
+                match.group(1)
+                .replace('\\"', '"')
+                .replace("\\n", "\n")
+                .replace("\\t", "\t")
+            )
+    return parsed
+
+
+def _extract_json_object(text: str, *, record_id: int | None = None) -> dict[str, Any]:
+    """Parse an LLM reply into a JSON object, tolerating fences and stray text.
+
+    Args:
+        text: Raw model output.
+        record_id: Forwarded to the lenient field extractor, which uses it as
+            the ``id`` when the reply carries none.
+
+    Returns:
+        The decoded object, or the leniently extracted fields when the
+        embedded object is not valid JSON.
+
+    Raises:
+        HotContentFlowError: If the reply contains no ``{...}`` span, the span
+            is invalid JSON that the lenient extractor cannot recover, or the
+            decoded value is not a dict.
+    """
+    raw = text.strip()
+    # Drop a leading ``` / ```json fence and its closing ``` when present.
+    if raw.startswith("```"):
+        raw = re.sub(r"^```(?:json)?\s*", "", raw)
+        raw = re.sub(r"\s*```$", "", raw)
+    # Fast path: the whole reply is already a JSON object.
+    try:
+        parsed = json.loads(raw)
+        if isinstance(parsed, dict):
+            return parsed
+    except json.JSONDecodeError:
+        pass
+    # Greedy match: from the first "{" to the last "}" in the reply.
+    match = re.search(r"\{[\s\S]*\}", raw)
+    if not match:
+        raise HotContentFlowError("llm output is not json object")
+    block = match.group(0)
+    try:
+        parsed = json.loads(block)
+    except json.JSONDecodeError as exc:
+        # Last resort: regex-based extraction of the known schema fields.
+        fallback = _extract_filter_fields_fallback(block, record_id=record_id)
+        if fallback:
+            return fallback
+        raise HotContentFlowError(f"llm output invalid json: {exc}") from exc
+    if not isinstance(parsed, dict):
+        raise HotContentFlowError("llm output is not json object")
+    return parsed
+
+
+def _normalize_record_filter_result(
+    parsed: dict[str, Any],
+    *,
+    record_id: int,
+    category_list: list[str],
+) -> dict[str, Any]:
+    category_set = set(category_list)
+    action = str(parsed.get("action") or "").strip().lower()
+    if action not in {"keep", "remove"}:
+        action = "keep" if str(parsed.get("matched_category") or "").strip() else "remove"
+
+    matched_category = str(parsed.get("matched_category") or "").strip()
+    if action == "keep":
+        if matched_category not in category_set:
+            action = "remove"
+            matched_category = ""
+    else:
+        matched_category = ""
+
+    try:
+        returned_id = int(parsed.get("id"))
+    except (TypeError, ValueError):
+        returned_id = record_id
+
+    passed = action == "keep"
+    return {
+        "id": returned_id if returned_id == record_id else record_id,
+        "action": action,
+        "passed": passed,
+        "matched_category": matched_category or None,
+        "reason": str(parsed.get("reason") or "").strip(),
+    }
+
+
+def _normalize_filter_result(
+    parsed: dict[str, Any],
+    *,
+    filter_text: str,
+    category_list: list[str],
+) -> dict[str, Any]:
+    """Coerce a parsed LLM verdict for a free-text item into the canonical shape.
+
+    Args:
+        parsed: Decoded LLM output; may carry ``action``, ``matched_category``
+            and ``reason``.
+        filter_text: The text that was classified; echoed back in the result.
+        category_list: Allowed categories; a ``keep`` verdict only stands when
+            ``matched_category`` equals one of them verbatim (after stripping).
+
+    Returns:
+        A dict with ``filter_text``, ``action`` (``keep``/``remove``),
+        ``matched_category`` (``None`` unless kept) and ``reason``.
+    """
+    category_set = set(category_list)
+    action = str(parsed.get("action") or "").strip().lower()
+    if action not in {"keep", "remove"}:
+        # Missing or unknown action: infer it from whether a category was given.
+        action = "keep" if str(parsed.get("matched_category") or "").strip() else "remove"
+
+    matched_category = str(parsed.get("matched_category") or "").strip()
+    if action == "keep":
+        # A keep verdict naming a category outside the list becomes remove.
+        if matched_category not in category_set:
+            action = "remove"
+            matched_category = ""
+    else:
+        matched_category = ""
+
+    return {
+        "filter_text": filter_text,
+        "action": action,
+        "matched_category": matched_category or None,
+        "reason": str(parsed.get("reason") or "").strip(),
+    }
+
+
+def build_article_filter_text(
+    *,
+    hot_title: str,
+    content_title: str,
+    body_text: str,
+    body_max_chars: int = 2000,
+) -> str:
+    """Assemble the three-line text block handed to the LLM for one article.
+
+    Args:
+        hot_title: Hot-list title; stripped, falsy values become "".
+        content_title: Article title; stripped, falsy values become "".
+        body_text: Article body; stripped, then cut to ``body_max_chars``.
+        body_max_chars: Maximum body length in characters; values <= 0
+            disable truncation.
+
+    Returns:
+        The labelled hot title, article title and body, one per line.
+    """
+    body = str(body_text or "").strip()
+    if body_max_chars > 0 and len(body) > body_max_chars:
+        body = body[:body_max_chars]
+    return (
+        f"热榜标题:{str(hot_title or '').strip()}\n"
+        f"文章标题:{str(content_title or '').strip()}\n"
+        f"文章正文:{body}"
+    )
+
+
+def llm_filter_record_content(
+    *,
+    record_id: int,
+    hot_title: str,
+    content_title: str,
+    body_text: str,
+    category_list: list[str],
+    model: str,
+    max_attempts: int,
+    retry_sleep_seconds: float,
+    max_tokens: int,
+    body_max_chars: int = 2000,
+) -> dict[str, Any]:
+    """Run the category filter on a stored record using its titles and body.
+
+    The LLM input contains the article body; the expected output carries only
+    id/action/matched_category/reason.
+
+    Args:
+        record_id: Identifier sent to the model and used as the result ``id``.
+        hot_title: Hot-list title of the record.
+        content_title: Title of the matched article.
+        body_text: Article body; truncated to ``body_max_chars``.
+        category_list: Allowed categories offered to the model.
+        model: Model name; an empty value is passed on as ``None``.
+        max_attempts: Number of tries; values below 1 are treated as 1.
+        retry_sleep_seconds: Pause between tries; negatives are treated as 0.
+        max_tokens: Completion token limit; values below 1 are treated as 1.
+        body_max_chars: Body truncation limit forwarded to the text builder.
+
+    Returns:
+        The normalized verdict (plus ``usage`` when the response has a dict
+        under that key). If every attempt fails with ``OpenRouterCallError``
+        or ``HotContentFlowError``, a ``remove`` verdict with an ``error``
+        entry is returned instead of raising.
+    """
+    filter_text = build_article_filter_text(
+        hot_title=hot_title,
+        content_title=content_title,
+        body_text=body_text,
+        body_max_chars=body_max_chars,
+    )
+    user_payload = {
+        "record_id": record_id,
+        "filter_text": filter_text,
+        "category_list": category_list,
+        "output_schema": {
+            "id": "number, must equal record_id",
+            "action": "keep | remove",
+            "matched_category": (
+                "string, required when action=keep, must be selected from category_list"
+            ),
+            "reason": "string, brief Chinese explanation without double quotes",
+        },
+        "constraints": [
+            "仅输出 id、action、matched_category、reason",
+            "不要输出 filter_text、标题或正文",
+            "reason 不要使用英文双引号",
+        ],
+    }
+
+    last_error: Exception | None = None
+    for attempt in range(1, max(max_attempts, 1) + 1):
+        try:
+            resp = create_chat_completion(
+                [
+                    {"role": "system", "content": ARTICLE_SYSTEM_PROMPT},
+                    {
+                        "role": "user",
+                        "content": json.dumps(user_payload, ensure_ascii=False),
+                    },
+                ],
+                # NOTE(review): None presumably selects the client's default
+                # model - confirm against create_chat_completion.
+                model=model or None,
+                temperature=0,
+                max_tokens=max(max_tokens, 1),
+            )
+            parsed = _extract_json_object(
+                str(resp.get("content") or ""),
+                record_id=record_id,
+            )
+            result = _normalize_record_filter_result(
+                parsed,
+                record_id=record_id,
+                category_list=category_list,
+            )
+            usage = resp.get("usage")
+            if isinstance(usage, dict):
+                result["usage"] = dict(usage)
+            return result
+        except (OpenRouterCallError, HotContentFlowError) as exc:
+            # Call and parse failures are both retried; no sleep after the last try.
+            last_error = exc
+            if attempt < max(max_attempts, 1):
+                time.sleep(max(retry_sleep_seconds, 0))
+
+    # All attempts failed: fail closed so the record is rejected, not kept.
+    return {
+        "id": record_id,
+        "action": "remove",
+        "passed": False,
+        "matched_category": None,
+        "reason": f"LLM 调用失败,默认移除: {last_error}",
+        "error": str(last_error),
+    }
+
+
+def llm_filter_text(
+    *,
+    filter_text: str,
+    category_list: list[str],
+    source: str,
+    system_prompt: str,
+    model: str,
+    max_attempts: int,
+    retry_sleep_seconds: float,
+    max_tokens: int,
+) -> dict[str, Any]:
+    """Ask the LLM whether a free-text item matches any allowed category.
+
+    Args:
+        filter_text: The item to classify; echoed back in the result.
+        category_list: Allowed categories offered to the model.
+        source: Value sent to the model under the ``source`` payload key.
+        system_prompt: System message for the chat completion.
+        model: Model name; an empty value is passed on as ``None``.
+        max_attempts: Number of tries; values below 1 are treated as 1.
+        retry_sleep_seconds: Pause between tries; negatives are treated as 0.
+        max_tokens: Completion token limit; values below 1 are treated as 1.
+
+    Returns:
+        The normalized verdict (plus ``usage`` when the response has a dict
+        under that key). If every attempt fails with ``OpenRouterCallError``
+        or ``HotContentFlowError``, a ``remove`` verdict with an ``error``
+        entry is returned instead of raising.
+    """
+    user_payload = {
+        "source": source,
+        "filter_text": filter_text,
+        "category_list": category_list,
+        "output_schema": {
+            "action": "keep | remove",
+            "matched_category": (
+                "string, required when action=keep, must be selected from category_list"
+            ),
+            "reason": "string, brief Chinese explanation without double quotes",
+        },
+        "constraints": [
+            "仅输出 action、matched_category、reason,不要输出 filter_text",
+            "reason 不要使用英文双引号",
+        ],
+    }
+
+    last_error: Exception | None = None
+    for attempt in range(1, max(max_attempts, 1) + 1):
+        try:
+            resp = create_chat_completion(
+                [
+                    {"role": "system", "content": system_prompt},
+                    {
+                        "role": "user",
+                        "content": json.dumps(user_payload, ensure_ascii=False),
+                    },
+                ],
+                model=model or None,
+                temperature=0,
+                max_tokens=max(max_tokens, 1),
+            )
+            parsed = _extract_json_object(str(resp.get("content") or ""))
+            result = _normalize_filter_result(
+                parsed,
+                filter_text=filter_text,
+                category_list=category_list,
+            )
+            usage = resp.get("usage")
+            if isinstance(usage, dict):
+                result["usage"] = dict(usage)
+            return result
+        except (OpenRouterCallError, HotContentFlowError) as exc:
+            # Call and parse failures are both retried; no sleep after the last try.
+            last_error = exc
+            if attempt < max(max_attempts, 1):
+                time.sleep(max(retry_sleep_seconds, 0))
+
+    # All attempts failed: fail closed so the item is removed, not kept.
+    return {
+        "filter_text": filter_text,
+        "action": "remove",
+        "matched_category": None,
+        "reason": f"LLM 调用失败,默认移除: {last_error}",
+        "error": str(last_error),
+    }
+
+
+def llm_filter_article_content(
+    *,
+    record_id: int,
+    hot_title: str,
+    content_title: str,
+    body_text: str,
+    category_list: list[str],
+    model: str,
+    max_attempts: int,
+    retry_sleep_seconds: float,
+    max_tokens: int,
+    body_max_chars: int = 2000,
+) -> dict[str, Any]:
+    """Alias of ``llm_filter_record_content``; forwards every argument unchanged."""
+    return llm_filter_record_content(
+        record_id=record_id,
+        hot_title=hot_title,
+        content_title=content_title,
+        body_text=body_text,
+        category_list=category_list,
+        model=model,
+        max_attempts=max_attempts,
+        retry_sleep_seconds=retry_sleep_seconds,
+        max_tokens=max_tokens,
+        body_max_chars=body_max_chars,
+    )

+ 43 - 0
app/hot_content/config.py

@@ -8,6 +8,7 @@ from pathlib import Path
 from typing import Any
 
 from app.core.config import PROJECT_ROOT, settings
+from app.hot_content.category_filter import DEFAULT_ELDERLY_CATEGORY_LIST
 from app.hot_content.exceptions import HotContentFlowError
 from app.hot_content.types import FlowConfig, HotSourceConfig, MysqlConfig
 
@@ -114,6 +115,23 @@ def _parse_cron_hours(value: str) -> str:
     return ",".join(normalized)
 
 
+def _load_category_filter_categories() -> list[str]:
+    raw = _load_json_from_env_or_file(
+        "CATEGORY_FILTER_CATEGORIES_JSON",
+        "CATEGORY_FILTER_CATEGORIES_FILE",
+    )
+    if raw is None:
+        return list(DEFAULT_ELDERLY_CATEGORY_LIST)
+    if not isinstance(raw, list):
+        raise HotContentFlowError(
+            "CATEGORY_FILTER_CATEGORIES_JSON/CATEGORY_FILTER_CATEGORIES_FILE must be a list"
+        )
+    categories = [str(item).strip() for item in raw if str(item).strip()]
+    if not categories:
+        raise HotContentFlowError("category filter categories cannot be empty")
+    return categories
+
+
 def load_flow_config(interval_override: int | None = None) -> FlowConfig:
     crawapi_base_url = _get_env("CRAWAPI_BASE_URL", settings.crawapi_base_url).rstrip("/")
     hot_rank_path = _get_env(
@@ -260,6 +278,31 @@ def load_flow_config(interval_override: int | None = None) -> FlowConfig:
             "DEMAND_QUALITY_LLM_MAX_TOKENS",
             settings.demand_quality_llm_max_tokens,
         ),
+        category_filter_llm_model=_get_env(
+            "CATEGORY_FILTER_LLM_MODEL",
+            settings.category_filter_llm_model,
+        ),
+        category_filter_llm_max_attempts=_get_env_int(
+            "CATEGORY_FILTER_LLM_MAX_ATTEMPTS",
+            settings.category_filter_llm_max_attempts,
+        ),
+        category_filter_llm_retry_sleep_seconds=_get_env_float(
+            "CATEGORY_FILTER_LLM_RETRY_SLEEP_SECONDS",
+            settings.category_filter_llm_retry_sleep_seconds,
+        ),
+        category_filter_llm_max_tokens=_get_env_int(
+            "CATEGORY_FILTER_LLM_MAX_TOKENS",
+            settings.category_filter_llm_max_tokens,
+        ),
+        category_filter_body_max_chars=_get_env_int(
+            "CATEGORY_FILTER_BODY_MAX_CHARS",
+            settings.category_filter_body_max_chars,
+        ),
+        category_filter_item_sleep_seconds=_get_env_float(
+            "CATEGORY_FILTER_ITEM_SLEEP_SECONDS",
+            settings.category_filter_item_sleep_seconds,
+        ),
+        category_filter_categories=_load_category_filter_categories(),
         sources=_load_sources(),
         mysql=MysqlConfig(
             host=_get_env("MYSQL_HOST", settings.mysql_host),

+ 25 - 3
app/hot_content/demand_hive_export.py

@@ -14,6 +14,7 @@ from app.hot_content.demand_export import (
 from app.hot_content.demand_quality import (
     attach_quality_scores_to_export_rows,
     build_feature_combo_text,
+    build_matched_element_texts,
     quality_passed,
 )
 
@@ -98,12 +99,12 @@ def _export_row_passes_quality(
             senior_threshold=senior_threshold,
         )
     if item_type == ITEM_TYPE_ELEMENT and _has_matched_demand(row):
-        feature_combo = build_feature_combo_text(export_rows)
-        if not feature_combo:
+        element_text = str(row.get("item_text") or "").strip()
+        if not element_text:
             return False
         return quality_passed(
             demand_type=TYPE_FEATURE_POINT,
-            demand_text=feature_combo,
+            demand_text=element_text,
             event_sense_json=event_sense_json,
             senior_fit_json=senior_fit_json,
             event_threshold=event_threshold,
@@ -193,6 +194,7 @@ def build_hive_rows_for_record(
 
     weight = _record_wxindex_score(export_rows) / WEIGHT_DIVISOR
     feature_combo = build_feature_combo_text(export_rows)
+    element_texts = build_matched_element_texts(export_rows)
     phrase_texts = _dedupe_texts(
         [
             str(row.get("item_text") or "").strip()
@@ -230,6 +232,26 @@ def build_hive_rows_for_record(
             )
         )
 
+    for element_text in element_texts:
+        if quality_passed(
+            demand_type=TYPE_FEATURE_POINT,
+            demand_text=element_text,
+            event_sense_json=event_sense_json,
+            senior_fit_json=senior_fit_json,
+            event_threshold=event_threshold,
+            senior_threshold=senior_threshold,
+        ):
+            hive_rows.append(
+                _build_hive_row(
+                    record_id=record_id,
+                    strategy=strategy,
+                    demand_name=element_text,
+                    weight=weight,
+                    demand_type=TYPE_FEATURE_POINT,
+                    partition_dt=partition_dt,
+                )
+            )
+
     for phrase_text in phrase_texts:
         hive_rows.append(
             _build_hive_row(

+ 126 - 44
app/hot_content/demand_quality.py

@@ -1,10 +1,10 @@
 """需求质量判断:事件性、老年性 LLM 评分。
 
 流程(串行两次 LLM,评分阶段互不截断):
-1. 微信指数达标后,构建待评需求(特征点组合 + 有匹配的短语)
-2. 对全部待评需求执行事件性 LLM 评分
+1. 微信指数达标后,构建待评需求(特征点组合 + 单个有匹配元素 + 有匹配的短语)
+2. 对全部待评需求执行事件性 LLM 评分(当前临时下线)
 3. 对同一批全部待评需求执行老年性 LLM 评分
-4. 导出表 / ODPS 写入时再过滤:标题保留(微信指数 + 灵感/目的点匹配)+ 事件性、老年性达标
+4. 导出表 / ODPS 写入时再过滤:标题保留(微信指数 + 灵感/目的点匹配)+ 老年性达标(事件性当前临时下线)
 """
 
 from __future__ import annotations
@@ -91,8 +91,8 @@ def _candidate_key(demand_type: str, demand_text: str) -> tuple[str, str]:
     return demand_type.strip(), _normalize_demand_key(demand_text)
 
 
-def build_feature_combo_text(export_rows: list[dict[str, Any]]) -> str:
-    element_texts = _dedupe_texts(
+def build_matched_element_texts(export_rows: list[dict[str, Any]]) -> list[str]:
+    return _dedupe_texts(
         [
             str(row.get("item_text") or "").strip()
             for row in export_rows
@@ -100,7 +100,30 @@ def build_feature_combo_text(export_rows: list[dict[str, Any]]) -> str:
             and _has_matched_demand(row)
         ]
     )
-    return " ".join(element_texts)
+
+
+def build_feature_combo_text(export_rows: list[dict[str, Any]]) -> str:
+    """Join the matched element texts of ``export_rows`` with single spaces."""
+    return " ".join(build_matched_element_texts(export_rows))
+
+
+def _append_feature_point_candidate(
+    candidates: list[dict[str, str]],
+    seen: set[tuple[str, str]],
+    demand_text: str,
+) -> None:
+    """Append a feature-point candidate unless it is empty or already seen.
+
+    Args:
+        candidates: Output list; mutated in place.
+        seen: Candidate keys already emitted; mutated in place.
+        demand_text: Candidate text; stripped before use, blank values are
+            ignored.
+    """
+    text = str(demand_text or "").strip()
+    if not text:
+        return
+    # The key comes from _candidate_key, so duplicates are detected on its
+    # normalized form of the text rather than on the raw string.
+    key = _candidate_key(TYPE_FEATURE_POINT, text)
+    if key in seen:
+        return
+    seen.add(key)
+    candidates.append(
+        {
+            "demand_type": TYPE_FEATURE_POINT,
+            "demand_text": text,
+        }
+    )
 
 
 def build_quality_candidates(
@@ -108,7 +131,7 @@ def build_quality_candidates(
     *,
     wxindex_threshold: float,
 ) -> list[dict[str, str]]:
-    """微信指数达标时,构建特征点组合与短语两类待评需求。"""
+    """微信指数达标时,构建特征点组合、单个元素与短语三类待评需求。"""
     if not passes_wxindex_gate(export_rows, wxindex_threshold=wxindex_threshold):
         return []
 
@@ -116,16 +139,10 @@ def build_quality_candidates(
     seen: set[tuple[str, str]] = set()
 
     feature_combo = build_feature_combo_text(export_rows)
-    if feature_combo:
-        key = _candidate_key(TYPE_FEATURE_POINT, feature_combo)
-        if key not in seen:
-            seen.add(key)
-            candidates.append(
-                {
-                    "demand_type": TYPE_FEATURE_POINT,
-                    "demand_text": feature_combo,
-                }
-            )
+    _append_feature_point_candidate(candidates, seen, feature_combo)
+
+    for element_text in build_matched_element_texts(export_rows):
+        _append_feature_point_candidate(candidates, seen, element_text)
 
     for row in export_rows:
         if str(row.get("item_type") or "") != ITEM_TYPE_PHRASE:
@@ -208,9 +225,13 @@ def quality_passed(
         event_sense_json=event_sense_json,
         senior_fit_json=senior_fit_json,
     )
-    if event_score is None or senior_score is None:
+    # TODO: 事件性判断临时下线,恢复时取消下方注释并删除老年性单判逻辑
+    # if event_score is None or senior_score is None:
+    #     return False
+    # return event_score >= event_threshold and senior_score >= senior_threshold
+    if senior_score is None:
         return False
-    return event_score >= event_threshold and senior_score >= senior_threshold
+    return senior_score >= senior_threshold
 
 
 def attach_quality_scores_to_export_rows(
@@ -219,15 +240,14 @@ def attach_quality_scores_to_export_rows(
     event_sense_json: dict[str, Any] | None,
     senior_fit_json: dict[str, Any] | None,
 ) -> list[dict[str, Any]]:
-    feature_combo = build_feature_combo_text(export_rows)
     rows: list[dict[str, Any]] = []
     for row in export_rows:
         item_type = str(row.get("item_type") or "")
         item_text = str(row.get("item_text") or "").strip()
-        if item_type == ITEM_TYPE_ELEMENT and feature_combo:
+        if item_type == ITEM_TYPE_ELEMENT and item_text and _has_matched_demand(row):
             event_score, senior_score = lookup_quality_scores(
                 demand_type=TYPE_FEATURE_POINT,
-                demand_text=feature_combo,
+                demand_text=item_text,
                 event_sense_json=event_sense_json,
                 senior_fit_json=senior_fit_json,
             )
@@ -454,24 +474,78 @@ def llm_score_senior_fit(
 
 ---
 
-# 二、适老品类白名单(命中即有基础分)
+# 二、高优品类强制评分白名单(最低分不得低于7分)
+
+以下品类为业务侧已确认的高优内容,评分最低保底7分。
+原黑名单中的"博弈策略""认知门槛高""泛人群"等扣分信号对本表品类完全失效。
+
+## 满分区间(9-10分)
+
+- 健康养生·老年健康(含老年专属标注时满分)
+- 生活技巧·安全防护·反诈防骗
+- 公共管理·医疗卫生·医保报销
+- 公共管理·补贴福利·老年群体补贴
+- 爱国情感·民族情感
+- 政治事件·领袖纪念
+- 民生政策·惠民政策(明确老年受益时满分,泛人群降为8)
+
+## 高分区间(8-9分)
+
+- 公共管理·治理监督·反腐
+- 国家实力·国际地位
+- 政策制度·国家统一
+- 外交事件·外交访问
+- 处世智慧·生存策略·经验总结
+- 处世智慧·生存策略·生活指导
+
+## 中高分区间(7-8分)
+
+- 处世智慧·价值取向·处世哲学
+- 民生政策·免费福利政策
+- 公共管理·补贴福利·生活服务补贴
+- 文化概念·文化传承
+- 时政评议·社会评议·社会公正
+- 军事谋略·战略运筹·战略方案
+- 社会问题·国家安全事件
+- 国际政治·外交立场
+- 国际政治·双边关系(中美关系等)
+- 时政评议·国际关系·中美关系
+- 社会问题·国际问题·两岸议题
+- 地标景观·交通枢纽
+- 花卉风格·高饱和花卉
+
+## 保底7分(区间锁定,不上浮)
+
+- 外交事件·博弈手段
+- 政治运作·政治博弈
+- 军事安全·能源安全
+- 公共管理·政策法规·行业规则
+- 社会问题·经济形势·农村农业
+- 民生生活·教育议题
+
+---
+
+# 三、高优品类内部评分细则
+
+## 细则1:叙事方式决定区间上限
 
-以下品类的词组,具备"适老性基础系数",可在此基础上评估具体程度:
+同一高优品类,叙事方式决定最终得分位置
 
-- 国家力量/民族自豪:阅兵、基建、外交胜利、撤侨、领土主权、中国强大
-- 健康养生:三高管理、心脑血管、养生食疗、长寿、防病(严格排除减肥/塑形/医美/脱发)
-- 防骗安全:电信诈骗、保健品骗局、新型骗局案例(极高优先级)
-- 惠民政策:养老金、医保报销、现金支付保障、物价、天气预警
-- 怀旧时光:70年代及以前老照片、经典老歌、老电影、童年记忆
-- 家庭亲情:隔辈亲、孝道、家庭互助(排除婆媳恶斗/剧烈伦理冲突)
-- 传统文化:节气、民俗、戏曲、国学、非遗
-- 正能量:见义勇为、拾金不昧、平凡善举、反腐倡廉
-- 人文科普:文化/历史/人文/健康等社科知识(非科技/自然猎奇)
-- 自然惊奇:自然奇观、动物趣闻(排除血腥/恐怖/猎奇阴暗)
+- 叙事/成就型(XX圆满完成、我国XX取得胜利)→ 取区间上限
+- 评议/立场型(分析XX走向、表达XX立场)→ 取区间中位
+- 策略/博弈型(对抗手段、如何反制)→ 取区间下限(最低7分)
+
+## 细则2:老年专属加成
+
+品类内容含"老年""中老年""50岁以上"等明确专属信号 → 区间内+1分(不超过10分)
+
+## 细则3:焦虑化叙事微降(高优品类内仍适用)
+
+使用"危机""崩溃""末日""警惕"等词渲染负面不确定性 → 区间内-1分(不低于7分保底)
 
 ---
 
-# 三、低分黑名单信号
+# 四、低分黑名单信号
 
 ## 强制低分信号(命中任一,评分不超过3)
 
@@ -487,18 +561,20 @@ def llm_score_senior_fit(
 
 ## 中度扣分信号(命中使评分下浮1-2分)
 
-- 内容偏泛人群,缺乏老年专属场景
-- 认知门槛较高,需要背景知识才能理解
+- 内容偏泛人群,缺乏老年专属场景(如免费福利政策未明确针对老年人)
+- 认知门槛较高,需要背景知识才能理解(如军事专业术语密集)
 - 表达方式年轻化,但内容本身不排斥老年人
+- **焦虑化叙事**:即使话题本身适老,若使用"危机""崩溃""警惕""崩盘"等词渲染不确定性,触发此扣分信号——老年用户偏好"确定性"叙事,排斥焦虑化包装
+- 农村农业/教育议题:属泛人群内容,老年专属性弱,基础降至2-4分
 
 ---
 
-# 四、评分标准(0-10)
+# 五、评分标准(0-10)
 
-- 9-10:高度契合中老年用户核心关注点、典型生活场景或强情感诉求
-- 7-8:对中老年用户有较强吸引力或实用价值,场景清晰
-- 5-6:有一定相关性,但中老年专属属性一般,泛人群居多
-- 3-4:偏年轻群体或泛人群,老年性弱,中老年用户兴趣低
+- 9-10:高度契合中老年用户核心关注点、老年专属场景或强情感诉求(防骗、老年健康、养老金、民族自豪)
+- 7-8:对中老年用户有较强吸引力或实用价值,场景清晰(处世智慧、传统文化、家庭亲情、叙事型时政)
+- 5-6:有一定相关性,但中老年专属属性一般,泛人群居多(评议型时政、泛人群惠民、文化传承)
+- 3-4:偏年轻群体或认知门槛偏高,老年性弱
 - 1-2:明显面向年轻群体,中老年用户几乎不感兴趣
 - 0:与中老年用户完全无关,或存在强烈排斥信号
 
@@ -566,8 +642,14 @@ def run_demand_quality_pipeline(
         "max_tokens": max_tokens,
     }
 
-    event_sense_json = llm_score_event_sense(**llm_kwargs)
-    event_sense_json["threshold"] = event_threshold
+    # TODO: 事件性判断临时下线,恢复时取消下方注释并删除 stub 返回
+    # event_sense_json = llm_score_event_sense(**llm_kwargs)
+    # event_sense_json["threshold"] = event_threshold
+    event_sense_json = {
+        "source": channel_content_id,
+        "items": [],
+        "threshold": event_threshold,
+    }
 
     senior_fit_json = llm_score_senior_fit(**llm_kwargs)
     senior_fit_json["threshold"] = senior_threshold

+ 167 - 0
app/hot_content/repository.py

@@ -169,6 +169,129 @@ class HotContentRepository:
                 ),
             )
 
+    def mark_no_valid_content(
+        self,
+        *,
+        record_id: int,
+        reason: str,
+    ) -> None:
+        """搜不到文章或缺标题/正文:仅更新 execution_status + error_reason,不动分类字段。"""
+        sql = """
+            UPDATE hot_content_records
+            SET execution_status=%s,
+                error_reason=%s,
+                updated_at=NOW()
+            WHERE id=%s
+        """
+        with self.conn.cursor() as cursor:
+            cursor.execute(
+                sql,
+                (
+                    ExecutionStatus.NO_VALID_CONTENT,
+                    str(reason or "no valid content").strip(),
+                    record_id,
+                ),
+            )
+
+    def update_category_filter_result(
+        self,
+        *,
+        record_id: int,
+        passed: bool,
+        result_json: dict[str, Any],
+    ) -> None:
+        """Persist the category-filter verdict for one record.
+
+        Args:
+            record_id: Primary key of the row in ``hot_content_records``.
+            passed: Whether the record passed the filter; selects the
+                execution status and the 1/0 stored in ``category_filter_passed``.
+            result_json: Full filter result; its ``reason`` entry is stored
+                separately and the whole dict is serialized into
+                ``category_filter_json``.
+        """
+        # Make sure the three category_filter_* columns exist before writing.
+        self._ensure_category_filter_columns()
+        status = (
+            ExecutionStatus.CATEGORY_FILTER_PASSED
+            if passed
+            else ExecutionStatus.CATEGORY_FILTER_REJECTED
+        )
+        reason = str(result_json.get("reason") or "").strip()
+        # error_reason is cleared on pass and always non-empty on rejection.
+        error_message = None if passed else (reason or "category filter rejected")
+        sql = """
+            UPDATE hot_content_records
+            SET execution_status=%s,
+                category_filter_passed=%s,
+                category_filter_reason=%s,
+                category_filter_json=%s,
+                error_reason=%s,
+                updated_at=NOW()
+            WHERE id=%s
+        """
+        with self.conn.cursor() as cursor:
+            cursor.execute(
+                sql,
+                (
+                    status,
+                    1 if passed else 0,
+                    reason or None,
+                    _json_dumps(result_json),
+                    error_message,
+                    record_id,
+                ),
+            )
+
+    def get_record_for_category_filter(self, record_id: int) -> dict[str, Any] | None:
+        """Fetch the fields of one record that the category filter works with.
+
+        Args:
+            record_id: Primary key of the row in ``hot_content_records``.
+
+        Returns:
+            ``None`` when no row matches; otherwise a dict with typed values:
+            text columns as ``str`` ("" for NULL), ``execution_status`` as
+            ``int`` (0 for NULL), ``category_filter_passed`` as ``bool`` or
+            ``None`` when never filtered, and ``category_filter_json`` as a
+            dict ({} when absent or not a JSON object).
+        """
+        self._ensure_category_filter_columns()
+        sql = """
+            SELECT
+                id,
+                source,
+                title,
+                article_title,
+                article_body,
+                article_url,
+                execution_status,
+                category_filter_passed,
+                category_filter_reason,
+                category_filter_json
+            FROM hot_content_records
+            WHERE id = %s
+            LIMIT 1
+        """
+        with self.conn.cursor() as cursor:
+            cursor.execute(sql, (record_id,))
+            row = cursor.fetchone()
+        if not row:
+            return None
+        # Rows are accessed with .get(), so the cursor must yield mappings.
+        category_filter_json = _json_loads(row.get("category_filter_json"))
+        passed_raw = row.get("category_filter_passed")
+        passed: bool | None
+        # Keep NULL distinct from 0: None means the record was never filtered.
+        if passed_raw is None:
+            passed = None
+        else:
+            passed = bool(int(passed_raw))
+        return {
+            "id": int(row["id"]),
+            "source": str(row.get("source") or ""),
+            "title": str(row.get("title") or ""),
+            "article_title": str(row.get("article_title") or ""),
+            "article_body": str(row.get("article_body") or ""),
+            "article_url": str(row.get("article_url") or ""),
+            "execution_status": int(row.get("execution_status") or 0),
+            "category_filter_passed": passed,
+            "category_filter_reason": str(row.get("category_filter_reason") or ""),
+            "category_filter_json": (
+                category_filter_json if isinstance(category_filter_json, dict) else {}
+            ),
+        }
+
+    def get_category_filter_status(self, record_id: int) -> dict[str, Any] | None:
+        """Return a compact summary of a record's category-filter outcome.
+
+        Args:
+            record_id: Primary key of the row in ``hot_content_records``.
+
+        Returns:
+            ``None`` when the record does not exist; otherwise a dict with
+            ``id``, ``passed``, ``reason``, ``matched_category`` (read from the
+            stored filter JSON, ``None`` if absent) and ``execution_status``.
+        """
+        record = self.get_record_for_category_filter(record_id)
+        if not record:
+            return None
+        matched_category = None
+        category_filter_json = record.get("category_filter_json") or {}
+        if isinstance(category_filter_json, dict):
+            matched_category = category_filter_json.get("matched_category")
+        return {
+            "id": record["id"],
+            "passed": record["category_filter_passed"],
+            "reason": record["category_filter_reason"],
+            "matched_category": matched_category,
+            "execution_status": record["execution_status"],
+        }
+
     def update_decode_result(
         self,
         *,
@@ -815,6 +938,50 @@ class HotContentRepository:
                 if int((cursor.fetchone() or {}).get("cnt") or 0) == 0:
                     cursor.execute(alter_sql)
 
+    def _ensure_category_filter_columns(self) -> None:
+        """Add the ``category_filter_*`` columns to ``hot_content_records`` if absent.
+
+        For each of the three columns, ``information_schema.COLUMNS`` is
+        queried for the current database, and the matching ``ALTER TABLE``
+        statement is executed only when the reported count is zero.
+        """
+        with self.conn.cursor() as cur:
+            # (column name, DDL that creates it), in the order they are applied.
+            column_ddl = (
+                (
+                    "category_filter_json",
+                    """
+                    ALTER TABLE hot_content_records
+                    ADD COLUMN category_filter_json JSON NULL
+                    COMMENT '老年人兴趣分类筛选 LLM 结果'
+                    AFTER article_url
+                    """,
+                ),
+                (
+                    "category_filter_passed",
+                    """
+                    ALTER TABLE hot_content_records
+                    ADD COLUMN category_filter_passed TINYINT NULL
+                    COMMENT '分类筛选是否通过:1通过 0不通过 NULL未筛选'
+                    AFTER category_filter_json
+                    """,
+                ),
+                (
+                    "category_filter_reason",
+                    """
+                    ALTER TABLE hot_content_records
+                    ADD COLUMN category_filter_reason TEXT NULL
+                    COMMENT '分类筛选原因'
+                    AFTER category_filter_passed
+                    """,
+                ),
+            )
+            for name, ddl in column_ddl:
+                cur.execute(
+                    """
+                    SELECT COUNT(*) AS cnt
+                    FROM information_schema.COLUMNS
+                    WHERE TABLE_SCHEMA = DATABASE()
+                      AND TABLE_NAME = 'hot_content_records'
+                      AND COLUMN_NAME = %s
+                    """,
+                    (name,),
+                )
+                probe = cur.fetchone() or {}
+                if int(probe.get("cnt") or 0) != 0:
+                    # Column already present; nothing to alter.
+                    continue
+                cur.execute(ddl)
+
     def _ensure_demand_export_column(
         self,
         cursor: Any,

+ 119 - 4
app/hot_content/service.py

@@ -6,6 +6,7 @@ from datetime import datetime
 import time
 from typing import Any
 
+from app.hot_content.category_filter import llm_filter_record_content
 from app.hot_content.client import (
     JsonApiClient,
     build_url,
@@ -41,6 +42,7 @@ class HotContentFlowService:
     def run(self) -> dict[str, Any]:
+        # Pipeline order: fetch and save hot titles, search and save contents,
+        # category-filter the selection, submit decode tasks, build the summary.
+        # The filter's output replaces ``selected_contents``, so both the decode
+        # submission and the summary see the filtered list.
         hot_titles = self.fetch_and_save_hot_titles()
         selected_contents = self.search_and_save_contents(hot_titles)
+        selected_contents = self.filter_contents_by_category(selected_contents)
         decode_resp = self.submit_decode_tasks(selected_contents)
         return self.build_summary(hot_titles, selected_contents, decode_resp)
 
@@ -52,7 +54,7 @@ class HotContentFlowService:
 
         for source_config in self.config.sources:
             rank_items = extract_rank_items(resp, source_config.source)
-            for rank_item in rank_items[: source_config.count]:
+            for rank_item in rank_items:
                 title = str(rank_item.get("title") or "").strip()
                 if not title:
                     continue
@@ -92,10 +94,19 @@ class HotContentFlowService:
                 "has_contribution_points"
             ):
                 continue
+            if execution_status == ExecutionStatus.CATEGORY_FILTER_REJECTED:
+                continue
 
             existing_body = str(hot.get("article_body") or "").strip()
             existing_title = str(hot.get("article_title") or "").strip()
-            if execution_status == ExecutionStatus.CONTENT_OK and existing_title and existing_body:
+            if (
+                execution_status in {
+                    ExecutionStatus.CONTENT_OK,
+                    ExecutionStatus.CATEGORY_FILTER_PASSED,
+                }
+                and existing_title
+                and existing_body
+            ):
                 selected_contents.append(
                     {
                         "record_id": hot["id"],
@@ -105,6 +116,9 @@ class HotContentFlowService:
                         "content_title": existing_title,
                         "body_text": existing_body,
                         "url": str(hot.get("article_url") or ""),
+                        "category_filter_passed": (
+                            execution_status == ExecutionStatus.CATEGORY_FILTER_PASSED
+                        ),
                     }
                 )
                 continue
@@ -125,9 +139,9 @@ class HotContentFlowService:
                     resp = self.api_client.post_json(keyword_url, payload)
                     selected = pick_first_valid_content(extract_keyword_items(resp))
                     if selected is None:
-                        self.repository.update_status(
+                        self.repository.mark_no_valid_content(
                             record_id=hot["id"],
-                            status=ExecutionStatus.NO_VALID_CONTENT,
+                            reason="no valid content",
                         )
                         selected_contents.append(
                             {
@@ -176,6 +190,101 @@ class HotContentFlowService:
                     )
         return selected_contents
 
+    def filter_contents_by_category(
+        self,
+        selected_contents: list[dict[str, Any]],
+    ) -> list[dict[str, Any]]:
+        """Apply the LLM category filter to every selected content item.
+
+        Items whose ``status`` is not ``"ok"`` and items already carrying a
+        truthy ``category_filter_passed`` are passed through unchanged. Each
+        remaining item is re-read from the repository; if the record is
+        missing, or its title, article title or article body is blank, it is
+        marked via ``mark_no_valid_content`` and replaced by a
+        ``"no_valid_content"`` entry. Otherwise ``llm_filter_record_content``
+        is called and its result is stored with
+        ``update_category_filter_result``.
+
+        Args:
+            selected_contents: Content items, each a dict with at least
+                ``status`` and, for ``"ok"`` items, ``record_id``.
+
+        Returns:
+            One entry per input item, in input order: the untouched item, the
+            item extended with ``category_filter_passed`` and
+            ``matched_category`` when the LLM accepts it, or a reduced entry
+            with status ``"category_rejected"`` or ``"no_valid_content"``.
+        """
+        filtered: list[dict[str, Any]] = []
+        category_list = list(self.config.category_filter_categories)
+        item_sleep_seconds = self.config.category_filter_item_sleep_seconds
+        llm_calls = 0
+
+        for item in selected_contents:
+            if item.get("status") != "ok" or item.get("category_filter_passed"):
+                filtered.append(item)
+                continue
+
+            record_id = int(item["record_id"])
+            record = self.repository.get_record_for_category_filter(record_id)
+            if not record:
+                filtered.append(
+                    self._reject_no_valid_content(
+                        item, record_id, "record not found"
+                    )
+                )
+                continue
+
+            hot_title = str(record.get("title") or "").strip()
+            content_title = str(record.get("article_title") or "").strip()
+            body_text = str(record.get("article_body") or "").strip()
+            if not (hot_title and content_title and body_text):
+                filtered.append(
+                    self._reject_no_valid_content(
+                        item, record_id, "missing title or body"
+                    )
+                )
+                continue
+
+            # Throttle only between consecutive LLM calls. Comparing the call
+            # count with the number of "ok" items would also sleep after the
+            # final call whenever some "ok" item never reached the LLM.
+            if llm_calls and item_sleep_seconds > 0:
+                time.sleep(item_sleep_seconds)
+            llm_calls += 1
+
+            result = llm_filter_record_content(
+                record_id=record_id,
+                hot_title=hot_title,
+                content_title=content_title,
+                body_text=body_text,
+                category_list=category_list,
+                model=self.config.category_filter_llm_model,
+                max_attempts=self.config.category_filter_llm_max_attempts,
+                retry_sleep_seconds=self.config.category_filter_llm_retry_sleep_seconds,
+                max_tokens=self.config.category_filter_llm_max_tokens,
+                body_max_chars=self.config.category_filter_body_max_chars,
+            )
+            passed = bool(result.get("passed"))
+            self.repository.update_category_filter_result(
+                record_id=record_id,
+                passed=passed,
+                result_json=result,
+            )
+            if passed:
+                filtered.append(
+                    {
+                        **item,
+                        "category_filter_passed": True,
+                        "matched_category": result.get("matched_category"),
+                    }
+                )
+            else:
+                filtered.append(
+                    {
+                        "record_id": record_id,
+                        "unique_key": item.get("unique_key"),
+                        "status": "category_rejected",
+                    }
+                )
+
+        return filtered
+
+    def _reject_no_valid_content(
+        self,
+        item: dict[str, Any],
+        record_id: int,
+        reason: str,
+    ) -> dict[str, Any]:
+        """Mark ``record_id`` as lacking valid content and build its result entry."""
+        self.repository.mark_no_valid_content(record_id=record_id, reason=reason)
+        return {
+            "record_id": record_id,
+            "unique_key": item.get("unique_key"),
+            "status": "no_valid_content",
+            "reason": reason,
+        }
+
     def submit_decode_tasks(self, selected_contents: list[dict[str, Any]]) -> dict[str, Any]:
         decode_items: list[dict[str, Any]] = []
         request_count = 0
@@ -257,6 +366,12 @@ class HotContentFlowService:
             "selected_failed_count": sum(
                 1 for item in selected_contents if item.get("status") != "ok"
             ),
+            "category_rejected_count": sum(
+                1 for item in selected_contents if item.get("status") == "category_rejected"
+            ),
+            "no_valid_content_count": sum(
+                1 for item in selected_contents if item.get("status") == "no_valid_content"
+            ),
             "decode_post_count": sum(1 for item in selected_contents if item.get("status") == "ok"),
             "decode_api_code": decode_resp.get("code") if isinstance(decode_resp, dict) else None,
             "decode_success_count": sum(

+ 5 - 0
app/hot_content/status.py

@@ -5,8 +5,13 @@ from __future__ import annotations
 
 class ExecutionStatus:
     HOT_SAVED = 10
+    # 搜不到文章、缺标题/正文:未到 LLM 分类,不写 category_filter_* 字段
     NO_VALID_CONTENT = 15
     CONTENT_OK = 20
+    # LLM 分类通过
+    CATEGORY_FILTER_PASSED = 21
+    # LLM 分类不通过(与 NO_VALID_CONTENT 互斥)
+    CATEGORY_FILTER_REJECTED = 22
     CONTENT_REQUEST_FAILED = 25
     DECODE_SUBMITTED = 30
     DECODE_SUCCESS = 40

+ 7 - 0
app/hot_content/types.py

@@ -59,5 +59,12 @@ class FlowConfig:
     demand_quality_llm_max_attempts: int
     demand_quality_llm_retry_sleep_seconds: float
     demand_quality_llm_max_tokens: int
+    category_filter_llm_model: str
+    category_filter_llm_max_attempts: int
+    category_filter_llm_retry_sleep_seconds: float
+    category_filter_llm_max_tokens: int
+    category_filter_body_max_chars: int
+    category_filter_item_sleep_seconds: float
+    category_filter_categories: list[str]
     sources: list[HotSourceConfig]
     mysql: MysqlConfig