cli.py 802 B

1234567891011121314151617181920212223242526272829303132
  1. """热点内容流程命令行入口。"""
  2. from __future__ import annotations
  3. import argparse
  4. import json
  5. from app.hot_content.config import load_flow_config
  6. from app.hot_content.service import run_once
  7. def parse_args() -> argparse.Namespace:
  8. parser = argparse.ArgumentParser(description="热点内容 MySQL 入库与定时调度流程")
  9. parser.add_argument("--once", action="store_true", help="只执行一次,不进入循环调度")
  10. return parser.parse_args()
  11. def main() -> None:
  12. args = parse_args()
  13. config = load_flow_config()
  14. if args.once:
  15. summary = run_once(config)
  16. print(json.dumps(summary, ensure_ascii=False, indent=2))
  17. return
  18. from app.scheduler import start_scheduler
  19. start_scheduler()
  20. if __name__ == "__main__":
  21. main()