main.py

import asyncio
import json
import os
import sys

from mq_http_sdk.mq_consumer import *
from mq_http_sdk.mq_exception import MQExceptionBase

sys.path.append(os.getcwd())
from application.common.messageQueue import get_consumer, ack_message
from application.common.log import AliyunLogger
from application.common.mysql import MysqlHelper
from application.config import TopicGroup

async def run(task_id, mode, platform):
    """
    Take the task parameters and run the spider code accordingly.
    :return: None
    """
    # Create and await a child process running the spider scheduler.
    # Note: this awaits process creation only, not process completion.
    await asyncio.create_subprocess_shell(
        "python3 scheduler/run_spider_online.py --task_id {} --mode {} --platform {}".format(task_id, mode, platform)
    )

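# Poll each configured topic/group pair and launch a spider subprocess
# for every task id received from the message queue.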
async def main():
    spider_list = TopicGroup().produce()
    while spider_list:
        for spider in spider_list:
            # Fetch messages for this spider's topic and group
            topic = spider['topic']
            group = spider['group']
            consumer = get_consumer(topic, group)
            try:
                messages = consumer.consume_message(wait_seconds=10, batch_size=1)
                if messages:
                    # Consume the messages here and do the data processing
                    for single_message in messages:
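                        # The message is acknowledged before the task is
                        # launched, so delivery is at-most-once: a crash
                        # after the ack drops the task.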
                        ack_message(mode=spider['mode'], platform=spider['platform'], recv_msgs=messages,
                                    consumer=consumer)
                        message_body = single_message.message_body
                        task_id = json.loads(message_body)['id']
                        print(message_body)
                        # Create the spider task and wait for it to finish
                        await asyncio.create_task(run(task_id, spider['mode'], spider['platform']))
                else:
                    message = "Messages Queue is Empty"
                    print(message)
            except MQExceptionBase as err:
                # "MessageNotExist" means there is no message to consume in the topic
                if err.type == "MessageNotExist":
                    message = f"No new message! RequestId:{err.req_id}\n"
                    print(message)
                    continue
                else:
                    message = f"Consume Message Fail! Exception:{err}\n"
                    print(message)

if __name__ == '__main__':
    # Run the main event loop
    asyncio.run(main())