Jelajahi Sumber

修改整体流程。增加事件性和老年性判断

xueyiming 3 minggu lalu
induk
melakukan
42ee1e67f8

+ 32 - 2
app/core/config.py

@@ -153,11 +153,17 @@ class Settings:
     contribution_match_llm_max_attempts: int = 3
     contribution_match_llm_retry_sleep_seconds: float = 1.0
     contribution_match_llm_max_tokens: int = 4000
-    wxindex_llm_model: str = ""
+    wxindex_llm_model: str = "anthropic/claude-haiku-4-5"
     wxindex_llm_max_attempts: int = 3
-    wxindex_llm_max_tokens: int = 1200
+    wxindex_llm_max_tokens: int = 4000
     wxindex_api_url: str = "http://crawapi.piaoquantv.com/crawler/wei_xin/wxindex"
     wxindex_lookback_days: int = 7
+    demand_event_sense_threshold: float = 6.0
+    demand_senior_fit_threshold: float = 6.0
+    demand_quality_llm_model: str = "anthropic/claude-haiku-4-5"
+    demand_quality_llm_max_attempts: int = 3
+    demand_quality_llm_retry_sleep_seconds: float = 1.0
+    demand_quality_llm_max_tokens: int = 4000
 
     @classmethod
     def from_env(cls) -> "Settings":
@@ -327,6 +333,30 @@ class Settings:
                 "WXINDEX_LOOKBACK_DAYS",
                 defaults.wxindex_lookback_days,
             ),
+            demand_event_sense_threshold=_env_float(
+                "DEMAND_EVENT_SENSE_THRESHOLD",
+                defaults.demand_event_sense_threshold,
+            ),
+            demand_senior_fit_threshold=_env_float(
+                "DEMAND_SENIOR_FIT_THRESHOLD",
+                defaults.demand_senior_fit_threshold,
+            ),
+            demand_quality_llm_model=_env(
+                "DEMAND_QUALITY_LLM_MODEL",
+                defaults.demand_quality_llm_model,
+            ),
+            demand_quality_llm_max_attempts=_env_int(
+                "DEMAND_QUALITY_LLM_MAX_ATTEMPTS",
+                defaults.demand_quality_llm_max_attempts,
+            ),
+            demand_quality_llm_retry_sleep_seconds=_env_float(
+                "DEMAND_QUALITY_LLM_RETRY_SLEEP_SECONDS",
+                defaults.demand_quality_llm_retry_sleep_seconds,
+            ),
+            demand_quality_llm_max_tokens=_env_int(
+                "DEMAND_QUALITY_LLM_MAX_TOKENS",
+                defaults.demand_quality_llm_max_tokens,
+            ),
         )
 
 

+ 24 - 0
app/hot_content/config.py

@@ -236,6 +236,30 @@ def load_flow_config(interval_override: int | None = None) -> FlowConfig:
             "WXINDEX_LOOKBACK_DAYS",
             settings.wxindex_lookback_days,
         ),
+        demand_event_sense_threshold=_get_env_float(
+            "DEMAND_EVENT_SENSE_THRESHOLD",
+            settings.demand_event_sense_threshold,
+        ),
+        demand_senior_fit_threshold=_get_env_float(
+            "DEMAND_SENIOR_FIT_THRESHOLD",
+            settings.demand_senior_fit_threshold,
+        ),
+        demand_quality_llm_model=_get_env(
+            "DEMAND_QUALITY_LLM_MODEL",
+            settings.demand_quality_llm_model,
+        ),
+        demand_quality_llm_max_attempts=_get_env_int(
+            "DEMAND_QUALITY_LLM_MAX_ATTEMPTS",
+            settings.demand_quality_llm_max_attempts,
+        ),
+        demand_quality_llm_retry_sleep_seconds=_get_env_float(
+            "DEMAND_QUALITY_LLM_RETRY_SLEEP_SECONDS",
+            settings.demand_quality_llm_retry_sleep_seconds,
+        ),
+        demand_quality_llm_max_tokens=_get_env_int(
+            "DEMAND_QUALITY_LLM_MAX_TOKENS",
+            settings.demand_quality_llm_max_tokens,
+        ),
         sources=_load_sources(),
         mysql=MysqlConfig(
             host=_get_env("MYSQL_HOST", settings.mysql_host),

+ 71 - 9
app/hot_content/demand_export.py

@@ -401,6 +401,10 @@ def attach_wxindex_metadata(
     trend_json: dict[str, Any] | None,
     *,
     wxindex_threshold: float = WXINDEX_EXPORT_THRESHOLD,
+    event_sense_json: dict[str, Any] | None = None,
+    senior_fit_json: dict[str, Any] | None = None,
+    event_threshold: float = 0.0,
+    senior_threshold: float = 0.0,
 ) -> list[dict[str, Any]]:
     from app.hot_content.demand_hive_export import is_export_row_as_demand
 
@@ -453,12 +457,54 @@ def attach_wxindex_metadata(
                     normalized_row,
                     gate_rows,
                     wxindex_threshold=wxindex_threshold,
+                    event_sense_json=event_sense_json,
+                    senior_fit_json=senior_fit_json,
+                    event_threshold=event_threshold,
+                    senior_threshold=senior_threshold,
                 ),
             }
         )
     return rows
 
 
