async_consumer.py 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147
  1. import asyncio
  2. import json
  3. import traceback
  4. from typing import List
  5. import signal
  6. from core.utils.log.logger_manager import LoggerManager
  7. from core.utils.trace_utils import generate_trace_id
  8. from services.async_mysql_service import AsyncMysqlService
  9. from spiders.spider_registry import get_spider_class
# Module-level logger handles shared by every coroutine in this module.
# `logger` is used below for plain info/warning/error messages; `aliyun_logger`
# is called via `.logging(code=..., message=..., ...)` for coded event records
# (NOTE(review): presumably forwarded to Aliyun log service, judging by the
# name only — confirm in LoggerManager).
logger = LoggerManager.get_logger()
aliyun_logger = LoggerManager.get_aliyun_logger()
  12. async def async_handle_topic(topic: str,stop_event: asyncio.Event):
  13. """
  14. 单个 topic 的消费逻辑,运行在协程中:
  15. - 从 MQ 中消费消息;
  16. - 根据消息内容执行对应爬虫;
  17. - 使用异步数据库服务查询配置;
  18. - 记录日志、确认消息。
  19. """
  20. # 每个 topic 创建独立的 consumer 实例
  21. from services.async_mq_consumer import AsyncRocketMQConsumer
  22. consumer = AsyncRocketMQConsumer(topic_name=topic, group_id=topic)
  23. async def handle_single_message(message):
  24. trace_id = generate_trace_id()
  25. try:
  26. payload = json.loads(message.message_body)
  27. task_id = payload["id"]
  28. logger.info(f"{trace_id} - 接收到任务消息: {task_id}")
  29. aliyun_logger.logging(
  30. code="1000",
  31. message="任务接收成功",
  32. data=payload,
  33. trace_id=trace_id,
  34. account=topic
  35. )
  36. async with AsyncMysqlService() as mysql:
  37. user_list = await mysql.get_user_list(task_id)
  38. rule_dict = await mysql.get_rule_dict(task_id)
  39. CrawlerClass = get_spider_class(topic)
  40. crawler = CrawlerClass(
  41. rule_dict=rule_dict,
  42. user_list=user_list,
  43. trace_id=trace_id
  44. )
  45. await crawler.run()
  46. # ack 由 run 成功后执行
  47. await consumer.ack_message(message.receipt_handle)
  48. logger.info(f"{trace_id} - 任务 {task_id} 执行成功并已 Ack")
  49. aliyun_logger.logging(
  50. code="1010",
  51. message="任务执行成功",
  52. trace_id=trace_id,
  53. data={
  54. "task_id": task_id,
  55. "topic": topic
  56. },
  57. account=topic
  58. )
  59. except Exception as e:
  60. logger.error(f"{trace_id} - 任务处理失败: {e} /n {traceback.format_exc()}")
  61. aliyun_logger.logging(
  62. code="9001",
  63. message=f"处理消息失败: {str(e)} /n {traceback.format_exc()}",
  64. trace_id=trace_id,
  65. data={
  66. "error_type": type(e).__name__,
  67. "stack_trace": traceback.format_exc(),
  68. "message_body": message.message_body
  69. },
  70. account=topic
  71. )
  72. # 自动重启消费循环
  73. while not stop_event.is_set():
  74. try:
  75. await consumer.run_forever(handle_single_message)
  76. except Exception as e:
  77. aliyun_logger.logging(
  78. code="9002",
  79. message=f"{topic} 消费循环异常即将重启: {str(e)}",
  80. data={
  81. "error_type": type(e).__name__,
  82. "stack_trace": traceback.format_exc(),
  83. },
  84. account=topic
  85. )
  86. logger.warning(f"[{topic}] 消费循环异常: {e},5秒后重启")
  87. await asyncio.sleep(5)
  88. async def run_all_topics(topics: List[str]):
  89. stop_event = asyncio.Event()
  90. loop = asyncio.get_running_loop()
  91. def shutdown():
  92. logger.warning("[系统] 收到停止信号,准备优雅退出...")
  93. aliyun_logger.logging(
  94. code="1600",
  95. message="[系统] 收到停止信号,准备优雅退出...",
  96. )
  97. stop_event.set()
  98. for sig in [signal.SIGINT, signal.SIGTERM]:
  99. loop.add_signal_handler(sig, shutdown)
  100. tasks = [asyncio.create_task(async_handle_topic(topic, stop_event)) for topic in topics]
  101. await stop_event.wait() # 等待停止信号
  102. logger.warning(f"[系统] 正在取消所有消费任务...{tasks}")
  103. aliyun_logger.logging(
  104. code="1601",
  105. message="[系统] 收到停止信号,准备优雅退出...",
  106. data=f"任务列表{tasks}"
  107. )
  108. for task in tasks:
  109. task.cancel()
  110. results = await asyncio.gather(*tasks, return_exceptions=True)
  111. for idx, result in enumerate(results):
  112. if isinstance(result, Exception):
  113. logger.error(f"[系统] 任务 {topics[idx]} 异常退出: {result}")
  114. logger.warning(f"[系统] 所有任务已退出,进程已关闭...")
  115. aliyun_logger.logging(
  116. code="1602",
  117. message="[系统] 所有任务已退出,进程已关闭...",
  118. data=f"任务列表{tasks}"
  119. )
  120. def handle_topic_group(topics: List[str]):
  121. """子进程入口函数:启动异步事件循环处理该组 topics。"""
  122. asyncio.run(run_all_topics(topics))