import asyncio
import json
import os
import sys

from mq_http_sdk.mq_consumer import *
from mq_http_sdk.mq_exception import MQExceptionBase

sys.path.append(os.getcwd())

from application.common.messageQueue import get_consumer, ack_message
from application.common.log import AliyunLogger
from application.common.mysql import MysqlHelper
from application.config import TopicGroup


async def run(task_id, mode, platform):
    """
    Receive the task parameters, then execute the spider code accordingly.
    :return: None
    """
    # Spawn a subprocess for the spider and wait for it to finish
    process = await asyncio.create_subprocess_shell(
        "python3 scheduler/run_spider_online.py --task_id {} --mode {} --platform {}".format(
            task_id, mode, platform
        )
    )
    await process.wait()
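

# Interpolating task_id/mode/platform into a shell string can misbehave if a
# value ever contains spaces or shell metacharacters. A minimal alternative
# sketch that passes an argument list instead; run_exec is illustrative only
# and not part of the original scheduler:
async def run_exec(task_id, mode, platform):
    process = await asyncio.create_subprocess_exec(
        "python3", "scheduler/run_spider_online.py",
        "--task_id", str(task_id), "--mode", mode, "--platform", platform,
    )
    await process.wait()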


async def main():
    spider_list = TopicGroup().produce()
    while spider_list:
        for spider in spider_list:
            # Fetch messages from this spider's topic with its consumer group
            topic = spider['topic']
            group = spider['group']
            consumer = get_consumer(topic, group)
            try:
                messages = consumer.consume_message(wait_seconds=10, batch_size=1)
                if messages:
                    # Consume the messages here and process them
                    for single_message in messages:
                        # Acknowledge the batch first so it is not redelivered
                        ack_message(mode=spider['mode'], platform=spider['platform'],
                                    recv_msgs=messages, consumer=consumer)
                        message_body = single_message.message_body
                        task_id = json.loads(message_body)['id']
                        print(message_body)
                        # Run the spider task for this message and wait for it
                        await run(task_id, spider['mode'], spider['platform'])
                else:
                    print("Messages Queue is Empty")
            except MQExceptionBase as err:
                # No message is available in the topic
                if err.type == "MessageNotExist":
                    print(f"No new message! RequestId:{err.req_id}\n")
                    continue
                else:
                    print(f"Consume Message Fail! Exception:{err}\n")


if __name__ == '__main__':
    # Run the main event loop
    asyncio.run(main())
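
# The helpers imported from application.common.messageQueue are not shown in
# this file. A minimal sketch of what they might wrap, assuming the Aliyun
# RocketMQ HTTP SDK (mq_http_sdk); the endpoint, credential, and instance id
# placeholders below are hypothetical, and the repo's real helpers may differ:
#
#     from mq_http_sdk.mq_client import MQClient
#
#     mq_client = MQClient("${HTTP_ENDPOINT}", "${ACCESS_KEY_ID}", "${ACCESS_KEY_SECRET}")
#
#     def get_consumer(topic, group):
#         # instance_id comes from the MQ console
#         return mq_client.get_consumer("${INSTANCE_ID}", topic, group)
#
#     def ack_message(mode, platform, recv_msgs, consumer):
#         # Acknowledge the received messages so they are not redelivered
#         consumer.ack_message([msg.receipt_handle for msg in recv_msgs])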