# main.py
  1. import importlib
  2. import json
  3. import time
  4. import traceback
  5. from concurrent.futures import ThreadPoolExecutor, as_completed, Future
  6. from typing import Dict
  7. from application.common.logger_manager import LoggerManager
  8. from application.common.trace_utils import generate_trace_id
  9. from application.common import get_consumer, ack_message
  10. from crawler_worker.universal_crawler import UniversalCrawler
  11. from application.config import TopicGroup
  12. from application.functions.mysql_service import MysqlService
  13. from utils.config_loader import ConfigLoader
  14. def import_custom_class(class_path: str):
  15. """
  16. 动态导入爬虫类,例如 crawler_worker.custom.xx.Crawler
  17. """
  18. module_path, class_name = class_path.rsplit(".", 1)
  19. module = importlib.import_module(module_path)
  20. return getattr(module, class_name)
  21. def handle_message(topic: str, mode: str):
  22. """
  23. 单线程消费指定 topic 消息的核心逻辑,会持续轮询 MQ
  24. """
  25. consumer = get_consumer(topic_name=topic, group_id=topic)
  26. platform_config = ConfigLoader().get_platform_config(topic)
  27. while True:
  28. try:
  29. messages = consumer.consume_message(wait_seconds=10, batch_size=1)
  30. if not messages:
  31. continue
  32. for message in messages:
  33. trace_id = generate_trace_id()
  34. body = message.message_body
  35. try:
  36. payload = json.loads(body)
  37. platform = payload["platform"]
  38. mode = payload["mode"]
  39. task_id = payload["id"]
  40. # 初始化日志
  41. logger = LoggerManager.get_logger(platform, mode)
  42. aliyun_logger = LoggerManager.get_aliyun_logger(platform, mode)
  43. logger.info(f"[trace_id={trace_id}] 收到任务: {body}")
  44. # 初始化配置、用户与规则
  45. mysql_service = MysqlService(platform, mode, task_id)
  46. user_list = mysql_service.get_user_list()
  47. rule_dict = mysql_service.get_rule_dict()
  48. custom_class = platform_config.get("custom_class")
  49. # 实例化爬虫类
  50. CrawlerClass = import_custom_class(custom_class) if custom_class else UniversalCrawler
  51. crawler = CrawlerClass(
  52. platform_config=platform_config,
  53. rule_dict=rule_dict,
  54. user_list=user_list,
  55. trace_id=trace_id
  56. )
  57. crawler.run()
  58. # 爬虫成功,确认消息
  59. ack_message(mode, platform, message, consumer, trace_id=trace_id)
  60. aliyun_logger.logging(code="1000", message="任务成功完成并确认消息", trace_id=trace_id)
  61. except Exception as e:
  62. aliyun_logger.logging(
  63. code="9001",
  64. message=f"处理消息失败(未确认 ack): {e}\n{traceback.format_exc()}",
  65. trace_id=trace_id,
  66. data=body,
  67. )
  68. except Exception as err:
  69. logger = LoggerManager.get_logger(topic, mode)
  70. logger.error(f"[{topic}] 消费失败: {err}\n{traceback.format_exc()}")
  71. time.sleep(5) # 防止崩溃后频繁拉起
  72. def monitor_and_restart(future: Future, topic: str, mode: str, pool: ThreadPoolExecutor, thread_map: Dict[str, Future]):
  73. """
  74. 线程崩溃恢复监控器:线程挂掉后自动重启
  75. """
  76. try:
  77. future.result() # 获取结果,触发异常
  78. except Exception as e:
  79. print(f"[监控] 线程 {topic} 异常退出:{e},5秒后尝试重启")
  80. time.sleep(5)
  81. # 重新提交任务
  82. new_future = pool.submit(handle_message, topic, mode)
  83. thread_map[topic] = new_future
  84. # 注册新的回调
  85. new_future.add_done_callback(lambda f: monitor_and_restart(f, topic, mode, pool, thread_map))
  86. def main():
  87. topic_list = TopicGroup()
  88. print(f"监听 Topics:{topic_list}")
  89. # 限制最大线程数为 topic 数量
  90. pool = ThreadPoolExecutor(max_workers=len(topic_list))
  91. thread_map: Dict[str, Future] = {}
  92. for topic in topic_list:
  93. mode = topic.split("_")[1]
  94. future = pool.submit(handle_message, topic, mode)
  95. thread_map[topic] = future
  96. # 设置监控器:任务崩溃后自动重启
  97. future.add_done_callback(lambda f, t=topic, m=mode: monitor_and_restart(f, t, m, pool, thread_map))
  98. # 阻塞主线程防止退出(线程池会维持所有子线程)
  99. try:
  100. while True:
  101. time.sleep(60)
  102. except KeyboardInterrupt:
  103. print("接收到退出指令,正在关闭线程池...")
  104. pool.shutdown(wait=True)
# Script entry point: start the per-topic consumer threads.
if __name__ == '__main__':
main()