Преглед на файлове

新热事件增加extend

xueyiming преди 6 дни
родител
ревизия
42fd024c3c
променени са 3 файла, в които са добавени 41 реда и са изтрити 3 реда
  1. 2 0
      app/hot_content/demand_pool_writer.py
  2. 27 2
      app/hot_content/repository.py
  3. 12 1
      app/hot_content/wxindex_heat_pattern.py

+ 2 - 0
app/hot_content/demand_pool_writer.py

@@ -185,6 +185,7 @@ class HotDemandPoolWriter:
                         "demand_type": row["type"],
                         "record_id": row.get("record_id") or 0,
                         "weight": row.get("weight"),
+                        "extend": row.get("extend"),
                     }
                     for row in rows_to_write
                 ]
@@ -349,6 +350,7 @@ def sync_wxindex_word_rows_to_odps(
                     "demand_type": row["type"],
                     "record_id": row.get("record_id") or 0,
                     "weight": row.get("weight"),
+                    "extend": row.get("extend"),
                 }
                 for row in rows_to_write
             ]

+ 27 - 2
app/hot_content/repository.py

@@ -1257,14 +1257,16 @@ class HotContentRepository:
                 demand_name,
                 demand_type,
                 record_id,
-                weight
+                weight,
+                extend
             )
-            VALUES (%s, %s, %s, %s, %s, %s, %s)
+            VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
             ON DUPLICATE KEY UPDATE
                 demand_name = VALUES(demand_name),
                 demand_type = VALUES(demand_type),
                 record_id = VALUES(record_id),
                 weight = VALUES(weight),
+                extend = VALUES(extend),
                 synced_at = CURRENT_TIMESTAMP
         """
         insert_rows = [
@@ -1276,6 +1278,7 @@ class HotContentRepository:
                 str(item.get("demand_type") or ""),
                 int(item.get("record_id") or 0),
                 float(item["weight"]) if item.get("weight") is not None else None,
+                str(item.get("extend") or "{}"),
             )
             for item in rows
             if str(item.get("demand_id") or "").strip()
@@ -1304,6 +1307,26 @@ class HotContentRepository:
                 """
             )
 
+    def _ensure_odps_sync_log_extend_column(self, cursor: Any) -> None:
+        """Add the ``extend`` column to ``hot_content_odps_sync_log`` if missing.
+
+        Looks the column up in ``information_schema.COLUMNS`` for the current
+        database and, when it is not found, issues an ``ALTER TABLE`` that adds
+        a nullable ``TEXT`` column positioned after ``weight``.
+
+        Args:
+            cursor: Database cursor used to run both statements.
+                NOTE(review): the result row is read with ``.get("cnt")``, so
+                this presumably expects a dict-style cursor -- confirm.
+        """
+        cursor.execute(
+            """
+            SELECT COUNT(*) AS cnt
+            FROM information_schema.COLUMNS
+            WHERE TABLE_SCHEMA = DATABASE()
+              AND TABLE_NAME = 'hot_content_odps_sync_log'
+              AND COLUMN_NAME = 'extend'
+            """,
+        )
+        # A missing row, a NULL count and a zero count are all treated as
+        # "column does not exist yet".
+        if int((cursor.fetchone() or {}).get("cnt") or 0) == 0:
+            cursor.execute(
+                """
+                ALTER TABLE hot_content_odps_sync_log
+                    ADD COLUMN extend TEXT NULL DEFAULT NULL
+                        COMMENT 'ODPS extend 扩展字段 JSON'
+                        AFTER weight
+                """
+            )
+
     def list_wxindex_word_scores(self, name: str) -> list[dict[str, Any]]:
         word = str(name or "").strip()
         if not word:
@@ -2529,6 +2552,7 @@ class HotContentRepository:
                 demand_type VARCHAR(32) NOT NULL DEFAULT '' COMMENT '特征点/短语',
                 record_id BIGINT UNSIGNED NOT NULL DEFAULT 0 COMMENT '来源 hot_content_records.id',
                 weight DOUBLE NULL DEFAULT NULL COMMENT 'ODPS 需求权重(记录 wxindex 最高分 / 1000000)',
+                extend TEXT NULL DEFAULT NULL COMMENT 'ODPS extend 扩展字段 JSON',
                 synced_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '写入 ODPS 时间',
                 PRIMARY KEY (id),
                 UNIQUE KEY uk_odps_sync (partition_dt, strategy, demand_id),
@@ -2539,3 +2563,4 @@ class HotContentRepository:
         with self.conn.cursor() as cursor:
             cursor.execute(sql)
             self._ensure_odps_sync_log_weight_column(cursor)
+            self._ensure_odps_sync_log_extend_column(cursor)

+ 12 - 1
app/hot_content/wxindex_heat_pattern.py

@@ -473,6 +473,13 @@ def score_wxindex_word_senior_fit(
     return batch_results.get(target_word, _empty_senior_fit_result())
 
 
+def build_wxindex_odps_extend(retain_reason: str | None) -> str:
+    """Build the JSON string used as the ``extend`` value of a wxindex row.
+
+    Args:
+        retain_reason: Reason text to record; ``None`` and blank strings are
+            treated as "no reason".
+
+    Returns:
+        ``"{}"`` when the stripped reason is empty, otherwise a JSON object of
+        the form ``{"method": <reason>}`` serialized with
+        ``ensure_ascii=False`` so non-ASCII text is kept as-is.
+    """
+    method = str(retain_reason or "").strip()
+    if not method:
+        return "{}"
+    return json.dumps({"method": method}, ensure_ascii=False)
+
+
 def build_wxindex_word_hive_row(
     *,
     wxindex_word_record_id: int,
@@ -480,6 +487,7 @@ def build_wxindex_word_hive_row(
     strategy: str,
     partition_dt: str,
     max_score: float | None,
+    retain_reason: str | None = None,
 ) -> dict[str, Any]:
     normalized_name = str(word or "").strip()
     weight = 0.0
@@ -498,7 +506,7 @@ def build_wxindex_word_hive_row(
         "type": TYPE_PHRASE,
         "video_count": None,
         "video_list": [],
-        "extend": "{}",
+        "extend": build_wxindex_odps_extend(retain_reason),
         "dt": partition_dt,
     }
 
@@ -510,6 +518,7 @@ def build_wxindex_word_odps_sync_row(
     strategy: str,
     partition_dt: str,
     max_score: float | None,
+    retain_reason: str | None = None,
 ) -> dict[str, Any]:
     normalized_name = str(word or "").strip()
     weight = None
@@ -527,6 +536,7 @@ def build_wxindex_word_odps_sync_row(
         "demand_type": TYPE_PHRASE,
         "record_id": wxindex_word_record_id,
         "weight": weight,
+        "extend": build_wxindex_odps_extend(retain_reason),
     }
 
 
@@ -1573,6 +1583,7 @@ def run_wxindex_heat_pattern_daily_job(
                     strategy=strategy,
                     partition_dt=analyze_ymd,
                     max_score=result.get("max_score"),
+                    retain_reason=item.get("retain_reason"),
                 )
             )