main.py

  1. """
  2. Created on January 4, 2024,
  3. @author: luojunhui
  4. description: 票圈线上代码控制程序,分布式抓取消费核心
  5. """
import asyncio
import json
import os
import sys
import time

from mq_http_sdk.mq_consumer import *
from mq_http_sdk.mq_exception import MQExceptionBase

# Make the project root importable before the application imports below
sys.path.append(os.getcwd())

from application.common import AliyunLogger, get_consumer, ack_message
from application.common.log import Local
from application.config import TopicGroup

async def run(task_id, mode, platform):
    """
    Run the spider for the given task parameters.
    :param task_id: task id
    :param mode: task type
    :param platform: which platform to crawl
    :return: None
    """
    # Create an Aliyun logger for this platform/mode
    logger = AliyunLogger(platform=platform, mode=mode)
    logger.logging(code=1005, message="{}: starting process".format(platform))
    # Spawn the spider as a subprocess
    await asyncio.create_subprocess_shell(
        "python3 scheduler/run_spider_online.py --task_id {} --mode {} --platform {}".format(
            task_id, mode, platform
        )
    )
    logger.logging(code=5002, message="successfully ran spider")

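
# Note: asyncio.create_subprocess_shell() above returns once the process has
# been spawned; run() does not wait for the spider to exit. If waiting were
# ever desired, a minimal sketch (not part of the original code) would be:
#
#   proc = await asyncio.create_subprocess_shell(cmd)
#   await proc.wait()  # suspends this coroutine until the spider exits
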
async def consume_single_message(spider):
    """
    Consume a single message; on success, launch the spider in a new coroutine.
    :param spider: spider config dict
    """
    topic = spider["topic"]
    group = spider["group"]
    platform = spider["platform"]
    mode = spider["mode"]
    logger = AliyunLogger(platform=platform, mode=mode)
    consumer = get_consumer(topic, group)
    try:
        messages = consumer.consume_message(wait_seconds=10, batch_size=1)
        if messages:
            # Consume each message and hand it off to a spider
            for single_message in messages:
                Local.logger(platform, mode).info(
                    "Received a message\t{}{}".format(
                        single_message, single_message.message_body
                    )
                )
                ack_message(
                    mode=mode, platform=platform, recv_msgs=messages, consumer=consumer
                )
                logger.logging(
                    code=5000,
                    message="successfully consumed message",
                    data=single_message.message_body,
                )
                message_body = single_message.message_body
                task_id = json.loads(message_body)["id"]
                # Launch the spider for this task
                await run(task_id, mode, platform)
                logger.logging(code=5001, message="successfully created task")
        else:
            logger.logging(code=5003, message="Messages Queue is Empty")
    except MQExceptionBase as err:
        # No message available to consume in the topic
        if err.type == "MessageNotExist":
            message = "No new message! RequestId:{}\n".format(err.req_id)
            logger.logging(code="5004", message=message)
        else:
            message = "Consume Message Fail! Exception:{}\n".format(err)
            logger.logging(code="5004", message=message)

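
# The message body is expected to be JSON; only the "id" field is read above.
# A hypothetical payload (illustrative, not from the source) would look like:
#
#   {"id": 1001}
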
async def main():
    """
    Main loop: concurrently consume one message per configured spider,
    then sleep before polling MQ again.
    """
    spider_list = TopicGroup().produce()
    while spider_list:
        tasks = [consume_single_message(spider) for spider in spider_list]
        await asyncio.gather(*tasks)
        await asyncio.sleep(20)

  85. # print("Hello World {}".format(" ".join(spider_list)))
  86. # async_tasks = []
  87. # print(spider_list)
  88. # for spider in spider_list:
  89. # print(json.dumps(spider))
  90. # time.sleep(1000)
  91. # task = asyncio.create_task(consume_single_message(spider))
  92. # async_tasks.append(task)
  93. # await asyncio.gather(*async_tasks)
  94. # await asyncio.sleep(60) # 每分钟接收一次MQ,
if __name__ == "__main__":
    # Run the main event loop
    asyncio.run(main())
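
# TopicGroup().produce() is assumed to yield one config dict per spider, with
# exactly the keys consume_single_message() reads; a sketch of the assumed
# shape (the actual fields live in application.config):
#
#   [
#       {"topic": "...", "group": "...", "platform": "...", "mode": "..."},
#       ...
#   ]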