+def build_export_rows_from_record(
+    record: dict[str, Any],
+    *,
+    wxindex_threshold: float,
+    event_sense_json: dict[str, Any] | None = None,
+    senior_fit_json: dict[str, Any] | None = None,
+    event_threshold: float = 0.0,
+    senior_threshold: float = 0.0,
+) -> list[dict[str, Any]]:
+    from app.hot_content.demand_quality import attach_quality_scores_to_export_rows
+
+    match_json = record.get("contribution_demand_match_json")
+    if not isinstance(match_json, dict):
+        return []
+    contribution_points = record.get("contribution_points_json")
+    trend_json = record.get("wxindex_trend_json")
+    export_rows = attach_wxindex_metadata(
+        build_demand_export_rows(
+            match_json,
+            contribution_points=(
+                contribution_points if isinstance(contribution_points, dict) else None
+            ),
+            trend_json=trend_json if isinstance(trend_json, dict) else None,
+        ),
+        trend_json if isinstance(trend_json, dict) else None,
+        wxindex_threshold=wxindex_threshold,
+        event_sense_json=event_sense_json,
+        senior_fit_json=senior_fit_json,
+        event_threshold=event_threshold,
+        senior_threshold=senior_threshold,
+    )
+    return attach_quality_scores_to_export_rows(
+        export_rows,
+        event_sense_json=event_sense_json,
+        senior_fit_json=senior_fit_json,
+    )
+
+
 def _json_loads(value: Any) -> Any:
     if value is None:
         return None
