diff --git a/src/main/java/com/xuxd/kafka/console/beans/dto/ProposedAssignmentDTO.java b/src/main/java/com/xuxd/kafka/console/beans/dto/ProposedAssignmentDTO.java new file mode 100644 index 0000000..94c5183 --- /dev/null +++ b/src/main/java/com/xuxd/kafka/console/beans/dto/ProposedAssignmentDTO.java @@ -0,0 +1,18 @@ +package com.xuxd.kafka.console.beans.dto; + +import java.util.List; +import lombok.Data; + +/** + * kafka-console-ui. + * + * @author xuxd + * @date 2022-02-15 19:08:13 + **/ +@Data +public class ProposedAssignmentDTO { + + private String topic; + + private List brokers; +} diff --git a/src/main/java/com/xuxd/kafka/console/controller/OperationController.java b/src/main/java/com/xuxd/kafka/console/controller/OperationController.java index 5753652..cf5e509 100644 --- a/src/main/java/com/xuxd/kafka/console/controller/OperationController.java +++ b/src/main/java/com/xuxd/kafka/console/controller/OperationController.java @@ -2,6 +2,7 @@ package com.xuxd.kafka.console.controller; import com.xuxd.kafka.console.beans.TopicPartition; import com.xuxd.kafka.console.beans.dto.BrokerThrottleDTO; +import com.xuxd.kafka.console.beans.dto.ProposedAssignmentDTO; import com.xuxd.kafka.console.beans.dto.ReplicationDTO; import com.xuxd.kafka.console.beans.dto.SyncDataDTO; import com.xuxd.kafka.console.service.OperationService; @@ -74,4 +75,9 @@ public class OperationController { public Object cancelReassignment(@RequestBody TopicPartition partition) { return operationService.cancelReassignment(new org.apache.kafka.common.TopicPartition(partition.getTopic(), partition.getPartition())); } + + @PostMapping("/replication/reassignments/proposed") + public Object proposedAssignments(@RequestBody ProposedAssignmentDTO dto) { + return operationService.proposedAssignments(dto.getTopic(), dto.getBrokers()); + } } diff --git a/src/main/java/com/xuxd/kafka/console/service/OperationService.java b/src/main/java/com/xuxd/kafka/console/service/OperationService.java index a40e7eb..66b2a2c 100644 --- a/src/main/java/com/xuxd/kafka/console/service/OperationService.java +++ b/src/main/java/com/xuxd/kafka/console/service/OperationService.java @@ -30,4 +30,6 @@ public interface OperationService { ResponseData currentReassignments(); ResponseData cancelReassignment(TopicPartition partition); + + ResponseData proposedAssignments(String topic, List brokerList); } diff --git a/src/main/java/com/xuxd/kafka/console/service/impl/OperationServiceImpl.java b/src/main/java/com/xuxd/kafka/console/service/impl/OperationServiceImpl.java index 37cc84c..b5a9343 100644 --- a/src/main/java/com/xuxd/kafka/console/service/impl/OperationServiceImpl.java +++ b/src/main/java/com/xuxd/kafka/console/service/impl/OperationServiceImpl.java @@ -1,6 +1,7 @@ package com.xuxd.kafka.console.service.impl; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; +import com.google.common.collect.Lists; import com.google.gson.Gson; import com.google.gson.JsonObject; import com.xuxd.kafka.console.beans.ResponseData; @@ -10,6 +11,7 @@ import com.xuxd.kafka.console.beans.vo.OffsetAlignmentVO; import com.xuxd.kafka.console.dao.MinOffsetAlignmentMapper; import com.xuxd.kafka.console.service.OperationService; import com.xuxd.kafka.console.utils.GsonUtil; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -19,6 +21,7 @@ import java.util.Properties; import java.util.Set; import java.util.stream.Collectors; import kafka.console.OperationConsole; +import org.apache.commons.lang3.StringUtils; import org.apache.kafka.clients.admin.PartitionReassignment; import org.apache.kafka.common.TopicPartition; import org.springframework.beans.factory.ObjectProvider; @@ -162,4 +165,21 @@ public class OperationServiceImpl implements OperationService { } return ResponseData.create().success(); } + + @Override public ResponseData proposedAssignments(String topic, List brokerList) { + Map params = new HashMap<>(); + params.put("version", 1); + Map topicMap = new HashMap<>(1, 1.0f); + topicMap.put("topic", topic); + params.put("topics", Lists.newArrayList(topicMap)); + List list = brokerList.stream().map(String::valueOf).collect(Collectors.toList()); + Map> assignments = operationConsole.proposedAssignments(gson.toJson(params), StringUtils.join(list, ",")); + List res = new ArrayList<>(assignments.size()); + assignments.forEach((tp, replicas) -> { + CurrentReassignmentVO vo = new CurrentReassignmentVO(tp.topic(), tp.partition(), + replicas.stream().map(x -> (Integer) x).collect(Collectors.toList()), null, null); + res.add(vo); + }); + return ResponseData.create().data(res).success(); + } } diff --git a/src/main/scala/kafka/console/OperationConsole.scala b/src/main/scala/kafka/console/OperationConsole.scala index 1438f38..4a7fd47 100644 --- a/src/main/scala/kafka/console/OperationConsole.scala +++ b/src/main/scala/kafka/console/OperationConsole.scala @@ -256,4 +256,20 @@ class OperationConsole(config: KafkaConfig, topicConsole: TopicConsole, throw e }).asInstanceOf[util.Map[TopicPartition, Throwable]] } + + def proposedAssignments(reassignmentJson: String, + brokerListString: String): util.Map[TopicPartition, util.List[Int]] = { + withAdminClientAndCatchError(admin => { + val map = ReassignPartitionsCommand.generateAssignment(admin, reassignmentJson, brokerListString, true)._1 + val res = new util.HashMap[TopicPartition, util.List[Int]]() + for (tp <- map.keys) { + res.put(tp, map(tp).asJava) +// res.put(tp, map.getOrElse(tp, Seq.empty).asJava) + } + res + }, e => { + log.error("proposedAssignments error.", e) + throw e + }) + }.asInstanceOf[util.Map[TopicPartition, util.List[Int]]] } \ No newline at end of file diff --git a/ui/src/utils/api.js b/ui/src/utils/api.js index ae88c1e..123641c 100644 --- a/ui/src/utils/api.js +++ b/ui/src/utils/api.js @@ -246,6 +246,10 @@ export const KafkaOpApi = { url: "/op/replication/reassignments", method: "delete", }, + proposedAssignment: { + url: "/op/replication/reassignments/proposed", + method: "post", + }, }; export const KafkaMessageApi = { searchByTime: { diff --git a/ui/src/views/Home.vue b/ui/src/views/Home.vue index 071c19b..55eac8e 100644 --- a/ui/src/views/Home.vue +++ b/ui/src/views/Home.vue @@ -6,22 +6,24 @@


kafka API 版本兼容性

- -
- 详情 - -
-
+ + +
+ 详情 + +
+
+
{ + this.apiVersionInfoLoading = false; if (res.code == 0) { this.brokerApiVersionInfo = res.data; } else { diff --git a/ui/src/views/op/Operation.vue b/ui/src/views/op/Operation.vue index 847ffac..74e4c58 100644 --- a/ui/src/views/op/Operation.vue +++ b/ui/src/views/op/Operation.vue @@ -49,6 +49,15 @@ 查看正在进行副本变更/重分配的任务,或者将其取消

+

+ + 副本重分配 + + + 副本所在节点重新分配,打个比方,集群有6个节点,分区1的3个副本在节点1、2、3上,现在将它们重新分配到3、4、5上 +

@@ -125,6 +134,11 @@ :visible="clusterManager.showClusterInfoDialog" @closeClusterInfoDialog="closeClusterInfoDialog" > + + @@ -138,6 +152,7 @@ import ConfigThrottle from "@/views/op/ConfigThrottle"; import RemoveThrottle from "@/views/op/RemoveThrottle"; import CurrentReassignments from "@/views/op/CurrentReassignments"; import ClusterInfo from "@/views/op/ClusterInfo"; +import ReplicaReassign from "@/views/op/ReplicaReassign"; export default { name: "Operation", components: { @@ -150,6 +165,7 @@ export default { RemoveThrottle, CurrentReassignments, ClusterInfo, + ReplicaReassign, }, data() { return { @@ -162,6 +178,7 @@ export default { replicationManager: { showElectPreferredLeaderDialog: false, showCurrentReassignmentsDialog: false, + showReplicaReassignDialog: false, }, brokerManager: { showConfigThrottleDialog: false, @@ -227,6 +244,12 @@ export default { closeClusterInfoDialog() { this.clusterManager.showClusterInfoDialog = false; }, + openReplicaReassignDialog() { + this.replicationManager.showReplicaReassignDialog = true; + }, + closeReplicaReassignDialog() { + this.replicationManager.showReplicaReassignDialog = false; + }, }, }; diff --git a/ui/src/views/op/ReplicaReassign.vue b/ui/src/views/op/ReplicaReassign.vue new file mode 100644 index 0000000..f29684e --- /dev/null +++ b/ui/src/views/op/ReplicaReassign.vue @@ -0,0 +1,257 @@ + + + + +