From 20535027bf1e4c33c8cadf513a97ad3db87fbca6 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E8=AE=B8=E6=99=93=E4=B8=9C?= <763795151@qq.com>
Date: Tue, 30 Nov 2021 19:49:02 +0800
Subject: [PATCH] =?UTF-8?q?=E5=8F=96=E6=B6=88=E6=AD=A3=E5=9C=A8=E8=BF=9B?=
=?UTF-8?q?=E8=A1=8C=E7=9A=84=E5=89=AF=E6=9C=AC=E9=87=8D=E5=88=86=E9=85=8D?=
=?UTF-8?q?=EF=BC=8C=E6=B6=88=E8=B4=B9=E7=BB=84->=E6=B6=88=E8=B4=B9?=
=?UTF-8?q?=E8=AF=A6=E6=83=85=E5=A2=9E=E5=8A=A0=E5=88=B7=E6=96=B0=E6=8C=89?=
=?UTF-8?q?=E9=92=AE?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.../beans/vo/CurrentReassignmentVO.java | 25 +++
.../controller/OperationController.java | 11 ++
.../console/service/OperationService.java | 5 +
.../service/impl/OperationServiceImpl.java | 27 +++
.../kafka/console/OperationConsole.scala | 26 ++-
ui/src/utils/api.js | 8 +
ui/src/views/group/ConsumerDetail.vue | 9 +
ui/src/views/op/CurrentReassignments.vue | 166 ++++++++++++++++++
ui/src/views/op/Operation.vue | 16 +-
9 files changed, 288 insertions(+), 5 deletions(-)
create mode 100644 src/main/java/com/xuxd/kafka/console/beans/vo/CurrentReassignmentVO.java
create mode 100644 ui/src/views/op/CurrentReassignments.vue
diff --git a/src/main/java/com/xuxd/kafka/console/beans/vo/CurrentReassignmentVO.java b/src/main/java/com/xuxd/kafka/console/beans/vo/CurrentReassignmentVO.java
new file mode 100644
index 0000000..229db56
--- /dev/null
+++ b/src/main/java/com/xuxd/kafka/console/beans/vo/CurrentReassignmentVO.java
@@ -0,0 +1,25 @@
+package com.xuxd.kafka.console.beans.vo;
+
+import com.xuxd.kafka.console.beans.TopicPartition;
+import java.util.List;
+import lombok.Data;
+
+/**
+ * kafka-console-ui.
+ *
+ * @author xuxd
+ * @date 2021-11-30 16:03:41
+ **/
+@Data
+public class CurrentReassignmentVO {
+
+ private final String topic;
+
+ private final int partition;
+
+ private final List replicas;
+
+ private final List addingReplicas;
+
+ private final List removingReplicas;
+}
diff --git a/src/main/java/com/xuxd/kafka/console/controller/OperationController.java b/src/main/java/com/xuxd/kafka/console/controller/OperationController.java
index b1b83aa..5753652 100644
--- a/src/main/java/com/xuxd/kafka/console/controller/OperationController.java
+++ b/src/main/java/com/xuxd/kafka/console/controller/OperationController.java
@@ -1,5 +1,6 @@
package com.xuxd.kafka.console.controller;
+import com.xuxd.kafka.console.beans.TopicPartition;
import com.xuxd.kafka.console.beans.dto.BrokerThrottleDTO;
import com.xuxd.kafka.console.beans.dto.ReplicationDTO;
import com.xuxd.kafka.console.beans.dto.SyncDataDTO;
@@ -63,4 +64,14 @@ public class OperationController {
public Object removeThrottle(@RequestBody BrokerThrottleDTO dto) {
return operationService.removeThrottle(dto.getBrokerList());
}
+
+ @GetMapping("/replication/reassignments")
+ public Object currentReassignments() {
+ return operationService.currentReassignments();
+ }
+
+ @DeleteMapping("/replication/reassignments")
+ public Object cancelReassignment(@RequestBody TopicPartition partition) {
+ return operationService.cancelReassignment(new org.apache.kafka.common.TopicPartition(partition.getTopic(), partition.getPartition()));
+ }
}
diff --git a/src/main/java/com/xuxd/kafka/console/service/OperationService.java b/src/main/java/com/xuxd/kafka/console/service/OperationService.java
index e797fd1..a40e7eb 100644
--- a/src/main/java/com/xuxd/kafka/console/service/OperationService.java
+++ b/src/main/java/com/xuxd/kafka/console/service/OperationService.java
@@ -3,6 +3,7 @@ package com.xuxd.kafka.console.service;
import com.xuxd.kafka.console.beans.ResponseData;
import java.util.List;
import java.util.Properties;
+import org.apache.kafka.common.TopicPartition;
/**
* kafka-console-ui.
@@ -25,4 +26,8 @@ public interface OperationService {
ResponseData configThrottle(List brokerList, long size);
ResponseData removeThrottle(List brokerList);
+
+ ResponseData currentReassignments();
+
+ ResponseData cancelReassignment(TopicPartition partition);
}
diff --git a/src/main/java/com/xuxd/kafka/console/service/impl/OperationServiceImpl.java b/src/main/java/com/xuxd/kafka/console/service/impl/OperationServiceImpl.java
index 5898563..37cc84c 100644
--- a/src/main/java/com/xuxd/kafka/console/service/impl/OperationServiceImpl.java
+++ b/src/main/java/com/xuxd/kafka/console/service/impl/OperationServiceImpl.java
@@ -5,17 +5,21 @@ import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.xuxd.kafka.console.beans.ResponseData;
import com.xuxd.kafka.console.beans.dos.MinOffsetAlignmentDO;
+import com.xuxd.kafka.console.beans.vo.CurrentReassignmentVO;
import com.xuxd.kafka.console.beans.vo.OffsetAlignmentVO;
import com.xuxd.kafka.console.dao.MinOffsetAlignmentMapper;
import com.xuxd.kafka.console.service.OperationService;
import com.xuxd.kafka.console.utils.GsonUtil;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
+import java.util.stream.Collectors;
import kafka.console.OperationConsole;
+import org.apache.kafka.clients.admin.PartitionReassignment;
import org.apache.kafka.common.TopicPartition;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.annotation.Autowired;
@@ -135,4 +139,27 @@ public class OperationServiceImpl implements OperationService {
return (boolean) tuple2._1() ? ResponseData.create().success() : ResponseData.create().failed(tuple2._2());
}
+
+ @Override public ResponseData currentReassignments() {
+ Map reassignmentMap = operationConsole.currentReassignments();
+ List vos = reassignmentMap.entrySet().stream().map(entry -> {
+ TopicPartition partition = entry.getKey();
+ PartitionReassignment reassignment = entry.getValue();
+ return new CurrentReassignmentVO(partition.topic(),
+ partition.partition(), reassignment.replicas(), reassignment.addingReplicas(), reassignment.removingReplicas());
+ }).collect(Collectors.toList());
+ return ResponseData.create().data(vos).success();
+ }
+
+ @Override public ResponseData cancelReassignment(TopicPartition partition) {
+ Map res = operationConsole.cancelPartitionReassignments(Collections.singleton(partition));
+ if (!res.isEmpty()) {
+ StringBuilder sb = new StringBuilder("Failed: ");
+ res.forEach((p, t) -> {
+ sb.append(p.toString()).append(": ").append(t.getMessage()).append(System.lineSeparator());
+ });
+ return ResponseData.create().failed(sb.toString());
+ }
+ return ResponseData.create().success();
+ }
}
diff --git a/src/main/scala/kafka/console/OperationConsole.scala b/src/main/scala/kafka/console/OperationConsole.scala
index 8a192bd..feddfb0 100644
--- a/src/main/scala/kafka/console/OperationConsole.scala
+++ b/src/main/scala/kafka/console/OperationConsole.scala
@@ -2,14 +2,14 @@ package kafka.console
import com.xuxd.kafka.console.config.KafkaConfig
import kafka.admin.ReassignPartitionsCommand
-import org.apache.kafka.clients.admin.ElectLeadersOptions
+import org.apache.kafka.clients.admin.{ElectLeadersOptions, ListPartitionReassignmentsOptions, PartitionReassignment}
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import org.apache.kafka.common.{ElectionType, TopicPartition}
import java.util.concurrent.TimeUnit
import java.util.{Collections, Properties}
-import scala.jdk.CollectionConverters.{CollectionHasAsScala, ListHasAsScala, MapHasAsScala, SeqHasAsJava, SetHasAsJava, SetHasAsScala}
+import scala.jdk.CollectionConverters.{CollectionHasAsScala, ListHasAsScala, MapHasAsJava, MapHasAsScala, SeqHasAsJava, SetHasAsJava, SetHasAsScala}
/**
* kafka-console-ui.
@@ -231,4 +231,26 @@ class OperationConsole(config: KafkaConfig, topicConsole: TopicConsole,
(false, e.getMessage)
}).asInstanceOf[(Boolean, String)]
}
+
+ /**
+ * current reassigning is active.
+ */
+ def currentReassignments(): util.Map[TopicPartition, PartitionReassignment] = {
+ withAdminClientAndCatchError(admin => {
+ admin.listPartitionReassignments(withTimeoutMs(new ListPartitionReassignmentsOptions)).reassignments().get()
+ }, e => {
+ Collections.emptyMap()
+ log.error("listPartitionReassignments error.", e)
+ }).asInstanceOf[util.Map[TopicPartition, PartitionReassignment]]
+ }
+
+ def cancelPartitionReassignments(reassignments: util.Set[TopicPartition]): util.Map[TopicPartition, Throwable] = {
+ withAdminClientAndCatchError(admin => {
+ val res = ReassignPartitionsCommand.cancelPartitionReassignments(admin, reassignments.asScala.toSet)
+ res.asJava
+ }, e => {
+ log.error("cancelPartitionReassignments error.", e)
+ throw e
+ }).asInstanceOf[util.Map[TopicPartition, Throwable]]
+ }
}
\ No newline at end of file
diff --git a/ui/src/utils/api.js b/ui/src/utils/api.js
index dff50a3..e1bc2d2 100644
--- a/ui/src/utils/api.js
+++ b/ui/src/utils/api.js
@@ -214,4 +214,12 @@ export const KafkaOpApi = {
url: "/op/broker/throttle",
method: "delete",
},
+ currentReassignments: {
+ url: "/op/replication/reassignments",
+ method: "get",
+ },
+ cancelReassignment: {
+ url: "/op/replication/reassignments",
+ method: "delete",
+ },
};
diff --git a/ui/src/views/group/ConsumerDetail.vue b/ui/src/views/group/ConsumerDetail.vue
index 4667220..3c41d68 100644
--- a/ui/src/views/group/ConsumerDetail.vue
+++ b/ui/src/views/group/ConsumerDetail.vue
@@ -47,6 +47,15 @@
@click="openResetOffsetByTimeDialog(k)"
>时间戳
+
+ 刷新
+
+
+
+
+
+
+
+ {{ i }}
+
+
+
+
+ {{ i }}
+
+
+
+
+ {{ i }}
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/ui/src/views/op/Operation.vue b/ui/src/views/op/Operation.vue
index 08b2bdc..3672071 100644
--- a/ui/src/views/op/Operation.vue
+++ b/ui/src/views/op/Operation.vue
@@ -30,7 +30,7 @@
将集群中所有分区leader副本设置为首选副本
-
+
副本变更详情
@@ -103,6 +103,10 @@
@closeRemoveThrottleDialog="closeRemoveThrottleDialog"
>
+
@@ -114,6 +118,7 @@ import ElectPreferredLeader from "@/views/op/ElectPreferredLeader";
import DataSyncScheme from "@/views/op/DataSyncScheme";
import ConfigThrottle from "@/views/op/ConfigThrottle";
import RemoveThrottle from "@/views/op/RemoveThrottle";
+import CurrentReassignments from "@/views/op/CurrentReassignments";
export default {
name: "Operation",
components: {
@@ -124,6 +129,7 @@ export default {
DataSyncScheme,
ConfigThrottle,
RemoveThrottle,
+ CurrentReassignments,
},
data() {
return {
@@ -135,6 +141,7 @@ export default {
},
replicationManager: {
showElectPreferredLeaderDialog: false,
+ showCurrentReassignmentsDialog: false,
},
brokerManager: {
showConfigThrottleDialog: false,
@@ -185,8 +192,11 @@ export default {
closeRemoveThrottleDialog() {
this.brokerManager.showRemoveThrottleDialog = false;
},
- openReplicaReassignmentDetailDialog() {
- this.$message.info("此功能尚不支持,下个版本支持");
+ openCurrentReassignmentsDialog() {
+ this.replicationManager.showCurrentReassignmentsDialog = true;
+ },
+ closeCurrentReassignmentsDialog() {
+ this.replicationManager.showCurrentReassignmentsDialog = false;
},
},
};