123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475 |
- import json
- import traceback
- from typing import List
- import asyncio
- from core.utils.log.logger_manager import LoggerManager
- from core.utils.trace_utils import generate_trace_id
- from services.async_mysql_service import AsyncMysqlService
- from spiders.spider_registry import get_spider_class
async def async_handle_topic(topic: str):
    """
    Run the consume loop for a single topic inside a coroutine.

    A dedicated ``AsyncRocketMQConsumer`` is created for ``topic`` (the topic
    name is also passed as the group id) and a per-message callback is handed
    to its ``run_forever`` method. For every message the callback:

    - decodes the JSON body and reads the task id from its ``"id"`` key;
    - fetches the user list and rule dict for that task through
      ``AsyncMysqlService``;
    - instantiates the class returned by ``get_spider_class(topic)`` and
      awaits its ``run()``;
    - acknowledges the message after ``run()`` has returned;
    - on any ``Exception``, reports the failure to the Aliyun logger and
      leaves the message un-acknowledged.

    :param topic: topic name; also used to look up the spider class.
    """
    # Not referenced below; retained so the getter is still invoked.
    logger = LoggerManager.get_logger()
    aliyun_logger = LoggerManager.get_aliyun_logger()

    # Imported at call time, and one consumer instance is built per topic.
    from services.rocketmq_consumer import AsyncRocketMQConsumer

    consumer = AsyncRocketMQConsumer(topic_name=topic, group_id=topic)

    async def on_message(message):
        """Process one message; failures are logged rather than raised."""
        trace_id = generate_trace_id()
        try:
            task_id = json.loads(message.message_body)["id"]

            # The DB service is only held open while the two lookups run.
            async with AsyncMysqlService("system", "crawler") as mysql:
                user_list = await mysql.get_user_list(task_id)
                rule_dict = await mysql.get_rule_dict(task_id)

            spider_cls = get_spider_class(topic)
            spider = spider_cls(
                rule_dict=rule_dict,
                user_list=user_list,
                trace_id=trace_id,
            )
            await spider.run()

            # Reached only when run() did not raise, so failed messages are
            # never acked here (presumably left for redelivery - confirm
            # against the consumer implementation).
            await consumer.ack_message(message.receipt_handle)
        except Exception as exc:
            failure_details = {
                "error_type": type(exc).__name__,
                "stack_trace": traceback.format_exc(),
                "message_body": message.message_body,
            }
            aliyun_logger.logging(
                code="9001",
                message=f"处理消息失败: {str(exc)}",
                trace_id=trace_id,
                data=failure_details,
            )

    # Hand the callback to the consumer's loop.
    await consumer.run_forever(on_message)
async def run_all_topics(topics: List[str]):
    """
    Start one listener task per topic and wait for all of them.

    Each topic is wrapped in an ``asyncio`` task running
    ``async_handle_topic``; the tasks are then awaited together with
    ``asyncio.gather``.

    :param topics: names of the topics handled by the current process.
    """
    listeners = []
    for topic_name in topics:
        listener = asyncio.create_task(async_handle_topic(topic_name))
        listeners.append(listener)

    await asyncio.gather(*listeners)
def handle_topic_group(topics: List[str]):
    """
    Child-process entry point: drive the given topics on a new event loop.

    :param topics: topic names to pass to ``run_all_topics``.
    """
    main_coroutine = run_all_topics(topics)
    asyncio.run(main_coroutine)
|