|
|
@@ -27,12 +27,36 @@ def _escape_sql_string(value: str) -> str:
|
|
|
return value.replace("'", "''")
|
|
|
|
|
|
|
|
|
+def apply_odps_daily_write_limit(
|
|
|
+ pending_rows: list[dict[str, Any]],
|
|
|
+ *,
|
|
|
+ existing_count: int,
|
|
|
+ daily_limit: int,
|
|
|
+) -> tuple[list[dict[str, Any]], list[dict[str, Any]], dict[str, Any]]:
|
|
|
+ """按每日上限截断待写入行。daily_limit <= 0 表示不限制。"""
|
|
|
+ limit_meta: dict[str, Any] = {
|
|
|
+ "daily_write_limit": daily_limit if daily_limit > 0 else None,
|
|
|
+ "daily_written_count": existing_count,
|
|
|
+ }
|
|
|
+ if daily_limit <= 0:
|
|
|
+ limit_meta["daily_remaining_quota"] = None
|
|
|
+ return pending_rows, [], limit_meta
|
|
|
+
|
|
|
+ remaining_quota = daily_limit - existing_count
|
|
|
+ limit_meta["daily_remaining_quota"] = max(remaining_quota, 0)
|
|
|
+ if remaining_quota <= 0:
|
|
|
+ return [], list(pending_rows), limit_meta
|
|
|
+ if len(pending_rows) <= remaining_quota:
|
|
|
+ return pending_rows, [], limit_meta
|
|
|
+ return pending_rows[:remaining_quota], pending_rows[remaining_quota:], limit_meta
|
|
|
+
|
|
|
+
|
|
|
class HotDemandPoolWriter:
|
|
|
def __init__(self, config: FlowConfig, repository: HotContentRepository):
|
|
|
self.config = config
|
|
|
self.repository = repository
|
|
|
|
|
|
- def sync_today(self) -> dict[str, Any]:
|
|
|
+ def plan_today(self) -> dict[str, Any]:
|
|
|
partition_dt = datetime.now(SHANGHAI_TZ).date().strftime("%Y%m%d")
|
|
|
strategy = self.config.hot_demand_pool_strategy
|
|
|
# 仅同步主表 hot_content_records.created_at 为当天的 record,写入当天 ODPS 分区。
|
|
|
@@ -52,6 +76,7 @@ class HotDemandPoolWriter:
|
|
|
strategy=strategy,
|
|
|
)
|
|
|
skip_demand_ids = synced_demand_ids | odps_existing_demand_ids
|
|
|
+ daily_written_count = len(skip_demand_ids)
|
|
|
|
|
|
pending_rows: list[dict[str, Any]] = []
|
|
|
skipped_rows: list[dict[str, Any]] = []
|
|
|
@@ -62,50 +87,78 @@ class HotDemandPoolWriter:
|
|
|
continue
|
|
|
pending_rows.append(row)
|
|
|
|
|
|
+ rows_to_write, limit_skipped_rows, limit_meta = apply_odps_daily_write_limit(
|
|
|
+ pending_rows,
|
|
|
+ existing_count=daily_written_count,
|
|
|
+ daily_limit=self.config.odps_daily_write_limit,
|
|
|
+ )
|
|
|
+
|
|
|
+ return {
|
|
|
+ "partition_dt": partition_dt,
|
|
|
+ "strategy": strategy,
|
|
|
+ "source_record_count": len(export_groups),
|
|
|
+ "candidate_row_count": len(hive_rows),
|
|
|
+ "pending_row_count": len(rows_to_write),
|
|
|
+ "skipped_row_count": len(skipped_rows),
|
|
|
+ "limit_skipped_row_count": len(limit_skipped_rows),
|
|
|
+ "rows_to_write": rows_to_write,
|
|
|
+ "skipped_rows": skipped_rows,
|
|
|
+ "limit_skipped_rows": limit_skipped_rows,
|
|
|
+ "target_table": self.config.demand_pool_source_table,
|
|
|
+ **limit_meta,
|
|
|
+ }
|
|
|
+
|
|
|
+ def sync_today(self) -> dict[str, Any]:
|
|
|
+ plan = self.plan_today()
|
|
|
+ rows_to_write = plan["rows_to_write"]
|
|
|
written_count = self._insert_partition_rows(
|
|
|
- hive_rows=pending_rows,
|
|
|
- partition_dt=partition_dt,
|
|
|
+ hive_rows=rows_to_write,
|
|
|
+ partition_dt=str(plan["partition_dt"]),
|
|
|
)
|
|
|
if written_count:
|
|
|
self.repository.save_odps_sync_logs(
|
|
|
[
|
|
|
{
|
|
|
- "partition_dt": partition_dt,
|
|
|
- "strategy": strategy,
|
|
|
+ "partition_dt": plan["partition_dt"],
|
|
|
+ "strategy": plan["strategy"],
|
|
|
"demand_id": row["demand_id"],
|
|
|
"demand_name": row["demand_name"],
|
|
|
"demand_type": row["type"],
|
|
|
"record_id": row.get("record_id") or 0,
|
|
|
}
|
|
|
- for row in pending_rows
|
|
|
+ for row in rows_to_write
|
|
|
]
|
|
|
)
|
|
|
|
|
|
pending_record_ids = sorted(
|
|
|
{
|
|
|
int(row.get("record_id") or 0)
|
|
|
- for row in pending_rows
|
|
|
+ for row in rows_to_write
|
|
|
if int(row.get("record_id") or 0) > 0
|
|
|
}
|
|
|
)
|
|
|
skipped_record_ids = sorted(
|
|
|
{
|
|
|
int(row.get("record_id") or 0)
|
|
|
- for row in skipped_rows
|
|
|
+ for row in plan["skipped_rows"] + plan["limit_skipped_rows"]
|
|
|
if int(row.get("record_id") or 0) > 0
|
|
|
}
|
|
|
)
|
|
|
return {
|
|
|
- "partition_dt": partition_dt,
|
|
|
- "strategy": strategy,
|
|
|
- "source_record_count": len(export_groups),
|
|
|
- "candidate_row_count": len(hive_rows),
|
|
|
- "pending_row_count": len(pending_rows),
|
|
|
- "skipped_row_count": len(skipped_rows),
|
|
|
+ "partition_dt": plan["partition_dt"],
|
|
|
+ "strategy": plan["strategy"],
|
|
|
+ "source_record_count": plan["source_record_count"],
|
|
|
+ "candidate_row_count": plan["candidate_row_count"],
|
|
|
+ "pending_row_count": plan["pending_row_count"],
|
|
|
+ "skipped_row_count": plan["skipped_row_count"],
|
|
|
+ "limit_skipped_row_count": plan["limit_skipped_row_count"],
|
|
|
+ "daily_write_limit": plan["daily_write_limit"],
|
|
|
+ "daily_written_count": plan["daily_written_count"],
|
|
|
+ "daily_remaining_quota": plan["daily_remaining_quota"],
|
|
|
"written_count": written_count,
|
|
|
"pending_record_ids": pending_record_ids,
|
|
|
"skipped_record_ids": skipped_record_ids,
|
|
|
- "target_table": self.config.demand_pool_source_table,
|
|
|
+ "target_table": plan["target_table"],
|
|
|
}
|
|
|
|
|
|
def _list_odps_partition_demand_ids(
|