diff --git a/src/main/java/com/xuxd/kafka/console/service/impl/MessageServiceImpl.java b/src/main/java/com/xuxd/kafka/console/service/impl/MessageServiceImpl.java index 82a0052..1240e29 100644 --- a/src/main/java/com/xuxd/kafka/console/service/impl/MessageServiceImpl.java +++ b/src/main/java/com/xuxd/kafka/console/service/impl/MessageServiceImpl.java @@ -18,6 +18,7 @@ import java.util.stream.Collectors; import kafka.console.ConsumerConsole; import kafka.console.MessageConsole; import kafka.console.TopicConsole; +import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.apache.kafka.clients.admin.TopicDescription; @@ -40,6 +41,7 @@ import org.springframework.stereotype.Service; * @author xuxd * @date 2021-12-11 09:43:44 **/ +@Slf4j @Service public class MessageServiceImpl implements MessageService, ApplicationContextAware { @@ -69,7 +71,9 @@ public class MessageServiceImpl implements MessageService, ApplicationContextAwa int maxNums = 10000; Set partitions = getPartitions(queryMessage); + long startTime = System.currentTimeMillis(); List> records = messageConsole.searchBy(partitions, queryMessage.getStartTime(), queryMessage.getEndTime(), maxNums); + log.info("search message by time, cost time: {}", (System.currentTimeMillis() - startTime)); List vos = records.stream().filter(record -> record.timestamp() <= queryMessage.getEndTime()) .map(ConsumerRecordVO::fromConsumerRecord).collect(Collectors.toList()); Map res = new HashMap<>(); diff --git a/src/main/scala/kafka/console/MessageConsole.scala b/src/main/scala/kafka/console/MessageConsole.scala index 6f1c937..324f87b 100644 --- a/src/main/scala/kafka/console/MessageConsole.scala +++ b/src/main/scala/kafka/console/MessageConsole.scala @@ -55,25 +55,26 @@ class MessageConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConf } else { val records = consumer.poll(Duration.ofMillis(timeoutMs)) - for ((tp, endOff) <- endOffTable) { - if (!terminate) { - val recordList = records.records(tp) - if (recordList.isEmpty) { - arrive.remove(tp) - } else { - val first = recordList.get(0) - if (first.offset() >= endOff) { - arrive.remove(tp) - } else { - res.addAll(recordList) - if (recordList.get(recordList.size() - 1).offset() >= endOff) { + if (records.isEmpty) { + terminate = true + } else { + for ((tp, endOff) <- endOffTable) { + if (!terminate) { + val recordList = records.records(tp) + if (!recordList.isEmpty) { + val first = recordList.get(0) + if (first.offset() >= endOff) { arrive.remove(tp) + } else { + res.addAll(recordList) + if (recordList.get(recordList.size() - 1).offset() >= endOff) { + arrive.remove(tp) + } } } - } - - if (arrive.isEmpty) { - terminate = true; + if (arrive.isEmpty) { + terminate = true; + } } } } diff --git a/ui/src/utils/request.js b/ui/src/utils/request.js index 27b6755..0af83fa 100644 --- a/ui/src/utils/request.js +++ b/ui/src/utils/request.js @@ -6,7 +6,7 @@ import { VueAxios } from "./axios"; const request = axios.create({ // API 请求的默认前缀 baseURL: process.env.VUE_APP_API_BASE_URL, - timeout: 30000, // 请求超时时间 + timeout: 120000, // 请求超时时间 }); // 异常拦截处理器 diff --git a/ui/src/views/message/MessageDetail.vue b/ui/src/views/message/MessageDetail.vue index 6852bb0..74eca06 100644 --- a/ui/src/views/message/MessageDetail.vue +++ b/ui/src/views/message/MessageDetail.vue @@ -150,7 +150,7 @@ export default { }, methods: { getMessageDetail() { - this.loading = false; + this.loading = true; const params = Object.assign({}, this.record, { keyDeserializer: this.keyDeserializer, valueDeserializer: this.valueDeserializer, diff --git a/ui/src/views/message/SearchByOffset.vue b/ui/src/views/message/SearchByOffset.vue index ab13e12..9b36cbf 100644 --- a/ui/src/views/message/SearchByOffset.vue +++ b/ui/src/views/message/SearchByOffset.vue @@ -8,10 +8,10 @@ @submit="handleSearch" > - + - + - + - + - +