|
@@ -61,12 +61,12 @@ async def consume_single_message(spider):
|
|
mode = spider["mode"]
|
|
mode = spider["mode"]
|
|
logger = AliyunLogger(platform=platform, mode=mode)
|
|
logger = AliyunLogger(platform=platform, mode=mode)
|
|
consumer = get_consumer(topic, group)
|
|
consumer = get_consumer(topic, group)
|
|
- trace_id = generate_trace_id()
|
|
|
|
try:
|
|
try:
|
|
messages = consumer.consume_message(wait_seconds=10, batch_size=1)
|
|
messages = consumer.consume_message(wait_seconds=10, batch_size=1)
|
|
if messages:
|
|
if messages:
|
|
# 在这里消费消息,做一些数据处理分析
|
|
# 在这里消费消息,做一些数据处理分析
|
|
for single_message in messages:
|
|
for single_message in messages:
|
|
|
|
+ trace_id = generate_trace_id()
|
|
message_id = single_message.message_id
|
|
message_id = single_message.message_id
|
|
Local.logger(platform, mode).info("收到一条消息\t{}{},trace_id={},message_id = {}".format(single_message, single_message.message_body, trace_id,message_id))
|
|
Local.logger(platform, mode).info("收到一条消息\t{}{},trace_id={},message_id = {}".format(single_message, single_message.message_body, trace_id,message_id))
|
|
ack_message(
|
|
ack_message(
|
|
@@ -88,17 +88,17 @@ async def consume_single_message(spider):
|
|
)
|
|
)
|
|
logger.logging(code=5001, message=f"successfully created task [message_id] = {message_id}",trace_id=trace_id)
|
|
logger.logging(code=5001, message=f"successfully created task [message_id] = {message_id}",trace_id=trace_id)
|
|
else:
|
|
else:
|
|
- logger.logging(code=5003, message="Messages Queue is Empty",trace_id=trace_id)
|
|
|
|
|
|
+ logger.logging(code=5003, message="Messages Queue is Empty")
|
|
|
|
|
|
except MQExceptionBase as err:
|
|
except MQExceptionBase as err:
|
|
tb = traceback.format_exc()
|
|
tb = traceback.format_exc()
|
|
# Topic中没有消息可消费。
|
|
# Topic中没有消息可消费。
|
|
if err.type == "MessageNotExist":
|
|
if err.type == "MessageNotExist":
|
|
message = "No new message! RequestId:{}\n{}".format(err.req_id,tb)
|
|
message = "No new message! RequestId:{}\n{}".format(err.req_id,tb)
|
|
- logger.logging(code="5004", message=message,trace_id=trace_id)
|
|
|
|
|
|
+ logger.logging(code="5004", message=message)
|
|
else:
|
|
else:
|
|
message = "Consume Message Fail! Exception:{}\n{}".format(err,tb)
|
|
message = "Consume Message Fail! Exception:{}\n{}".format(err,tb)
|
|
- logger.logging(code="5004", message=message,trace_id=trace_id)
|
|
|
|
|
|
+ logger.logging(code="5004", message=message)
|
|
|
|
|
|
|
|
|
|
async def main():
|
|
async def main():
|