diff --git a/src/main/java/com/xuxd/kafka/console/beans/dto/AddPartitionDTO.java b/src/main/java/com/xuxd/kafka/console/beans/dto/AddPartitionDTO.java new file mode 100644 index 0000000..94cee6b --- /dev/null +++ b/src/main/java/com/xuxd/kafka/console/beans/dto/AddPartitionDTO.java @@ -0,0 +1,21 @@ +package com.xuxd.kafka.console.beans.dto; + +import java.util.ArrayList; +import java.util.List; +import lombok.Data; + +/** + * kafka-console-ui. + * + * @author xuxd + * @date 2021-10-18 19:55:40 + **/ +@Data +public class AddPartitionDTO { + + private String topic; + + private int addNum; + + private List> assignment = new ArrayList<>(); +} diff --git a/src/main/java/com/xuxd/kafka/console/beans/dto/NewTopicDTO.java b/src/main/java/com/xuxd/kafka/console/beans/dto/NewTopicDTO.java index 7ad0faa..221b100 100644 --- a/src/main/java/com/xuxd/kafka/console/beans/dto/NewTopicDTO.java +++ b/src/main/java/com/xuxd/kafka/console/beans/dto/NewTopicDTO.java @@ -21,7 +21,7 @@ public class NewTopicDTO { private Map configs = new HashMap<>(); public NewTopic toNewTopic() { - NewTopic topic = new NewTopic(name, numPartitions, replicationFactor); + NewTopic topic = new NewTopic(name.trim(), numPartitions, replicationFactor); if (MapUtils.isNotEmpty(configs)) { topic.configs(configs); } diff --git a/src/main/java/com/xuxd/kafka/console/controller/TopicController.java b/src/main/java/com/xuxd/kafka/console/controller/TopicController.java index 27690fc..b027f82 100644 --- a/src/main/java/com/xuxd/kafka/console/controller/TopicController.java +++ b/src/main/java/com/xuxd/kafka/console/controller/TopicController.java @@ -1,5 +1,6 @@ package com.xuxd.kafka.console.controller; +import com.xuxd.kafka.console.beans.dto.AddPartitionDTO; import com.xuxd.kafka.console.beans.dto.NewTopicDTO; import com.xuxd.kafka.console.beans.enums.TopicType; import com.xuxd.kafka.console.service.TopicService; @@ -37,11 +38,16 @@ public class TopicController { @GetMapping("/partition") public Object getTopicPartitionInfo(@RequestParam String topic) { - return topicService.getTopicPartitionInfo(topic); + return topicService.getTopicPartitionInfo(topic.trim()); } @PostMapping("/new") public Object createNewTopic(@RequestBody NewTopicDTO topicDTO) { return topicService.createTopic(topicDTO.toNewTopic()); } + + @PostMapping("/partition/new") + public Object addPartition(@RequestBody AddPartitionDTO partitionDTO) { + return topicService.addPartitions(partitionDTO.getTopic().trim(), partitionDTO.getAddNum(), partitionDTO.getAssignment()); + } } diff --git a/src/main/java/com/xuxd/kafka/console/service/TopicService.java b/src/main/java/com/xuxd/kafka/console/service/TopicService.java index 55f4201..6901386 100644 --- a/src/main/java/com/xuxd/kafka/console/service/TopicService.java +++ b/src/main/java/com/xuxd/kafka/console/service/TopicService.java @@ -2,6 +2,9 @@ package com.xuxd.kafka.console.service; import com.xuxd.kafka.console.beans.ResponseData; import com.xuxd.kafka.console.beans.enums.TopicType; +import java.util.List; +import java.util.Map; +import java.util.Properties; import org.apache.kafka.clients.admin.NewTopic; /** @@ -21,4 +24,6 @@ public interface TopicService { ResponseData getTopicPartitionInfo(String topic); ResponseData createTopic(NewTopic topic); + + ResponseData addPartitions(String topic, int addNum, List> newAssignmentst); } diff --git a/src/main/java/com/xuxd/kafka/console/service/impl/TopicServiceImpl.java b/src/main/java/com/xuxd/kafka/console/service/impl/TopicServiceImpl.java index dbff6e7..a64cbda 100644 --- a/src/main/java/com/xuxd/kafka/console/service/impl/TopicServiceImpl.java +++ b/src/main/java/com/xuxd/kafka/console/service/impl/TopicServiceImpl.java @@ -16,6 +16,7 @@ import java.util.stream.Collectors; import kafka.console.TopicConsole; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; +import org.apache.kafka.clients.admin.NewPartitions; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.admin.TopicDescription; import org.apache.kafka.common.TopicPartition; @@ -112,4 +113,21 @@ public class TopicServiceImpl implements TopicService { Tuple2 createResult = topicConsole.createTopic(topic); return (boolean) createResult._1 ? ResponseData.create().success() : ResponseData.create().failed(String.valueOf(createResult._2)); } + + @Override public ResponseData addPartitions(String topic, int addNum, List> newAssignments) { + List list = topicConsole.getTopicList(Collections.singleton(topic)); + if (list.isEmpty()) { + return ResponseData.create().failed("topic not exist."); + } + TopicDescription topicDescription = list.get(0); + + Map param = new HashMap<>(); + param.put(topic, (newAssignments.size() > 0 ? NewPartitions.increaseTo(topicDescription.partitions().size() + addNum, newAssignments) : + NewPartitions.increaseTo(topicDescription.partitions().size() + addNum))); + + Tuple2 tuple2 = topicConsole.createPartitions(param); + boolean success = (boolean) tuple2._1(); + + return success ? ResponseData.create().success() : ResponseData.create().failed(tuple2._2()); + } } diff --git a/src/main/scala/kafka/console/TopicConsole.scala b/src/main/scala/kafka/console/TopicConsole.scala index 4970df8..a2b5afa 100644 --- a/src/main/scala/kafka/console/TopicConsole.scala +++ b/src/main/scala/kafka/console/TopicConsole.scala @@ -5,7 +5,7 @@ import java.util.concurrent.TimeUnit import java.util.{Collections, List, Set} import com.xuxd.kafka.console.config.KafkaConfig -import org.apache.kafka.clients.admin.{CreateTopicsOptions, DeleteTopicsOptions, ListTopicsOptions, NewTopic, TopicDescription} +import org.apache.kafka.clients.admin._ import org.apache.kafka.common.TopicPartition import scala.jdk.CollectionConverters.{CollectionHasAsScala, SetHasAsJava} @@ -107,4 +107,18 @@ class TopicConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConfig (false, e.getMessage) }).asInstanceOf[(Boolean, String)] } + + /** + * create new partition. + */ + def createPartitions(newPartitions: util.Map[String, NewPartitions]): (Boolean, String) = { + withAdminClientAndCatchError(admin => { + admin.createPartitions(newPartitions, + new CreatePartitionsOptions().retryOnQuotaViolation(false)).all().get(timeoutMs, TimeUnit.MILLISECONDS) + (true, "") + }, e => { + log.error("create partition error, ", e) + (false, e.getMessage) + }).asInstanceOf[(Boolean, String)] + } } diff --git a/ui/src/views/topic/AddPartition.vue b/ui/src/views/topic/AddPartition.vue new file mode 100644 index 0000000..f123e62 --- /dev/null +++ b/ui/src/views/topic/AddPartition.vue @@ -0,0 +1,144 @@ + + + + + diff --git a/ui/src/views/topic/Topic.vue b/ui/src/views/topic/Topic.vue index 4121604..aa5c49a 100644 --- a/ui/src/views/topic/Topic.vue +++ b/ui/src/views/topic/Topic.vue @@ -70,6 +70,13 @@ >删除 + 增加分区 + + @@ -93,10 +104,11 @@ import { KafkaTopicApi } from "@/utils/api"; import notification from "ant-design-vue/es/notification"; import PartitionInfo from "@/views/topic/PartitionInfo"; import CreateTopic from "@/views/topic/CreateTopic"; +import AddPartition from "@/views/topic/AddPartition"; export default { name: "Topic", - components: { PartitionInfo, CreateTopic }, + components: { PartitionInfo, CreateTopic, AddPartition }, data() { return { queryParam: { type: "normal" }, @@ -114,6 +126,7 @@ export default { showPartitionInfo: false, loading: false, showCreateTopic: false, + showAddPartition: false, }; }, methods: { @@ -170,6 +183,15 @@ export default { this.getTopicList(); } }, + openAddPartitionDialog() { + this.showAddPartition = true; + }, + closeAddPartitionDialog(res) { + this.showAddPartition = false; + if (res.refresh) { + this.getTopicList(); + } + }, }, created() { this.getTopicList();