diff --git a/src/main/java/com/xuxd/kafka/console/beans/vo/ConfigEntryVO.java b/src/main/java/com/xuxd/kafka/console/beans/vo/ConfigEntryVO.java new file mode 100644 index 0000000..b810352 --- /dev/null +++ b/src/main/java/com/xuxd/kafka/console/beans/vo/ConfigEntryVO.java @@ -0,0 +1,62 @@ +package com.xuxd.kafka.console.beans.vo; + +import java.util.HashMap; +import java.util.Map; +import lombok.Data; +import org.apache.kafka.clients.admin.ConfigEntry; + +/** + * kafka-console-ui. + * + * @author xuxd + * @date 2021-11-03 15:32:08 + **/ +@Data +public class ConfigEntryVO implements Comparable { + + private static final Map ORDER_DICTIONARY = new HashMap<>(); + + static { + ORDER_DICTIONARY.put("DYNAMIC_TOPIC_CONFIG", 0); + ORDER_DICTIONARY.put("DYNAMIC_BROKER_LOGGER_CONFIG", 1); + ORDER_DICTIONARY.put("DYNAMIC_BROKER_CONFIG", 2); + ORDER_DICTIONARY.put("DYNAMIC_DEFAULT_BROKER_CONFIG", 3); + ORDER_DICTIONARY.put("DEFAULT_CONFIG", 4); + ORDER_DICTIONARY.put("STATIC_BROKER_CONFIG", 5); + ORDER_DICTIONARY.put("UNKNOWN", 6); + } + + private String name; + + private String value; + + private String source; + + private boolean sensitive; + + private boolean readOnly; + + public static ConfigEntryVO from(ConfigEntry entry) { + ConfigEntryVO vo = new ConfigEntryVO(); + vo.name = entry.name(); + vo.value = entry.value(); + vo.source = entry.source().name(); + vo.sensitive = entry.isSensitive(); + vo.readOnly = entry.isReadOnly(); + return vo; + } + + @Override public int compareTo(Object o) { + + ConfigEntryVO that = (ConfigEntryVO) o; + + if (!this.source.equals(that.source)) { + return ORDER_DICTIONARY.get(this.source) - ORDER_DICTIONARY.get(that.source); + } + if (this.readOnly != that.readOnly) { + return this.readOnly ? 1 : -1; + } + + return this.name.compareTo(that.name); + } +} diff --git a/src/main/java/com/xuxd/kafka/console/controller/ConfigController.java b/src/main/java/com/xuxd/kafka/console/controller/ConfigController.java index 863c4bd..017506d 100644 --- a/src/main/java/com/xuxd/kafka/console/controller/ConfigController.java +++ b/src/main/java/com/xuxd/kafka/console/controller/ConfigController.java @@ -5,7 +5,6 @@ import com.xuxd.kafka.console.config.KafkaConfig; import com.xuxd.kafka.console.service.ConfigService; import com.xuxd.kafka.console.utils.ConvertUtil; import java.util.Map; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @@ -43,7 +42,7 @@ public class ConfigController { } @GetMapping("/broker") - public Object getTopicConfig() { - return configService.getBrokerConfig(); + public Object getBrokerConfig(String brokerId) { + return configService.getBrokerConfig(brokerId); } } diff --git a/src/main/java/com/xuxd/kafka/console/service/ConfigService.java b/src/main/java/com/xuxd/kafka/console/service/ConfigService.java index 2350c4f..49b67d8 100644 --- a/src/main/java/com/xuxd/kafka/console/service/ConfigService.java +++ b/src/main/java/com/xuxd/kafka/console/service/ConfigService.java @@ -12,5 +12,5 @@ public interface ConfigService { ResponseData getTopicConfig(String topic); - ResponseData getBrokerConfig(); + ResponseData getBrokerConfig(String brokerId); } diff --git a/src/main/java/com/xuxd/kafka/console/service/impl/ConfigServiceImpl.java b/src/main/java/com/xuxd/kafka/console/service/impl/ConfigServiceImpl.java index 637fc81..851e0ee 100644 --- a/src/main/java/com/xuxd/kafka/console/service/impl/ConfigServiceImpl.java +++ b/src/main/java/com/xuxd/kafka/console/service/impl/ConfigServiceImpl.java @@ -1,8 +1,10 @@ package com.xuxd.kafka.console.service.impl; import com.xuxd.kafka.console.beans.ResponseData; +import com.xuxd.kafka.console.beans.vo.ConfigEntryVO; import com.xuxd.kafka.console.service.ConfigService; import java.util.List; +import java.util.stream.Collectors; import kafka.console.ConfigConsole; import org.apache.kafka.clients.admin.ConfigEntry; import org.springframework.beans.factory.annotation.Autowired; @@ -22,11 +24,14 @@ public class ConfigServiceImpl implements ConfigService { @Override public ResponseData getTopicConfig(String topic) { List configEntries = configConsole.getTopicConfig(topic); - return ResponseData.create().success(); + List vos = configEntries.stream().map(ConfigEntryVO::from).sorted().collect(Collectors.toList()); + return ResponseData.create().data(vos).success(); } - @Override public ResponseData getBrokerConfig() { - List configEntries = configConsole.getBrokerConfig(); - return ResponseData.create().success(); + @Override public ResponseData getBrokerConfig(String brokerId) { + List configEntries = configConsole.getBrokerConfig(brokerId); + List vos = configEntries.stream().map(ConfigEntryVO::from).sorted().collect(Collectors.toList()); + return ResponseData.create().data(vos).success(); } + } diff --git a/src/main/scala/kafka/console/ConfigConsole.scala b/src/main/scala/kafka/console/ConfigConsole.scala index deedee8..8ab8c0b 100644 --- a/src/main/scala/kafka/console/ConfigConsole.scala +++ b/src/main/scala/kafka/console/ConfigConsole.scala @@ -5,9 +5,9 @@ import java.util.Collections import java.util.concurrent.TimeUnit import com.xuxd.kafka.console.config.KafkaConfig -import kafka.admin.ConfigCommand.{BrokerDefaultEntityName, BrokerLoggerConfigType} +import kafka.admin.ConfigCommand.BrokerLoggerConfigType import kafka.server.ConfigType -import org.apache.kafka.clients.admin.{Admin, Config, ConfigEntry, DescribeConfigsOptions} +import org.apache.kafka.clients.admin.{Config, ConfigEntry, DescribeConfigsOptions} import org.apache.kafka.common.config.ConfigResource import org.apache.kafka.common.internals.Topic @@ -23,50 +23,39 @@ class ConfigConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConfi import java.util.List - - def getTopicConfig(topic: String) : List[ConfigEntry] = { + def getTopicConfig(topic: String): List[ConfigEntry] = { getConfig(ConfigType.Topic, topic) } - def getBrokerConfig() : List[ConfigEntry] = { - getConfig(ConfigType.Broker, "0") + def getBrokerConfig(broker: String): List[ConfigEntry] = { + getConfig(ConfigType.Broker, broker) } def getConfig(entityType: String, entityName: String): List[ConfigEntry] = { - getResourceConfig(entityType, entityName, true, false).asJava + getResourceConfig(entityType, entityName, false).asJava } - private def getResourceConfig(entityType: String, entityName: String, includeSynonyms: Boolean, - describeAll: Boolean) = { + private def getResourceConfig(entityType: String, entityName: String, includeSynonyms: Boolean) = { def validateBrokerId(): Unit = try entityName.toInt catch { case _: NumberFormatException => throw new IllegalArgumentException(s"The entity name for $entityType must be a valid integer broker id, found: $entityName") } - val (configResourceType, dynamicConfigSource) = entityType match { + val configResourceType = entityType match { case ConfigType.Topic => if (!entityName.isEmpty) Topic.validate(entityName) - (ConfigResource.Type.TOPIC, Some(ConfigEntry.ConfigSource.DYNAMIC_TOPIC_CONFIG)) - case ConfigType.Broker => entityName match { - case BrokerDefaultEntityName => - (ConfigResource.Type.BROKER, Some(ConfigEntry.ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG)) - case _ => - validateBrokerId() - (ConfigResource.Type.BROKER, Some(ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG)) - } + ConfigResource.Type.TOPIC + case ConfigType.Broker => + validateBrokerId() + ConfigResource.Type.BROKER case BrokerLoggerConfigType => if (!entityName.isEmpty) validateBrokerId() - (ConfigResource.Type.BROKER_LOGGER, None) + ConfigResource.Type.BROKER_LOGGER case entityType => throw new IllegalArgumentException(s"Invalid entity type: $entityType") } - val configSourceFilter = if (describeAll) - None - else - dynamicConfigSource - val configResource = new ConfigResource(configResourceType, entityName) val describeOptions = new DescribeConfigsOptions().includeSynonyms(includeSynonyms) val configs = withAdminClientAndCatchError(admin => Some(admin.describeConfigs(Collections.singleton(configResource), describeOptions) @@ -78,11 +67,7 @@ class ConfigConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConfi configs match { case None => Seq.empty - case Some(c: util.Map[ConfigResource, Config]) => c.get(configResource).entries.asScala - .filter(entry => configSourceFilter match { - case Some(configSource) => entry.source == configSource - case None => true - }).toSeq + case Some(c: util.Map[ConfigResource, Config]) => c.get(configResource).entries.asScala.toSeq } } diff --git a/ui/src/utils/api.js b/ui/src/utils/api.js index 3b170de..26fc0c4 100644 --- a/ui/src/utils/api.js +++ b/ui/src/utils/api.js @@ -54,6 +54,10 @@ export const KafkaConfigApi = { url: "/config", method: "get", }, + getBrokerConfig: { + url: "/config/broker", + method: "get", + }, }; export const KafkaTopicApi = { diff --git a/ui/src/views/cluster/BrokerConfig.vue b/ui/src/views/cluster/BrokerConfig.vue new file mode 100644 index 0000000..4bbce79 --- /dev/null +++ b/ui/src/views/cluster/BrokerConfig.vue @@ -0,0 +1,119 @@ + + + + + diff --git a/ui/src/views/cluster/Cluster.vue b/ui/src/views/cluster/Cluster.vue index 89e5f78..336c1af 100644 --- a/ui/src/views/cluster/Cluster.vue +++ b/ui/src/views/cluster/Cluster.vue @@ -6,15 +6,29 @@

集群ID:{{ clusterId }}

- +
{{ record.host }}:{{ record.port }}
+
+ 属性配置 + +
+ @@ -22,15 +36,19 @@