@@ -5,88 +5,101 @@ from multiprocessing import Process, cpu_count
 from typing import List, Dict
 import asyncio
 
-from application.config.common import LoggerManager
+from application.config.common.log.logger_manager import LoggerManager
 from utils.trace_utils import generate_trace_id
 from application.config.common import get_consumer, ack_message
-from application.functions import MysqlService
+from application.functions.async_mysql_service import AsyncMysqlService
 from application.spiders.spider_registry import get_spider_class, SPIDER_CLASS_MAP
-
+from application.functions.rocketmq_consumer import AsyncRocketMQConsumer
 
 # ------------------------------- Core per-topic coroutine handling -------------------------------
+
+# MySQL service instance shared within each worker process (global variable)
+mysql_service: AsyncMysqlService = None
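+# It is initialized in run_all_topics() before any consumer coroutine starts.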
+
+
 async def async_handle_topic(topic: str):
-    consumer = get_consumer(topic_name=topic, group_id=topic)
+    """
+    Consumption logic for a single topic, run as a coroutine:
+    - consume messages from the MQ;
+    - run the matching spider based on the message payload;
+    - query configuration via the async database service;
+    - log the result and acknowledge the message.
+    """
     logger = LoggerManager.get_logger(topic, "worker")
     aliyun_logger = LoggerManager.get_aliyun_logger(topic, "worker")
 
-    while True:
+    # Create an independent consumer instance for each topic
+    consumer = AsyncRocketMQConsumer(topic_name=topic, group_id=topic)
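+    # group_id is set to the topic name, so each topic consumes in its own group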
+
+    async def handle_single_message(message):
+        trace_id = generate_trace_id()
         try:
-            messages = consumer.consume_message(wait_seconds=10, batch_size=1)
-            if not messages:
-                await asyncio.sleep(1)
-                continue
-
-            for message in messages:
-                trace_id = generate_trace_id()
-                try:
-                    payload = json.loads(message.message_body)
-                    platform = payload["platform"]
-                    mode = payload["mode"]
-                    task_id = payload["id"]
-
-                    mysql_service = MysqlService(platform, mode, task_id)
-                    user_list = mysql_service.get_user_list()
-                    rule_dict = mysql_service.get_rule_dict()
-
-                    CrawlerClass = get_spider_class(topic)
-                    crawler = CrawlerClass(
-                        rule_dict=rule_dict,
-                        user_list=user_list,
-                        trace_id=trace_id
-                    )
-
-                    await crawler.run()
-
-                    ack_message(mode, platform, message, consumer, trace_id=trace_id)
-                    aliyun_logger.logging(code="1000", message="Task completed successfully and message acknowledged", trace_id=trace_id)
-
-                except Exception as e:
-                    aliyun_logger.logging(
-                        code="9001",
-                        message=f"Failed to process message: {e}\n{traceback.format_exc()}",
-                        trace_id=trace_id,
-                        data=message.message_body,
-                    )
-        except Exception as err:
-            logger.error(f"[{topic}] consumption failed: {err}\n{traceback.format_exc()}")
-            await asyncio.sleep(5)
+            payload = json.loads(message.message_body)
+            platform = payload["platform"]
+            mode = payload["mode"]
+            task_id = payload["id"]
+
+            user_list = await mysql_service.get_user_list(task_id)
+            rule_dict = await mysql_service.get_rule_dict(task_id)
+
+            CrawlerClass = get_spider_class(topic)
+            crawler = CrawlerClass(
+                rule_dict=rule_dict,
+                user_list=user_list,
+                trace_id=trace_id
+            )
+            await crawler.run()
+
+            # Ack is executed only after run() succeeds
+            await consumer.ack_message(message.receipt_handle)
+            aliyun_logger.logging(code="1000", message="Task completed successfully and message acknowledged", trace_id=trace_id)
+
+        except Exception as e:
+            aliyun_logger.logging(
+                code="9001",
+                message=f"Failed to process message: {e}\n{traceback.format_exc()}",
+                trace_id=trace_id,
+                data=message.message_body,
+            )
+
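+    # Note: a message whose handler raises is logged but never acked, so the
+    # broker is expected to redeliver it on a later poll.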
+    # Start the consumption loop
+    await consumer.run_forever(handle_single_message)
 
 
 async def run_all_topics(topics: List[str]):
     """
-    Start the coroutine tasks for all topics in the current process.
+    Start the coroutine listener tasks for all topics in the current process.
+    Initializes the global AsyncMysqlService instance.
     """
+    global mysql_service
+    mysql_service = AsyncMysqlService()
+    await mysql_service.init()  # initialize the connection pool
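+    # The pool created here is shared by every topic coroutine in this process.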
+
     tasks = [asyncio.create_task(async_handle_topic(topic)) for topic in topics]
     await asyncio.gather(*tasks)
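+    # If any topic coroutine raises, gather() propagates the exception and this
+    # process exits; the monitor in main() then restarts the whole group.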
 
 
 def handle_topic_group(topics: List[str]):
     """
-    Subprocess entry point: run one event loop that handles all topics in this group.
+    Subprocess entry point:
+    starts an async event loop to handle this group of topics.
     """
     asyncio.run(run_all_topics(topics))
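+    # Each subprocess owns its own event loop and its own MySQL pool, so no
+    # asyncio or connection state is shared across processes.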
 
 
 # ------------------------------- Main scheduling -------------------------------
+
 def split_topics(topics: List[str], num_groups: int) -> List[List[str]]:
     """
-    Split all topics evenly into num_groups groups (for multiple processes).
+    Split all topics evenly into num_groups groups to distribute across subprocesses.
     """
     return [topics[i::num_groups] for i in range(num_groups)]
 
 
 def start_worker_process(group_id: int, topic_group: List[str], process_map: Dict[int, Process]):
     """
-    Start a new subprocess to handle a group of topics.
+    Start a subprocess to handle one group of topics.
     """
     p = Process(target=handle_topic_group, args=(topic_group,), name=f"Worker-{group_id}")
     p.start()
@@ -95,31 +108,36 @@ def start_worker_process(group_id: int, topic_group: List[str], process_map: Dict[int, Process]):
 
 
 def main():
-    # Get the list of all registered spider topics
+    """
+    Main scheduling entry point:
+    - collect all registered spider topics;
+    - group them by CPU core count;
+    - start subprocesses to run the groups;
+    - monitor subprocess health and recover automatically.
+    """
     topic_list = list(SPIDER_CLASS_MAP.keys())
-    print(f"Listening topics: {topic_list}")
+    print(f"[main] Listening topics: {topic_list}")
 
-    # Use the CPU core count to decide the number of processes
     num_cpus = cpu_count()
     topic_groups = split_topics(topic_list, num_cpus)
-    print(f"CPU cores: {num_cpus}, processes to start: {len(topic_groups)}")
+    print(f"[main] CPU cores: {num_cpus}, processes to start: {len(topic_groups)}")
 
-    # Start the subprocesses
     process_map: Dict[int, Process] = {}
+
     for group_id, topic_group in enumerate(topic_groups):
         start_worker_process(group_id, topic_group, process_map)
 
-    # Continuously monitor the subprocesses; restart automatically on abnormal exit
+    # The main process keeps monitoring subprocess state
    try:
         while True:
             time.sleep(5)
             for group_id, p in list(process_map.items()):
                 if not p.is_alive():
-                    print(f"[monitor] Process {p.name} PID={p.pid} crashed, attempting restart...")
+                    print(f"[monitor] Process {p.name} PID={p.pid} crashed, restarting...")
                     time.sleep(2)
                     start_worker_process(group_id, topic_groups[group_id], process_map)
     except KeyboardInterrupt:
-        print("Received exit signal, terminating all processes...")
+        print("[main] Received exit signal, terminating all subprocesses...")
         for p in process_map.values():
             p.terminate()
         for p in process_map.values():