scheduler.py 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167
  1. """统一定时任务调度入口。"""
  2. from __future__ import annotations
  3. import argparse
  4. import json
  5. import sys
  6. from datetime import datetime, timedelta
  7. from pathlib import Path
  8. from typing import Any
  9. PROJECT_ROOT = Path(__file__).resolve().parents[1]
  10. if str(PROJECT_ROOT) not in sys.path:
  11. sys.path.insert(0, str(PROJECT_ROOT))
  12. from app.hot_content.decode_result_service import run_once as run_decode_result_once
  13. from app.hot_content.config import load_flow_config
  14. from app.hot_content.postprocess_service import run_once as run_postprocess_once
  15. from app.hot_content.service import run_once
  16. from app.hot_content.timezone import SHANGHAI_TZ
  17. from app.hot_content.types import FlowConfig
  18. def _import_blocking_scheduler() -> Any:
  19. try:
  20. from apscheduler.schedulers.blocking import BlockingScheduler
  21. except ImportError as exc:
  22. raise RuntimeError("缺少依赖:请先执行 pip install -r requirements.txt") from exc
  23. return BlockingScheduler
  24. def run_hot_content_job(config: FlowConfig) -> None:
  25. try:
  26. summary = run_once(config)
  27. print(json.dumps(summary, ensure_ascii=False, indent=2))
  28. except Exception as exc:
  29. print(f"hot content flow failed: {exc}", file=sys.stderr)
  30. def run_decode_result_job(config: FlowConfig) -> None:
  31. """解构结果拉取 -> 后处理 -> 写入 ODPS 需求表。"""
  32. summary: dict[str, Any] = {}
  33. try:
  34. summary["decode_result"] = run_decode_result_once(config)
  35. except Exception as exc:
  36. summary["decode_result_error"] = str(exc)
  37. print(f"decode result flow failed: {exc}", file=sys.stderr)
  38. try:
  39. summary["postprocess"] = run_postprocess_once(config)
  40. except Exception as exc:
  41. summary["postprocess_error"] = str(exc)
  42. print(f"postprocess flow failed: {exc}", file=sys.stderr)
  43. print(json.dumps(summary, ensure_ascii=False, indent=2))
  44. def run_postprocess_job(config: FlowConfig) -> None:
  45. try:
  46. summary = run_postprocess_once(config)
  47. print(json.dumps({"job": "postprocess", "summary": summary}, ensure_ascii=False, indent=2))
  48. except Exception as exc:
  49. print(f"postprocess flow failed: {exc}", file=sys.stderr)
  50. def register_hot_content_job(scheduler: Any, config: FlowConfig) -> None:
  51. scheduler.add_job(
  52. run_hot_content_job,
  53. trigger="cron",
  54. hour=config.hot_flow_cron_hours,
  55. minute=config.hot_flow_cron_minute,
  56. timezone=SHANGHAI_TZ,
  57. args=[config],
  58. id="hot_content_flow",
  59. name="热点内容抓取搜索解构流程",
  60. replace_existing=True,
  61. coalesce=True,
  62. max_instances=1,
  63. )
  64. def register_decode_result_job(scheduler: Any, config: FlowConfig) -> None:
  65. interval_seconds = max(config.decode_result_interval_seconds, 60)
  66. scheduler.add_job(
  67. run_decode_result_job,
  68. trigger="interval",
  69. seconds=interval_seconds,
  70. args=[config],
  71. id="decode_result_flow",
  72. name="解构结果拉取、后处理与 ODPS 需求表写入",
  73. replace_existing=True,
  74. coalesce=True,
  75. max_instances=1,
  76. next_run_time=datetime.now(SHANGHAI_TZ) + timedelta(seconds=interval_seconds),
  77. )
  78. def start_scheduler() -> None:
  79. BlockingScheduler = _import_blocking_scheduler()
  80. scheduler = BlockingScheduler(timezone=SHANGHAI_TZ)
  81. config = load_flow_config()
  82. register_hot_content_job(scheduler, config)
  83. register_decode_result_job(scheduler, config)
  84. print(
  85. "scheduler started, timezone=Asia/Shanghai, "
  86. "jobs=['hot_content_flow', 'decode_result_flow'], "
  87. f"hot_cron={config.hot_flow_cron_hours}:{config.hot_flow_cron_minute:02d}, "
  88. f"decode_result_interval={config.decode_result_interval_seconds}s"
  89. )
  90. scheduler.start()
  91. def parse_args() -> argparse.Namespace:
  92. parser = argparse.ArgumentParser(description="统一定时任务调度入口")
  93. parser.add_argument("--once", action="store_true", help="执行一次,不启动调度器")
  94. parser.add_argument(
  95. "--job",
  96. choices=("all", "hot-content", "decode-result", "postprocess"),
  97. default="all",
  98. help="--once 时选择执行哪个任务",
  99. )
  100. return parser.parse_args()
  101. def main() -> None:
  102. args = parse_args()
  103. if args.once:
  104. config = load_flow_config()
  105. if args.job in {"all", "hot-content"}:
  106. summary = run_once(config)
  107. print(
  108. json.dumps(
  109. {"job": "hot_content_flow", "summary": summary},
  110. ensure_ascii=False,
  111. indent=2,
  112. )
  113. )
  114. if args.job in {"all", "decode-result"}:
  115. summary: dict[str, Any] = {}
  116. try:
  117. summary["decode_result"] = run_decode_result_once(config)
  118. except Exception as exc:
  119. summary["decode_result_error"] = str(exc)
  120. try:
  121. summary["postprocess"] = run_postprocess_once(config)
  122. except Exception as exc:
  123. summary["postprocess_error"] = str(exc)
  124. print(
  125. json.dumps(
  126. {"job": "decode_result_flow", "summary": summary},
  127. ensure_ascii=False,
  128. indent=2,
  129. )
  130. )
  131. if args.job in {"postprocess"}:
  132. summary = run_postprocess_once(config)
  133. print(
  134. json.dumps(
  135. {"job": "postprocess", "summary": summary},
  136. ensure_ascii=False,
  137. indent=2,
  138. )
  139. )
  140. return
  141. start_scheduler()
  142. if __name__ == "__main__":
  143. main()