xueyiming преди 2 седмици
родител
ревизия
9ff80a5c18
променени са 3 файла, в които са добавени 78 реда и са изтрити 4 реда
  1. 2 0
      app/services/demand_pool_service.py
  2. 66 3
      app/sync/demand_pool_sync.py
  3. 10 1
      frontend/src/App.tsx

+ 2 - 0
app/services/demand_pool_service.py

@@ -73,6 +73,7 @@ def query_demand_pool_records(
         "strategy": "strategy",
         "demand_name": "demand_name",
         "weight": "weight",
+        "type": "`type`",
         "video_count": "video_count",
         "dt": "dt",
     }
@@ -100,6 +101,7 @@ def query_demand_pool_records(
             demand_id,
             demand_name,
             weight,
+            `type`,
             video_count,
             video_list,
             ext_info,

+ 66 - 3
app/sync/demand_pool_sync.py

@@ -14,6 +14,10 @@ from app.odps.client import get_odps_client
 IDENTIFIER_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")
 BATCH_SIZE = 500
 SHANGHAI_TZ = ZoneInfo("Asia/Shanghai")
+# 与 MySQL `multi_demand_pool_di`.`type` VARCHAR(32) 对齐
+_SECONDARY_TYPE_MAX_LEN = 32
+# 与 MySQL `multi_demand_pool_di`.`demand_name` VARCHAR(256) 对齐(次源为 merge_leve2:demand)
+_SECONDARY_DEMAND_NAME_MAX_LEN = 256
 
 
 def _safe_identifier(name: str) -> str:
@@ -40,6 +44,33 @@ def _normalize_secondary_weight(value: object) -> float | None:
     return float(decimal_value)
 
 
+def _type_from_extend(value: object) -> str | None:
+    """从 dwd_demand_pool_di.extend JSON 中解析 type 字段。"""
+    if value is None:
+        return None
+    if isinstance(value, dict):
+        parsed: object = value
+    else:
+        raw = str(value).strip()
+        if not raw:
+            return None
+        try:
+            parsed = json.loads(raw)
+        except json.JSONDecodeError:
+            return None
+    if not isinstance(parsed, dict):
+        return None
+    nested = parsed.get("type")
+    if nested is None:
+        return None
+    text_value = str(nested).strip()
+    if not text_value:
+        return None
+    if len(text_value) > _SECONDARY_TYPE_MAX_LEN:
+        return text_value[:_SECONDARY_TYPE_MAX_LEN]
+    return text_value
+
+
 def _fetch_partition_rows_from_primary_source(partition_dt: str) -> list[dict[str, object]]:
     source_table = _safe_identifier(settings.demand_pool_source_table)
     sql = f"""
@@ -48,6 +79,7 @@ def _fetch_partition_rows_from_primary_source(partition_dt: str) -> list[dict[st
       demand_id,
       demand_name,
       weight,
+      `type`,
       video_count,
       video_list,
       ext_info
@@ -69,6 +101,7 @@ def _fetch_partition_rows_from_primary_source(partition_dt: str) -> list[dict[st
                 "demand_id": demand_id,
                 "demand_name": record["demand_name"],
                 "weight": record["weight"],
+                "demand_type": record["type"],
                 "video_count": record["video_count"],
                 "video_list": _serialize_video_list(record["video_list"]),
                 "ext_info": record["ext_info"],
@@ -83,12 +116,29 @@ def _build_secondary_demand_id(demand_name: str, partition_dt: str) -> str:
     return hashlib.md5(raw_value.encode("utf-8")).hexdigest()
 
 
+def _secondary_demand_display_name(merge_leve2: object, demand: str) -> str:
+    """次源 demand_name:`merge_leve2:demand`;merge 为空则退化为仅 demand。"""
+    part = demand.strip()
+    if not part:
+        return ""
+    merge_s = str(merge_leve2 or "").strip()
+    if merge_s:
+        combined = f"{merge_s}:{part}"
+    else:
+        combined = part
+    if len(combined) > _SECONDARY_DEMAND_NAME_MAX_LEN:
+        return combined[:_SECONDARY_DEMAND_NAME_MAX_LEN]
+    return combined
+
+
 def _fetch_partition_rows_from_secondary_source(partition_dt: str) -> list[dict[str, object]]:
     source_table = _safe_identifier(settings.demand_pool_secondary_source_table)
     sql = f"""
     SELECT
+      `merge_leve2`,
       demand,
-      score
+      score,
+      `extend`
     FROM {source_table}
     WHERE dt = '{partition_dt}'
     """
@@ -98,7 +148,14 @@ def _fetch_partition_rows_from_secondary_source(partition_dt: str) -> list[dict[
     dedup_rows: dict[str, dict[str, object]] = {}
     with instance.open_reader(tunnel=True) as reader:
         for record in reader:
-            demand_name = str(record["demand"] or "").strip()
+            demand_raw = str(record["demand"] or "").strip()
+            if not demand_raw:
+                continue
+
+            demand_name = _secondary_demand_display_name(
+                record.get("merge_leve2"),
+                demand_raw,
+            )
             if not demand_name:
                 continue
 
@@ -108,6 +165,7 @@ def _fetch_partition_rows_from_secondary_source(partition_dt: str) -> list[dict[
                 "demand_id": demand_id,
                 "demand_name": demand_name,
                 "weight": _normalize_secondary_weight(record["score"]),
+                "demand_type": _type_from_extend(record["extend"]),
                 "video_count": None,
                 "video_list": None,
                 "ext_info": settings.demand_pool_secondary_default_ext_info,
@@ -125,8 +183,9 @@ def _ensure_target_table() -> None:
         id BIGINT AUTO_INCREMENT COMMENT '自增id' PRIMARY KEY,
         strategy VARCHAR(64) NULL COMMENT '策略',
         demand_id VARCHAR(64) NULL COMMENT '需求id',
-        demand_name VARCHAR(64) NULL COMMENT '需求',
+        demand_name VARCHAR(256) NULL COMMENT '需求',
         weight DOUBLE NULL COMMENT '权重',
+        `type` VARCHAR(32) NULL COMMENT '需求类型',
         video_count BIGINT NULL COMMENT '视频数量',
         video_list TEXT NULL COMMENT '视频列表',
         ext_info TEXT NULL COMMENT '扩展字段',
@@ -154,6 +213,7 @@ def _upsert_rows_by_demand_id(rows: list[dict[str, object]]) -> int:
             demand_id,
             demand_name,
             weight,
+            `type`,
             video_count,
             video_list,
             ext_info,
@@ -165,6 +225,7 @@ def _upsert_rows_by_demand_id(rows: list[dict[str, object]]) -> int:
             :demand_id,
             :demand_name,
             :weight,
+            :demand_type,
             :video_count,
             :video_list,
             :ext_info,
@@ -174,6 +235,7 @@ def _upsert_rows_by_demand_id(rows: list[dict[str, object]]) -> int:
             strategy = VALUES(strategy),
             demand_name = VALUES(demand_name),
             weight = VALUES(weight),
+            `type` = VALUES(`type`),
             video_count = VALUES(video_count),
             video_list = VALUES(video_list),
             ext_info = VALUES(ext_info),
@@ -183,6 +245,7 @@ def _upsert_rows_by_demand_id(rows: list[dict[str, object]]) -> int:
                     strategy <=> VALUES(strategy)
                     AND demand_name <=> VALUES(demand_name)
                     AND weight <=> VALUES(weight)
+                    AND `type` <=> VALUES(`type`)
                     AND video_count <=> VALUES(video_count)
                     AND video_list <=> VALUES(video_list)
                     AND ext_info <=> VALUES(ext_info)

+ 10 - 1
frontend/src/App.tsx

@@ -27,6 +27,7 @@ type DemandPoolItem = {
   id: number;
   strategy: string | null;
   demand_name: string | null;
+  type: string | null;
   weight: number | null;
   video_count: number | null;
   dt: string | null;
@@ -317,6 +318,14 @@ function DemandPoolPanel() {
         sorter: true,
         sortOrder: getSortOrderForColumn("demand_name"),
       },
+      {
+        title: "需求类型",
+        dataIndex: "type",
+        width: 120,
+        render: (v) => v ?? "-",
+        sorter: true,
+        sortOrder: getSortOrderForColumn("type"),
+      },
       {
         title: "权重",
         dataIndex: "weight",
@@ -474,7 +483,7 @@ function DemandPoolPanel() {
               columns={columns}
               dataSource={data.items}
               pagination={false}
-              scroll={{ x: 920 }}
+              scroll={{ x: 1060 }}
               rowClassName={(_, index) => (index % 2 === 0 ? "row-even" : "row-odd")}
               onChange={(_, __, sorter) => {
                 if (Array.isArray(sorter)) {