From 4f305112930e416f1625d61c4dcb0c686fbc788a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=AE=B8=E6=99=93=E4=B8=9C?= <763795151@qq.com> Date: Thu, 2 Sep 2021 16:20:46 +0800 Subject: [PATCH] create amdin user when start. --- pom.xml | 10 ++++- .../kafka/console/config/KafkaConfig.java | 40 +++++++++++++++++++ .../console/service/impl/AclServiceImpl.java | 20 +++++++++- src/main/resources/application.yml | 10 ++++- .../kafka/console/KafkaConfigConsole.scala | 31 +++++++++++++- ui/src/views/acl/Acl.vue | 21 +++++++--- 6 files changed, 121 insertions(+), 11 deletions(-) diff --git a/pom.xml b/pom.xml index 777db59..68114d6 100644 --- a/pom.xml +++ b/pom.xml @@ -21,6 +21,7 @@ ${project.basedir}/ui 1.11.0 1.8 + 2.8.0 @@ -53,10 +54,15 @@ org.apache.kafka kafka-clients - 2.8.0 + ${kafka.version} + + + + org.apache.kafka + kafka_2.13 + ${kafka.version} - org.apache.commons commons-lang3 diff --git a/src/main/java/com/xuxd/kafka/console/config/KafkaConfig.java b/src/main/java/com/xuxd/kafka/console/config/KafkaConfig.java index 0404d9b..8add192 100644 --- a/src/main/java/com/xuxd/kafka/console/config/KafkaConfig.java +++ b/src/main/java/com/xuxd/kafka/console/config/KafkaConfig.java @@ -23,6 +23,14 @@ public class KafkaConfig { private String saslJaasConfig; + private String adminUsername; + + private String adminPassword; + + private boolean adminCreate; + + private String zookeeperAddr; + public String getBootstrapServer() { return bootstrapServer; } @@ -62,4 +70,36 @@ public class KafkaConfig { public void setSaslJaasConfig(String saslJaasConfig) { this.saslJaasConfig = saslJaasConfig; } + + public String getAdminUsername() { + return adminUsername; + } + + public void setAdminUsername(String adminUsername) { + this.adminUsername = adminUsername; + } + + public String getAdminPassword() { + return adminPassword; + } + + public void setAdminPassword(String adminPassword) { + this.adminPassword = adminPassword; + } + + public boolean isAdminCreate() { + return adminCreate; + } + + public void setAdminCreate(boolean adminCreate) { + this.adminCreate = adminCreate; + } + + public String getZookeeperAddr() { + return zookeeperAddr; + } + + public void setZookeeperAddr(String zookeeperAddr) { + this.zookeeperAddr = zookeeperAddr; + } } diff --git a/src/main/java/com/xuxd/kafka/console/service/impl/AclServiceImpl.java b/src/main/java/com/xuxd/kafka/console/service/impl/AclServiceImpl.java index 0f01f49..50bc8e8 100644 --- a/src/main/java/com/xuxd/kafka/console/service/impl/AclServiceImpl.java +++ b/src/main/java/com/xuxd/kafka/console/service/impl/AclServiceImpl.java @@ -4,6 +4,7 @@ import com.xuxd.kafka.console.beans.AclEntry; import com.xuxd.kafka.console.beans.CounterList; import com.xuxd.kafka.console.beans.CounterMap; import com.xuxd.kafka.console.beans.ResponseData; +import com.xuxd.kafka.console.config.KafkaConfig; import com.xuxd.kafka.console.service.AclService; import java.util.Collections; import java.util.HashMap; @@ -17,6 +18,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.kafka.clients.admin.UserScramCredentialsDescription; import org.apache.kafka.common.acl.AclBinding; +import org.springframework.beans.factory.SmartInitializingSingleton; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @@ -28,7 +30,7 @@ import org.springframework.stereotype.Service; **/ @Slf4j @Service -public class AclServiceImpl implements AclService { +public class AclServiceImpl implements AclService, SmartInitializingSingleton { @Autowired private KafkaConfigConsole configConsole; @@ -36,6 +38,9 @@ public class AclServiceImpl implements AclService { @Autowired private KafkaAclConsole aclConsole; + @Autowired + private KafkaConfig kafkaConfig; + @Override public ResponseData> getUserList() { try { return ResponseData.create(Set.class).data(configConsole.getUserList(null)).success(); @@ -46,10 +51,12 @@ public class AclServiceImpl implements AclService { } @Override public ResponseData addOrUpdateUser(String name, String pass) { + log.info("add or update user, username: {}, password: {}", name, pass); return configConsole.addOrUpdateUser(name, pass) ? ResponseData.create().success() : ResponseData.create().failed(); } @Override public ResponseData deleteUser(String name) { + log.info("delete user: {}", name); return configConsole.deleteUser(name) ? ResponseData.create().success() : ResponseData.create().failed(); } @@ -108,4 +115,15 @@ public class AclServiceImpl implements AclService { @Override public ResponseData deleteUserAcl(AclEntry entry) { return aclConsole.deleteUserAcl(entry) ? ResponseData.create().success() : ResponseData.create().failed(); } + + @Override public void afterSingletonsInstantiated() { + if (kafkaConfig.isAdminCreate()) { + log.info("Start create admin user, username: {}, password: {}", kafkaConfig.getAdminUsername(), kafkaConfig.getAdminPassword()); + boolean done = configConsole.addOrUpdateUserWithZK(kafkaConfig.getAdminUsername(), kafkaConfig.getAdminPassword()); + if (!done) { + log.error("Create admin failed."); + throw new IllegalStateException(); + } + } + } } diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 834e2f3..ecf591b 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -5,9 +5,17 @@ server: kafka: config: + # kafka broker地址,多个以逗号分隔 bootstrap-server: 'localhost:9092' request-timeout-ms: 60000 security-protocol: SASL_PLAINTEXT sasl-mechanism: SCRAM-SHA-256 - sasl-jaas-config: org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="admin"; + # 超级管理员用户名,在broker上已经配置为超级管理员 + admin-username: admin + # 超级管理员密码 + admin-password: admin + # 启动自动创建配置的超级管理员用户 + admin-create: true + zookeeper-addr: localhost:2181 + sasl-jaas-config: org.apache.kafka.common.security.scram.ScramLoginModule required username="${kafka.config.admin-username}" password="${kafka.config.admin-password}"; diff --git a/src/main/scala/kafka/console/KafkaConfigConsole.scala b/src/main/scala/kafka/console/KafkaConfigConsole.scala index 93c979f..aa0749a 100644 --- a/src/main/scala/kafka/console/KafkaConfigConsole.scala +++ b/src/main/scala/kafka/console/KafkaConfigConsole.scala @@ -1,11 +1,16 @@ package kafka.console import java.util -import java.util.Set import java.util.concurrent.TimeUnit +import java.util.{Properties, Set} import com.xuxd.kafka.console.config.KafkaConfig +import kafka.server.ConfigType +import kafka.utils.Implicits.PropertiesOps +import kafka.zk.{AdminZkClient, KafkaZkClient} import org.apache.kafka.clients.admin._ +import org.apache.kafka.common.security.scram.internals.{ScramCredentialUtils, ScramFormatter} +import org.apache.kafka.common.utils.Time /** * kafka-console-ui. @@ -45,6 +50,30 @@ class KafkaConfigConsole(config: KafkaConfig) extends KafkaConsole(config: Kafka }).asInstanceOf[Boolean] } + def addOrUpdateUserWithZK(name: String, pass: String): Boolean = { + + val zkClient = KafkaZkClient(config.getZookeeperAddr, false, 30000, 30000, Int.MaxValue, Time.SYSTEM) + val adminZkClient = new AdminZkClient(zkClient) + try { + val credential = new ScramFormatter(org.apache.kafka.common.security.scram.internals.ScramMechanism.forMechanismName(config.getSaslMechanism)) + .generateCredential(pass, defaultIterations) + val credentialStr = ScramCredentialUtils.credentialToString(credential) + + val userConfig: Properties = new Properties() + userConfig.put(config.getSaslMechanism, credentialStr) + + val configs = adminZkClient.fetchEntityConfig(ConfigType.User, name) + userConfig ++= configs + adminZkClient.changeConfigs(ConfigType.User, name, userConfig) + true + } catch { + case e: Exception => log.error("addOrUpdateAdminWithZK error.", e) + false + } finally { + zkClient.close() + } + } + def deleteUser(name: String): Boolean = { withAdminClient(adminClient => { try { diff --git a/ui/src/views/acl/Acl.vue b/ui/src/views/acl/Acl.vue index fe154fd..ba6615f 100644 --- a/ui/src/views/acl/Acl.vue +++ b/ui/src/views/acl/Acl.vue @@ -54,12 +54,18 @@ - 删除 - 授予生产权限 - 收回生产权限 - 授予消费权限 - 收回消费权限 - 增加权限 + 删除 + 授予生产权限 + 收回生产权限 + 授予消费权限 + 收回消费权限 + 增加权限 + + + + + + @@ -142,6 +148,9 @@ export default { getAclList(this.data, this.queryParam); } }, + onDeleteUser(row) { + console.log("delete user:", row) + } }, created() { getAclList(this.data, this.queryParam);