"""
Created on January 4, 2024
@author: luojunhui
description: Piaoquan online control program — core of the distributed
crawl-consumer (pulls crawl tasks from MQ and launches spider processes).
"""
import asyncio
import json
import os
import sys
import time

from mq_http_sdk.mq_consumer import *
from mq_http_sdk.mq_exception import MQExceptionBase

# Make project-local packages importable when launched from the repo root.
# NOTE(review): os/sys were previously used without an explicit import and
# only resolved if the wildcard import above happened to leak them.
sys.path.append(os.getcwd())

from application.common import AliyunLogger, get_consumer, ack_message
from application.common.log import Local
from application.config import TopicGroup
async def run(task_id, mode, platform):
    """
    Launch the spider subprocess for one crawl task.

    :param task_id: task id (taken from the MQ message body)
    :param mode: task type
    :param platform: crawl platform
    :return: None
    """
    # Per-platform/mode Aliyun log object.
    logger = AliyunLogger(platform=platform, mode=mode)
    logger.logging(code=1005, message="{}: 启动进程".format(platform))
    # Spawn the spider as a child process. An argument *list* via
    # create_subprocess_exec (instead of a shell-interpolated string) keeps
    # message-derived values from ever being parsed by a shell.
    await asyncio.create_subprocess_exec(
        "python3",
        "scheduler/run_spider_online.py",
        "--task_id", str(task_id),
        "--mode", str(mode),
        "--platform", str(platform),
    )
    # NOTE(review): the child process is started but not waited on, so this
    # log only confirms the launch, not a successful crawl.
    logger.logging(code=5002, message="successfully run spider")
async def consume_single_message(spider):
    """
    Consume one message from a spider's MQ topic; on success, run the
    corresponding crawl coroutine.

    :param spider: dict with keys "topic", "group", "platform", "mode"
    """
    topic = spider["topic"]
    group = spider["group"]
    platform = spider["platform"]
    mode = spider["mode"]
    logger = AliyunLogger(platform=platform, mode=mode)
    consumer = get_consumer(topic, group)
    try:
        messages = consumer.consume_message(wait_seconds=10, batch_size=1)
        if messages:
            for single_message in messages:
                Local.logger(platform, mode).info(
                    "收到一条消息\t{}{}".format(
                        single_message, single_message.message_body
                    )
                )
                # NOTE(review): the message is acknowledged BEFORE the spider
                # runs, so any failure below loses the task — confirm this
                # at-most-once delivery is intentional.
                ack_message(
                    mode=mode, platform=platform, recv_msgs=messages, consumer=consumer
                )
                logger.logging(
                    code=5000,
                    message="successfully consumed message",
                    data=single_message.message_body,
                )
                task_id = json.loads(single_message.message_body)["id"]
                # Await the coroutine directly: wrapping it in
                # asyncio.create_task() only to await it immediately was a
                # redundant indirection.
                await run(task_id, spider["mode"], spider["platform"])
                logger.logging(code=5001, message="successfully created task")
        else:
            logger.logging(code=5003, message="Messages Queue is Empty")
    except MQExceptionBase as err:
        # No message available in the topic.
        if err.type == "MessageNotExist":
            message = "No new message! RequestId:{}\n".format(err.req_id)
            # code passed as int for consistency with every other call site.
            logger.logging(code=5004, message=message)
        else:
            message = "Consume Message Fail! Exception:{}\n".format(err)
            logger.logging(code=5004, message=message)
async def main():
    """
    Entry loop: fan out one consumer coroutine per configured spider,
    then sleep and poll again.
    """
    spider_list = TopicGroup().produce()
    # spider_list is fetched once; a non-empty list makes this loop run
    # forever, polling MQ for every spider roughly every 20 seconds.
    while spider_list:
        tasks = [consume_single_message(spider) for spider in spider_list]
        await asyncio.gather(*tasks)
        await asyncio.sleep(20)
if __name__ == "__main__":
    # Run the main event loop until it exits.
    asyncio.run(main())