@@ -483,7 +529,9 @@ def fetch_export_candidate_records(cursor: Any, limit: int) -> list[dict[str, An
             article_title,
             contribution_points_json,
             contribution_demand_match_json,
-            wxindex_trend_json
+            wxindex_trend_json,
+            demand_event_sense_json,
+            demand_senior_fit_json
         FROM hot_content_records
         WHERE contribution_demand_match_json IS NOT NULL
           AND TRIM(CAST(contribution_demand_match_json AS CHAR)) <> ''
@@ -502,6 +550,8 @@ def export_existing_records(
     dry_run: bool,
     verbose: bool,
     wxindex_threshold: float = WXINDEX_EXPORT_THRESHOLD,
+    event_threshold: float = 0.0,
+    senior_threshold: float = 0.0,
 ) -> dict[str, int]:
     summary = {
         "scanned": 0,
@@ -519,6 +569,8 @@ def export_existing_records(
             match_json = _json_loads(row.get("contribution_demand_match_json"))
             contribution_points = _json_loads(row.get("contribution_points_json"))
             trend_json = _json_loads(row.get("wxindex_trend_json"))
+            event_sense_json = _json_loads(row.get("demand_event_sense_json"))
+            senior_fit_json = _json_loads(row.get("demand_senior_fit_json"))
         except json.JSONDecodeError:
             summary["invalid_json"] += 1
             if verbose:
@@ -529,16 +581,24 @@ def export_existing_records(
             summary["skipped"] += 1
             continue
 
-        export_rows = attach_wxindex_metadata(
-            build_demand_export_rows(
-                match_json,
-                contribution_points=(
-                    contribution_points if isinstance(contribution_points, dict) else None
-                ),
-                trend_json=trend_json if isinstance(trend_json, dict) else None,
+        normalized_record = {
+            "contribution_demand_match_json": match_json,
+            "contribution_points_json": contribution_points,
+            "wxindex_trend_json": trend_json,
+            "demand_event_sense_json": (
+                event_sense_json if isinstance(event_sense_json, dict) else {}
+            ),
+            "demand_senior_fit_json": (
+                senior_fit_json if isinstance(senior_fit_json, dict) else {}
             ),
-            trend_json if isinstance(trend_json, dict) else None,
+        }
+        export_rows = build_export_rows_from_record(
+            normalized_record,
             wxindex_threshold=wxindex_threshold,
+            event_sense_json=normalized_record["demand_event_sense_json"],
+            senior_fit_json=normalized_record["demand_senior_fit_json"],
+            event_threshold=event_threshold,
+            senior_threshold=senior_threshold,
         )
         if not export_rows:
             summary["no_export_rows"] += 1
@@ -617,6 +677,8 @@ def main(argv: list[str] | None = None) -> dict[str, int]:
             dry_run=args.dry_run,
             verbose=args.verbose,
             wxindex_threshold=config.wxindex_score_threshold,
+            event_threshold=config.demand_event_sense_threshold,
+            senior_threshold=config.demand_senior_fit_threshold,
         )
     finally:
         repository.close()

+ 197 - 14
app/hot_content/demand_hive_export.py

@@ -5,7 +5,17 @@ from __future__ import annotations
 import hashlib
 from typing import Any
 
-from app.hot_content.demand_export import ITEM_TYPE_ELEMENT, ITEM_TYPE_PHRASE
+from app.hot_content.demand_export import (
+    ITEM_TYPE_ELEMENT,
+    ITEM_TYPE_PHRASE,
+    attach_wxindex_metadata,
+    build_demand_export_rows,
+)
+from app.hot_content.demand_quality import (
+    attach_quality_scores_to_export_rows,
+    build_feature_combo_text,
+    quality_passed,
+)
 
 TITLE_RETAIN_POINT_CATEGORIES = frozenset({"灵感点", "目的点"})
 TYPE_FEATURE_POINT = "特征点"
@@ -68,16 +78,102 @@ def _has_matched_demand(row: dict[str, Any]) -> bool:
     return bool(str(row.get("matched_demand") or "").strip())
 
 
+def _export_row_passes_quality(
+    row: dict[str, Any],
+    *,
+    export_rows: list[dict[str, Any]],
+    event_sense_json: dict[str, Any] | None,
+    senior_fit_json: dict[str, Any] | None,
+    event_threshold: float,
+    senior_threshold: float,
+) -> bool:
+    item_type = str(row.get("item_type") or "")
+    if item_type == ITEM_TYPE_PHRASE:
+        return quality_passed(
+            demand_type=TYPE_PHRASE,
+            demand_text=str(row.get("item_text") or "").strip(),
+            event_sense_json=event_sense_json,
+            senior_fit_json=senior_fit_json,
+            event_threshold=event_threshold,
+            senior_threshold=senior_threshold,
+        )
+    if item_type == ITEM_TYPE_ELEMENT and _has_matched_demand(row):
+        feature_combo = build_feature_combo_text(export_rows)
+        if not feature_combo:
+            return False
+        return quality_passed(
+            demand_type=TYPE_FEATURE_POINT,
+            demand_text=feature_combo,
+            event_sense_json=event_sense_json,
+            senior_fit_json=senior_fit_json,
+            event_threshold=event_threshold,
+            senior_threshold=senior_threshold,
+        )
+    return False
+
+
 def is_export_row_as_demand(
     row: dict[str, Any],
     export_rows: list[dict[str, Any]],
     *,
     wxindex_threshold: float,
+    event_sense_json: dict[str, Any] | None = None,
+    senior_fit_json: dict[str, Any] | None = None,
+    event_threshold: float = 0.0,
+    senior_threshold: float = 0.0,
 ) -> int:
-    """是否与 ODPS 需求同步规则一致:标题保留且该行有匹配需求。返回 0/1。"""
+    """是否与 ODPS 需求同步规则一致:标题保留、质量达标且该行有匹配需求。返回 0/1。"""
     if not _should_retain_title(export_rows, wxindex_threshold=wxindex_threshold):
         return 0
-    return 1 if _has_matched_demand(row) else 0
+    if not _has_matched_demand(row):
+        return 0
+    if event_sense_json is not None or senior_fit_json is not None:
+        if not _export_row_passes_quality(
+            row,
+            export_rows=export_rows,
+            event_sense_json=event_sense_json,
+            senior_fit_json=senior_fit_json,
+            event_threshold=event_threshold,
+            senior_threshold=senior_threshold,
+        ):
+            return 0
+    return 1
+
+
+def _build_export_rows_for_record(
+    record: dict[str, Any],
+    *,
+    wxindex_threshold: float,
+    event_sense_json: dict[str, Any] | None,
+    senior_fit_json: dict[str, Any] | None,
+    event_threshold: float,
+    senior_threshold: float,
+) -> list[dict[str, Any]]:
+    match_result = record.get("contribution_demand_match_json")
+    if not isinstance(match_result, dict):
+        return []
+    contribution_points = record.get("contribution_points_json")
+    trend_json = record.get("wxindex_trend_json")
+    export_rows = attach_wxindex_metadata(
+        build_demand_export_rows(
+            match_result,
+            contribution_points=(
+                contribution_points if isinstance(contribution_points, dict) else None
+            ),
+            trend_json=trend_json if isinstance(trend_json, dict) else None,
+        ),
+        trend_json if isinstance(trend_json, dict) else None,
+        wxindex_threshold=wxindex_threshold,
+        event_sense_json=event_sense_json,
+        senior_fit_json=senior_fit_json,
+        event_threshold=event_threshold,
+        senior_threshold=senior_threshold,
+    )
+    return attach_quality_scores_to_export_rows(
+        export_rows,
+        event_sense_json=event_sense_json,
+        senior_fit_json=senior_fit_json,
+    )
 
 
 def build_hive_rows_for_record(
@@ -87,36 +183,47 @@ def build_hive_rows_for_record(
     strategy: str,
     partition_dt: str,
     wxindex_threshold: float,
+    event_sense_json: dict[str, Any] | None = None,
+    senior_fit_json: dict[str, Any] | None = None,
+    event_threshold: float = 0.0,
+    senior_threshold: float = 0.0,
 ) -> list[dict[str, Any]]:
     if not _should_retain_title(export_rows, wxindex_threshold=wxindex_threshold):
         return []
 
     weight = _record_wxindex_score(export_rows) / WEIGHT_DIVISOR
-
-    element_texts = _dedupe_texts(
-        [
-            str(row.get("item_text") or "").strip()
-            for row in export_rows
-            if str(row.get("item_type") or "") == ITEM_TYPE_ELEMENT and _has_matched_demand(row)
-        ]
-    )
+    feature_combo = build_feature_combo_text(export_rows)
     phrase_texts = _dedupe_texts(
         [
             str(row.get("item_text") or "").strip()
             for row in export_rows
             if str(row.get("item_type") or "") == ITEM_TYPE_PHRASE
             and _has_matched_demand(row)
+            and quality_passed(
+                demand_type=TYPE_PHRASE,
+                demand_text=str(row.get("item_text") or "").strip(),
+                event_sense_json=event_sense_json,
+                senior_fit_json=senior_fit_json,
+                event_threshold=event_threshold,
+                senior_threshold=senior_threshold,
+            )
         ]
     )
 
     hive_rows: list[dict[str, Any]] = []
-    if element_texts:
-        demand_name = " ".join(element_texts)
+    if feature_combo and quality_passed(
+        demand_type=TYPE_FEATURE_POINT,
+        demand_text=feature_combo,
+        event_sense_json=event_sense_json,
+        senior_fit_json=senior_fit_json,
+        event_threshold=event_threshold,
+        senior_threshold=senior_threshold,
+    ):
         hive_rows.append(
             _build_hive_row(
                 record_id=record_id,
                 strategy=strategy,
-                demand_name=demand_name,
+                demand_name=feature_combo,
                 weight=weight,
                 demand_type=TYPE_FEATURE_POINT,
                 partition_dt=partition_dt,
@@ -137,12 +244,51 @@ def build_hive_rows_for_record(
     return hive_rows
 
 
+def build_hive_rows_for_odps_record(
+    record: dict[str, Any],
+    *,
+    strategy: str,
+    partition_dt: str,
+    wxindex_threshold: float,
+    event_threshold: float,
+    senior_threshold: float,
+) -> list[dict[str, Any]]:
+    event_sense_json = record.get("demand_event_sense_json")
+    senior_fit_json = record.get("demand_senior_fit_json")
+    if not isinstance(event_sense_json, dict):
+        event_sense_json = {}
+    if not isinstance(senior_fit_json, dict):
+        senior_fit_json = {}
+
+    export_rows = _build_export_rows_for_record(
+        record,
+        wxindex_threshold=wxindex_threshold,
+        event_sense_json=event_sense_json,
+        senior_fit_json=senior_fit_json,
+        event_threshold=event_threshold,
+        senior_threshold=senior_threshold,
+    )
+    return build_hive_rows_for_record(
+        export_rows,
+        record_id=int(record.get("id") or 0),
+        strategy=strategy,
+        partition_dt=partition_dt,
+        wxindex_threshold=wxindex_threshold,
+        event_sense_json=event_sense_json,
+        senior_fit_json=senior_fit_json,
+        event_threshold=event_threshold,
+        senior_threshold=senior_threshold,
+    )
+
+
 def build_hive_rows_from_export_groups(
     export_groups: list[dict[str, Any]],
     *,
     strategy: str,
     partition_dt: str,
     wxindex_threshold: float,
+    event_threshold: float = 0.0,
+    senior_threshold: float = 0.0,
 ) -> list[dict[str, Any]]:
     rows: list[dict[str, Any]] = []
     seen_demand_ids: set[str] = set()
@@ -153,12 +299,49 @@ def build_hive_rows_from_export_groups(
         record_id = int(group.get("record_id") or 0)
         if record_id <= 0:
             continue
+        event_sense_json = group.get("demand_event_sense_json")
+        senior_fit_json = group.get("demand_senior_fit_json")
         for hive_row in build_hive_rows_for_record(
             export_rows,
             record_id=record_id,
             strategy=strategy,
             partition_dt=partition_dt,
             wxindex_threshold=wxindex_threshold,
+            event_sense_json=event_sense_json if isinstance(event_sense_json, dict) else None,
+            senior_fit_json=senior_fit_json if isinstance(senior_fit_json, dict) else None,
+            event_threshold=event_threshold,
+            senior_threshold=senior_threshold,
+        ):
+            demand_id = str(hive_row["demand_id"])
+            if demand_id in seen_demand_ids:
+                continue
+            seen_demand_ids.add(demand_id)
+            rows.append(hive_row)
+    return rows
+
+
+def build_hive_rows_from_odps_records(
+    records: list[dict[str, Any]],
+    *,
+    strategy: str,
+    partition_dt: str,
+    wxindex_threshold: float,
+    event_threshold: float,
+    senior_threshold: float,
+) -> list[dict[str, Any]]:
+    rows: list[dict[str, Any]] = []
+    seen_demand_ids: set[str] = set()
+    for record in records:
+        record_id = int(record.get("id") or 0)
+        if record_id <= 0:
+            continue
+        for hive_row in build_hive_rows_for_odps_record(
+            record,
+            strategy=strategy,
+            partition_dt=partition_dt,
+            wxindex_threshold=wxindex_threshold,
+            event_threshold=event_threshold,
+            senior_threshold=senior_threshold,
         ):
             demand_id = str(hive_row["demand_id"])
             if demand_id in seen_demand_ids:

+ 8 - 6
app/hot_content/demand_pool_writer.py

@@ -7,7 +7,7 @@ from datetime import datetime
 from typing import Any
 
 from app.aliyun_odps.client import get_odps_client
-from app.hot_content.demand_hive_export import build_hive_rows_from_export_groups
+from app.hot_content.demand_hive_export import build_hive_rows_from_odps_records
 from app.hot_content.exceptions import HotContentFlowError
 from app.hot_content.repository import HotContentRepository
 from app.hot_content.timezone import SHANGHAI_TZ
@@ -92,13 +92,15 @@ class HotDemandPoolWriter:
     def plan_today(self) -> dict[str, Any]:
         partition_dt = datetime.now(SHANGHAI_TZ).date().strftime("%Y%m%d")
         strategy = self.config.hot_demand_pool_strategy
-        # 仅同步主表 hot_content_records.created_at 为当天的 record,写入当天 ODPS 分区。
-        export_groups = self.repository.list_demand_export_groups()
-        hive_rows = build_hive_rows_from_export_groups(
-            export_groups,
+        # 从主表 hot_content_records 读取当天记录及质量评分,写入当天 ODPS 分区。
+        odps_records = self.repository.list_odps_sync_records()
+        hive_rows = build_hive_rows_from_odps_records(
+            odps_records,
             strategy=strategy,
             partition_dt=partition_dt,
             wxindex_threshold=self.config.wxindex_score_threshold,
+            event_threshold=self.config.demand_event_sense_threshold,
+            senior_threshold=self.config.demand_senior_fit_threshold,
         )
         synced_demand_ids = self.repository.list_synced_odps_demand_ids(
             partition_dt=partition_dt,
@@ -129,7 +131,7 @@ class HotDemandPoolWriter:
         return {
             "partition_dt": partition_dt,
             "strategy": strategy,
-            "source_record_count": len(export_groups),
+            "source_record_count": len(odps_records),
             "candidate_row_count": len(hive_rows),
             "pending_row_count": len(rows_to_write),
             "skipped_row_count": len(skipped_rows),

+ 76 - 6
app/hot_content/postprocess_service.py

@@ -20,8 +20,10 @@ from app.hot_content.types import FlowConfig
 from app.hot_content.demand_export import (
     attach_wxindex_metadata,
     build_demand_export_rows,
+    build_export_rows_from_record,
 )
 from app.hot_content.demand_pool_writer import sync_hot_demands_to_hive
+from app.hot_content.demand_quality import run_demand_quality_pipeline
 from app.hot_content.wxindex_trend import calc_wxindex_trend
 
 
@@ -174,6 +176,7 @@ class ContributionPostprocessService:
 
         matched_count = 0
         wxindex_count = 0
+        quality_count = 0
         exported_count = 0
         skipped_count = 0
         failed_count = 0
@@ -214,6 +217,7 @@ class ContributionPostprocessService:
                         status=PostprocessStatus.SKIPPED,
                         error_message="no matched demand words",
                     )
+                    self._save_empty_demand_quality(record_id=record_id)
                     exported_count += self.export_demand_terms_if_needed(
                         record=record,
                         match_result=match_result,
@@ -226,12 +230,25 @@ class ContributionPostprocessService:
                     record_id=record_id,
                     trend_json=trend_result,
                 )
+                event_sense_json, senior_fit_json = self.run_demand_quality_judgment(
+                    record=record,
+                    match_result=match_result,
+                    trend_result=trend_result,
+                )
+                self.repository.save_demand_quality(
+                    record_id=record_id,
+                    event_sense_json=event_sense_json,
+                    senior_fit_json=senior_fit_json,
+                )
                 exported_count += self.export_demand_terms_if_needed(
                     record=record,
                     match_result=match_result,
                     trend_result=trend_result,
+                    event_sense_json=event_sense_json,
+                    senior_fit_json=senior_fit_json,
                 )
                 wxindex_count += 1
+                quality_count += 1
             except WxindexSelectionSkipped as exc:
                 self.repository.update_postprocess_status(
                     record_id=record_id,
@@ -239,6 +256,7 @@ class ContributionPostprocessService:
                     error_message=str(exc),
                 )
                 if isinstance(match_result, dict):
+                    self._save_empty_demand_quality(record_id=record_id)
                     exported_count += self.export_demand_terms_if_needed(
                         record=record,
                         match_result=match_result,
@@ -264,6 +282,7 @@ class ContributionPostprocessService:
                 "candidate_count": len(records),
                 "matched_count": matched_count,
                 "wxindex_count": wxindex_count,
+                "quality_count": quality_count,
                 "exported_count": exported_count,
                 "skipped_count": skipped_count,
                 "failed_count": failed_count,
@@ -277,14 +296,25 @@ class ContributionPostprocessService:
             result["hive_sync_error"] = str(exc)
         return result
 
-    def export_demand_terms_if_needed(
+    def _save_empty_demand_quality(self, *, record_id: int) -> None:
+        self.repository.save_demand_quality(
+            record_id=record_id,
+            event_sense_json={},
+            senior_fit_json={},
+            update_status=False,
+        )
+
+    def run_demand_quality_judgment(
         self,
         *,
         record: dict[str, Any],
         match_result: dict[str, Any],
-        trend_result: dict[str, Any] | None,
-    ) -> int:
-        export_rows = attach_wxindex_metadata(
+        trend_result: dict[str, Any],
+    ) -> tuple[dict[str, Any], dict[str, Any]]:
+        channel_content_id = str(
+            match_result.get("channelContentId") or record.get("unique_key") or ""
+        ).strip()
+        base_export_rows = attach_wxindex_metadata(
             build_demand_export_rows(
                 match_result,
                 contribution_points=(
@@ -292,10 +322,50 @@ class ContributionPostprocessService:
                     if isinstance(record.get("contribution_points_json"), dict)
                     else None
                 ),
-                trend_json=trend_result if isinstance(trend_result, dict) else None,
+                trend_json=trend_result,
             ),
-            trend_result if isinstance(trend_result, dict) else None,
+            trend_result,
+            wxindex_threshold=self.config.wxindex_score_threshold,
+        )
+        return run_demand_quality_pipeline(
+            channel_content_id=channel_content_id,
+            export_rows=base_export_rows,
+            wxindex_threshold=self.config.wxindex_score_threshold,
+            event_threshold=self.config.demand_event_sense_threshold,
+            senior_threshold=self.config.demand_senior_fit_threshold,
+            model=self.config.demand_quality_llm_model,
+            max_attempts=self.config.demand_quality_llm_max_attempts,
+            retry_sleep_seconds=self.config.demand_quality_llm_retry_sleep_seconds,
+            max_tokens=self.config.demand_quality_llm_max_tokens,
+        )
+
+    def export_demand_terms_if_needed(
+        self,
+        *,
+        record: dict[str, Any],
+        match_result: dict[str, Any],
+        trend_result: dict[str, Any] | None,
+        event_sense_json: dict[str, Any] | None = None,
+        senior_fit_json: dict[str, Any] | None = None,
+    ) -> int:
+        normalized_record = {
+            "contribution_demand_match_json": match_result,
+            "contribution_points_json": (
+                record.get("contribution_points_json")
+                if isinstance(record.get("contribution_points_json"), dict)
+                else None
+            ),
+            "wxindex_trend_json": trend_result if isinstance(trend_result, dict) else None,
+            "demand_event_sense_json": event_sense_json if isinstance(event_sense_json, dict) else {},
+            "demand_senior_fit_json": senior_fit_json if isinstance(senior_fit_json, dict) else {},
+        }
+        export_rows = build_export_rows_from_record(
+            normalized_record,
             wxindex_threshold=self.config.wxindex_score_threshold,
+            event_sense_json=normalized_record["demand_event_sense_json"],
+            senior_fit_json=normalized_record["demand_senior_fit_json"],
+            event_threshold=self.config.demand_event_sense_threshold,
+            senior_threshold=self.config.demand_senior_fit_threshold,
         )
         self.repository.replace_demand_export_rows(
             record_id=int(record["id"]),

+ 157 - 1
app/hot_content/repository.py

@@ -440,6 +440,47 @@ class HotContentRepository:
                 ),
             )
 
+    def save_demand_quality(
+        self,
+        *,
+        record_id: int,
+        event_sense_json: dict[str, Any],
+        senior_fit_json: dict[str, Any],
+        update_status: bool = True,
+    ) -> None:
+        self._ensure_record_quality_columns()
+        if update_status:
+            sql = """
+                UPDATE hot_content_records
+                SET demand_event_sense_json=%s,
+                    demand_senior_fit_json=%s,
+                    postprocess_status=%s,
+                    postprocess_error_reason=NULL,
+                    updated_at=NOW()
+                WHERE id=%s
+            """
+            params = (
+                _json_dumps(event_sense_json),
+                _json_dumps(senior_fit_json),
+                PostprocessStatus.QUALITY_DONE,
+                record_id,
+            )
+        else:
+            sql = """
+                UPDATE hot_content_records
+                SET demand_event_sense_json=%s,
+                    demand_senior_fit_json=%s,
+                    updated_at=NOW()
+                WHERE id=%s
+            """
+            params = (
+                _json_dumps(event_sense_json),
+                _json_dumps(senior_fit_json),
+                record_id,
+            )
+        with self.conn.cursor() as cursor:
+            cursor.execute(sql, params)
+
     def update_postprocess_status(
         self,
         *,
@@ -484,10 +525,12 @@ class HotContentRepository:
                 wxindex_latest_score,
                 wxindex_trend,
                 is_as_demand,
+                event_sense_score,
+                senior_fit_score,
                 created_at,
                 updated_at
             )
-            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW(), NOW())
+            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW(), NOW())
         """
         with self.conn.cursor() as cursor:
             cursor.execute(delete_sql, (record_id,))
@@ -507,6 +550,8 @@ class HotContentRepository:
                     float(item.get("wxindex_latest_score") or 0),
                     str(item.get("wxindex_trend") or ""),
                     int(item.get("is_as_demand") or 0),
+                    item.get("event_sense_score"),
+                    item.get("senior_fit_score"),
                 )
                 for item in rows
                 if str(item.get("item_type") or "").strip()
@@ -515,6 +560,62 @@ class HotContentRepository:
             if insert_rows:
                 cursor.executemany(insert_sql, insert_rows)
 
+    def list_odps_sync_records(self) -> list[dict[str, Any]]:
+        """读取当天创建且已完成质量判断的新记录,供 ODPS 同步(不处理历史数据)。"""
+        self._ensure_record_quality_columns()
+        today_start = datetime.now(SHANGHAI_TZ).replace(
+            hour=0,
+            minute=0,
+            second=0,
+            microsecond=0,
+            tzinfo=None,
+        )
+        today_end = today_start + timedelta(days=1)
+        sql = """
+            SELECT
+                id,
+                contribution_points_json,
+                contribution_demand_match_json,
+                wxindex_trend_json,
+                demand_event_sense_json,
+                demand_senior_fit_json
+            FROM hot_content_records
+            WHERE created_at >= %s
+              AND created_at < %s
+              AND postprocess_status = %s
+              AND contribution_demand_match_json IS NOT NULL
+              AND TRIM(CAST(contribution_demand_match_json AS CHAR)) <> ''
+            ORDER BY id ASC
+        """
+        with self.conn.cursor() as cursor:
+            cursor.execute(
+                sql,
+                (today_start, today_end, PostprocessStatus.QUALITY_DONE),
+            )
+            rows = cursor.fetchall()
+
+        records: list[dict[str, Any]] = []
+        for row in rows:
+            records.append(
+                {
+                    "id": int(row["id"]),
+                    "contribution_points_json": _json_loads(
+                        row.get("contribution_points_json")
+                    ),
+                    "contribution_demand_match_json": _json_loads(
+                        row.get("contribution_demand_match_json")
+                    ),
+                    "wxindex_trend_json": _json_loads(row.get("wxindex_trend_json")),
+                    "demand_event_sense_json": _json_loads(
+                        row.get("demand_event_sense_json")
+                    ),
+                    "demand_senior_fit_json": _json_loads(
+                        row.get("demand_senior_fit_json")
+                    ),
+                }
+            )
+        return records
+
     def list_demand_export_groups(self) -> list[dict[str, Any]]:
         """读取主表当天创建的 record 对应导出分组,仅供 ODPS 当天分区同步(不跨天)。"""
         self._ensure_demand_export_table()
@@ -658,6 +759,61 @@ class HotContentRepository:
                 AFTER wxindex_trend
                 """,
             )
+            self._ensure_demand_export_column(
+                cursor,
+                "event_sense_score",
+                """
+                ALTER TABLE hot_content_demand_exports
+                ADD COLUMN event_sense_score DOUBLE NULL
+                COMMENT '事件性得分 0-10'
+                AFTER is_as_demand
+                """,
+            )
+            self._ensure_demand_export_column(
+                cursor,
+                "senior_fit_score",
+                """
+                ALTER TABLE hot_content_demand_exports
+                ADD COLUMN senior_fit_score DOUBLE NULL
+                COMMENT '老年性得分 0-10'
+                AFTER event_sense_score
+                """,
+            )
+
+    def _ensure_record_quality_columns(self) -> None:
+        """Add the demand-quality JSON columns to ``hot_content_records`` if missing.
+
+        For each of ``demand_event_sense_json`` and ``demand_senior_fit_json``
+        the method looks the column up in ``information_schema.COLUMNS`` for the
+        current database and runs the matching ``ALTER TABLE ... ADD COLUMN``
+        only when the lookup reports a count of zero.
+        """
+        with self.conn.cursor() as cursor:
+            # Order matters: the second ALTER places its column AFTER
+            # demand_event_sense_json, so the first entry must run before it
+            # when both columns are absent.
+            for column_name, alter_sql in (
+                (
+                    "demand_event_sense_json",
+                    """
+                    ALTER TABLE hot_content_records
+                    ADD COLUMN demand_event_sense_json JSON NULL
+                    COMMENT '需求事件性 LLM 评分结果'
+                    AFTER wxindex_trend_json
+                    """,
+                ),
+                (
+                    "demand_senior_fit_json",
+                    """
+                    ALTER TABLE hot_content_records
+                    ADD COLUMN demand_senior_fit_json JSON NULL
+                    COMMENT '需求老年性 LLM 评分结果'
+                    AFTER demand_event_sense_json
+                    """,
+                ),
+            ):
+                # Count columns with this name on hot_content_records in the
+                # currently selected schema (DATABASE()).
+                cursor.execute(
+                    """
+                    SELECT COUNT(*) AS cnt
+                    FROM information_schema.COLUMNS
+                    WHERE TABLE_SCHEMA = DATABASE()
+                      AND TABLE_NAME = 'hot_content_records'
+                      AND COLUMN_NAME = %s
+                    """,
+                    (column_name,),
+                )
+                # A missing row or a NULL count is treated as 0, i.e. "column absent".
+                # NOTE(review): .get("cnt") assumes rows come back as mappings
+                # (dict-style cursor) — confirm against the connection setup.
+                if int((cursor.fetchone() or {}).get("cnt") or 0) == 0:
+                    cursor.execute(alter_sql)
 
     def _ensure_demand_export_column(
         self,

+ 1 - 0
app/hot_content/status.py

@@ -21,6 +21,7 @@ class PostprocessStatus:
     PENDING = 0
     DEMAND_MATCHED = 10
     WXINDEX_DONE = 20
+    QUALITY_DONE = 30
     SKIPPED = 90
     FAILED = 99
 

+ 6 - 0
app/hot_content/types.py

@@ -53,5 +53,11 @@ class FlowConfig:
     wxindex_llm_max_tokens: int
     wxindex_api_url: str
     wxindex_lookback_days: int
+    demand_event_sense_threshold: float
+    demand_senior_fit_threshold: float
+    demand_quality_llm_model: str
+    demand_quality_llm_max_attempts: int
+    demand_quality_llm_retry_sleep_seconds: float
+    demand_quality_llm_max_tokens: int
     sources: list[HotSourceConfig]
     mysql: MysqlConfig