diff --git a/README.md b/README.md index 85f210d..bf3d923 100644 --- a/README.md +++ b/README.md @@ -24,7 +24,7 @@ acl配置说明,如果kafka集群启用了ACL,但是控制台没看到Acl菜 ## 安装包下载 点击下载(v1.0.4版本):[kafka-console-ui.zip](https://github.com/xxd763795151/kafka-console-ui/releases/download/v1.0.4/kafka-console-ui.zip) -如果安装包下载的比较慢,可以查看下面的源码打包说明,把代码下载下来,快速打包 +如果安装包下载的比较慢,可以查看下面的源码打包说明,把代码下载下来,快速打包,不过最新main分支代码刚升级了kafka版本到3.2.0,还没有充分测试,如果需要稳定版本,可以下载 1.0.4-release分支代码 ## 快速使用 ### Windows @@ -63,7 +63,7 @@ sh bin/shutdown.sh 在新增集群的时候,除了集群地址还可以输入集群的其它属性配置,比如请求超时,ACL配置等。如果开启了ACL,切换到该集群的时候,导航栏上便会出现ACL菜单,支持进行相关操作(目前是基于SASL_SCRAM认证授权管理支持的最完善,其它的我也没验证过,虽然是我开发的,但是我也没具体全部验证这一块功能,授权部分应该是通用的) ## kafka版本 -* 当前使用的kafka 2.8.0 +* 当前使用的kafka 3.2.0 ## 监控 仅提供运维管理功能,监控、告警需要配合其它组件,如有需要,建议请查看:https://blog.csdn.net/x763795151/article/details/119705372 @@ -79,6 +79,9 @@ sh bin/shutdown.sh 如果有需要使用管理台登录认证的,可以切换到这个分支上进行打包,打包方式看 源码打包 说明。 默认登录账户:admin/kafka-console-ui521 +## DockerCompose部署 +感谢@wdkang123 同学分享的部署方式,如果有需要请查看[DockerCompose部署方式](./document/deploy/docker部署.md) + ## 联系方式 + 微信群 diff --git a/pom.xml b/pom.xml index 3d62f14..3b56fc9 100644 --- a/pom.xml +++ b/pom.xml @@ -21,7 +21,7 @@ ${project.basedir}/ui 1.11.0 1.8 - 2.8.0 + 3.2.0 3.0.0 3.4.2 2.13.6 @@ -76,6 +76,18 @@ org.apache.kafka kafka_2.13 ${kafka.version} + + + com.typesafe.scala-logging + scala-logging_2.13 + + + + + + com.typesafe.scala-logging + scala-logging_2.13 + 3.9.2 diff --git a/src/main/java/com/xuxd/kafka/console/beans/AclEntry.java b/src/main/java/com/xuxd/kafka/console/beans/AclEntry.java index 3d392d0..e4901b3 100644 --- a/src/main/java/com/xuxd/kafka/console/beans/AclEntry.java +++ b/src/main/java/com/xuxd/kafka/console/beans/AclEntry.java @@ -1,18 +1,15 @@ package com.xuxd.kafka.console.beans; -import java.util.Objects; import org.apache.commons.lang3.StringUtils; -import org.apache.kafka.common.acl.AccessControlEntry; -import org.apache.kafka.common.acl.AccessControlEntryFilter; -import org.apache.kafka.common.acl.AclBinding; -import org.apache.kafka.common.acl.AclBindingFilter; -import org.apache.kafka.common.acl.AclOperation; -import org.apache.kafka.common.acl.AclPermissionType; +import org.apache.kafka.common.acl.*; import org.apache.kafka.common.resource.PatternType; import org.apache.kafka.common.resource.ResourcePattern; import org.apache.kafka.common.resource.ResourcePatternFilter; import org.apache.kafka.common.resource.ResourceType; import org.apache.kafka.common.security.auth.KafkaPrincipal; +import org.apache.kafka.common.utils.SecurityUtils; + +import java.util.Objects; /** * kafka-console-ui. @@ -41,7 +38,9 @@ public class AclEntry { entry.setResourceType(binding.pattern().resourceType().name()); entry.setName(binding.pattern().name()); entry.setPatternType(binding.pattern().patternType().name()); - entry.setPrincipal(KafkaPrincipal.fromString(binding.entry().principal()).getName()); +// entry.setPrincipal(KafkaPrincipal.fromString(binding.entry().principal()).getName()); + // 3.x版本使用该方法 + entry.setPrincipal(SecurityUtils.parseKafkaPrincipal(binding.entry().principal()).getName()); entry.setHost(binding.entry().host()); entry.setOperation(binding.entry().operation().name()); entry.setPermissionType(binding.entry().permissionType().name()); diff --git a/src/main/scala/kafka/console/BrokerApiVersion.scala b/src/main/scala/kafka/console/BrokerApiVersion.scala index 173b052..6fe1a45 100644 --- a/src/main/scala/kafka/console/BrokerApiVersion.scala +++ b/src/main/scala/kafka/console/BrokerApiVersion.scala @@ -18,6 +18,7 @@ import org.apache.kafka.common.network.Selector import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.requests._ import org.apache.kafka.common.utils.{KafkaThread, LogContext, Time} +import org.slf4j.{Logger, LoggerFactory} import java.io.IOException import java.util.Properties @@ -34,7 +35,9 @@ import scala.util.{Failure, Success, Try} * @author xuxd * @date 2022-01-22 15:15:57 * */ -object BrokerApiVersion extends Logging { +object BrokerApiVersion{ + + protected lazy val log : Logger = LoggerFactory.getLogger(this.getClass) def listAllBrokerApiVersionInfo(): java.util.HashMap[Node, NodeApiVersions] = { val res = new java.util.HashMap[Node, NodeApiVersions]() @@ -48,7 +51,7 @@ object BrokerApiVersion extends Logging { case Success(v) => { res.put(broker, v) } - case Failure(v) => logger.error(s"${broker} -> ERROR: ${v}\n") + case Failure(v) => log.error(s"${broker} -> ERROR: ${v}\n") } } } finally { @@ -149,12 +152,12 @@ object BrokerApiVersion extends Logging { val response = sendAnyNode(request).asInstanceOf[MetadataResponse] val errors = response.errors if (!errors.isEmpty) { - logger.info(s"Metadata request contained errors: $errors") + log.info(s"Metadata request contained errors: $errors") } // 在3.x版本中这个方法是buildCluster 代替cluster()了 - // response.buildCluster.nodes.asScala.toList - response.cluster().nodes.asScala.toList + response.buildCluster.nodes.asScala.toList +// response.cluster().nodes.asScala.toList } def listAllBrokerVersionInfo(): Map[Node, Try[NodeApiVersions]] = @@ -277,40 +280,40 @@ object BrokerApiVersion extends Logging { // 版本不一样,这个地方的兼容性问题也不一样了 // 3.x版本用这个 - // val networkClient = new NetworkClient( - // selector, - // metadata, - // clientId, - // DefaultMaxInFlightRequestsPerConnection, - // DefaultReconnectBackoffMs, - // DefaultReconnectBackoffMax, - // DefaultSendBufferBytes, - // DefaultReceiveBufferBytes, - // requestTimeoutMs, - // connectionSetupTimeoutMs, - // connectionSetupTimeoutMaxMs, - // time, - // true, - // new ApiVersions, - // logContext) + val networkClient = new NetworkClient( + selector, + metadata, + clientId, + DefaultMaxInFlightRequestsPerConnection, + DefaultReconnectBackoffMs, + DefaultReconnectBackoffMax, + DefaultSendBufferBytes, + DefaultReceiveBufferBytes, + requestTimeoutMs, + connectionSetupTimeoutMs, + connectionSetupTimeoutMaxMs, + time, + true, + new ApiVersions, + logContext) - val networkClient = new NetworkClient( - selector, - metadata, - clientId, - DefaultMaxInFlightRequestsPerConnection, - DefaultReconnectBackoffMs, - DefaultReconnectBackoffMax, - DefaultSendBufferBytes, - DefaultReceiveBufferBytes, - requestTimeoutMs, - connectionSetupTimeoutMs, - connectionSetupTimeoutMaxMs, - ClientDnsLookup.USE_ALL_DNS_IPS, - time, - true, - new ApiVersions, - logContext) +// val networkClient = new NetworkClient( +// selector, +// metadata, +// clientId, +// DefaultMaxInFlightRequestsPerConnection, +// DefaultReconnectBackoffMs, +// DefaultReconnectBackoffMax, +// DefaultSendBufferBytes, +// DefaultReceiveBufferBytes, +// requestTimeoutMs, +// connectionSetupTimeoutMs, +// connectionSetupTimeoutMaxMs, +// ClientDnsLookup.USE_ALL_DNS_IPS, +// time, +// true, +// new ApiVersions, +// logContext) val highLevelClient = new ConsumerNetworkClient( logContext, diff --git a/src/main/scala/kafka/console/KafkaConsole.scala b/src/main/scala/kafka/console/KafkaConsole.scala index 88f01a7..3b64314 100644 --- a/src/main/scala/kafka/console/KafkaConsole.scala +++ b/src/main/scala/kafka/console/KafkaConsole.scala @@ -91,14 +91,17 @@ class KafkaConsole(config: KafkaConfig) { } } + @Deprecated protected def withZKClient(f: AdminZkClient => Any): Any = { - val zkClient = KafkaZkClient(config.getZookeeperAddr, false, 30000, 30000, Int.MaxValue, Time.SYSTEM) - val adminZkClient = new AdminZkClient(zkClient) - try { - f(adminZkClient) - } finally { - zkClient.close() - } +// val zkClient = KafkaZkClient(config.getZookeeperAddr, false, 30000, 30000, Int.MaxValue, Time.SYSTEM) + // 3.x +// val zkClient = KafkaZkClient(config.getZookeeperAddr, false, 30000, 30000, Int.MaxValue, Time.SYSTEM, new ZKClientConfig(), "KafkaZkClient") +// val adminZkClient = new AdminZkClient(zkClient) +// try { +// f(adminZkClient) +// } finally { +// zkClient.close() +// } } protected def createAdminClient(props: Properties): Admin = { diff --git a/src/main/scala/kafka/console/MessageConsole.scala b/src/main/scala/kafka/console/MessageConsole.scala index 7fb3fe9..2ee9446 100644 --- a/src/main/scala/kafka/console/MessageConsole.scala +++ b/src/main/scala/kafka/console/MessageConsole.scala @@ -127,7 +127,7 @@ class MessageConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConf record.offset(), record.timestamp(), record.timestampType(), - record.checksum(), +// record.checksum(), record.serializedKeySize(), record.serializedValueSize(), record.key(),