Make the request timeout configurable
@@ -3,7 +3,7 @@
 To keep development simple there is no multi-language support; the UI is Chinese only.
 If you have used rocketmq-console: yes, the front-end style is quite similar to it.
 ## Download the installation package
-* Download: [kafka-console-ui.tar.gz](http://43.128.31.53/kafka-console-ui.tar.gz)
+* Download: [kafka-console-ui.tar.gz](http://43.128.31.53/kafka-console-ui.tar.gz) or [kafka-console-ui.zip](http://43.128.31.53/kafka-console-ui.zip)
 * Or see the packaging and deployment section below: download the source code and build the package yourself
 ## Supported features
 * Cluster information
@@ -25,7 +25,7 @@
 # Packaging and deployment
 ## Packaging
 Environment requirements
-* maven 3+
+* maven 3.6+
 * jdk 8
 * git
 ```
@@ -89,4 +89,6 @@ java -jar target/kafka-console-ui.jar
 2. Open IDEA's Project Structure (Settings) -> Modules and mark src/main/scala as Sources; by convention src/main/java is the source directory, so this one has to be added as well
 3. Open IDEA's Project Structure (Settings) -> Libraries, add a Scala SDK, and point it at the locally downloaded scala 2.13 directory
 ## Front end
 The front-end code is in the project's ui directory; open it in any front-end IDE to develop.
+## Note
+The front end and back end are separate: if the front-end code has not been built, starting only the back end gives no UI. Build it first with `sh package.sh` and then start from IDEA, or run the front-end part on its own.
@@ -8,7 +8,7 @@ kafka:
   config:
     # Kafka broker addresses, comma-separated
     bootstrap-server: 'localhost:9092'
-    request-timeout-ms: 60000
+    request-timeout-ms: 5000
     # Whether ACL is enabled on the broker side; if not, all of the settings below can be ignored and only the cluster address above is needed
     enable-acl: false
     # Only two security protocols are supported, SASL_PLAINTEXT and PLAINTEXT: set SASL_PLAINTEXT when ACL is enabled, otherwise this setting does not matter
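The `request-timeout-ms` value here is the same number the Scala consoles further down pass to `get(timeoutMs, TimeUnit.MILLISECONDS)` when waiting on AdminClient futures. As a quick, self-contained illustration of what that bound does (a sketch only: the object name, hard-coded values, and standalone wiring are assumptions, not the project's actual code):

```scala
import java.util.Properties
import java.util.concurrent.TimeUnit

import org.apache.kafka.clients.admin.{Admin, AdminClientConfig}

object RequestTimeoutSketch {
  def main(args: Array[String]): Unit = {
    // Assumed values; in kafka-console-ui they come from the YAML above
    // (kafka.config.bootstrap-server and kafka.config.request-timeout-ms).
    val bootstrapServer = "localhost:9092"
    val requestTimeoutMs = 5000

    val props = new Properties()
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer)

    val admin = Admin.create(props)
    try {
      // The configured timeout bounds how long we block on the admin future;
      // a slow or unreachable cluster surfaces as a TimeoutException here.
      val topics = admin.listTopics().names().get(requestTimeoutMs, TimeUnit.MILLISECONDS)
      topics.forEach(t => println(t))
    } finally {
      admin.close()
    }
  }
}
```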
@@ -17,7 +17,7 @@ kafka:
     # Super admin username, already configured as a super user on the broker
     admin-username: admin
     # Super admin password
-    admin-password: admin!QAZ
+    admin-password: admin
     # Whether to automatically create the configured super admin user at startup
     admin-create: false
     # ZooKeeper address used by the broker; required if the super admin user is created automatically at startup, otherwise ignored
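When `enable-acl` is on, the console itself authenticates to the cluster with these super-admin credentials over SASL_PLAINTEXT. The sketch below shows the standard Kafka client properties such credentials map to, assuming a SCRAM mechanism as used elsewhere in this commit (`ScramMechanism.fromMechanismName(config.getSaslMechanism)`); the helper object and parameter names are illustrative, not the project's actual wiring:

```scala
import java.util.Properties

import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.common.config.SaslConfigs

// Illustrative helper only; kafka-console-ui builds its client properties elsewhere.
object SaslClientProps {
  def forAdmin(bootstrapServer: String, username: String, password: String,
               mechanism: String = "SCRAM-SHA-256"): Properties = {
    val props = new Properties()
    props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer)
    // Matches the security-protocol setting above: SASL_PLAINTEXT when ACL is enabled.
    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT")
    props.put(SaslConfigs.SASL_MECHANISM, mechanism)
    props.put(SaslConfigs.SASL_JAAS_CONFIG,
      "org.apache.kafka.common.security.scram.ScramLoginModule required " +
        s"""username="$username" password="$password";""")
    props
  }
}
```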
@@ -35,7 +35,7 @@ spring:
     username: kafka
     password: 1234567890
     schema: classpath:db/schema-h2.sql
-    # data: classpath:db/data-h2.sql
+    # data: classpath:db/data-h2.sqldata.dir
     initialization-mode: always
   h2:
     console:
@@ -21,15 +21,15 @@ class ClusterConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConf
     withAdminClientAndCatchError(admin => {
       val clusterResult: DescribeClusterResult = admin.describeCluster()
       val clusterInfo = new ClusterInfo
-      clusterInfo.setClusterId(clusterResult.clusterId().get(3000, TimeUnit.MILLISECONDS))
-      val acls = clusterResult.authorizedOperations().get(3000, TimeUnit.MILLISECONDS)
+      clusterInfo.setClusterId(clusterResult.clusterId().get(timeoutMs, TimeUnit.MILLISECONDS))
+      val acls = clusterResult.authorizedOperations().get(timeoutMs, TimeUnit.MILLISECONDS)
       if (acls != null) {
         clusterInfo.setAuthorizedOperations(acls.asScala.map(_.toString).toSet[String].asJava)
       } else {
         clusterInfo.setAuthorizedOperations(Collections.emptySet())
       }
-      clusterInfo.setNodes(clusterResult.nodes().get(3000, TimeUnit.MILLISECONDS).asScala.map(BrokerNode.fromNode(_)).toSet[BrokerNode].asJava)
-      val id = clusterResult.controller().get(3000, TimeUnit.MILLISECONDS).id()
+      clusterInfo.setNodes(clusterResult.nodes().get(timeoutMs, TimeUnit.MILLISECONDS).asScala.map(BrokerNode.fromNode(_)).toSet[BrokerNode].asJava)
+      val id = clusterResult.controller().get(timeoutMs, TimeUnit.MILLISECONDS).id()
       clusterInfo.getNodes.asScala.foreach(n => {
         if (n.getId == id) {
           n.setController(true)
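A note on the `acls != null` branch kept as context above: Kafka's `describeCluster` only returns authorized operations when they are requested explicitly, otherwise the future completes with null. A small sketch of that, combined with the now-configurable timeout (standalone illustration, not the project's code; `admin` and `timeoutMs` stand for the client and setting used in these consoles):

```scala
import java.util.concurrent.TimeUnit

import org.apache.kafka.clients.admin.{Admin, DescribeClusterOptions}

object DescribeClusterSketch {
  // `admin` is an already-created Admin client, `timeoutMs` the configured request timeout.
  def describeWithAcls(admin: Admin, timeoutMs: Int): Unit = {
    val options = new DescribeClusterOptions()
      .includeAuthorizedOperations(true) // without this, authorizedOperations() resolves to null
      .timeoutMs(timeoutMs)              // per-request timeout for the describe call itself
    val result = admin.describeCluster(options)
    val clusterId = result.clusterId().get(timeoutMs, TimeUnit.MILLISECONDS)
    val ops = result.authorizedOperations().get(timeoutMs, TimeUnit.MILLISECONDS)
    println(s"clusterId=$clusterId authorizedOperations=$ops")
  }
}
```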
@@ -58,7 +58,7 @@ class KafkaAclConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaCon
   def addAcl(acls: List[AclBinding]): Boolean = {
     withAdminClient(adminClient => {
       try {
-        adminClient.createAcls(acls).all().get(3000, TimeUnit.MILLISECONDS)
+        adminClient.createAcls(acls).all().get(timeoutMs, TimeUnit.MILLISECONDS)
         true
       } catch {
         case e: Exception => log.error("addAcl error", e)
@@ -100,7 +100,7 @@ class KafkaAclConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaCon
   def deleteAcl(entry: AclEntry, allResource: Boolean, allPrincipal: Boolean, allOperation: Boolean): Boolean = {
     withAdminClient(adminClient => {
       try {
-        val result = adminClient.deleteAcls(Collections.singleton(entry.toAclBindingFilter(allResource, allPrincipal, allOperation))).all().get(3000, TimeUnit.MILLISECONDS)
+        val result = adminClient.deleteAcls(Collections.singleton(entry.toAclBindingFilter(allResource, allPrincipal, allOperation))).all().get(timeoutMs, TimeUnit.MILLISECONDS)
         log.info("delete acl: {}", result)
         true
       } catch {
@@ -113,7 +113,7 @@ class KafkaAclConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaCon
   def deleteAcl(filters: util.Collection[AclBindingFilter]): Boolean = {
     withAdminClient(adminClient => {
       try {
-        val result = adminClient.deleteAcls(filters).all().get(3000, TimeUnit.MILLISECONDS)
+        val result = adminClient.deleteAcls(filters).all().get(timeoutMs, TimeUnit.MILLISECONDS)
         log.info("delete acl: {}", result)
         true
       } catch {
@@ -41,7 +41,7 @@ class KafkaConfigConsole(config: KafkaConfig) extends KafkaConsole(config: Kafka
         adminClient.alterUserScramCredentials(util.Arrays.asList(
           new UserScramCredentialUpsertion(name,
             new ScramCredentialInfo(ScramMechanism.fromMechanismName(config.getSaslMechanism), defaultIterations), pass)))
-          .all().get(3000, TimeUnit.MILLISECONDS)
+          .all().get(timeoutMs, TimeUnit.MILLISECONDS)
         true
       } catch {
         case ex: Exception => log.error("addOrUpdateUser error", ex)
@@ -98,13 +98,13 @@ class KafkaConfigConsole(config: KafkaConfig) extends KafkaConsole(config: Kafka
       try {
         // adminClient.alterUserScramCredentials(util.Arrays.asList(
         // new UserScramCredentialDeletion(name, ScramMechanism.fromMechanismName(config.getSaslMechanism))))
-        // .all().get(3000, TimeUnit.MILLISECONDS)
+        // .all().get(timeoutMs, TimeUnit.MILLISECONDS)
         // all delete
         val userDetail = getUserDetailList(util.Collections.singletonList(name))
         userDetail.values().asScala.foreach(u => {
           adminClient.alterUserScramCredentials(u.credentialInfos().asScala.map(s => new UserScramCredentialDeletion(u.name(), s.mechanism())
             .asInstanceOf[UserScramCredentialAlteration]).toList.asJava)
-            .all().get(3000, TimeUnit.MILLISECONDS)
+            .all().get(timeoutMs, TimeUnit.MILLISECONDS)
         })

         (true, null)
@@ -19,7 +19,7 @@ import org.apache.kafka.common.utils.Time
  * */
 class KafkaConsole(config: KafkaConfig) {

-  protected val timeoutMs: Int = 3000
+  protected val timeoutMs: Int = config.getRequestTimeoutMs

   protected def withAdminClient(f: Admin => Any): Any = {
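With `timeoutMs` now taken from `config.getRequestTimeoutMs`, every console above waits on its admin futures for the configured `kafka.config.request-timeout-ms` instead of a hard-coded 3000 ms. The same value could also be handed to the AdminClient itself so the client-side request timeout matches the future wait; the factory below is only a sketch of that idea under assumed names, not something this commit adds:

```scala
import java.util.Properties

import org.apache.kafka.clients.admin.{Admin, AdminClientConfig}

// Illustrative factory; kafka-console-ui creates its Admin client elsewhere.
object AdminFactorySketch {
  def create(bootstrapServer: String, requestTimeoutMs: Int): Admin = {
    val props = new Properties()
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer)
    // Bound individual broker requests and whole admin API calls by the same setting,
    // so the client gives up at roughly the same point as the get(timeoutMs, ...) callers.
    props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, Integer.valueOf(requestTimeoutMs))
    props.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, Integer.valueOf(requestTimeoutMs))
    Admin.create(props)
  }
}
```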
@@ -25,7 +25,7 @@ class TopicConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConfig
    */
   def getTopicNameList(internal: Boolean = true): Set[String] = {
     withAdminClientAndCatchError(admin => admin.listTopics(new ListTopicsOptions().listInternal(internal)).names()
-      .get(3000, TimeUnit.MILLISECONDS),
+      .get(timeoutMs, TimeUnit.MILLISECONDS),
       e => {
         log.error("listTopics error.", e)
         Collections.emptySet()
@@ -39,7 +39,7 @@ class TopicConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConfig
    */
   def getInternalTopicNameList(): Set[String] = {
     withAdminClientAndCatchError(admin => admin.listTopics(new ListTopicsOptions().listInternal(true)).listings()
-      .get(3000, TimeUnit.MILLISECONDS).asScala.filter(_.isInternal).map(_.name()).toSet[String].asJava,
+      .get(timeoutMs, TimeUnit.MILLISECONDS).asScala.filter(_.isInternal).map(_.name()).toSet[String].asJava,
       e => {
         log.error("listInternalTopics error.", e)
         Collections.emptySet()