preferred as leader.

This commit is contained in:
许晓东
2021-11-10 20:45:15 +08:00
parent ce88ddc4b5
commit efe4a59c7e
11 changed files with 261 additions and 16 deletions

View File

@@ -4,11 +4,12 @@ import java.util.concurrent.TimeUnit
import java.util.{Collections, Properties}
import com.xuxd.kafka.console.config.KafkaConfig
import org.apache.kafka.clients.admin.ElectLeadersOptions
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import org.apache.kafka.common.{ElectionType, TopicPartition}
import scala.jdk.CollectionConverters.{CollectionHasAsScala, ListHasAsScala, MapHasAsScala, SeqHasAsJava, SetHasAsScala}
import scala.jdk.CollectionConverters.{CollectionHasAsScala, ListHasAsScala, MapHasAsScala, SeqHasAsJava, SetHasAsJava, SetHasAsScala}
/**
* kafka-console-ui.
@@ -194,4 +195,19 @@ class OperationConsole(config: KafkaConfig, topicConsole: TopicConsole,
thatConsumer.close()
}
}
def electPreferredLeader(partitions: util.Set[TopicPartition]): (Boolean, String) = {
withAdminClientAndCatchError(admin => {
admin.electLeaders(ElectionType.PREFERRED, partitions, withTimeoutMs(new ElectLeadersOptions)).all().get()
(true, "")
}, e => {
log.error("alter config error.", e)
(false, e.getMessage)
}).asInstanceOf[(Boolean, String)]
}
def getTopicPartitions(topic: String): util.Set[TopicPartition] = {
val topicList = topicConsole.getTopicList(Collections.singleton(topic))
topicList.asScala.flatMap(_.partitions().asScala.map(t => new TopicPartition(topic, t.partition()))).toSet.asJava
}
}