diff --git a/src/main/java/com/xuxd/kafka/console/beans/dto/TopicThrottleDTO.java b/src/main/java/com/xuxd/kafka/console/beans/dto/TopicThrottleDTO.java new file mode 100644 index 0000000..8054be9 --- /dev/null +++ b/src/main/java/com/xuxd/kafka/console/beans/dto/TopicThrottleDTO.java @@ -0,0 +1,21 @@ +package com.xuxd.kafka.console.beans.dto; + +import com.xuxd.kafka.console.beans.enums.TopicThrottleSwitch; +import java.util.List; +import lombok.Data; + +/** + * kafka-console-ui. + * + * @author xuxd + * @date 2021-11-26 15:33:37 + **/ +@Data +public class TopicThrottleDTO { + + private String topic; + + private List partitions; + + private TopicThrottleSwitch operation; +} diff --git a/src/main/java/com/xuxd/kafka/console/beans/enums/TopicThrottleSwitch.java b/src/main/java/com/xuxd/kafka/console/beans/enums/TopicThrottleSwitch.java new file mode 100644 index 0000000..7563e5b --- /dev/null +++ b/src/main/java/com/xuxd/kafka/console/beans/enums/TopicThrottleSwitch.java @@ -0,0 +1,11 @@ +package com.xuxd.kafka.console.beans.enums; + +/** + * kafka-console-ui. + * + * @author xuxd + * @date 2021-11-26 15:33:07 + **/ +public enum TopicThrottleSwitch { + ON,OFF; +} diff --git a/src/main/java/com/xuxd/kafka/console/controller/TopicController.java b/src/main/java/com/xuxd/kafka/console/controller/TopicController.java index 4a87e57..1051840 100644 --- a/src/main/java/com/xuxd/kafka/console/controller/TopicController.java +++ b/src/main/java/com/xuxd/kafka/console/controller/TopicController.java @@ -3,6 +3,7 @@ package com.xuxd.kafka.console.controller; import com.xuxd.kafka.console.beans.ReplicaAssignment; import com.xuxd.kafka.console.beans.dto.AddPartitionDTO; import com.xuxd.kafka.console.beans.dto.NewTopicDTO; +import com.xuxd.kafka.console.beans.dto.TopicThrottleDTO; import com.xuxd.kafka.console.beans.enums.TopicType; import com.xuxd.kafka.console.service.TopicService; import java.util.ArrayList; @@ -82,4 +83,9 @@ public class TopicController { public Object updateReplicaAssignment(@RequestBody ReplicaAssignment assignment) { return topicService.updateReplicaAssignment(assignment); } + + @PostMapping("/replica/throttle") + public Object configThrottle(@RequestBody TopicThrottleDTO dto) { + return topicService.configThrottle(dto.getTopic(), dto.getPartitions(), dto.getOperation()); + } } diff --git a/src/main/java/com/xuxd/kafka/console/service/TopicService.java b/src/main/java/com/xuxd/kafka/console/service/TopicService.java index ea8d46b..dd0f584 100644 --- a/src/main/java/com/xuxd/kafka/console/service/TopicService.java +++ b/src/main/java/com/xuxd/kafka/console/service/TopicService.java @@ -2,6 +2,7 @@ package com.xuxd.kafka.console.service; import com.xuxd.kafka.console.beans.ReplicaAssignment; import com.xuxd.kafka.console.beans.ResponseData; +import com.xuxd.kafka.console.beans.enums.TopicThrottleSwitch; import com.xuxd.kafka.console.beans.enums.TopicType; import java.util.List; import java.util.Map; @@ -31,4 +32,6 @@ public interface TopicService { ResponseData getCurrentReplicaAssignment(String topic); ResponseData updateReplicaAssignment(ReplicaAssignment assignment); + + ResponseData configThrottle(String topic, List partitions, TopicThrottleSwitch throttleSwitch); } diff --git a/src/main/java/com/xuxd/kafka/console/service/impl/TopicServiceImpl.java b/src/main/java/com/xuxd/kafka/console/service/impl/TopicServiceImpl.java index b1884d5..5bdc960 100644 --- a/src/main/java/com/xuxd/kafka/console/service/impl/TopicServiceImpl.java +++ b/src/main/java/com/xuxd/kafka/console/service/impl/TopicServiceImpl.java @@ -3,6 +3,7 @@ package com.xuxd.kafka.console.service.impl; import com.google.gson.Gson; import com.xuxd.kafka.console.beans.ReplicaAssignment; import com.xuxd.kafka.console.beans.ResponseData; +import com.xuxd.kafka.console.beans.enums.TopicThrottleSwitch; import com.xuxd.kafka.console.beans.enums.TopicType; import com.xuxd.kafka.console.beans.vo.TopicDescriptionVO; import com.xuxd.kafka.console.beans.vo.TopicPartitionVO; @@ -148,4 +149,19 @@ public class TopicServiceImpl implements TopicService { boolean success = (boolean) tuple2._1(); return success ? ResponseData.create().success() : ResponseData.create().failed(tuple2._2()); } + + @Override public ResponseData configThrottle(String topic, List partitions, TopicThrottleSwitch throttleSwitch) { + Tuple2 tuple2 = null; + switch (throttleSwitch) { + case ON: + tuple2 = topicConsole.configThrottle(topic, partitions); + break; + case OFF: + break; + default: + throw new IllegalArgumentException("switch is unknown."); + } + boolean success = (boolean) tuple2._1(); + return success ? ResponseData.create().success() : ResponseData.create().failed(tuple2._2()); + } } diff --git a/src/main/scala/kafka/console/TopicConsole.scala b/src/main/scala/kafka/console/TopicConsole.scala index 26a9635..946e496 100644 --- a/src/main/scala/kafka/console/TopicConsole.scala +++ b/src/main/scala/kafka/console/TopicConsole.scala @@ -172,7 +172,7 @@ class TopicConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConfig if (interBrokerThrottle >= 0) { val moveMap = calculateProposedMoveMap(currentReassignments, proposedParts, currentParts) - modifyReassignmentThrottle(adminClient, moveMap, interBrokerThrottle) + modifyReassignmentThrottle(adminClient, moveMap) } if (logDirThrottle >= 0) { @@ -195,6 +195,35 @@ class TopicConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConfig } } + def configThrottle(topic: String, partitions: util.List[Integer]): (Boolean, String) = { + withAdminClientAndCatchError(admin => { + val throttles = { + if (partitions.get(0) == -1) { + Map(topic -> "*") + } else { + val topicDescription = admin.describeTopics(Collections.singleton(topic), withTimeoutMs(new DescribeTopicsOptions)) + .all().get().values().asScala.toList + + def convert(partition: Integer, replicas: scala.List[Int]): String = { + replicas.map("%d:%d".format(partition, _)).toSet.mkString(",") + } + + val ptor = topicDescription.head.partitions().asScala.map(info => (info.partition(), info.replicas().asScala.map(_.id()))).toMap + val conf = partitions.asScala.map(partition => convert(partition, ptor.get(partition) match { + case Some(v) => v.toList + case None => throw new IllegalArgumentException + })).toList + Map(topic -> conf.mkString(",")) + } + } + modifyTopicThrottles(admin, throttles, throttles) + (true, "") + }, e => { + log.error("configThrottle error, ", e) + (false, e.getMessage) + }).asInstanceOf[(Boolean, String)] + } + /** * Get the current replica assignments for some topics. * @@ -242,12 +271,9 @@ class TopicConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConfig } } - private def modifyReassignmentThrottle(admin: Admin, moveMap: MoveMap, interBrokerThrottle: Long): Unit = { + private def modifyReassignmentThrottle(admin: Admin, moveMap: MoveMap): Unit = { val leaderThrottles = calculateLeaderThrottles(moveMap) val followerThrottles = calculateFollowerThrottles(moveMap) modifyTopicThrottles(admin, leaderThrottles, followerThrottles) - -// val reassigningBrokers = calculateReassigningBrokers(moveMap) -// modifyInterBrokerThrottle(admin, reassigningBrokers, interBrokerThrottle) } } diff --git a/ui/src/utils/api.js b/ui/src/utils/api.js index f0c9025..47e4ab4 100644 --- a/ui/src/utils/api.js +++ b/ui/src/utils/api.js @@ -125,6 +125,10 @@ export const KafkaTopicApi = { url: "/topic/replica/assignment", method: "post", }, + configThrottle: { + url: "/topic/replica/throttle", + method: "post", + }, }; export const KafkaConsumerApi = { diff --git a/ui/src/views/topic/ConfigTopicThrottle.vue b/ui/src/views/topic/ConfigTopicThrottle.vue new file mode 100644 index 0000000..d2f25bc --- /dev/null +++ b/ui/src/views/topic/ConfigTopicThrottle.vue @@ -0,0 +1,156 @@ + + + + + diff --git a/ui/src/views/topic/Topic.vue b/ui/src/views/topic/Topic.vue index f5cb3b2..e5c7bb9 100644 --- a/ui/src/views/topic/Topic.vue +++ b/ui/src/views/topic/Topic.vue @@ -116,7 +116,7 @@ size="small" href="javascript:;" class="operation-btn" - @click="openThrottleDialog" + @click="openThrottleDialog(record.name)" >限流 @@ -152,6 +152,11 @@ :topic="selectDetail.resourceName" @closeUpdateReplicaDialog="closeUpdateReplicaDialog" > + @@ -167,6 +172,7 @@ import AddPartition from "@/views/topic/AddPartition"; import ConsumedDetail from "@/views/topic/ConsumedDetail"; import TopicConfig from "@/views/topic/TopicConfig"; import UpdateReplica from "@/views/topic/UpdateReplica"; +import ConfigTopicThrottle from "@/views/topic/ConfigTopicThrottle"; export default { name: "Topic", @@ -177,6 +183,7 @@ export default { ConsumedDetail, TopicConfig, UpdateReplica, + ConfigTopicThrottle, }, data() { return { @@ -199,6 +206,7 @@ export default { showConsumedDetailDialog: false, showTopicConfigDialog: false, showUpdateReplicaDialog: false, + showThrottleDialog: false, }; }, methods: { @@ -289,8 +297,12 @@ export default { openMessageStatsDialog() { this.$message.info("此功能尚不支持"); }, - openThrottleDialog() { - this.$message.info("此功能尚不支持"); + openThrottleDialog(topic) { + this.showThrottleDialog = true; + this.selectDetail.resourceName = topic; + }, + closeThrottleDialog() { + this.showThrottleDialog = false; }, }, created() {