diff --git a/pom.xml b/pom.xml
index 777db59..68114d6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -21,6 +21,7 @@
${project.basedir}/ui
1.11.0
1.8
+ 2.8.0
@@ -53,10 +54,15 @@
org.apache.kafka
kafka-clients
- 2.8.0
+ ${kafka.version}
+
+
+
+ org.apache.kafka
+ kafka_2.13
+ ${kafka.version}
-
org.apache.commons
commons-lang3
diff --git a/src/main/java/com/xuxd/kafka/console/config/KafkaConfig.java b/src/main/java/com/xuxd/kafka/console/config/KafkaConfig.java
index 0404d9b..8add192 100644
--- a/src/main/java/com/xuxd/kafka/console/config/KafkaConfig.java
+++ b/src/main/java/com/xuxd/kafka/console/config/KafkaConfig.java
@@ -23,6 +23,14 @@ public class KafkaConfig {
private String saslJaasConfig;
+ private String adminUsername;
+
+ private String adminPassword;
+
+ private boolean adminCreate;
+
+ private String zookeeperAddr;
+
public String getBootstrapServer() {
return bootstrapServer;
}
@@ -62,4 +70,36 @@ public class KafkaConfig {
public void setSaslJaasConfig(String saslJaasConfig) {
this.saslJaasConfig = saslJaasConfig;
}
+
+ public String getAdminUsername() {
+ return adminUsername;
+ }
+
+ public void setAdminUsername(String adminUsername) {
+ this.adminUsername = adminUsername;
+ }
+
+ public String getAdminPassword() {
+ return adminPassword;
+ }
+
+ public void setAdminPassword(String adminPassword) {
+ this.adminPassword = adminPassword;
+ }
+
+ public boolean isAdminCreate() {
+ return adminCreate;
+ }
+
+ public void setAdminCreate(boolean adminCreate) {
+ this.adminCreate = adminCreate;
+ }
+
+ public String getZookeeperAddr() {
+ return zookeeperAddr;
+ }
+
+ public void setZookeeperAddr(String zookeeperAddr) {
+ this.zookeeperAddr = zookeeperAddr;
+ }
}
diff --git a/src/main/java/com/xuxd/kafka/console/service/impl/AclServiceImpl.java b/src/main/java/com/xuxd/kafka/console/service/impl/AclServiceImpl.java
index 0f01f49..50bc8e8 100644
--- a/src/main/java/com/xuxd/kafka/console/service/impl/AclServiceImpl.java
+++ b/src/main/java/com/xuxd/kafka/console/service/impl/AclServiceImpl.java
@@ -4,6 +4,7 @@ import com.xuxd.kafka.console.beans.AclEntry;
import com.xuxd.kafka.console.beans.CounterList;
import com.xuxd.kafka.console.beans.CounterMap;
import com.xuxd.kafka.console.beans.ResponseData;
+import com.xuxd.kafka.console.config.KafkaConfig;
import com.xuxd.kafka.console.service.AclService;
import java.util.Collections;
import java.util.HashMap;
@@ -17,6 +18,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.admin.UserScramCredentialsDescription;
import org.apache.kafka.common.acl.AclBinding;
+import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@@ -28,7 +30,7 @@ import org.springframework.stereotype.Service;
**/
@Slf4j
@Service
-public class AclServiceImpl implements AclService {
+public class AclServiceImpl implements AclService, SmartInitializingSingleton {
@Autowired
private KafkaConfigConsole configConsole;
@@ -36,6 +38,9 @@ public class AclServiceImpl implements AclService {
@Autowired
private KafkaAclConsole aclConsole;
+ @Autowired
+ private KafkaConfig kafkaConfig;
+
@Override public ResponseData> getUserList() {
try {
return ResponseData.create(Set.class).data(configConsole.getUserList(null)).success();
@@ -46,10 +51,12 @@ public class AclServiceImpl implements AclService {
}
@Override public ResponseData addOrUpdateUser(String name, String pass) {
+ log.info("add or update user, username: {}, password: {}", name, pass);
return configConsole.addOrUpdateUser(name, pass) ? ResponseData.create().success() : ResponseData.create().failed();
}
@Override public ResponseData deleteUser(String name) {
+ log.info("delete user: {}", name);
return configConsole.deleteUser(name) ? ResponseData.create().success() : ResponseData.create().failed();
}
@@ -108,4 +115,15 @@ public class AclServiceImpl implements AclService {
@Override public ResponseData deleteUserAcl(AclEntry entry) {
return aclConsole.deleteUserAcl(entry) ? ResponseData.create().success() : ResponseData.create().failed();
}
+
+ @Override public void afterSingletonsInstantiated() {
+ if (kafkaConfig.isAdminCreate()) {
+ log.info("Start create admin user, username: {}, password: {}", kafkaConfig.getAdminUsername(), kafkaConfig.getAdminPassword());
+ boolean done = configConsole.addOrUpdateUserWithZK(kafkaConfig.getAdminUsername(), kafkaConfig.getAdminPassword());
+ if (!done) {
+ log.error("Create admin failed.");
+ throw new IllegalStateException();
+ }
+ }
+ }
}
diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml
index 834e2f3..ecf591b 100644
--- a/src/main/resources/application.yml
+++ b/src/main/resources/application.yml
@@ -5,9 +5,17 @@ server:
kafka:
config:
+ # kafka broker地址,多个以逗号分隔
bootstrap-server: 'localhost:9092'
request-timeout-ms: 60000
security-protocol: SASL_PLAINTEXT
sasl-mechanism: SCRAM-SHA-256
- sasl-jaas-config: org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="admin";
+ # 超级管理员用户名,在broker上已经配置为超级管理员
+ admin-username: admin
+ # 超级管理员密码
+ admin-password: admin
+ # 启动自动创建配置的超级管理员用户
+ admin-create: true
+ zookeeper-addr: localhost:2181
+ sasl-jaas-config: org.apache.kafka.common.security.scram.ScramLoginModule required username="${kafka.config.admin-username}" password="${kafka.config.admin-password}";
diff --git a/src/main/scala/kafka/console/KafkaConfigConsole.scala b/src/main/scala/kafka/console/KafkaConfigConsole.scala
index 93c979f..aa0749a 100644
--- a/src/main/scala/kafka/console/KafkaConfigConsole.scala
+++ b/src/main/scala/kafka/console/KafkaConfigConsole.scala
@@ -1,11 +1,16 @@
package kafka.console
import java.util
-import java.util.Set
import java.util.concurrent.TimeUnit
+import java.util.{Properties, Set}
import com.xuxd.kafka.console.config.KafkaConfig
+import kafka.server.ConfigType
+import kafka.utils.Implicits.PropertiesOps
+import kafka.zk.{AdminZkClient, KafkaZkClient}
import org.apache.kafka.clients.admin._
+import org.apache.kafka.common.security.scram.internals.{ScramCredentialUtils, ScramFormatter}
+import org.apache.kafka.common.utils.Time
/**
* kafka-console-ui.
@@ -45,6 +50,30 @@ class KafkaConfigConsole(config: KafkaConfig) extends KafkaConsole(config: Kafka
}).asInstanceOf[Boolean]
}
+ def addOrUpdateUserWithZK(name: String, pass: String): Boolean = {
+
+ val zkClient = KafkaZkClient(config.getZookeeperAddr, false, 30000, 30000, Int.MaxValue, Time.SYSTEM)
+ val adminZkClient = new AdminZkClient(zkClient)
+ try {
+ val credential = new ScramFormatter(org.apache.kafka.common.security.scram.internals.ScramMechanism.forMechanismName(config.getSaslMechanism))
+ .generateCredential(pass, defaultIterations)
+ val credentialStr = ScramCredentialUtils.credentialToString(credential)
+
+ val userConfig: Properties = new Properties()
+ userConfig.put(config.getSaslMechanism, credentialStr)
+
+ val configs = adminZkClient.fetchEntityConfig(ConfigType.User, name)
+ userConfig ++= configs
+ adminZkClient.changeConfigs(ConfigType.User, name, userConfig)
+ true
+ } catch {
+ case e: Exception => log.error("addOrUpdateAdminWithZK error.", e)
+ false
+ } finally {
+ zkClient.close()
+ }
+ }
+
def deleteUser(name: String): Boolean = {
withAdminClient(adminClient => {
try {
diff --git a/ui/src/views/acl/Acl.vue b/ui/src/views/acl/Acl.vue
index fe154fd..ba6615f 100644
--- a/ui/src/views/acl/Acl.vue
+++ b/ui/src/views/acl/Acl.vue
@@ -54,12 +54,18 @@
- 删除
- 授予生产权限
- 收回生产权限
- 授予消费权限
- 收回消费权限
- 增加权限
+ 删除
+ 授予生产权限
+ 收回生产权限
+ 授予消费权限
+ 收回消费权限
+ 增加权限
+
+
+
+
+
+
@@ -142,6 +148,9 @@ export default {
getAclList(this.data, this.queryParam);
}
},
+ onDeleteUser(row) {
+ console.log("delete user:", row)
+ }
},
created() {
getAclList(this.data, this.queryParam);