scheduler.py 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161
  1. """统一定时任务调度入口。"""
  2. from __future__ import annotations
  3. import argparse
  4. import json
  5. import sys
  6. from datetime import datetime, timedelta
  7. from pathlib import Path
  8. from typing import Any
  9. PROJECT_ROOT = Path(__file__).resolve().parents[1]
  10. if str(PROJECT_ROOT) not in sys.path:
  11. sys.path.insert(0, str(PROJECT_ROOT))
  12. from app.hot_content.decode_result_service import run_once as run_decode_result_once
  13. from app.hot_content.config import load_flow_config
  14. from app.hot_content.postprocess_service import run_once as run_postprocess_once
  15. from app.hot_content.service import run_once
  16. from app.hot_content.timezone import SHANGHAI_TZ
  17. from app.hot_content.types import FlowConfig
  18. def _import_blocking_scheduler() -> Any:
  19. try:
  20. from apscheduler.schedulers.blocking import BlockingScheduler
  21. except ImportError as exc:
  22. raise RuntimeError("缺少依赖:请先执行 pip install -r requirements.txt") from exc
  23. return BlockingScheduler
  24. def run_hot_content_job(config: FlowConfig) -> None:
  25. try:
  26. summary = run_once(config)
  27. print(json.dumps(summary, ensure_ascii=False, indent=2))
  28. except Exception as exc:
  29. print(f"hot content flow failed: {exc}", file=sys.stderr)
  30. def run_decode_result_job(config: FlowConfig) -> None:
  31. """解构结果拉取 -> 后处理 -> 写入 ODPS 需求表。"""
  32. try:
  33. summary: dict[str, Any] = {"decode_result": run_decode_result_once(config)}
  34. try:
  35. summary["postprocess"] = run_postprocess_once(config)
  36. except Exception as exc:
  37. summary["postprocess_error"] = str(exc)
  38. print(f"postprocess flow failed: {exc}", file=sys.stderr)
  39. print(json.dumps(summary, ensure_ascii=False, indent=2))
  40. except Exception as exc:
  41. print(f"decode result flow failed: {exc}", file=sys.stderr)
  42. def run_postprocess_job(config: FlowConfig) -> None:
  43. try:
  44. summary = run_postprocess_once(config)
  45. print(json.dumps({"job": "postprocess", "summary": summary}, ensure_ascii=False, indent=2))
  46. except Exception as exc:
  47. print(f"postprocess flow failed: {exc}", file=sys.stderr)
  48. def register_hot_content_job(scheduler: Any, config: FlowConfig) -> None:
  49. scheduler.add_job(
  50. run_hot_content_job,
  51. trigger="cron",
  52. hour=config.hot_flow_cron_hours,
  53. minute=config.hot_flow_cron_minute,
  54. timezone=SHANGHAI_TZ,
  55. args=[config],
  56. id="hot_content_flow",
  57. name="热点内容抓取搜索解构流程",
  58. replace_existing=True,
  59. coalesce=True,
  60. max_instances=1,
  61. )
  62. def register_decode_result_job(scheduler: Any, config: FlowConfig) -> None:
  63. interval_seconds = max(config.decode_result_interval_seconds, 60)
  64. scheduler.add_job(
  65. run_decode_result_job,
  66. trigger="interval",
  67. seconds=interval_seconds,
  68. args=[config],
  69. id="decode_result_flow",
  70. name="解构结果拉取、后处理与 ODPS 需求表写入",
  71. replace_existing=True,
  72. coalesce=True,
  73. max_instances=1,
  74. next_run_time=datetime.now(SHANGHAI_TZ) + timedelta(seconds=interval_seconds),
  75. )
  76. def start_scheduler() -> None:
  77. BlockingScheduler = _import_blocking_scheduler()
  78. scheduler = BlockingScheduler(timezone=SHANGHAI_TZ)
  79. config = load_flow_config()
  80. register_hot_content_job(scheduler, config)
  81. register_decode_result_job(scheduler, config)
  82. print(
  83. "scheduler started, timezone=Asia/Shanghai, "
  84. "jobs=['hot_content_flow', 'decode_result_flow'], "
  85. f"hot_cron={config.hot_flow_cron_hours}:{config.hot_flow_cron_minute:02d}, "
  86. f"decode_result_interval={config.decode_result_interval_seconds}s"
  87. )
  88. scheduler.start()
  89. def parse_args() -> argparse.Namespace:
  90. parser = argparse.ArgumentParser(description="统一定时任务调度入口")
  91. parser.add_argument("--once", action="store_true", help="执行一次,不启动调度器")
  92. parser.add_argument(
  93. "--job",
  94. choices=("all", "hot-content", "decode-result", "postprocess"),
  95. default="all",
  96. help="--once 时选择执行哪个任务",
  97. )
  98. return parser.parse_args()
  99. def main() -> None:
  100. args = parse_args()
  101. if args.once:
  102. config = load_flow_config()
  103. if args.job in {"all", "hot-content"}:
  104. summary = run_once(config)
  105. print(
  106. json.dumps(
  107. {"job": "hot_content_flow", "summary": summary},
  108. ensure_ascii=False,
  109. indent=2,
  110. )
  111. )
  112. if args.job in {"all", "decode-result"}:
  113. summary = {"decode_result": run_decode_result_once(config)}
  114. try:
  115. summary["postprocess"] = run_postprocess_once(config)
  116. except Exception as exc:
  117. summary["postprocess_error"] = str(exc)
  118. print(
  119. json.dumps(
  120. {"job": "decode_result_flow", "summary": summary},
  121. ensure_ascii=False,
  122. indent=2,
  123. )
  124. )
  125. if args.job in {"postprocess"}:
  126. summary = run_postprocess_once(config)
  127. print(
  128. json.dumps(
  129. {"job": "postprocess", "summary": summary},
  130. ensure_ascii=False,
  131. indent=2,
  132. )
  133. )
  134. return
  135. start_scheduler()
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()