main.py

  1. """
  2. Created on January 4, 2024,
  3. @author: luojunhui
  4. description: 票圈线上代码控制程序,分布式抓取消费核心
  5. """
  6. import asyncio
  7. import json
  8. from mq_http_sdk.mq_consumer import *
  9. from mq_http_sdk.mq_exception import MQExceptionBase
  10. sys.path.append(os.getcwd())
  11. from application.common import AliyunLogger, get_consumer, ack_message
  12. from application.config import TopicGroup
async def run(task_id, mode, platform):
    """
    Launch the spider for the given parameters.
    :param task_id: task id
    :param mode: task type
    :param platform: which platform to crawl
    :return: None
    """
    # Create an Aliyun logger for this platform/mode
    logger = AliyunLogger(platform=platform, mode=mode)
    logger.logging(
        code=1003,
        message="{}: starting a crawl round".format(platform)
    )
    # Spawn the spider as a child process; we do not wait for it to finish,
    # so the log below only confirms that the process was started
    await asyncio.create_subprocess_shell(
        "python3 scheduler/run_spider_online.py --task_id {} --mode {} --platform {}".format(
            task_id, mode, platform
        )
    )
    logger.logging(code=5002, message="successfully run spider")
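
# For reference: scheduler/run_spider_online.py is assumed to parse the three
# flags passed above, roughly as sketched here (an assumption about a script
# outside this file, not its actual contents):
#
#     import argparse
#     parser = argparse.ArgumentParser()
#     parser.add_argument("--task_id")
#     parser.add_argument("--mode")
#     parser.add_argument("--platform")
#     args = parser.parse_args()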
async def consume_single_message(spider):
    """
    Consume a single message; on success, run the spider in a new coroutine.
    :param spider: spider config dict (topic, group, platform, mode)
    """
    topic = spider['topic']
    group = spider['group']
    platform = spider['platform']
    mode = spider['mode']
    logger = AliyunLogger(platform=platform, mode=mode)
    consumer = get_consumer(topic, group)
    try:
        messages = consumer.consume_message(wait_seconds=10, batch_size=1)
        if messages:
            # Process the received messages here
            for single_message in messages:
                # Acknowledge first so MQ does not redeliver the message
                ack_message(
                    mode=mode,
                    platform=platform,
                    recv_msgs=messages,
                    consumer=consumer
                )
                logger.logging(code=5000, message="successfully consumed message", data=single_message.message_body)
                message_body = single_message.message_body
                task_id = json.loads(message_body)['id']
                # Launch the spider for this task
                await run(task_id, mode, platform)
                logger.logging(code=5001, message="successfully created task")
        else:
            logger.logging(code=5003, message="Messages Queue is Empty")
    except MQExceptionBase as err:
        # The topic has no message to consume
        if err.type == "MessageNotExist":
            message = "No new message! RequestId:{}\n".format(err.req_id)
            logger.logging(code=5004, message=message)
        else:
            message = "Consume Message Fail! Exception:{}\n".format(err)
            logger.logging(code=5004, message=message)
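
# The message body is assumed to be a JSON object carrying at least an "id"
# field, e.g. {"id": 123, ...}; only "id" is read above (the exact schema is
# an assumption, defined by the producer side).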
async def main():
    """
    Main entry point: poll every spider's topic in parallel, once per minute.
    """
    spider_list = TopicGroup().produce()
    while spider_list:
        async_tasks = []
        for spider in spider_list:
            task = asyncio.create_task(consume_single_message(spider))
            async_tasks.append(task)
        await asyncio.gather(*async_tasks)
        await asyncio.sleep(60)  # poll MQ once per minute


if __name__ == '__main__':
    # Run the main event loop
    asyncio.run(main())
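
# TopicGroup().produce() is assumed to return a list of spider configs shaped
# like the dict read in consume_single_message, e.g. (an assumption, not the
# actual application.config implementation):
#     [{"topic": "...", "group": "...", "platform": "...", "mode": "..."}]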