|
@@ -41,66 +41,66 @@ import java.util.List;
|
|
import java.util.concurrent.ExecutorService;
|
|
import java.util.concurrent.ExecutorService;
|
|
import java.util.concurrent.Executors;
|
|
import java.util.concurrent.Executors;
|
|
|
|
|
|
-
|
|
+
|
|
- * @author ehlxr
|
|
+
|
|
- * @since 2023-06-09 15:24.
|
|
+
|
|
- */
|
|
+
|
|
-@Component
|
|
+
|
|
-public class EtlMQConsumer {
|
|
+
|
|
- private static final Logger log = LoggerFactory.getLogger(EtlMQConsumer.class);
|
|
+
|
|
- private final EtlService etlService;
|
|
+
|
|
- @Value("${rocketmq.accessKey:}")
|
|
+
|
|
- private String accessKey;
|
|
+
|
|
- @Value("${rocketmq.secretKey:}")
|
|
+
|
|
- private String secretKey;
|
|
+
|
|
- @Value("${rocketmq.httpEndpoint:}")
|
|
+
|
|
- private String httpEndpoint;
|
|
+
|
|
- @Value("${rocketmq.instanceId:}")
|
|
+
|
|
- private String instanceId;
|
|
+
|
|
- @Value("${rocketmq.crawler.etl.topic:}")
|
|
+
|
|
- private String topic;
|
|
+
|
|
- @Value("${rocketmq.crawler.etl.groupid:}")
|
|
+
|
|
- private String groupId;
|
|
+
|
|
- @Value("${consumer.thread.size:10}")
|
|
+
|
|
- private Integer threadSize;
|
|
+
|
|
-
|
|
+
|
|
- private MQConsumer consumer;
|
|
+
|
|
-
|
|
+
|
|
- public EtlMQConsumer(EtlService etlService) {
|
|
+
|
|
- this.etlService = etlService;
|
|
+
|
|
- }
|
|
+
|
|
-
|
|
+
|
|
- @PostConstruct
|
|
+
|
|
- public void init() {
|
|
+
|
|
- MQClient mqClient = new MQClient(httpEndpoint, accessKey, secretKey);
|
|
+
|
|
- consumer = mqClient.getConsumer(instanceId, topic, groupId, null);
|
|
+
|
|
-
|
|
+
|
|
- ExecutorService service = Executors.newFixedThreadPool(threadSize);
|
|
+
|
|
- for (int i = 0; i < threadSize; i++) {
|
|
+
|
|
- service.submit(this::consumeMsg);
|
|
+
|
|
- }
|
|
+
|
|
- }
|
|
+
|
|
-
|
|
+
|
|
- private void consumeMsg() {
|
|
+
|
|
- do {
|
|
+
|
|
- try {
|
|
+
|
|
- List<Message> messages = consumer.consumeMessage(1, 10);
|
|
+
|
|
- if (messages == null || messages.isEmpty()) {
|
|
+
|
|
- log.info("No new message, continue");
|
|
+
|
|
- continue;
|
|
+
|
|
- }
|
|
+
|
|
-
|
|
+
|
|
- messages.forEach(message -> {
|
|
+
|
|
- log.info("Receive message: {} from topic: {}, group: {}", message, topic, groupId);
|
|
+
|
|
- CrawlerEtlParam param = JSONObject.parseObject(message.getMessageBodyString(), CrawlerEtlParam.class);
|
|
+
|
|
- param.setMessageId(message.getMessageId());
|
|
+
|
|
- etlService.deal(param);
|
|
+
|
|
- log.info("Deal done of message: {} from topic: {}, group: {}", message, topic, groupId);
|
|
+
|
|
- consumer.ackMessage(Collections.singletonList(message.getReceiptHandle()));
|
|
+
|
|
- });
|
|
+
|
|
- } catch (Throwable e) {
|
|
+
|
|
- log.error("Consume message from topic: {}, group: {} error", topic, groupId, e);
|
|
+
|
|
- }
|
|
+
|
|
- } while (true);
|
|
+
|
|
- }
|
|
+
|
|
-}
|
|
+
|