# main_dev_version.py
"""
Created on January 4, 2024
@author: luojunhui
description: test version (测试版本)
"""
import asyncio
import json
import os
import sys

from mq_http_sdk.mq_consumer import *
from mq_http_sdk.mq_exception import MQExceptionBase

# Make project-local packages importable when the script is run from the repo root.
sys.path.append(os.getcwd())

from application.common import get_consumer, ack_message
from application.config import TopicGroup
  13. async def run(task_id, mode, platform):
  14. """
  15. 传入参数,然后根据参数执行爬虫代码
  16. :param task_id: 任务id
  17. :param mode: 任务类型
  18. :param platform: 哪个抓取平台
  19. :return: None
  20. """
  21. # 创建一个aliyun日志对象
  22. message = "{}: 开始一轮抓取".format(platform)
  23. print(message)
  24. # 创建并一个子进程
  25. await asyncio.create_subprocess_shell(
  26. "python3 scheduler/run_spider_online.py --task_id {} --mode {} --platform {}".format(
  27. task_id, mode, platform
  28. )
  29. )
  30. print("successfully run spider")
  31. async def consume_single_message(spider):
  32. """
  33. 消费单个消息,若消费成功则启动爬虫新协程;
  34. :param spider: 爬虫类
  35. """
  36. topic = spider["topic"]
  37. group = spider["group"]
  38. platform = spider["platform"]
  39. mode = spider["mode"]
  40. consumer = get_consumer(topic, group)
  41. try:
  42. messages = consumer.consume_message(wait_seconds=10, batch_size=1)
  43. if messages:
  44. # 在这里消费消息,做一些数据处理分析
  45. for single_message in messages:
  46. ack_message(
  47. mode=mode, platform=platform, recv_msgs=messages, consumer=consumer
  48. )
  49. message = "successfully consumed message"
  50. print(message)
  51. message_body = single_message.message_body
  52. task_id = json.loads(message_body)["id"]
  53. # 创建爬虫task
  54. await asyncio.create_task(
  55. run(task_id, spider["mode"], spider["platform"])
  56. )
  57. message = "successfully created task"
  58. print(message)
  59. else:
  60. message = "Messages Queue is Empty"
  61. print(message)
  62. except MQExceptionBase as err:
  63. # Topic中没有消息可消费。
  64. if err.type == "MessageNotExist":
  65. message = "No new message! RequestId:{}\n".format(err.req_id)
  66. print(message)
  67. else:
  68. message = "Consume Message Fail! Exception:{}\n".format(err)
  69. print(message)
  70. async def main():
  71. """
  72. 主函数
  73. """
  74. spider_list = TopicGroup().produce()
  75. while spider_list:
  76. async_tasks = []
  77. for spider in spider_list:
  78. task = asyncio.create_task(consume_single_message(spider))
  79. async_tasks.append(task)
  80. await asyncio.gather(*async_tasks)
  81. # await asyncio.sleep(60) # 每分钟接收一次MQ,
if __name__ == "__main__":
    # Run the main event loop (blocks until main() returns).
    asyncio.run(main())