|
@@ -135,6 +135,7 @@ public class EtlMQConsumer {
|
|
|
private void consumeMsg() {
|
|
|
final MQConsumer consumer = mqClient.getConsumer(instanceId, topic, groupId, null);
|
|
|
do {
|
|
|
+ List<String> handles = new ArrayList<>();
|
|
|
try {
|
|
|
List<Message> messages = consumer.consumeMessage(10, 10);
|
|
|
if (messages == null || messages.isEmpty()) {
|
|
@@ -142,17 +143,16 @@ public class EtlMQConsumer {
|
|
|
continue;
|
|
|
}
|
|
|
|
|
|
- List<String> handles = new ArrayList<>();
|
|
|
messages.forEach(message -> {
|
|
|
log.info("Receive message: {} from topic: {}, group: {}", message, topic, groupId);
|
|
|
CrawlerVideoVO video = JSONObject.parseObject(message.getMessageBodyString(), CrawlerVideoVO.class);
|
|
|
priorityPool.execute(new RunnablePriority(video));
|
|
|
handles.add(message.getReceiptHandle());
|
|
|
});
|
|
|
- consumer.ackMessage(handles);
|
|
|
} catch (Throwable e) {
|
|
|
log.error("Consume message from topic: {}, group: {} error", topic, groupId, e);
|
|
|
}
|
|
|
+ consumer.ackMessage(handles);
|
|
|
} while (true);
|
|
|
}
|
|
|
}
|