xueyiming 3 недель назад
Родитель
Сommit
5f91689212
2 измененных файлов с 106 добавлено и 28 удалено
  1. 105 28
      app/hot_content/demand_export.py
  2. 1 0
      app/hot_content/repository.py

+ 105 - 28
app/hot_content/demand_export.py

@@ -1,4 +1,11 @@
-"""微信指数达标后的需求元素/短语导出逻辑。"""
+"""需求元素/短语导出到 MySQL 的逻辑(全量,不做 ODPS 过滤)。
+
+MySQL hot_content_demand_exports 写入本模块产出的全量行:
+- 元素:点列表关联的全部高贡献词(含未匹配需求)
+- 短语:灵感点/目的点/关键点全部短语(含未匹配需求)
+
+ODPS 写入前的过滤(微信指数门槛、matched_demand 等)在 demand_hive_export 中处理。
+"""
 
 from __future__ import annotations
 
@@ -87,6 +94,72 @@ def build_word_lookup(words_rows: list[Any]) -> dict[str, dict[str, Any]]:
     return lookup
 
 
+def _has_matched_demand_text(value: Any) -> bool:
+    return bool(str(value or "").strip())
+
+
+def build_merged_word_lookup(
+    match_result: dict[str, Any],
+    contribution_points: dict[str, Any] | None,
+) -> dict[str, dict[str, Any]]:
+    """以 contribution 高贡献词为全集,匹配信息仅以 match_result 为准。"""
+    match_lookup = build_word_lookup(match_result.get("高贡献词列表") or [])
+    contribution_source = (
+        contribution_points if isinstance(contribution_points, dict) else match_result
+    )
+    words_rows = contribution_source.get("高贡献词列表") or []
+    if not isinstance(words_rows, list):
+        words_rows = []
+
+    lookup: dict[str, dict[str, Any]] = {}
+    for word_row in words_rows:
+        if not isinstance(word_row, dict):
+            continue
+        word_text = str(word_row.get("词") or "").strip()
+        if not word_text:
+            continue
+        match_row = match_lookup.get(word_text)
+        merged = {k: v for k, v in word_row.items() if k != "匹配需求列表"}
+        if isinstance(match_row, dict):
+            merged["匹配需求列表"] = list(match_row.get("匹配需求列表") or [])
+        else:
+            merged["匹配需求列表"] = []
+        lookup[word_text] = merged
+
+    for word_text, match_row in match_lookup.items():
+        if word_text not in lookup:
+            lookup[word_text] = match_row
+    return lookup
+
+
+def enrich_word_lookup_from_points(
+    word_lookup: dict[str, dict[str, Any]],
+    *,
+    points: list[Any],
+    match_result: dict[str, Any],
+) -> None:
+    """把点列表里出现、但高贡献词列表未收录的词补进 lookup。"""
+    if not isinstance(points, list):
+        return
+    for point_item in points:
+        if not isinstance(point_item, dict):
+            continue
+        match_words = point_item.get("匹配词列表") or []
+        if not isinstance(match_words, list):
+            continue
+        for hit in match_words:
+            if not isinstance(hit, dict):
+                continue
+            word_text = str(hit.get("词") or "").strip()
+            if not word_text or word_text in word_lookup:
+                continue
+            word_lookup[word_text] = _resolve_word_row(
+                word_text,
+                word_lookup=word_lookup,
+                match_result=match_result,
+            )
+
+
 def build_word_to_categories(points: list[Any]) -> dict[str, set[str]]:
     word_categories: dict[str, set[str]] = {}
     if not isinstance(points, list):
@@ -145,12 +218,17 @@ def _build_word_export_row(
     word_row: dict[str, Any],
     category: str,
 ) -> dict[str, Any]:
+    matched_demand = extract_matched_demand_names(word_row)
     return {
         "item_type": ITEM_TYPE_ELEMENT,
         "item_text": word_text,
         "point_category": category,
-        "matched_demand": extract_matched_demand_names(word_row),
-        "contribution_score": _to_contribution_score(word_row.get("贡献度")),
+        "matched_demand": matched_demand,
+        "contribution_score": (
+            _to_contribution_score(word_row.get("贡献度"))
+            if _has_matched_demand_text(matched_demand)
+            else None
+        ),
     }
 
 
