From 7a98eb479f758b1152c7ad66501d453c88254448 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=AE=B8=E6=99=93=E4=B8=9C?= <763795151@qq.com> Date: Fri, 19 Nov 2021 21:01:11 +0800 Subject: [PATCH] =?UTF-8?q?=E5=8F=98=E6=9B=B4=E5=89=AF=E6=9C=AC=E4=BF=A1?= =?UTF-8?q?=E6=81=AF=E6=9F=A5=E8=AF=A2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 2 +- .../console/beans/ReplicaAssignment.java | 27 ++++ .../console/controller/TopicController.java | 4 + .../kafka/console/service/TopicService.java | 2 + .../service/impl/OperationServiceImpl.java | 3 +- .../service/impl/TopicServiceImpl.java | 12 ++ .../xuxd/kafka/console/utils/GsonUtil.java | 19 +++ .../scala/kafka/console/TopicConsole.scala | 72 ++++++++++- ui/src/utils/api.js | 4 + ui/src/views/topic/Topic.vue | 44 ++++++- ui/src/views/topic/UpdateReplica.vue | 121 ++++++++++++++++++ 11 files changed, 301 insertions(+), 9 deletions(-) create mode 100644 src/main/java/com/xuxd/kafka/console/beans/ReplicaAssignment.java create mode 100644 src/main/java/com/xuxd/kafka/console/utils/GsonUtil.java create mode 100644 ui/src/views/topic/UpdateReplica.vue diff --git a/pom.xml b/pom.xml index aafb851..cb57ca4 100644 --- a/pom.xml +++ b/pom.xml @@ -10,7 +10,7 @@ com.xuxd kafka-console-ui - 1.0.0 + 1.0.1 kafka-console-ui Kafka console manage ui diff --git a/src/main/java/com/xuxd/kafka/console/beans/ReplicaAssignment.java b/src/main/java/com/xuxd/kafka/console/beans/ReplicaAssignment.java new file mode 100644 index 0000000..d6856a2 --- /dev/null +++ b/src/main/java/com/xuxd/kafka/console/beans/ReplicaAssignment.java @@ -0,0 +1,27 @@ +package com.xuxd.kafka.console.beans; + +import java.util.List; +import lombok.Data; + +/** + * kafka-console-ui. + * + * @author xuxd + * @date 2021-11-19 17:07:50 + **/ +@Data +public class ReplicaAssignment { + + private long version = 1L; + + private List partitions; + + @Data + static class Partition { + private String topic; + + private int partition; + + private List replicas; + } +} diff --git a/src/main/java/com/xuxd/kafka/console/controller/TopicController.java b/src/main/java/com/xuxd/kafka/console/controller/TopicController.java index 986bbed..84031c1 100644 --- a/src/main/java/com/xuxd/kafka/console/controller/TopicController.java +++ b/src/main/java/com/xuxd/kafka/console/controller/TopicController.java @@ -71,4 +71,8 @@ public class TopicController { return topicService.addPartitions(topic, addNum, assignment); } + @GetMapping("/replica/assignment") + public Object getCurrentReplicaAssignment(@RequestParam String topic) { + return topicService.getCurrentReplicaAssignment(topic); + } } diff --git a/src/main/java/com/xuxd/kafka/console/service/TopicService.java b/src/main/java/com/xuxd/kafka/console/service/TopicService.java index 31ac5f5..99c9e42 100644 --- a/src/main/java/com/xuxd/kafka/console/service/TopicService.java +++ b/src/main/java/com/xuxd/kafka/console/service/TopicService.java @@ -26,4 +26,6 @@ public interface TopicService { ResponseData createTopic(NewTopic topic); ResponseData addPartitions(String topic, int addNum, List> newAssignmentst); + + ResponseData getCurrentReplicaAssignment(String topic); } diff --git a/src/main/java/com/xuxd/kafka/console/service/impl/OperationServiceImpl.java b/src/main/java/com/xuxd/kafka/console/service/impl/OperationServiceImpl.java index eb460a0..7c996c6 100644 --- a/src/main/java/com/xuxd/kafka/console/service/impl/OperationServiceImpl.java +++ b/src/main/java/com/xuxd/kafka/console/service/impl/OperationServiceImpl.java @@ -8,6 +8,7 @@ import com.xuxd.kafka.console.beans.dos.MinOffsetAlignmentDO; import com.xuxd.kafka.console.beans.vo.OffsetAlignmentVO; import com.xuxd.kafka.console.dao.MinOffsetAlignmentMapper; import com.xuxd.kafka.console.service.OperationService; +import com.xuxd.kafka.console.utils.GsonUtil; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -30,7 +31,7 @@ import scala.Tuple2; @Service public class OperationServiceImpl implements OperationService { - private Gson gson = new Gson(); + private Gson gson = GsonUtil.INSTANCE.get(); @Autowired private OperationConsole operationConsole; diff --git a/src/main/java/com/xuxd/kafka/console/service/impl/TopicServiceImpl.java b/src/main/java/com/xuxd/kafka/console/service/impl/TopicServiceImpl.java index 55636cf..137fd48 100644 --- a/src/main/java/com/xuxd/kafka/console/service/impl/TopicServiceImpl.java +++ b/src/main/java/com/xuxd/kafka/console/service/impl/TopicServiceImpl.java @@ -1,10 +1,13 @@ package com.xuxd.kafka.console.service.impl; +import com.google.gson.Gson; +import com.xuxd.kafka.console.beans.ReplicaAssignment; import com.xuxd.kafka.console.beans.ResponseData; import com.xuxd.kafka.console.beans.enums.TopicType; import com.xuxd.kafka.console.beans.vo.TopicDescriptionVO; import com.xuxd.kafka.console.beans.vo.TopicPartitionVO; import com.xuxd.kafka.console.service.TopicService; +import com.xuxd.kafka.console.utils.GsonUtil; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; @@ -38,6 +41,8 @@ public class TopicServiceImpl implements TopicService { @Autowired private TopicConsole topicConsole; + private Gson gson = GsonUtil.INSTANCE.get(); + @Override public ResponseData getTopicNameList(boolean internal) { return ResponseData.create().data(topicConsole.getTopicNameList(internal)).success(); } @@ -130,4 +135,11 @@ public class TopicServiceImpl implements TopicService { return success ? ResponseData.create().success() : ResponseData.create().failed(tuple2._2()); } + + @Override public ResponseData getCurrentReplicaAssignment(String topic) { + Tuple2 tuple2 = topicConsole.getCurrentReplicaAssignmentJson(topic); + boolean success = (boolean) tuple2._1(); + + return success ? ResponseData.create().data(gson.fromJson(tuple2._2(), ReplicaAssignment.class)).success() : ResponseData.create().failed(tuple2._2()); + } } diff --git a/src/main/java/com/xuxd/kafka/console/utils/GsonUtil.java b/src/main/java/com/xuxd/kafka/console/utils/GsonUtil.java new file mode 100644 index 0000000..2ae1250 --- /dev/null +++ b/src/main/java/com/xuxd/kafka/console/utils/GsonUtil.java @@ -0,0 +1,19 @@ +package com.xuxd.kafka.console.utils; + +import com.google.gson.Gson; + +/** + * kafka-console-ui. + * + * @author xuxd + * @date 2021-11-19 17:01:01 + **/ +public enum GsonUtil { + INSTANCE; + + private Gson gson = new Gson(); + + public Gson get() { + return gson; + } +} diff --git a/src/main/scala/kafka/console/TopicConsole.scala b/src/main/scala/kafka/console/TopicConsole.scala index ad522c4..c8014bd 100644 --- a/src/main/scala/kafka/console/TopicConsole.scala +++ b/src/main/scala/kafka/console/TopicConsole.scala @@ -1,14 +1,17 @@ package kafka.console -import java.util -import java.util.concurrent.TimeUnit -import java.util.{Collections, List, Set} - import com.xuxd.kafka.console.config.KafkaConfig +import kafka.admin.ReassignPartitionsCommand.compareTopicPartitions +import kafka.utils.Json import org.apache.kafka.clients.admin._ -import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.errors.UnknownTopicOrPartitionException +import org.apache.kafka.common.{TopicPartition, TopicPartitionReplica} -import scala.jdk.CollectionConverters.{CollectionHasAsScala, SetHasAsJava} +import java.util +import java.util.concurrent.{ExecutionException, TimeUnit} +import java.util.{Collections, List, Set} +import scala.collection.{Map, Seq} +import scala.jdk.CollectionConverters.{CollectionHasAsScala, MapHasAsJava, MapHasAsScala, SeqHasAsJava, SetHasAsJava} /** * kafka-console-ui. @@ -121,4 +124,61 @@ class TopicConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConfig (false, e.getMessage) }).asInstanceOf[(Boolean, String)] } + + def getCurrentReplicaAssignmentJson(topic: String): (Boolean, String) = { + withAdminClientAndCatchError(admin => { + val json = formatAsReassignmentJson(getReplicaAssignmentForTopics(admin, Seq(topic)), Map.empty) + (true, json) + }, e => { + log.error("getCurrentReplicaAssignmentJson error, ", e) + (false, e.getMessage) + }).asInstanceOf[(Boolean, String)] + } + + /** + * Get the current replica assignments for some topics. + * + * @param adminClient The AdminClient to use. + * @param topics The topics to get information about. + * @return A map from partitions to broker assignments. + * If any topic can't be found, an exception will be thrown. + */ + private def getReplicaAssignmentForTopics(adminClient: Admin, + topics: Seq[String]) + : Map[TopicPartition, Seq[Int]] = { + describeTopics(adminClient, topics.toSet.asJava).flatMap { + case (topicName, topicDescription) => topicDescription.partitions.asScala.map { info => + (new TopicPartition(topicName, info.partition), info.replicas.asScala.map(_.id).toSeq) + } + } + } + + private def formatAsReassignmentJson(partitionsToBeReassigned: Map[TopicPartition, Seq[Int]], + replicaLogDirAssignment: Map[TopicPartitionReplica, String]): String = { + Json.encodeAsString(Map( + "version" -> 1, + "partitions" -> partitionsToBeReassigned.keySet.toBuffer.sortWith(compareTopicPartitions).map { + tp => + val replicas = partitionsToBeReassigned(tp) + Map( + "topic" -> tp.topic, + "partition" -> tp.partition, + "replicas" -> replicas.asJava + ).asJava + }.asJava + ).asJava) + } + + private def describeTopics(adminClient: Admin, + topics: Set[String]) + : Map[String, TopicDescription] = { + adminClient.describeTopics(topics).values.asScala.map { case (topicName, topicDescriptionFuture) => + try topicName -> topicDescriptionFuture.get + catch { + case t: ExecutionException if t.getCause.isInstanceOf[UnknownTopicOrPartitionException] => + throw new ExecutionException( + new UnknownTopicOrPartitionException(s"Topic $topicName not found.")) + } + } + } } diff --git a/ui/src/utils/api.js b/ui/src/utils/api.js index 0b96958..a4e47e3 100644 --- a/ui/src/utils/api.js +++ b/ui/src/utils/api.js @@ -117,6 +117,10 @@ export const KafkaTopicApi = { url: "/topic/partition/new", method: "post", }, + getCurrentReplicaAssignment: { + url: "/topic/replica/assignment", + method: "get", + }, }; export const KafkaConsumerApi = { diff --git a/ui/src/views/topic/Topic.vue b/ui/src/views/topic/Topic.vue index bafa48d..f5cb3b2 100644 --- a/ui/src/views/topic/Topic.vue +++ b/ui/src/views/topic/Topic.vue @@ -98,6 +98,27 @@ @click="openTopicConfigDialog(record.name)" >属性配置 + 变更副本 + + 发送统计 + + 限流 + + @@ -140,6 +166,7 @@ import CreateTopic from "@/views/topic/CreateTopic"; import AddPartition from "@/views/topic/AddPartition"; import ConsumedDetail from "@/views/topic/ConsumedDetail"; import TopicConfig from "@/views/topic/TopicConfig"; +import UpdateReplica from "@/views/topic/UpdateReplica"; export default { name: "Topic", @@ -149,6 +176,7 @@ export default { AddPartition, ConsumedDetail, TopicConfig, + UpdateReplica, }, data() { return { @@ -170,6 +198,7 @@ export default { showAddPartition: false, showConsumedDetailDialog: false, showTopicConfigDialog: false, + showUpdateReplicaDialog: false, }; }, methods: { @@ -250,6 +279,19 @@ export default { closeTopicConfigDialog() { this.showTopicConfigDialog = false; }, + openUpdateReplicaDialog(topic) { + this.showUpdateReplicaDialog = true; + this.selectDetail.resourceName = topic; + }, + closeUpdateReplicaDialog() { + this.showUpdateReplicaDialog = false; + }, + openMessageStatsDialog() { + this.$message.info("此功能尚不支持"); + }, + openThrottleDialog() { + this.$message.info("此功能尚不支持"); + }, }, created() { this.getTopicList(); @@ -281,7 +323,7 @@ const columns = [ title: "操作", key: "operation", scopedSlots: { customRender: "operation" }, - width: 500, + width: 800, }, ]; diff --git a/ui/src/views/topic/UpdateReplica.vue b/ui/src/views/topic/UpdateReplica.vue new file mode 100644 index 0000000..6126cd6 --- /dev/null +++ b/ui/src/views/topic/UpdateReplica.vue @@ -0,0 +1,121 @@ + + + + +