
import json
import time
import traceback
from multiprocessing import Process, cpu_count
from typing import List, Dict, Optional
import asyncio

from application.config.common.log.logger_manager import LoggerManager
from utils.trace_utils import generate_trace_id
from application.config.common import get_consumer, ack_message
from application.functions.async_mysql_service import AsyncMysqlService
from application.spiders.spider_registry import get_spider_class, SPIDER_CLASS_MAP
from application.functions.rocketmq_consumer import AsyncRocketMQConsumer

# ------------------------------- Per-topic coroutine handling -------------------------------

# MySQL service instance shared within each process (module-level global)
mysql_service: Optional[AsyncMysqlService] = None


async def async_handle_topic(topic: str):
    """
    Consumption logic for a single topic, running inside a coroutine:
    - consume messages from the MQ;
    - run the matching spider based on the message content;
    - query task configuration through the async database service;
    - log the outcome and acknowledge the message.
    """
    logger = LoggerManager.get_logger(topic, "worker")
    aliyun_logger = LoggerManager.get_aliyun_logger(topic, "worker")

    # Each topic gets its own consumer instance
    consumer = AsyncRocketMQConsumer(topic_name=topic, group_id=topic)
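
    # Illustrative shape of the message body this handler parses (field names are
    # taken from the parsing code below; the values are hypothetical examples):
    #   {"platform": "...", "mode": "...", "id": 123}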

    async def handle_single_message(message):
        trace_id = generate_trace_id()
        try:
            payload = json.loads(message.message_body)
            platform = payload["platform"]
            mode = payload["mode"]
            task_id = payload["id"]

            user_list = await mysql_service.get_user_list(task_id)
            rule_dict = await mysql_service.get_rule_dict(task_id)

            CrawlerClass = get_spider_class(topic)
            crawler = CrawlerClass(
                rule_dict=rule_dict,
                user_list=user_list,
                trace_id=trace_id
            )
            await crawler.run()

            # ack only after run() has completed successfully
            await consumer.ack_message(message.receipt_handle)
            aliyun_logger.logging(code="1000", message="Task completed and message acknowledged", trace_id=trace_id)
        except Exception as e:
            aliyun_logger.logging(
                code="9001",
                message=f"Failed to process message: {e}\n{traceback.format_exc()}",
                trace_id=trace_id,
                data=message.message_body,
            )

    # Start the consume loop
    await consumer.run_forever(handle_single_message)


async def run_all_topics(topics: List[str]):
    """
    Start the coroutine listeners for every topic assigned to this process.
    Also initializes the global AsyncMysqlService instance.
    """
    global mysql_service
    mysql_service = AsyncMysqlService()
    await mysql_service.init()  # initialize the connection pool

    tasks = [asyncio.create_task(async_handle_topic(topic)) for topic in topics]
    await asyncio.gather(*tasks)


def handle_topic_group(topics: List[str]):
    """
    Child-process entry point:
    run an asyncio event loop that handles this group of topics.
    """
    asyncio.run(run_all_topics(topics))


# ------------------------------- Main scheduling -------------------------------

def split_topics(topics: List[str], num_groups: int) -> List[List[str]]:
    """
    Split all topics evenly into num_groups groups, one group per child process.
    """
    return [topics[i::num_groups] for i in range(num_groups)]
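
# For illustration only (hypothetical topic names): the round-robin slice above
# splits five topics across two groups like this:
#   split_topics(["t1", "t2", "t3", "t4", "t5"], 2)
#   -> [["t1", "t3", "t5"], ["t2", "t4"]]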


def start_worker_process(group_id: int, topic_group: List[str], process_map: Dict[int, Process]):
    """
    Start one child process to handle a group of topics.
    """
    p = Process(target=handle_topic_group, args=(topic_group,), name=f"Worker-{group_id}")
    p.start()
    process_map[group_id] = p
    print(f"[Main] Started process PID={p.pid} for topics={topic_group}")


def main():
    """
    Main scheduling entry point:
    - collect every spider topic;
    - split them into groups by CPU core count;
    - start one child process per group;
    - monitor the child processes and restart them automatically.
    """
    topic_list = list(SPIDER_CLASS_MAP.keys())
    print(f"[Main] Listening on topics: {topic_list}")

    num_cpus = cpu_count()
    topic_groups = split_topics(topic_list, num_cpus)
    print(f"[Main] CPU cores: {num_cpus}, starting {len(topic_groups)} worker processes")

    process_map: Dict[int, Process] = {}
    for group_id, topic_group in enumerate(topic_groups):
        start_worker_process(group_id, topic_group, process_map)

    # Keep monitoring child processes from the main process
    try:
        while True:
            time.sleep(5)
            for group_id, p in list(process_map.items()):
                if not p.is_alive():
                    print(f"[Monitor] Process {p.name} PID={p.pid} crashed, restarting...")
                    time.sleep(2)
                    start_worker_process(group_id, topic_groups[group_id], process_map)
    except KeyboardInterrupt:
        print("[Main] Received exit signal, terminating all child processes...")
        for p in process_map.values():
            p.terminate()
        for p in process_map.values():
            p.join()


if __name__ == '__main__':
    main()