1234567891011121314151617181920212223242526272829303132333435363738394041424344 |
- import asyncio
- from mq_http_sdk.mq_client import *
- from mq_http_sdk.mq_consumer import *
- from mq_http_sdk.mq_exception import MQExceptionBase
- sys.path.append(os.getcwd())
- from application.common.messageQueue import get_consumer, ack_message
- from application.common.log import AliyunLogger
- from application.common.mysql import MysqlHelper
- from application.config import TopicGroup
- async def run():
- """
- 传入参数,然后根据参数执行爬虫代码
- :return:
- """
- # 创建并等待一个子进程
- process = await asyncio.create_subprocess_shell("python3 test2.py")
- # 等待子进程完成
- await process.wait()
- async def main():
- spider_list = TopicGroup().produce()
- async_tasks = [] # 异步任务池
- while spider_list:
- for spider in spider_list:
- topic = spider['topic']
- group = spider['group']
- consumer = get_consumer(topic, group)
- messages = consumer.consume_message(batch=1, wait_seconds=5)
- if messages:
- task = asyncio.create_task(run())
- async_tasks.append(task)
- else:
- continue
- if __name__ == '__main__':
- # 运行主事件循环
- asyncio.run(main())
|