topic 限流
This commit is contained in:
@@ -172,7 +172,7 @@ class TopicConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConfig
|
||||
|
||||
if (interBrokerThrottle >= 0) {
|
||||
val moveMap = calculateProposedMoveMap(currentReassignments, proposedParts, currentParts)
|
||||
modifyReassignmentThrottle(adminClient, moveMap, interBrokerThrottle)
|
||||
modifyReassignmentThrottle(adminClient, moveMap)
|
||||
}
|
||||
|
||||
if (logDirThrottle >= 0) {
|
||||
@@ -195,6 +195,35 @@ class TopicConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConfig
|
||||
}
|
||||
}
|
||||
|
||||
def configThrottle(topic: String, partitions: util.List[Integer]): (Boolean, String) = {
|
||||
withAdminClientAndCatchError(admin => {
|
||||
val throttles = {
|
||||
if (partitions.get(0) == -1) {
|
||||
Map(topic -> "*")
|
||||
} else {
|
||||
val topicDescription = admin.describeTopics(Collections.singleton(topic), withTimeoutMs(new DescribeTopicsOptions))
|
||||
.all().get().values().asScala.toList
|
||||
|
||||
def convert(partition: Integer, replicas: scala.List[Int]): String = {
|
||||
replicas.map("%d:%d".format(partition, _)).toSet.mkString(",")
|
||||
}
|
||||
|
||||
val ptor = topicDescription.head.partitions().asScala.map(info => (info.partition(), info.replicas().asScala.map(_.id()))).toMap
|
||||
val conf = partitions.asScala.map(partition => convert(partition, ptor.get(partition) match {
|
||||
case Some(v) => v.toList
|
||||
case None => throw new IllegalArgumentException
|
||||
})).toList
|
||||
Map(topic -> conf.mkString(","))
|
||||
}
|
||||
}
|
||||
modifyTopicThrottles(admin, throttles, throttles)
|
||||
(true, "")
|
||||
}, e => {
|
||||
log.error("configThrottle error, ", e)
|
||||
(false, e.getMessage)
|
||||
}).asInstanceOf[(Boolean, String)]
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the current replica assignments for some topics.
|
||||
*
|
||||
@@ -242,12 +271,9 @@ class TopicConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConfig
|
||||
}
|
||||
}
|
||||
|
||||
private def modifyReassignmentThrottle(admin: Admin, moveMap: MoveMap, interBrokerThrottle: Long): Unit = {
|
||||
private def modifyReassignmentThrottle(admin: Admin, moveMap: MoveMap): Unit = {
|
||||
val leaderThrottles = calculateLeaderThrottles(moveMap)
|
||||
val followerThrottles = calculateFollowerThrottles(moveMap)
|
||||
modifyTopicThrottles(admin, leaderThrottles, followerThrottles)
|
||||
|
||||
// val reassigningBrokers = calculateReassigningBrokers(moveMap)
|
||||
// modifyInterBrokerThrottle(admin, reassigningBrokers, interBrokerThrottle)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user