From 02abe67fce092f27feeae682e32de1814c20a4a1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=AE=B8=E6=99=93=E4=B8=9C?= <763795151@qq.com> Date: Thu, 30 Dec 2021 14:17:47 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B6=88=E6=81=AF=E6=9F=A5=E8=AF=A2=E8=BF=87?= =?UTF-8?q?=E6=BB=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../console/service/impl/MessageServiceImpl.java | 9 ++++++--- src/main/scala/kafka/console/MessageConsole.scala | 4 ++-- ui/src/views/message/SearchByTime.vue | 15 ++++++++++----- 3 files changed, 18 insertions(+), 10 deletions(-) diff --git a/src/main/java/com/xuxd/kafka/console/service/impl/MessageServiceImpl.java b/src/main/java/com/xuxd/kafka/console/service/impl/MessageServiceImpl.java index f0ce85a..fc08e2e 100644 --- a/src/main/java/com/xuxd/kafka/console/service/impl/MessageServiceImpl.java +++ b/src/main/java/com/xuxd/kafka/console/service/impl/MessageServiceImpl.java @@ -79,7 +79,7 @@ public class MessageServiceImpl implements MessageService, ApplicationContextAwa public static String defaultDeserializer = "String"; @Override public ResponseData searchByTime(QueryMessage queryMessage) { - int maxNums = 10000; + int maxNums = 5000; Object searchContent = null; String headerKey = null; @@ -139,14 +139,17 @@ public class MessageServiceImpl implements MessageService, ApplicationContextAwa Set partitions = getPartitions(queryMessage); long startTime = System.currentTimeMillis(); - List> records = messageConsole.searchBy(partitions, queryMessage.getStartTime(), queryMessage.getEndTime(), maxNums, filter); + Tuple2>, Object> tuple2 = messageConsole.searchBy(partitions, queryMessage.getStartTime(), queryMessage.getEndTime(), maxNums, filter); + List> records = tuple2._1(); log.info("search message by time, cost time: {}", (System.currentTimeMillis() - startTime)); List vos = records.stream().filter(record -> record.timestamp() <= queryMessage.getEndTime()) .map(ConsumerRecordVO::fromConsumerRecord).collect(Collectors.toList()); Map res = new HashMap<>(); + vos = vos.subList(0, Math.min(maxNums, vos.size())); res.put("maxNum", maxNums); res.put("realNum", vos.size()); - res.put("data", vos.subList(0, Math.min(maxNums, vos.size()))); + res.put("searchNum", tuple2._2()); + res.put("data", vos); return ResponseData.create().data(res).success(); } diff --git a/src/main/scala/kafka/console/MessageConsole.scala b/src/main/scala/kafka/console/MessageConsole.scala index 19fb7cd..d31e754 100644 --- a/src/main/scala/kafka/console/MessageConsole.scala +++ b/src/main/scala/kafka/console/MessageConsole.scala @@ -23,7 +23,7 @@ import scala.jdk.CollectionConverters.{CollectionHasAsScala, MapHasAsScala, SeqH class MessageConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConfig) with Logging { def searchBy(partitions: util.Collection[TopicPartition], startTime: Long, endTime: Long, - maxNums: Int, filter: MessageFilter): util.List[ConsumerRecord[Array[Byte], Array[Byte]]] = { + maxNums: Int, filter: MessageFilter): (util.List[ConsumerRecord[Array[Byte], Array[Byte]]], Int) = { var startOffTable: immutable.Map[TopicPartition, Long] = Map.empty var endOffTable: immutable.Map[TopicPartition, Long] = Map.empty withAdminClientAndCatchError(admin => { @@ -154,7 +154,7 @@ class MessageConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConf }) } - res + (res, searchNums) } def searchBy( diff --git a/ui/src/views/message/SearchByTime.vue b/ui/src/views/message/SearchByTime.vue index f3d1360..aba99f6 100644 --- a/ui/src/views/message/SearchByTime.vue +++ b/ui/src/views/message/SearchByTime.vue @@ -92,7 +92,10 @@ 消息头的value不是字符串类型,就不要输入用来过滤了消息头的value不是字符串类型,就不要输入value用来过滤了,可以只输入消息头的key,过滤存在该key的消息 @@ -135,9 +138,11 @@

检索条数:{{ data.realNum }},允许返回的最大条数:{{ + >检索消息条数:{{ data.searchNum }},实际返回条数:{{ + data.realNum + }},允许返回的最大条数:{{ data.maxNum - }}

@@ -262,7 +267,7 @@ export default { this.getDeserializerList(); }, }; -const defaultData = { realNum: 0, maxNum: 0 }; +const defaultData = { realNum: 0, maxNum: 0, searchNum: 0 };