main.py 2.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768
import asyncio
import json
import os
import sys

from mq_http_sdk.mq_consumer import *
from mq_http_sdk.mq_exception import MQExceptionBase

# The project root must be on sys.path before the application.* imports below.
sys.path.append(os.getcwd())
from application.common import MysqlHelper, AliyunLogger, get_consumer, ack_message
from application.config import TopicGroup
  8. async def run(task_id, mode, platform):
  9. """
  10. 传入参数,然后根据参数执行爬虫代码
  11. :return: None
  12. """
  13. # 创建并等待一个子进程
  14. await asyncio.create_subprocess_shell(
  15. "python3 scheduler/run_spider_online.py --task_id {} --mode {} --platform {}".format(task_id, mode, platform)
  16. )
  17. async def consume_single_message(spider):
  18. topic = spider['topic']
  19. group = spider['group']
  20. consumer = get_consumer(topic, group)
  21. try:
  22. messages = consumer.consume_message(wait_seconds=10, batch_size=1)
  23. if messages:
  24. # 在这里消费消息,做一些数据处理分析
  25. for single_message in messages:
  26. ack_message(mode=spider['mode'], platform=spider['platform'], recv_msgs=messages,
  27. consumer=consumer)
  28. message_body = single_message.message_body
  29. task_id = json.loads(message_body)['id']
  30. print("成功消费消息,正在准备启动爬虫任务")
  31. print(message_body)
  32. # 创建爬虫task
  33. await asyncio.create_task(run(task_id, spider['mode'], spider['platform']))
  34. print("爬虫任务启动完成")
  35. else:
  36. message = "Messages Queue is Empty"
  37. print(message)
  38. except MQExceptionBase as err:
  39. # Topic中没有消息可消费。
  40. if err.type == "MessageNotExist":
  41. message = f"No new message! RequestId:{err.req_id}\n"
  42. print(message)
  43. else:
  44. message = f"Consume Message Fail! Exception:{err}\n"
  45. print(message)
  46. async def main():
  47. spider_list = TopicGroup().produce()
  48. while spider_list:
  49. async_tasks = []
  50. for spider in spider_list:
  51. task = asyncio.create_task(consume_single_message(spider))
  52. async_tasks.append(task)
  53. await asyncio.gather(*async_tasks)
  54. if __name__ == '__main__':
  55. # 运行主事件循环
  56. asyncio.run(main())