diff --git a/src/main/java/com/xuxd/kafka/console/beans/dto/BrokerThrottleDTO.java b/src/main/java/com/xuxd/kafka/console/beans/dto/BrokerThrottleDTO.java new file mode 100644 index 0000000..131d033 --- /dev/null +++ b/src/main/java/com/xuxd/kafka/console/beans/dto/BrokerThrottleDTO.java @@ -0,0 +1,22 @@ +package com.xuxd.kafka.console.beans.dto; + +import com.xuxd.kafka.console.beans.enums.ThrottleUnit; +import java.util.ArrayList; +import java.util.List; +import lombok.Data; + +/** + * kafka-console-ui. + * + * @author xuxd + * @date 2021-11-24 19:37:10 + **/ +@Data +public class BrokerThrottleDTO { + + private List brokerList = new ArrayList<>(); + + private long throttle; + + private ThrottleUnit unit; +} diff --git a/src/main/java/com/xuxd/kafka/console/beans/enums/ThrottleUnit.java b/src/main/java/com/xuxd/kafka/console/beans/enums/ThrottleUnit.java new file mode 100644 index 0000000..7aabe24 --- /dev/null +++ b/src/main/java/com/xuxd/kafka/console/beans/enums/ThrottleUnit.java @@ -0,0 +1,18 @@ +package com.xuxd.kafka.console.beans.enums; + +/** + * kafka-console-ui. + * + * @author xuxd + * @date 2021-11-24 19:38:00 + **/ +public enum ThrottleUnit { + KB, MB; + + public long toKb(long size) { + if (this == MB) { + return 1024 * size; + } + return size; + } +} diff --git a/src/main/java/com/xuxd/kafka/console/controller/OperationController.java b/src/main/java/com/xuxd/kafka/console/controller/OperationController.java index 400b7e5..e4d06f5 100644 --- a/src/main/java/com/xuxd/kafka/console/controller/OperationController.java +++ b/src/main/java/com/xuxd/kafka/console/controller/OperationController.java @@ -1,5 +1,6 @@ package com.xuxd.kafka.console.controller; +import com.xuxd.kafka.console.beans.dto.BrokerThrottleDTO; import com.xuxd.kafka.console.beans.dto.ReplicationDTO; import com.xuxd.kafka.console.beans.dto.SyncDataDTO; import com.xuxd.kafka.console.service.OperationService; @@ -52,4 +53,9 @@ public class OperationController { public Object electPreferredLeader(@RequestBody ReplicationDTO dto) { return operationService.electPreferredLeader(dto.getTopic(), dto.getPartition()); } + + @PostMapping("/broker/throttle") + public Object configThrottle(@RequestBody BrokerThrottleDTO dto) { + return operationService.configThrottle(dto.getBrokerList(), dto.getUnit().toKb(dto.getThrottle())); + } } diff --git a/src/main/java/com/xuxd/kafka/console/service/OperationService.java b/src/main/java/com/xuxd/kafka/console/service/OperationService.java index 3c27108..ef10b2c 100644 --- a/src/main/java/com/xuxd/kafka/console/service/OperationService.java +++ b/src/main/java/com/xuxd/kafka/console/service/OperationService.java @@ -1,6 +1,7 @@ package com.xuxd.kafka.console.service; import com.xuxd.kafka.console.beans.ResponseData; +import java.util.List; import java.util.Properties; /** @@ -20,4 +21,6 @@ public interface OperationService { ResponseData deleteAlignmentById(Long id); ResponseData electPreferredLeader(String topic, int partition); + + ResponseData configThrottle(List brokerList, long size); } diff --git a/src/main/java/com/xuxd/kafka/console/service/impl/OperationServiceImpl.java b/src/main/java/com/xuxd/kafka/console/service/impl/OperationServiceImpl.java index 7c996c6..9ca6850 100644 --- a/src/main/java/com/xuxd/kafka/console/service/impl/OperationServiceImpl.java +++ b/src/main/java/com/xuxd/kafka/console/service/impl/OperationServiceImpl.java @@ -123,4 +123,10 @@ public class OperationServiceImpl implements OperationService { return (boolean) tuple2._1() ? ResponseData.create().success() : ResponseData.create().failed(tuple2._2()); } + + @Override public ResponseData configThrottle(List brokerList, long size) { + Tuple2 tuple2 = operationConsole.modifyInterBrokerThrottle(new HashSet<>(brokerList), size); + + return (boolean) tuple2._1() ? ResponseData.create().success() : ResponseData.create().failed(tuple2._2()); + } } diff --git a/src/main/scala/kafka/console/OperationConsole.scala b/src/main/scala/kafka/console/OperationConsole.scala index 5d8ad35..a1d3338 100644 --- a/src/main/scala/kafka/console/OperationConsole.scala +++ b/src/main/scala/kafka/console/OperationConsole.scala @@ -1,14 +1,14 @@ package kafka.console -import java.util.concurrent.TimeUnit -import java.util.{Collections, Properties} - import com.xuxd.kafka.console.config.KafkaConfig +import kafka.admin.ReassignPartitionsCommand import org.apache.kafka.clients.admin.ElectLeadersOptions import org.apache.kafka.clients.consumer.KafkaConsumer import org.apache.kafka.common.serialization.ByteArrayDeserializer import org.apache.kafka.common.{ElectionType, TopicPartition} +import java.util.concurrent.TimeUnit +import java.util.{Collections, Properties} import scala.jdk.CollectionConverters.{CollectionHasAsScala, ListHasAsScala, MapHasAsScala, SeqHasAsJava, SetHasAsJava, SetHasAsScala} /** @@ -210,4 +210,15 @@ class OperationConsole(config: KafkaConfig, topicConsole: TopicConsole, val topicList = topicConsole.getTopicList(Collections.singleton(topic)) topicList.asScala.flatMap(_.partitions().asScala.map(t => new TopicPartition(topic, t.partition()))).toSet.asJava } -} + + def modifyInterBrokerThrottle(reassigningBrokers: util.Set[Int], + interBrokerThrottle: Long): (Boolean, String) = { + withAdminClientAndCatchError(admin => { + ReassignPartitionsCommand.modifyInterBrokerThrottle(admin, reassigningBrokers.asScala.toSet, interBrokerThrottle) + (true, "") + }, e => { + log.error("modifyInterBrokerThrottle error.", e) + (false, e.getMessage) + }).asInstanceOf[(Boolean, String)] + } +} \ No newline at end of file diff --git a/ui/src/utils/api.js b/ui/src/utils/api.js index 5dffc44..c0c83e7 100644 --- a/ui/src/utils/api.js +++ b/ui/src/utils/api.js @@ -198,4 +198,8 @@ export const KafkaOpApi = { url: "/op/replication/preferred", method: "post", }, + configThrottle: { + url: "/op/broker/throttle", + method: "post", + }, }; diff --git a/ui/src/views/op/ConfigThrottle.vue b/ui/src/views/op/ConfigThrottle.vue new file mode 100644 index 0000000..b45df31 --- /dev/null +++ b/ui/src/views/op/ConfigThrottle.vue @@ -0,0 +1,157 @@ + + + + + diff --git a/ui/src/views/op/Operation.vue b/ui/src/views/op/Operation.vue index 4a23c94..dc7b0a7 100644 --- a/ui/src/views/op/Operation.vue +++ b/ui/src/views/op/Operation.vue @@ -3,7 +3,9 @@

- 配置限流 + + 配置限流 + 设置指定broker上的topic的副本之间数据同步占用的带宽,这个设置是broker级别的,但是设置后还要去对应的topic上进行限流配置,指定对这个topic的相关副本进行限制 + +

@@ -96,6 +103,7 @@ import MinOffsetAlignment from "@/views/op/MinOffsetAlignment"; import OffsetAlignmentTable from "@/views/op/OffsetAlignmentTable"; import ElectPreferredLeader from "@/views/op/ElectPreferredLeader"; import DataSyncScheme from "@/views/op/DataSyncScheme"; +import ConfigThrottle from "@/views/op/ConfigThrottle"; export default { name: "Operation", components: { @@ -104,6 +112,7 @@ export default { OffsetAlignmentTable, ElectPreferredLeader, DataSyncScheme, + ConfigThrottle, }, data() { return { @@ -116,6 +125,9 @@ export default { replicationManager: { showElectPreferredLeaderDialog: false, }, + brokerManager: { + showConfigThrottleDialog: false, + }, }; }, methods: { @@ -149,6 +161,12 @@ export default { closeElectPreferredLeaderDialog() { this.replicationManager.showElectPreferredLeaderDialog = false; }, + openConfigThrottleDialog() { + this.brokerManager.showConfigThrottleDialog = true; + }, + closeConfigThrottleDialog() { + this.brokerManager.showConfigThrottleDialog = false; + }, }, };