10 Commits

Author SHA1 Message Date
许晓东
8131cb1a42 Publish the v1.0.4 installer download link 2022-02-16 20:01:06 +08:00
许晓东
1dd6466261 Replica reassignment 2022-02-16 19:50:35 +08:00
许晓东
dda08a2152 Replica reassignment -> generate assignment plan 2022-02-15 20:13:07 +08:00
许晓东
01c7121ee4 Sort the cluster node list 2022-01-22 23:33:13 +08:00
许晓东
d939d7653c Show broker API version compatibility info on the home page 2022-01-22 23:07:41 +08:00
许晓东
058cd5a24e Handle unsupported-version errors when querying current reassignments 2022-01-20 13:44:37 +08:00
许晓东
db3f55ac4a polish README 2022-01-19 19:05:00 +08:00
许晓东
a311a34537 Fix stack overflow bug in partition comparison 2022-01-18 20:42:11 +08:00
许晓东
e8fe2ea1c7 Support Chinese cluster names; allow choosing time sort order in message search 2022-01-13 14:19:17 +08:00
许晓东
10302dd39c v1.0.3 installer download link 2022-01-09 23:57:00 +08:00
27 changed files with 1146 additions and 132 deletions

View File

@@ -7,7 +7,8 @@
If GitHub can display the images, click [view the menu pages](./document/overview/概览.md) to see what each page looks like.
## Cluster Migration Support
The current main branch and later releases no longer ship a message-sync / cluster-migration solution. If you need it, see: [cluster migration guide](./document/datasync/集群迁移.md)
## ACL Notes
If the Kafka cluster has ACL enabled but the console does not show the Acl menu, see the [ACL setup guide](./document/acl/Acl.md).
## Features
* Multi-cluster support
* Cluster info
@@ -21,7 +22,7 @@
![Features](./document/img/功能特性.png)
## Download
Click to download (v1.0.2): [kafka-console-ui.zip](https://github.com/xxd763795151/kafka-console-ui/releases/download/v1.0.2/kafka-console-ui.zip)
Click to download (v1.0.4): [kafka-console-ui.zip](https://github.com/xxd763795151/kafka-console-ui/releases/download/v1.0.4/kafka-console-ui.zip)
## Quick Start
### Windows

document/acl/Acl.md Normal file
View File

@@ -0,0 +1,36 @@
# ACL Setup Guide
## Preface
You may have come here from this article: [how to manage Kafka ACL configuration through a visual console](https://blog.csdn.net/x763795151/article/details/120200119)
That article described enabling ACL by editing the application.yml configuration file, for example:
```yaml
kafka:
  config:
    # kafka broker addresses, comma-separated
    bootstrap-server: 'localhost:9092'
    # whether the brokers have ACL enabled; if not, the settings below can be ignored
    enable-acl: true
    # only two security protocols are supported: SASL_PLAINTEXT and PLAINTEXT; set SASL_PLAINTEXT if ACL is enabled, otherwise this setting does not matter
    security-protocol: SASL_PLAINTEXT
    sasl-mechanism: SCRAM-SHA-256
    # super-admin username (already configured as a super admin on the broker)
    admin-username: admin
    # super-admin password
    admin-password: admin
    # create the configured super-admin user automatically on startup
    admin-create: true
    # ZooKeeper address the brokers connect to
    zookeeper-addr: localhost:2181
    sasl-jaas-config: org.apache.kafka.common.security.scram.ScramLoginModule required username="${kafka.config.admin-username}" password="${kafka.config.admin-password}";
```
It noted that the kafka.config.enable-acl option must be set to true.
Note: **this approach is no longer supported.**
## Notes for the New Version
The console now supports multi-cluster configuration; see the cluster configuration introduction on the home page.
The extra configuration options above have therefore been removed.
If ACL is enabled, configure the cluster's ACL-related properties when adding the cluster on the page, like this: ![add cluster](./新增集群.png)
If the console detects ACL-related properties in the cluster settings, the ACL menu appears automatically after switching to that cluster.
Note: only SASL is supported.
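For illustration only, the properties pasted into that field for a SASL/SCRAM-secured cluster would be the standard Kafka client settings, roughly as sketched below (the mechanism and credentials are placeholders, not values the console requires):
```properties
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="admin";
```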

View File

Binary file not shown.


View File

@@ -10,7 +10,7 @@
</parent>
<groupId>com.xuxd</groupId>
<artifactId>kafka-console-ui</artifactId>
<version>1.0.3</version>
<version>1.0.4</version>
<name>kafka-console-ui</name>
<description>Kafka console manage ui</description>
<properties>

View File

@@ -8,7 +8,7 @@ import org.apache.kafka.common.Node;
* @author xuxd
* @date 2021-10-08 14:03:21
**/
public class BrokerNode {
public class BrokerNode implements Comparable<BrokerNode> {
private int id;
@@ -80,4 +80,8 @@ public class BrokerNode {
public void setController(boolean controller) {
isController = controller;
}
@Override public int compareTo(BrokerNode o) {
    return Integer.compare(this.id, o.id);
}
}

View File

@@ -21,7 +21,7 @@ public class TopicPartition implements Comparable {
}
TopicPartition other = (TopicPartition) o;
if (!this.topic.equals(other.getTopic())) {
return this.compareTo(other);
return this.topic.compareTo(other.topic);
}
return this.partition - other.partition;

View File

@@ -0,0 +1,18 @@
package com.xuxd.kafka.console.beans.dto;
import java.util.List;
import lombok.Data;
/**
* kafka-console-ui.
*
* @author xuxd
* @date 2022-02-15 19:08:13
**/
@Data
public class ProposedAssignmentDTO {
private String topic;
private List<Integer> brokers;
}

View File

@@ -0,0 +1,24 @@
package com.xuxd.kafka.console.beans.vo;
import java.util.List;
import lombok.Data;
/**
* kafka-console-ui.
*
* @author xuxd
* @date 2022-01-22 16:24:58
**/
@Data
public class BrokerApiVersionVO {
private int brokerId;
private String host;
private int supportNums;
private int unSupportNums;
private List<String> versionInfo;
}

View File

@@ -53,4 +53,9 @@ public class ClusterController {
public Object peekClusterInfo() {
return clusterService.peekClusterInfo();
}
@GetMapping("/info/api/version")
public Object getBrokerApiVersionInfo() {
return clusterService.getBrokerApiVersionInfo();
}
}

View File

@@ -2,6 +2,7 @@ package com.xuxd.kafka.console.controller;
import com.xuxd.kafka.console.beans.TopicPartition;
import com.xuxd.kafka.console.beans.dto.BrokerThrottleDTO;
import com.xuxd.kafka.console.beans.dto.ProposedAssignmentDTO;
import com.xuxd.kafka.console.beans.dto.ReplicationDTO;
import com.xuxd.kafka.console.beans.dto.SyncDataDTO;
import com.xuxd.kafka.console.service.OperationService;
@@ -74,4 +75,9 @@ public class OperationController {
public Object cancelReassignment(@RequestBody TopicPartition partition) {
return operationService.cancelReassignment(new org.apache.kafka.common.TopicPartition(partition.getTopic(), partition.getPartition()));
}
@PostMapping("/replication/reassignments/proposed")
public Object proposedAssignments(@RequestBody ProposedAssignmentDTO dto) {
return operationService.proposedAssignments(dto.getTopic(), dto.getBrokers());
}
}

View File

@@ -21,4 +21,6 @@ public interface ClusterService {
ResponseData updateClusterInfo(ClusterInfoDO infoDO);
ResponseData peekClusterInfo();
ResponseData getBrokerApiVersionInfo();
}

View File

@@ -30,4 +30,6 @@ public interface OperationService {
ResponseData currentReassignments();
ResponseData cancelReassignment(TopicPartition partition);
ResponseData proposedAssignments(String topic, List<Integer> brokerList);
}

View File

@@ -1,15 +1,26 @@
package com.xuxd.kafka.console.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.xuxd.kafka.console.beans.ClusterInfo;
import com.xuxd.kafka.console.beans.ResponseData;
import com.xuxd.kafka.console.beans.dos.ClusterInfoDO;
import com.xuxd.kafka.console.beans.vo.BrokerApiVersionVO;
import com.xuxd.kafka.console.beans.vo.ClusterInfoVO;
import com.xuxd.kafka.console.dao.ClusterInfoMapper;
import com.xuxd.kafka.console.service.ClusterService;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.TreeSet;
import java.util.stream.Collectors;
import kafka.console.ClusterConsole;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.common.Node;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.stereotype.Service;
@@ -33,7 +44,9 @@ public class ClusterServiceImpl implements ClusterService {
}
@Override public ResponseData getClusterInfo() {
return ResponseData.create().data(clusterConsole.clusterInfo()).success();
ClusterInfo clusterInfo = clusterConsole.clusterInfo();
clusterInfo.setNodes(new TreeSet<>(clusterInfo.getNodes()));
return ResponseData.create().data(clusterInfo).success();
}
@Override public ResponseData getClusterInfoList() {
@@ -69,4 +82,29 @@ public class ClusterServiceImpl implements ClusterService {
return ResponseData.create().data(dos.stream().findFirst().map(ClusterInfoVO::from)).success();
}
@Override public ResponseData getBrokerApiVersionInfo() {
HashMap<Node, NodeApiVersions> map = clusterConsole.listBrokerVersionInfo();
List<BrokerApiVersionVO> list = new ArrayList<>(map.size());
map.forEach(((node, versions) -> {
BrokerApiVersionVO vo = new BrokerApiVersionVO();
vo.setBrokerId(node.id());
vo.setHost(node.host() + ":" + node.port());
vo.setSupportNums(versions.allSupportedApiVersions().size());
String versionInfo = versions.toString(true);
int from = 0;
int count = 0;
int index = -1;
while ((index = versionInfo.indexOf("UNSUPPORTED", from)) >= 0 && from < versionInfo.length()) {
count++;
from = index + 1;
}
vo.setUnSupportNums(count);
versionInfo = versionInfo.substring(1, versionInfo.length() - 2);
vo.setVersionInfo(Arrays.asList(StringUtils.split(versionInfo, ",")));
list.add(vo);
}));
Collections.sort(list, Comparator.comparingInt(BrokerApiVersionVO::getBrokerId));
return ResponseData.create().data(list).success();
}
}
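The string scanned above is the output of NodeApiVersions.toString(true): a parenthesized, comma-separated list along these lines (illustrative, not verbatim; the exact entries vary by broker and client version). Counting "UNSUPPORTED" occurrences yields the unsupported-API total, and the substring call strips the enclosing parentheses before splitting on commas:

```
(Produce(0): 0 to 8 [usable: 8], Fetch(1): 0 to 11 [usable: 11], ..., Envelope(58): UNSUPPORTED)
```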

View File

@@ -1,6 +1,7 @@
package com.xuxd.kafka.console.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.google.common.collect.Lists;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.xuxd.kafka.console.beans.ResponseData;
@@ -10,6 +11,7 @@ import com.xuxd.kafka.console.beans.vo.OffsetAlignmentVO;
import com.xuxd.kafka.console.dao.MinOffsetAlignmentMapper;
import com.xuxd.kafka.console.service.OperationService;
import com.xuxd.kafka.console.utils.GsonUtil;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -19,6 +21,7 @@ import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import kafka.console.OperationConsole;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.admin.PartitionReassignment;
import org.apache.kafka.common.TopicPartition;
import org.springframework.beans.factory.ObjectProvider;
@@ -162,4 +165,21 @@ public class OperationServiceImpl implements OperationService {
}
return ResponseData.create().success();
}
@Override public ResponseData proposedAssignments(String topic, List<Integer> brokerList) {
Map<String, Object> params = new HashMap<>();
params.put("version", 1);
Map<String, String> topicMap = new HashMap<>(1, 1.0f);
topicMap.put("topic", topic);
params.put("topics", Lists.newArrayList(topicMap));
List<String> list = brokerList.stream().map(String::valueOf).collect(Collectors.toList());
Map<TopicPartition, List<Object>> assignments = operationConsole.proposedAssignments(gson.toJson(params), StringUtils.join(list, ","));
List<CurrentReassignmentVO> res = new ArrayList<>(assignments.size());
assignments.forEach((tp, replicas) -> {
CurrentReassignmentVO vo = new CurrentReassignmentVO(tp.topic(), tp.partition(),
replicas.stream().map(x -> (Integer) x).collect(Collectors.toList()), null, null);
res.add(vo);
});
return ResponseData.create().data(res).success();
}
}
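For reference, the params map built above serializes to the topics-to-move JSON that ReassignPartitionsCommand.generateAssignment expects (invoked in the Scala console further down); with a hypothetical topic name it looks like:

```json
{"version": 1, "topics": [{"topic": "my-topic"}]}
```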

View File

@@ -0,0 +1,330 @@
package kafka.console
import com.xuxd.kafka.console.config.ContextConfigHolder
import kafka.utils.Implicits.MapExtensionMethods
import kafka.utils.Logging
import org.apache.kafka.clients._
import org.apache.kafka.clients.admin.AdminClientConfig
import org.apache.kafka.clients.consumer.internals.{ConsumerNetworkClient, RequestFuture}
import org.apache.kafka.common.Node
import org.apache.kafka.common.config.ConfigDef.ValidString.in
import org.apache.kafka.common.config.ConfigDef.{Importance, Type}
import org.apache.kafka.common.config.{AbstractConfig, ConfigDef}
import org.apache.kafka.common.errors.AuthenticationException
import org.apache.kafka.common.internals.ClusterResourceListeners
import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionCollection
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.Selector
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests._
import org.apache.kafka.common.utils.{KafkaThread, LogContext, Time}
import java.io.IOException
import java.util.Properties
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.{ConcurrentLinkedQueue, TimeUnit}
import scala.jdk.CollectionConverters.{ListHasAsScala, MapHasAsJava, PropertiesHasAsScala, SetHasAsScala}
import scala.util.{Failure, Success, Try}
/**
* kafka-console-ui.
*
* Copy from {@link kafka.admin.BrokerApiVersionsCommand}.
*
* @author xuxd
* @date 2022-01-22 15:15:57
* */
object BrokerApiVersion extends Logging {
def listAllBrokerApiVersionInfo(): java.util.HashMap[Node, NodeApiVersions] = {
val res = new java.util.HashMap[Node, NodeApiVersions]()
val adminClient = createAdminClient()
try {
adminClient.awaitBrokers()
val brokerMap = adminClient.listAllBrokerVersionInfo()
brokerMap.forKeyValue {
(broker, versionInfoOrError) =>
versionInfoOrError match {
case Success(v) => {
res.put(broker, v)
}
case Failure(v) => logger.error(s"${broker} -> ERROR: ${v}\n")
}
}
} finally {
adminClient.close()
}
res
}
private def createAdminClient(): AdminClient = {
val props = new Properties()
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, ContextConfigHolder.CONTEXT_CONFIG.get().getBootstrapServer())
props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, ContextConfigHolder.CONTEXT_CONFIG.get().getRequestTimeoutMs())
props.putAll(ContextConfigHolder.CONTEXT_CONFIG.get().getProperties())
AdminClient.create(props)
}
// org.apache.kafka.clients.admin.AdminClient doesn't currently expose a way to retrieve the supported api versions.
// We inline the bits we need from kafka.admin.AdminClient so that we can delete it.
private class AdminClient(val time: Time,
val client: ConsumerNetworkClient,
val bootstrapBrokers: List[Node]) extends Logging {
@volatile var running = true
val pendingFutures = new ConcurrentLinkedQueue[RequestFuture[ClientResponse]]()
val networkThread = new KafkaThread("admin-client-network-thread", () => {
try {
while (running)
client.poll(time.timer(Long.MaxValue))
} catch {
case t: Throwable =>
error("admin-client-network-thread exited", t)
} finally {
pendingFutures.forEach { future =>
try {
future.raise(Errors.UNKNOWN_SERVER_ERROR)
} catch {
case _: IllegalStateException => // It is OK if the future has been completed
}
}
pendingFutures.clear()
}
}, true)
networkThread.start()
private def send(target: Node,
request: AbstractRequest.Builder[_ <: AbstractRequest]): AbstractResponse = {
val future = client.send(target, request)
pendingFutures.add(future)
future.awaitDone(Long.MaxValue, TimeUnit.MILLISECONDS)
pendingFutures.remove(future)
if (future.succeeded())
future.value().responseBody()
else
throw future.exception()
}
private def sendAnyNode(request: AbstractRequest.Builder[_ <: AbstractRequest]): AbstractResponse = {
bootstrapBrokers.foreach { broker =>
try {
return send(broker, request)
} catch {
case e: AuthenticationException =>
throw e
case e: Exception =>
debug(s"Request ${request.apiKey()} failed against node $broker", e)
}
}
throw new RuntimeException(s"Request ${request.apiKey()} failed on brokers $bootstrapBrokers")
}
private def getApiVersions(node: Node): ApiVersionCollection = {
val response = send(node, new ApiVersionsRequest.Builder()).asInstanceOf[ApiVersionsResponse]
Errors.forCode(response.data.errorCode).maybeThrow()
response.data.apiKeys
}
/**
* Wait until there is a non-empty list of brokers in the cluster.
*/
def awaitBrokers(): Unit = {
var nodes = List[Node]()
val start = System.currentTimeMillis()
val maxWait = 30 * 1000
do {
nodes = findAllBrokers()
if (nodes.isEmpty) {
Thread.sleep(50)
}
}
while (nodes.isEmpty && (System.currentTimeMillis() - start < maxWait))
}
private def findAllBrokers(): List[Node] = {
val request = MetadataRequest.Builder.allTopics()
val response = sendAnyNode(request).asInstanceOf[MetadataResponse]
val errors = response.errors
if (!errors.isEmpty) {
logger.info(s"Metadata request contained errors: $errors")
}
// In Kafka 3.x this method is buildCluster, which replaces cluster()
// response.buildCluster.nodes.asScala.toList
response.cluster().nodes.asScala.toList
}
def listAllBrokerVersionInfo(): Map[Node, Try[NodeApiVersions]] =
findAllBrokers().map { broker =>
broker -> Try[NodeApiVersions](new NodeApiVersions(getApiVersions(broker)))
}.toMap
def close(): Unit = {
running = false
try {
client.close()
} catch {
case e: IOException =>
error("Exception closing nioSelector:", e)
}
}
}
private object AdminClient {
val DefaultConnectionMaxIdleMs = 9 * 60 * 1000
val DefaultRequestTimeoutMs = 5000
val DefaultSocketConnectionSetupMs = CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG
val DefaultSocketConnectionSetupMaxMs = CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG
val DefaultMaxInFlightRequestsPerConnection = 100
val DefaultReconnectBackoffMs = 50
val DefaultReconnectBackoffMax = 50
val DefaultSendBufferBytes = 128 * 1024
val DefaultReceiveBufferBytes = 32 * 1024
val DefaultRetryBackoffMs = 100
val AdminClientIdSequence = new AtomicInteger(1)
val AdminConfigDef = {
val config = new ConfigDef()
.define(
CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
Type.LIST,
Importance.HIGH,
CommonClientConfigs.BOOTSTRAP_SERVERS_DOC)
.define(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG,
Type.STRING,
ClientDnsLookup.USE_ALL_DNS_IPS.toString,
in(ClientDnsLookup.USE_ALL_DNS_IPS.toString,
ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY.toString),
Importance.MEDIUM,
CommonClientConfigs.CLIENT_DNS_LOOKUP_DOC)
.define(
CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
ConfigDef.Type.STRING,
CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL,
ConfigDef.Importance.MEDIUM,
CommonClientConfigs.SECURITY_PROTOCOL_DOC)
.define(
CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG,
ConfigDef.Type.INT,
DefaultRequestTimeoutMs,
ConfigDef.Importance.MEDIUM,
CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC)
.define(
CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG,
ConfigDef.Type.LONG,
CommonClientConfigs.DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MS,
ConfigDef.Importance.MEDIUM,
CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_DOC)
.define(
CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG,
ConfigDef.Type.LONG,
CommonClientConfigs.DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS,
ConfigDef.Importance.MEDIUM,
CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_DOC)
.define(
CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG,
ConfigDef.Type.LONG,
DefaultRetryBackoffMs,
ConfigDef.Importance.MEDIUM,
CommonClientConfigs.RETRY_BACKOFF_MS_DOC)
.withClientSslSupport()
.withClientSaslSupport()
config
}
class AdminConfig(originals: Map[_, _]) extends AbstractConfig(AdminConfigDef, originals.asJava, false)
def create(props: Properties): AdminClient = {
val properties = new Properties()
val names = props.stringPropertyNames()
for (name <- names.asScala.toSet) {
properties.put(name, props.get(name).toString())
}
create(properties.asScala.toMap)
}
def create(props: Map[String, _]): AdminClient = create(new AdminConfig(props))
def create(config: AdminConfig): AdminClient = {
val clientId = "admin-" + AdminClientIdSequence.getAndIncrement()
val logContext = new LogContext(s"[LegacyAdminClient clientId=$clientId] ")
val time = Time.SYSTEM
val metrics = new Metrics(time)
val metadata = new Metadata(100L, 60 * 60 * 1000L, logContext,
new ClusterResourceListeners)
val channelBuilder = ClientUtils.createChannelBuilder(config, time, logContext)
val requestTimeoutMs = config.getInt(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG)
val connectionSetupTimeoutMs = config.getLong(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG)
val connectionSetupTimeoutMaxMs = config.getLong(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG)
val retryBackoffMs = config.getLong(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG)
val brokerUrls = config.getList(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)
val clientDnsLookup = config.getString(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG)
val brokerAddresses = ClientUtils.parseAndValidateAddresses(brokerUrls, clientDnsLookup)
metadata.bootstrap(brokerAddresses)
val selector = new Selector(
DefaultConnectionMaxIdleMs,
metrics,
time,
"admin",
channelBuilder,
logContext)
// The compatibility workaround here differs across Kafka client versions.
// For 3.x, use this constructor instead:
// val networkClient = new NetworkClient(
// selector,
// metadata,
// clientId,
// DefaultMaxInFlightRequestsPerConnection,
// DefaultReconnectBackoffMs,
// DefaultReconnectBackoffMax,
// DefaultSendBufferBytes,
// DefaultReceiveBufferBytes,
// requestTimeoutMs,
// connectionSetupTimeoutMs,
// connectionSetupTimeoutMaxMs,
// time,
// true,
// new ApiVersions,
// logContext)
val networkClient = new NetworkClient(
selector,
metadata,
clientId,
DefaultMaxInFlightRequestsPerConnection,
DefaultReconnectBackoffMs,
DefaultReconnectBackoffMax,
DefaultSendBufferBytes,
DefaultReceiveBufferBytes,
requestTimeoutMs,
connectionSetupTimeoutMs,
connectionSetupTimeoutMaxMs,
ClientDnsLookup.USE_ALL_DNS_IPS,
time,
true,
new ApiVersions,
logContext)
val highLevelClient = new ConsumerNetworkClient(
logContext,
networkClient,
metadata,
time,
retryBackoffMs,
requestTimeoutMs,
Integer.MAX_VALUE)
new AdminClient(
time,
highLevelClient,
metadata.fetch.nodes.asScala.toList)
}
}
}

View File

@@ -1,11 +1,13 @@
package kafka.console
import com.xuxd.kafka.console.beans.{BrokerNode, ClusterInfo}
import com.xuxd.kafka.console.config.{ContextConfigHolder, KafkaConfig}
import org.apache.kafka.clients.NodeApiVersions
import org.apache.kafka.clients.admin.DescribeClusterResult
import org.apache.kafka.common.Node
import java.util.Collections
import java.util.concurrent.TimeUnit
import com.xuxd.kafka.console.beans.{BrokerNode, ClusterInfo}
import com.xuxd.kafka.console.config.{ContextConfigHolder, KafkaConfig}
import org.apache.kafka.clients.admin.DescribeClusterResult
import scala.jdk.CollectionConverters.{CollectionHasAsScala, SetHasAsJava, SetHasAsScala}
/**
@@ -41,4 +43,8 @@ class ClusterConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConf
new ClusterInfo
}).asInstanceOf[ClusterInfo]
}
def listBrokerVersionInfo(): java.util.HashMap[Node, NodeApiVersions] = {
BrokerApiVersion.listAllBrokerApiVersionInfo()
}
}

View File

@@ -242,8 +242,8 @@ class OperationConsole(config: KafkaConfig, topicConsole: TopicConsole,
withAdminClientAndCatchError(admin => {
admin.listPartitionReassignments(withTimeoutMs(new ListPartitionReassignmentsOptions)).reassignments().get()
}, e => {
Collections.emptyMap()
log.error("listPartitionReassignments error.", e)
Collections.emptyMap()
}).asInstanceOf[util.Map[TopicPartition, PartitionReassignment]]
}
@@ -256,4 +256,20 @@ class OperationConsole(config: KafkaConfig, topicConsole: TopicConsole,
throw e
}).asInstanceOf[util.Map[TopicPartition, Throwable]]
}
def proposedAssignments(reassignmentJson: String,
brokerListString: String): util.Map[TopicPartition, util.List[Int]] = {
withAdminClientAndCatchError(admin => {
val map = ReassignPartitionsCommand.generateAssignment(admin, reassignmentJson, brokerListString, true)._1
val res = new util.HashMap[TopicPartition, util.List[Int]]()
for (tp <- map.keys) {
res.put(tp, map(tp).asJava)
// res.put(tp, map.getOrElse(tp, Seq.empty).asJava)
}
res
}, e => {
log.error("proposedAssignments error.", e)
throw e
})
}.asInstanceOf[util.Map[TopicPartition, util.List[Int]]]
}

ui/package-lock.json generated
View File

@@ -1820,6 +1820,63 @@
"integrity": "sha1-/q7SVZc9LndVW4PbwIhRpsY1IPo=",
"dev": true
},
"ansi-styles": {
"version": "4.3.0",
"resolved": "https://registry.npmjs.org/ansi-styles/-/ansi-styles-4.3.0.tgz",
"integrity": "sha512-zbB9rCJAT1rbjiVDb2hqKFHNYLxgtk8NURxZ3IZwD3F6NtxbXZQCnnSi1Lkx+IDohdPlFp222wVALIheZJQSEg==",
"dev": true,
"optional": true,
"requires": {
"color-convert": "^2.0.1"
}
},
"chalk": {
"version": "4.1.2",
"resolved": "https://registry.npmjs.org/chalk/-/chalk-4.1.2.tgz",
"integrity": "sha512-oKnbhFyRIXpUuez8iBMmyEa4nbj4IOQyuhc/wy9kY7/WVPcwIO9VA668Pu8RkO7+0G76SLROeyw9CpQ061i4mA==",
"dev": true,
"optional": true,
"requires": {
"ansi-styles": "^4.1.0",
"supports-color": "^7.1.0"
}
},
"color-convert": {
"version": "2.0.1",
"resolved": "https://registry.npmjs.org/color-convert/-/color-convert-2.0.1.tgz",
"integrity": "sha512-RRECPsj7iu/xb5oKYcsFHSppFNnsj/52OVTRKb4zP5onXwVF3zVmmToNcOfGC+CRDpfK/U584fMg38ZHCaElKQ==",
"dev": true,
"optional": true,
"requires": {
"color-name": "~1.1.4"
}
},
"color-name": {
"version": "1.1.4",
"resolved": "https://registry.npmjs.org/color-name/-/color-name-1.1.4.tgz",
"integrity": "sha512-dOy+3AuW3a2wNbZHIuMZpTcgjGuLU/uBL/ubcZF9OXbDo8ff4O8yVp5Bf0efS8uEoYo5q4Fx7dY9OgQGXgAsQA==",
"dev": true,
"optional": true
},
"has-flag": {
"version": "4.0.0",
"resolved": "https://registry.npmjs.org/has-flag/-/has-flag-4.0.0.tgz",
"integrity": "sha512-EykJT/Q1KjTWctppgIAgfSO0tKVuZUjhgMr17kqTumMl6Afv3EISleU7qZUzoXDFTAHTDC4NOoG/ZxU3EvlMPQ==",
"dev": true,
"optional": true
},
"loader-utils": {
"version": "2.0.2",
"resolved": "https://registry.npmjs.org/loader-utils/-/loader-utils-2.0.2.tgz",
"integrity": "sha512-TM57VeHptv569d/GKh6TAYdzKblwDNiumOdkFnejjD0XwTH87K90w3O7AiJRqdQoXygvi1VQTJTLGhJl7WqA7A==",
"dev": true,
"optional": true,
"requires": {
"big.js": "^5.2.2",
"emojis-list": "^3.0.0",
"json5": "^2.1.2"
}
},
"ssri": {
"version": "8.0.1",
"resolved": "https://registry.nlark.com/ssri/download/ssri-8.0.1.tgz",
@@ -1828,6 +1885,28 @@
"requires": {
"minipass": "^3.1.1"
}
},
"supports-color": {
"version": "7.2.0",
"resolved": "https://registry.npmjs.org/supports-color/-/supports-color-7.2.0.tgz",
"integrity": "sha512-qpCAvRl9stuOHveKsn7HncJRvv501qIacKzQlO/+Lwxc9+0q2wLyv4Dfvt80/DPn2pqOBsJdDiogXGR9+OvwRw==",
"dev": true,
"optional": true,
"requires": {
"has-flag": "^4.0.0"
}
},
"vue-loader-v16": {
"version": "npm:vue-loader@16.8.3",
"resolved": "https://registry.npmjs.org/vue-loader/-/vue-loader-16.8.3.tgz",
"integrity": "sha512-7vKN45IxsKxe5GcVCbc2qFU5aWzyiLrYJyUuMz4BQLKctCj/fmCa0w6fGiiQ2cLFetNcek1ppGJQDCup0c1hpA==",
"dev": true,
"optional": true,
"requires": {
"chalk": "^4.1.0",
"hash-sum": "^2.0.0",
"loader-utils": "^2.0.0"
}
}
}
},
@@ -12097,87 +12176,6 @@
}
}
},
"vue-loader-v16": {
"version": "npm:vue-loader@16.8.3",
"resolved": "https://registry.npmjs.org/vue-loader/-/vue-loader-16.8.3.tgz",
"integrity": "sha512-7vKN45IxsKxe5GcVCbc2qFU5aWzyiLrYJyUuMz4BQLKctCj/fmCa0w6fGiiQ2cLFetNcek1ppGJQDCup0c1hpA==",
"dev": true,
"optional": true,
"requires": {
"chalk": "^4.1.0",
"hash-sum": "^2.0.0",
"loader-utils": "^2.0.0"
},
"dependencies": {
"ansi-styles": {
"version": "4.3.0",
"resolved": "https://registry.npmjs.org/ansi-styles/-/ansi-styles-4.3.0.tgz",
"integrity": "sha512-zbB9rCJAT1rbjiVDb2hqKFHNYLxgtk8NURxZ3IZwD3F6NtxbXZQCnnSi1Lkx+IDohdPlFp222wVALIheZJQSEg==",
"dev": true,
"optional": true,
"requires": {
"color-convert": "^2.0.1"
}
},
"chalk": {
"version": "4.1.2",
"resolved": "https://registry.npmjs.org/chalk/-/chalk-4.1.2.tgz",
"integrity": "sha512-oKnbhFyRIXpUuez8iBMmyEa4nbj4IOQyuhc/wy9kY7/WVPcwIO9VA668Pu8RkO7+0G76SLROeyw9CpQ061i4mA==",
"dev": true,
"optional": true,
"requires": {
"ansi-styles": "^4.1.0",
"supports-color": "^7.1.0"
}
},
"color-convert": {
"version": "2.0.1",
"resolved": "https://registry.npmjs.org/color-convert/-/color-convert-2.0.1.tgz",
"integrity": "sha512-RRECPsj7iu/xb5oKYcsFHSppFNnsj/52OVTRKb4zP5onXwVF3zVmmToNcOfGC+CRDpfK/U584fMg38ZHCaElKQ==",
"dev": true,
"optional": true,
"requires": {
"color-name": "~1.1.4"
}
},
"color-name": {
"version": "1.1.4",
"resolved": "https://registry.npmjs.org/color-name/-/color-name-1.1.4.tgz",
"integrity": "sha512-dOy+3AuW3a2wNbZHIuMZpTcgjGuLU/uBL/ubcZF9OXbDo8ff4O8yVp5Bf0efS8uEoYo5q4Fx7dY9OgQGXgAsQA==",
"dev": true,
"optional": true
},
"has-flag": {
"version": "4.0.0",
"resolved": "https://registry.npmjs.org/has-flag/-/has-flag-4.0.0.tgz",
"integrity": "sha512-EykJT/Q1KjTWctppgIAgfSO0tKVuZUjhgMr17kqTumMl6Afv3EISleU7qZUzoXDFTAHTDC4NOoG/ZxU3EvlMPQ==",
"dev": true,
"optional": true
},
"loader-utils": {
"version": "2.0.2",
"resolved": "https://registry.npmjs.org/loader-utils/-/loader-utils-2.0.2.tgz",
"integrity": "sha512-TM57VeHptv569d/GKh6TAYdzKblwDNiumOdkFnejjD0XwTH87K90w3O7AiJRqdQoXygvi1VQTJTLGhJl7WqA7A==",
"dev": true,
"optional": true,
"requires": {
"big.js": "^5.2.2",
"emojis-list": "^3.0.0",
"json5": "^2.1.2"
}
},
"supports-color": {
"version": "7.2.0",
"resolved": "https://registry.npmjs.org/supports-color/-/supports-color-7.2.0.tgz",
"integrity": "sha512-qpCAvRl9stuOHveKsn7HncJRvv501qIacKzQlO/+Lwxc9+0q2wLyv4Dfvt80/DPn2pqOBsJdDiogXGR9+OvwRw==",
"dev": true,
"optional": true,
"requires": {
"has-flag": "^4.0.0"
}
}
}
},
"vue-ref": {
"version": "2.0.0",
"resolved": "https://registry.npm.taobao.org/vue-ref/download/vue-ref-2.0.0.tgz",

View File

@@ -203,6 +203,10 @@ export const KafkaClusterApi = {
url: "/cluster/info/peek",
method: "get",
},
getBrokerApiVersionInfo: {
url: "/cluster/info/api/version",
method: "get",
},
};
export const KafkaOpApi = {
@@ -242,6 +246,10 @@ export const KafkaOpApi = {
url: "/op/replication/reassignments",
method: "delete",
},
proposedAssignment: {
url: "/op/replication/reassignments/proposed",
method: "post",
},
};
export const KafkaMessageApi = {
searchByTime: {

View File

@@ -27,7 +27,7 @@ request.interceptors.request.use((config) => {
const clusterInfo = getClusterInfo();
if (clusterInfo) {
config.headers["X-Cluster-Info-Id"] = clusterInfo.id;
config.headers["X-Cluster-Info-Name"] = clusterInfo.clusterName;
// config.headers["X-Cluster-Info-Name"] = encodeURIComponent(clusterInfo.clusterName);
}
return config;
}, errorHandler);

View File

@@ -1,25 +1,66 @@
<template>
<div class="home">
<a-card title="kafka console 配置" style="width: 100%">
<!-- <a slot="extra" href="#">more</a>-->
<a-card title="控制台默认配置" class="card-style">
<p v-for="(v, k) in config" :key="k">{{ k }}={{ v }}</p>
</a-card>
<p></p>
<hr />
<h3>kafka API 版本兼容性</h3>
<a-spin :spinning="apiVersionInfoLoading">
<a-table
:columns="columns"
:data-source="brokerApiVersionInfo"
bordered
row-key="brokerId"
>
<div slot="operation" slot-scope="record">
<a-button
size="small"
href="javascript:;"
class="operation-btn"
@click="openApiVersionInfoDialog(record)"
>详情
</a-button>
</div>
</a-table>
</a-spin>
<VersionInfo
:version-info="apiVersionInfo"
:visible="showApiVersionInfoDialog"
@closeApiVersionInfoDialog="closeApiVersionInfoDialog"
>
</VersionInfo>
</div>
</template>
<script>
// @ is an alias to /src
import request from "@/utils/request";
import { KafkaConfigApi } from "@/utils/api";
import { KafkaConfigApi, KafkaClusterApi } from "@/utils/api";
import notification from "ant-design-vue/lib/notification";
import VersionInfo from "@/views/home/VersionInfo";
export default {
name: "Home",
components: {},
components: { VersionInfo },
data() {
return {
config: {},
columns,
brokerApiVersionInfo: [],
showApiVersionInfoDialog: false,
apiVersionInfo: [],
apiVersionInfoLoading: false,
};
},
methods: {
openApiVersionInfoDialog(record) {
this.apiVersionInfo = record.versionInfo;
this.showApiVersionInfoDialog = true;
},
closeApiVersionInfoDialog() {
this.showApiVersionInfoDialog = false;
},
},
created() {
request({
@@ -35,6 +76,53 @@ export default {
});
}
});
this.apiVersionInfoLoading = true;
request({
url: KafkaClusterApi.getBrokerApiVersionInfo.url,
method: KafkaClusterApi.getBrokerApiVersionInfo.method,
}).then((res) => {
this.apiVersionInfoLoading = false;
if (res.code == 0) {
this.brokerApiVersionInfo = res.data;
} else {
notification.error({
message: "error",
description: res.msg,
});
}
});
},
};
const columns = [
{
title: "id",
dataIndex: "brokerId",
key: "brokerId",
},
{
title: "地址",
dataIndex: "host",
key: "host",
},
{
title: "支持的api数量",
dataIndex: "supportNums",
key: "supportNums",
},
{
title: "不支持的api数量",
dataIndex: "unSupportNums",
key: "unSupportNums",
},
{
title: "操作",
key: "operation",
scopedSlots: { customRender: "operation" },
},
];
</script>
<style scoped>
.card-style {
width: 100%;
}
</style>

View File

@@ -0,0 +1,61 @@
<template>
<a-modal
title="API版本信息"
:visible="show"
:width="600"
:mask="false"
:destroyOnClose="true"
:footer="null"
:maskClosable="false"
@cancel="handleCancel"
>
<div>
<h3>格式说明</h3>
<p>请求类型(1)0 to n(2) [usage: v](3)</p>
<ol>
<li>表示客户端发出的请求类型</li>
<li>该请求在broker中支持的版本号区间</li>
<li>
表示当前控制台的kafka客户端使用的是v版本,如果是UNSUPPORTED,说明broker版本太老,无法处理控制台的这些请求,可能影响相关功能的使用
</li>
</ol>
<hr />
<ol>
<li v-for="info in versionInfo" v-bind:key="info">{{ info }}</li>
</ol>
</div>
</a-modal>
</template>
<script>
export default {
name: "APIVersionInfo",
props: {
versionInfo: {
type: Array,
},
visible: {
type: Boolean,
default: false,
},
},
data() {
return {
show: false,
};
},
watch: {
visible(v) {
this.show = v;
},
},
methods: {
handleCancel() {
this.$emit("closeApiVersionInfoDialog", {});
},
},
};
</script>
<style scoped></style>

View File

@@ -203,7 +203,7 @@ export default {
this.$emit("closeDetailDialog", { refresh: false });
},
formatTime(time) {
return moment(time).format("YYYY-MM-DD HH:mm:ss:SSS");
return time == -1 ? -1 : moment(time).format("YYYY-MM-DD HH:mm:ss:SSS");
},
keyDeserializerChange() {
this.getMessageDetail();

View File

@@ -9,6 +9,7 @@
return index;
}
"
@change="handleChange"
>
<div slot="operation" slot-scope="record">
<a-button
@@ -41,9 +42,9 @@ export default {
},
data() {
return {
columns: columns,
showDetailDialog: false,
record: {},
sortedInfo: null,
};
},
methods: {
@@ -54,42 +55,56 @@
    closeDetailDialog() {
      this.showDetailDialog = false;
    },
    handleChange() {
      this.sortedInfo = arguments[2];
    },
  },
  computed: {
    columns() {
      let sortedInfo = this.sortedInfo || {};
      const columns = [
        {
          title: "topic",
          dataIndex: "topic",
          key: "topic",
          width: 300,
        },
        {
          title: "分区",
          dataIndex: "partition",
          key: "partition",
        },
        {
          title: "偏移",
          dataIndex: "offset",
          key: "offset",
        },
        {
          title: "时间",
          dataIndex: "timestamp",
          key: "timestamp",
          slots: { title: "timestamp" },
          scopedSlots: { customRender: "timestamp" },
          customRender: (text) => {
            return text == -1
              ? -1
              : moment(text).format("YYYY-MM-DD HH:mm:ss:SSS");
          },
          sorter: (a, b) => a.timestamp - b.timestamp,
          sortOrder: sortedInfo.columnKey === "timestamp" && sortedInfo.order,
          sortDirections: ["ascend", "descend"],
        },
        {
          title: "操作",
          key: "operation",
          scopedSlots: { customRender: "operation" },
          width: 200,
        },
      ];
      return columns;
    },
  },
};
</script>
<style scoped></style>

View File

@@ -69,6 +69,9 @@ sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule require
import request from "@/utils/request";
import { KafkaClusterApi } from "@/utils/api";
import notification from "ant-design-vue/es/notification";
import { getClusterInfo } from "@/utils/local-cache";
import { mapMutations } from "vuex";
import { CLUSTER } from "@/store/mutation-types";
export default {
name: "AddClusterInfo",
@@ -124,6 +127,17 @@ export default {
if (res.code == 0) {
this.$message.success(res.msg);
this.$emit(this.closeDialogEvent, { refresh: true });
if (this.isModify) {
let clusterInfo = getClusterInfo();
if (
clusterInfo &&
clusterInfo.id &&
clusterInfo.id == this.clusterInfo.id &&
clusterInfo.clusterName != data.clusterName
) {
this.switchCluster(data);
}
}
} else {
notification.error({
message: "error",
@@ -138,6 +152,9 @@ export default {
this.data = [];
this.$emit(this.closeDialogEvent, { refresh: false });
},
...mapMutations({
switchCluster: CLUSTER.SWITCH,
}),
},
};
const defaultInfo = { clusterName: "", address: "", properties: "" };

View File

@@ -49,6 +49,15 @@
<label>说明</label>
<span>查看正在进行副本变更/重分配的任务或者将其取消</span>
</p>
<p>
<a-button type="primary" @click="openReplicaReassignDialog">
副本重分配
</a-button>
<label>说明</label>
<span
>副本所在节点重新分配。打个比方:集群有6个节点,分区1的3个副本在节点1、2、3,现在将它们重新分配到3、4、5</span
>
</p>
</a-card>
</div>
<!-- data-sync features hidden -->
@@ -125,6 +134,11 @@
:visible="clusterManager.showClusterInfoDialog"
@closeClusterInfoDialog="closeClusterInfoDialog"
></ClusterInfo>
<ReplicaReassign
:visible="replicationManager.showReplicaReassignDialog"
@closeReplicaReassignDialog="closeReplicaReassignDialog"
>
</ReplicaReassign>
</div>
</template>
@@ -138,6 +152,7 @@ import ConfigThrottle from "@/views/op/ConfigThrottle";
import RemoveThrottle from "@/views/op/RemoveThrottle";
import CurrentReassignments from "@/views/op/CurrentReassignments";
import ClusterInfo from "@/views/op/ClusterInfo";
import ReplicaReassign from "@/views/op/ReplicaReassign";
export default {
name: "Operation",
components: {
@@ -150,6 +165,7 @@ export default {
RemoveThrottle,
CurrentReassignments,
ClusterInfo,
ReplicaReassign,
},
data() {
return {
@@ -162,6 +178,7 @@ export default {
replicationManager: {
showElectPreferredLeaderDialog: false,
showCurrentReassignmentsDialog: false,
showReplicaReassignDialog: false,
},
brokerManager: {
showConfigThrottleDialog: false,
@@ -227,6 +244,12 @@ export default {
closeClusterInfoDialog() {
this.clusterManager.showClusterInfoDialog = false;
},
openReplicaReassignDialog() {
this.replicationManager.showReplicaReassignDialog = true;
},
closeReplicaReassignDialog() {
this.replicationManager.showReplicaReassignDialog = false;
},
},
};
</script>

View File

@@ -0,0 +1,296 @@
<template>
<a-modal
title="副本重分配"
:visible="show"
:width="800"
:mask="false"
:destroyOnClose="true"
:footer="null"
:maskClosable="false"
@cancel="handleCancel"
>
<div>
<a-spin :spinning="loading">
<a-form
:form="form"
:label-col="{ span: 5 }"
:wrapper-col="{ span: 12 }"
@submit="handleSubmit"
>
<a-form-item label="Topic">
<a-select
@change="handleTopicChange"
show-search
option-filter-prop="children"
v-decorator="[
'topic',
{ rules: [{ required: true, message: '请选择一个topic!' }] },
]"
placeholder="请选择一个topic"
>
<a-select-option v-for="v in topicList" :key="v" :value="v">
{{ v }}
</a-select-option>
</a-select>
</a-form-item>
<a-form-item label="分配到Broker">
<a-select
mode="multiple"
option-filter-prop="children"
v-decorator="[
'brokers',
{
initialValue: brokers,
rules: [{ required: true, message: '请选择一个broker!' }],
},
]"
placeholder="请选择一个broker"
>
<a-select-option v-for="v in brokers" :key="v" :value="v">
<span v-if="v == -1">全部</span> <span v-else>{{ v }}</span>
</a-select-option>
</a-select>
</a-form-item>
<a-table
bordered
:columns="columns"
:data-source="currentAssignment"
:rowKey="
(record, index) => {
return index;
}
"
>
</a-table>
<a-form-item :wrapper-col="{ span: 12, offset: 5 }">
<a-button type="primary" html-type="submit">
重新生成分配计划
</a-button>
</a-form-item>
</a-form>
<hr />
<h2>新的分配计划</h2>
<a-table
bordered
:columns="columns"
:data-source="proposedAssignmentShow"
:rowKey="
(record, index) => {
return index;
}
"
>
</a-table>
<a-button type="danger" @click="updateAssignment"> 更新分配 </a-button>
</a-spin>
<hr />
<h4>注意</h4>
<ul>
<li>
  副本重分配可以将副本分配到其它broker上:通过选择上面的broker节点,根据这几个节点生成分配方案
</li>
<li>
  选择的broker的节点数量不能少于当前的副本数,比如有3个副本,至少需要3个broker节点
</li>
<li>
  数据量太大时考虑设置一下限流,毕竟重新分配后不同broker之间可能做数据迁移
</li>
</ul>
</div>
</a-modal>
</template>
<script>
import request from "@/utils/request";
import { KafkaTopicApi, KafkaOpApi, KafkaClusterApi } from "@/utils/api";
import notification from "ant-design-vue/es/notification";
export default {
name: "ReplicaReassign",
props: {
visible: {
type: Boolean,
default: false,
},
},
data() {
return {
show: this.visible,
data: [],
loading: false,
form: this.$form.createForm(this, { name: "ReplicaReassignForm" }),
topicList: [],
partitions: [],
brokers: [],
currentAssignment: [],
proposedAssignment: [],
proposedAssignmentShow: [],
columns,
};
},
watch: {
visible(v) {
this.show = v;
if (this.show) {
this.clearData();
this.getTopicNameList();
this.getClusterInfo();
}
},
},
methods: {
handleSubmit(e) {
e.preventDefault();
this.form.validateFields((err, values) => {
if (!err) {
this.getProposedAssignment(values);
}
});
},
getTopicReplicaInfo(topic) {
this.loading = true;
request({
url: KafkaTopicApi.getCurrentReplicaAssignment.url + "?topic=" + topic,
method: KafkaTopicApi.getCurrentReplicaAssignment.method,
}).then((res) => {
this.loading = false;
if (res.code == 0) {
this.currentAssignment = res.data.partitions;
this.currentAssignment.forEach(
(e) => (e.replicas = e.replicas.join(","))
);
} else {
notification.error({
message: "error",
description: res.msg,
});
}
});
},
getTopicNameList() {
request({
url: KafkaTopicApi.getTopicNameList.url,
method: KafkaTopicApi.getTopicNameList.method,
}).then((res) => {
if (res.code == 0) {
this.topicList = res.data;
} else {
notification.error({
message: "error",
description: res.msg,
});
}
});
},
getPartitionInfo(topic) {
this.loading = true;
request({
url: KafkaTopicApi.getPartitionInfo.url + "?topic=" + topic,
method: KafkaTopicApi.getPartitionInfo.method,
}).then((res) => {
this.loading = false;
if (res.code != 0) {
notification.error({
message: "error",
description: res.msg,
});
} else {
this.partitions = res.data.map((v) => v.partition);
this.partitions.splice(0, 0, -1);
}
});
},
handleTopicChange(topic) {
// this.getPartitionInfo(topic);
this.clearData();
this.getTopicReplicaInfo(topic);
},
getClusterInfo() {
this.loading = true;
request({
url: KafkaClusterApi.getClusterInfo.url,
method: KafkaClusterApi.getClusterInfo.method,
}).then((res) => {
this.loading = false;
this.brokers = [];
res.data.nodes.forEach((node) => this.brokers.push(node.id));
});
},
getProposedAssignment(params) {
this.loading = true;
request({
url: KafkaOpApi.proposedAssignment.url,
method: KafkaOpApi.proposedAssignment.method,
data: params,
}).then((res) => {
this.loading = false;
if (res.code != 0) {
notification.error({
message: "error",
description: res.msg,
});
} else {
this.proposedAssignmentShow = res.data;
this.proposedAssignment = JSON.parse(
JSON.stringify(this.proposedAssignmentShow)
);
this.proposedAssignmentShow.forEach(
(e) => (e.replicas = e.replicas.join(","))
);
}
});
},
clearData() {
this.currentAssignment = [];
this.proposedAssignment = [];
this.proposedAssignmentShow = [];
},
handleCancel() {
this.data = [];
this.$emit("closeReplicaReassignDialog", { refresh: false });
},
updateAssignment() {
this.form.validateFields((err, values) => {
if (!err) {
if (this.proposedAssignment.length == 0) {
this.$message.warn("请先生成分配计划!");
return;
}
this.loading = true;
request({
url: KafkaTopicApi.updateReplicaAssignment.url,
method: KafkaTopicApi.updateReplicaAssignment.method,
data: { partitions: this.proposedAssignment },
}).then((res) => {
this.loading = false;
if (res.code == 0) {
this.$message.success(res.msg);
this.handleTopicChange(values.topic);
} else {
notification.error({
message: "error",
description: res.msg,
});
}
});
}
});
},
},
};
const columns = [
{
title: "分区",
dataIndex: "partition",
key: "partition",
},
{
title: "副本所在broker",
dataIndex: "replicas",
key: "replicas",
scopedSlots: { customRender: "replicas" },
},
];
</script>
<style scoped></style>
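Putting the dialog's round trip together: submitting the form POSTs the selected topic and target broker ids to /op/replication/reassignments/proposed, and the response supplies the per-partition replica lists rendered in the "新的分配计划" table. A sketch of the request body with hypothetical values:

```json
{ "topic": "my-topic", "brokers": [3, 4, 5] }
```

The returned entries carry topic, partition, and replicas; the dialog keeps a raw copy of them for the update-assignment call and joins replicas into comma-separated strings only for display.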