Kaynağa Gözat

微信指数增加词判断
导入odps需求表方式修改

xueyiming 3 hafta önce
ebeveyn
işleme
37dd6fa09e

+ 59 - 14
app/hot_content/demand_export.py

@@ -45,6 +45,44 @@ def get_wxindex_keyword(trend_json: dict[str, Any] | None) -> str:
     return str(trend_json.get("llm_selected_word") or "").strip()
 
 
+def get_wxindex_keywords(trend_json: dict[str, Any] | None) -> list[str]:
+    """Return the de-duplicated WeChat-index keywords stored in ``trend_json``.
+
+    Sources are tried in order, and the first one that yields at least one
+    non-blank word wins:
+
+    1. ``trend_json["wxindex"]["keywords"]``
+    2. ``trend_json["llm_selected_words"]``
+    3. the single legacy keyword returned by ``get_wxindex_keyword``
+
+    A source may be a list or a bare truthy scalar. Entries are stringified,
+    stripped, and de-duplicated in first-seen order. Any ``trend_json`` that
+    is not a dict (including ``None``) yields an empty list.
+    """
+    if not isinstance(trend_json, dict):
+        return []
+
+    def _clean(value: Any) -> list[str]:
+        # Accept either a list of words or a single truthy scalar.
+        if isinstance(value, list):
+            items = value
+        elif value:
+            items = [value]
+        else:
+            return []
+        stripped = (str(item or "").strip() for item in items)
+        # dict.fromkeys drops duplicates while preserving first-seen order.
+        return list(dict.fromkeys(word for word in stripped if word))
+
+    wxindex = trend_json.get("wxindex")
+    nested = wxindex.get("keywords") if isinstance(wxindex, dict) else None
+    # Fall through on the *cleaned* result, so a source that holds only blank
+    # entries (e.g. [""]) does not mask a usable later source.
+    for candidate in (nested, trend_json.get("llm_selected_words")):
+        words = _clean(candidate)
+        if words:
+            return words
+
+    keyword = get_wxindex_keyword(trend_json)
+    return [keyword] if keyword else []
+
+
+def format_wxindex_keywords(trend_json: dict[str, Any] | None) -> str:
+    """Join the result of ``get_wxindex_keywords`` into one comma-separated string."""
+    keywords = get_wxindex_keywords(trend_json)
+    return ",".join(keywords)
+
+
 def get_wxindex_trend(trend_json: dict[str, Any]) -> str:
     wxindex = trend_json.get("wxindex")
     if not isinstance(wxindex, dict):
@@ -259,25 +297,30 @@ def append_wxindex_keyword_rows(
     word_lookup: dict[str, dict[str, Any]],
     word_to_categories: dict[str, set[str]],
 ) -> None:
-    keyword = get_wxindex_keyword(trend_json)
-    if not keyword:
+    keywords = get_wxindex_keywords(trend_json)
+    if not keywords:
         return
 
-    has_keyword_row = any(
-        row.get("item_type") == ITEM_TYPE_ELEMENT and str(row.get("item_text") or "").strip() == keyword
+    existing_element_texts = {
+        str(row.get("item_text") or "").strip()
         for row in export_rows
-    )
-    if has_keyword_row:
-        return
+        if row.get("item_type") == ITEM_TYPE_ELEMENT
+    }
 
-    word_row = _resolve_word_row(keyword, word_lookup=word_lookup, match_result=match_result)
-    categories = ordered_point_categories(word_to_categories.get(keyword, set()))
-    if categories:
-        for category in categories:
-            export_rows.append(_build_word_export_row(keyword, word_row, category))
-        return
+    for keyword in keywords:
+        if keyword in existing_element_texts:
+            continue
+
+        word_row = _resolve_word_row(keyword, word_lookup=word_lookup, match_result=match_result)
+        categories = ordered_point_categories(word_to_categories.get(keyword, set()))
+        if categories:
+            for category in categories:
+                export_rows.append(_build_word_export_row(keyword, word_row, category))
+            existing_element_texts.add(keyword)
+            continue
 
