|
@@ -25,13 +25,12 @@ async def run(task_id, mode, platform):
|
|
|
"""
|
|
|
# 创建一个aliyun日志对象
|
|
|
logger = AliyunLogger(platform=platform, mode=mode)
|
|
|
- logger.logging(
|
|
|
- code=1003,
|
|
|
- message="{}: 开始一轮抓取".format(platform)
|
|
|
- )
|
|
|
+ logger.logging(code=1003, message="{}: 开始一轮抓取".format(platform))
|
|
|
# 创建并启动一个子进程
|
|
|
await asyncio.create_subprocess_shell(
|
|
|
- "python3 scheduler/run_spider_online.py --task_id {} --mode {} --platform {}".format(task_id, mode, platform)
|
|
|
+ "python3 scheduler/run_spider_online.py --task_id {} --mode {} --platform {}".format(
|
|
|
+ task_id, mode, platform
|
|
|
+ )
|
|
|
)
|
|
|
logger.logging(code=5002, message="successfully run spider")
|
|
|
|
|
@@ -41,10 +40,10 @@ async def consume_single_message(spider):
|
|
|
消费单个消息,若消费成功则启动爬虫新协程;
|
|
|
:param spider: 爬虫类
|
|
|
"""
|
|
|
- topic = spider['topic']
|
|
|
- group = spider['group']
|
|
|
- platform = spider['platform']
|
|
|
- mode = spider['mode']
|
|
|
+ topic = spider["topic"]
|
|
|
+ group = spider["group"]
|
|
|
+ platform = spider["platform"]
|
|
|
+ mode = spider["mode"]
|
|
|
logger = AliyunLogger(platform=platform, mode=mode)
|
|
|
consumer = get_consumer(topic, group)
|
|
|
try:
|
|
@@ -53,16 +52,19 @@ async def consume_single_message(spider):
|
|
|
# 在这里消费消息,做一些数据处理分析
|
|
|
for single_message in messages:
|
|
|
ack_message(
|
|
|
- mode=mode,
|
|
|
- platform=platform,
|
|
|
- recv_msgs=messages,
|
|
|
- consumer=consumer
|
|
|
+ mode=mode, platform=platform, recv_msgs=messages, consumer=consumer
|
|
|
+ )
|
|
|
+ logger.logging(
|
|
|
+ code=5000,
|
|
|
+ message="successfully consumed message",
|
|
|
+ data=single_message.message_body,
|
|
|
)
|
|
|
- logger.logging(code=5000, message="successfully consumed message", data=single_message.message_body)
|
|
|
message_body = single_message.message_body
|
|
|
- task_id = json.loads(message_body)['id']
|
|
|
+ task_id = json.loads(message_body)["id"]
|
|
|
# 创建爬虫task
|
|
|
- await asyncio.create_task(run(task_id, spider['mode'], spider['platform']))
|
|
|
+ await asyncio.create_task(
|
|
|
+ run(task_id, spider["mode"], spider["platform"])
|
|
|
+ )
|
|
|
logger.logging(code=5001, message="successfully created task")
|
|
|
else:
|
|
|
logger.logging(code=5003, message="Messages Queue is Empty")
|
|
@@ -91,6 +93,6 @@ async def main():
|
|
|
await asyncio.sleep(60) # 每分钟接收一次MQ
|
|
|
|
|
|
|
|
|
-if __name__ == '__main__':
|
|
|
+if __name__ == "__main__":
|
|
|
# 运行主事件循环
|
|
|
asyncio.run(main())
|