| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161 |
- """统一定时任务调度入口。"""
from __future__ import annotations

import argparse
import json
import sys
import traceback
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any
# Put the parent of this script's directory (presumably the repository root)
# at the front of sys.path so the `app.hot_content` imports below resolve when
# this file is executed directly as a script. Must run before those imports.
PROJECT_ROOT = Path(__file__).resolve().parents[1]
if str(PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(PROJECT_ROOT))
- from app.hot_content.decode_result_service import run_once as run_decode_result_once
- from app.hot_content.config import load_flow_config
- from app.hot_content.postprocess_service import run_once as run_postprocess_once
- from app.hot_content.service import run_once
- from app.hot_content.timezone import SHANGHAI_TZ
- from app.hot_content.types import FlowConfig
- def _import_blocking_scheduler() -> Any:
- try:
- from apscheduler.schedulers.blocking import BlockingScheduler
- except ImportError as exc:
- raise RuntimeError("缺少依赖:请先执行 pip install -r requirements.txt") from exc
- return BlockingScheduler
def run_hot_content_job(config: FlowConfig) -> None:
    """Scheduled wrapper around the hot-content flow (``run_once``).

    Runs the flow once and prints its summary to stdout as indented JSON,
    tagged as ``{"job": "hot_content_flow", "summary": ...}`` (the same shape
    ``main`` prints in ``--once`` mode) so it can be told apart from other
    jobs' output on the shared stdout.

    Any exception is reported on stderr (message plus traceback) and then
    swallowed, so a failed run does not propagate out of the job.

    Args:
        config: Flow configuration, passed straight through to ``run_once``.
    """
    try:
        summary = run_once(config)
        print(
            json.dumps(
                {"job": "hot_content_flow", "summary": summary},
                ensure_ascii=False,
                indent=2,
            )
        )
    except Exception as exc:
        # Deliberately best-effort, but keep the traceback: str(exc) alone is
        # often empty or too terse to diagnose a scheduled failure.
        print(f"hot content flow failed: {exc}", file=sys.stderr)
        traceback.print_exc()
def run_decode_result_job(config: FlowConfig) -> None:
    """Scheduled job: fetch decode results, then run post-processing.

    Original description: decode-result fetch -> post-processing -> write to
    the ODPS requirements (需求) table.

    The combined summary is printed to stdout as indented JSON, tagged as
    ``{"job": "decode_result_flow", "summary": ...}`` (the same shape ``main``
    prints in ``--once`` mode).

    Error handling:
        * A post-processing failure is recorded under ``"postprocess_error"``
          in the summary and reported on stderr with a traceback; the
          decode-result outcome is still printed.
        * Any other failure (including the decode-result fetch itself) is
          reported on stderr with a traceback and swallowed.

    Args:
        config: Flow configuration, passed to both underlying runs.
    """
    try:
        summary: dict[str, Any] = {"decode_result": run_decode_result_once(config)}
        try:
            summary["postprocess"] = run_postprocess_once(config)
        except Exception as exc:
            summary["postprocess_error"] = str(exc)
            print(f"postprocess flow failed: {exc}", file=sys.stderr)
            traceback.print_exc()
        print(
            json.dumps(
                {"job": "decode_result_flow", "summary": summary},
                ensure_ascii=False,
                indent=2,
            )
        )
    except Exception as exc:
        print(f"decode result flow failed: {exc}", file=sys.stderr)
        traceback.print_exc()
def run_postprocess_job(config: FlowConfig) -> None:
    """Run post-processing once and print ``{"job": "postprocess", "summary": ...}``.

    Any exception is reported on stderr (message plus traceback) and then
    swallowed, consistent with the other job wrappers in this module.

    Args:
        config: Flow configuration, passed to ``run_postprocess_once``.
    """
    try:
        summary = run_postprocess_once(config)
        print(json.dumps({"job": "postprocess", "summary": summary}, ensure_ascii=False, indent=2))
    except Exception as exc:
        print(f"postprocess flow failed: {exc}", file=sys.stderr)
        traceback.print_exc()
def register_hot_content_job(scheduler: Any, config: FlowConfig) -> None:
    """Add (or replace) the cron-triggered ``hot_content_flow`` job.

    The cron fields come from ``config.hot_flow_cron_hours`` and
    ``config.hot_flow_cron_minute`` and are evaluated in ``SHANGHAI_TZ``.
    The job is registered with APScheduler's ``coalesce=True`` and
    ``max_instances=1`` options.

    Args:
        scheduler: An APScheduler scheduler exposing ``add_job``.
        config: Flow configuration; also passed as the job's sole argument.
    """
    cron_fields = {
        "hour": config.hot_flow_cron_hours,
        "minute": config.hot_flow_cron_minute,
        "timezone": SHANGHAI_TZ,
    }
    scheduler.add_job(
        run_hot_content_job,
        trigger="cron",
        args=[config],
        id="hot_content_flow",
        name="热点内容抓取搜索解构流程",
        replace_existing=True,
        coalesce=True,
        max_instances=1,
        **cron_fields,
    )
