diff --git a/src/main/java/com/xuxd/kafka/console/beans/dto/AlterConfigDTO.java b/src/main/java/com/xuxd/kafka/console/beans/dto/AlterConfigDTO.java new file mode 100644 index 0000000..e4f8a91 --- /dev/null +++ b/src/main/java/com/xuxd/kafka/console/beans/dto/AlterConfigDTO.java @@ -0,0 +1,24 @@ +package com.xuxd.kafka.console.beans.dto; + +import lombok.Data; +import org.apache.kafka.clients.admin.ConfigEntry; + +/** + * kafka-console-ui. + * + * @author xuxd + * @date 2021-11-04 15:39:07 + **/ +@Data +public class AlterConfigDTO { + + private String entity; + + private String name; + + private String value; + + public ConfigEntry to() { + return new ConfigEntry(name, value); + } +} diff --git a/src/main/java/com/xuxd/kafka/console/beans/enums/AlterType.java b/src/main/java/com/xuxd/kafka/console/beans/enums/AlterType.java new file mode 100644 index 0000000..708c42e --- /dev/null +++ b/src/main/java/com/xuxd/kafka/console/beans/enums/AlterType.java @@ -0,0 +1,11 @@ +package com.xuxd.kafka.console.beans.enums; + +/** + * kafka-console-ui. + * + * @author xuxd + * @date 2021-11-04 15:36:09 + **/ +public enum AlterType { + SET,DELETE +} diff --git a/src/main/java/com/xuxd/kafka/console/beans/vo/ConfigEntryVO.java b/src/main/java/com/xuxd/kafka/console/beans/vo/ConfigEntryVO.java index b810352..7cb0a2d 100644 --- a/src/main/java/com/xuxd/kafka/console/beans/vo/ConfigEntryVO.java +++ b/src/main/java/com/xuxd/kafka/console/beans/vo/ConfigEntryVO.java @@ -50,13 +50,14 @@ public class ConfigEntryVO implements Comparable { ConfigEntryVO that = (ConfigEntryVO) o; - if (!this.source.equals(that.source)) { - return ORDER_DICTIONARY.get(this.source) - ORDER_DICTIONARY.get(that.source); - } if (this.readOnly != that.readOnly) { return this.readOnly ? 1 : -1; } + if (!this.source.equals(that.source)) { + return ORDER_DICTIONARY.get(this.source) - ORDER_DICTIONARY.get(that.source); + } + return this.name.compareTo(that.name); } } diff --git a/src/main/java/com/xuxd/kafka/console/controller/ConfigController.java b/src/main/java/com/xuxd/kafka/console/controller/ConfigController.java index 017506d..69f5f9e 100644 --- a/src/main/java/com/xuxd/kafka/console/controller/ConfigController.java +++ b/src/main/java/com/xuxd/kafka/console/controller/ConfigController.java @@ -1,11 +1,16 @@ package com.xuxd.kafka.console.controller; import com.xuxd.kafka.console.beans.ResponseData; +import com.xuxd.kafka.console.beans.dto.AlterConfigDTO; +import com.xuxd.kafka.console.beans.enums.AlterType; import com.xuxd.kafka.console.config.KafkaConfig; import com.xuxd.kafka.console.service.ConfigService; import com.xuxd.kafka.console.utils.ConvertUtil; import java.util.Map; +import org.springframework.web.bind.annotation.DeleteMapping; import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @@ -45,4 +50,14 @@ public class ConfigController { public Object getBrokerConfig(String brokerId) { return configService.getBrokerConfig(brokerId); } + + @PostMapping("/broker") + public Object setBrokerConfig(@RequestBody AlterConfigDTO dto) { + return configService.alterBrokerConfig(dto.getEntity(), dto.to(), AlterType.SET); + } + + @DeleteMapping("/broker") + public Object deleteBrokerConfig(@RequestBody AlterConfigDTO dto) { + return configService.alterBrokerConfig(dto.getEntity(), dto.to(), AlterType.DELETE); + } } diff --git a/src/main/java/com/xuxd/kafka/console/service/ConfigService.java b/src/main/java/com/xuxd/kafka/console/service/ConfigService.java index 49b67d8..dc10a23 100644 --- a/src/main/java/com/xuxd/kafka/console/service/ConfigService.java +++ b/src/main/java/com/xuxd/kafka/console/service/ConfigService.java @@ -1,6 +1,8 @@ package com.xuxd.kafka.console.service; import com.xuxd.kafka.console.beans.ResponseData; +import com.xuxd.kafka.console.beans.enums.AlterType; +import org.apache.kafka.clients.admin.ConfigEntry; /** * kafka-console-ui. @@ -13,4 +15,6 @@ public interface ConfigService { ResponseData getTopicConfig(String topic); ResponseData getBrokerConfig(String brokerId); + + ResponseData alterBrokerConfig(String brokerId, ConfigEntry entry, AlterType type); } diff --git a/src/main/java/com/xuxd/kafka/console/service/impl/ConfigServiceImpl.java b/src/main/java/com/xuxd/kafka/console/service/impl/ConfigServiceImpl.java index 851e0ee..0af307e 100644 --- a/src/main/java/com/xuxd/kafka/console/service/impl/ConfigServiceImpl.java +++ b/src/main/java/com/xuxd/kafka/console/service/impl/ConfigServiceImpl.java @@ -1,6 +1,7 @@ package com.xuxd.kafka.console.service.impl; import com.xuxd.kafka.console.beans.ResponseData; +import com.xuxd.kafka.console.beans.enums.AlterType; import com.xuxd.kafka.console.beans.vo.ConfigEntryVO; import com.xuxd.kafka.console.service.ConfigService; import java.util.List; @@ -9,6 +10,7 @@ import kafka.console.ConfigConsole; import org.apache.kafka.clients.admin.ConfigEntry; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import scala.Tuple2; /** * kafka-console-ui. @@ -34,4 +36,17 @@ public class ConfigServiceImpl implements ConfigService { return ResponseData.create().data(vos).success(); } + @Override public ResponseData alterBrokerConfig(String brokerId, ConfigEntry entry, AlterType type) { + Tuple2 tuple2 = null; + switch (type) { + case SET: + tuple2 = configConsole.setBrokerConfig(brokerId, entry); + break; + case DELETE: + tuple2 = configConsole.deleteBrokerConfig(brokerId, entry); + break; + } + return (boolean) tuple2._1() ? ResponseData.create().success() : ResponseData.create().failed(tuple2._2()); + } + } diff --git a/src/main/scala/kafka/console/ConfigConsole.scala b/src/main/scala/kafka/console/ConfigConsole.scala index 8ab8c0b..541ecfe 100644 --- a/src/main/scala/kafka/console/ConfigConsole.scala +++ b/src/main/scala/kafka/console/ConfigConsole.scala @@ -7,11 +7,11 @@ import java.util.concurrent.TimeUnit import com.xuxd.kafka.console.config.KafkaConfig import kafka.admin.ConfigCommand.BrokerLoggerConfigType import kafka.server.ConfigType -import org.apache.kafka.clients.admin.{Config, ConfigEntry, DescribeConfigsOptions} +import org.apache.kafka.clients.admin.{AlterConfigOp, Config, ConfigEntry, DescribeConfigsOptions} import org.apache.kafka.common.config.ConfigResource import org.apache.kafka.common.internals.Topic -import scala.jdk.CollectionConverters.{CollectionHasAsScala, SeqHasAsJava} +import scala.jdk.CollectionConverters.{CollectionHasAsScala, MapHasAsJava, SeqHasAsJava} /** * kafka-console-ui. @@ -31,11 +31,52 @@ class ConfigConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConfi getConfig(ConfigType.Broker, broker) } + def setBrokerConfig(broker: String, entry: ConfigEntry): (Boolean, String) = { + alterConfig(ConfigType.Broker, broker, entry, AlterConfigOp.OpType.SET) + } + + def deleteBrokerConfig(broker: String, entry: ConfigEntry): (Boolean, String) = { + alterConfig(ConfigType.Broker, broker, entry, AlterConfigOp.OpType.DELETE) + } + def getConfig(entityType: String, entityName: String): List[ConfigEntry] = { getResourceConfig(entityType, entityName, false).asJava } + def alterConfig(entityType: String, entityName: String, entry: ConfigEntry, + opType: AlterConfigOp.OpType): (Boolean, String) = { + withAdminClientAndCatchError(admin => { + val configResource = new ConfigResource(getResourceTypeAndValidate(entityType, entityName), entityName) + + val config = Map(configResource -> Collections.singletonList(new AlterConfigOp(entry, opType)).asInstanceOf[util.Collection[AlterConfigOp]]) + admin.incrementalAlterConfigs(config.asJava).all().get(timeoutMs, TimeUnit.MILLISECONDS) + (true, "") + }, e => { + log.error("alter config error.", e) + (false, e.getMessage) + }).asInstanceOf[(Boolean, String)] + } + private def getResourceConfig(entityType: String, entityName: String, includeSynonyms: Boolean) = { + val configResourceType = getResourceTypeAndValidate(entityType, entityName) + + val configResource = new ConfigResource(configResourceType, entityName) + val describeOptions = new DescribeConfigsOptions().includeSynonyms(includeSynonyms) + val configs = withAdminClientAndCatchError(admin => Some(admin.describeConfigs(Collections.singleton(configResource), describeOptions) + .all.get(30, TimeUnit.SECONDS)), + e => { + log.error("describeConfigs error.", e) + None + }) + + configs match { + case None => Seq.empty + case Some(c: util.Map[ConfigResource, Config]) => c.get(configResource).entries.asScala.toSeq + } + + } + + private def getResourceTypeAndValidate(entityType: String, entityName: String): ConfigResource.Type = { def validateBrokerId(): Unit = try entityName.toInt catch { case _: NumberFormatException => throw new IllegalArgumentException(s"The entity name for $entityType must be a valid integer broker id, found: $entityName") @@ -55,20 +96,6 @@ class ConfigConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConfi ConfigResource.Type.BROKER_LOGGER case entityType => throw new IllegalArgumentException(s"Invalid entity type: $entityType") } - - val configResource = new ConfigResource(configResourceType, entityName) - val describeOptions = new DescribeConfigsOptions().includeSynonyms(includeSynonyms) - val configs = withAdminClientAndCatchError(admin => Some(admin.describeConfigs(Collections.singleton(configResource), describeOptions) - .all.get(30, TimeUnit.SECONDS)), - e => { - log.error("describeConfigs error.", e) - None - }) - - configs match { - case None => Seq.empty - case Some(c: util.Map[ConfigResource, Config]) => c.get(configResource).entries.asScala.toSeq - } - + configResourceType } } diff --git a/ui/src/utils/api.js b/ui/src/utils/api.js index 26fc0c4..a071277 100644 --- a/ui/src/utils/api.js +++ b/ui/src/utils/api.js @@ -58,6 +58,14 @@ export const KafkaConfigApi = { url: "/config/broker", method: "get", }, + setBrokerConfig: { + url: "/config/broker", + method: "post", + }, + deleteBrokerConfig: { + url: "/config/broker", + method: "delete", + }, }; export const KafkaTopicApi = { diff --git a/ui/src/views/cluster/BrokerConfig.vue b/ui/src/views/cluster/BrokerConfig.vue index 4bbce79..eb4e0f3 100644 --- a/ui/src/views/cluster/BrokerConfig.vue +++ b/ui/src/views/cluster/BrokerConfig.vue @@ -2,7 +2,7 @@
+
+ +

+
+ -
    -
      - {{ - i.topic - }}: - {{ - i.partition - }} -
    -
+
+ 编辑 + + + 删除 + + +
+
@@ -37,9 +65,11 @@ import request from "@/utils/request"; import { KafkaConfigApi } from "@/utils/api"; import notification from "ant-design-vue/es/notification"; +import EditConfig from "@/views/cluster/EditConfig"; export default { name: "BrokerConfig", + components: { EditConfig }, props: { group: { type: String, @@ -60,18 +90,22 @@ export default { show: this.visible, data: [], loading: false, + search: "", + filterData: [], + showEditConfigDialog: false, + selectData: {}, }; }, watch: { visible(v) { this.show = v; if (this.show) { - this.getPartitionInfo(); + this.getBrokerConfig(); } }, }, methods: { - getPartitionInfo() { + getBrokerConfig() { this.loading = true; request({ url: KafkaConfigApi.getBrokerConfig.url + "?brokerId=" + this.id, @@ -85,13 +119,55 @@ export default { }); } else { this.data = res.data; + this.searchData(); } }); }, + deleteBrokerConfig(record) { + this.selectData = record; + this.loading = true; + request({ + url: KafkaConfigApi.deleteBrokerConfig.url, + method: KafkaConfigApi.deleteBrokerConfig.method, + data: { + name: record.name, + value: record.value, + entity: this.id, + }, + }).then((res) => { + this.loading = false; + if (res.code != 0) { + notification.error({ + message: "error", + description: res.msg, + }); + } else { + this.getBrokerConfig(); + } + }); + }, + searchData() { + this.filterData = this.data.filter( + (e) => e.name.indexOf(this.search) >= 0 + ); + }, handleCancel() { this.data = []; this.$emit("closeBrokerConfigDialog", {}); }, + openEditConfigDialog(record) { + this.showEditConfigDialog = true; + this.selectData = record; + }, + closeEditConfigDialog(params) { + this.showEditConfigDialog = false; + if (params.refresh) { + this.getBrokerConfig(); + } + }, + isDynamic(source) { + return source.startsWith("DYNAMIC_"); + }, }, }; @@ -113,7 +189,17 @@ const columns = [ key: "source", width: 200, }, + { + title: "操作", + key: "operation", + scopedSlots: { customRender: "operation" }, + width: 150, + }, ]; - + diff --git a/ui/src/views/cluster/Cluster.vue b/ui/src/views/cluster/Cluster.vue index 336c1af..7152f0c 100644 --- a/ui/src/views/cluster/Cluster.vue +++ b/ui/src/views/cluster/Cluster.vue @@ -26,7 +26,7 @@ diff --git a/ui/src/views/cluster/EditConfig.vue b/ui/src/views/cluster/EditConfig.vue new file mode 100644 index 0000000..bf48c4f --- /dev/null +++ b/ui/src/views/cluster/EditConfig.vue @@ -0,0 +1,117 @@ + + + + +