scheduler.py 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263
  1. """统一定时任务调度入口。"""
from __future__ import annotations

import argparse
import json
import sys
import traceback
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any
  9. PROJECT_ROOT = Path(__file__).resolve().parents[1]
  10. if str(PROJECT_ROOT) not in sys.path:
  11. sys.path.insert(0, str(PROJECT_ROOT))
  12. from app.hot_content.client import JsonApiClient
  13. from app.hot_content.decode_result_service import run_once as run_decode_result_once
  14. from app.hot_content.config import load_flow_config
  15. from app.hot_content.postprocess_service import run_once as run_postprocess_once
  16. from app.hot_content.repository import HotContentRepository
  17. from app.hot_content.service import run_once
  18. from app.hot_content.timezone import SHANGHAI_TZ
  19. from app.hot_content.types import FlowConfig
  20. from app.hot_content.wxindex_words import run_wxindex_words_daily_job
  21. from app.hot_content.wxindex_heat_pattern import run_wxindex_heat_pattern_daily_job
  22. def _import_blocking_scheduler() -> Any:
  23. try:
  24. from apscheduler.schedulers.blocking import BlockingScheduler
  25. except ImportError as exc:
  26. raise RuntimeError("缺少依赖:请先执行 pip install -r requirements.txt") from exc
  27. return BlockingScheduler
  28. def run_hot_content_job(config: FlowConfig) -> None:
  29. try:
  30. summary = run_once(config)
  31. print(json.dumps(summary, ensure_ascii=False, indent=2))
  32. except Exception as exc:
  33. print(f"hot content flow failed: {exc}", file=sys.stderr)
  34. def run_decode_result_job(config: FlowConfig) -> None:
  35. """解构结果拉取 -> 后处理 -> 写入 ODPS 需求表。"""
  36. summary: dict[str, Any] = {}
  37. try:
  38. summary["decode_result"] = run_decode_result_once(config)
  39. except Exception as exc:
  40. summary["decode_result_error"] = str(exc)
  41. print(f"decode result flow failed: {exc}", file=sys.stderr)
  42. try:
  43. summary["postprocess"] = run_postprocess_once(config)
  44. except Exception as exc:
  45. summary["postprocess_error"] = str(exc)
  46. print(f"postprocess flow failed: {exc}", file=sys.stderr)
  47. print(json.dumps(summary, ensure_ascii=False, indent=2))
  48. def run_postprocess_job(config: FlowConfig) -> None:
  49. try:
  50. summary = run_postprocess_once(config)
  51. print(json.dumps({"job": "postprocess", "summary": summary}, ensure_ascii=False, indent=2))
  52. except Exception as exc:
  53. print(f"postprocess flow failed: {exc}", file=sys.stderr)
  54. def run_wxindex_words_refresh_job(config: FlowConfig) -> None:
  55. repository = HotContentRepository(config.mysql)
  56. api_client = JsonApiClient(
  57. timeout_seconds=config.request_timeout_seconds,
  58. verify_ssl=config.https_verify_ssl,
  59. )
  60. try:
  61. summary = run_wxindex_words_daily_job(
  62. repository,
  63. api_client,
  64. config.wxindex_api_url,
  65. )
  66. print(
  67. json.dumps(
  68. {"job": "wxindex_words_refresh", "summary": summary},
  69. ensure_ascii=False,
  70. indent=2,
  71. )
  72. )
  73. except Exception as exc:
  74. print(f"wxindex words refresh failed: {exc}", file=sys.stderr)
  75. finally:
  76. repository.close()
  77. def run_wxindex_heat_pattern_job(config: FlowConfig) -> None:
  78. repository = HotContentRepository(config.mysql)
  79. api_client = JsonApiClient(
  80. timeout_seconds=config.request_timeout_seconds,
  81. verify_ssl=config.https_verify_ssl,
  82. )
  83. try:
  84. summary = run_wxindex_heat_pattern_daily_job(
  85. repository,
  86. config=config,
  87. api_client=api_client,
  88. )
  89. print(
  90. json.dumps(
  91. {"job": "wxindex_heat_pattern", "summary": summary},
  92. ensure_ascii=False,
  93. indent=2,
  94. )
  95. )
  96. except Exception as exc:
  97. print(f"wxindex heat pattern failed: {exc}", file=sys.stderr)
  98. finally:
  99. repository.close()
  100. def register_hot_content_job(scheduler: Any, config: FlowConfig) -> None:
  101. scheduler.add_job(
  102. run_hot_content_job,
  103. trigger="cron",
  104. hour=config.hot_flow_cron_hours,
  105. minute=config.hot_flow_cron_minute,
  106. timezone=SHANGHAI_TZ,
  107. args=[config],
  108. id="hot_content_flow",
  109. name="热点内容抓取搜索解构流程",
  110. replace_existing=True,
  111. coalesce=True,
  112. max_instances=1,
  113. )
  114. def register_decode_result_job(scheduler: Any, config: FlowConfig) -> None:
  115. interval_seconds = max(config.decode_result_interval_seconds, 60)
  116. scheduler.add_job(
  117. run_decode_result_job,
  118. trigger="interval",
  119. seconds=interval_seconds,
  120. args=[config],
  121. id="decode_result_flow",
  122. name="解构结果拉取、后处理与 ODPS 需求表写入",
  123. replace_existing=True,
  124. coalesce=True,
  125. max_instances=1,
  126. next_run_time=datetime.now(SHANGHAI_TZ) + timedelta(seconds=interval_seconds),
  127. )
  128. def register_wxindex_words_refresh_job(scheduler: Any, config: FlowConfig) -> None:
  129. scheduler.add_job(
  130. run_wxindex_words_refresh_job,
  131. trigger="cron",
  132. hour=config.wxindex_words_cron_hours,
  133. minute=config.wxindex_words_cron_minute,
  134. timezone=SHANGHAI_TZ,
  135. args=[config],
  136. id="wxindex_words_refresh",
  137. name="微信指数词汇总表补全缺失日期并清理低均值词",
  138. replace_existing=True,
  139. coalesce=True,
  140. max_instances=1,
  141. )
  142. def register_wxindex_heat_pattern_job(scheduler: Any, config: FlowConfig) -> None:
  143. scheduler.add_job(
  144. run_wxindex_heat_pattern_job,
  145. trigger="cron",
  146. hour=config.wxindex_heat_pattern_cron_hours,
  147. minute=config.wxindex_heat_pattern_cron_minute,
  148. timezone=SHANGHAI_TZ,
  149. args=[config],
  150. id="wxindex_heat_pattern",
  151. name="微信指数热度模式分析(持续高热/上涨/暴涨)",
  152. replace_existing=True,
  153. coalesce=True,
  154. max_instances=1,
  155. )
  156. def start_scheduler() -> None:
  157. BlockingScheduler = _import_blocking_scheduler()
  158. scheduler = BlockingScheduler(timezone=SHANGHAI_TZ)
  159. config = load_flow_config()
  160. register_hot_content_job(scheduler, config)
  161. register_decode_result_job(scheduler, config)
  162. register_wxindex_words_refresh_job(scheduler, config)
  163. register_wxindex_heat_pattern_job(scheduler, config)
  164. print(
  165. "scheduler started, timezone=Asia/Shanghai, "
  166. "jobs=['hot_content_flow', 'decode_result_flow', 'wxindex_words_refresh', "
  167. "'wxindex_heat_pattern'], "
  168. f"hot_cron={config.hot_flow_cron_hours}:{config.hot_flow_cron_minute:02d}, "
  169. f"decode_result_interval={config.decode_result_interval_seconds}s, "
  170. f"wxindex_words_cron={config.wxindex_words_cron_hours}:{config.wxindex_words_cron_minute:02d}, "
  171. f"wxindex_heat_pattern_cron="
  172. f"{config.wxindex_heat_pattern_cron_hours}:{config.wxindex_heat_pattern_cron_minute:02d}"
  173. )
  174. scheduler.start()
  175. def parse_args() -> argparse.Namespace:
  176. parser = argparse.ArgumentParser(description="统一定时任务调度入口")
  177. parser.add_argument("--once", action="store_true", help="执行一次,不启动调度器")
  178. parser.add_argument(
  179. "--job",
  180. choices=("all", "hot-content", "decode-result", "postprocess", "wxindex-refresh", "wxindex-heat-pattern"),
  181. default="all",
  182. help="--once 时选择执行哪个任务",
  183. )
  184. return parser.parse_args()
  185. def main() -> None:
  186. args = parse_args()
  187. if args.once:
  188. config = load_flow_config()
  189. if args.job in {"all", "hot-content"}:
  190. summary = run_once(config)
  191. print(
  192. json.dumps(
  193. {"job": "hot_content_flow", "summary": summary},
  194. ensure_ascii=False,
  195. indent=2,
  196. )
  197. )
  198. if args.job in {"all", "decode-result"}:
  199. summary: dict[str, Any] = {}
  200. try:
  201. summary["decode_result"] = run_decode_result_once(config)
  202. except Exception as exc:
  203. summary["decode_result_error"] = str(exc)
  204. try:
  205. summary["postprocess"] = run_postprocess_once(config)
  206. except Exception as exc:
  207. summary["postprocess_error"] = str(exc)
  208. print(
  209. json.dumps(
  210. {"job": "decode_result_flow", "summary": summary},
  211. ensure_ascii=False,
  212. indent=2,
  213. )
  214. )
  215. if args.job in {"postprocess"}:
  216. summary = run_postprocess_once(config)
  217. print(
  218. json.dumps(
  219. {"job": "postprocess", "summary": summary},
  220. ensure_ascii=False,
  221. indent=2,
  222. )
  223. )
  224. if args.job in {"wxindex-refresh"}:
  225. run_wxindex_words_refresh_job(config)
  226. if args.job in {"wxindex-heat-pattern"}:
  227. run_wxindex_heat_pattern_job(config)
  228. return
  229. start_scheduler()
  230. if __name__ == "__main__":
  231. main()