变更副本

This commit is contained in:
许晓东
2021-11-23 19:59:52 +08:00
parent a219551802
commit 62569c4454
7 changed files with 129 additions and 4 deletions

View File

@@ -1,10 +1,11 @@
package kafka.console
import com.xuxd.kafka.console.config.KafkaConfig
import kafka.admin.ReassignPartitionsCommand.compareTopicPartitions
import kafka.admin.ReassignPartitionsCommand._
import kafka.utils.Json
import org.apache.kafka.clients.admin._
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException
import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.{TopicPartition, TopicPartitionReplica}
import java.util
@@ -135,6 +136,65 @@ class TopicConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConfig
}).asInstanceOf[(Boolean, String)]
}
def updateReplicas(reassignmentJson: String, interBrokerThrottle: Long = -1L): (Boolean, String) = {
withAdminClientAndCatchError(admin => {
executeAssignment(admin, reassignmentJson, interBrokerThrottle)
(true, "")
}, e => {
log.error("executeAssignment error, ", e)
(false, e.getMessage)
}).asInstanceOf[(Boolean, String)]
}
/**
* Copy and modify from @{link kafka.admin.ReassignPartitionsCommand#executeAssignment}.
*/
def executeAssignment(adminClient: Admin,
reassignmentJson: String,
interBrokerThrottle: Long = -1L,
logDirThrottle: Long = -1L,
timeoutMs: Long = 30000L,
time: Time = Time.SYSTEM): Unit = {
val (proposedParts, proposedReplicas) = parseExecuteAssignmentArgs(reassignmentJson)
val currentReassignments = adminClient.
listPartitionReassignments().reassignments().get().asScala
// If there is an existing assignment
// This helps avoid surprising users.
if (currentReassignments.nonEmpty) {
throw new TerseReassignmentFailureException("Cannot execute because there is an existing partition assignment.")
}
verifyBrokerIds(adminClient, proposedParts.values.flatten.toSet)
val currentParts = getReplicaAssignmentForPartitions(adminClient, proposedParts.keySet.toSet)
log.info("currentPartitionReplicaAssignment: " + currentPartitionReplicaAssignmentToString(proposedParts, currentParts))
log.info(s"newPartitionReplicaAssignment: $reassignmentJson")
if (interBrokerThrottle >= 0 || logDirThrottle >= 0) {
if (interBrokerThrottle >= 0) {
val moveMap = calculateProposedMoveMap(currentReassignments, proposedParts, currentParts)
modifyReassignmentThrottle(adminClient, moveMap, interBrokerThrottle)
}
if (logDirThrottle >= 0) {
val movingBrokers = calculateMovingBrokers(proposedReplicas.keySet.toSet)
modifyLogDirThrottle(adminClient, movingBrokers, logDirThrottle)
}
}
// Execute the partition reassignments.
val errors = alterPartitionReassignments(adminClient, proposedParts)
if (errors.nonEmpty) {
throw new TerseReassignmentFailureException(
"Error reassigning partition(s):%n%s".format(
errors.keySet.toBuffer.sortWith(compareTopicPartitions).map { part =>
s"$part: ${errors(part).getMessage}"
}.mkString(System.lineSeparator())))
}
if (proposedReplicas.nonEmpty) {
executeMoves(adminClient, proposedReplicas, timeoutMs, time)
}
}
/**
* Get the current replica assignments for some topics.
*
@@ -181,4 +241,13 @@ class TopicConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConfig
}
}
}
private def modifyReassignmentThrottle(admin: Admin, moveMap: MoveMap, interBrokerThrottle: Long): Unit = {
val leaderThrottles = calculateLeaderThrottles(moveMap)
val followerThrottles = calculateFollowerThrottles(moveMap)
modifyTopicThrottles(admin, leaderThrottles, followerThrottles)
// val reassigningBrokers = calculateReassigningBrokers(moveMap)
// modifyInterBrokerThrottle(admin, reassigningBrokers, interBrokerThrottle)
}
}