@@ -193,8 +271,6 @@ def append_wxindex_keyword_rows(
         return
 
     word_row = _resolve_word_row(keyword, word_lookup=word_lookup, match_result=match_result)
-    if not extract_matched_demand_names(word_row):
-        return
     categories = ordered_point_categories(word_to_categories.get(keyword, set()))
     if categories:
         for category in categories:
@@ -211,26 +287,29 @@ def build_demand_export_rows(
     trend_json: dict[str, Any] | None = None,
 ) -> list[dict[str, Any]]:
     export_rows: list[dict[str, Any]] = []
-    words_rows = match_result.get("高贡献词列表") or []
-    if not isinstance(words_rows, list):
-        words_rows = []
 
     contribution_source = contribution_points if isinstance(contribution_points, dict) else match_result
-    points = match_result.get("点列表") or []
-    if not isinstance(points, list) or not points:
-        points = contribution_source.get("点列表") or []
+    points = contribution_source.get("点列表") or []
     if not isinstance(points, list):
         points = []
 
-    word_lookup = build_word_lookup(words_rows)
+    word_lookup = build_merged_word_lookup(match_result, contribution_points)
+    enrich_word_lookup_from_points(
+        word_lookup,
+        points=points,
+        match_result=match_result,
+    )
     word_to_categories = build_word_to_categories(points)
 
-    for word_text, word_row in word_lookup.items():
+    for word_text in sorted(set(word_lookup) | set(word_to_categories)):
         categories = ordered_point_categories(word_to_categories.get(word_text, set()))
         if not categories:
             continue
-        if not extract_matched_demand_names(word_row):
-            continue
+        word_row = word_lookup.get(word_text) or _resolve_word_row(
+            word_text,
+            word_lookup=word_lookup,
+            match_result=match_result,
+        )
         for category in categories:
             export_rows.append(_build_word_export_row(word_text, word_row, category))
 
@@ -241,15 +320,12 @@ def build_demand_export_rows(
         category = str(point_item.get("来源") or "").strip()
         if not point_text or category not in POINT_CATEGORIES:
             continue
-        matched_demand = extract_point_matched_demand_names(point_item, word_lookup)
-        if not matched_demand:
-            continue
         export_rows.append(
             {
                 "item_type": ITEM_TYPE_PHRASE,
                 "item_text": point_text,
                 "point_category": category,
-                "matched_demand": matched_demand,
+                "matched_demand": extract_point_matched_demand_names(point_item, word_lookup),
                 "contribution_score": None,
             }
         )
@@ -291,18 +367,24 @@ def attach_wxindex_metadata(
     rows: list[dict[str, Any]] = []
     for row in export_rows:
         matched_demand = str(row.get("matched_demand") or "").strip()
+        has_matched_demand = _has_matched_demand_text(matched_demand)
         has_record_wxindex = latest_score is not None
 
-        if has_record_wxindex and matched_demand:
+        if has_record_wxindex and has_matched_demand:
             wxindex_score = float(latest_score)
             wxindex_trend_value = trend
         else:
             wxindex_score = 0.0
             wxindex_trend_value = ""
 
+        normalized_row = dict(row)
+        if not has_matched_demand:
+            normalized_row["contribution_score"] = None
+
         rows.append(
             {
-                **row,
+                **normalized_row,
+                "matched_demand": matched_demand,
                 "wxindex_keyword": wxindex_keyword,
                 "wxindex_latest_score": wxindex_score,
                 "wxindex_trend": wxindex_trend_value,
@@ -380,11 +462,6 @@ def export_existing_records(
             summary["skipped"] += 1
             continue
 
-        latest_score = (
-            get_latest_wxindex_score(trend_json)
-            if isinstance(trend_json, dict)
-            else None
-        )
         export_rows = attach_wxindex_metadata(
             build_demand_export_rows(
                 match_json,
@@ -435,8 +512,8 @@ def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
     parser = argparse.ArgumentParser(
         description=(
             "扫描已有 contribution_demand_match_json 记录,"
-            "导出全部元素/短语到 hot_content_demand_exports;"
-            "元素/短语按灵感点/目的点/关键点展开为多行,无点类型数据过滤;"
+            "全量导出元素/短语到 hot_content_demand_exports(含未匹配需求的项);"
+            "元素/短语按灵感点/目的点/关键点展开为多行,"
             "并补充获取微信指数的词、微信指数及趋势。"
         ),
     )

+ 1 - 0
app/hot_content/repository.py

@@ -512,6 +512,7 @@ class HotContentRepository:
                 cursor.executemany(insert_sql, insert_rows)
 
     def list_demand_export_groups(self) -> list[dict[str, Any]]:
+        """读取当天更新的导出分组,仅供 ODPS 日分区同步使用。"""
         self._ensure_demand_export_table()
         today_start = datetime.now(SHANGHAI_TZ).replace(
             hour=0,