From bd814d550d69e5961326b15d969e70b25cf621ef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=AE=B8=E6=99=93=E4=B8=9C?= <763795151@qq.com> Date: Fri, 17 Dec 2021 20:06:23 +0800 Subject: [PATCH] =?UTF-8?q?=E6=8C=89=E6=97=B6=E9=97=B4=E6=9F=A5=E8=AF=A2?= =?UTF-8?q?=E6=B6=88=E6=81=AF=E5=8F=8A=E6=97=B6=E9=87=8A=E6=94=BE=E5=86=85?= =?UTF-8?q?=E5=AD=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../scala/kafka/console/MessageConsole.scala | 36 ++++- ui/src/views/message/Message.vue | 9 +- ui/src/views/message/SearchByOffset.vue | 9 +- ui/src/views/message/SearchByTime.vue | 7 +- ui/src/views/message/SendMessage.vue | 144 ++++++++++++++++++ 5 files changed, 191 insertions(+), 14 deletions(-) create mode 100644 ui/src/views/message/SendMessage.vue diff --git a/src/main/scala/kafka/console/MessageConsole.scala b/src/main/scala/kafka/console/MessageConsole.scala index 324f87b..0679a8a 100644 --- a/src/main/scala/kafka/console/MessageConsole.scala +++ b/src/main/scala/kafka/console/MessageConsole.scala @@ -8,7 +8,7 @@ import java.time.Duration import java.util import java.util.Properties import scala.collection.immutable -import scala.jdk.CollectionConverters.{CollectionHasAsScala, MapHasAsScala} +import scala.jdk.CollectionConverters.{CollectionHasAsScala, MapHasAsScala, SeqHasAsJava} /** * kafka-console-ui. @@ -60,20 +60,48 @@ class MessageConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConf } else { for ((tp, endOff) <- endOffTable) { if (!terminate) { - val recordList = records.records(tp) + var recordList = records.records(tp) if (!recordList.isEmpty) { val first = recordList.get(0) if (first.offset() >= endOff) { arrive.remove(tp) } else { - res.addAll(recordList) + // + // (String topic, + // int partition, + // long offset, + // long timestamp, + // TimestampType timestampType, + // Long checksum, + // int serializedKeySize, + // int serializedValueSize, + // K key, + // V value, + // Headers headers, + // Optional leaderEpoch) + val nullVList = recordList.asScala.map(record => new ConsumerRecord[Array[Byte], Array[Byte]](record.topic(), + record.partition(), + record.offset(), + record.timestamp(), + record.timestampType(), + record.checksum(), + record.serializedKeySize(), + record.serializedValueSize(), + record.key(), + null, + record.headers(), + record.leaderEpoch())).toSeq.asJava + res.addAll(nullVList) if (recordList.get(recordList.size() - 1).offset() >= endOff) { arrive.remove(tp) } + if (recordList != null) { + recordList = null + } } } if (arrive.isEmpty) { - terminate = true; + terminate = true } } } diff --git a/ui/src/views/message/Message.vue b/ui/src/views/message/Message.vue index 3f6875b..2ee892f 100644 --- a/ui/src/views/message/Message.vue +++ b/ui/src/views/message/Message.vue @@ -5,10 +5,12 @@ - + - + + + @@ -20,9 +22,10 @@ import SearchByOffset from "@/views/message/SearchByOffset"; import request from "@/utils/request"; import { KafkaTopicApi } from "@/utils/api"; import notification from "ant-design-vue/lib/notification"; +import SendMessage from "@/views/message/SendMessage"; export default { name: "Message", - components: { SearchByTime, SearchByOffset }, + components: { SearchByTime, SearchByOffset, SendMessage }, data() { return { loading: false, diff --git a/ui/src/views/message/SearchByOffset.vue b/ui/src/views/message/SearchByOffset.vue index 9b36cbf..e131e61 100644 --- a/ui/src/views/message/SearchByOffset.vue +++ b/ui/src/views/message/SearchByOffset.vue @@ -1,7 +1,7 @@