create amdin user when start.

This commit is contained in:
许晓东
2021-09-02 16:20:46 +08:00
parent 517af091af
commit 4f30511293
6 changed files with 121 additions and 11 deletions

10
pom.xml
View File

@@ -21,6 +21,7 @@
<ui.path>${project.basedir}/ui</ui.path>
<frontend-maven-plugin.version>1.11.0</frontend-maven-plugin.version>
<compiler.version>1.8</compiler.version>
<kafka.version>2.8.0</kafka.version>
</properties>
<dependencies>
<dependency>
@@ -53,10 +54,15 @@
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.0</version>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.13</artifactId>
<version>${kafka.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>

View File

@@ -23,6 +23,14 @@ public class KafkaConfig {
private String saslJaasConfig;
private String adminUsername;
private String adminPassword;
private boolean adminCreate;
private String zookeeperAddr;
public String getBootstrapServer() {
return bootstrapServer;
}
@@ -62,4 +70,36 @@ public class KafkaConfig {
public void setSaslJaasConfig(String saslJaasConfig) {
this.saslJaasConfig = saslJaasConfig;
}
public String getAdminUsername() {
return adminUsername;
}
public void setAdminUsername(String adminUsername) {
this.adminUsername = adminUsername;
}
public String getAdminPassword() {
return adminPassword;
}
public void setAdminPassword(String adminPassword) {
this.adminPassword = adminPassword;
}
public boolean isAdminCreate() {
return adminCreate;
}
public void setAdminCreate(boolean adminCreate) {
this.adminCreate = adminCreate;
}
public String getZookeeperAddr() {
return zookeeperAddr;
}
public void setZookeeperAddr(String zookeeperAddr) {
this.zookeeperAddr = zookeeperAddr;
}
}

View File

@@ -4,6 +4,7 @@ import com.xuxd.kafka.console.beans.AclEntry;
import com.xuxd.kafka.console.beans.CounterList;
import com.xuxd.kafka.console.beans.CounterMap;
import com.xuxd.kafka.console.beans.ResponseData;
import com.xuxd.kafka.console.config.KafkaConfig;
import com.xuxd.kafka.console.service.AclService;
import java.util.Collections;
import java.util.HashMap;
@@ -17,6 +18,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.admin.UserScramCredentialsDescription;
import org.apache.kafka.common.acl.AclBinding;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@@ -28,7 +30,7 @@ import org.springframework.stereotype.Service;
**/
@Slf4j
@Service
public class AclServiceImpl implements AclService {
public class AclServiceImpl implements AclService, SmartInitializingSingleton {
@Autowired
private KafkaConfigConsole configConsole;
@@ -36,6 +38,9 @@ public class AclServiceImpl implements AclService {
@Autowired
private KafkaAclConsole aclConsole;
@Autowired
private KafkaConfig kafkaConfig;
@Override public ResponseData<Set<String>> getUserList() {
try {
return ResponseData.create(Set.class).data(configConsole.getUserList(null)).success();
@@ -46,10 +51,12 @@ public class AclServiceImpl implements AclService {
}
@Override public ResponseData addOrUpdateUser(String name, String pass) {
log.info("add or update user, username: {}, password: {}", name, pass);
return configConsole.addOrUpdateUser(name, pass) ? ResponseData.create().success() : ResponseData.create().failed();
}
@Override public ResponseData deleteUser(String name) {
log.info("delete user: {}", name);
return configConsole.deleteUser(name) ? ResponseData.create().success() : ResponseData.create().failed();
}
@@ -108,4 +115,15 @@ public class AclServiceImpl implements AclService {
@Override public ResponseData deleteUserAcl(AclEntry entry) {
return aclConsole.deleteUserAcl(entry) ? ResponseData.create().success() : ResponseData.create().failed();
}
@Override public void afterSingletonsInstantiated() {
if (kafkaConfig.isAdminCreate()) {
log.info("Start create admin user, username: {}, password: {}", kafkaConfig.getAdminUsername(), kafkaConfig.getAdminPassword());
boolean done = configConsole.addOrUpdateUserWithZK(kafkaConfig.getAdminUsername(), kafkaConfig.getAdminPassword());
if (!done) {
log.error("Create admin failed.");
throw new IllegalStateException();
}
}
}
}

View File

@@ -5,9 +5,17 @@ server:
kafka:
config:
# kafka broker地址多个以逗号分隔
bootstrap-server: 'localhost:9092'
request-timeout-ms: 60000
security-protocol: SASL_PLAINTEXT
sasl-mechanism: SCRAM-SHA-256
sasl-jaas-config: org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="admin";
# 超级管理员用户名在broker上已经配置为超级管理员
admin-username: admin
# 超级管理员密码
admin-password: admin
# 启动自动创建配置的超级管理员用户
admin-create: true
zookeeper-addr: localhost:2181
sasl-jaas-config: org.apache.kafka.common.security.scram.ScramLoginModule required username="${kafka.config.admin-username}" password="${kafka.config.admin-password}";

View File

@@ -1,11 +1,16 @@
package kafka.console
import java.util
import java.util.Set
import java.util.concurrent.TimeUnit
import java.util.{Properties, Set}
import com.xuxd.kafka.console.config.KafkaConfig
import kafka.server.ConfigType
import kafka.utils.Implicits.PropertiesOps
import kafka.zk.{AdminZkClient, KafkaZkClient}
import org.apache.kafka.clients.admin._
import org.apache.kafka.common.security.scram.internals.{ScramCredentialUtils, ScramFormatter}
import org.apache.kafka.common.utils.Time
/**
* kafka-console-ui.
@@ -45,6 +50,30 @@ class KafkaConfigConsole(config: KafkaConfig) extends KafkaConsole(config: Kafka
}).asInstanceOf[Boolean]
}
def addOrUpdateUserWithZK(name: String, pass: String): Boolean = {
val zkClient = KafkaZkClient(config.getZookeeperAddr, false, 30000, 30000, Int.MaxValue, Time.SYSTEM)
val adminZkClient = new AdminZkClient(zkClient)
try {
val credential = new ScramFormatter(org.apache.kafka.common.security.scram.internals.ScramMechanism.forMechanismName(config.getSaslMechanism))
.generateCredential(pass, defaultIterations)
val credentialStr = ScramCredentialUtils.credentialToString(credential)
val userConfig: Properties = new Properties()
userConfig.put(config.getSaslMechanism, credentialStr)
val configs = adminZkClient.fetchEntityConfig(ConfigType.User, name)
userConfig ++= configs
adminZkClient.changeConfigs(ConfigType.User, name, userConfig)
true
} catch {
case e: Exception => log.error("addOrUpdateAdminWithZK error.", e)
false
} finally {
zkClient.close()
}
}
def deleteUser(name: String): Boolean = {
withAdminClient(adminClient => {
try {

View File

@@ -54,12 +54,18 @@
</div>
<a-table :columns="columns" :data-source="data" bordered>
<a slot="operation" slot-scope="{}">
<a-button class="operation-btn">删除</a-button>
<a-button class="operation-btn">授予生产权限</a-button>
<a-button class="operation-btn">收回生产权限</a-button>
<a-button class="operation-btn">授予消费权限</a-button>
<a-button class="operation-btn">收回消费权限</a-button>
<a-button class="operation-btn">增加权限</a-button>
<a slot="operation" href="javascript:;" class="operation-btn" @click="onDeleteUser">删除</a>
<a slot="operation" href="javascript:;" class="operation-btn">授予生产权限</a>
<a slot="operation" href="javascript:;" class="operation-btn">收回生产权限</a>
<a slot="operation" href="javascript:;" class="operation-btn">授予消费权限</a>
<a slot="operation" href="javascript:;" class="operation-btn">收回消费权限</a>
<a slot="operation" href="javascript:;" class="operation-btn">增加权限</a>
<!-- <a-button class="operation-btn">删除</a-button>-->
<!-- <a-button class="operation-btn">授予生产权限</a-button>-->
<!-- <a-button class="operation-btn">收回生产权限</a-button>-->
<!-- <a-button class="operation-btn">授予消费权限</a-button>-->
<!-- <a-button class="operation-btn">收回消费权限</a-button>-->
<!-- <a-button class="operation-btn">增加权限</a-button>-->
</a>
<!-- <a-table-->
<!-- slot="expandedRowRender"-->
@@ -142,6 +148,9 @@ export default {
getAclList(this.data, this.queryParam);
}
},
onDeleteUser(row) {
console.log("delete user:", row)
}
},
created() {
getAclList(this.data, this.queryParam);