scheduler.py 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215
  1. """统一定时任务调度入口。"""
from __future__ import annotations

import argparse
import json
import sys
import traceback
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any
  9. PROJECT_ROOT = Path(__file__).resolve().parents[1]
  10. if str(PROJECT_ROOT) not in sys.path:
  11. sys.path.insert(0, str(PROJECT_ROOT))
  12. from app.hot_content.client import JsonApiClient
  13. from app.hot_content.decode_result_service import run_once as run_decode_result_once
  14. from app.hot_content.config import load_flow_config
  15. from app.hot_content.postprocess_service import run_once as run_postprocess_once
  16. from app.hot_content.repository import HotContentRepository
  17. from app.hot_content.service import run_once
  18. from app.hot_content.timezone import SHANGHAI_TZ
  19. from app.hot_content.types import FlowConfig
  20. from app.hot_content.wxindex_words import run_wxindex_words_daily_job
  21. def _import_blocking_scheduler() -> Any:
  22. try:
  23. from apscheduler.schedulers.blocking import BlockingScheduler
  24. except ImportError as exc:
  25. raise RuntimeError("缺少依赖:请先执行 pip install -r requirements.txt") from exc
  26. return BlockingScheduler
  27. def run_hot_content_job(config: FlowConfig) -> None:
  28. try:
  29. summary = run_once(config)
  30. print(json.dumps(summary, ensure_ascii=False, indent=2))
  31. except Exception as exc:
  32. print(f"hot content flow failed: {exc}", file=sys.stderr)
  33. def run_decode_result_job(config: FlowConfig) -> None:
  34. """解构结果拉取 -> 后处理 -> 写入 ODPS 需求表。"""
  35. summary: dict[str, Any] = {}
  36. try:
  37. summary["decode_result"] = run_decode_result_once(config)
  38. except Exception as exc:
  39. summary["decode_result_error"] = str(exc)
  40. print(f"decode result flow failed: {exc}", file=sys.stderr)
  41. try:
  42. summary["postprocess"] = run_postprocess_once(config)
  43. except Exception as exc:
  44. summary["postprocess_error"] = str(exc)
  45. print(f"postprocess flow failed: {exc}", file=sys.stderr)
  46. print(json.dumps(summary, ensure_ascii=False, indent=2))
  47. def run_postprocess_job(config: FlowConfig) -> None:
  48. try:
  49. summary = run_postprocess_once(config)
  50. print(json.dumps({"job": "postprocess", "summary": summary}, ensure_ascii=False, indent=2))
  51. except Exception as exc:
  52. print(f"postprocess flow failed: {exc}", file=sys.stderr)
  53. def run_wxindex_words_refresh_job(config: FlowConfig) -> None:
  54. repository = HotContentRepository(config.mysql)
  55. api_client = JsonApiClient(
  56. timeout_seconds=config.request_timeout_seconds,
  57. verify_ssl=config.https_verify_ssl,
  58. )
  59. try:
  60. summary = run_wxindex_words_daily_job(
  61. repository,
  62. api_client,
  63. config.wxindex_api_url,
  64. )
  65. print(
  66. json.dumps(
  67. {"job": "wxindex_words_refresh", "summary": summary},
  68. ensure_ascii=False,
  69. indent=2,
  70. )
  71. )
  72. except Exception as exc:
  73. print(f"wxindex words refresh failed: {exc}", file=sys.stderr)
  74. finally:
  75. repository.close()
  76. def register_hot_content_job(scheduler: Any, config: FlowConfig) -> None:
  77. scheduler.add_job(
  78. run_hot_content_job,
  79. trigger="cron",
  80. hour=config.hot_flow_cron_hours,
  81. minute=config.hot_flow_cron_minute,
  82. timezone=SHANGHAI_TZ,
  83. args=[config],
  84. id="hot_content_flow",
  85. name="热点内容抓取搜索解构流程",
  86. replace_existing=True,
  87. coalesce=True,
  88. max_instances=1,
  89. )
  90. def register_decode_result_job(scheduler: Any, config: FlowConfig) -> None:
  91. interval_seconds = max(config.decode_result_interval_seconds, 60)
  92. scheduler.add_job(
  93. run_decode_result_job,
  94. trigger="interval",
  95. seconds=interval_seconds,
  96. args=[config],
  97. id="decode_result_flow",
  98. name="解构结果拉取、后处理与 ODPS 需求表写入",
  99. replace_existing=True,
  100. coalesce=True,
  101. max_instances=1,
  102. next_run_time=datetime.now(SHANGHAI_TZ) + timedelta(seconds=interval_seconds),
  103. )
  104. def register_wxindex_words_refresh_job(scheduler: Any, config: FlowConfig) -> None:
  105. scheduler.add_job(
  106. run_wxindex_words_refresh_job,
  107. trigger="cron",
  108. hour=config.wxindex_words_cron_hour,
  109. minute=config.wxindex_words_cron_minute,
  110. timezone=SHANGHAI_TZ,
  111. args=[config],
  112. id="wxindex_words_refresh",
  113. name="微信指数词汇总表补全缺失日期并清理低均值词",
  114. replace_existing=True,
  115. coalesce=True,
  116. max_instances=1,
  117. )
  118. def start_scheduler() -> None:
  119. BlockingScheduler = _import_blocking_scheduler()
  120. scheduler = BlockingScheduler(timezone=SHANGHAI_TZ)
  121. config = load_flow_config()
  122. register_hot_content_job(scheduler, config)
  123. register_decode_result_job(scheduler, config)
  124. register_wxindex_words_refresh_job(scheduler, config)
  125. print(
  126. "scheduler started, timezone=Asia/Shanghai, "
  127. "jobs=['hot_content_flow', 'decode_result_flow', 'wxindex_words_refresh'], "
  128. f"hot_cron={config.hot_flow_cron_hours}:{config.hot_flow_cron_minute:02d}, "
  129. f"decode_result_interval={config.decode_result_interval_seconds}s, "
  130. f"wxindex_words_cron={config.wxindex_words_cron_hour}:{config.wxindex_words_cron_minute:02d}"
  131. )
  132. scheduler.start()
  133. def parse_args() -> argparse.Namespace:
  134. parser = argparse.ArgumentParser(description="统一定时任务调度入口")
  135. parser.add_argument("--once", action="store_true", help="执行一次,不启动调度器")
  136. parser.add_argument(
  137. "--job",
  138. choices=("all", "hot-content", "decode-result", "postprocess", "wxindex-refresh"),
  139. default="all",
  140. help="--once 时选择执行哪个任务",
  141. )
  142. return parser.parse_args()
  143. def main() -> None:
  144. args = parse_args()
  145. if args.once:
  146. config = load_flow_config()
  147. if args.job in {"all", "hot-content"}:
  148. summary = run_once(config)
  149. print(
  150. json.dumps(
  151. {"job": "hot_content_flow", "summary": summary},
  152. ensure_ascii=False,
  153. indent=2,
  154. )
  155. )
  156. if args.job in {"all", "decode-result"}:
  157. summary: dict[str, Any] = {}
  158. try:
  159. summary["decode_result"] = run_decode_result_once(config)
  160. except Exception as exc:
  161. summary["decode_result_error"] = str(exc)
  162. try:
  163. summary["postprocess"] = run_postprocess_once(config)
  164. except Exception as exc:
  165. summary["postprocess_error"] = str(exc)
  166. print(
  167. json.dumps(
  168. {"job": "decode_result_flow", "summary": summary},
  169. ensure_ascii=False,
  170. indent=2,
  171. )
  172. )
  173. if args.job in {"postprocess"}:
  174. summary = run_postprocess_once(config)
  175. print(
  176. json.dumps(
  177. {"job": "postprocess", "summary": summary},
  178. ensure_ascii=False,
  179. indent=2,
  180. )
  181. )
  182. if args.job in {"wxindex-refresh"}:
  183. run_wxindex_words_refresh_job(config)
  184. return
  185. start_scheduler()
  186. if __name__ == "__main__":
  187. main()