|
@@ -39,12 +39,9 @@ import java.util.Objects;
|
|
|
@Component
|
|
|
@RocketMQMessageListener(endpoints = "${rocketmq.consumer.endpoints:rmq-cn-vym47zv2i03-vpc.cn-hangzhou.rmq.aliyuncs.com:8080}",
|
|
|
topic = "${rocketmq.consumer.pipelinetask.topic:topic_content_understanding_pipeline_task}",
|
|
|
- consumerGroup = "${rocketmq.consumer.pipelinetask.group:group_content_understanding_pipeline_task}", tag = "*")
|
|
|
+ consumerGroup = "${rocketmq.consumer.pipelinetask.group:group_content_understanding_pipeline_task}", tag = "*", consumptionThreadCount = 10)
|
|
|
public class ContentUnderstandingPipelineTaskConsumer implements RocketMQListener {
|
|
|
|
|
|
- @Autowired
|
|
|
- private ContentService contentService;
|
|
|
-
|
|
|
@Autowired
|
|
|
private PipelineService pipelineService;
|
|
|
|
|
@@ -61,6 +58,7 @@ public class ContentUnderstandingPipelineTaskConsumer implements RocketMQListene
|
|
|
log.info("Receive message {}", messageView);
|
|
|
MessageId messageId = messageView.getMessageId();
|
|
|
try {
|
|
|
+ // 1.解析消息
|
|
|
ByteBuffer body = messageView.getBody();
|
|
|
Charset charset = StandardCharsets.UTF_8;
|
|
|
String messageStr = charset.decode(body).toString();
|
|
@@ -108,6 +106,7 @@ public class ContentUnderstandingPipelineTaskConsumer implements RocketMQListene
|
|
|
dto1.setExtMap(extMap);
|
|
|
// 4.按照步骤执行pipeline每一步动作
|
|
|
pipelineService.executeTaskTreeNodeBFS(root, dto1);
|
|
|
+ return ConsumeResult.SUCCESS;
|
|
|
} catch (Exception e) {
|
|
|
log.error("Message is failed to be acknowledged, messageId={}", messageId, e);
|
|
|
}
|