import asyncio
import os
import sys

from mq_http_sdk.mq_client import *
from mq_http_sdk.mq_consumer import *
from mq_http_sdk.mq_exception import MQExceptionBase

sys.path.append(os.getcwd())

from application.common.messageQueue import get_consumer, ack_message
from application.common.log import AliyunLogger
from application.common.mysql import MysqlHelper
from application.config import TopicGroup


async def run():
    """
    Receive parameters and run the spider code according to them.
    :return:
    """
    # Create a subprocess for the spider script.
    process = await asyncio.create_subprocess_shell("python3 test2.py")
    # Wait for the subprocess to finish.
    await process.wait()


async def main():
    spider_list = TopicGroup().produce()
    async_tasks = []  # pool of async tasks
    while spider_list:
        for spider in spider_list:
            topic = spider['topic']
            group = spider['group']
            consumer = get_consumer(topic, group)
            try:
                messages = consumer.consume_message(batch=1, wait_seconds=5)
            except MQExceptionBase:
                # No message available (or a transient MQ error); try the next spider.
                continue
            if messages:
                task = asyncio.create_task(run())
                async_tasks.append(task)
            else:
                continue
        # Drop finished tasks and yield control so scheduled tasks can actually run.
        async_tasks = [t for t in async_tasks if not t.done()]
        await asyncio.sleep(0)


if __name__ == '__main__':
    # Run the main event loop.
    asyncio.run(main())
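

# --- Illustrative sketch (not part of the original script) -------------------
# A minimal, self-contained example of the pattern run() relies on:
# asyncio.create_subprocess_shell() starts a child process, and awaiting
# asyncio.gather() lets several such children run concurrently. The commands
# in demo_commands are made-up placeholders, not spiders from this project.
async def _run_one(cmd: str) -> int:
    process = await asyncio.create_subprocess_shell(cmd)
    # wait() returns the child's exit code once it terminates.
    return await process.wait()


async def _demo() -> None:
    demo_commands = [
        "python3 -c \"print('spider 1')\"",
        "python3 -c \"print('spider 2')\"",
    ]
    return_codes = await asyncio.gather(*(_run_one(c) for c in demo_commands))
    print(return_codes)


# Uncomment to try the sketch on its own:
# asyncio.run(_demo())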