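"""
Multi-process, multi-coroutine crawler worker entry point: the main process
starts one worker process per CPU core, and each worker runs one asyncio
coroutine per registered spider topic, consuming messages and dispatching
them to the matching spider.
"""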
import asyncio
import json
import time
import traceback
from multiprocessing import Process, cpu_count
from typing import Dict, List

from application.config.common import LoggerManager, ack_message, get_consumer
from application.spiders.spider_registry import SPIDER_CLASS_MAP, get_spider_class
from functions import MysqlService
from utils.trace_utils import generate_trace_id
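
# Assumed contract (inferred from usage below): SPIDER_CLASS_MAP maps each topic
# name to a spider class, get_spider_class(topic) returns that class, and every
# spider is constructed with rule_dict / user_list / trace_id and exposes an
# async run() method.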

# ------------------------------- Topic coroutine processing core -------------------------------
async def async_handle_topic(topic: str):
    """
    Consume messages for a single topic in an endless loop and dispatch each
    message to the spider registered for that topic.
    """
    consumer = get_consumer(topic_name=topic, group_id=topic)
    logger = LoggerManager.get_logger(topic, "worker")
    aliyun_logger = LoggerManager.get_aliyun_logger(topic, "worker")

    while True:
        try:
            messages = consumer.consume_message(wait_seconds=10, batch_size=1)
            if not messages:
                await asyncio.sleep(1)
                continue
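
            # Each message carries the platform, mode and task id; the task's user
            # list and rule dict are loaded from MySQL before the spider is run.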
            for message in messages:
                trace_id = generate_trace_id()
                try:
                    payload = json.loads(message.message_body)
                    platform = payload["platform"]
                    mode = payload["mode"]
                    task_id = payload["id"]

                    mysql_service = MysqlService(platform, mode, task_id)
                    user_list = mysql_service.get_user_list()
                    rule_dict = mysql_service.get_rule_dict()

                    CrawlerClass = get_spider_class(topic)
                    crawler = CrawlerClass(
                        rule_dict=rule_dict,
                        user_list=user_list,
                        trace_id=trace_id,
                    )
                    await crawler.run()

                    # Acknowledge only after the crawler has finished successfully.
                    ack_message(mode, platform, message, consumer, trace_id=trace_id)
                    aliyun_logger.logging(code="1000", message="Task completed successfully and message acknowledged", trace_id=trace_id)
                except Exception as e:
                    # The message is intentionally not acknowledged on failure.
                    aliyun_logger.logging(
                        code="9001",
                        message=f"Failed to process message: {e}\n{traceback.format_exc()}",
                        trace_id=trace_id,
                        data=message.message_body,
                    )
        except Exception as err:
            logger.error(f"[{topic}] consume failed: {err}\n{traceback.format_exc()}")
            await asyncio.sleep(5)


async def run_all_topics(topics: List[str]):
    """
    Start the coroutine tasks for all topics inside the current process.
    """
    tasks = [asyncio.create_task(async_handle_topic(topic)) for topic in topics]
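    # Each per-topic coroutine loops forever and handles its own errors, so this
    # gather() is not expected to return during normal operation.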
    await asyncio.gather(*tasks)


def handle_topic_group(topics: List[str]):
    """
    Child-process entry point: run one event loop that handles all topics in this group.
    """
    asyncio.run(run_all_topics(topics))


# ------------------------------- Main scheduling -------------------------------
def split_topics(topics: List[str], num_groups: int) -> List[List[str]]:
    """
    Distribute the topics evenly into num_groups groups (one per worker process).
    Empty groups are dropped so that no idle worker process is started when
    there are fewer topics than CPU cores.
    """
    groups = [topics[i::num_groups] for i in range(num_groups)]
    return [group for group in groups if group]
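
# Example: split_topics(["a", "b", "c", "d", "e"], 2) -> [["a", "c", "e"], ["b", "d"]]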


def start_worker_process(group_id: int, topic_group: List[str], process_map: Dict[int, Process]):
    """
    Start a new child process to handle one group of topics and record it in
    process_map (also used by the monitor loop when restarting crashed workers).
    """
    p = Process(target=handle_topic_group, args=(topic_group,), name=f"Worker-{group_id}")
    p.start()
    process_map[group_id] = p
    print(f"[main process] Started process PID={p.pid} for topics={topic_group}")


def main():
    # Get the list of all registered spider topics.
    topic_list = list(SPIDER_CLASS_MAP.keys())
    print(f"Listening on topics: {topic_list}")

    # Use the number of CPU cores to decide how many worker processes to start.
    num_cpus = cpu_count()
    topic_groups = split_topics(topic_list, num_cpus)
    print(f"CPU cores: {num_cpus}, worker processes to start: {len(topic_groups)}")

    # Start the child processes.
    process_map: Dict[int, Process] = {}
    for group_id, topic_group in enumerate(topic_groups):
        start_worker_process(group_id, topic_group, process_map)

    # Continuously monitor the child processes and restart any that exit unexpectedly.
    try:
        while True:
            time.sleep(5)
            for group_id, p in list(process_map.items()):
                if not p.is_alive():
                    print(f"[monitor] Process {p.name} PID={p.pid} has crashed, restarting...")
                    time.sleep(2)
                    start_worker_process(group_id, topic_groups[group_id], process_map)
    except KeyboardInterrupt:
        print("Received exit signal, terminating all processes...")
        for p in process_map.values():
            p.terminate()
        for p in process_map.values():
            p.join()


if __name__ == '__main__':
    main()