|
@@ -1,3 +1,8 @@
|
|
|
+"""
|
|
|
+Created on January 4, 2024,
|
|
|
+@author: luojunhui
|
|
|
+description: 票圈线上代码控制程序,分布式抓取消费核心
|
|
|
+"""
|
|
|
import asyncio
|
|
|
import json
|
|
|
|
|
@@ -28,6 +33,7 @@ async def run(task_id, mode, platform):
|
|
|
await asyncio.create_subprocess_shell(
|
|
|
"python3 scheduler/run_spider_online.py --task_id {} --mode {} --platform {}".format(task_id, mode, platform)
|
|
|
)
|
|
|
+ logger.logging(code=5002, message="successfully run spider")
|
|
|
|
|
|
|
|
|
async def consume_single_message(spider):
|
|
@@ -37,33 +43,38 @@ async def consume_single_message(spider):
|
|
|
"""
|
|
|
topic = spider['topic']
|
|
|
group = spider['group']
|
|
|
+ platform = spider['platform']
|
|
|
+ mode = spider['mode']
|
|
|
+ logger = AliyunLogger(platform=platform, mode=mode)
|
|
|
consumer = get_consumer(topic, group)
|
|
|
try:
|
|
|
messages = consumer.consume_message(wait_seconds=10, batch_size=1)
|
|
|
if messages:
|
|
|
# 在这里消费消息,做一些数据处理分析
|
|
|
for single_message in messages:
|
|
|
- ack_message(mode=spider['mode'], platform=spider['platform'], recv_msgs=messages,
|
|
|
- consumer=consumer)
|
|
|
+ ack_message(
|
|
|
+ mode=mode,
|
|
|
+ platform=platform,
|
|
|
+ recv_msgs=messages,
|
|
|
+ consumer=consumer
|
|
|
+ )
|
|
|
+ logger.logging(code=5000, message="successfully consumed message", data=single_message.message_body)
|
|
|
message_body = single_message.message_body
|
|
|
task_id = json.loads(message_body)['id']
|
|
|
- print("成功消费消息,正在准备启动爬虫任务")
|
|
|
- print(message_body)
|
|
|
# 创建爬虫task
|
|
|
await asyncio.create_task(run(task_id, spider['mode'], spider['platform']))
|
|
|
- print("爬虫任务启动完成")
|
|
|
+ logger.logging(code=5001, message="successfully created task")
|
|
|
else:
|
|
|
- message = "Messages Queue is Empty"
|
|
|
- print(message)
|
|
|
+ logger.logging(code=5003, message="Messages Queue is Empty")
|
|
|
|
|
|
except MQExceptionBase as err:
|
|
|
# Topic中没有消息可消费。
|
|
|
if err.type == "MessageNotExist":
|
|
|
- message = f"No new message! RequestId:{err.req_id}\n"
|
|
|
- print(message)
|
|
|
+ message = "No new message! RequestId:{}\n".format(err.req_id)
|
|
|
+ logger.logging(code="5004", message=message)
|
|
|
else:
|
|
|
- message = f"Consume Message Fail! Exception:{err}\n"
|
|
|
- print(message)
|
|
|
+ message = "Consume Message Fail! Exception:{}\n".format(err)
|
|
|
+ logger.logging(code="5004", message=message)
|
|
|
|
|
|
|
|
|
async def main():
|
|
@@ -77,7 +88,7 @@ async def main():
|
|
|
task = asyncio.create_task(consume_single_message(spider))
|
|
|
async_tasks.append(task)
|
|
|
await asyncio.gather(*async_tasks)
|
|
|
- await asyncio.sleep(60) # 每分钟接收一次MQ,
|
|
|
+ await asyncio.sleep(60) # 每分钟接收一次MQ,
|
|
|
|
|
|
|
|
|
if __name__ == '__main__':
|