diff --git a/src/main/java/com/xuxd/kafka/console/beans/ReplicaAssignment.java b/src/main/java/com/xuxd/kafka/console/beans/ReplicaAssignment.java index d6856a2..9a555e2 100644 --- a/src/main/java/com/xuxd/kafka/console/beans/ReplicaAssignment.java +++ b/src/main/java/com/xuxd/kafka/console/beans/ReplicaAssignment.java @@ -16,6 +16,8 @@ public class ReplicaAssignment { private List partitions; + private long interBrokerThrottle = -1; + @Data static class Partition { private String topic; diff --git a/src/main/java/com/xuxd/kafka/console/controller/TopicController.java b/src/main/java/com/xuxd/kafka/console/controller/TopicController.java index 5a9f8b1..4a87e57 100644 --- a/src/main/java/com/xuxd/kafka/console/controller/TopicController.java +++ b/src/main/java/com/xuxd/kafka/console/controller/TopicController.java @@ -80,6 +80,6 @@ public class TopicController { @PostMapping("/replica/assignment") public Object updateReplicaAssignment(@RequestBody ReplicaAssignment assignment) { - return "topicService.getCurrentReplicaAssignment(topic)"; + return topicService.updateReplicaAssignment(assignment); } } diff --git a/src/main/java/com/xuxd/kafka/console/service/TopicService.java b/src/main/java/com/xuxd/kafka/console/service/TopicService.java index 99c9e42..ea8d46b 100644 --- a/src/main/java/com/xuxd/kafka/console/service/TopicService.java +++ b/src/main/java/com/xuxd/kafka/console/service/TopicService.java @@ -1,5 +1,6 @@ package com.xuxd.kafka.console.service; +import com.xuxd.kafka.console.beans.ReplicaAssignment; import com.xuxd.kafka.console.beans.ResponseData; import com.xuxd.kafka.console.beans.enums.TopicType; import java.util.List; @@ -28,4 +29,6 @@ public interface TopicService { ResponseData addPartitions(String topic, int addNum, List> newAssignmentst); ResponseData getCurrentReplicaAssignment(String topic); + + ResponseData updateReplicaAssignment(ReplicaAssignment assignment); } diff --git a/src/main/java/com/xuxd/kafka/console/service/impl/TopicServiceImpl.java b/src/main/java/com/xuxd/kafka/console/service/impl/TopicServiceImpl.java index 137fd48..b1884d5 100644 --- a/src/main/java/com/xuxd/kafka/console/service/impl/TopicServiceImpl.java +++ b/src/main/java/com/xuxd/kafka/console/service/impl/TopicServiceImpl.java @@ -142,4 +142,10 @@ public class TopicServiceImpl implements TopicService { return success ? ResponseData.create().data(gson.fromJson(tuple2._2(), ReplicaAssignment.class)).success() : ResponseData.create().failed(tuple2._2()); } + + @Override public ResponseData updateReplicaAssignment(ReplicaAssignment assignment) { + Tuple2 tuple2 = topicConsole.updateReplicas(gson.toJson(assignment), assignment.getInterBrokerThrottle()); + boolean success = (boolean) tuple2._1(); + return success ? ResponseData.create().success() : ResponseData.create().failed(tuple2._2()); + } } diff --git a/src/main/scala/kafka/console/TopicConsole.scala b/src/main/scala/kafka/console/TopicConsole.scala index c8014bd..26a9635 100644 --- a/src/main/scala/kafka/console/TopicConsole.scala +++ b/src/main/scala/kafka/console/TopicConsole.scala @@ -1,10 +1,11 @@ package kafka.console import com.xuxd.kafka.console.config.KafkaConfig -import kafka.admin.ReassignPartitionsCommand.compareTopicPartitions +import kafka.admin.ReassignPartitionsCommand._ import kafka.utils.Json import org.apache.kafka.clients.admin._ import org.apache.kafka.common.errors.UnknownTopicOrPartitionException +import org.apache.kafka.common.utils.Time import org.apache.kafka.common.{TopicPartition, TopicPartitionReplica} import java.util @@ -135,6 +136,65 @@ class TopicConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConfig }).asInstanceOf[(Boolean, String)] } + def updateReplicas(reassignmentJson: String, interBrokerThrottle: Long = -1L): (Boolean, String) = { + withAdminClientAndCatchError(admin => { + executeAssignment(admin, reassignmentJson, interBrokerThrottle) + (true, "") + }, e => { + log.error("executeAssignment error, ", e) + (false, e.getMessage) + }).asInstanceOf[(Boolean, String)] + } + + /** + * Copy and modify from @{link kafka.admin.ReassignPartitionsCommand#executeAssignment}. + */ + def executeAssignment(adminClient: Admin, + reassignmentJson: String, + interBrokerThrottle: Long = -1L, + logDirThrottle: Long = -1L, + timeoutMs: Long = 30000L, + time: Time = Time.SYSTEM): Unit = { + val (proposedParts, proposedReplicas) = parseExecuteAssignmentArgs(reassignmentJson) + val currentReassignments = adminClient. + listPartitionReassignments().reassignments().get().asScala + // If there is an existing assignment + // This helps avoid surprising users. + if (currentReassignments.nonEmpty) { + throw new TerseReassignmentFailureException("Cannot execute because there is an existing partition assignment.") + } + verifyBrokerIds(adminClient, proposedParts.values.flatten.toSet) + val currentParts = getReplicaAssignmentForPartitions(adminClient, proposedParts.keySet.toSet) + log.info("currentPartitionReplicaAssignment: " + currentPartitionReplicaAssignmentToString(proposedParts, currentParts)) + log.info(s"newPartitionReplicaAssignment: $reassignmentJson") + + if (interBrokerThrottle >= 0 || logDirThrottle >= 0) { + + if (interBrokerThrottle >= 0) { + val moveMap = calculateProposedMoveMap(currentReassignments, proposedParts, currentParts) + modifyReassignmentThrottle(adminClient, moveMap, interBrokerThrottle) + } + + if (logDirThrottle >= 0) { + val movingBrokers = calculateMovingBrokers(proposedReplicas.keySet.toSet) + modifyLogDirThrottle(adminClient, movingBrokers, logDirThrottle) + } + } + + // Execute the partition reassignments. + val errors = alterPartitionReassignments(adminClient, proposedParts) + if (errors.nonEmpty) { + throw new TerseReassignmentFailureException( + "Error reassigning partition(s):%n%s".format( + errors.keySet.toBuffer.sortWith(compareTopicPartitions).map { part => + s"$part: ${errors(part).getMessage}" + }.mkString(System.lineSeparator()))) + } + if (proposedReplicas.nonEmpty) { + executeMoves(adminClient, proposedReplicas, timeoutMs, time) + } + } + /** * Get the current replica assignments for some topics. * @@ -181,4 +241,13 @@ class TopicConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConfig } } } + + private def modifyReassignmentThrottle(admin: Admin, moveMap: MoveMap, interBrokerThrottle: Long): Unit = { + val leaderThrottles = calculateLeaderThrottles(moveMap) + val followerThrottles = calculateFollowerThrottles(moveMap) + modifyTopicThrottles(admin, leaderThrottles, followerThrottles) + +// val reassigningBrokers = calculateReassigningBrokers(moveMap) +// modifyInterBrokerThrottle(admin, reassigningBrokers, interBrokerThrottle) + } } diff --git a/ui/src/views/op/Operation.vue b/ui/src/views/op/Operation.vue index 97a2c3f..4a23c94 100644 --- a/ui/src/views/op/Operation.vue +++ b/ui/src/views/op/Operation.vue @@ -1,5 +1,21 @@