From b9548d1640fd6f696f77649cd74135209844c07d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=AE=B8=E6=99=93=E4=B8=9C?= <763795151@qq.com> Date: Mon, 13 Dec 2021 19:05:57 +0800 Subject: [PATCH] =?UTF-8?q?fix=E6=8C=89=E6=97=B6=E9=97=B4=E6=9F=A5?= =?UTF-8?q?=E8=AF=A2=E6=B6=88=E6=81=AFbug,=E5=8A=A0=E9=95=BF=E9=A1=B5?= =?UTF-8?q?=E9=9D=A2=E8=AF=B7=E6=B1=82=E8=B6=85=E6=97=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../service/impl/MessageServiceImpl.java | 4 +++ .../scala/kafka/console/MessageConsole.scala | 33 ++++++++++--------- ui/src/utils/request.js | 2 +- ui/src/views/message/MessageDetail.vue | 2 +- ui/src/views/message/SearchByOffset.vue | 10 +++--- ui/src/views/message/SearchByTime.vue | 13 +++++--- 6 files changed, 37 insertions(+), 27 deletions(-) diff --git a/src/main/java/com/xuxd/kafka/console/service/impl/MessageServiceImpl.java b/src/main/java/com/xuxd/kafka/console/service/impl/MessageServiceImpl.java index 82a0052..1240e29 100644 --- a/src/main/java/com/xuxd/kafka/console/service/impl/MessageServiceImpl.java +++ b/src/main/java/com/xuxd/kafka/console/service/impl/MessageServiceImpl.java @@ -18,6 +18,7 @@ import java.util.stream.Collectors; import kafka.console.ConsumerConsole; import kafka.console.MessageConsole; import kafka.console.TopicConsole; +import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.apache.kafka.clients.admin.TopicDescription; @@ -40,6 +41,7 @@ import org.springframework.stereotype.Service; * @author xuxd * @date 2021-12-11 09:43:44 **/ +@Slf4j @Service public class MessageServiceImpl implements MessageService, ApplicationContextAware { @@ -69,7 +71,9 @@ public class MessageServiceImpl implements MessageService, ApplicationContextAwa int maxNums = 10000; Set partitions = getPartitions(queryMessage); + long startTime = System.currentTimeMillis(); List> records = messageConsole.searchBy(partitions, queryMessage.getStartTime(), queryMessage.getEndTime(), maxNums); + log.info("search message by time, cost time: {}", (System.currentTimeMillis() - startTime)); List vos = records.stream().filter(record -> record.timestamp() <= queryMessage.getEndTime()) .map(ConsumerRecordVO::fromConsumerRecord).collect(Collectors.toList()); Map res = new HashMap<>(); diff --git a/src/main/scala/kafka/console/MessageConsole.scala b/src/main/scala/kafka/console/MessageConsole.scala index 6f1c937..324f87b 100644 --- a/src/main/scala/kafka/console/MessageConsole.scala +++ b/src/main/scala/kafka/console/MessageConsole.scala @@ -55,25 +55,26 @@ class MessageConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConf } else { val records = consumer.poll(Duration.ofMillis(timeoutMs)) - for ((tp, endOff) <- endOffTable) { - if (!terminate) { - val recordList = records.records(tp) - if (recordList.isEmpty) { - arrive.remove(tp) - } else { - val first = recordList.get(0) - if (first.offset() >= endOff) { - arrive.remove(tp) - } else { - res.addAll(recordList) - if (recordList.get(recordList.size() - 1).offset() >= endOff) { + if (records.isEmpty) { + terminate = true + } else { + for ((tp, endOff) <- endOffTable) { + if (!terminate) { + val recordList = records.records(tp) + if (!recordList.isEmpty) { + val first = recordList.get(0) + if (first.offset() >= endOff) { arrive.remove(tp) + } else { + res.addAll(recordList) + if (recordList.get(recordList.size() - 1).offset() >= endOff) { + arrive.remove(tp) + } } } - } - - if (arrive.isEmpty) { - terminate = true; + if (arrive.isEmpty) { + terminate = true; + } } } } diff --git a/ui/src/utils/request.js b/ui/src/utils/request.js index 27b6755..0af83fa 100644 --- a/ui/src/utils/request.js +++ b/ui/src/utils/request.js @@ -6,7 +6,7 @@ import { VueAxios } from "./axios"; const request = axios.create({ // API 请求的默认前缀 baseURL: process.env.VUE_APP_API_BASE_URL, - timeout: 30000, // 请求超时时间 + timeout: 120000, // 请求超时时间 }); // 异常拦截处理器 diff --git a/ui/src/views/message/MessageDetail.vue b/ui/src/views/message/MessageDetail.vue index 6852bb0..74eca06 100644 --- a/ui/src/views/message/MessageDetail.vue +++ b/ui/src/views/message/MessageDetail.vue @@ -150,7 +150,7 @@ export default { }, methods: { getMessageDetail() { - this.loading = false; + this.loading = true; const params = Object.assign({}, this.record, { keyDeserializer: this.keyDeserializer, valueDeserializer: this.valueDeserializer, diff --git a/ui/src/views/message/SearchByOffset.vue b/ui/src/views/message/SearchByOffset.vue index ab13e12..9b36cbf 100644 --- a/ui/src/views/message/SearchByOffset.vue +++ b/ui/src/views/message/SearchByOffset.vue @@ -8,10 +8,10 @@ @submit="handleSearch" > - + - + - + - + - +