集群信息

This commit is contained in:
许晓东
2021-10-08 15:40:21 +08:00
parent 73d44fe009
commit d4fdb739b9
11 changed files with 345 additions and 1 deletions

View File

@@ -0,0 +1,83 @@
package com.xuxd.kafka.console.beans;
import org.apache.kafka.common.Node;
/**
* kafka-console-ui.
*
* @author xuxd
* @date 2021-10-08 14:03:21
**/
public class BrokerNode {
private int id;
private String idString;
private String host;
private int port;
private String rack;
private boolean isController;
public static BrokerNode fromNode(Node node) {
BrokerNode brokerNode = new BrokerNode();
brokerNode.setId(node.id());
brokerNode.setIdString(node.idString());
brokerNode.setHost(node.host());
brokerNode.setPort(node.port());
brokerNode.setRack(node.rack());
return brokerNode;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getIdString() {
return idString;
}
public void setIdString(String idString) {
this.idString = idString;
}
public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}
public int getPort() {
return port;
}
public void setPort(int port) {
this.port = port;
}
public String getRack() {
return rack;
}
public void setRack(String rack) {
this.rack = rack;
}
public boolean isController() {
return isController;
}
public void setController(boolean controller) {
isController = controller;
}
}

View File

@@ -0,0 +1,42 @@
package com.xuxd.kafka.console.beans;
import java.util.Set;
/**
* kafka-console-ui.
*
* @author xuxd
* @date 2021-10-08 13:57:48
**/
public class ClusterInfo {
private Set<BrokerNode> nodes;
private Set<String> authorizedOperations;
private String clusterId;
public Set<BrokerNode> getNodes() {
return nodes;
}
public void setNodes(Set<BrokerNode> nodes) {
this.nodes = nodes;
}
public Set<String> getAuthorizedOperations() {
return authorizedOperations;
}
public void setAuthorizedOperations(Set<String> authorizedOperations) {
this.authorizedOperations = authorizedOperations;
}
public String getClusterId() {
return clusterId;
}
public void setClusterId(String clusterId) {
this.clusterId = clusterId;
}
}

View File

@@ -1,5 +1,6 @@
package com.xuxd.kafka.console.config;
import kafka.console.ClusterConsole;
import kafka.console.ConsumerConsole;
import kafka.console.KafkaAclConsole;
import kafka.console.KafkaConfigConsole;
@@ -35,4 +36,9 @@ public class KafkaConfiguration {
public ConsumerConsole consumerConsole(KafkaConfig config) {
return new ConsumerConsole(config);
}
@Bean
public ClusterConsole clusterConsole(KafkaConfig config) {
return new ClusterConsole(config);
}
}

View File

@@ -0,0 +1,26 @@
package com.xuxd.kafka.console.controller;
import com.xuxd.kafka.console.service.ClusterService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* kafka-console-ui.
*
* @author xuxd
* @date 2021-10-08 14:26:11
**/
@RestController
@RequestMapping("/cluster")
public class ClusterController {
@Autowired
private ClusterService clusterService;
@GetMapping
public Object getClusterInfo() {
return clusterService.getClusterInfo();
}
}

View File

@@ -0,0 +1,13 @@
package com.xuxd.kafka.console.service;
import com.xuxd.kafka.console.beans.ResponseData;
/**
* kafka-console-ui.
*
* @author xuxd
* @date 2021-10-08 14:22:30
**/
public interface ClusterService {
ResponseData getClusterInfo();
}

View File

@@ -0,0 +1,24 @@
package com.xuxd.kafka.console.service.impl;
import com.xuxd.kafka.console.beans.ResponseData;
import com.xuxd.kafka.console.service.ClusterService;
import kafka.console.ClusterConsole;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* kafka-console-ui.
*
* @author xuxd
* @date 2021-10-08 14:23:09
**/
@Service
public class ClusterServiceImpl implements ClusterService {
@Autowired
private ClusterConsole clusterConsole;
@Override public ResponseData getClusterInfo() {
return ResponseData.create().data(clusterConsole.clusterInfo()).success();
}
}

View File

@@ -0,0 +1,44 @@
package kafka.console
import java.util.Collections
import java.util.concurrent.TimeUnit
import com.xuxd.kafka.console.beans.{BrokerNode, ClusterInfo}
import com.xuxd.kafka.console.config.KafkaConfig
import org.apache.kafka.clients.admin.DescribeClusterResult
import scala.jdk.CollectionConverters.{CollectionHasAsScala, SetHasAsJava, SetHasAsScala}
/**
* kafka-console-ui. cluster console.
*
* @author xuxd
* @date 2021-10-08 10:55:56
* */
class ClusterConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConfig) with Logging {
def clusterInfo(): ClusterInfo = {
withAdminClientAndCatchError(admin => {
val clusterResult: DescribeClusterResult = admin.describeCluster()
val clusterInfo = new ClusterInfo
clusterInfo.setClusterId(clusterResult.clusterId().get(3000, TimeUnit.MILLISECONDS))
val acls = clusterResult.authorizedOperations().get(3000, TimeUnit.MILLISECONDS)
if (acls != null) {
clusterInfo.setAuthorizedOperations(acls.asScala.map(_.toString).toSet[String].asJava)
} else {
clusterInfo.setAuthorizedOperations(Collections.emptySet())
}
clusterInfo.setNodes(clusterResult.nodes().get(3000, TimeUnit.MILLISECONDS).asScala.map(BrokerNode.fromNode(_)).toSet[BrokerNode].asJava)
val id = clusterResult.controller().get(3000, TimeUnit.MILLISECONDS).id()
clusterInfo.getNodes.asScala.foreach(n => {
if (n.getId == id) {
n.setController(true)
}
})
clusterInfo
}, eh => {
log.error("get clusterInfo error.", eh)
new ClusterInfo
}).asInstanceOf[ClusterInfo]
}
}