import importlib
import json
import time
import traceback
from concurrent.futures import ThreadPoolExecutor, as_completed, Future
from typing import Dict

from application.common.logger_manager import LoggerManager
from application.common.trace_utils import generate_trace_id
from application.common import get_consumer, ack_message
from crawler_worker.universal_crawler import UniversalCrawler
from application.config import TopicGroup
from application.functions.mysql_service import MysqlService
from utils.config_loader import ConfigLoader


def import_custom_class(class_path: str):
    """
    Dynamically import a crawler class by its dotted path, e.g. crawler_worker.custom.xx.Crawler
    """
    module_path, class_name = class_path.rsplit(".", 1)
    module = importlib.import_module(module_path)
    return getattr(module, class_name)
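
# Illustrative note (sketch, not executed at runtime): for the example path in the
# docstring above, import_custom_class("crawler_worker.custom.xx.Crawler") imports the
# module "crawler_worker.custom.xx" and returns its "Crawler" attribute, i.e. the class
# object itself rather than an instance. The "xx" segment is a placeholder; the real
# dotted path is supplied per platform via the "custom_class" field of the platform
# config consumed in handle_message() below.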


def handle_message(topic: str, mode: str):
    """
    Core logic for consuming messages of one topic in a single thread; polls the MQ continuously.
    """
    consumer = get_consumer(topic_name=topic, group_id=topic)
    platform_config = ConfigLoader().get_platform_config(topic)
    while True:
        try:
            messages = consumer.consume_message(wait_seconds=10, batch_size=1)
            if not messages:
                continue
            for message in messages:
                trace_id = generate_trace_id()
                body = message.message_body
                # Topic-level loggers as a fallback, so the except branch below cannot
                # hit a NameError if the payload fails to parse.
                logger = LoggerManager.get_logger(topic, mode)
                aliyun_logger = LoggerManager.get_aliyun_logger(topic, mode)
                try:
                    payload = json.loads(body)
                    platform = payload["platform"]
                    mode = payload["mode"]
                    task_id = payload["id"]
                    # Initialize per-platform loggers
                    logger = LoggerManager.get_logger(platform, mode)
                    aliyun_logger = LoggerManager.get_aliyun_logger(platform, mode)
                    logger.info(f"[trace_id={trace_id}] Received task: {body}")
                    # Load config, user list and rules
                    mysql_service = MysqlService(platform, mode, task_id)
                    user_list = mysql_service.get_user_list()
                    rule_dict = mysql_service.get_rule_dict()
                    custom_class = platform_config.get("custom_class")
                    # Instantiate the crawler class
                    CrawlerClass = import_custom_class(custom_class) if custom_class else UniversalCrawler
                    crawler = CrawlerClass(
                        platform_config=platform_config,
                        rule_dict=rule_dict,
                        user_list=user_list,
                        trace_id=trace_id
                    )
                    crawler.run()
                    # Crawler finished successfully; acknowledge the message
                    ack_message(mode, platform, message, consumer, trace_id=trace_id)
                    aliyun_logger.logging(code="1000", message="Task completed and message acknowledged", trace_id=trace_id)
                except Exception as e:
                    aliyun_logger.logging(
                        code="9001",
                        message=f"Failed to process message (not acked): {e}\n{traceback.format_exc()}",
                        trace_id=trace_id,
                        data=body,
                    )
        except Exception as err:
            logger = LoggerManager.get_logger(topic, mode)
            logger.error(f"[{topic}] Consume loop failed: {err}\n{traceback.format_exc()}")
            time.sleep(5)  # Back off so a crash does not cause a tight restart loop
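
# Assumed message shape (illustrative only; concrete values are hypothetical):
#
#   {"id": 42, "platform": "xx", "mode": "xx"}
#
# handle_message() reads exactly these keys: "platform" and "mode" pick the loggers and
# the MysqlService context, and "id" identifies the task. The message is only acked after
# crawler.run() returns without raising, so a failed task is left unacknowledged and its
# redelivery depends on the MQ's own retry policy.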


def monitor_and_restart(future: Future, topic: str, mode: str, pool: ThreadPoolExecutor, thread_map: Dict[str, Future]):
    """
    Crash-recovery monitor: restarts a worker thread automatically after it dies.
    """
    try:
        future.result()  # Fetch the result to re-raise any exception from the worker
    except Exception as e:
        print(f"[monitor] thread for {topic} exited abnormally: {e}, restarting in 5 seconds")
        time.sleep(5)
        # Resubmit the task
        new_future = pool.submit(handle_message, topic, mode)
        thread_map[topic] = new_future
        # Register a new callback on the restarted future
        new_future.add_done_callback(lambda f: monitor_and_restart(f, topic, mode, pool, thread_map))
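
# Design note: every restarted future registers monitor_and_restart() on itself again,
# so the restart chain survives repeated crashes; each failure of handle_message() leads
# to a 5-second pause and a fresh submission to the pool. A worker that returns normally
# (handle_message() currently never does) is not restarted.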


def main():
    topic_list = TopicGroup()
    print(f"Listening on topics: {topic_list}")
    # Cap the pool at one worker thread per topic
    pool = ThreadPoolExecutor(max_workers=len(topic_list))
    thread_map: Dict[str, Future] = {}

    for topic in topic_list:
        # Topic names encode the mode as their second underscore-separated segment
        mode = topic.split("_")[1]
        future = pool.submit(handle_message, topic, mode)
        thread_map[topic] = future
        # Monitor: restart the worker automatically if it crashes
        future.add_done_callback(lambda f, t=topic, m=mode: monitor_and_restart(f, t, m, pool, thread_map))

    # Block the main thread so the process does not exit (the pool keeps all workers alive)
    try:
        while True:
            time.sleep(60)
    except KeyboardInterrupt:
        print("Shutdown signal received, closing the thread pool...")
        pool.shutdown(wait=True)


if __name__ == '__main__':
    main()