Browse Source

调整导出。修改prompt

xueyiming 3 tuần trước cách đây
mục cha
commit
ae5eb01865

+ 25 - 1
app/hot_content/demand_export.py

@@ -399,7 +399,11 @@ def build_demand_export_rows(
 def attach_wxindex_metadata(
     export_rows: list[dict[str, Any]],
     trend_json: dict[str, Any] | None,
+    *,
+    wxindex_threshold: float = WXINDEX_EXPORT_THRESHOLD,
 ) -> list[dict[str, Any]]:
+    from app.hot_content.demand_hive_export import is_export_row_as_demand
+
     latest_score = (
         get_latest_wxindex_score(trend_json)
         if isinstance(trend_json, dict)
@@ -408,11 +412,23 @@ def attach_wxindex_metadata(
     trend = get_wxindex_trend(trend_json) if isinstance(trend_json, dict) else ""
     wxindex_keyword = get_wxindex_keyword(trend_json)
     all_hot_keywords = format_wxindex_keywords(trend_json)
+    has_record_wxindex = latest_score is not None
+    record_wxindex_score = float(latest_score or 0)
+    # is_as_demand 与 ODPS 规则一致,需用落库后的 wxindex_latest_score 做标题门槛判断
+    gate_rows: list[dict[str, Any]] = []
+    for row in export_rows:
+        matched = str(row.get("matched_demand") or "").strip()
+        gate_row = dict(row)
+        if has_record_wxindex and _has_matched_demand_text(matched):
+            gate_row["wxindex_latest_score"] = record_wxindex_score
+        else:
+            gate_row["wxindex_latest_score"] = 0.0
+        gate_rows.append(gate_row)
+
     rows: list[dict[str, Any]] = []
     for row in export_rows:
         matched_demand = str(row.get("matched_demand") or "").strip()
         has_matched_demand = _has_matched_demand_text(matched_demand)
-        has_record_wxindex = latest_score is not None
 
         if has_record_wxindex and has_matched_demand:
             wxindex_score = float(latest_score)
@@ -433,6 +449,11 @@ def attach_wxindex_metadata(
                 "all_hot_keywords": all_hot_keywords,
                 "wxindex_latest_score": wxindex_score,
                 "wxindex_trend": wxindex_trend_value,
+                "is_as_demand": is_export_row_as_demand(
+                    normalized_row,
+                    gate_rows,
+                    wxindex_threshold=wxindex_threshold,
+                ),
             }
         )
     return rows
@@ -480,6 +501,7 @@ def export_existing_records(
     *,
     dry_run: bool,
     verbose: bool,
+    wxindex_threshold: float = WXINDEX_EXPORT_THRESHOLD,
 ) -> dict[str, int]:
     summary = {
         "scanned": 0,
@@ -516,6 +538,7 @@ def export_existing_records(
                 trend_json=trend_json if isinstance(trend_json, dict) else None,
             ),
             trend_json if isinstance(trend_json, dict) else None,
+            wxindex_threshold=wxindex_threshold,
         )
         if not export_rows:
             summary["no_export_rows"] += 1
@@ -593,6 +616,7 @@ def main(argv: list[str] | None = None) -> dict[str, int]:
             records,
             dry_run=args.dry_run,
             verbose=args.verbose,
+            wxindex_threshold=config.wxindex_score_threshold,
         )
     finally:
         repository.close()

+ 12 - 0
app/hot_content/demand_hive_export.py

@@ -68,6 +68,18 @@ def _has_matched_demand(row: dict[str, Any]) -> bool:
     return bool(str(row.get("matched_demand") or "").strip())
 
 
+def is_export_row_as_demand(
+    row: dict[str, Any],
+    export_rows: list[dict[str, Any]],
+    *,
+    wxindex_threshold: float,
+) -> int:
+    """是否与 ODPS 需求同步规则一致:标题保留且该行有匹配需求。返回 0/1。"""
+    if not _should_retain_title(export_rows, wxindex_threshold=wxindex_threshold):
+        return 0
+    return 1 if _has_matched_demand(row) else 0
+
+
 def build_hive_rows_for_record(
     export_rows: list[dict[str, Any]],
     *,

+ 37 - 4
app/hot_content/demand_pool_writer.py

@@ -27,13 +27,30 @@ def _escape_sql_string(value: str) -> str:
     return value.replace("'", "''")
 
 
+def _group_pending_rows_by_record(
+    pending_rows: list[dict[str, Any]],
+) -> list[list[dict[str, Any]]]:
+    groups: list[list[dict[str, Any]]] = []
+    for row in pending_rows:
+        record_id = int(row.get("record_id") or 0)
+        if groups and int(groups[-1][0].get("record_id") or 0) == record_id:
+            groups[-1].append(row)
+        else:
+            groups.append([row])
+    return groups
+
+
 def apply_odps_daily_write_limit(
     pending_rows: list[dict[str, Any]],
     *,
     existing_count: int,
     daily_limit: int,
 ) -> tuple[list[dict[str, Any]], list[dict[str, Any]], dict[str, Any]]:
-    """按每日上限截断待写入行。daily_limit <= 0 表示不限制。"""
+    """按每日上限截断待写入行,按标题(record_id)整批保留。
+
+    有剩余额度时,当前标题的全部 demand 行都会写入;若因此超过每日上限,仍写完该标题,
+    其后标题不再同步。daily_limit <= 0 表示不限制。
+    """
     limit_meta: dict[str, Any] = {
         "daily_write_limit": daily_limit if daily_limit > 0 else None,
         "daily_written_count": existing_count,
@@ -46,9 +63,25 @@ def apply_odps_daily_write_limit(
     limit_meta["daily_remaining_quota"] = max(remaining_quota, 0)
     if remaining_quota <= 0:
         return [], list(pending_rows), limit_meta
-    if len(pending_rows) <= remaining_quota:
-        return pending_rows, [], limit_meta
-    return pending_rows[:remaining_quota], pending_rows[remaining_quota:], limit_meta
+
+    record_groups = _group_pending_rows_by_record(pending_rows)
+    rows_to_write: list[dict[str, Any]] = []
+    limit_skipped: list[dict[str, Any]] = []
+
+    for index, record_rows in enumerate(record_groups):
+        if remaining_quota <= 0:
+            for rest_rows in record_groups[index:]:
+                limit_skipped.extend(rest_rows)
+            break
+
+        rows_to_write.extend(record_rows)
+        remaining_quota -= len(record_rows)
+        if remaining_quota < 0:
+            for rest_rows in record_groups[index + 1 :]:
+                limit_skipped.extend(rest_rows)
+            break
+
+    return rows_to_write, limit_skipped, limit_meta
 
 
 class HotDemandPoolWriter:

+ 30 - 12
app/hot_content/postprocess_service.py

@@ -295,6 +295,7 @@ class ContributionPostprocessService:
                 trend_json=trend_result if isinstance(trend_result, dict) else None,
             ),
             trend_result if isinstance(trend_result, dict) else None,
+            wxindex_threshold=self.config.wxindex_score_threshold,
         )
         self.repository.replace_demand_export_rows(
             record_id=int(record["id"]),
@@ -442,18 +443,35 @@ class ContributionPostprocessService:
             return {"source": channel_content_id, "matched": []}
 
         system_prompt = """
-        #角色
-        你是一个专业的语义匹配分析专家,擅长判断词语之间的语义关联性。
-        # 任务
-        我会提供两组数据:
-        1. 热点词列表:一组待匹配的热点词语
-        2. 需求词库:一组已有的需求词语
-        请你逐一分析每个热点词,判断它是否能与需求词库中的某个/某些需求词匹配上。
-        热点词要等于需求词,或者属于需求词,或者表达了相同含义。
-        # 输出规则
-        1. 严格输出 JSON 对象,禁止输出 JSON 之外的任何内容。
-        # 约束
-        1. 热点词和需求词必须来自给定词语,不能创造给定词语之外的词。
+你是一个专业的语义匹配分析专家,擅长判断词语之间的语义关联性。
+
+# 任务
+我会提供两组数据:
+1. 热点词列表:一组待匹配的热点词语
+2. 需求词库:一组已有的需求词语
+
+请你逐一分析每个热点词,判断它是否能与需求词库中的某个/某些需求词匹配上。
+
+# 匹配标准
+满足以下任意一条,则视为匹配成功:
+- 热点词与需求词含义相同或高度相近(如同义词、近义词)
+- 热点词是需求词的下位概念(热点词所指的事物属于需求词所描述的范畴)
+- 热点词与需求词在用户意图上高度一致
+
+以下情况,不得视为匹配:
+- 热点词仅与需求词中的某一个字/词相关,但未覆盖需求词的完整含义
+- 热点词与需求词只有表面字符重叠,语义方向不同
+- 热点词是需求词的上位概念(范围过宽,含义不够精确)
+- 两者只是同属某个大类,但具体含义差异明显
+
+# 多词组成的需求词处理规则
+若需求词由多个词语组成(如"XX类 YY问题"),热点词必须能够同时覆盖该需求词的所有关键语义成分,缺少任意一个关键成分则不视为匹配。
+
+# 输出规则
+严格输出 JSON 对象,禁止输出 JSON 之外的任何内容。
+
+# 约束
+热点词和需求词必须来自给定词语,不能创造给定词语之外的词。
         """
         user_payload = {
             "source": channel_content_id,

+ 14 - 1
app/hot_content/repository.py

@@ -483,10 +483,11 @@ class HotContentRepository:
                 all_hot_keywords,
                 wxindex_latest_score,
                 wxindex_trend,
+                is_as_demand,
                 created_at,
                 updated_at
             )
-            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW(), NOW())
+            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW(), NOW())
         """
         with self.conn.cursor() as cursor:
             cursor.execute(delete_sql, (record_id,))
@@ -505,6 +506,7 @@ class HotContentRepository:
                     str(item.get("all_hot_keywords") or ""),
                     float(item.get("wxindex_latest_score") or 0),
                     str(item.get("wxindex_trend") or ""),
+                    int(item.get("is_as_demand") or 0),
                 )
                 for item in rows
                 if str(item.get("item_type") or "").strip()
@@ -576,6 +578,7 @@ class HotContentRepository:
                 all_hot_keywords VARCHAR(1024) NOT NULL DEFAULT '' COMMENT '全部热点词',
                 wxindex_latest_score DOUBLE NOT NULL DEFAULT 0,
                 wxindex_trend VARCHAR(32) NOT NULL DEFAULT '' COMMENT '微信指数趋势',
+                is_as_demand TINYINT NOT NULL DEFAULT 0 COMMENT '是否作为需求:0否 1是',
                 created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
                 updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
                 PRIMARY KEY (id),
@@ -645,6 +648,16 @@ class HotContentRepository:
                 AFTER wxindex_keyword
                 """,
             )
+            self._ensure_demand_export_column(
+                cursor,
+                "is_as_demand",
+                """
+                ALTER TABLE hot_content_demand_exports
+                ADD COLUMN is_as_demand TINYINT NOT NULL DEFAULT 0
+                COMMENT '是否作为需求:0否 1是'
+                AFTER wxindex_trend
+                """,
+            )
 
     def _ensure_demand_export_column(
         self,