xueyiming 3 hafta önce
ebeveyn
işleme
9de4876990

+ 2 - 0
app/hot_content/demand_cache_service.py

@@ -94,6 +94,7 @@ class DemandCacheService:
     ) -> tuple[list[str], str | None]:
         table = _safe_identifier(self.config.demand_pool_source_table)
         excluded = _escape_sql_string(self.config.demand_pool_excluded_strategy)
+        hot_strategy = _escape_sql_string(self.config.hot_demand_pool_strategy)
         partition_dts = _resolve_partition_dts(partition_dt)
         dt_clause = _build_dt_clause(partition_dts)
 
@@ -108,6 +109,7 @@ class DemandCacheService:
             WHERE {dt_clause}
               AND strategy IS NOT NULL
               AND strategy <> '{excluded}'
+              AND strategy <> '{hot_strategy}'
               AND demand_name IS NOT NULL
               AND TRIM(demand_name) <> ''
         ),

+ 20 - 10
app/hot_content/demand_export.py

@@ -193,6 +193,8 @@ def append_wxindex_keyword_rows(
         return
 
     word_row = _resolve_word_row(keyword, word_lookup=word_lookup, match_result=match_result)
+    if not extract_matched_demand_names(word_row):
+        return
     categories = ordered_point_categories(word_to_categories.get(keyword, set()))
     if categories:
         for category in categories:
@@ -214,7 +216,9 @@ def build_demand_export_rows(
         words_rows = []
 
     contribution_source = contribution_points if isinstance(contribution_points, dict) else match_result
-    points = contribution_source.get("点列表") or []
+    points = match_result.get("点列表") or []
+    if not isinstance(points, list) or not points:
+        points = contribution_source.get("点列表") or []
     if not isinstance(points, list):
         points = []
 
@@ -225,6 +229,8 @@ def build_demand_export_rows(
         categories = ordered_point_categories(word_to_categories.get(word_text, set()))
         if not categories:
             continue
+        if not extract_matched_demand_names(word_row):
+            continue
         for category in categories:
             export_rows.append(_build_word_export_row(word_text, word_row, category))
 
@@ -235,12 +241,15 @@ def build_demand_export_rows(
         category = str(point_item.get("来源") or "").strip()
         if not point_text or category not in POINT_CATEGORIES:
             continue
+        matched_demand = extract_point_matched_demand_names(point_item, word_lookup)
+        if not matched_demand:
+            continue
         export_rows.append(
             {
                 "item_type": ITEM_TYPE_PHRASE,
                 "item_text": point_text,
                 "point_category": category,
-                "matched_demand": extract_point_matched_demand_names(point_item, word_lookup),
+                "matched_demand": matched_demand,
                 "contribution_score": None,
             }
         )
@@ -281,17 +290,10 @@ def attach_wxindex_metadata(
     wxindex_keyword = get_wxindex_keyword(trend_json)
     rows: list[dict[str, Any]] = []
     for row in export_rows:
-        item_type = str(row.get("item_type") or "")
-        item_text = str(row.get("item_text") or "").strip()
         matched_demand = str(row.get("matched_demand") or "").strip()
         has_record_wxindex = latest_score is not None
-        is_wxindex_keyword = (
-            item_type == ITEM_TYPE_ELEMENT and wxindex_keyword and item_text == wxindex_keyword
-        )
 
-        if has_record_wxindex and (
-            matched_demand or item_type == ITEM_TYPE_PHRASE or is_wxindex_keyword
-        ):
+        if has_record_wxindex and matched_demand:
             wxindex_score = float(latest_score)
             wxindex_trend_value = trend
         else:
@@ -395,6 +397,14 @@ def export_existing_records(
         )
         if not export_rows:
             summary["no_export_rows"] += 1
+            if not dry_run:
+                repository.replace_demand_export_rows(
+                    record_id=record_id,
+                    source=str(row.get("source") or ""),
+                    hot_title=str(row.get("title") or ""),
+                    article_title=str(row.get("article_title") or ""),
+                    rows=[],
+                )
             continue
 
         if verbose or dry_run:

+ 1 - 1
app/hot_content/demand_hive_export.py

@@ -92,7 +92,7 @@ def build_hive_rows_for_record(
             str(row.get("item_text") or "").strip()
             for row in export_rows
             if str(row.get("item_type") or "") == ITEM_TYPE_PHRASE
-            and str(row.get("item_text") or "").strip()
+            and _has_matched_demand(row)
         ]
     )
 

+ 3 - 4
app/hot_content/postprocess_service.py

@@ -277,9 +277,6 @@ class ContributionPostprocessService:
             ),
             trend_result if isinstance(trend_result, dict) else None,
         )
-        if not export_rows:
-            return 0
-
         self.repository.replace_demand_export_rows(
             record_id=int(record["id"]),
             source=str(record.get("source") or ""),
@@ -539,7 +536,9 @@ class ContributionPostprocessService:
                 "latest_total_score": latest_score,
                 "threshold": threshold,
                 "latest_gt_threshold": (
-                    False if latest_score is None else latest_score > threshold
+                    False
+                    if latest_score is None
+                    else latest_score >= threshold
                 ),
                 "trend": calc_wxindex_trend(series),
             },

+ 13 - 2
app/hot_content/repository.py

@@ -4,7 +4,7 @@ from __future__ import annotations
 
 import hashlib
 import json
-from datetime import datetime
+from datetime import datetime, timedelta
 from typing import Any
 
 try:
@@ -16,6 +16,7 @@ except ImportError:  # pragma: no cover - runtime dependency check
 
 from app.hot_content.exceptions import HotContentFlowError
 from app.hot_content.status import ExecutionStatus, PostprocessStatus
+from app.hot_content.timezone import SHANGHAI_TZ
 from app.hot_content.types import MysqlConfig
 
 
@@ -512,6 +513,14 @@ class HotContentRepository:
 
     def list_demand_export_groups(self) -> list[dict[str, Any]]:
         self._ensure_demand_export_table()
+        today_start = datetime.now(SHANGHAI_TZ).replace(
+            hour=0,
+            minute=0,
+            second=0,
+            microsecond=0,
+            tzinfo=None,
+        )
+        today_end = today_start + timedelta(days=1)
         sql = """
             SELECT
                 record_id,
@@ -521,10 +530,12 @@ class HotContentRepository:
                 matched_demand,
                 wxindex_latest_score
             FROM hot_content_demand_exports
+            WHERE updated_at >= %s
+              AND updated_at < %s
             ORDER BY record_id ASC, id ASC
         """
         with self.conn.cursor() as cursor:
-            cursor.execute(sql)
+            cursor.execute(sql, (today_start, today_end))
             rows = cursor.fetchall()
 
         grouped: dict[int, list[dict[str, Any]]] = {}