"""统一定时任务调度入口。""" from __future__ import annotations import argparse import json import sys from datetime import datetime, timedelta from pathlib import Path from typing import Any PROJECT_ROOT = Path(__file__).resolve().parents[1] if str(PROJECT_ROOT) not in sys.path: sys.path.insert(0, str(PROJECT_ROOT)) from app.hot_content.client import JsonApiClient from app.hot_content.decode_result_service import run_once as run_decode_result_once from app.hot_content.config import load_flow_config from app.hot_content.postprocess_service import run_once as run_postprocess_once from app.hot_content.repository import HotContentRepository from app.hot_content.service import run_once from app.hot_content.timezone import SHANGHAI_TZ from app.hot_content.types import FlowConfig from app.hot_content.wxindex_words import run_wxindex_words_daily_job def _import_blocking_scheduler() -> Any: try: from apscheduler.schedulers.blocking import BlockingScheduler except ImportError as exc: raise RuntimeError("缺少依赖:请先执行 pip install -r requirements.txt") from exc return BlockingScheduler def run_hot_content_job(config: FlowConfig) -> None: try: summary = run_once(config) print(json.dumps(summary, ensure_ascii=False, indent=2)) except Exception as exc: print(f"hot content flow failed: {exc}", file=sys.stderr) def run_decode_result_job(config: FlowConfig) -> None: """解构结果拉取 -> 后处理 -> 写入 ODPS 需求表。""" summary: dict[str, Any] = {} try: summary["decode_result"] = run_decode_result_once(config) except Exception as exc: summary["decode_result_error"] = str(exc) print(f"decode result flow failed: {exc}", file=sys.stderr) try: summary["postprocess"] = run_postprocess_once(config) except Exception as exc: summary["postprocess_error"] = str(exc) print(f"postprocess flow failed: {exc}", file=sys.stderr) print(json.dumps(summary, ensure_ascii=False, indent=2)) def run_postprocess_job(config: FlowConfig) -> None: try: summary = run_postprocess_once(config) print(json.dumps({"job": "postprocess", "summary": summary}, ensure_ascii=False, indent=2)) except Exception as exc: print(f"postprocess flow failed: {exc}", file=sys.stderr) def run_wxindex_words_refresh_job(config: FlowConfig) -> None: repository = HotContentRepository(config.mysql) api_client = JsonApiClient( timeout_seconds=config.request_timeout_seconds, verify_ssl=config.https_verify_ssl, ) try: summary = run_wxindex_words_daily_job( repository, api_client, config.wxindex_api_url, ) print( json.dumps( {"job": "wxindex_words_refresh", "summary": summary}, ensure_ascii=False, indent=2, ) ) except Exception as exc: print(f"wxindex words refresh failed: {exc}", file=sys.stderr) finally: repository.close() def register_hot_content_job(scheduler: Any, config: FlowConfig) -> None: scheduler.add_job( run_hot_content_job, trigger="cron", hour=config.hot_flow_cron_hours, minute=config.hot_flow_cron_minute, timezone=SHANGHAI_TZ, args=[config], id="hot_content_flow", name="热点内容抓取搜索解构流程", replace_existing=True, coalesce=True, max_instances=1, ) def register_decode_result_job(scheduler: Any, config: FlowConfig) -> None: interval_seconds = max(config.decode_result_interval_seconds, 60) scheduler.add_job( run_decode_result_job, trigger="interval", seconds=interval_seconds, args=[config], id="decode_result_flow", name="解构结果拉取、后处理与 ODPS 需求表写入", replace_existing=True, coalesce=True, max_instances=1, next_run_time=datetime.now(SHANGHAI_TZ) + timedelta(seconds=interval_seconds), ) def register_wxindex_words_refresh_job(scheduler: Any, config: FlowConfig) -> None: scheduler.add_job( run_wxindex_words_refresh_job, trigger="cron", hour=config.wxindex_words_cron_hour, minute=config.wxindex_words_cron_minute, timezone=SHANGHAI_TZ, args=[config], id="wxindex_words_refresh", name="微信指数词汇总表补全缺失日期并清理低均值词", replace_existing=True, coalesce=True, max_instances=1, ) def start_scheduler() -> None: BlockingScheduler = _import_blocking_scheduler() scheduler = BlockingScheduler(timezone=SHANGHAI_TZ) config = load_flow_config() register_hot_content_job(scheduler, config) register_decode_result_job(scheduler, config) register_wxindex_words_refresh_job(scheduler, config) print( "scheduler started, timezone=Asia/Shanghai, " "jobs=['hot_content_flow', 'decode_result_flow', 'wxindex_words_refresh'], " f"hot_cron={config.hot_flow_cron_hours}:{config.hot_flow_cron_minute:02d}, " f"decode_result_interval={config.decode_result_interval_seconds}s, " f"wxindex_words_cron={config.wxindex_words_cron_hour}:{config.wxindex_words_cron_minute:02d}" ) scheduler.start() def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser(description="统一定时任务调度入口") parser.add_argument("--once", action="store_true", help="执行一次,不启动调度器") parser.add_argument( "--job", choices=("all", "hot-content", "decode-result", "postprocess", "wxindex-refresh"), default="all", help="--once 时选择执行哪个任务", ) return parser.parse_args() def main() -> None: args = parse_args() if args.once: config = load_flow_config() if args.job in {"all", "hot-content"}: summary = run_once(config) print( json.dumps( {"job": "hot_content_flow", "summary": summary}, ensure_ascii=False, indent=2, ) ) if args.job in {"all", "decode-result"}: summary: dict[str, Any] = {} try: summary["decode_result"] = run_decode_result_once(config) except Exception as exc: summary["decode_result_error"] = str(exc) try: summary["postprocess"] = run_postprocess_once(config) except Exception as exc: summary["postprocess_error"] = str(exc) print( json.dumps( {"job": "decode_result_flow", "summary": summary}, ensure_ascii=False, indent=2, ) ) if args.job in {"postprocess"}: summary = run_postprocess_once(config) print( json.dumps( {"job": "postprocess", "summary": summary}, ensure_ascii=False, indent=2, ) ) if args.job in {"wxindex-refresh"}: run_wxindex_words_refresh_job(config) return start_scheduler() if __name__ == "__main__": main()