From efe4a59c7ece77a2940f87b8b124d9ff2fcab6ce Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=AE=B8=E6=99=93=E4=B8=9C?= <763795151@qq.com> Date: Wed, 10 Nov 2021 20:45:15 +0800 Subject: [PATCH] preferred as leader. --- .../console/beans/dto/ReplicationDTO.java | 17 ++ .../controller/OperationController.java | 6 + .../console/service/OperationService.java | 2 + .../service/impl/OperationServiceImpl.java | 15 ++ .../scala/kafka/console/KafkaConsole.scala | 6 +- .../kafka/console/OperationConsole.scala | 20 ++- ui/src/utils/api.js | 4 + ui/src/utils/request.js | 2 +- ui/src/views/op/ElectPreferredLeader.vue | 159 ++++++++++++++++++ ui/src/views/op/OffsetAlignmentTable.vue | 7 +- ui/src/views/op/Operation.vue | 39 +++-- 11 files changed, 261 insertions(+), 16 deletions(-) create mode 100644 src/main/java/com/xuxd/kafka/console/beans/dto/ReplicationDTO.java create mode 100644 ui/src/views/op/ElectPreferredLeader.vue diff --git a/src/main/java/com/xuxd/kafka/console/beans/dto/ReplicationDTO.java b/src/main/java/com/xuxd/kafka/console/beans/dto/ReplicationDTO.java new file mode 100644 index 0000000..54d27b0 --- /dev/null +++ b/src/main/java/com/xuxd/kafka/console/beans/dto/ReplicationDTO.java @@ -0,0 +1,17 @@ +package com.xuxd.kafka.console.beans.dto; + +import lombok.Data; + +/** + * kafka-console-ui. + * + * @author xuxd + * @date 2021-11-10 20:09:20 + **/ +@Data +public class ReplicationDTO { + + private String topic; + + private int partition; +} diff --git a/src/main/java/com/xuxd/kafka/console/controller/OperationController.java b/src/main/java/com/xuxd/kafka/console/controller/OperationController.java index ead2d52..400b7e5 100644 --- a/src/main/java/com/xuxd/kafka/console/controller/OperationController.java +++ b/src/main/java/com/xuxd/kafka/console/controller/OperationController.java @@ -1,5 +1,6 @@ package com.xuxd.kafka.console.controller; +import com.xuxd.kafka.console.beans.dto.ReplicationDTO; import com.xuxd.kafka.console.beans.dto.SyncDataDTO; import com.xuxd.kafka.console.service.OperationService; import org.apache.kafka.clients.admin.AdminClientConfig; @@ -46,4 +47,9 @@ public class OperationController { public Object deleteAlignment(@RequestParam Long id) { return operationService.deleteAlignmentById(id); } + + @PostMapping("/replication/preferred") + public Object electPreferredLeader(@RequestBody ReplicationDTO dto) { + return operationService.electPreferredLeader(dto.getTopic(), dto.getPartition()); + } } diff --git a/src/main/java/com/xuxd/kafka/console/service/OperationService.java b/src/main/java/com/xuxd/kafka/console/service/OperationService.java index 67ea35e..3c27108 100644 --- a/src/main/java/com/xuxd/kafka/console/service/OperationService.java +++ b/src/main/java/com/xuxd/kafka/console/service/OperationService.java @@ -18,4 +18,6 @@ public interface OperationService { ResponseData getAlignmentList(); ResponseData deleteAlignmentById(Long id); + + ResponseData electPreferredLeader(String topic, int partition); } diff --git a/src/main/java/com/xuxd/kafka/console/service/impl/OperationServiceImpl.java b/src/main/java/com/xuxd/kafka/console/service/impl/OperationServiceImpl.java index 863b2c2..eb460a0 100644 --- a/src/main/java/com/xuxd/kafka/console/service/impl/OperationServiceImpl.java +++ b/src/main/java/com/xuxd/kafka/console/service/impl/OperationServiceImpl.java @@ -9,9 +9,11 @@ import com.xuxd.kafka.console.beans.vo.OffsetAlignmentVO; import com.xuxd.kafka.console.dao.MinOffsetAlignmentMapper; import com.xuxd.kafka.console.service.OperationService; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.Set; import kafka.console.OperationConsole; import org.apache.kafka.common.TopicPartition; import org.springframework.beans.factory.ObjectProvider; @@ -107,4 +109,17 @@ public class OperationServiceImpl implements OperationService { minOffsetAlignmentMapper.deleteById(id); return ResponseData.create().success(); } + + @Override public ResponseData electPreferredLeader(String topic, int partition) { + Set partitions = new HashSet<>(); + if (partition != -1) { + partitions.add(new TopicPartition(topic, partition)); + } else { + + partitions.addAll(operationConsole.getTopicPartitions(topic)); + } + Tuple2 tuple2 = operationConsole.electPreferredLeader(partitions); + + return (boolean) tuple2._1() ? ResponseData.create().success() : ResponseData.create().failed(tuple2._2()); + } } diff --git a/src/main/scala/kafka/console/KafkaConsole.scala b/src/main/scala/kafka/console/KafkaConsole.scala index 049e574..0820d4e 100644 --- a/src/main/scala/kafka/console/KafkaConsole.scala +++ b/src/main/scala/kafka/console/KafkaConsole.scala @@ -5,7 +5,7 @@ import java.util.Properties import com.xuxd.kafka.console.config.KafkaConfig import kafka.zk.{AdminZkClient, KafkaZkClient} import org.apache.kafka.clients.CommonClientConfigs -import org.apache.kafka.clients.admin.{Admin, AdminClientConfig} +import org.apache.kafka.clients.admin.{AbstractOptions, Admin, AdminClientConfig} import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer} import org.apache.kafka.common.config.SaslConfigs import org.apache.kafka.common.serialization.ByteArrayDeserializer @@ -69,6 +69,10 @@ class KafkaConsole(config: KafkaConfig) { Admin.create(props) } + protected def withTimeoutMs[T <: AbstractOptions[T]](options: T) = { + options.timeoutMs(timeoutMs) + } + private def createAdminClient(): Admin = { Admin.create(getProps()) } diff --git a/src/main/scala/kafka/console/OperationConsole.scala b/src/main/scala/kafka/console/OperationConsole.scala index d48afc3..33c9a07 100644 --- a/src/main/scala/kafka/console/OperationConsole.scala +++ b/src/main/scala/kafka/console/OperationConsole.scala @@ -4,11 +4,12 @@ import java.util.concurrent.TimeUnit import java.util.{Collections, Properties} import com.xuxd.kafka.console.config.KafkaConfig +import org.apache.kafka.clients.admin.ElectLeadersOptions import org.apache.kafka.clients.consumer.KafkaConsumer -import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.serialization.ByteArrayDeserializer +import org.apache.kafka.common.{ElectionType, TopicPartition} -import scala.jdk.CollectionConverters.{CollectionHasAsScala, ListHasAsScala, MapHasAsScala, SeqHasAsJava, SetHasAsScala} +import scala.jdk.CollectionConverters.{CollectionHasAsScala, ListHasAsScala, MapHasAsScala, SeqHasAsJava, SetHasAsJava, SetHasAsScala} /** * kafka-console-ui. @@ -194,4 +195,19 @@ class OperationConsole(config: KafkaConfig, topicConsole: TopicConsole, thatConsumer.close() } } + + def electPreferredLeader(partitions: util.Set[TopicPartition]): (Boolean, String) = { + withAdminClientAndCatchError(admin => { + admin.electLeaders(ElectionType.PREFERRED, partitions, withTimeoutMs(new ElectLeadersOptions)).all().get() + (true, "") + }, e => { + log.error("alter config error.", e) + (false, e.getMessage) + }).asInstanceOf[(Boolean, String)] + } + + def getTopicPartitions(topic: String): util.Set[TopicPartition] = { + val topicList = topicConsole.getTopicList(Collections.singleton(topic)) + topicList.asScala.flatMap(_.partitions().asScala.map(t => new TopicPartition(topic, t.partition()))).toSet.asJava + } } diff --git a/ui/src/utils/api.js b/ui/src/utils/api.js index e89eef7..f5e689d 100644 --- a/ui/src/utils/api.js +++ b/ui/src/utils/api.js @@ -182,4 +182,8 @@ export const KafkaOpApi = { url: "/op/sync/alignment", method: "delete", }, + electPreferredLeader: { + url: "/op/replication/preferred", + method: "post", + }, }; diff --git a/ui/src/utils/request.js b/ui/src/utils/request.js index ec58573..27b6755 100644 --- a/ui/src/utils/request.js +++ b/ui/src/utils/request.js @@ -6,7 +6,7 @@ import { VueAxios } from "./axios"; const request = axios.create({ // API 请求的默认前缀 baseURL: process.env.VUE_APP_API_BASE_URL, - timeout: 10000, // 请求超时时间 + timeout: 30000, // 请求超时时间 }); // 异常拦截处理器 diff --git a/ui/src/views/op/ElectPreferredLeader.vue b/ui/src/views/op/ElectPreferredLeader.vue new file mode 100644 index 0000000..e5bd8f4 --- /dev/null +++ b/ui/src/views/op/ElectPreferredLeader.vue @@ -0,0 +1,159 @@ + + + + + diff --git a/ui/src/views/op/OffsetAlignmentTable.vue b/ui/src/views/op/OffsetAlignmentTable.vue index 471eeb0..b527927 100644 --- a/ui/src/views/op/OffsetAlignmentTable.vue +++ b/ui/src/views/op/OffsetAlignmentTable.vue @@ -11,7 +11,12 @@ >
- +
      {{ diff --git a/ui/src/views/op/Operation.vue b/ui/src/views/op/Operation.vue index c9aa5d5..5a174d6 100644 --- a/ui/src/views/op/Operation.vue +++ b/ui/src/views/op/Operation.vue @@ -3,7 +3,9 @@

      - 首选副本作为leader + + 首选副本作为leader + 将集群中所有分区leader副本设置为首选副本

      @@ -11,15 +13,11 @@
      - - - - - - - - - +

      + 数据同步方案 + + 新老集群迁移、数据同步解决方案 +

      最小位移对齐 @@ -57,6 +55,10 @@ :visible="syncData.showOffsetAlignmentInfoDialog" @closeOffsetAlignmentInfoDialog="closeOffsetAlignmentInfoDialog" > +

      @@ -64,9 +66,15 @@ import SyncConsumerOffset from "@/views/op/SyncConsumerOffset"; import MinOffsetAlignment from "@/views/op/MinOffsetAlignment"; import OffsetAlignmentTable from "@/views/op/OffsetAlignmentTable"; +import ElectPreferredLeader from "@/views/op/ElectPreferredLeader"; export default { name: "Operation", - components: { SyncConsumerOffset, MinOffsetAlignment, OffsetAlignmentTable }, + components: { + SyncConsumerOffset, + MinOffsetAlignment, + OffsetAlignmentTable, + ElectPreferredLeader, + }, data() { return { syncData: { @@ -74,6 +82,9 @@ export default { showMinOffsetAlignmentDialog: false, showOffsetAlignmentInfoDialog: false, }, + replicationManager: { + showElectPreferredLeaderDialog: false, + }, }; }, methods: { @@ -95,6 +106,12 @@ export default { closeOffsetAlignmentInfoDialog() { this.syncData.showOffsetAlignmentInfoDialog = false; }, + openElectPreferredLeaderDialog() { + this.replicationManager.showElectPreferredLeaderDialog = true; + }, + closeElectPreferredLeaderDialog() { + this.replicationManager.showElectPreferredLeaderDialog = false; + }, }, };