Files
kafka-console-ui/src/main/scala/kafka/console/TopicConsole.scala
2021-10-18 20:53:25 +08:00

125 lines
4.4 KiB
Scala

package kafka.console
import java.util
import java.util.concurrent.TimeUnit
import java.util.{Collections, List, Set}
import com.xuxd.kafka.console.config.KafkaConfig
import org.apache.kafka.clients.admin._
import org.apache.kafka.common.TopicPartition
import scala.jdk.CollectionConverters.{CollectionHasAsScala, SetHasAsJava}
/**
* kafka-console-ui.
*
* @author xuxd
* @date 2021-09-08 19:52:27
* */
class TopicConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConfig) with Logging {
/**
* get all topic name set.
*
* @return all topic name set.
*/
def getTopicNameList(internal: Boolean = true): Set[String] = {
withAdminClientAndCatchError(admin => admin.listTopics(new ListTopicsOptions().listInternal(internal)).names()
.get(3000, TimeUnit.MILLISECONDS),
e => {
log.error("listTopics error.", e)
Collections.emptySet()
}).asInstanceOf[Set[String]]
}
/**
* get all internal topic name set.
*
* @return internal topic name set.
*/
def getInternalTopicNameList(): Set[String] = {
withAdminClientAndCatchError(admin => admin.listTopics(new ListTopicsOptions().listInternal(true)).listings()
.get(3000, TimeUnit.MILLISECONDS).asScala.filter(_.isInternal).map(_.name()).toSet[String].asJava,
e => {
log.error("listInternalTopics error.", e)
Collections.emptySet()
}).asInstanceOf[Set[String]]
}
def getTopicList(topics: Set[String]): List[TopicDescription] = {
if (topics == null || topics.isEmpty) {
Collections.emptyList()
} else {
withAdminClientAndCatchError(admin => new util.ArrayList[TopicDescription](admin.describeTopics(topics).all().get().values()), e => {
log.error("describeTopics error.", e)
Collections.emptyList()
}).asInstanceOf[List[TopicDescription]]
}
}
/**
* delete topic by topic name.
*
* @param topic topic name.
* @return result or : fail message.
*/
def deleteTopic(topic: String): (Boolean, String) = {
withAdminClientAndCatchError(admin => {
admin.deleteTopics(Collections.singleton(topic), new DeleteTopicsOptions().retryOnQuotaViolation(false)).all().get(timeoutMs, TimeUnit.MILLISECONDS)
(true, "")
},
e => {
log.error("delete topic error, topic: " + topic, e)
(false, e.getMessage)
}).asInstanceOf[(Boolean, String)]
}
/**
* get topic begin offset and end offset.
*
* @param topic topic name.
* @param partitions topic partition info list.
* @return partition -> begin offset and end offset.
*/
def getTopicOffset(topic: String,
partitions: List[TopicPartition]): (util.Map[TopicPartition, Long], util.Map[TopicPartition, Long]) = {
withConsumerAndCatchError(consumer => {
val beginOffsets = consumer.beginningOffsets(partitions)
val endOffsets = consumer.endOffsets(partitions)
(beginOffsets, endOffsets)
}, e => {
log.error("getTopicOffset error, topic: " + topic, e)
(Collections.emptyMap(), Collections.emptyMap())
}).asInstanceOf[(util.Map[TopicPartition, Long], util.Map[TopicPartition, Long])]
}
/**
* create topic.
*/
def createTopic(topic: NewTopic): (Boolean, String) = {
withAdminClientAndCatchError(admin => {
val createResult = admin.createTopics(Collections.singleton(topic), new CreateTopicsOptions().retryOnQuotaViolation(false))
createResult.all().get(timeoutMs, TimeUnit.MILLISECONDS)
(true, "")
}, e => {
log.error("create topic error, topic: " + topic.name(), e)
(false, e.getMessage)
}).asInstanceOf[(Boolean, String)]
}
/**
* create new partition.
*/
def createPartitions(newPartitions: util.Map[String, NewPartitions]): (Boolean, String) = {
withAdminClientAndCatchError(admin => {
admin.createPartitions(newPartitions,
new CreatePartitionsOptions().retryOnQuotaViolation(false)).all().get(timeoutMs, TimeUnit.MILLISECONDS)
(true, "")
}, e => {
log.error("create partition error, ", e)
(false, e.getMessage)
}).asInstanceOf[(Boolean, String)]
}
}