| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155 |
- """Write recent hot-topic demands into the Hive demand pool table."""
- from __future__ import annotations
- import re
- from datetime import datetime
- from typing import Any
- from app.aliyun_odps.client import get_odps_client
- from app.hot_content.demand_hive_export import build_hive_rows_from_export_groups
- from app.hot_content.exceptions import HotContentFlowError
- from app.hot_content.repository import HotContentRepository
- from app.hot_content.timezone import SHANGHAI_TZ
- from app.hot_content.types import FlowConfig
# Matches a bare identifier or a single ``qualifier.identifier`` pair made of
# ASCII letters, digits and underscores (no leading digit). Used by
# ``_safe_identifier`` to vet table names.
IDENTIFIER_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*(?:\.[A-Za-z_][A-Za-z0-9_]*)?$")

# Column names of a demand pool row, in the same order in which
# ``HotDemandPoolWriter`` lays out the row lists it writes.
HIVE_COLUMNS = (
    "strategy", "demand_id", "demand_name", "weight",
    "type", "video_count", "video_list", "extend",
)
def _safe_identifier(name: str) -> str:
    """Return *name* stripped of surrounding whitespace if it is a safe identifier.

    A safe identifier is whatever ``IDENTIFIER_RE`` accepts: ``name`` or
    ``qualifier.name``.

    Raises:
        HotContentFlowError: if the stripped value does not match
            ``IDENTIFIER_RE``. The message echoes the original, unstripped
            input.
    """
    candidate = name.strip()
    if IDENTIFIER_RE.match(candidate):
        return candidate
    raise HotContentFlowError(f"invalid sql identifier: {name}")
- def _escape_sql_string(value: str) -> str:
- return value.replace("'", "''")
class HotDemandPoolWriter:
    """Write the current hot-demand rows into one daily partition of the demand pool table.

    The target partition is overwritten as a whole: rows already present for
    *other* strategies are read back and re-written unchanged, while rows of
    the configured strategy are replaced by the freshly built ones.
    """

    def __init__(self, config: FlowConfig, repository: HotContentRepository):
        """Keep the flow configuration and the repository that supplies export groups."""
        self.config = config
        self.repository = repository

    def sync_today(self) -> dict[str, Any]:
        """Rebuild the partition for the current date in ``SHANGHAI_TZ``.

        Returns:
            A summary dict with the keys ``partition_dt``, ``strategy``,
            ``source_record_count``, ``hive_row_count``, ``written_count``
            and ``target_table``.
        """
        strategy = self.config.hot_demand_pool_strategy
        partition_dt = datetime.now(SHANGHAI_TZ).date().strftime("%Y%m%d")
        export_groups = self.repository.list_demand_export_groups()
        hive_rows = build_hive_rows_from_export_groups(
            export_groups,
            strategy=strategy,
            partition_dt=partition_dt,
            wxindex_threshold=self.config.wxindex_score_threshold,
        )
        written_count = self._write_partition(
            hive_rows=hive_rows,
            partition_dt=partition_dt,
            strategy=strategy,
        )
        return {
            "partition_dt": partition_dt,
            "strategy": strategy,
            "source_record_count": len(export_groups),
            "hive_row_count": len(hive_rows),
            "written_count": written_count,
            "target_table": self.config.demand_pool_source_table,
        }

    def _write_partition(
        self,
        *,
        hive_rows: list[dict[str, Any]],
        partition_dt: str,
        strategy: str,
    ) -> int:
        """Overwrite partition ``dt=<partition_dt>`` with preserved plus new rows.

        Args:
            hive_rows: Freshly built rows for *strategy*; each must carry the
                keys read by ``_to_write_row``.
            partition_dt: Partition value, formatted by the caller.
            strategy: Strategy whose existing rows are dropped and replaced.

        Returns:
            The number of fresh rows written (preserved rows are not counted).

        Raises:
            HotContentFlowError: if the configured table name is not a valid
                identifier.
        """
        table_name = _safe_identifier(self.config.demand_pool_source_table)
        odps_client = get_odps_client()
        table = odps_client.get_table(table_name)
        partition_spec = f"dt={partition_dt}"

        preserved_rows = self._read_preserved_rows(
            table=table,
            partition_spec=partition_spec,
            strategy=strategy,
        )
        payload_rows = preserved_rows + [
            self._to_write_row(row) for row in hive_rows
        ]

        # A single write covers every case, including an empty payload. The
        # former "empty payload and partition exists" special case issued the
        # very same call (payload_rows is [] there) and returned 0, which is
        # len(hive_rows) whenever the payload is empty, so it only cost an
        # extra remote exist_partition lookup.
        odps_client.write_table(
            table_name,
            payload_rows,
            partition=partition_spec,
            create_partition=True,
            overwrite=True,
        )
        return len(hive_rows)

    @staticmethod
    def _read_preserved_rows(
        *,
        table: Any,
        partition_spec: str,
        strategy: str,
    ) -> list[list[Any]]:
        """Read back the rows of the partition that belong to other strategies.

        Args:
            table: Table handle offering ``exist_partition`` and ``open_reader``.
            partition_spec: Partition to read, e.g. ``dt=20240101``.
            strategy: Rows whose ``strategy`` column equals this value are
                skipped; a null/empty strategy is compared as ``""``.

        Returns:
            One list per kept record, in the column order used for writing.
            Empty when the partition does not exist.
        """
        if not table.exist_partition(partition_spec):
            return []

        preserved_rows: list[list[Any]] = []
        with table.open_reader(partition=partition_spec) as reader:
            for record in reader:
                if str(record["strategy"] or "") == strategy:
                    # Rows of the strategy being rewritten are dropped here.
                    continue
                preserved_rows.append(
                    [
                        record["strategy"],
                        record["demand_id"],
                        record["demand_name"],
                        record["weight"],
                        record["type"],
                        record["video_count"],
                        record["video_list"],
                        record["extend"],
                    ]
                )
        return preserved_rows

    @staticmethod
    def _to_write_row(row: dict[str, Any]) -> list[Any]:
        """Convert a row dict into the positional list that gets written.

        ``weight`` is coerced to ``float``; every other value is passed through.
        """
        return [
            row["strategy"],
            row["demand_id"],
            row["demand_name"],
            float(row["weight"]),
            row["type"],
            row["video_count"],
            row["video_list"],
            row["extend"],
        ]
def sync_hot_demands_to_hive(
    config: FlowConfig,
    repository: HotContentRepository,
) -> dict[str, Any]:
    """Build a ``HotDemandPoolWriter`` and return the result of its ``sync_today`` call."""
    return HotDemandPoolWriter(config, repository).sync_today()
|