-    export_rows.append(_build_word_export_row(keyword, word_row, ""))
+        export_rows.append(_build_word_export_row(keyword, word_row, ""))
+        existing_element_texts.add(keyword)
 
 
 def build_demand_export_rows(
@@ -364,6 +407,7 @@ def attach_wxindex_metadata(
     )
     trend = get_wxindex_trend(trend_json) if isinstance(trend_json, dict) else ""
     wxindex_keyword = get_wxindex_keyword(trend_json)
+    all_hot_keywords = format_wxindex_keywords(trend_json)
     rows: list[dict[str, Any]] = []
     for row in export_rows:
         matched_demand = str(row.get("matched_demand") or "").strip()
@@ -386,6 +430,7 @@ def attach_wxindex_metadata(
                 **normalized_row,
                 "matched_demand": matched_demand,
                 "wxindex_keyword": wxindex_keyword,
+                "all_hot_keywords": all_hot_keywords,
                 "wxindex_latest_score": wxindex_score,
                 "wxindex_trend": wxindex_trend_value,
             }

+ 9 - 0
app/hot_content/demand_hive_export.py

@@ -71,6 +71,7 @@ def _has_matched_demand(row: dict[str, Any]) -> bool:
 def build_hive_rows_for_record(
     export_rows: list[dict[str, Any]],
     *,
+    record_id: int,
     strategy: str,
     partition_dt: str,
     wxindex_threshold: float,
@@ -101,6 +102,7 @@ def build_hive_rows_for_record(
         demand_name = " ".join(element_texts)
         hive_rows.append(
             _build_hive_row(
+                record_id=record_id,
                 strategy=strategy,
                 demand_name=demand_name,
                 weight=weight,
@@ -112,6 +114,7 @@ def build_hive_rows_for_record(
     for phrase_text in phrase_texts:
         hive_rows.append(
             _build_hive_row(
+                record_id=record_id,
                 strategy=strategy,
                 demand_name=phrase_text,
                 weight=weight,
@@ -135,8 +138,12 @@ def build_hive_rows_from_export_groups(
         export_rows = group.get("export_rows") or []
         if not isinstance(export_rows, list):
             continue
+        record_id = int(group.get("record_id") or 0)
+        if record_id <= 0:
+            continue
         for hive_row in build_hive_rows_for_record(
             export_rows,
+            record_id=record_id,
             strategy=strategy,
             partition_dt=partition_dt,
             wxindex_threshold=wxindex_threshold,
@@ -151,6 +158,7 @@ def build_hive_rows_from_export_groups(
 
 def _build_hive_row(
     *,
+    record_id: int,
     strategy: str,
     demand_name: str,
     weight: float,
@@ -159,6 +167,7 @@ def _build_hive_row(
 ) -> dict[str, Any]:
     normalized_name = demand_name.strip()
     return {
+        "record_id": record_id,
         "strategy": strategy,
         "demand_id": build_demand_id(
             strategy=strategy,

+ 114 - 84
app/hot_content/demand_pool_writer.py

@@ -14,16 +14,6 @@ from app.hot_content.timezone import SHANGHAI_TZ
 from app.hot_content.types import FlowConfig
 
 IDENTIFIER_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*(?:\.[A-Za-z_][A-Za-z0-9_]*)?$")
-HIVE_COLUMNS = (
-    "strategy",
-    "demand_id",
-    "demand_name",
-    "weight",
-    "type",
-    "video_count",
-    "video_list",
-    "extend",
-)
 
 
 def _safe_identifier(name: str) -> str:
@@ -44,107 +34,147 @@ class HotDemandPoolWriter:
 
     def sync_today(self) -> dict[str, Any]:
         partition_dt = datetime.now(SHANGHAI_TZ).date().strftime("%Y%m%d")
+        strategy = self.config.hot_demand_pool_strategy
+        # 仅同步主表 hot_content_records.created_at 为当天的 record,写入当天 ODPS 分区。
         export_groups = self.repository.list_demand_export_groups()
         hive_rows = build_hive_rows_from_export_groups(
             export_groups,
-            strategy=self.config.hot_demand_pool_strategy,
+            strategy=strategy,
             partition_dt=partition_dt,
             wxindex_threshold=self.config.wxindex_score_threshold,
         )
-        written_count = self._write_partition(
-            hive_rows=hive_rows,
+        synced_demand_ids = self.repository.list_synced_odps_demand_ids(
+            partition_dt=partition_dt,
+            strategy=strategy,
+        )
+        odps_existing_demand_ids = self._list_odps_partition_demand_ids(
+            partition_dt=partition_dt,
+            strategy=strategy,
+        )
+        skip_demand_ids = synced_demand_ids | odps_existing_demand_ids
+
+        pending_rows: list[dict[str, Any]] = []
+        skipped_rows: list[dict[str, Any]] = []
+        for row in hive_rows:
+            demand_id = str(row.get("demand_id") or "").strip()
+            if demand_id in skip_demand_ids:
+                skipped_rows.append(row)
+                continue
+            pending_rows.append(row)
+
+        written_count = self._insert_partition_rows(
+            hive_rows=pending_rows,
             partition_dt=partition_dt,
-            strategy=self.config.hot_demand_pool_strategy,
+        )
+        if written_count:
+            self.repository.save_odps_sync_logs(
+                [
+                    {
+                        "partition_dt": partition_dt,
+                        "strategy": strategy,
+                        "demand_id": row["demand_id"],
+                        "demand_name": row["demand_name"],
+                        "demand_type": row["type"],
+                        "record_id": row.get("record_id") or 0,
+                    }
+                    for row in pending_rows
+                ]
+            )
+
+        pending_record_ids = sorted(
+            {
+                int(row.get("record_id") or 0)
+                for row in pending_rows
+                if int(row.get("record_id") or 0) > 0
+            }
+        )
+        skipped_record_ids = sorted(
+            {
+                int(row.get("record_id") or 0)
+                for row in skipped_rows
+                if int(row.get("record_id") or 0) > 0
+            }
         )
         return {
             "partition_dt": partition_dt,
-            "strategy": self.config.hot_demand_pool_strategy,
+            "strategy": strategy,
             "source_record_count": len(export_groups),
-            "hive_row_count": len(hive_rows),
+            "candidate_row_count": len(hive_rows),
+            "pending_row_count": len(pending_rows),
+            "skipped_row_count": len(skipped_rows),
             "written_count": written_count,
+            "pending_record_ids": pending_record_ids,
+            "skipped_record_ids": skipped_record_ids,
             "target_table": self.config.demand_pool_source_table,
         }
 
-    def _write_partition(
+    def _list_odps_partition_demand_ids(
         self,
         *,
-        hive_rows: list[dict[str, Any]],
         partition_dt: str,
         strategy: str,
-    ) -> int:
+    ) -> set[str]:
+        """Return the demand_ids found in the target ODPS table for one partition.
+
+        Runs a ``SELECT demand_id`` filtered on ``dt`` and ``strategy`` against
+        the configured table and collects every non-blank, stripped id.
+
+        The lookup is best-effort: if the query or the read raises, a warning
+        is logged and an empty set is returned instead of propagating.
+        """
         table_name = _safe_identifier(self.config.demand_pool_source_table)
         odps_client = get_odps_client()
-        table = odps_client.get_table(table_name)
-        partition_spec = f"dt={partition_dt}"
-
-        preserved_rows = self._read_preserved_rows(
-            table=table,
-            partition_spec=partition_spec,
-            strategy=strategy,
-        )
-        payload_rows = preserved_rows + [
-            self._to_write_row(row) for row in hive_rows
-        ]
-        if not payload_rows and table.exist_partition(partition_spec):
-            odps_client.write_table(
-                table_name,
-                [],
-                partition=partition_spec,
-                create_partition=True,
-                overwrite=True,
-            )
+        sql = f"""
+        SELECT demand_id
+        FROM {table_name}
+        WHERE dt = '{_escape_sql_string(partition_dt)}'
+          AND strategy = '{_escape_sql_string(strategy)}'
+        """
+        try:
+            instance = odps_client.execute_sql(sql)
+            demand_ids: set[str] = set()
+            with instance.open_reader(tunnel=True) as reader:
+                for record in reader:
+                    demand_id = str(record["demand_id"] or "").strip()
+                    if demand_id:
+                        demand_ids.add(demand_id)
+            return demand_ids
+        except Exception as exc:
+            # Keep the best-effort contract, but never hide the failure: an
+            # empty result here means no ODPS-side duplicate check happened.
+            import logging
+
+            logging.getLogger(__name__).warning(
+                "failed to list ODPS demand ids for dt=%s strategy=%s: %s",
+                partition_dt,
+                strategy,
+                exc,
+            )
+            return set()
+
+    def _insert_partition_rows(
+        self,
+        *,
+        hive_rows: list[dict[str, Any]],
+        partition_dt: str,
+    ) -> int:
+        if not hive_rows:
             return 0
 
-        odps_client.write_table(
-            table_name,
-            payload_rows,
-            partition=partition_spec,
-            create_partition=True,
-            overwrite=True,
+        table_name = _safe_identifier(self.config.demand_pool_source_table)
+        odps_client = get_odps_client()
+        select_sql = " UNION ALL ".join(
+            self._build_row_select(row) for row in hive_rows
         )
+        sql = f"""
+        INSERT INTO TABLE {table_name} PARTITION (dt='{_escape_sql_string(partition_dt)}')
+        {select_sql}
+        """
+        instance = odps_client.execute_sql(sql)
+        instance.wait_for_success()
         return len(hive_rows)
 
     @staticmethod
-    def _read_preserved_rows(
-        *,
-        table: Any,
-        partition_spec: str,
-        strategy: str,
-    ) -> list[list[Any]]:
-        if not table.exist_partition(partition_spec):
-            return []
-
-        preserved_rows: list[list[Any]] = []
-        with table.open_reader(partition=partition_spec) as reader:
-            for record in reader:
-                if str(record["strategy"] or "") == strategy:
-                    continue
-                preserved_rows.append(
-                    [
-                        record["strategy"],
-                        record["demand_id"],
-                        record["demand_name"],
-                        record["weight"],
-                        record["type"],
-                        record["video_count"],
-                        record["video_list"],
-                        record["extend"],
-                    ]
-                )
-        return preserved_rows
-
-    @staticmethod
-    def _to_write_row(row: dict[str, Any]) -> list[Any]:
-        return [
-            row["strategy"],
-            row["demand_id"],
-            row["demand_name"],
-            float(row["weight"]),
-            row["type"],
-            row["video_count"],
-            row["video_list"],
-            row["extend"],
-        ]
+    def _build_row_select(row: dict[str, Any]) -> str:
+        strategy = _escape_sql_string(str(row["strategy"]))
+        demand_id = _escape_sql_string(str(row["demand_id"]))
+        demand_name = _escape_sql_string(str(row["demand_name"]))
+        weight = float(row["weight"])
+        demand_type = _escape_sql_string(str(row["type"]))
+        extend = _escape_sql_string(str(row.get("extend") or "{}"))
+        return f"""
+        SELECT
+            '{strategy}' AS strategy,
+            '{demand_id}' AS demand_id,
+            '{demand_name}' AS demand_name,
+            {weight} AS weight,
+            '{demand_type}' AS type,
+            CAST(NULL AS BIGINT) AS video_count,
+            array() AS video_list,
+            '{extend}' AS extend
+        """
 
 
 def sync_hot_demands_to_hive(

+ 121 - 44
app/hot_content/postprocess_service.py

@@ -107,6 +107,25 @@ def _resolve_demand_name(
     return demand_lookup.get(value) or demand_lookup.get(_normalize_demand_key(value))
 
 
+def _collect_matched_demand_names(matched_word_rows: list[Any]) -> list[str]:
+    """Collect the distinct demand names referenced by the matched word rows.
+
+    Each dict row may carry a list under ``"匹配需求列表"``; every dict entry
+    in that list contributes its stripped ``"demand_name"`` when non-empty.
+    Rows, lists and entries of any other type are ignored. The result keeps
+    first-seen order.
+    """
+
+    def _iter_names():
+        for word_row in matched_word_rows:
+            if not isinstance(word_row, dict):
+                continue
+            candidates = word_row.get("匹配需求列表") or []
+            if not isinstance(candidates, list):
+                continue
+            for candidate in candidates:
+                if isinstance(candidate, dict):
+                    name = str(candidate.get("demand_name") or "").strip()
+                    if name:
+                        yield name
+
+    # dict.fromkeys removes repeats while preserving first-seen order.
+    return list(dict.fromkeys(_iter_names()))
+
+
 class ContributionPostprocessService:
     def __init__(
         self,
@@ -488,75 +507,112 @@ class ContributionPostprocessService:
         if not isinstance(matched_word_rows, list) or not matched_word_rows:
             return None
 
-        candidate_words = [
+        contribution_words = [
             str(row.get("词") or "").strip()
             for row in matched_word_rows
             if isinstance(row, dict) and str(row.get("词") or "").strip()
         ]
-        if not candidate_words:
+        if not contribution_words:
             return None
 
         channel_content_id = str(
             match_result.get("channelContentId") or record.get("unique_key") or ""
         )
         article_title, body_text = self.extract_article_text(record)
-        if len(candidate_words) == 1:
-            pick = {
-                "selected_word": candidate_words[0],
-                "reason": "only one candidate word",
-            }
-        else:
-            pick = self.llm_pick_best_word(
-                channel_content_id=channel_content_id,
-                article_title=article_title,
-                body_text=body_text,
-                candidate_words=candidate_words,
-            )
-        selected_word = pick["selected_word"]
+        matched_demands = _collect_matched_demand_names(matched_word_rows)
+
+        pick = self.llm_extract_wxindex_words(
+            channel_content_id=channel_content_id,
+            article_title=article_title,
+            body_text=body_text,
+            contribution_words=contribution_words,
+            matched_demands=matched_demands,
+        )
+        selected_words = pick["selected_words"]
         start_ymd, end_ymd = _get_recent_range(self.config.wxindex_lookback_days)
-        wx_payload = {
-            "keyword": selected_word,
-            "start_ymd": start_ymd,
-            "end_ymd": end_ymd,
-        }
-        wx_resp = self.api_client.post_json(self.config.wxindex_api_url, wx_payload)
-        series = _parse_total_scores(wx_resp)
-        latest_score = series[-1]["total_score"] if series else None
         threshold = float(self.config.wxindex_score_threshold)
+
+        wxindex_searches: list[dict[str, Any]] = []
+        for keyword in selected_words:
+            wx_payload = {
+                "keyword": keyword,
+                "start_ymd": start_ymd,
+                "end_ymd": end_ymd,
+            }
+            wx_resp = self.api_client.post_json(self.config.wxindex_api_url, wx_payload)
+            series = _parse_total_scores(wx_resp)
+            latest_score = series[-1]["total_score"] if series else None
+            wxindex_searches.append(
+                {
+                    "keyword": keyword,
+                    "start_ymd": start_ymd,
+                    "end_ymd": end_ymd,
+                    "total_score_7d": series,
+                    "latest_total_score": latest_score,
+                    "threshold": threshold,
+                    "latest_gt_threshold": (
+                        False
+                        if latest_score is None
+                        else latest_score >= threshold
+                    ),
+                    "trend": calc_wxindex_trend(series),
+                }
+            )
+
+        searchable = [
+            item
+            for item in wxindex_searches
+            if item.get("latest_total_score") is not None
+        ]
+        if not searchable:
+            raise WxindexSelectionSkipped(
+                f"no wxindex score for any keyword in {channel_content_id}: "
+                f"{selected_words}"
+            )
+
+        best = max(searchable, key=lambda item: float(item["latest_total_score"]))
+        selected_word = str(best["keyword"])
+        latest_score = best["latest_total_score"]
+        series = best["total_score_7d"]
         return {
             "channelContentId": channel_content_id,
             "article_title": article_title,
+            "llm_selected_words": selected_words,
             "llm_selected_word": selected_word,
             "llm_reason": pick["reason"],
+            "wxindex_searches": wxindex_searches,
             "wxindex": {
                 "keyword": selected_word,
+                "keywords": selected_words,
                 "start_ymd": start_ymd,
                 "end_ymd": end_ymd,
                 "total_score_7d": series,
                 "latest_total_score": latest_score,
                 "threshold": threshold,
-                "latest_gt_threshold": (
-                    False
-                    if latest_score is None
-                    else latest_score >= threshold
-                ),
-                "trend": calc_wxindex_trend(series),
+                "latest_gt_threshold": latest_score >= threshold,
+                "trend": best["trend"],
             },
         }
 
-    def llm_pick_best_word(
+    def llm_extract_wxindex_words(
         self,
         *,
         channel_content_id: str,
         article_title: str,
         body_text: str,
-        candidate_words: list[str],
-    ) -> dict[str, str]:
+        contribution_words: list[str],
+        matched_demands: list[str],
+    ) -> dict[str, Any]:
         system_prompt = """
         #角色
-        你是一个专业的语义分析专家,擅长精准概括整篇文章。
+        你是一个专业的语义分析专家,擅长从文章中提取简洁、精准的热搜检索词
         # 任务
-        我会提供一篇文章的标题、正文和候选词列表,请你选择一个最能代表文章内容的词。
+        我会提供文章标题、正文,以及两类备选词来源:
+        1. 高贡献词:文章贡献度较高的关键词
+        2. 已匹配需求:已与需求库匹配上的需求名
+        请结合标题、正文与上述备选词,提取用于「微信指数」热度检索的词。
+        需自行从标题中识别可检索的关键词;词应简洁(2-4 字)、概括、精准覆盖事件。
+        若文章涉及多个子事件,可分别提取多个词,每个词覆盖部分事件。
         # 输出规则
         1. 严格输出 JSON 对象,禁止输出 JSON 之外的任何内容。
         """
@@ -564,15 +620,21 @@ class ContributionPostprocessService:
             "source": channel_content_id,
             "article_title": article_title,
             "article_body_text": body_text,
-            "candidate_words": candidate_words,
+            "contribution_words": contribution_words,
+            "matched_demands": matched_demands,
             "output_schema": {
                 "source": "string",
-                "selected_word": "string, must be selected from candidate_words",
+                "selected_words": [
+                    "string, concise keyword for wxindex search, one or more"
+                ],
                 "reason": "string",
             },
             "constraints": [
-                "selected_word 必须来自 candidate_words",
-                "reason 简洁说明,不超过40字",
+                "selected_words 为数组,至少 1 个词,可多个",
+                "每个词简洁(2-4 字),适合微信指数检索",
+                "结合标题、高贡献词、已匹配需求提炼,可合并改写,不必逐字照搬",
+                "多个词应分别覆盖不同事件或角度,避免语义重复",
+                "reason 简洁说明,不超过60字",
                 "仅输出 JSON 对象,不要 markdown 代码块",
             ],
         }
@@ -593,20 +655,35 @@ class ContributionPostprocessService:
                     max_tokens=max(self.config.wxindex_llm_max_tokens, 1),
                 )
                 parsed = _extract_json_object(str(resp.get("content") or ""))
-                selected_word = str(parsed.get("selected_word") or "").strip()
+                raw_words = parsed.get("selected_words")
+                if isinstance(raw_words, str):
+                    raw_words = [raw_words]
+                if not isinstance(raw_words, list):
+                    legacy_word = str(parsed.get("selected_word") or "").strip()
+                    raw_words = [legacy_word] if legacy_word else []
+
+                selected_words: list[str] = []
+                seen: set[str] = set()
+                for item in raw_words:
+                    word = str(item or "").strip()
+                    if word and word not in seen:
+                        seen.add(word)
+                        selected_words.append(word)
+
                 reason = str(parsed.get("reason") or "").strip()
-                if selected_word not in candidate_words:
+                if not selected_words:
                     raise WxindexSelectionSkipped(
-                        f"selected_word not in candidates for {channel_content_id}: "
-                        f"{selected_word}"
+                        f"selected_words empty for {channel_content_id}"
                     )
-                return {"selected_word": selected_word, "reason": reason}
+                return {"selected_words": selected_words, "reason": reason}
+            except WxindexSelectionSkipped:
+                raise
             except (OpenRouterCallError, HotContentFlowError) as exc:
                 last_error = exc
                 if attempt < max(self.config.wxindex_llm_max_attempts, 1):
                     continue
         raise HotContentFlowError(
-            f"llm pick word failed for {channel_content_id}: {last_error}"
+            f"llm extract wxindex words failed for {channel_content_id}: {last_error}"
         ) from last_error
 
     @staticmethod

+ 104 - 12
app/hot_content/repository.py

@@ -480,12 +480,13 @@ class HotContentRepository:
                 matched_demand,
                 contribution_score,
                 wxindex_keyword,
+                all_hot_keywords,
                 wxindex_latest_score,
                 wxindex_trend,
                 created_at,
                 updated_at
             )
-            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW(), NOW())
+            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW(), NOW())
         """
         with self.conn.cursor() as cursor:
             cursor.execute(delete_sql, (record_id,))
@@ -501,6 +502,7 @@ class HotContentRepository:
                     str(item.get("matched_demand") or ""),
                     item.get("contribution_score"),
                     str(item.get("wxindex_keyword") or ""),
+                    str(item.get("all_hot_keywords") or ""),
                     float(item.get("wxindex_latest_score") or 0),
                     str(item.get("wxindex_trend") or ""),
                 )
@@ -512,7 +514,7 @@ class HotContentRepository:
                 cursor.executemany(insert_sql, insert_rows)
 
     def list_demand_export_groups(self) -> list[dict[str, Any]]:
-        """读取当天更新的导出分组,仅供 ODPS 日分区同步使用。"""
+        """读取主表当天创建的 record 对应导出分组,仅供 ODPS 当天分区同步(不跨天)。"""
         self._ensure_demand_export_table()
         today_start = datetime.now(SHANGHAI_TZ).replace(
             hour=0,
@@ -524,16 +526,17 @@ class HotContentRepository:
         today_end = today_start + timedelta(days=1)
         sql = """
             SELECT
-                record_id,
-                item_type,
-                item_text,
-                point_category,
-                matched_demand,
-                wxindex_latest_score
-            FROM hot_content_demand_exports
-            WHERE updated_at >= %s
-              AND updated_at < %s
-            ORDER BY record_id ASC, id ASC
+                e.record_id,
+                e.item_type,
+                e.item_text,
+                e.point_category,
+                e.matched_demand,
+                e.wxindex_latest_score
+            FROM hot_content_demand_exports e
+            INNER JOIN hot_content_records r ON r.id = e.record_id
+            WHERE r.created_at >= %s
+              AND r.created_at < %s
+            ORDER BY e.record_id ASC, e.id ASC
         """
         with self.conn.cursor() as cursor:
             cursor.execute(sql, (today_start, today_end))
@@ -570,6 +573,7 @@ class HotContentRepository:
                 matched_demand VARCHAR(1024) NOT NULL DEFAULT '' COMMENT '匹配到的需求',
                 contribution_score DOUBLE NULL COMMENT '贡献分,仅元素有值',
                 wxindex_keyword VARCHAR(1024) NOT NULL DEFAULT '' COMMENT '获取微信指数的元素',
+                all_hot_keywords VARCHAR(1024) NOT NULL DEFAULT '' COMMENT '全部热点词',
                 wxindex_latest_score DOUBLE NOT NULL DEFAULT 0,
                 wxindex_trend VARCHAR(32) NOT NULL DEFAULT '' COMMENT '微信指数趋势',
                 created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
@@ -631,6 +635,16 @@ class HotContentRepository:
                 AFTER contribution_score
                 """,
             )
+            self._ensure_demand_export_column(
+                cursor,
+                "all_hot_keywords",
+                """
+                ALTER TABLE hot_content_demand_exports
+                ADD COLUMN all_hot_keywords VARCHAR(1024) NOT NULL DEFAULT ''
+                COMMENT '全部热点词'
+                AFTER wxindex_keyword
+                """,
+            )
 
     def _ensure_demand_export_column(
         self,
@@ -650,3 +664,81 @@ class HotContentRepository:
         )
         if int((cursor.fetchone() or {}).get("cnt") or 0) == 0:
             cursor.execute(alter_sql)
+
+    def list_synced_odps_demand_ids(
+        self,
+        *,
+        partition_dt: str,
+        strategy: str,
+    ) -> set[str]:
+        """Return the demand_ids logged in ``hot_content_odps_sync_log``.
+
+        Only rows matching both ``partition_dt`` and ``strategy`` are read.
+        Ids are stripped and blank ones are dropped. The log table is created
+        first if it does not exist.
+        """
+        self._ensure_odps_sync_log_table()
+        query = """
+            SELECT demand_id
+            FROM hot_content_odps_sync_log
+            WHERE partition_dt = %s
+              AND strategy = %s
+        """
+        with self.conn.cursor() as cur:
+            cur.execute(query, (partition_dt, strategy))
+            fetched = cur.fetchall()
+
+        demand_ids: set[str] = set()
+        for record in fetched:
+            demand_id = str(record.get("demand_id") or "").strip()
+            if demand_id:
+                demand_ids.add(demand_id)
+        return demand_ids
+
+    def save_odps_sync_logs(self, rows: list[dict[str, Any]]) -> int:
+        """Upsert sync-log entries and return how many rows were submitted.
+
+        Rows whose ``demand_id`` is missing or blank are skipped. The id is
+        stored stripped, matching how it is read back and compared. Existing
+        rows with the same (partition_dt, strategy, demand_id) have their
+        name, type, record_id and synced_at refreshed.
+
+        Returns 0 without touching the database when nothing is left to
+        write.
+        """
+        insert_rows: list[tuple[str, str, str, str, str, int]] = []
+        for item in rows or []:
+            # Normalise once so the filter and the stored value agree.
+            demand_id = str(item.get("demand_id") or "").strip()
+            if not demand_id:
+                continue
+            insert_rows.append(
+                (
+                    str(item.get("partition_dt") or ""),
+                    str(item.get("strategy") or ""),
+                    demand_id,
+                    str(item.get("demand_name") or ""),
+                    str(item.get("demand_type") or ""),
+                    int(item.get("record_id") or 0),
+                )
+            )
+        if not insert_rows:
+            return 0
+
+        self._ensure_odps_sync_log_table()
+        sql = """
+            INSERT INTO hot_content_odps_sync_log (
+                partition_dt,
+                strategy,
+                demand_id,
+                demand_name,
+                demand_type,
+                record_id
+            )
+            VALUES (%s, %s, %s, %s, %s, %s)
+            ON DUPLICATE KEY UPDATE
+                demand_name = VALUES(demand_name),
+                demand_type = VALUES(demand_type),
+                record_id = VALUES(record_id),
+                synced_at = CURRENT_TIMESTAMP
+        """
+        with self.conn.cursor() as cursor:
+            cursor.executemany(sql, insert_rows)
+        return len(insert_rows)
+
+    def _ensure_odps_sync_log_table(self) -> None:
+        """Create ``hot_content_odps_sync_log`` if it does not exist yet.
+
+        The table has a unique key on (partition_dt, strategy, demand_id).
+        """
+        ddl = """
+            CREATE TABLE IF NOT EXISTS hot_content_odps_sync_log (
+                id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,
+                partition_dt VARCHAR(8) NOT NULL COMMENT 'ODPS 分区 dt',
+                strategy VARCHAR(128) NOT NULL COMMENT '需求 strategy',
+                demand_id CHAR(32) NOT NULL COMMENT 'ODPS demand_id',
+                demand_name VARCHAR(1024) NOT NULL DEFAULT '' COMMENT '需求名',
+                demand_type VARCHAR(32) NOT NULL DEFAULT '' COMMENT '特征点/短语',
+                record_id BIGINT UNSIGNED NOT NULL DEFAULT 0 COMMENT '来源 hot_content_records.id',
+                synced_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '写入 ODPS 时间',
+                PRIMARY KEY (id),
+                UNIQUE KEY uk_odps_sync (partition_dt, strategy, demand_id),
+                KEY idx_record_partition (record_id, partition_dt),
+                KEY idx_partition_strategy (partition_dt, strategy)
+            ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
+        """
+        with self.conn.cursor() as cur:
+            cur.execute(ddl)

+ 14 - 8
app/scheduler.py

@@ -39,16 +39,18 @@ def run_hot_content_job(config: FlowConfig) -> None:
 
 def run_decode_result_job(config: FlowConfig) -> None:
     """解构结果拉取 -> 后处理 -> 写入 ODPS 需求表。"""
+    summary: dict[str, Any] = {}
     try:
-        summary: dict[str, Any] = {"decode_result": run_decode_result_once(config)}
-        try:
-            summary["postprocess"] = run_postprocess_once(config)
-        except Exception as exc:
-            summary["postprocess_error"] = str(exc)
-            print(f"postprocess flow failed: {exc}", file=sys.stderr)
-        print(json.dumps(summary, ensure_ascii=False, indent=2))
+        summary["decode_result"] = run_decode_result_once(config)
     except Exception as exc:
+        summary["decode_result_error"] = str(exc)
         print(f"decode result flow failed: {exc}", file=sys.stderr)
+    try:
+        summary["postprocess"] = run_postprocess_once(config)
+    except Exception as exc:
+        summary["postprocess_error"] = str(exc)
+        print(f"postprocess flow failed: {exc}", file=sys.stderr)
+    print(json.dumps(summary, ensure_ascii=False, indent=2))
 
 
 def run_postprocess_job(config: FlowConfig) -> None:
@@ -132,7 +134,11 @@ def main() -> None:
                 )
             )
         if args.job in {"all", "decode-result"}:
-            summary = {"decode_result": run_decode_result_once(config)}
+            summary: dict[str, Any] = {}
+            try:
+                summary["decode_result"] = run_decode_result_once(config)
+            except Exception as exc:
+                summary["decode_result_error"] = str(exc)
             try:
                 summary["postprocess"] = run_postprocess_once(config)
             except Exception as exc: