소스 검색

增加需求直接写入hive表

xueyiming 1 주 전
부모
커밋
5cc98a456f
2개의 변경된 파일, 137개의 추가작업 그리고 0개의 파일을 삭제
  1. 129 0
      examples/demand/data_query_tools.py
  2. 8 0
      examples/demand/run.py

+ 129 - 0
examples/demand/data_query_tools.py

@@ -1,3 +1,4 @@
+import hashlib
 from zoneinfo import ZoneInfo
 
 from odps import ODPS
@@ -50,6 +51,134 @@ def execute_odps_sql(sql) -> bool:
         return False
 
 
+# Strategy labels: written to the `strategy` column and hashed into demand_id.
+_STRATEGY_GAP = "当下供需gap"
+_STRATEGY_GAP_FENCI = "当下供需gap-分词"
+# Fully qualified target table of the generated INSERT statements.
+_HIVE_TABLE = "loghubods.dwd_multi_demand_pool_di"
+_HIVE_DT_FMT = "%Y%m%d"  # partition format: yyyymmdd, e.g. 20260519
+# Time zone used to decide which calendar day "today" is for the partition.
+_CHINA_TZ = ZoneInfo("Asia/Shanghai")
+
+
+def _hive_partition_dt() -> str:
+    """中国时区(Asia/Shanghai)当天日期,格式 yyyymmdd。"""
+    return datetime.now(_CHINA_TZ).date().strftime(_HIVE_DT_FMT)
+
+
+def _escape_odps_string(value: object) -> str:
+    return str(value).replace("'", "''")
+
+
+def _format_odps_string_array(values: list) -> str:
+    if not values:
+        return "ARRAY()"
+    parts = [f"'{_escape_odps_string(v)}'" for v in values]
+    return f"ARRAY({','.join(parts)})"
+
+
+def _parse_ext_data(ext_data_raw: object) -> dict:
+    if isinstance(ext_data_raw, dict):
+        return ext_data_raw
+    if isinstance(ext_data_raw, str) and ext_data_raw.strip():
+        try:
+            return json.loads(ext_data_raw)
+        except json.JSONDecodeError:
+            return {}
+    return {}
+
+
+def _build_hive_select_part(
+        strategy: str,
+        demand_id: str,
+        demand_name: str,
+        weight: float,
+        type_str: str,
+        video_count: int,
+        video_ids: list[str],
+        extend_json: str,
+) -> str:
+    return (
+        "SELECT "
+        f"'{_escape_odps_string(strategy)}' AS strategy, "
+        f"'{_escape_odps_string(demand_id)}' AS demand_id, "
+        f"'{_escape_odps_string(demand_name)}' AS demand_name, "
+        f"{weight} AS weight, "
+        f"'{_escape_odps_string(type_str)}' AS `type`, "
+        f"{video_count} AS video_count, "
+        f"{_format_odps_string_array(video_ids)} AS video_list, "
+        f"'{_escape_odps_string(extend_json)}' AS extend"
+    )
+
+
+def _insert_hive_select_parts(select_parts: list[str], partition_dt: str) -> bool:
+    if not select_parts:
+        return True
+    union_sql = "\nUNION ALL\n".join(select_parts)
+    insert_sql = f"""
+INSERT INTO TABLE {_HIVE_TABLE}
+PARTITION (dt='{partition_dt}')
+(strategy, demand_id, demand_name, weight, `type`, video_count, video_list, extend)
+{union_sql}
+"""
+    return execute_odps_sql(insert_sql)
+
+
+def write_dwd_multi_demand_pool_di_to_hive(rows: list[dict]) -> int:
+    """
+    将行数据映射并写入 loghubods.dwd_multi_demand_pool_di(尽力插入,不校验结果)。
+
+    分区与 demand_id 的日期均为中国时区当天(yyyymmdd),不使用行内 dt 字段。
+    执行两次 INSERT(同表、同分区),策略不同:
+    1) 当下供需gap: demand_name=merge_leve2+' '+name, demand_id=md5(strategy+demand_name+dt)
+    2) 当下供需gap-分词: demand_name=name, demand_id=md5(strategy+name+dt)
+    """
+    if not rows:
+        return 0
+
+    china_today = _hive_partition_dt()
+    gap_parts: list[str] = []
+    fenci_parts: list[str] = []
+
+    for row in rows:
+        merge_leve2 = str(row.get("merge_leve2") or "").strip()
+        name = str(row.get("name") or "").strip()
+        if not merge_leve2 or not name:
+            continue
+
+        weight = round(float(row.get("score") or 0.0), 6)
+
+        ext_data = _parse_ext_data(row.get("ext_data"))
+        type_str = str(ext_data.get("type") or "").strip()
+        video_ids = ext_data.get("video_ids") or []
+        if not isinstance(video_ids, list):
+            video_ids = []
+        video_ids = [str(v).strip() for v in video_ids if v is not None and str(v).strip()]
+        video_count = len(video_ids)
+        extend_json = json.dumps({"品类": merge_leve2}, ensure_ascii=False)
+
+        demand_name_gap = f"{merge_leve2} {name}"
+        demand_id_gap = hashlib.md5(f"{_STRATEGY_GAP}{demand_name_gap}{china_today}".encode("utf-8")).hexdigest()
+        gap_parts.append(
+            _build_hive_select_part(
+                _STRATEGY_GAP, demand_id_gap, demand_name_gap,
+                weight, type_str, video_count, video_ids, extend_json,
+            )
+        )
+
+        demand_id_fenci = hashlib.md5(f"{_STRATEGY_GAP_FENCI}{name}{china_today}".encode("utf-8")).hexdigest()
+        fenci_parts.append(
+            _build_hive_select_part(
+                _STRATEGY_GAP_FENCI, demand_id_fenci, name,
+                weight, type_str, video_count, video_ids, extend_json,
+            )
+        )
+
+    if not gap_parts:
+        return 0
+
+    _insert_hive_select_parts(gap_parts, china_today)
+    _insert_hive_select_parts(fenci_parts, china_today)
+    return len(gap_parts) + len(fenci_parts)
+
+
 def write_feature_point_data_to_hive(names: list[str]) -> int:
     """
     将需求名称写入 Hive 表 feature_point_data(按北京时间当天分区)。

+ 8 - 0
examples/demand/run.py

@@ -319,6 +319,14 @@ def write_demand_items_to_mysql(execution_id: int, merge_level2: str) -> int:
 
     affected = mysql_db.insert_many("demand_content", rows)
     log(f"[mysql] 写入 demand_content 完成,rows={len(rows)}, affected={affected}")
+
+    try:
+        from examples.demand.data_query_tools import write_dwd_multi_demand_pool_di_to_hive
+
+        hive_written = write_dwd_multi_demand_pool_di_to_hive(rows=rows)
+        log(f"[hive] 写入 dwd_multi_demand_pool_di 完成,rows={hive_written}, dt={dt_value}")
+    except Exception as e:
+        log(f"[hive] 写入 dwd_multi_demand_pool_di 异常(MySQL 已成功):{e}")
     # with open(f'/Users/shimeng/Desktop/py/Agent/examples/demand/result/{merge_level2}.json', 'w',
     #           encoding='utf-8') as f:
     #     json.dump(rows, f, ensure_ascii=False, indent=4)