main.py

import asyncio
import os
import sys

from mq_http_sdk.mq_client import *
from mq_http_sdk.mq_consumer import *
from mq_http_sdk.mq_exception import MQExceptionBase

sys.path.append(os.getcwd())

from application.common.messageQueue import get_consumer, ack_message
from application.common.log import AliyunLogger
from application.common.mysql import MysqlHelper
from application.config import TopicGroup


async def run():
    """
    Receive the parameters, then run the crawler code according to them.
    :return:
    """
    # Create a subprocess that runs the crawler script
    process = await asyncio.create_subprocess_shell("python3 test2.py")
    # Wait for the subprocess to finish
    await process.wait()


async def main():
    spider_list = TopicGroup().produce()
    async_tasks = []  # async task pool
    while spider_list:
        for spider in spider_list:
            topic = spider['topic']
            group = spider['group']
            consumer = get_consumer(topic, group)
            messages = consumer.consume_message(batch=1, wait_seconds=5)
            if messages:
                # Schedule a crawler run for the received message
                task = asyncio.create_task(run())
                async_tasks.append(task)
                # Yield control so the scheduled crawler tasks can start
                await asyncio.sleep(0)
            else:
                continue
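

# A possible refinement (a sketch, not part of the original file): the tasks
# collected in async_tasks are appended but never awaited, so crawler
# subprocesses spawned late in a loop pass may still be running when the
# program exits. One way to drain the pool with standard asyncio is shown
# below; the drain_tasks helper name is hypothetical.
async def drain_tasks(async_tasks):
    """Wait for all scheduled crawler tasks to finish, then clear the pool."""
    if async_tasks:
        # gather() awaits every pending task; return_exceptions=True keeps one
        # failing crawler from aborting the rest.
        await asyncio.gather(*async_tasks, return_exceptions=True)
        async_tasks.clear()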


if __name__ == '__main__':
    # Run the main event loop
    asyncio.run(main())