"""Poll the configured message-queue topics and launch a spider per message.

For every topic/group pair produced by ``TopicGroup().produce()`` the script
pulls at most one message, acknowledges it, reads the task id from the JSON
body and starts ``scheduler/run_spider_online.py`` as a child process.
"""
import asyncio
import json
import os
import sys

from mq_http_sdk.mq_consumer import *
from mq_http_sdk.mq_exception import MQExceptionBase

# Make the project root importable before pulling in the application package.
sys.path.append(os.getcwd())

from application.common.messageQueue import get_consumer, ack_message
from application.common.log import AliyunLogger
from application.common.mysql import MysqlHelper
from application.config import TopicGroup


async def run(task_id, mode, platform):
    """Start the spider runner for one task as a child process.

    The child is started but not waited on: the coroutine returns as soon as
    the process has been created, so the polling loop is not held up while
    the spider runs.

    :param task_id: task identifier taken from the queue message
    :param mode: spider mode passed to ``--mode``
    :param platform: spider platform passed to ``--platform``
    :return: None
    """
    # Pass an argument vector instead of a formatted shell string so values
    # coming from the queue can never be interpreted by a shell.
    await asyncio.create_subprocess_exec(
        "python3",
        "scheduler/run_spider_online.py",
        "--task_id", str(task_id),
        "--mode", str(mode),
        "--platform", str(platform),
    )


async def main():
    """Poll every configured topic forever and dispatch a spider per message.

    The loop only runs when ``TopicGroup().produce()`` returns a non-empty
    list, and then never terminates on its own.

    :return: None
    """
    spider_list = TopicGroup().produce()
    while spider_list:
        for spider in spider_list:
            # Fetch messages for this spider's topic/group.
            topic = spider['topic']
            group = spider['group']
            consumer = get_consumer(topic, group)
            try:
                messages = consumer.consume_message(wait_seconds=10, batch_size=1)
                if not messages:
                    message = "Messages Queue is Empty"
                    print(message)
                    continue

                # The ack call takes the whole batch, so it is issued once per
                # batch rather than once per message. Messages are acked
                # before they are processed.
                ack_message(
                    mode=spider['mode'],
                    platform=spider['platform'],
                    recv_msgs=messages,
                    consumer=consumer,
                )
                for single_message in messages:
                    message_body = single_message.message_body
                    try:
                        task_id = json.loads(message_body)['id']
                    except (ValueError, KeyError, TypeError) as err:
                        # A malformed body must not take down the scheduler;
                        # report it and move on to the next message.
                        print(f"Invalid message body, skipped: {message_body!r} ({err})")
                        continue
                    print(message_body)
                    # Launch the spider for this task.
                    await run(task_id, spider['mode'], spider['platform'])
            except MQExceptionBase as err:
                # The topic has no message available to consume.
                if err.type == "MessageNotExist":
                    message = f"No new message! RequestId:{err.req_id}\n"
                    print(message)
                    continue
                else:
                    message = f"Consume Message Fail! Exception:{err}\n"
                    print(message)


if __name__ == '__main__':
    # Run the main event loop.
    asyncio.run(main())