소스 검색

dwd_demand_pool_di 暂停同步

xueyiming 1 주 전
부모
커밋
6a595c8140
4개의 파일 변경, 7줄 추가, 3줄 삭제
  1. 1 0
      .env.example
  2. 1 1
      README.md
  3. 2 0
      app/core/config.py
  4. 3 2
      app/sync/demand_pool_sync.py

+ 1 - 0
.env.example

@@ -15,6 +15,7 @@ ODPS_PROJECT=
 ODPS_ENDPOINT=
 DEMAND_POOL_SOURCE_TABLE=dwd_multi_demand_pool_di
 DEMAND_POOL_SECONDARY_SOURCE_TABLE=dwd_demand_pool_di
+DEMAND_POOL_SECONDARY_SYNC_ENABLED=false
 DEMAND_POOL_SECONDARY_STRATEGY=近期需求
 DEMAND_POOL_SECONDARY_DEFAULT_EXT_INFO={}
 DEMAND_POOL_TARGET_TABLE=multi_demand_pool_di

+ 1 - 1
README.md

@@ -64,7 +64,7 @@ npm run dev
 
 - 同步逻辑文件:`app/sync/demand_pool_sync.py`
 - 数据源 1:`dwd_multi_demand_pool_di`(`extend` 写入 MySQL `ext_info`,其余字段直映射)
-- 数据源 2:`dwd_demand_pool_di`(映射规则:`strategy=近期需求`,`demand_id=MD5(strategy+demand+dt)`,`demand_name=demand`,`weight=score`,`video_count/video_list` 置空,`ext_info={}`)
+- 数据源 2:`dwd_demand_pool_di`(默认关闭,`DEMAND_POOL_SECONDARY_SYNC_ENABLED=true` 时启用;映射规则:`strategy=近期需求`,`demand_id=MD5(strategy+demand+dt)`,`demand_name=demand`,`weight=score`,`video_count/video_list` 置空,`ext_info={}`)
 - 全量任务:`run_full_sync()`,默认同步分区 `20260507,20260508,20260509`
 - 增量任务:`run_today_incremental_sync()`,每小时同步当天分区
 - 去重策略:ODPS 分区内先按 `demand_id` 去重,写入 MySQL 时使用唯一键 UPSERT

+ 2 - 0
app/core/config.py

@@ -21,6 +21,8 @@ class Settings(BaseSettings):
     odps_endpoint: str = "http://service.odps.aliyun.com/api"
     demand_pool_source_table: str = "dwd_multi_demand_pool_di"
     demand_pool_secondary_source_table: str = "dwd_demand_pool_di"
+    # 次源 dwd_demand_pool_di 同步开关;false 时仅同步主源 dwd_multi_demand_pool_di
+    demand_pool_secondary_sync_enabled: bool = False
     demand_pool_secondary_strategy: str = "近期需求"
     demand_pool_secondary_default_ext_info: str = "{}"
     demand_pool_target_table: str = "multi_demand_pool_di"

+ 3 - 2
app/sync/demand_pool_sync.py

@@ -279,8 +279,9 @@ def sync_partition(partition_dt: str) -> int:
     merged_rows: dict[str, dict[str, object]] = {}
     for row in _fetch_partition_rows_from_primary_source(partition_dt):
         merged_rows[str(row["demand_id"])] = row
-    for row in _fetch_partition_rows_from_secondary_source(partition_dt):
-        merged_rows[str(row["demand_id"])] = row
+    if settings.demand_pool_secondary_sync_enabled:
+        for row in _fetch_partition_rows_from_secondary_source(partition_dt):
+            merged_rows[str(row["demand_id"])] = row
 
     return _upsert_rows_by_demand_id(list(merged_rows.values()))