升级kafka版本从2.8.0 -> 3.2.0,增加DockerCompose部署说明

This commit is contained in:
许晓东
2022-06-30 20:11:29 +08:00
parent d062e18940
commit b163e5f776
6 changed files with 77 additions and 57 deletions

View File

@@ -24,7 +24,7 @@ acl配置说明如果kafka集群启用了ACL但是控制台没看到Acl菜
## 安装包下载
点击下载(v1.0.4版本)[kafka-console-ui.zip](https://github.com/xxd763795151/kafka-console-ui/releases/download/v1.0.4/kafka-console-ui.zip)
如果安装包下载的比较慢,可以查看下面的源码打包说明,把代码下载下来,快速打包
如果安装包下载的比较慢,可以查看下面的源码打包说明,把代码下载下来,快速打包不过最新main分支代码刚升级了kafka版本到3.2.0,还没有充分测试,如果需要稳定版本,可以下载 1.0.4-release分支代码
## 快速使用
### Windows
@@ -63,7 +63,7 @@ sh bin/shutdown.sh
在新增集群的时候除了集群地址还可以输入集群的其它属性配置比如请求超时ACL配置等。如果开启了ACL切换到该集群的时候导航栏上便会出现ACL菜单支持进行相关操作目前是基于SASL_SCRAM认证授权管理支持的最完善其它的我也没验证过虽然是我开发的但是我也没具体全部验证这一块功能授权部分应该是通用的
## kafka版本
* 当前使用的kafka 2.8.0
* 当前使用的kafka 3.2.0
## 监控
仅提供运维管理功能监控、告警需要配合其它组件如有需要建议请查看https://blog.csdn.net/x763795151/article/details/119705372
@@ -79,6 +79,9 @@ sh bin/shutdown.sh
如果有需要使用管理台登录认证的,可以切换到这个分支上进行打包,打包方式看 源码打包 说明。
默认登录账户admin/kafka-console-ui521
## DockerCompose部署
感谢@wdkang123 同学分享的部署方式,如果有需要请查看[DockerCompose部署方式](./document/deploy/docker部署.md)
## 联系方式
+ 微信群
<img src="./document/contact/weixin_contact.jpg" width="40%"/>

14
pom.xml
View File

@@ -21,7 +21,7 @@
<ui.path>${project.basedir}/ui</ui.path>
<frontend-maven-plugin.version>1.11.0</frontend-maven-plugin.version>
<compiler.version>1.8</compiler.version>
<kafka.version>2.8.0</kafka.version>
<kafka.version>3.2.0</kafka.version>
<maven.assembly.plugin.version>3.0.0</maven.assembly.plugin.version>
<mybatis-plus-boot-starter.version>3.4.2</mybatis-plus-boot-starter.version>
<scala.version>2.13.6</scala.version>
@@ -76,6 +76,18 @@
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.13</artifactId>
<version>${kafka.version}</version>
<exclusions>
<exclusion>
<groupId>com.typesafe.scala-logging</groupId>
<artifactId>scala-logging_2.13</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.typesafe.scala-logging</groupId>
<artifactId>scala-logging_2.13</artifactId>
<version>3.9.2</version>
</dependency>
<dependency>

View File

@@ -1,18 +1,15 @@
package com.xuxd.kafka.console.beans;
import java.util.Objects;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AccessControlEntryFilter;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.acl.*;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourcePatternFilter;
import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.utils.SecurityUtils;
import java.util.Objects;
/**
* kafka-console-ui.
@@ -41,7 +38,9 @@ public class AclEntry {
entry.setResourceType(binding.pattern().resourceType().name());
entry.setName(binding.pattern().name());
entry.setPatternType(binding.pattern().patternType().name());
entry.setPrincipal(KafkaPrincipal.fromString(binding.entry().principal()).getName());
// entry.setPrincipal(KafkaPrincipal.fromString(binding.entry().principal()).getName());
// 3.x版本使用该方法
entry.setPrincipal(SecurityUtils.parseKafkaPrincipal(binding.entry().principal()).getName());
entry.setHost(binding.entry().host());
entry.setOperation(binding.entry().operation().name());
entry.setPermissionType(binding.entry().permissionType().name());

View File

@@ -18,6 +18,7 @@ import org.apache.kafka.common.network.Selector
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests._
import org.apache.kafka.common.utils.{KafkaThread, LogContext, Time}
import org.slf4j.{Logger, LoggerFactory}
import java.io.IOException
import java.util.Properties
@@ -34,7 +35,9 @@ import scala.util.{Failure, Success, Try}
* @author xuxd
* @date 2022-01-22 15:15:57
* */
object BrokerApiVersion extends Logging {
object BrokerApiVersion{
protected lazy val log : Logger = LoggerFactory.getLogger(this.getClass)
def listAllBrokerApiVersionInfo(): java.util.HashMap[Node, NodeApiVersions] = {
val res = new java.util.HashMap[Node, NodeApiVersions]()
@@ -48,7 +51,7 @@ object BrokerApiVersion extends Logging {
case Success(v) => {
res.put(broker, v)
}
case Failure(v) => logger.error(s"${broker} -> ERROR: ${v}\n")
case Failure(v) => log.error(s"${broker} -> ERROR: ${v}\n")
}
}
} finally {
@@ -149,12 +152,12 @@ object BrokerApiVersion extends Logging {
val response = sendAnyNode(request).asInstanceOf[MetadataResponse]
val errors = response.errors
if (!errors.isEmpty) {
logger.info(s"Metadata request contained errors: $errors")
log.info(s"Metadata request contained errors: $errors")
}
// 在3.x版本中这个方法是buildCluster 代替cluster()了
// response.buildCluster.nodes.asScala.toList
response.cluster().nodes.asScala.toList
response.buildCluster.nodes.asScala.toList
// response.cluster().nodes.asScala.toList
}
def listAllBrokerVersionInfo(): Map[Node, Try[NodeApiVersions]] =
@@ -277,40 +280,40 @@ object BrokerApiVersion extends Logging {
// 版本不一样,这个地方的兼容性问题也不一样了
// 3.x版本用这个
// val networkClient = new NetworkClient(
// selector,
// metadata,
// clientId,
// DefaultMaxInFlightRequestsPerConnection,
// DefaultReconnectBackoffMs,
// DefaultReconnectBackoffMax,
// DefaultSendBufferBytes,
// DefaultReceiveBufferBytes,
// requestTimeoutMs,
// connectionSetupTimeoutMs,
// connectionSetupTimeoutMaxMs,
// time,
// true,
// new ApiVersions,
// logContext)
val networkClient = new NetworkClient(
selector,
metadata,
clientId,
DefaultMaxInFlightRequestsPerConnection,
DefaultReconnectBackoffMs,
DefaultReconnectBackoffMax,
DefaultSendBufferBytes,
DefaultReceiveBufferBytes,
requestTimeoutMs,
connectionSetupTimeoutMs,
connectionSetupTimeoutMaxMs,
time,
true,
new ApiVersions,
logContext)
val networkClient = new NetworkClient(
selector,
metadata,
clientId,
DefaultMaxInFlightRequestsPerConnection,
DefaultReconnectBackoffMs,
DefaultReconnectBackoffMax,
DefaultSendBufferBytes,
DefaultReceiveBufferBytes,
requestTimeoutMs,
connectionSetupTimeoutMs,
connectionSetupTimeoutMaxMs,
ClientDnsLookup.USE_ALL_DNS_IPS,
time,
true,
new ApiVersions,
logContext)
// val networkClient = new NetworkClient(
// selector,
// metadata,
// clientId,
// DefaultMaxInFlightRequestsPerConnection,
// DefaultReconnectBackoffMs,
// DefaultReconnectBackoffMax,
// DefaultSendBufferBytes,
// DefaultReceiveBufferBytes,
// requestTimeoutMs,
// connectionSetupTimeoutMs,
// connectionSetupTimeoutMaxMs,
// ClientDnsLookup.USE_ALL_DNS_IPS,
// time,
// true,
// new ApiVersions,
// logContext)
val highLevelClient = new ConsumerNetworkClient(
logContext,

View File

@@ -91,14 +91,17 @@ class KafkaConsole(config: KafkaConfig) {
}
}
@Deprecated
protected def withZKClient(f: AdminZkClient => Any): Any = {
val zkClient = KafkaZkClient(config.getZookeeperAddr, false, 30000, 30000, Int.MaxValue, Time.SYSTEM)
val adminZkClient = new AdminZkClient(zkClient)
try {
f(adminZkClient)
} finally {
zkClient.close()
}
// val zkClient = KafkaZkClient(config.getZookeeperAddr, false, 30000, 30000, Int.MaxValue, Time.SYSTEM)
// 3.x
// val zkClient = KafkaZkClient(config.getZookeeperAddr, false, 30000, 30000, Int.MaxValue, Time.SYSTEM, new ZKClientConfig(), "KafkaZkClient")
// val adminZkClient = new AdminZkClient(zkClient)
// try {
// f(adminZkClient)
// } finally {
// zkClient.close()
// }
}
protected def createAdminClient(props: Properties): Admin = {

View File

@@ -127,7 +127,7 @@ class MessageConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConf
record.offset(),
record.timestamp(),
record.timestampType(),
record.checksum(),
// record.checksum(),
record.serializedKeySize(),
record.serializedValueSize(),
record.key(),