jobs.py 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106
  1. from datetime import datetime
  2. from zoneinfo import ZoneInfo
  3. from app.services.demand_pool_strategy_daily_alert import run_daily_strategy_alert
  4. from app.services.strategy_generate_service import run_strategy_generation
  5. from app.sync.demand_pool_sync import run_full_sync, run_today_incremental_sync
  6. from app.sync.experiment_demand_pool_write import run_experiment_hourly_write
  7. from app.sync.substance_element_sync import sync_substance_elements
  8. from app.sync.vertical_category_sync import sync_vertical_categories
  9. def heartbeat_job() -> None:
  10. now = datetime.now(ZoneInfo("Asia/Shanghai")).isoformat()
  11. print(f"[scheduler] heartbeat at {now}")
  12. def demand_pool_full_sync_job() -> None:
  13. print("[scheduler] start full sync for demand pool")
  14. result = run_full_sync()
  15. print(f"[scheduler] full sync done: {result}")
  16. def demand_pool_today_incremental_sync_job() -> None:
  17. print("[scheduler] start incremental sync for demand pool")
  18. result = run_today_incremental_sync()
  19. print(f"[scheduler] incremental sync done: {result}")
  20. def demand_pool_daily_strategy_alert_job(partition_dt: str | None = None) -> None:
  21. print("[scheduler] start daily ODPS strategy alert for demand pool")
  22. try:
  23. result = run_daily_strategy_alert(partition_dt)
  24. print(f"[scheduler] daily strategy alert done: {result}")
  25. except Exception as exc:
  26. print(f"[scheduler] daily strategy alert failed: {exc}")
  27. raise
  28. def substance_element_daily_sync_job(partition_dt: str | None = None) -> None:
  29. print("[scheduler] start daily sync for substance elements")
  30. try:
  31. result = sync_substance_elements(partition_dt)
  32. print(
  33. "[scheduler] substance element sync done: "
  34. f"partition_dt={result['partition_dt']}, "
  35. f"base_fetched={result['base_fetched']}, "
  36. f"base_inserted={result['base_inserted']}, "
  37. f"effect_inserted={result['effect_inserted']}"
  38. )
  39. except Exception as exc:
  40. print(f"[scheduler] substance element sync failed: {exc}")
  41. raise
  42. def vertical_category_daily_sync_job(partition_dt: str | None = None) -> None:
  43. print("[scheduler] start daily sync for vertical categories")
  44. try:
  45. result = sync_vertical_categories(partition_dt)
  46. print(
  47. "[scheduler] vertical category sync done: "
  48. f"partition_dt={result['partition_dt']}, "
  49. f"base_fetched={result['base_fetched']}, "
  50. f"base_inserted={result['base_inserted']}, "
  51. f"effect_inserted={result['effect_inserted']}"
  52. )
  53. except Exception as exc:
  54. print(f"[scheduler] vertical category sync failed: {exc}")
  55. raise
  56. def strategy_staging_hourly_generate_job(batch_date: str | None = None) -> None:
  57. print("[scheduler] start hourly strategy generation for strategy_staging")
  58. try:
  59. result = run_strategy_generation(batch_date)
  60. print(
  61. "[scheduler] strategy generation done: "
  62. f"batch_date={result['batch_date']}, "
  63. f"active={result['active_count']}, "
  64. f"success={result['success_count']}, "
  65. f"skipped={result.get('skipped_count', 0)}, "
  66. f"errors={result['error_count']}"
  67. )
  68. if result["warnings"]:
  69. print(f"[scheduler] strategy generation warnings: {result['warnings']}")
  70. if result["errors"]:
  71. print(f"[scheduler] strategy generation errors: {result['errors']}")
  72. except Exception as exc:
  73. print(f"[scheduler] strategy generation failed: {exc}")
  74. raise
  75. def experiment_demand_pool_hourly_write_job(partition_dt: str | None = None) -> None:
  76. print("[scheduler] start hourly experiment write to ODPS demand pool tmp")
  77. try:
  78. result = run_experiment_hourly_write(partition_dt)
  79. print(
  80. "[scheduler] experiment demand pool write done: "
  81. f"partition_dt={result['partition_dt']}, "
  82. f"selected={result['selected_count']}, "
  83. f"written={result['written_count']}, "
  84. f"existing={result['existing_count']}"
  85. )
  86. except Exception as exc:
  87. print(f"[scheduler] experiment demand pool write failed: {exc}")
  88. raise