main.py

  1. """
  2. Created on January 4, 2024,
  3. @author: luojunhui
  4. description: 票圈线上代码控制程序,分布式抓取消费核心
  5. """
  6. import asyncio
  7. import json
  8. import time
  9. import traceback
  10. import uuid
  11. from mq_http_sdk.mq_consumer import *
  12. from mq_http_sdk.mq_exception import MQExceptionBase
  13. sys.path.append(os.getcwd())
  14. from application.common import AliyunLogger, get_consumer, ack_message
  15. from application.config import TopicGroup
  16. from application.common.log import Local


async def run(task_id, mode, platform, trace_id):
    """
    Launch the spider subprocess for the given parameters.
    :param task_id: task id
    :param mode: task type
    :param platform: which platform to crawl
    :param trace_id: trace id used to correlate log entries
    :return: None
    """
    # Create an Aliyun logger for this platform/mode
    logger = AliyunLogger(platform=platform, mode=mode)
    # Spawn a child process running the spider
    try:
        process = await asyncio.create_subprocess_shell(
            "python3 scheduler/run_spider_online.py --task_id {} --mode {} --platform {}".format(
                task_id, mode, platform
            )
        )
        # returncode is None while the process is still running, i.e. it started
        if process.returncode is None:
            logger.logging(
                code=1005,
                message="{}: process started, pid: {}".format(platform, process.pid),
                trace_id=trace_id,
            )
        else:
            logger.logging(
                code=1006,
                message="{}: failed to start process".format(platform),
                trace_id=trace_id,
            )
    except Exception as e:
        logger.logging(
            code=1007,
            message=f"Unexpected error: {e}\n{traceback.format_exc()}",
            trace_id=trace_id,
        )
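

# For reference, a minimal sketch of the argument parsing that the command
# above assumes on the spider side (hypothetical; the real
# scheduler/run_spider_online.py is not part of this file):
#
#     import argparse
#     parser = argparse.ArgumentParser()
#     parser.add_argument("--task_id")
#     parser.add_argument("--mode")
#     parser.add_argument("--platform")
#     args = parser.parse_args()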


def generate_trace_id():
    """Build a trace id from a uuid4 followed by a millisecond timestamp."""
    timestamp = str(int(time.time() * 1000))
    unique_id = str(uuid.uuid4())
    return f"{unique_id}{timestamp}"
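
# Illustrative shape of the id above: a 36-character uuid4 immediately
# followed by a 13-digit millisecond timestamp, e.g.
# "550e8400-e29b-41d4-a716-4466554400001704345600000" (example values only).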


async def consume_single_message(spider):
    """
    Consume a single message; if one is received, start the spider for it.
    :param spider: spider config dict with topic/group/platform/mode
    """
    topic = spider["topic"]
    group = spider["group"]
    platform = spider["platform"]
    mode = spider["mode"]
    logger = AliyunLogger(platform=platform, mode=mode)
    consumer = get_consumer(topic, group)
    try:
        messages = consumer.consume_message(wait_seconds=10, batch_size=1)
        if messages:
            # Process each received message
            for single_message in messages:
                trace_id = generate_trace_id()
                message_id = single_message.message_id
                Local.logger(platform, mode).info(
                    "received message\t{}{}, trace_id={}, message_id={}".format(
                        single_message, single_message.message_body, trace_id, message_id
                    )
                )
                # Ack first: the message is acknowledged before the spider runs,
                # so delivery is effectively at-most-once
                ack_message(
                    mode=mode,
                    platform=platform,
                    recv_msgs=single_message,
                    consumer=consumer,
                )
                logger.logging(
                    code=5000,
                    message=f"successfully consumed message [message_id]={message_id}",
                    data=single_message.message_body,
                    trace_id=trace_id,
                )
                message_body = single_message.message_body
                task_id = json.loads(message_body)["id"]
                Local.logger(platform, mode).info(
                    f"task_id={task_id} trace_id={trace_id} message_id={message_id}"
                )
                # Launch the spider for this task; run() returns as soon as the
                # subprocess has been spawned
                await run(task_id, mode, platform, trace_id)
                logger.logging(
                    code=5001,
                    message=f"successfully created task [message_id]={message_id}",
                    trace_id=trace_id,
                )
        else:
            logger.logging(code=5003, message="Messages Queue is Empty")
    except MQExceptionBase as err:
        tb = traceback.format_exc()
        message = f"Consume Message Fail! Exception: {err}\n{tb}"
        logger.logging(code=5004, message=message)


async def main():
    """
    Main loop: fan out one consume coroutine per configured spider,
    then sleep 20 seconds and poll again.
    """
    spider_list = TopicGroup().produce()
    while spider_list:
        tasks = [consume_single_message(spider) for spider in spider_list]
        await asyncio.gather(*tasks)
        await asyncio.sleep(20)
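

# Each entry yielded by TopicGroup().produce() is consumed above as a dict
# with at least "topic", "group", "platform" and "mode" keys, e.g.
# (illustrative values only):
#
#     {"topic": "spider_topic", "group": "spider_group",
#      "platform": "example_platform", "mode": "recommend"}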


if __name__ == "__main__":
    # Run the main event loop
    asyncio.run(main())