main.py 3.2 KB

  1. """
  2. Created on January 4, 2024,
  3. @author: luojunhui
  4. description: 票圈线上代码控制程序,分布式抓取消费核心
  5. """
  6. import asyncio
  7. import json
  8. from mq_http_sdk.mq_consumer import *
  9. from mq_http_sdk.mq_exception import MQExceptionBase
  10. sys.path.append(os.getcwd())
  11. from application.common import AliyunLogger, get_consumer, ack_message
  12. from application.config import TopicGroup
  13. async def run(task_id, mode, platform):
  14. """
  15. 传入参数,然后根据参数执行爬虫代码
  16. :param task_id: 任务id
  17. :param mode: 任务类型
  18. :param platform: 哪个抓取平台
  19. :return: None
  20. """
  21. # 创建一个aliyun日志对象
  22. logger = AliyunLogger(platform=platform, mode=mode)
  23. logger.logging(code=1005, message="{}: 启动进程".format(platform))
  24. # 创建并一个子进程
  25. await asyncio.create_subprocess_shell(
  26. "python3 scheduler/run_spider_online.py --task_id {} --mode {} --platform {}".format(
  27. task_id, mode, platform
  28. )
  29. )
  30. logger.logging(code=5002, message="successfully run spider")
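# For illustration only: with task_id=42, mode="recommend", platform="douyin"
# (all three values hypothetical), the subprocess command above expands to:
#   python3 scheduler/run_spider_online.py --task_id 42 --mode recommend --platform douyin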
async def consume_single_message(spider):
    """
    Consume a single message; on success, start the spider in a new coroutine.
    :param spider: spider config dict
    """
    topic = spider["topic"]
    group = spider["group"]
    platform = spider["platform"]
    mode = spider["mode"]
    logger = AliyunLogger(platform=platform, mode=mode)
    consumer = get_consumer(topic, group)
    try:
        messages = consumer.consume_message(wait_seconds=10, batch_size=1)
        if messages:
            # Consume the messages here and process their payloads
            for single_message in messages:
                ack_message(
                    mode=mode, platform=platform, recv_msgs=messages, consumer=consumer
                )
                logger.logging(
                    code=5000,
                    message="successfully consumed message",
                    data=single_message.message_body,
                )
                message_body = single_message.message_body
                task_id = json.loads(message_body)["id"]
                # Start the spider for this task
                await run(task_id, mode, platform)
                logger.logging(code=5001, message="successfully created task")
        else:
            logger.logging(code=5003, message="Messages Queue is Empty")
    except MQExceptionBase as err:
        # No message to consume in the topic
        if err.type == "MessageNotExist":
            message = "No new message! RequestId:{}\n".format(err.req_id)
            logger.logging(code=5004, message=message)
        else:
            message = "Consume Message Fail! Exception:{}\n".format(err)
            logger.logging(code=5004, message=message)
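# A minimal sketch of the message body this consumer assumes: a JSON object
# whose "id" field carries the task id (any other fields shown are hypothetical):
#   {"id": 42, "mode": "recommend", "platform": "douyin"}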
async def main():
    """
    Main entry point: fan out one consumer coroutine per spider, once a minute.
    """
    spider_list = TopicGroup().produce()
    while spider_list:
        async_tasks = []
        for spider in spider_list:
            task = asyncio.create_task(consume_single_message(spider))
            async_tasks.append(task)
        await asyncio.gather(*async_tasks)
        await asyncio.sleep(60)  # Poll MQ once per minute


if __name__ == "__main__":
    # Run the main event loop
    asyncio.run(main())
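# For reference, each spider dict yielded by TopicGroup().produce() is assumed
# (from the key lookups in consume_single_message) to carry at least:
#   {"topic": ..., "group": ..., "platform": ..., "mode": ...}
# The concrete values depend on application.config and are not shown here.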