| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081 |
- import hashlib
- from decimal import Decimal, ROUND_HALF_UP
- from typing import Any
- from app.core.config import settings
- from app.strategies.batch_date import today_yyyymmdd
- from app.strategies.base import (
- BaseStrategy,
- DemandCandidate,
- GenerateContext,
- )
- from app.strategies.sources.supply_demand_content import fetch_demand_content_by_dt
- from app.strategies.staging_store import insert_staging_rows_skip_duplicates
def build_supply_demand_id(*, strategy_name: str, demand_name: str, dt: str) -> str:
    """Derive a deterministic demand id from strategy, demand name and date.

    ``strategy_name`` is used verbatim, while ``demand_name`` and ``dt`` have
    their surrounding whitespace removed. The three parts are concatenated
    with no separator, encoded as UTF-8 and hashed with MD5.

    Args:
        strategy_name: Name of the strategy producing the demand.
        demand_name: Demand text; stripped before hashing.
        dt: Date string; stripped before hashing.

    Returns:
        The lowercase hexadecimal MD5 digest of the concatenated parts.
    """
    # The digest is used as an identifier only, so the key layout must stay
    # stable: any change here would change every generated id.
    key = "{}{}{}".format(strategy_name, demand_name.strip(), dt.strip())
    digest = hashlib.md5(key.encode("utf-8"))
    return digest.hexdigest()
- def round_supply_score(value: object) -> float | None:
- if value is None:
- return None
- decimal_value = Decimal(str(value)).quantize(
- Decimal("0.0001"),
- rounding=ROUND_HALF_UP,
- )
- return float(decimal_value)
class SupplyGapStrategyBase(BaseStrategy):
    """Current supply/demand gap strategy family.

    Syncs rows from demand_content, inserts them one by one and skips rows
    whose demand_id already exists.
    """

    def validate_config(self, config: dict[str, Any]) -> bool:
        """Return True only if supply MySQL is configured and config is a dict."""
        return bool(settings.supply_mysql_configured) and isinstance(config, dict)

    def build_demand_name(self, row: dict[str, Any]) -> str:
        """Build the demand name for one source row.

        This base implementation always raises; ``generate`` calls it for
        every row, so a usable strategy has to override it.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError

    def generate(self, context: GenerateContext) -> list[DemandCandidate]:
        """Turn the demand_content rows of the current batch into candidates.

        Rows whose demand name or date is empty after stripping are skipped.
        ``context`` is not read by this implementation.

        Returns:
            One ``DemandCandidate`` per retained row, in source order.
        """
        # NOTE(review): presumably today's date formatted as YYYYMMDD, going
        # by the helper's name — confirm against app.strategies.batch_date.
        batch_dt = today_yyyymmdd()
        result: list[DemandCandidate] = []
        for record in fetch_demand_content_by_dt(batch_dt):
            # Fall back to the batch date when the row has no usable "dt".
            effective_dt = str(record.get("dt") or batch_dt).strip()
            name = self.build_demand_name(record).strip()
            if not (name and effective_dt):
                continue

            raw_type = record.get("demand_type")
            type_label = None if raw_type is None else str(raw_type).strip()
            score = round_supply_score(record.get("score"))
            candidate_id = build_supply_demand_id(
                strategy_name=self.name,
                demand_name=name,
                dt=effective_dt,
            )
            candidate = DemandCandidate(
                content=name,
                priority_score=score,
                demand_id=candidate_id,
                demand_type=type_label,
                extra={"batch_date": effective_dt},
            )
            result.append(candidate)
        return result

    def write_staging(
        self,
        *,
        context: GenerateContext,
        candidates: list[DemandCandidate],
    ) -> dict[str, Any]:
        """Hand the candidates to ``insert_staging_rows_skip_duplicates``.

        The rows are written under this strategy's id and name. ``context``
        is not read by this implementation.

        Returns:
            Whatever ``insert_staging_rows_skip_duplicates`` returns.
        """
        staging_result = insert_staging_rows_skip_duplicates(
            strategy_config_id=self.strategy_id,
            strategy_name=self.name,
            candidates=candidates,
        )
        return staging_result
|