diff --git a/src/main/java/com/xuxd/kafka/console/beans/BrokerNode.java b/src/main/java/com/xuxd/kafka/console/beans/BrokerNode.java new file mode 100644 index 0000000..02a6d6c --- /dev/null +++ b/src/main/java/com/xuxd/kafka/console/beans/BrokerNode.java @@ -0,0 +1,83 @@ +package com.xuxd.kafka.console.beans; + +import org.apache.kafka.common.Node; + +/** + * kafka-console-ui. + * + * @author xuxd + * @date 2021-10-08 14:03:21 + **/ +public class BrokerNode { + + private int id; + + private String idString; + + private String host; + + private int port; + + private String rack; + + private boolean isController; + + public static BrokerNode fromNode(Node node) { + BrokerNode brokerNode = new BrokerNode(); + brokerNode.setId(node.id()); + brokerNode.setIdString(node.idString()); + brokerNode.setHost(node.host()); + brokerNode.setPort(node.port()); + brokerNode.setRack(node.rack()); + + return brokerNode; + } + + public int getId() { + return id; + } + + public void setId(int id) { + this.id = id; + } + + public String getIdString() { + return idString; + } + + public void setIdString(String idString) { + this.idString = idString; + } + + public String getHost() { + return host; + } + + public void setHost(String host) { + this.host = host; + } + + public int getPort() { + return port; + } + + public void setPort(int port) { + this.port = port; + } + + public String getRack() { + return rack; + } + + public void setRack(String rack) { + this.rack = rack; + } + + public boolean isController() { + return isController; + } + + public void setController(boolean controller) { + isController = controller; + } +} diff --git a/src/main/java/com/xuxd/kafka/console/beans/ClusterInfo.java b/src/main/java/com/xuxd/kafka/console/beans/ClusterInfo.java new file mode 100644 index 0000000..1064052 --- /dev/null +++ b/src/main/java/com/xuxd/kafka/console/beans/ClusterInfo.java @@ -0,0 +1,42 @@ +package com.xuxd.kafka.console.beans; + +import java.util.Set; + +/** + * kafka-console-ui. + * + * @author xuxd + * @date 2021-10-08 13:57:48 + **/ +public class ClusterInfo { + + private Set nodes; + + private Set authorizedOperations; + + private String clusterId; + + public Set getNodes() { + return nodes; + } + + public void setNodes(Set nodes) { + this.nodes = nodes; + } + + public Set getAuthorizedOperations() { + return authorizedOperations; + } + + public void setAuthorizedOperations(Set authorizedOperations) { + this.authorizedOperations = authorizedOperations; + } + + public String getClusterId() { + return clusterId; + } + + public void setClusterId(String clusterId) { + this.clusterId = clusterId; + } +} diff --git a/src/main/java/com/xuxd/kafka/console/config/KafkaConfiguration.java b/src/main/java/com/xuxd/kafka/console/config/KafkaConfiguration.java index 4035072..1be32a1 100644 --- a/src/main/java/com/xuxd/kafka/console/config/KafkaConfiguration.java +++ b/src/main/java/com/xuxd/kafka/console/config/KafkaConfiguration.java @@ -1,5 +1,6 @@ package com.xuxd.kafka.console.config; +import kafka.console.ClusterConsole; import kafka.console.ConsumerConsole; import kafka.console.KafkaAclConsole; import kafka.console.KafkaConfigConsole; @@ -35,4 +36,9 @@ public class KafkaConfiguration { public ConsumerConsole consumerConsole(KafkaConfig config) { return new ConsumerConsole(config); } + + @Bean + public ClusterConsole clusterConsole(KafkaConfig config) { + return new ClusterConsole(config); + } } diff --git a/src/main/java/com/xuxd/kafka/console/controller/ClusterController.java b/src/main/java/com/xuxd/kafka/console/controller/ClusterController.java new file mode 100644 index 0000000..e456f3e --- /dev/null +++ b/src/main/java/com/xuxd/kafka/console/controller/ClusterController.java @@ -0,0 +1,26 @@ +package com.xuxd.kafka.console.controller; + +import com.xuxd.kafka.console.service.ClusterService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +/** + * kafka-console-ui. + * + * @author xuxd + * @date 2021-10-08 14:26:11 + **/ +@RestController +@RequestMapping("/cluster") +public class ClusterController { + + @Autowired + private ClusterService clusterService; + + @GetMapping + public Object getClusterInfo() { + return clusterService.getClusterInfo(); + } +} diff --git a/src/main/java/com/xuxd/kafka/console/service/ClusterService.java b/src/main/java/com/xuxd/kafka/console/service/ClusterService.java new file mode 100644 index 0000000..141740c --- /dev/null +++ b/src/main/java/com/xuxd/kafka/console/service/ClusterService.java @@ -0,0 +1,13 @@ +package com.xuxd.kafka.console.service; + +import com.xuxd.kafka.console.beans.ResponseData; + +/** + * kafka-console-ui. + * + * @author xuxd + * @date 2021-10-08 14:22:30 + **/ +public interface ClusterService { + ResponseData getClusterInfo(); +} diff --git a/src/main/java/com/xuxd/kafka/console/service/impl/ClusterServiceImpl.java b/src/main/java/com/xuxd/kafka/console/service/impl/ClusterServiceImpl.java new file mode 100644 index 0000000..51d556b --- /dev/null +++ b/src/main/java/com/xuxd/kafka/console/service/impl/ClusterServiceImpl.java @@ -0,0 +1,24 @@ +package com.xuxd.kafka.console.service.impl; + +import com.xuxd.kafka.console.beans.ResponseData; +import com.xuxd.kafka.console.service.ClusterService; +import kafka.console.ClusterConsole; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +/** + * kafka-console-ui. + * + * @author xuxd + * @date 2021-10-08 14:23:09 + **/ +@Service +public class ClusterServiceImpl implements ClusterService { + + @Autowired + private ClusterConsole clusterConsole; + + @Override public ResponseData getClusterInfo() { + return ResponseData.create().data(clusterConsole.clusterInfo()).success(); + } +} diff --git a/src/main/scala/kafka/console/ClusterConsole.scala b/src/main/scala/kafka/console/ClusterConsole.scala new file mode 100644 index 0000000..e15cbe4 --- /dev/null +++ b/src/main/scala/kafka/console/ClusterConsole.scala @@ -0,0 +1,44 @@ +package kafka.console + +import java.util.Collections +import java.util.concurrent.TimeUnit + +import com.xuxd.kafka.console.beans.{BrokerNode, ClusterInfo} +import com.xuxd.kafka.console.config.KafkaConfig +import org.apache.kafka.clients.admin.DescribeClusterResult + +import scala.jdk.CollectionConverters.{CollectionHasAsScala, SetHasAsJava, SetHasAsScala} + +/** + * kafka-console-ui. cluster console. + * + * @author xuxd + * @date 2021-10-08 10:55:56 + * */ +class ClusterConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConfig) with Logging { + + def clusterInfo(): ClusterInfo = { + withAdminClientAndCatchError(admin => { + val clusterResult: DescribeClusterResult = admin.describeCluster() + val clusterInfo = new ClusterInfo + clusterInfo.setClusterId(clusterResult.clusterId().get(3000, TimeUnit.MILLISECONDS)) + val acls = clusterResult.authorizedOperations().get(3000, TimeUnit.MILLISECONDS) + if (acls != null) { + clusterInfo.setAuthorizedOperations(acls.asScala.map(_.toString).toSet[String].asJava) + } else { + clusterInfo.setAuthorizedOperations(Collections.emptySet()) + } + clusterInfo.setNodes(clusterResult.nodes().get(3000, TimeUnit.MILLISECONDS).asScala.map(BrokerNode.fromNode(_)).toSet[BrokerNode].asJava) + val id = clusterResult.controller().get(3000, TimeUnit.MILLISECONDS).id() + clusterInfo.getNodes.asScala.foreach(n => { + if (n.getId == id) { + n.setController(true) + } + }) + clusterInfo + }, eh => { + log.error("get clusterInfo error.", eh) + new ClusterInfo + }).asInstanceOf[ClusterInfo] + } +} diff --git a/ui/src/App.vue b/ui/src/App.vue index be109b9..4bb2897 100644 --- a/ui/src/App.vue +++ b/ui/src/App.vue @@ -12,7 +12,8 @@ >Acl - |运维 + |运维 diff --git a/ui/src/router/index.js b/ui/src/router/index.js index ecb026d..c47204c 100644 --- a/ui/src/router/index.js +++ b/ui/src/router/index.js @@ -37,6 +37,12 @@ const routes = [ component: () => import(/* webpackChunkName: "op" */ "../views/op/Operation.vue"), }, + { + path: "/cluster-page", + name: "Cluster", + component: () => + import(/* webpackChunkName: "cluster" */ "../views/cluster/Cluster.vue"), + }, ]; const router = new VueRouter({ diff --git a/ui/src/utils/api.js b/ui/src/utils/api.js index e8219df..d6c3467 100644 --- a/ui/src/utils/api.js +++ b/ui/src/utils/api.js @@ -85,3 +85,10 @@ export const KafkaConsumerApi = { method: "get", }, }; + +export const KafkaClusterApi = { + getClusterInfo: { + url: "/cluster", + method: "get", + }, +}; diff --git a/ui/src/views/cluster/Cluster.vue b/ui/src/views/cluster/Cluster.vue new file mode 100644 index 0000000..89e5f78 --- /dev/null +++ b/ui/src/views/cluster/Cluster.vue @@ -0,0 +1,92 @@ + + + + +