diff --git a/README.md b/README.md index 5cd31a4..5f77cae 100644 --- a/README.md +++ b/README.md @@ -3,7 +3,7 @@ 为了开发的省事,没有多语言支持,只支持中文展示。 用过rocketmq-console吧,对,前端展示风格跟那个有点类似。 ## 安装包下载 -* 点击下载:[kafka-console-ui.tar.gz](http://43.128.31.53/kafka-console-ui.tar.gz) +* 点击下载:[kafka-console-ui.tar.gz](http://43.128.31.53/kafka-console-ui.tar.gz) 或 [kafka-console-ui.zip](http://43.128.31.53/kafka-console-ui.zip) * 参考下面的打包部署,下载源码重新打包 ## 功能支持 * 集群信息 @@ -25,7 +25,7 @@ # 打包、部署 ## 打包 环境要求 -* maven 3+ +* maven 3.6+ * jdk 8 * git ``` @@ -89,4 +89,6 @@ java -jar target/kafka-console-ui.jar 2. 打开idea的Project Structure(Settings) -> Modules -> 设置src/main/scala为Sources,因为约定src/main/java是源码目录,所以这里要再加一个 3. 打开idea的Project Structure(Settings) -> Libraries 添加scala sdk,然后选择本地下载的scala 2.13的目录,确定添加进来 ## 前端 -前端代码在工程的ui目录下,找个前端开发的ide打开进行开发即可。 \ No newline at end of file +前端代码在工程的ui目录下,找个前端开发的ide打开进行开发即可。 +## 注意 +前后分离,直接启动后端如果未编译前端代码是没有前端页面的,可以先打包进行编译`sh package.sh`,然后再用idea启动,或者前端部分单独启动 \ No newline at end of file diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 8aa0f33..5290308 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -8,7 +8,7 @@ kafka: config: # kafka broker地址,多个以逗号分隔 bootstrap-server: 'localhost:9092' - request-timeout-ms: 60000 + request-timeout-ms: 5000 # 服务端是否启用acl,如果不启用,下面的所有配置都忽略即可,只用配置上面的Kafka集群地址就行了 enable-acl: false # 只支持2种安全协议SASL_PLAINTEXT和PLAINTEXT,启用acl则设置为SASL_PLAINTEXT,不启用acl不需关心这个配置 @@ -17,7 +17,7 @@ kafka: # 超级管理员用户名,在broker上已经配置为超级管理员 admin-username: admin # 超级管理员密码 - admin-password: admin!QAZ + admin-password: admin # 启动自动创建配置的超级管理员用户 admin-create: false # broker连接的zk地址,如果启动自动创建配置的超级管理员用户则必须配置,否则忽略 @@ -35,7 +35,7 @@ spring: username: kafka password: 1234567890 schema: classpath:db/schema-h2.sql -# data: classpath:db/data-h2.sql +# data: classpath:db/data-h2.sqldata.dir initialization-mode: always h2: console: diff --git a/src/main/scala/kafka/console/ClusterConsole.scala b/src/main/scala/kafka/console/ClusterConsole.scala index e15cbe4..bafe6ff 100644 --- a/src/main/scala/kafka/console/ClusterConsole.scala +++ b/src/main/scala/kafka/console/ClusterConsole.scala @@ -21,15 +21,15 @@ class ClusterConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConf withAdminClientAndCatchError(admin => { val clusterResult: DescribeClusterResult = admin.describeCluster() val clusterInfo = new ClusterInfo - clusterInfo.setClusterId(clusterResult.clusterId().get(3000, TimeUnit.MILLISECONDS)) - val acls = clusterResult.authorizedOperations().get(3000, TimeUnit.MILLISECONDS) + clusterInfo.setClusterId(clusterResult.clusterId().get(timeoutMs, TimeUnit.MILLISECONDS)) + val acls = clusterResult.authorizedOperations().get(timeoutMs, TimeUnit.MILLISECONDS) if (acls != null) { clusterInfo.setAuthorizedOperations(acls.asScala.map(_.toString).toSet[String].asJava) } else { clusterInfo.setAuthorizedOperations(Collections.emptySet()) } - clusterInfo.setNodes(clusterResult.nodes().get(3000, TimeUnit.MILLISECONDS).asScala.map(BrokerNode.fromNode(_)).toSet[BrokerNode].asJava) - val id = clusterResult.controller().get(3000, TimeUnit.MILLISECONDS).id() + clusterInfo.setNodes(clusterResult.nodes().get(timeoutMs, TimeUnit.MILLISECONDS).asScala.map(BrokerNode.fromNode(_)).toSet[BrokerNode].asJava) + val id = clusterResult.controller().get(timeoutMs, TimeUnit.MILLISECONDS).id() clusterInfo.getNodes.asScala.foreach(n => { if (n.getId == id) { n.setController(true) diff --git a/src/main/scala/kafka/console/KafkaAclConsole.scala b/src/main/scala/kafka/console/KafkaAclConsole.scala index ef85d3a..0fd0c3e 100644 --- a/src/main/scala/kafka/console/KafkaAclConsole.scala +++ b/src/main/scala/kafka/console/KafkaAclConsole.scala @@ -58,7 +58,7 @@ class KafkaAclConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaCon def addAcl(acls: List[AclBinding]): Boolean = { withAdminClient(adminClient => { try { - adminClient.createAcls(acls).all().get(3000, TimeUnit.MILLISECONDS) + adminClient.createAcls(acls).all().get(timeoutMs, TimeUnit.MILLISECONDS) true } catch { case e: Exception => log.error("addAcl error", e) @@ -100,7 +100,7 @@ class KafkaAclConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaCon def deleteAcl(entry: AclEntry, allResource: Boolean, allPrincipal: Boolean, allOperation: Boolean): Boolean = { withAdminClient(adminClient => { try { - val result = adminClient.deleteAcls(Collections.singleton(entry.toAclBindingFilter(allResource, allPrincipal, allOperation))).all().get(3000, TimeUnit.MILLISECONDS) + val result = adminClient.deleteAcls(Collections.singleton(entry.toAclBindingFilter(allResource, allPrincipal, allOperation))).all().get(timeoutMs, TimeUnit.MILLISECONDS) log.info("delete acl: {}", result) true } catch { @@ -113,7 +113,7 @@ class KafkaAclConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaCon def deleteAcl(filters: util.Collection[AclBindingFilter]): Boolean = { withAdminClient(adminClient => { try { - val result = adminClient.deleteAcls(filters).all().get(3000, TimeUnit.MILLISECONDS) + val result = adminClient.deleteAcls(filters).all().get(timeoutMs, TimeUnit.MILLISECONDS) log.info("delete acl: {}", result) true } catch { diff --git a/src/main/scala/kafka/console/KafkaConfigConsole.scala b/src/main/scala/kafka/console/KafkaConfigConsole.scala index ce2332c..69464e3 100644 --- a/src/main/scala/kafka/console/KafkaConfigConsole.scala +++ b/src/main/scala/kafka/console/KafkaConfigConsole.scala @@ -41,7 +41,7 @@ class KafkaConfigConsole(config: KafkaConfig) extends KafkaConsole(config: Kafka adminClient.alterUserScramCredentials(util.Arrays.asList( new UserScramCredentialUpsertion(name, new ScramCredentialInfo(ScramMechanism.fromMechanismName(config.getSaslMechanism), defaultIterations), pass))) - .all().get(3000, TimeUnit.MILLISECONDS) + .all().get(timeoutMs, TimeUnit.MILLISECONDS) true } catch { case ex: Exception => log.error("addOrUpdateUser error", ex) @@ -98,13 +98,13 @@ class KafkaConfigConsole(config: KafkaConfig) extends KafkaConsole(config: Kafka try { // adminClient.alterUserScramCredentials(util.Arrays.asList( // new UserScramCredentialDeletion(name, ScramMechanism.fromMechanismName(config.getSaslMechanism)))) - // .all().get(3000, TimeUnit.MILLISECONDS) + // .all().get(timeoutMs, TimeUnit.MILLISECONDS) // all delete val userDetail = getUserDetailList(util.Collections.singletonList(name)) userDetail.values().asScala.foreach(u => { adminClient.alterUserScramCredentials(u.credentialInfos().asScala.map(s => new UserScramCredentialDeletion(u.name(), s.mechanism()) .asInstanceOf[UserScramCredentialAlteration]).toList.asJava) - .all().get(3000, TimeUnit.MILLISECONDS) + .all().get(timeoutMs, TimeUnit.MILLISECONDS) }) (true, null) diff --git a/src/main/scala/kafka/console/KafkaConsole.scala b/src/main/scala/kafka/console/KafkaConsole.scala index 0820d4e..8e496ca 100644 --- a/src/main/scala/kafka/console/KafkaConsole.scala +++ b/src/main/scala/kafka/console/KafkaConsole.scala @@ -19,7 +19,7 @@ import org.apache.kafka.common.utils.Time * */ class KafkaConsole(config: KafkaConfig) { - protected val timeoutMs: Int = 3000 + protected val timeoutMs: Int = config.getRequestTimeoutMs protected def withAdminClient(f: Admin => Any): Any = { diff --git a/src/main/scala/kafka/console/TopicConsole.scala b/src/main/scala/kafka/console/TopicConsole.scala index a2b5afa..ad522c4 100644 --- a/src/main/scala/kafka/console/TopicConsole.scala +++ b/src/main/scala/kafka/console/TopicConsole.scala @@ -25,7 +25,7 @@ class TopicConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConfig */ def getTopicNameList(internal: Boolean = true): Set[String] = { withAdminClientAndCatchError(admin => admin.listTopics(new ListTopicsOptions().listInternal(internal)).names() - .get(3000, TimeUnit.MILLISECONDS), + .get(timeoutMs, TimeUnit.MILLISECONDS), e => { log.error("listTopics error.", e) Collections.emptySet() @@ -39,7 +39,7 @@ class TopicConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConfig */ def getInternalTopicNameList(): Set[String] = { withAdminClientAndCatchError(admin => admin.listTopics(new ListTopicsOptions().listInternal(true)).listings() - .get(3000, TimeUnit.MILLISECONDS).asScala.filter(_.isInternal).map(_.name()).toSet[String].asJava, + .get(timeoutMs, TimeUnit.MILLISECONDS).asScala.filter(_.isInternal).map(_.name()).toSet[String].asJava, e => { log.error("listInternalTopics error.", e) Collections.emptySet()