| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106 |
- from datetime import datetime
- from zoneinfo import ZoneInfo
- from app.services.demand_pool_strategy_daily_alert import run_daily_strategy_alert
- from app.services.strategy_generate_service import run_strategy_generation
- from app.sync.demand_pool_sync import run_full_sync, run_today_incremental_sync
- from app.sync.experiment_demand_pool_write import run_experiment_hourly_write
- from app.sync.substance_element_sync import sync_substance_elements
- from app.sync.vertical_category_sync import sync_vertical_categories
- def heartbeat_job() -> None:
- now = datetime.now(ZoneInfo("Asia/Shanghai")).isoformat()
- print(f"[scheduler] heartbeat at {now}")
- def demand_pool_full_sync_job() -> None:
- print("[scheduler] start full sync for demand pool")
- result = run_full_sync()
- print(f"[scheduler] full sync done: {result}")
- def demand_pool_today_incremental_sync_job() -> None:
- print("[scheduler] start incremental sync for demand pool")
- result = run_today_incremental_sync()
- print(f"[scheduler] incremental sync done: {result}")
- def demand_pool_daily_strategy_alert_job(partition_dt: str | None = None) -> None:
- print("[scheduler] start daily ODPS strategy alert for demand pool")
- try:
- result = run_daily_strategy_alert(partition_dt)
- print(f"[scheduler] daily strategy alert done: {result}")
- except Exception as exc:
- print(f"[scheduler] daily strategy alert failed: {exc}")
- raise
- def substance_element_daily_sync_job(partition_dt: str | None = None) -> None:
- print("[scheduler] start daily sync for substance elements")
- try:
- result = sync_substance_elements(partition_dt)
- print(
- "[scheduler] substance element sync done: "
- f"partition_dt={result['partition_dt']}, "
- f"base_fetched={result['base_fetched']}, "
- f"base_inserted={result['base_inserted']}, "
- f"effect_inserted={result['effect_inserted']}"
- )
- except Exception as exc:
- print(f"[scheduler] substance element sync failed: {exc}")
- raise
- def vertical_category_daily_sync_job(partition_dt: str | None = None) -> None:
- print("[scheduler] start daily sync for vertical categories")
- try:
- result = sync_vertical_categories(partition_dt)
- print(
- "[scheduler] vertical category sync done: "
- f"partition_dt={result['partition_dt']}, "
- f"base_fetched={result['base_fetched']}, "
- f"base_inserted={result['base_inserted']}, "
- f"effect_inserted={result['effect_inserted']}"
- )
- except Exception as exc:
- print(f"[scheduler] vertical category sync failed: {exc}")
- raise
- def strategy_staging_hourly_generate_job(batch_date: str | None = None) -> None:
- print("[scheduler] start hourly strategy generation for strategy_staging")
- try:
- result = run_strategy_generation(batch_date)
- print(
- "[scheduler] strategy generation done: "
- f"batch_date={result['batch_date']}, "
- f"active={result['active_count']}, "
- f"success={result['success_count']}, "
- f"skipped={result.get('skipped_count', 0)}, "
- f"errors={result['error_count']}"
- )
- if result["warnings"]:
- print(f"[scheduler] strategy generation warnings: {result['warnings']}")
- if result["errors"]:
- print(f"[scheduler] strategy generation errors: {result['errors']}")
- except Exception as exc:
- print(f"[scheduler] strategy generation failed: {exc}")
- raise
- def experiment_demand_pool_hourly_write_job(partition_dt: str | None = None) -> None:
- print("[scheduler] start hourly experiment write to ODPS demand pool tmp")
- try:
- result = run_experiment_hourly_write(partition_dt)
- print(
- "[scheduler] experiment demand pool write done: "
- f"partition_dt={result['partition_dt']}, "
- f"selected={result['selected_count']}, "
- f"written={result['written_count']}, "
- f"existing={result['existing_count']}"
- )
- except Exception as exc:
- print(f"[scheduler] experiment demand pool write failed: {exc}")
- raise
|