diff --git a/.gitignore b/.gitignore index 8274d57..4dffb57 100644 --- a/.gitignore +++ b/.gitignore @@ -60,4 +60,5 @@ pnpm-debug.log* /ui/node /logs /LOGGING_HOME_IS_UNDEFINED -*.log \ No newline at end of file +*.log +/data \ No newline at end of file diff --git a/README.md b/README.md index af84ebe..49e8614 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,8 @@ # kafka可视化管理平台 -目前支持部分acl功能管理操作 +目前支持acl:SASL认证机制及受权管理操作 实现:spring boot + scala + vue + kafka +## kafka版本 +* kafka 2.8.0 # 打包、部署 ## 打包 环境要求 diff --git a/bin/start.sh b/bin/start.sh index 9af3864..f09352c 100644 --- a/bin/start.sh +++ b/bin/start.sh @@ -7,6 +7,10 @@ SCRIPT_DIR=`dirname $0` PROJECT_DIR="$SCRIPT_DIR/.." CONF_FILE="$PROJECT_DIR/config/application.yml" TARGET="$PROJECT_DIR/lib/kafka-console-ui.jar" + +#设置h2文件根目录 +DATA_DIR=$PROJECT_DIR + # 日志目录,默认为当前工程目录下 # 这个是错误输出,如果启动命令有误,输出到这个文件,应用日志不会输出到error.out,应用日志输出到上面的rocketmq-reput.log中 ERROR_OUT="$PROJECT_DIR/error.out" @@ -15,6 +19,6 @@ PROCESS_FLAG="kafka-console-ui-process-flag:${PROJECT_DIR}" JAVA_OPTS="$JAVA_OPTS $JAVA_MEM_OPTS" -nohup java -jar $JAVA_OPTS $TARGET --spring.config.location="$CONF_FILE" --logging.home="$PROJECT_DIR" $PROCESS_FLAG 1>/dev/null 2>$ERROR_OUT & +nohup java -jar $JAVA_OPTS $TARGET --spring.config.location="$CONF_FILE" --logging.home="$PROJECT_DIR" --data.dir=$DATA_DIR $PROCESS_FLAG 1>/dev/null 2>$ERROR_OUT & echo "Kafka-console-ui Started!" \ No newline at end of file diff --git a/src/main/java/com/xuxd/kafka/console/service/impl/AclServiceImpl.java b/src/main/java/com/xuxd/kafka/console/service/impl/AclServiceImpl.java index cfb5ca5..299b5da 100644 --- a/src/main/java/com/xuxd/kafka/console/service/impl/AclServiceImpl.java +++ b/src/main/java/com/xuxd/kafka/console/service/impl/AclServiceImpl.java @@ -199,8 +199,10 @@ public class AclServiceImpl implements AclService, SmartInitializingSingleton { vo.setConsistencyDescription("Password is null."); } else { vo.setPassword(dos.stream().findFirst().get().getPassword()); + // check for consistency. + boolean consistent = configConsole.isPassConsistent(username, vo.getPassword()); + vo.setConsistencyDescription(consistent ? "Consistent" : "Password is not consistent."); } - // check for consistency. return ResponseData.create().data(vo).success(); } diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 031d6ce..71395e1 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -26,13 +26,14 @@ spring: name: kafka-console-ui # h2 database datasource: -# url: jdbc:h2:file:/data/demo - url: jdbc:h2:mem:testdb + url: jdbc:h2:file:${data.dir:${user.dir}}/data/db/kafak-console +# url: jdbc:h2:mem:testdb driver-class-name: org.h2.Driver - username: sa - password: password + username: kafka + password: 1234567890 schema: classpath:db/schema-h2.sql # data: classpath:db/data-h2.sql + initialization-mode: always h2: console: enabled: true diff --git a/src/main/resources/db/schema-h2.sql b/src/main/resources/db/schema-h2.sql index 1dbec00..2cf6f33 100644 --- a/src/main/resources/db/schema-h2.sql +++ b/src/main/resources/db/schema-h2.sql @@ -1,6 +1,6 @@ -- DROP TABLE IF EXISTS T_KAKFA_USER; -CREATE TABLE if not exists T_KAFKA_USER +CREATE TABLE IF NOT EXISTS T_KAFKA_USER ( ID IDENTITY NOT NULL COMMENT '主键ID', USERNAME VARCHAR(50) NOT NULL DEFAULT '' COMMENT '姓名', @@ -8,5 +8,4 @@ CREATE TABLE if not exists T_KAFKA_USER UPDATE_TIME TIMESTAMP NOT NULL DEFAULT NOW() COMMENT '更新时间', PRIMARY KEY (ID), UNIQUE (USERNAME) -); - +); \ No newline at end of file diff --git a/src/main/scala/kafka/console/KafkaConfigConsole.scala b/src/main/scala/kafka/console/KafkaConfigConsole.scala index 9dfeaeb..ce2332c 100644 --- a/src/main/scala/kafka/console/KafkaConfigConsole.scala +++ b/src/main/scala/kafka/console/KafkaConfigConsole.scala @@ -1,5 +1,6 @@ package kafka.console +import java.security.MessageDigest import java.util import java.util.concurrent.TimeUnit import java.util.{Properties, Set} @@ -10,6 +11,8 @@ import kafka.utils.Implicits.PropertiesOps import org.apache.kafka.clients.admin._ import org.apache.kafka.common.security.scram.internals.{ScramCredentialUtils, ScramFormatter} +import scala.jdk.CollectionConverters.{CollectionHasAsScala, DictionaryHasAsScala, SeqHasAsJava} + /** * kafka-console-ui. * @@ -69,12 +72,41 @@ class KafkaConfigConsole(config: KafkaConfig) extends KafkaConsole(config: Kafka }).asInstanceOf[Boolean] } + /** + * password consistency check. + * return true: is consistent, or not. + */ + def isPassConsistent(username: String, password: String): Boolean = { + withZKClient(zkClient => { + val entityConfig = zkClient.fetchEntityConfig(ConfigType.User, username) + log.info(entityConfig.toString) + var res: Boolean = false + entityConfig.asScala.foreach(e => { + val credential = ScramCredentialUtils.credentialFromString(e._2.asInstanceOf[String]) + val scramFormatter = new ScramFormatter(org.apache.kafka.common.security.scram.internals.ScramMechanism.forMechanismName(e._1.asInstanceOf[String])) + val saltPassword = scramFormatter.saltedPassword(password, credential.salt(), credential.iterations()) + val expectStoredKey = credential.storedKey() + val computedStoredKey = scramFormatter.storedKey(scramFormatter.clientKey(saltPassword)) + res |= MessageDigest.isEqual(expectStoredKey, computedStoredKey) + }) + res + }).asInstanceOf[Boolean] + } + def deleteUser(name: String): (Boolean, String) = { withAdminClient(adminClient => { try { - adminClient.alterUserScramCredentials(util.Arrays.asList( - new UserScramCredentialDeletion(name, ScramMechanism.fromMechanismName(config.getSaslMechanism)))) - .all().get(3000, TimeUnit.MILLISECONDS) + // adminClient.alterUserScramCredentials(util.Arrays.asList( + // new UserScramCredentialDeletion(name, ScramMechanism.fromMechanismName(config.getSaslMechanism)))) + // .all().get(3000, TimeUnit.MILLISECONDS) + // all delete + val userDetail = getUserDetailList(util.Collections.singletonList(name)) + userDetail.values().asScala.foreach(u => { + adminClient.alterUserScramCredentials(u.credentialInfos().asScala.map(s => new UserScramCredentialDeletion(u.name(), s.mechanism()) + .asInstanceOf[UserScramCredentialAlteration]).toList.asJava) + .all().get(3000, TimeUnit.MILLISECONDS) + }) + (true, null) } catch { case ex: Exception => log.error("deleteUser error", ex)