import importlib
import json
import time
import traceback
from concurrent.futures import ThreadPoolExecutor, as_completed, Future
from typing import Dict

from application.common.logger_manager import LoggerManager
from application.common.trace_utils import generate_trace_id
from application.common import get_consumer, ack_message
from crawler_worker.universal_crawler import UniversalCrawler
from application.config import TopicGroup
from application.functions.mysql_service import MysqlService
from utils.config_loader import ConfigLoader


def import_custom_class(class_path: str):
    """
    Dynamically import a crawler class, e.g. ``crawler_worker.custom.xx.Crawler``.

    :param class_path: dotted path "package.module.ClassName"
    :return: the class object
    :raises ImportError / AttributeError: when module or class is missing
    """
    module_path, class_name = class_path.rsplit(".", 1)
    module = importlib.import_module(module_path)
    return getattr(module, class_name)


def handle_message(topic: str, mode: str):
    """
    Core single-thread consumer loop for one topic; polls MQ forever.

    For each message: parse the JSON payload, load platform config / users /
    rules from MySQL, instantiate the configured crawler class (or the
    UniversalCrawler fallback), run it, and ack the message only on success.
    Messages that fail are logged but NOT acked, so the broker redelivers them.

    :param topic: MQ topic name (also used as consumer group id)
    :param mode: default mode label for logging; may be overridden per message
    """
    consumer = get_consumer(topic_name=topic, group_id=topic)
    platform_config = ConfigLoader().get_platform_config(topic)

    while True:
        try:
            messages = consumer.consume_message(wait_seconds=10, batch_size=1)
            if not messages:
                continue

            for message in messages:
                trace_id = generate_trace_id()
                body = message.message_body
                # BUGFIX: pre-bind so the except-block below never raises
                # NameError when the payload fails to parse before the
                # aliyun logger is initialized.
                aliyun_logger = None
                try:
                    payload = json.loads(body)
                    platform = payload["platform"]
                    mode = payload["mode"]
                    task_id = payload["id"]

                    # Per-platform loggers (local + Aliyun SLS).
                    logger = LoggerManager.get_logger(platform, mode)
                    aliyun_logger = LoggerManager.get_aliyun_logger(platform, mode)
                    logger.info(f"[trace_id={trace_id}] 收到任务: {body}")

                    # Load config, user list and rules for this task.
                    mysql_service = MysqlService(platform, mode, task_id)
                    user_list = mysql_service.get_user_list()
                    rule_dict = mysql_service.get_rule_dict()
                    custom_class = platform_config.get("custom_class")

                    # Instantiate the crawler: custom class if configured,
                    # otherwise the generic UniversalCrawler.
                    CrawlerClass = import_custom_class(custom_class) if custom_class else UniversalCrawler
                    crawler = CrawlerClass(
                        platform_config=platform_config,
                        rule_dict=rule_dict,
                        user_list=user_list,
                        trace_id=trace_id
                    )
                    crawler.run()

                    # Crawl succeeded — ack so the message is not redelivered.
                    ack_message(mode, platform, message, consumer, trace_id=trace_id)
                    aliyun_logger.logging(code="1000", message="任务成功完成并确认消息", trace_id=trace_id)
                except Exception as e:
                    # Message stays un-acked; broker will redeliver it.
                    err_text = f"处理消息失败(未确认 ack): {e}\n{traceback.format_exc()}"
                    if aliyun_logger is not None:
                        aliyun_logger.logging(
                            code="9001",
                            message=err_text,
                            trace_id=trace_id,
                            data=body,
                        )
                    else:
                        # Payload never parsed — fall back to the topic logger.
                        LoggerManager.get_logger(topic, mode).error(
                            f"[trace_id={trace_id}] {err_text}"
                        )
        except Exception as err:
            logger = LoggerManager.get_logger(topic, mode)
            logger.error(f"[{topic}] 消费失败: {err}\n{traceback.format_exc()}")
            time.sleep(5)  # back off so a crash loop does not spin hot


def monitor_and_restart(future: Future,
                        topic: str,
                        mode: str,
                        pool: ThreadPoolExecutor,
                        thread_map: Dict[str, Future]):
    """
    Crash-recovery monitor: when a consumer thread dies, restart it.

    Attached as a done-callback on each consumer Future; re-submits
    handle_message after a 5s delay and re-registers itself on the new Future.
    """
    try:
        future.result()  # re-raises the exception (if any) from the thread
    except Exception as e:
        print(f"[监控] 线程 {topic} 异常退出:{e},5秒后尝试重启")
        time.sleep(5)

        # Resubmit the consumer task and track the new future.
        new_future = pool.submit(handle_message, topic, mode)
        thread_map[topic] = new_future

        # Re-register this monitor on the new future.
        new_future.add_done_callback(
            lambda f: monitor_and_restart(f, topic, mode, pool, thread_map)
        )


def main():
    """Bootstrap: one consumer thread per topic, with auto-restart monitors."""
    topic_list = TopicGroup()
    print(f"监听 Topics:{topic_list}")

    # BUGFIX: ThreadPoolExecutor raises ValueError on max_workers=0.
    if not topic_list:
        print("没有可监听的 Topic,退出")
        return

    # Cap worker count at the number of topics — one thread per topic.
    pool = ThreadPoolExecutor(max_workers=len(topic_list))
    thread_map: Dict[str, Future] = {}

    for topic in topic_list:
        # Topic naming convention "<platform>_<mode>_..." — mode is field 2.
        mode = topic.split("_")[1]
        future = pool.submit(handle_message, topic, mode)
        thread_map[topic] = future

        # Default-bound lambda so each callback captures its own topic/mode.
        future.add_done_callback(
            lambda f, t=topic, m=mode: monitor_and_restart(f, t, m, pool, thread_map)
        )

    # Keep the main thread alive; the pool keeps the consumer threads running.
    try:
        while True:
            time.sleep(60)
    except KeyboardInterrupt:
        print("接收到退出指令,正在关闭线程池...")
        pool.shutdown(wait=True)


if __name__ == '__main__':
    main()