import importlib
import json
import threading
import time
import traceback
import uuid

from application.common import AliyunLogger, get_consumer, ack_message
from application.common.log import Local
from crawler_worker.universal_crawler import UniversalCrawler
from application.config import TopicGroup
from application.functions.mysql_service import MysqlService
from utils.config_loader import ConfigLoader

# Seconds to pause after an unexpected error in the consume loop, so a
# persistent failure (e.g. broker unreachable) does not become a busy spin
# that floods the log with 9002 entries.
CONSUME_ERROR_BACKOFF_SECONDS = 1


def generate_trace_id():
    """Return a trace id: a random uuid4 hex string followed by the current epoch time in milliseconds."""
    return f"{uuid.uuid4().hex}{int(time.time() * 1000)}"


def import_custom_class(class_path):
    """Dynamically import and return a class given its dotted path.

    Example: ``"crawler_worker.universal_crawler.UniversalCrawler"``.

    Raises:
        ValueError: if ``class_path`` contains no ``.`` (nothing to split).
        ImportError: if the module part cannot be imported.
        AttributeError: if the module has no attribute with the class name.
    """
    module_path, class_name = class_path.rsplit(".", 1)
    module = importlib.import_module(module_path)
    return getattr(module, class_name)


def _resolve_crawler_class(custom_class):
    """Return the configured custom crawler class, or UniversalCrawler when none is configured."""
    if custom_class:
        return import_custom_class(custom_class)
    return UniversalCrawler


def _process_message(message, consumer, logger, platform_config, topic):
    """Run the crawl task carried by one MQ message, then ack it.

    The body is parsed as JSON and must contain ``platform``, ``mode`` and ``id``.
    Failures while parsing, loading task data from MySQL, or acking are logged
    (code 9001) and the message is NOT acked, so it can be redelivered.
    Failures inside the crawler itself (including importing a custom class)
    are logged and the message IS still acked, matching the original behavior.
    """
    trace_id = generate_trace_id()
    body = message.message_body
    try:
        payload = json.loads(body)
        platform = payload["platform"]
        # Named task_mode so it does not shadow handle_message's `mode` parameter.
        task_mode = payload["mode"]
        task_id = payload["id"]
        mysql_service = MysqlService(task_id, task_mode, platform)
        logger.logging(
            1001, "开始一轮抓取", data=payload, trace_id=trace_id
        )
        Local.init_logger(platform, task_mode).info(f"[trace_id={trace_id}] 收到任务: {body}")

        # Load the user list and rule dict for this task.
        user_list = mysql_service.get_user_list()
        rule_dict = mysql_service.get_rule_dict()
        custom_class = platform_config.get("custom_class")  # optional custom crawler class path

        crawl_succeeded = True
        try:
            crawler_class = _resolve_crawler_class(custom_class)
            crawler = crawler_class(
                platform_config=platform_config,  # pass the whole platform config through
                rule_dict=rule_dict,
                user_list=user_list,
                trace_id=trace_id
            )
            crawler.run()
        except Exception as e:
            crawl_succeeded = False
            # Previously a bare print without trace_id/traceback; record it properly.
            # The message is still acked below (deliberate: avoids redelivering a
            # task whose crawler keeps failing).
            logger.logging(
                code="9003",
                message=f"[{topic}] 爬虫运行异常,消息仍将确认: {e}\n{traceback.format_exc()}",
                trace_id=trace_id,
                data=body,
            )

        ack_message(task_mode, platform, message, consumer, trace_id=trace_id)
        if crawl_succeeded:
            # Only report success when the crawler actually finished without error.
            logger.logging(code="1000", message="任务成功完成并确认消息", trace_id=trace_id)
    except Exception as e:
        logger.logging(
            code="9001",
            message=f"处理消息失败(未确认 ack): {e}\n{traceback.format_exc()}",
            trace_id=trace_id,
            data=body,
        )
        # Not acked: wait for the broker to redeliver and retry.


def handle_message(topic: str, mode: str):
    """Consume messages from ``topic`` forever, running one crawl task per message.

    Args:
        topic: MQ topic name; also used as the consumer group id, the logger
            platform, and the key for loading the platform config.
        mode: mode label passed to the topic-level AliyunLogger.
    """
    consumer = get_consumer(topic_name=topic, group_id=topic)
    logger = AliyunLogger(platform=topic, mode=mode)
    platform_config = ConfigLoader().get_platform_config(topic)

    while True:
        try:
            messages = consumer.consume_message(wait_seconds=10, batch_size=1)
            if not messages:
                continue
            for message in messages:
                _process_message(message, consumer, logger, platform_config, topic)
        except Exception as err:
            logger.logging(code="9002", message=f"消费失败: {err}\n{traceback.format_exc()}")
            # Back off briefly so a persistent error does not spin the CPU.
            time.sleep(CONSUME_ERROR_BACKOFF_SECONDS)


def main():
    """Start one consumer thread per topic in TopicGroup and block until they all exit."""
    topic_list = TopicGroup()
    print(f"监听 Topics:{topic_list}")

    # Derive every topic's mode (second "_"-separated segment) BEFORE starting
    # any thread, so a malformed topic name fails fast instead of leaving some
    # consumer threads already running. NOTE(review): assumes topic names have
    # at least one "_" - confirm against TopicGroup.
    topic_modes = [(topic, topic.split("_")[1]) for topic in topic_list]

    threads = []
    for topic, mode in topic_modes:
        thread = threading.Thread(
            target=handle_message, args=(topic, mode), name=f"consumer-{topic}"
        )
        thread.start()
        threads.append(thread)
    for thread in threads:
        thread.join()


if __name__ == '__main__':
    main()