# main.py
import asyncio
import json
import os
import sys

from mq_http_sdk.mq_consumer import *
from mq_http_sdk.mq_exception import MQExceptionBase

sys.path.append(os.getcwd())

from application.common import AliyunLogger, get_consumer, ack_message
from application.config import TopicGroup
  8. async def run(task_id, mode, platform):
  9. """
  10. 传入参数,然后根据参数执行爬虫代码
  11. :param task_id: 任务id
  12. :param mode: 任务类型
  13. :param platform: 哪个抓取平台
  14. :return: None
  15. """
  16. # 创建一个aliyun日志对象
  17. logger = AliyunLogger(platform=platform, mode=mode)
  18. logger.logging(
  19. code=1003,
  20. message="{}: 开始一轮抓取".format(platform)
  21. )
  22. # 创建并一个子进程
  23. await asyncio.create_subprocess_shell(
  24. "python3 scheduler/run_spider_online.py --task_id {} --mode {} --platform {}".format(task_id, mode, platform)
  25. )
  26. async def consume_single_message(spider):
  27. """
  28. 消费单个消息,若消费成功则启动爬虫新协程;
  29. :param spider: 爬虫类
  30. """
  31. topic = spider['topic']
  32. group = spider['group']
  33. consumer = get_consumer(topic, group)
  34. try:
  35. messages = consumer.consume_message(wait_seconds=10, batch_size=1)
  36. if messages:
  37. # 在这里消费消息,做一些数据处理分析
  38. for single_message in messages:
  39. ack_message(mode=spider['mode'], platform=spider['platform'], recv_msgs=messages,
  40. consumer=consumer)
  41. message_body = single_message.message_body
  42. task_id = json.loads(message_body)['id']
  43. print("成功消费消息,正在准备启动爬虫任务")
  44. print(message_body)
  45. # 创建爬虫task
  46. await asyncio.create_task(run(task_id, spider['mode'], spider['platform']))
  47. print("爬虫任务启动完成")
  48. else:
  49. message = "Messages Queue is Empty"
  50. print(message)
  51. except MQExceptionBase as err:
  52. # Topic中没有消息可消费。
  53. if err.type == "MessageNotExist":
  54. message = f"No new message! RequestId:{err.req_id}\n"
  55. print(message)
  56. else:
  57. message = f"Consume Message Fail! Exception:{err}\n"
  58. print(message)
  59. async def main():
  60. """
  61. 主函数
  62. """
  63. spider_list = TopicGroup().produce()
  64. while spider_list:
  65. async_tasks = []
  66. for spider in spider_list:
  67. task = asyncio.create_task(consume_single_message(spider))
  68. async_tasks.append(task)
  69. await asyncio.gather(*async_tasks)
  70. await asyncio.sleep(60) # 每分钟接收一次MQ,
  71. if __name__ == '__main__':
  72. # 运行主事件循环
  73. asyncio.run(main())