Explorar el Código

修改hive字段名称

xueyiming hace 1 semana
padre
commit
e4f5546180
Se han modificado 2 ficheros con 12 adiciones y 3 borrados
  1. 1 1
      README.md
  2. 11 2
      app/sync/demand_pool_sync.py

+ 1 - 1
README.md

@@ -63,7 +63,7 @@ npm run dev
 ## 需求池同步任务
 
 - 同步逻辑文件:`app/sync/demand_pool_sync.py`
-- 数据源 1:`dwd_multi_demand_pool_di`(原始字段直映射)
+- 数据源 1:`dwd_multi_demand_pool_di`(`extend` 写入 MySQL `ext_info`,其余字段直映射)
 - 数据源 2:`dwd_demand_pool_di`(映射规则:`strategy=近期需求`,`demand_id=MD5(strategy+demand+dt)`,`demand_name=demand`,`weight=score`,`video_count/video_list` 置空,`ext_info={}`)
 - 全量任务:`run_full_sync()`,默认同步分区 `20260507,20260508,20260509`
 - 增量任务:`run_today_incremental_sync()`,每小时同步当天分区

+ 11 - 2
app/sync/demand_pool_sync.py

@@ -34,6 +34,15 @@ def _serialize_video_list(value: object) -> str | None:
     return str(value)
 
 
+def _serialize_extend(value: object) -> str | None:
+    if value is None:
+        return None
+    if isinstance(value, (dict, list)):
+        return json.dumps(value, ensure_ascii=False)
+    raw = str(value).strip()
+    return raw or None
+
+
 def _normalize_secondary_weight(value: object) -> float | None:
     if value is None:
         return None
@@ -82,7 +91,7 @@ def _fetch_partition_rows_from_primary_source(partition_dt: str) -> list[dict[st
       `type`,
       video_count,
       video_list,
-      ext_info
+      `extend`
     FROM {source_table}
     WHERE dt = '{partition_dt}'
     """
@@ -104,7 +113,7 @@ def _fetch_partition_rows_from_primary_source(partition_dt: str) -> list[dict[st
                 "demand_type": record["type"],
                 "video_count": record["video_count"],
                 "video_list": _serialize_video_list(record["video_list"]),
-                "ext_info": record["ext_info"],
+                "ext_info": _serialize_extend(record["extend"]),
                 "dt": partition_dt,
             }