async_consumer.py

import asyncio
import json
import signal
import traceback
from typing import List

from config import settings
from core.base.async_redis_client import RedisManager
from core.utils.log.logger_manager import LoggerManager
from core.utils.trace_utils import TraceContext
from services.async_mysql_service import AsyncMysqlService
from spiders.spider_registry import get_spider_class

logger = LoggerManager.get_logger()
aliyun_logger = LoggerManager.get_aliyun_logger()


async def async_handle_topic(topic: str, stop_event: asyncio.Event):
    """
    Consumption logic for a single topic, run as a coroutine:
    - pull messages from MQ one at a time (finish one before fetching the next);
    - run the matching spider based on the message payload;
    - query configuration through the async database service;
    - log results and acknowledge the message.
    """
    # Each topic gets its own consumer instance (using the optimized AsyncRocketMQConsumer).
    from services.async_mq_consumer import AsyncRocketMQConsumer
    from services.async_redis_service import AsyncRedisService

    redis_controller = AsyncRedisService()
    consumer = AsyncRocketMQConsumer(topic_name=topic, group_id=topic)
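
    # Idempotency contract assumed below (a sketch; the real semantics live in
    # AsyncRedisService and are not shown here): get_status(message_id) returns
    # None for a message never seen before, a non-"1" in-progress marker after
    # mark_processing(), and "1" once mark_done() has run:
    #
    #     await redis_controller.mark_processing(message_id)  # claim the message
    #     await redis_controller.mark_done(message_id)        # status becomes "1"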

    async def handle_single_message(message):
        """Business logic for a single message (no pulling or looping here)."""
        message_id = message.message_id

        status = await redis_controller.get_status(message_id)
        if status == "1":
            logger.info(f"[{topic}] MessageID: {message_id} already completed, ack and skip")
            await consumer.ack_message(message.receipt_handle)
            return
        if not status:
            logger.info(f"[{topic}] MessageID: {message_id} not yet executed, marking as processing")
            await redis_controller.mark_processing(message_id)

        try:
            payload = json.loads(message.message_body)
            task_id = payload["task_id"]
            logger.info(f"[{topic}] received task message: {task_id}")
            aliyun_logger.logging(
                code="1000",
                message="task received successfully",
                data=payload,
                account=topic
            )
            # Look up configuration in the database.
            async with AsyncMysqlService() as mysql:
                user_list = await mysql.get_user_list(task_id)
                rule_dict = await mysql.get_rule_dict(task_id)
            # Run the spider task.
            CrawlerClass = get_spider_class(topic)
            crawler = CrawlerClass(
                rule_dict=rule_dict,
                user_list=user_list,
            )
            await crawler.run()  # acknowledge the message only after the spider has succeeded
            logger.info(f"[{topic}] task {task_id} executed successfully")
            aliyun_logger.logging(
                code="1010",
                message="task executed successfully",
                data={"task_id": task_id, "topic": topic},
                account=topic
            )
            await redis_controller.mark_done(message_id)
            # Ack on success, as promised above; without this the message would
            # only be acked on its next redelivery.
            await consumer.ack_message(message.receipt_handle)
        except Exception as e:
            logger.error(f"[{topic}] task failed, MessageID: {message_id}: {e}\n{traceback.format_exc()}")
            aliyun_logger.logging(
                code="9001",
                message=f"message handling failed: {str(e)}\n{traceback.format_exc()}",
                data={
                    "error_type": type(e).__name__,
                    "stack_trace": traceback.format_exc(),
                    "message_body": message.message_body
                },
                account=topic
            )
            # No ack on failure: MQ will redeliver the message (relying on MQ's retry mechanism).
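
    # Note: on failure the Redis status keeps its in-progress marker, so a
    # redelivered copy will be executed again; only status "1" short-circuits
    # with an immediate ack.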

    # Standalone consume loop: pull messages and hand them to the handler.
    async def consume_loop():
        logger.info(f"[{topic}] consume loop started, pulling messages...")
        while not stop_event.is_set():  # watch the stop event for graceful shutdown
            try:
                # Pull a single message (the optimized receive_message returns
                # None instead of raising when the queue is empty).
                message = await consumer.receive_message()
                if message:
                    with TraceContext():  # generate a trace_id and bind it to the context
                        # Process the message fully before the next loop iteration.
                        await handle_single_message(message)
                else:
                    # Sleep briefly when the queue is empty to avoid busy polling.
                    await asyncio.sleep(1)
            except Exception as e:
                # Errors outside message handling (e.g. MQ connection failure): log and retry.
                logger.error(f"[{topic}] consume loop error: {e}", exc_info=True)
                aliyun_logger.logging(
                    code="9002",
                    message=f"{topic} consume loop error, about to retry: {str(e)}",
                    data={"error_type": type(e).__name__, "stack_trace": traceback.format_exc()},
                    account=topic
                )
                await asyncio.sleep(5)  # back off after an error to ease pressure on the service
        logger.info(f"[{topic}] consume loop stopped (stop signal received)")

    # Start the consume loop (the entry point of the consumption logic).
    await consume_loop()
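

# Consumer surface this module relies on (a sketch inferred from the calls
# above; the authoritative definition lives in services.async_mq_consumer and
# is not shown here):
#
#     consumer = AsyncRocketMQConsumer(topic_name=topic, group_id=topic)
#     message = await consumer.receive_message()          # None when the queue is empty
#     await consumer.ack_message(message.receipt_handle)  # at-least-once ack
#     # a message exposes .message_id, .message_body and .receipt_handle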


async def run_all_topics(topics: List[str]):
    await RedisManager.init(redis_url=settings.redis_url)
    stop_event = asyncio.Event()
    loop = asyncio.get_running_loop()

    def shutdown():
        """Handle stop signals (e.g. Ctrl+C) and trigger a graceful exit."""
        logger.warning("[system] stop signal received, preparing graceful shutdown...")
        aliyun_logger.logging(
            code="1600",
            message="[system] stop signal received, preparing graceful shutdown...",
        )
        stop_event.set()
        asyncio.create_task(RedisManager.close())  # close the connection pool

    # Register signal handlers (supports Ctrl+C and the kill command).
    for sig in [signal.SIGINT, signal.SIGTERM]:
        loop.add_signal_handler(sig, shutdown)
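    # Note: loop.add_signal_handler is only implemented by Unix event loops;
    # on Windows it raises NotImplementedError.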

    # One coroutine task per topic.
    tasks = [asyncio.create_task(async_handle_topic(topic, stop_event)) for topic in topics]

    await stop_event.wait()  # wait for the stop signal

    # Cancel all tasks and wait for them to finish.
    logger.warning("[system] cancelling all consumer tasks...")
    for task in tasks:
        task.cancel()
    # Collect task results, ignoring cancellation exceptions.
    results = await asyncio.gather(*tasks, return_exceptions=True)
    for idx, result in enumerate(results):
        if isinstance(result, Exception) and not isinstance(result, asyncio.CancelledError):
            logger.error(f"[system] task {topics[idx]} exited abnormally: {result}")

    logger.warning("[system] all tasks exited, process shutting down...")
    aliyun_logger.logging(
        code="1602",
        message="[system] all tasks exited, process shutting down...",
        data={"task_count": len(tasks)}
    )


def handle_topic_group(topics: List[str]):
    """Subprocess entry point: start an asyncio event loop for this group of topics."""
    asyncio.run(run_all_topics(topics))
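

if __name__ == "__main__":
    # Minimal local-run sketch (assumption: in production the topic list is
    # wired up elsewhere, e.g. by a process manager; these topic names are
    # illustrative only).
    handle_topic_group(["example_topic_a", "example_topic_b"])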