async_consumer.py 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475
  1. import json
  2. import traceback
  3. from typing import List
  4. import asyncio
  5. from core.utils.log.logger_manager import LoggerManager
  6. from core.utils.trace_utils import generate_trace_id
  7. from services.async_mysql_service import AsyncMysqlService
  8. from spiders.spider_registry import get_spider_class
  9. async def async_handle_topic(topic: str):
  10. """
  11. 单个 topic 的消费逻辑,运行在协程中:
  12. - 从 MQ 中消费消息;
  13. - 根据消息内容执行对应爬虫;
  14. - 使用异步数据库服务查询配置;
  15. - 记录日志、确认消息。
  16. """
  17. logger = LoggerManager.get_logger()
  18. aliyun_logger = LoggerManager.get_aliyun_logger()
  19. # 每个 topic 创建独立的 consumer 实例
  20. from services.rocketmq_consumer import AsyncRocketMQConsumer
  21. consumer = AsyncRocketMQConsumer(topic_name=topic, group_id=topic)
  22. async def handle_single_message(message):
  23. trace_id = generate_trace_id()
  24. try:
  25. payload = json.loads(message.message_body)
  26. task_id = payload["id"]
  27. async with AsyncMysqlService("system", "crawler") as mysql:
  28. user_list = await mysql.get_user_list(task_id)
  29. rule_dict = await mysql.get_rule_dict(task_id)
  30. CrawlerClass = get_spider_class(topic)
  31. crawler = CrawlerClass(
  32. rule_dict=rule_dict,
  33. user_list=user_list,
  34. trace_id=trace_id
  35. )
  36. await crawler.run()
  37. # ack 由 run 成功后执行
  38. await consumer.ack_message(message.receipt_handle)
  39. except Exception as e:
  40. aliyun_logger.logging(
  41. code="9001",
  42. message=f"处理消息失败: {str(e)}",
  43. trace_id=trace_id,
  44. data={
  45. "error_type": type(e).__name__,
  46. "stack_trace": traceback.format_exc(),
  47. "message_body": message.message_body
  48. }
  49. )
  50. # 消费循环启动
  51. await consumer.run_forever(handle_single_message)
  52. async def run_all_topics(topics: List[str]):
  53. """
  54. 启动当前进程中所有 topic 的协程监听任务。
  55. 初始化全局 AsyncMysqlService 实例。
  56. """
  57. tasks = [asyncio.create_task(async_handle_topic(topic)) for topic in topics]
  58. await asyncio.gather(*tasks)
  59. def handle_topic_group(topics: List[str]):
  60. """子进程入口函数:启动异步事件循环处理该组 topics。"""
  61. asyncio.run(run_all_topics(topics))