"""
Created on January 4, 2024, @author: luojunhui
description: Piaoquan online spider control program -- the core of the
distributed crawl-task consumer. Each round it polls one MQ topic/group per
configured spider and launches a spider subprocess for every task message
it receives.
"""
import asyncio
import json
import os
import sys
import time

from mq_http_sdk.mq_consumer import *
from mq_http_sdk.mq_exception import MQExceptionBase

# Make the current working directory (expected to be the project root)
# importable before pulling in the ``application`` package.
sys.path.append(os.getcwd())

from application.common import AliyunLogger, get_consumer, ack_message
from application.config import TopicGroup
from application.common.log import Local


async def run(task_id, mode, platform):
    """
    Start the spider for one task as a child process.

    :param task_id: task id taken from the MQ message body
    :param mode: task type
    :param platform: which crawl platform to run
    :return: None
    """
    # Create an Aliyun logger for this platform/mode.
    logger = AliyunLogger(platform=platform, mode=mode)
    logger.logging(code=1005, message="{}: 启动进程".format(platform))
    # Pass the arguments as an argv list instead of a shell string. task_id
    # comes from the MQ message body, so it must never be parsed by a shell.
    # The returned process is not awaited: the spider keeps running in the
    # background while this scheduler goes back to polling.
    await asyncio.create_subprocess_exec(
        "python3",
        "scheduler/run_spider_online.py",
        "--task_id",
        str(task_id),
        "--mode",
        str(mode),
        "--platform",
        str(platform),
    )
    logger.logging(code=5002, message="successfully run spider")


async def consume_single_message(spider):
    """
    Poll the spider's MQ topic once and launch a spider for each message.

    Messages are acknowledged *before* the spider is started. A task whose
    launch fails is therefore not redelivered.

    :param spider: mapping with "topic", "group", "platform" and "mode" keys
    """
    topic = spider["topic"]
    group = spider["group"]
    platform = spider["platform"]
    mode = spider["mode"]
    logger = AliyunLogger(platform=platform, mode=mode)
    consumer = get_consumer(topic, group)
    try:
        # consume_message long-polls for up to wait_seconds and blocks. Run it
        # in the default executor so that the spiders gathered in main() are
        # polled concurrently instead of one after another.
        # NOTE(review): assumes get_consumer returns a client that is not
        # shared with other concurrent polls -- confirm.
        loop = asyncio.get_running_loop()
        messages = await loop.run_in_executor(
            None,
            lambda: consumer.consume_message(wait_seconds=10, batch_size=1),
        )
        if not messages:
            logger.logging(code=5003, message="Messages Queue is Empty")
            return
        for single_message in messages:
            Local.logger(platform, mode).info(
                "收到一条消息\t{}{}".format(single_message, single_message.message_body)
            )
        # Acknowledge the whole batch exactly once. Acking it again for every
        # message would reuse receipt handles that were already acknowledged.
        ack_message(
            mode=mode, platform=platform, recv_msgs=messages, consumer=consumer
        )
        for single_message in messages:
            logger.logging(
                code=5000,
                message="successfully consumed message",
                data=single_message.message_body,
            )
            task_id = json.loads(single_message.message_body)["id"]
            # Start the spider process for this task.
            await run(task_id, mode, platform)
            logger.logging(code=5001, message="successfully created task")
    except MQExceptionBase as err:
        # The topic has no message to consume.
        if err.type == "MessageNotExist":
            message = "No new message! RequestId:{}\n".format(err.req_id)
            logger.logging(code="5004", message=message)
        else:
            message = "Consume Message Fail! Exception:{}\n".format(err)
            logger.logging(code="5004", message=message)


async def main():
    """
    Main loop. Each round polls every configured spider concurrently, then
    waits 20 seconds before the next round.
    """
    spider_list = TopicGroup().produce()
    # spider_list is never modified below. This loops forever when spiders
    # are configured and returns immediately when there are none.
    while spider_list:
        tasks = [consume_single_message(spider) for spider in spider_list]
        # return_exceptions=True: a failure for one spider (e.g. a malformed
        # message body) must not abort the round for the others or kill the
        # scheduler process.
        results = await asyncio.gather(*tasks, return_exceptions=True)
        for spider, result in zip(spider_list, results):
            if isinstance(result, Exception):
                AliyunLogger(
                    platform=spider["platform"], mode=spider["mode"]
                ).logging(
                    code="5004",
                    message="Consume Message Fail! Exception:{}\n".format(result),
                )
        await asyncio.sleep(20)


if __name__ == "__main__":
    # Run the main event loop.
    asyncio.run(main())