"""
Created on January 4, 2024
@author: luojunhui
description: test version
"""
import asyncio
import json
import os
import sys

from mq_http_sdk.mq_consumer import *
from mq_http_sdk.mq_exception import MQExceptionBase

sys.path.append(os.getcwd())

from application.common import get_consumer, ack_message
from application.config import TopicGroup


async def run(task_id, mode, platform):
    """
    Run the spider with the given parameters.
    :param task_id: task ID
    :param mode: task type
    :param platform: platform to crawl
    :return: None
    """
    # Log the start of a crawl round
    message = "{}: starting a crawl round".format(platform)
    print(message)
    # Spawn a subprocess that runs the spider script
    await asyncio.create_subprocess_shell(
        "python3 scheduler/run_spider_online.py --task_id {} --mode {} --platform {}".format(
            task_id, mode, platform
        )
    )
    print("successfully run spider")


async def consume_single_message(spider):
    """
    Consume a single message; on success, start a new spider coroutine.
    :param spider: spider configuration
    """
    topic = spider["topic"]
    group = spider["group"]
    platform = spider["platform"]
    mode = spider["mode"]
    consumer = get_consumer(topic, group)
    try:
        messages = consumer.consume_message(wait_seconds=10, batch_size=1)
        if messages:
            # Consume the messages here and process them
            for single_message in messages:
                ack_message(
                    mode=mode, platform=platform, recv_msgs=messages, consumer=consumer
                )
                message = "successfully consumed message"
                print(message)
                message_body = single_message.message_body
                task_id = json.loads(message_body)["id"]
                # Create the spider task
                await asyncio.create_task(
                    run(task_id, mode, platform)
                )
                message = "successfully created task"
                print(message)
        else:
            message = "Messages Queue is Empty"
            print(message)
    except MQExceptionBase as err:
        # No messages left to consume in the topic
        if err.type == "MessageNotExist":
            message = "No new message! RequestId:{}\n".format(err.req_id)
            print(message)
        else:
            message = "Consume Message Fail! Exception:{}\n".format(err)
            print(message)


async def main():
    """
    Main entry point.
    """
    spider_list = TopicGroup().produce()
    while spider_list:
        async_tasks = []
        for spider in spider_list:
            task = asyncio.create_task(consume_single_message(spider))
            async_tasks.append(task)
        await asyncio.gather(*async_tasks)
        # await asyncio.sleep(60)  # poll MQ once per minute


if __name__ == "__main__":
    # Run the main event loop
    asyncio.run(main())