def register_decode_result_job(scheduler: Any, config: FlowConfig) -> None:
    """Add (or replace) the interval-triggered ``decode_result_flow`` job.

    The period is ``config.decode_result_interval_seconds``, raised to at
    least 60 seconds. The first run is scheduled one period from now (in
    ``SHANGHAI_TZ``) instead of immediately on start-up.

    Args:
        scheduler: An APScheduler scheduler exposing ``add_job``.
        config: Flow configuration; also passed as the job's sole argument.
    """
    seconds_between_runs = max(config.decode_result_interval_seconds, 60)
    first_run_at = datetime.now(SHANGHAI_TZ) + timedelta(seconds=seconds_between_runs)
    scheduler.add_job(
        run_decode_result_job,
        trigger="interval",
        seconds=seconds_between_runs,
        next_run_time=first_run_at,
        args=[config],
        id="decode_result_flow",
        name="解构结果拉取、后处理与 ODPS 需求表写入",
        replace_existing=True,
        coalesce=True,
        max_instances=1,
    )
def start_scheduler() -> None:
    """Build a blocking scheduler, register both flows and run it.

    Registers ``hot_content_flow`` (cron) and ``decode_result_flow``
    (interval) from the loaded flow config, prints the effective schedule,
    then blocks in ``scheduler.start()``.

    Raises:
        RuntimeError: if APScheduler is not installed.
    """
    scheduler_cls = _import_blocking_scheduler()
    scheduler = scheduler_cls(timezone=SHANGHAI_TZ)
    config = load_flow_config()
    register_hot_content_job(scheduler, config)
    register_decode_result_job(scheduler, config)
    # register_decode_result_job raises the interval to at least 60s; log the
    # interval that is actually scheduled, not the raw config value.
    effective_interval = max(config.decode_result_interval_seconds, 60)
    print(
        "scheduler started, timezone=Asia/Shanghai, "
        "jobs=['hot_content_flow', 'decode_result_flow'], "
        f"hot_cron={config.hot_flow_cron_hours}:{config.hot_flow_cron_minute:02d}, "
        f"decode_result_interval={effective_interval}s"
    )
    scheduler.start()
def parse_args() -> argparse.Namespace:
    """Parse this entry point's command-line options from ``sys.argv``.

    Options:
        --once: run the selected job(s) a single time instead of starting
            the scheduler.
        --job: which job ``--once`` runs; one of ``all`` (default),
            ``hot-content``, ``decode-result`` or ``postprocess``.
    """
    job_choices = ("all", "hot-content", "decode-result", "postprocess")
    cli = argparse.ArgumentParser(description="统一定时任务调度入口")
    cli.add_argument("--once", action="store_true", help="执行一次,不启动调度器")
    cli.add_argument(
        "--job",
        default="all",
        choices=job_choices,
        help="--once 时选择执行哪个任务",
    )
    namespace = cli.parse_args()
    return namespace
def _print_job_summary(job: str, summary: Any) -> None:
    """Print one job's summary to stdout as ``{"job": ..., "summary": ...}`` JSON."""
    print(json.dumps({"job": job, "summary": summary}, ensure_ascii=False, indent=2))


def _run_selected_jobs_once(job: str, config: FlowConfig) -> None:
    """Run the job(s) selected by ``--job`` a single time, in a fixed order.

    Order: hot-content, then decode-result (+ post-processing), then the
    standalone post-processing job. Failures of the hot-content flow, the
    decode-result fetch and the standalone post-processing job propagate to
    the caller. Only the post-processing step of the decode-result job is
    caught: it is recorded in the summary and reported on stderr.
    """
    if job in {"all", "hot-content"}:
        _print_job_summary("hot_content_flow", run_once(config))
    if job in {"all", "decode-result"}:
        summary: dict[str, Any] = {"decode_result": run_decode_result_once(config)}
        try:
            summary["postprocess"] = run_postprocess_once(config)
        except Exception as exc:
            # Same handling as the scheduled decode-result job: keep the
            # decode-result outcome, record the error and surface it on stderr.
            summary["postprocess_error"] = str(exc)
            print(f"postprocess flow failed: {exc}", file=sys.stderr)
            traceback.print_exc()
        _print_job_summary("decode_result_flow", summary)
    if job == "postprocess":
        _print_job_summary("postprocess", run_postprocess_once(config))


def main() -> None:
    """CLI entry point.

    With ``--once``, run the job(s) chosen by ``--job`` a single time and
    print each summary as JSON. Otherwise start the blocking scheduler.
    """
    args = parse_args()
    if args.once:
        _run_selected_jobs_once(args.job, load_flow_config())
        return
    start_scheduler()
# Script entry point: parse CLI options, then either run jobs once or start the scheduler.
if __name__ == "__main__":
    main()
|