diff --git a/src/main/scala/kafka/console/MessageConsole.scala b/src/main/scala/kafka/console/MessageConsole.scala index 324f87b..0679a8a 100644 --- a/src/main/scala/kafka/console/MessageConsole.scala +++ b/src/main/scala/kafka/console/MessageConsole.scala @@ -8,7 +8,7 @@ import java.time.Duration import java.util import java.util.Properties import scala.collection.immutable -import scala.jdk.CollectionConverters.{CollectionHasAsScala, MapHasAsScala} +import scala.jdk.CollectionConverters.{CollectionHasAsScala, MapHasAsScala, SeqHasAsJava} /** * kafka-console-ui. @@ -60,20 +60,48 @@ class MessageConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConf } else { for ((tp, endOff) <- endOffTable) { if (!terminate) { - val recordList = records.records(tp) + var recordList = records.records(tp) if (!recordList.isEmpty) { val first = recordList.get(0) if (first.offset() >= endOff) { arrive.remove(tp) } else { - res.addAll(recordList) + // + // (String topic, + // int partition, + // long offset, + // long timestamp, + // TimestampType timestampType, + // Long checksum, + // int serializedKeySize, + // int serializedValueSize, + // K key, + // V value, + // Headers headers, + // Optional leaderEpoch) + val nullVList = recordList.asScala.map(record => new ConsumerRecord[Array[Byte], Array[Byte]](record.topic(), + record.partition(), + record.offset(), + record.timestamp(), + record.timestampType(), + record.checksum(), + record.serializedKeySize(), + record.serializedValueSize(), + record.key(), + null, + record.headers(), + record.leaderEpoch())).toSeq.asJava + res.addAll(nullVList) if (recordList.get(recordList.size() - 1).offset() >= endOff) { arrive.remove(tp) } + if (recordList != null) { + recordList = null + } } } if (arrive.isEmpty) { - terminate = true; + terminate = true } } } diff --git a/ui/src/views/message/Message.vue b/ui/src/views/message/Message.vue index 3f6875b..2ee892f 100644 --- a/ui/src/views/message/Message.vue +++ b/ui/src/views/message/Message.vue @@ -5,10 +5,12 @@ - + - + + + @@ -20,9 +22,10 @@ import SearchByOffset from "@/views/message/SearchByOffset"; import request from "@/utils/request"; import { KafkaTopicApi } from "@/utils/api"; import notification from "ant-design-vue/lib/notification"; +import SendMessage from "@/views/message/SendMessage"; export default { name: "Message", - components: { SearchByTime, SearchByOffset }, + components: { SearchByTime, SearchByOffset, SendMessage }, data() { return { loading: false, diff --git a/ui/src/views/message/SearchByOffset.vue b/ui/src/views/message/SearchByOffset.vue index 9b36cbf..e131e61 100644 --- a/ui/src/views/message/SearchByOffset.vue +++ b/ui/src/views/message/SearchByOffset.vue @@ -1,7 +1,7 @@