Compare commits
10 Commits
v1.0.3
...
1.0.4-rele
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8131cb1a42 | ||
|
|
1dd6466261 | ||
|
|
dda08a2152 | ||
|
|
01c7121ee4 | ||
|
|
d939d7653c | ||
|
|
058cd5a24e | ||
|
|
db3f55ac4a | ||
|
|
a311a34537 | ||
|
|
e8fe2ea1c7 | ||
|
|
10302dd39c |
@@ -7,7 +7,8 @@
|
|||||||
如果github能查看图片的话,可以点击[查看菜单页面](./document/overview/概览.md),查看每个页面的样子
|
如果github能查看图片的话,可以点击[查看菜单页面](./document/overview/概览.md),查看每个页面的样子
|
||||||
## 集群迁移支持说明
|
## 集群迁移支持说明
|
||||||
当前主分支及日后版本不再提供消息同步、集群迁移的解决方案,如有需要,查看:[集群迁移说明](./document/datasync/集群迁移.md)
|
当前主分支及日后版本不再提供消息同步、集群迁移的解决方案,如有需要,查看:[集群迁移说明](./document/datasync/集群迁移.md)
|
||||||
|
## ACL说明
|
||||||
|
acl配置说明,如果kafka集群启用了ACL,但是控制台没看到Acl菜单,可以查看:[Acl配置启用说明](./document/acl/Acl.md)
|
||||||
## 功能支持
|
## 功能支持
|
||||||
* 多集群支持
|
* 多集群支持
|
||||||
* 集群信息
|
* 集群信息
|
||||||
@@ -21,7 +22,7 @@
|
|||||||

|

|
||||||
|
|
||||||
## 安装包下载
|
## 安装包下载
|
||||||
点击下载(v1.0.2版本):[kafka-console-ui.zip](https://github.com/xxd763795151/kafka-console-ui/releases/download/v1.0.2/kafka-console-ui.zip)
|
点击下载(v1.0.4版本):[kafka-console-ui.zip](https://github.com/xxd763795151/kafka-console-ui/releases/download/v1.0.4/kafka-console-ui.zip)
|
||||||
|
|
||||||
## 快速使用
|
## 快速使用
|
||||||
### Windows
|
### Windows
|
||||||
|
|||||||
36
document/acl/Acl.md
Normal file
36
document/acl/Acl.md
Normal file
@@ -0,0 +1,36 @@
|
|||||||
|
# Acl配置启用说明
|
||||||
|
## 前言
|
||||||
|
可能有的同学是看了这篇文章来的:[如何通过可视化方式快捷管理kafka的acl配置](https://blog.csdn.net/x763795151/article/details/120200119)
|
||||||
|
|
||||||
|
这篇文章里可能说了是通过修改配置文件application.yml的方式来启用ACL,示例如下:
|
||||||
|
```yaml
|
||||||
|
kafka:
|
||||||
|
config:
|
||||||
|
# kafka broker地址,多个以逗号分隔
|
||||||
|
bootstrap-server: 'localhost:9092'
|
||||||
|
# 服务端是否启用acl,如果不启用,下面的几项都忽略即可
|
||||||
|
enable-acl: true
|
||||||
|
# 只支持2种安全协议SASL_PLAINTEXT和PLAINTEXT,启用acl则设置为SASL_PLAINTEXT,不启用acl不需关心这个配置
|
||||||
|
security-protocol: SASL_PLAINTEXT
|
||||||
|
sasl-mechanism: SCRAM-SHA-256
|
||||||
|
# 超级管理员用户名,在broker上已经配置为超级管理员
|
||||||
|
admin-username: admin
|
||||||
|
# 超级管理员密码
|
||||||
|
admin-password: admin
|
||||||
|
# 启动自动创建配置的超级管理员用户
|
||||||
|
admin-create: true
|
||||||
|
# broker连接的zk地址
|
||||||
|
zookeeper-addr: localhost:2181
|
||||||
|
sasl-jaas-config: org.apache.kafka.common.security.scram.ScramLoginModule required username="${kafka.config.admin-username}" password="${kafka.config.admin-password}";
|
||||||
|
```
|
||||||
|
其中说明了kafka.config.enable-acl配置项需要为true。
|
||||||
|
|
||||||
|
注意:**现在不再支持这种方式了**
|
||||||
|
## 新版本说明
|
||||||
|
因为现在支持多集群配置,关于多集群配置,可以看主页说明的 配置集群 介绍。
|
||||||
|
所以这里把这些额外的配置项都去掉了。
|
||||||
|
|
||||||
|
如果启用了ACL,在页面上新增集群的时候,在属性里配置集群的ACL相关信息,如下:
|
||||||
|
如果控制台检测到属性里有ACL相关属性配置,切换到这个集群后,ACL菜单会自动出现的。
|
||||||
|
|
||||||
|
注意:只支持SASL。
|
||||||
BIN
document/acl/新增集群.png
Normal file
BIN
document/acl/新增集群.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 245 KiB |
2
pom.xml
2
pom.xml
@@ -10,7 +10,7 @@
|
|||||||
</parent>
|
</parent>
|
||||||
<groupId>com.xuxd</groupId>
|
<groupId>com.xuxd</groupId>
|
||||||
<artifactId>kafka-console-ui</artifactId>
|
<artifactId>kafka-console-ui</artifactId>
|
||||||
<version>1.0.3</version>
|
<version>1.0.4</version>
|
||||||
<name>kafka-console-ui</name>
|
<name>kafka-console-ui</name>
|
||||||
<description>Kafka console manage ui</description>
|
<description>Kafka console manage ui</description>
|
||||||
<properties>
|
<properties>
|
||||||
|
|||||||
@@ -8,7 +8,7 @@ import org.apache.kafka.common.Node;
|
|||||||
* @author xuxd
|
* @author xuxd
|
||||||
* @date 2021-10-08 14:03:21
|
* @date 2021-10-08 14:03:21
|
||||||
**/
|
**/
|
||||||
public class BrokerNode {
|
public class BrokerNode implements Comparable{
|
||||||
|
|
||||||
private int id;
|
private int id;
|
||||||
|
|
||||||
@@ -80,4 +80,8 @@ public class BrokerNode {
|
|||||||
public void setController(boolean controller) {
|
public void setController(boolean controller) {
|
||||||
isController = controller;
|
isController = controller;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override public int compareTo(Object o) {
|
||||||
|
return this.id - ((BrokerNode)o).id;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -21,7 +21,7 @@ public class TopicPartition implements Comparable {
|
|||||||
}
|
}
|
||||||
TopicPartition other = (TopicPartition) o;
|
TopicPartition other = (TopicPartition) o;
|
||||||
if (!this.topic.equals(other.getTopic())) {
|
if (!this.topic.equals(other.getTopic())) {
|
||||||
return this.compareTo(other);
|
return this.topic.compareTo(other.topic);
|
||||||
}
|
}
|
||||||
|
|
||||||
return this.partition - other.partition;
|
return this.partition - other.partition;
|
||||||
|
|||||||
@@ -0,0 +1,18 @@
|
|||||||
|
package com.xuxd.kafka.console.beans.dto;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
import lombok.Data;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* kafka-console-ui.
|
||||||
|
*
|
||||||
|
* @author xuxd
|
||||||
|
* @date 2022-02-15 19:08:13
|
||||||
|
**/
|
||||||
|
@Data
|
||||||
|
public class ProposedAssignmentDTO {
|
||||||
|
|
||||||
|
private String topic;
|
||||||
|
|
||||||
|
private List<Integer> brokers;
|
||||||
|
}
|
||||||
@@ -0,0 +1,24 @@
|
|||||||
|
package com.xuxd.kafka.console.beans.vo;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
import lombok.Data;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* kafka-console-ui.
|
||||||
|
*
|
||||||
|
* @author xuxd
|
||||||
|
* @date 2022-01-22 16:24:58
|
||||||
|
**/
|
||||||
|
@Data
|
||||||
|
public class BrokerApiVersionVO {
|
||||||
|
|
||||||
|
private int brokerId;
|
||||||
|
|
||||||
|
private String host;
|
||||||
|
|
||||||
|
private int supportNums;
|
||||||
|
|
||||||
|
private int unSupportNums;
|
||||||
|
|
||||||
|
private List<String> versionInfo;
|
||||||
|
}
|
||||||
@@ -53,4 +53,9 @@ public class ClusterController {
|
|||||||
public Object peekClusterInfo() {
|
public Object peekClusterInfo() {
|
||||||
return clusterService.peekClusterInfo();
|
return clusterService.peekClusterInfo();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@GetMapping("/info/api/version")
|
||||||
|
public Object getBrokerApiVersionInfo() {
|
||||||
|
return clusterService.getBrokerApiVersionInfo();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -2,6 +2,7 @@ package com.xuxd.kafka.console.controller;
|
|||||||
|
|
||||||
import com.xuxd.kafka.console.beans.TopicPartition;
|
import com.xuxd.kafka.console.beans.TopicPartition;
|
||||||
import com.xuxd.kafka.console.beans.dto.BrokerThrottleDTO;
|
import com.xuxd.kafka.console.beans.dto.BrokerThrottleDTO;
|
||||||
|
import com.xuxd.kafka.console.beans.dto.ProposedAssignmentDTO;
|
||||||
import com.xuxd.kafka.console.beans.dto.ReplicationDTO;
|
import com.xuxd.kafka.console.beans.dto.ReplicationDTO;
|
||||||
import com.xuxd.kafka.console.beans.dto.SyncDataDTO;
|
import com.xuxd.kafka.console.beans.dto.SyncDataDTO;
|
||||||
import com.xuxd.kafka.console.service.OperationService;
|
import com.xuxd.kafka.console.service.OperationService;
|
||||||
@@ -74,4 +75,9 @@ public class OperationController {
|
|||||||
public Object cancelReassignment(@RequestBody TopicPartition partition) {
|
public Object cancelReassignment(@RequestBody TopicPartition partition) {
|
||||||
return operationService.cancelReassignment(new org.apache.kafka.common.TopicPartition(partition.getTopic(), partition.getPartition()));
|
return operationService.cancelReassignment(new org.apache.kafka.common.TopicPartition(partition.getTopic(), partition.getPartition()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@PostMapping("/replication/reassignments/proposed")
|
||||||
|
public Object proposedAssignments(@RequestBody ProposedAssignmentDTO dto) {
|
||||||
|
return operationService.proposedAssignments(dto.getTopic(), dto.getBrokers());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -21,4 +21,6 @@ public interface ClusterService {
|
|||||||
ResponseData updateClusterInfo(ClusterInfoDO infoDO);
|
ResponseData updateClusterInfo(ClusterInfoDO infoDO);
|
||||||
|
|
||||||
ResponseData peekClusterInfo();
|
ResponseData peekClusterInfo();
|
||||||
|
|
||||||
|
ResponseData getBrokerApiVersionInfo();
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -30,4 +30,6 @@ public interface OperationService {
|
|||||||
ResponseData currentReassignments();
|
ResponseData currentReassignments();
|
||||||
|
|
||||||
ResponseData cancelReassignment(TopicPartition partition);
|
ResponseData cancelReassignment(TopicPartition partition);
|
||||||
|
|
||||||
|
ResponseData proposedAssignments(String topic, List<Integer> brokerList);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,15 +1,26 @@
|
|||||||
package com.xuxd.kafka.console.service.impl;
|
package com.xuxd.kafka.console.service.impl;
|
||||||
|
|
||||||
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
|
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
|
||||||
|
import com.xuxd.kafka.console.beans.ClusterInfo;
|
||||||
import com.xuxd.kafka.console.beans.ResponseData;
|
import com.xuxd.kafka.console.beans.ResponseData;
|
||||||
import com.xuxd.kafka.console.beans.dos.ClusterInfoDO;
|
import com.xuxd.kafka.console.beans.dos.ClusterInfoDO;
|
||||||
|
import com.xuxd.kafka.console.beans.vo.BrokerApiVersionVO;
|
||||||
import com.xuxd.kafka.console.beans.vo.ClusterInfoVO;
|
import com.xuxd.kafka.console.beans.vo.ClusterInfoVO;
|
||||||
import com.xuxd.kafka.console.dao.ClusterInfoMapper;
|
import com.xuxd.kafka.console.dao.ClusterInfoMapper;
|
||||||
import com.xuxd.kafka.console.service.ClusterService;
|
import com.xuxd.kafka.console.service.ClusterService;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.Comparator;
|
||||||
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.TreeSet;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
import kafka.console.ClusterConsole;
|
import kafka.console.ClusterConsole;
|
||||||
import org.apache.commons.collections.CollectionUtils;
|
import org.apache.commons.collections.CollectionUtils;
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
import org.apache.kafka.clients.NodeApiVersions;
|
||||||
|
import org.apache.kafka.common.Node;
|
||||||
import org.springframework.beans.factory.ObjectProvider;
|
import org.springframework.beans.factory.ObjectProvider;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
@@ -33,7 +44,9 @@ public class ClusterServiceImpl implements ClusterService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override public ResponseData getClusterInfo() {
|
@Override public ResponseData getClusterInfo() {
|
||||||
return ResponseData.create().data(clusterConsole.clusterInfo()).success();
|
ClusterInfo clusterInfo = clusterConsole.clusterInfo();
|
||||||
|
clusterInfo.setNodes(new TreeSet<>(clusterInfo.getNodes()));
|
||||||
|
return ResponseData.create().data(clusterInfo).success();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override public ResponseData getClusterInfoList() {
|
@Override public ResponseData getClusterInfoList() {
|
||||||
@@ -69,4 +82,29 @@ public class ClusterServiceImpl implements ClusterService {
|
|||||||
return ResponseData.create().data(dos.stream().findFirst().map(ClusterInfoVO::from)).success();
|
return ResponseData.create().data(dos.stream().findFirst().map(ClusterInfoVO::from)).success();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override public ResponseData getBrokerApiVersionInfo() {
|
||||||
|
HashMap<Node, NodeApiVersions> map = clusterConsole.listBrokerVersionInfo();
|
||||||
|
List<BrokerApiVersionVO> list = new ArrayList<>(map.size());
|
||||||
|
map.forEach(((node, versions) -> {
|
||||||
|
BrokerApiVersionVO vo = new BrokerApiVersionVO();
|
||||||
|
vo.setBrokerId(node.id());
|
||||||
|
vo.setHost(node.host() + ":" + node.port());
|
||||||
|
vo.setSupportNums(versions.allSupportedApiVersions().size());
|
||||||
|
String versionInfo = versions.toString(true);
|
||||||
|
int from = 0;
|
||||||
|
int count = 0;
|
||||||
|
int index = -1;
|
||||||
|
while ((index = versionInfo.indexOf("UNSUPPORTED", from)) >= 0 && from < versionInfo.length()) {
|
||||||
|
count++;
|
||||||
|
from = index + 1;
|
||||||
|
}
|
||||||
|
vo.setUnSupportNums(count);
|
||||||
|
versionInfo = versionInfo.substring(1, versionInfo.length() - 2);
|
||||||
|
vo.setVersionInfo(Arrays.asList(StringUtils.split(versionInfo, ",")));
|
||||||
|
list.add(vo);
|
||||||
|
}));
|
||||||
|
Collections.sort(list, Comparator.comparingInt(BrokerApiVersionVO::getBrokerId));
|
||||||
|
return ResponseData.create().data(list).success();
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
package com.xuxd.kafka.console.service.impl;
|
package com.xuxd.kafka.console.service.impl;
|
||||||
|
|
||||||
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
|
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
|
||||||
|
import com.google.common.collect.Lists;
|
||||||
import com.google.gson.Gson;
|
import com.google.gson.Gson;
|
||||||
import com.google.gson.JsonObject;
|
import com.google.gson.JsonObject;
|
||||||
import com.xuxd.kafka.console.beans.ResponseData;
|
import com.xuxd.kafka.console.beans.ResponseData;
|
||||||
@@ -10,6 +11,7 @@ import com.xuxd.kafka.console.beans.vo.OffsetAlignmentVO;
|
|||||||
import com.xuxd.kafka.console.dao.MinOffsetAlignmentMapper;
|
import com.xuxd.kafka.console.dao.MinOffsetAlignmentMapper;
|
||||||
import com.xuxd.kafka.console.service.OperationService;
|
import com.xuxd.kafka.console.service.OperationService;
|
||||||
import com.xuxd.kafka.console.utils.GsonUtil;
|
import com.xuxd.kafka.console.utils.GsonUtil;
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
@@ -19,6 +21,7 @@ import java.util.Properties;
|
|||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
import kafka.console.OperationConsole;
|
import kafka.console.OperationConsole;
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.apache.kafka.clients.admin.PartitionReassignment;
|
import org.apache.kafka.clients.admin.PartitionReassignment;
|
||||||
import org.apache.kafka.common.TopicPartition;
|
import org.apache.kafka.common.TopicPartition;
|
||||||
import org.springframework.beans.factory.ObjectProvider;
|
import org.springframework.beans.factory.ObjectProvider;
|
||||||
@@ -162,4 +165,21 @@ public class OperationServiceImpl implements OperationService {
|
|||||||
}
|
}
|
||||||
return ResponseData.create().success();
|
return ResponseData.create().success();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override public ResponseData proposedAssignments(String topic, List<Integer> brokerList) {
|
||||||
|
Map<String, Object> params = new HashMap<>();
|
||||||
|
params.put("version", 1);
|
||||||
|
Map<String, String> topicMap = new HashMap<>(1, 1.0f);
|
||||||
|
topicMap.put("topic", topic);
|
||||||
|
params.put("topics", Lists.newArrayList(topicMap));
|
||||||
|
List<String> list = brokerList.stream().map(String::valueOf).collect(Collectors.toList());
|
||||||
|
Map<TopicPartition, List<Object>> assignments = operationConsole.proposedAssignments(gson.toJson(params), StringUtils.join(list, ","));
|
||||||
|
List<CurrentReassignmentVO> res = new ArrayList<>(assignments.size());
|
||||||
|
assignments.forEach((tp, replicas) -> {
|
||||||
|
CurrentReassignmentVO vo = new CurrentReassignmentVO(tp.topic(), tp.partition(),
|
||||||
|
replicas.stream().map(x -> (Integer) x).collect(Collectors.toList()), null, null);
|
||||||
|
res.add(vo);
|
||||||
|
});
|
||||||
|
return ResponseData.create().data(res).success();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
330
src/main/scala/kafka/console/BrokerApiVersion.scala
Normal file
330
src/main/scala/kafka/console/BrokerApiVersion.scala
Normal file
@@ -0,0 +1,330 @@
|
|||||||
|
package kafka.console
|
||||||
|
|
||||||
|
import com.xuxd.kafka.console.config.ContextConfigHolder
|
||||||
|
import kafka.utils.Implicits.MapExtensionMethods
|
||||||
|
import kafka.utils.Logging
|
||||||
|
import org.apache.kafka.clients._
|
||||||
|
import org.apache.kafka.clients.admin.AdminClientConfig
|
||||||
|
import org.apache.kafka.clients.consumer.internals.{ConsumerNetworkClient, RequestFuture}
|
||||||
|
import org.apache.kafka.common.Node
|
||||||
|
import org.apache.kafka.common.config.ConfigDef.ValidString.in
|
||||||
|
import org.apache.kafka.common.config.ConfigDef.{Importance, Type}
|
||||||
|
import org.apache.kafka.common.config.{AbstractConfig, ConfigDef}
|
||||||
|
import org.apache.kafka.common.errors.AuthenticationException
|
||||||
|
import org.apache.kafka.common.internals.ClusterResourceListeners
|
||||||
|
import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionCollection
|
||||||
|
import org.apache.kafka.common.metrics.Metrics
|
||||||
|
import org.apache.kafka.common.network.Selector
|
||||||
|
import org.apache.kafka.common.protocol.Errors
|
||||||
|
import org.apache.kafka.common.requests._
|
||||||
|
import org.apache.kafka.common.utils.{KafkaThread, LogContext, Time}
|
||||||
|
|
||||||
|
import java.io.IOException
|
||||||
|
import java.util.Properties
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger
|
||||||
|
import java.util.concurrent.{ConcurrentLinkedQueue, TimeUnit}
|
||||||
|
import scala.jdk.CollectionConverters.{ListHasAsScala, MapHasAsJava, PropertiesHasAsScala, SetHasAsScala}
|
||||||
|
import scala.util.{Failure, Success, Try}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* kafka-console-ui.
|
||||||
|
*
|
||||||
|
* Copy from {@link kafka.admin.BrokerApiVersionsCommand}.
|
||||||
|
*
|
||||||
|
* @author xuxd
|
||||||
|
* @date 2022-01-22 15:15:57
|
||||||
|
* */
|
||||||
|
object BrokerApiVersion extends Logging {
|
||||||
|
|
||||||
|
def listAllBrokerApiVersionInfo(): java.util.HashMap[Node, NodeApiVersions] = {
|
||||||
|
val res = new java.util.HashMap[Node, NodeApiVersions]()
|
||||||
|
val adminClient = createAdminClient()
|
||||||
|
try {
|
||||||
|
adminClient.awaitBrokers()
|
||||||
|
val brokerMap = adminClient.listAllBrokerVersionInfo()
|
||||||
|
brokerMap.forKeyValue {
|
||||||
|
(broker, versionInfoOrError) =>
|
||||||
|
versionInfoOrError match {
|
||||||
|
case Success(v) => {
|
||||||
|
res.put(broker, v)
|
||||||
|
}
|
||||||
|
case Failure(v) => logger.error(s"${broker} -> ERROR: ${v}\n")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
adminClient.close()
|
||||||
|
}
|
||||||
|
|
||||||
|
res
|
||||||
|
}
|
||||||
|
|
||||||
|
private def createAdminClient(): AdminClient = {
|
||||||
|
val props = new Properties()
|
||||||
|
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, ContextConfigHolder.CONTEXT_CONFIG.get().getBootstrapServer())
|
||||||
|
props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, ContextConfigHolder.CONTEXT_CONFIG.get().getRequestTimeoutMs())
|
||||||
|
props.putAll(ContextConfigHolder.CONTEXT_CONFIG.get().getProperties())
|
||||||
|
AdminClient.create(props)
|
||||||
|
}
|
||||||
|
|
||||||
|
// org.apache.kafka.clients.admin.AdminClient doesn't currently expose a way to retrieve the supported api versions.
|
||||||
|
// We inline the bits we need from kafka.admin.AdminClient so that we can delete it.
|
||||||
|
private class AdminClient(val time: Time,
|
||||||
|
val client: ConsumerNetworkClient,
|
||||||
|
val bootstrapBrokers: List[Node]) extends Logging {
|
||||||
|
|
||||||
|
@volatile var running = true
|
||||||
|
val pendingFutures = new ConcurrentLinkedQueue[RequestFuture[ClientResponse]]()
|
||||||
|
|
||||||
|
val networkThread = new KafkaThread("admin-client-network-thread", () => {
|
||||||
|
try {
|
||||||
|
while (running)
|
||||||
|
client.poll(time.timer(Long.MaxValue))
|
||||||
|
} catch {
|
||||||
|
case t: Throwable =>
|
||||||
|
error("admin-client-network-thread exited", t)
|
||||||
|
} finally {
|
||||||
|
pendingFutures.forEach { future =>
|
||||||
|
try {
|
||||||
|
future.raise(Errors.UNKNOWN_SERVER_ERROR)
|
||||||
|
} catch {
|
||||||
|
case _: IllegalStateException => // It is OK if the future has been completed
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pendingFutures.clear()
|
||||||
|
}
|
||||||
|
}, true)
|
||||||
|
|
||||||
|
networkThread.start()
|
||||||
|
|
||||||
|
private def send(target: Node,
|
||||||
|
request: AbstractRequest.Builder[_ <: AbstractRequest]): AbstractResponse = {
|
||||||
|
val future = client.send(target, request)
|
||||||
|
pendingFutures.add(future)
|
||||||
|
future.awaitDone(Long.MaxValue, TimeUnit.MILLISECONDS)
|
||||||
|
pendingFutures.remove(future)
|
||||||
|
if (future.succeeded())
|
||||||
|
future.value().responseBody()
|
||||||
|
else
|
||||||
|
throw future.exception()
|
||||||
|
}
|
||||||
|
|
||||||
|
private def sendAnyNode(request: AbstractRequest.Builder[_ <: AbstractRequest]): AbstractResponse = {
|
||||||
|
bootstrapBrokers.foreach { broker =>
|
||||||
|
try {
|
||||||
|
return send(broker, request)
|
||||||
|
} catch {
|
||||||
|
case e: AuthenticationException =>
|
||||||
|
throw e
|
||||||
|
case e: Exception =>
|
||||||
|
debug(s"Request ${request.apiKey()} failed against node $broker", e)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
throw new RuntimeException(s"Request ${request.apiKey()} failed on brokers $bootstrapBrokers")
|
||||||
|
}
|
||||||
|
|
||||||
|
private def getApiVersions(node: Node): ApiVersionCollection = {
|
||||||
|
val response = send(node, new ApiVersionsRequest.Builder()).asInstanceOf[ApiVersionsResponse]
|
||||||
|
Errors.forCode(response.data.errorCode).maybeThrow()
|
||||||
|
response.data.apiKeys
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Wait until there is a non-empty list of brokers in the cluster.
|
||||||
|
*/
|
||||||
|
def awaitBrokers(): Unit = {
|
||||||
|
var nodes = List[Node]()
|
||||||
|
val start = System.currentTimeMillis()
|
||||||
|
val maxWait = 30 * 1000
|
||||||
|
do {
|
||||||
|
nodes = findAllBrokers()
|
||||||
|
if (nodes.isEmpty) {
|
||||||
|
Thread.sleep(50)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
while (nodes.isEmpty && (System.currentTimeMillis() - start < maxWait))
|
||||||
|
}
|
||||||
|
|
||||||
|
private def findAllBrokers(): List[Node] = {
|
||||||
|
val request = MetadataRequest.Builder.allTopics()
|
||||||
|
val response = sendAnyNode(request).asInstanceOf[MetadataResponse]
|
||||||
|
val errors = response.errors
|
||||||
|
if (!errors.isEmpty) {
|
||||||
|
logger.info(s"Metadata request contained errors: $errors")
|
||||||
|
}
|
||||||
|
|
||||||
|
// 在3.x版本中这个方法是buildCluster 代替cluster()了
|
||||||
|
// response.buildCluster.nodes.asScala.toList
|
||||||
|
response.cluster().nodes.asScala.toList
|
||||||
|
}
|
||||||
|
|
||||||
|
def listAllBrokerVersionInfo(): Map[Node, Try[NodeApiVersions]] =
|
||||||
|
findAllBrokers().map { broker =>
|
||||||
|
broker -> Try[NodeApiVersions](new NodeApiVersions(getApiVersions(broker)))
|
||||||
|
}.toMap
|
||||||
|
|
||||||
|
def close(): Unit = {
|
||||||
|
running = false
|
||||||
|
try {
|
||||||
|
client.close()
|
||||||
|
} catch {
|
||||||
|
case e: IOException =>
|
||||||
|
error("Exception closing nioSelector:", e)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private object AdminClient {
|
||||||
|
val DefaultConnectionMaxIdleMs = 9 * 60 * 1000
|
||||||
|
val DefaultRequestTimeoutMs = 5000
|
||||||
|
val DefaultSocketConnectionSetupMs = CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG
|
||||||
|
val DefaultSocketConnectionSetupMaxMs = CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG
|
||||||
|
val DefaultMaxInFlightRequestsPerConnection = 100
|
||||||
|
val DefaultReconnectBackoffMs = 50
|
||||||
|
val DefaultReconnectBackoffMax = 50
|
||||||
|
val DefaultSendBufferBytes = 128 * 1024
|
||||||
|
val DefaultReceiveBufferBytes = 32 * 1024
|
||||||
|
val DefaultRetryBackoffMs = 100
|
||||||
|
|
||||||
|
val AdminClientIdSequence = new AtomicInteger(1)
|
||||||
|
val AdminConfigDef = {
|
||||||
|
val config = new ConfigDef()
|
||||||
|
.define(
|
||||||
|
CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
|
||||||
|
Type.LIST,
|
||||||
|
Importance.HIGH,
|
||||||
|
CommonClientConfigs.BOOTSTRAP_SERVERS_DOC)
|
||||||
|
.define(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG,
|
||||||
|
Type.STRING,
|
||||||
|
ClientDnsLookup.USE_ALL_DNS_IPS.toString,
|
||||||
|
in(ClientDnsLookup.USE_ALL_DNS_IPS.toString,
|
||||||
|
ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY.toString),
|
||||||
|
Importance.MEDIUM,
|
||||||
|
CommonClientConfigs.CLIENT_DNS_LOOKUP_DOC)
|
||||||
|
.define(
|
||||||
|
CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
|
||||||
|
ConfigDef.Type.STRING,
|
||||||
|
CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL,
|
||||||
|
ConfigDef.Importance.MEDIUM,
|
||||||
|
CommonClientConfigs.SECURITY_PROTOCOL_DOC)
|
||||||
|
.define(
|
||||||
|
CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG,
|
||||||
|
ConfigDef.Type.INT,
|
||||||
|
DefaultRequestTimeoutMs,
|
||||||
|
ConfigDef.Importance.MEDIUM,
|
||||||
|
CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC)
|
||||||
|
.define(
|
||||||
|
CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG,
|
||||||
|
ConfigDef.Type.LONG,
|
||||||
|
CommonClientConfigs.DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MS,
|
||||||
|
ConfigDef.Importance.MEDIUM,
|
||||||
|
CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_DOC)
|
||||||
|
.define(
|
||||||
|
CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG,
|
||||||
|
ConfigDef.Type.LONG,
|
||||||
|
CommonClientConfigs.DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS,
|
||||||
|
ConfigDef.Importance.MEDIUM,
|
||||||
|
CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_DOC)
|
||||||
|
.define(
|
||||||
|
CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG,
|
||||||
|
ConfigDef.Type.LONG,
|
||||||
|
DefaultRetryBackoffMs,
|
||||||
|
ConfigDef.Importance.MEDIUM,
|
||||||
|
CommonClientConfigs.RETRY_BACKOFF_MS_DOC)
|
||||||
|
.withClientSslSupport()
|
||||||
|
.withClientSaslSupport()
|
||||||
|
config
|
||||||
|
}
|
||||||
|
|
||||||
|
class AdminConfig(originals: Map[_, _]) extends AbstractConfig(AdminConfigDef, originals.asJava, false)
|
||||||
|
|
||||||
|
def create(props: Properties): AdminClient = {
|
||||||
|
val properties = new Properties()
|
||||||
|
val names = props.stringPropertyNames()
|
||||||
|
for (name <- names.asScala.toSet) {
|
||||||
|
properties.put(name, props.get(name).toString())
|
||||||
|
}
|
||||||
|
create(properties.asScala.toMap)
|
||||||
|
}
|
||||||
|
|
||||||
|
def create(props: Map[String, _]): AdminClient = create(new AdminConfig(props))
|
||||||
|
|
||||||
|
def create(config: AdminConfig): AdminClient = {
|
||||||
|
val clientId = "admin-" + AdminClientIdSequence.getAndIncrement()
|
||||||
|
val logContext = new LogContext(s"[LegacyAdminClient clientId=$clientId] ")
|
||||||
|
val time = Time.SYSTEM
|
||||||
|
val metrics = new Metrics(time)
|
||||||
|
val metadata = new Metadata(100L, 60 * 60 * 1000L, logContext,
|
||||||
|
new ClusterResourceListeners)
|
||||||
|
val channelBuilder = ClientUtils.createChannelBuilder(config, time, logContext)
|
||||||
|
val requestTimeoutMs = config.getInt(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG)
|
||||||
|
val connectionSetupTimeoutMs = config.getLong(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG)
|
||||||
|
val connectionSetupTimeoutMaxMs = config.getLong(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG)
|
||||||
|
val retryBackoffMs = config.getLong(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG)
|
||||||
|
|
||||||
|
val brokerUrls = config.getList(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)
|
||||||
|
val clientDnsLookup = config.getString(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG)
|
||||||
|
val brokerAddresses = ClientUtils.parseAndValidateAddresses(brokerUrls, clientDnsLookup)
|
||||||
|
metadata.bootstrap(brokerAddresses)
|
||||||
|
|
||||||
|
val selector = new Selector(
|
||||||
|
DefaultConnectionMaxIdleMs,
|
||||||
|
metrics,
|
||||||
|
time,
|
||||||
|
"admin",
|
||||||
|
channelBuilder,
|
||||||
|
logContext)
|
||||||
|
|
||||||
|
// 版本不一样,这个地方的兼容性问题也不一样了
|
||||||
|
// 3.x版本用这个
|
||||||
|
// val networkClient = new NetworkClient(
|
||||||
|
// selector,
|
||||||
|
// metadata,
|
||||||
|
// clientId,
|
||||||
|
// DefaultMaxInFlightRequestsPerConnection,
|
||||||
|
// DefaultReconnectBackoffMs,
|
||||||
|
// DefaultReconnectBackoffMax,
|
||||||
|
// DefaultSendBufferBytes,
|
||||||
|
// DefaultReceiveBufferBytes,
|
||||||
|
// requestTimeoutMs,
|
||||||
|
// connectionSetupTimeoutMs,
|
||||||
|
// connectionSetupTimeoutMaxMs,
|
||||||
|
// time,
|
||||||
|
// true,
|
||||||
|
// new ApiVersions,
|
||||||
|
// logContext)
|
||||||
|
|
||||||
|
val networkClient = new NetworkClient(
|
||||||
|
selector,
|
||||||
|
metadata,
|
||||||
|
clientId,
|
||||||
|
DefaultMaxInFlightRequestsPerConnection,
|
||||||
|
DefaultReconnectBackoffMs,
|
||||||
|
DefaultReconnectBackoffMax,
|
||||||
|
DefaultSendBufferBytes,
|
||||||
|
DefaultReceiveBufferBytes,
|
||||||
|
requestTimeoutMs,
|
||||||
|
connectionSetupTimeoutMs,
|
||||||
|
connectionSetupTimeoutMaxMs,
|
||||||
|
ClientDnsLookup.USE_ALL_DNS_IPS,
|
||||||
|
time,
|
||||||
|
true,
|
||||||
|
new ApiVersions,
|
||||||
|
logContext)
|
||||||
|
|
||||||
|
val highLevelClient = new ConsumerNetworkClient(
|
||||||
|
logContext,
|
||||||
|
networkClient,
|
||||||
|
metadata,
|
||||||
|
time,
|
||||||
|
retryBackoffMs,
|
||||||
|
requestTimeoutMs,
|
||||||
|
Integer.MAX_VALUE)
|
||||||
|
|
||||||
|
new AdminClient(
|
||||||
|
time,
|
||||||
|
highLevelClient,
|
||||||
|
metadata.fetch.nodes.asScala.toList)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,11 +1,13 @@
|
|||||||
package kafka.console
|
package kafka.console
|
||||||
|
|
||||||
|
import com.xuxd.kafka.console.beans.{BrokerNode, ClusterInfo}
|
||||||
|
import com.xuxd.kafka.console.config.{ContextConfigHolder, KafkaConfig}
|
||||||
|
import org.apache.kafka.clients.NodeApiVersions
|
||||||
|
import org.apache.kafka.clients.admin.DescribeClusterResult
|
||||||
|
import org.apache.kafka.common.Node
|
||||||
|
|
||||||
import java.util.Collections
|
import java.util.Collections
|
||||||
import java.util.concurrent.TimeUnit
|
import java.util.concurrent.TimeUnit
|
||||||
import com.xuxd.kafka.console.beans.{BrokerNode, ClusterInfo}
|
|
||||||
import com.xuxd.kafka.console.config.{ContextConfigHolder, KafkaConfig}
|
|
||||||
import org.apache.kafka.clients.admin.DescribeClusterResult
|
|
||||||
|
|
||||||
import scala.jdk.CollectionConverters.{CollectionHasAsScala, SetHasAsJava, SetHasAsScala}
|
import scala.jdk.CollectionConverters.{CollectionHasAsScala, SetHasAsJava, SetHasAsScala}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -41,4 +43,8 @@ class ClusterConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConf
|
|||||||
new ClusterInfo
|
new ClusterInfo
|
||||||
}).asInstanceOf[ClusterInfo]
|
}).asInstanceOf[ClusterInfo]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
def listBrokerVersionInfo(): java.util.HashMap[Node, NodeApiVersions] = {
|
||||||
|
BrokerApiVersion.listAllBrokerApiVersionInfo()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -242,8 +242,8 @@ class OperationConsole(config: KafkaConfig, topicConsole: TopicConsole,
|
|||||||
withAdminClientAndCatchError(admin => {
|
withAdminClientAndCatchError(admin => {
|
||||||
admin.listPartitionReassignments(withTimeoutMs(new ListPartitionReassignmentsOptions)).reassignments().get()
|
admin.listPartitionReassignments(withTimeoutMs(new ListPartitionReassignmentsOptions)).reassignments().get()
|
||||||
}, e => {
|
}, e => {
|
||||||
Collections.emptyMap()
|
|
||||||
log.error("listPartitionReassignments error.", e)
|
log.error("listPartitionReassignments error.", e)
|
||||||
|
Collections.emptyMap()
|
||||||
}).asInstanceOf[util.Map[TopicPartition, PartitionReassignment]]
|
}).asInstanceOf[util.Map[TopicPartition, PartitionReassignment]]
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -256,4 +256,20 @@ class OperationConsole(config: KafkaConfig, topicConsole: TopicConsole,
|
|||||||
throw e
|
throw e
|
||||||
}).asInstanceOf[util.Map[TopicPartition, Throwable]]
|
}).asInstanceOf[util.Map[TopicPartition, Throwable]]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
def proposedAssignments(reassignmentJson: String,
|
||||||
|
brokerListString: String): util.Map[TopicPartition, util.List[Int]] = {
|
||||||
|
withAdminClientAndCatchError(admin => {
|
||||||
|
val map = ReassignPartitionsCommand.generateAssignment(admin, reassignmentJson, brokerListString, true)._1
|
||||||
|
val res = new util.HashMap[TopicPartition, util.List[Int]]()
|
||||||
|
for (tp <- map.keys) {
|
||||||
|
res.put(tp, map(tp).asJava)
|
||||||
|
// res.put(tp, map.getOrElse(tp, Seq.empty).asJava)
|
||||||
|
}
|
||||||
|
res
|
||||||
|
}, e => {
|
||||||
|
log.error("proposedAssignments error.", e)
|
||||||
|
throw e
|
||||||
|
})
|
||||||
|
}.asInstanceOf[util.Map[TopicPartition, util.List[Int]]]
|
||||||
}
|
}
|
||||||
160
ui/package-lock.json
generated
160
ui/package-lock.json
generated
@@ -1820,6 +1820,63 @@
|
|||||||
"integrity": "sha1-/q7SVZc9LndVW4PbwIhRpsY1IPo=",
|
"integrity": "sha1-/q7SVZc9LndVW4PbwIhRpsY1IPo=",
|
||||||
"dev": true
|
"dev": true
|
||||||
},
|
},
|
||||||
|
"ansi-styles": {
|
||||||
|
"version": "4.3.0",
|
||||||
|
"resolved": "https://registry.npmjs.org/ansi-styles/-/ansi-styles-4.3.0.tgz",
|
||||||
|
"integrity": "sha512-zbB9rCJAT1rbjiVDb2hqKFHNYLxgtk8NURxZ3IZwD3F6NtxbXZQCnnSi1Lkx+IDohdPlFp222wVALIheZJQSEg==",
|
||||||
|
"dev": true,
|
||||||
|
"optional": true,
|
||||||
|
"requires": {
|
||||||
|
"color-convert": "^2.0.1"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"chalk": {
|
||||||
|
"version": "4.1.2",
|
||||||
|
"resolved": "https://registry.npmjs.org/chalk/-/chalk-4.1.2.tgz",
|
||||||
|
"integrity": "sha512-oKnbhFyRIXpUuez8iBMmyEa4nbj4IOQyuhc/wy9kY7/WVPcwIO9VA668Pu8RkO7+0G76SLROeyw9CpQ061i4mA==",
|
||||||
|
"dev": true,
|
||||||
|
"optional": true,
|
||||||
|
"requires": {
|
||||||
|
"ansi-styles": "^4.1.0",
|
||||||
|
"supports-color": "^7.1.0"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"color-convert": {
|
||||||
|
"version": "2.0.1",
|
||||||
|
"resolved": "https://registry.npmjs.org/color-convert/-/color-convert-2.0.1.tgz",
|
||||||
|
"integrity": "sha512-RRECPsj7iu/xb5oKYcsFHSppFNnsj/52OVTRKb4zP5onXwVF3zVmmToNcOfGC+CRDpfK/U584fMg38ZHCaElKQ==",
|
||||||
|
"dev": true,
|
||||||
|
"optional": true,
|
||||||
|
"requires": {
|
||||||
|
"color-name": "~1.1.4"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"color-name": {
|
||||||
|
"version": "1.1.4",
|
||||||
|
"resolved": "https://registry.npmjs.org/color-name/-/color-name-1.1.4.tgz",
|
||||||
|
"integrity": "sha512-dOy+3AuW3a2wNbZHIuMZpTcgjGuLU/uBL/ubcZF9OXbDo8ff4O8yVp5Bf0efS8uEoYo5q4Fx7dY9OgQGXgAsQA==",
|
||||||
|
"dev": true,
|
||||||
|
"optional": true
|
||||||
|
},
|
||||||
|
"has-flag": {
|
||||||
|
"version": "4.0.0",
|
||||||
|
"resolved": "https://registry.npmjs.org/has-flag/-/has-flag-4.0.0.tgz",
|
||||||
|
"integrity": "sha512-EykJT/Q1KjTWctppgIAgfSO0tKVuZUjhgMr17kqTumMl6Afv3EISleU7qZUzoXDFTAHTDC4NOoG/ZxU3EvlMPQ==",
|
||||||
|
"dev": true,
|
||||||
|
"optional": true
|
||||||
|
},
|
||||||
|
"loader-utils": {
|
||||||
|
"version": "2.0.2",
|
||||||
|
"resolved": "https://registry.npmjs.org/loader-utils/-/loader-utils-2.0.2.tgz",
|
||||||
|
"integrity": "sha512-TM57VeHptv569d/GKh6TAYdzKblwDNiumOdkFnejjD0XwTH87K90w3O7AiJRqdQoXygvi1VQTJTLGhJl7WqA7A==",
|
||||||
|
"dev": true,
|
||||||
|
"optional": true,
|
||||||
|
"requires": {
|
||||||
|
"big.js": "^5.2.2",
|
||||||
|
"emojis-list": "^3.0.0",
|
||||||
|
"json5": "^2.1.2"
|
||||||
|
}
|
||||||
|
},
|
||||||
"ssri": {
|
"ssri": {
|
||||||
"version": "8.0.1",
|
"version": "8.0.1",
|
||||||
"resolved": "https://registry.nlark.com/ssri/download/ssri-8.0.1.tgz",
|
"resolved": "https://registry.nlark.com/ssri/download/ssri-8.0.1.tgz",
|
||||||
@@ -1828,6 +1885,28 @@
|
|||||||
"requires": {
|
"requires": {
|
||||||
"minipass": "^3.1.1"
|
"minipass": "^3.1.1"
|
||||||
}
|
}
|
||||||
|
},
|
||||||
|
"supports-color": {
|
||||||
|
"version": "7.2.0",
|
||||||
|
"resolved": "https://registry.npmjs.org/supports-color/-/supports-color-7.2.0.tgz",
|
||||||
|
"integrity": "sha512-qpCAvRl9stuOHveKsn7HncJRvv501qIacKzQlO/+Lwxc9+0q2wLyv4Dfvt80/DPn2pqOBsJdDiogXGR9+OvwRw==",
|
||||||
|
"dev": true,
|
||||||
|
"optional": true,
|
||||||
|
"requires": {
|
||||||
|
"has-flag": "^4.0.0"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"vue-loader-v16": {
|
||||||
|
"version": "npm:vue-loader@16.8.3",
|
||||||
|
"resolved": "https://registry.npmjs.org/vue-loader/-/vue-loader-16.8.3.tgz",
|
||||||
|
"integrity": "sha512-7vKN45IxsKxe5GcVCbc2qFU5aWzyiLrYJyUuMz4BQLKctCj/fmCa0w6fGiiQ2cLFetNcek1ppGJQDCup0c1hpA==",
|
||||||
|
"dev": true,
|
||||||
|
"optional": true,
|
||||||
|
"requires": {
|
||||||
|
"chalk": "^4.1.0",
|
||||||
|
"hash-sum": "^2.0.0",
|
||||||
|
"loader-utils": "^2.0.0"
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
@@ -12097,87 +12176,6 @@
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
"vue-loader-v16": {
|
|
||||||
"version": "npm:vue-loader@16.8.3",
|
|
||||||
"resolved": "https://registry.npmjs.org/vue-loader/-/vue-loader-16.8.3.tgz",
|
|
||||||
"integrity": "sha512-7vKN45IxsKxe5GcVCbc2qFU5aWzyiLrYJyUuMz4BQLKctCj/fmCa0w6fGiiQ2cLFetNcek1ppGJQDCup0c1hpA==",
|
|
||||||
"dev": true,
|
|
||||||
"optional": true,
|
|
||||||
"requires": {
|
|
||||||
"chalk": "^4.1.0",
|
|
||||||
"hash-sum": "^2.0.0",
|
|
||||||
"loader-utils": "^2.0.0"
|
|
||||||
},
|
|
||||||
"dependencies": {
|
|
||||||
"ansi-styles": {
|
|
||||||
"version": "4.3.0",
|
|
||||||
"resolved": "https://registry.npmjs.org/ansi-styles/-/ansi-styles-4.3.0.tgz",
|
|
||||||
"integrity": "sha512-zbB9rCJAT1rbjiVDb2hqKFHNYLxgtk8NURxZ3IZwD3F6NtxbXZQCnnSi1Lkx+IDohdPlFp222wVALIheZJQSEg==",
|
|
||||||
"dev": true,
|
|
||||||
"optional": true,
|
|
||||||
"requires": {
|
|
||||||
"color-convert": "^2.0.1"
|
|
||||||
}
|
|
||||||
},
|
|
||||||
"chalk": {
|
|
||||||
"version": "4.1.2",
|
|
||||||
"resolved": "https://registry.npmjs.org/chalk/-/chalk-4.1.2.tgz",
|
|
||||||
"integrity": "sha512-oKnbhFyRIXpUuez8iBMmyEa4nbj4IOQyuhc/wy9kY7/WVPcwIO9VA668Pu8RkO7+0G76SLROeyw9CpQ061i4mA==",
|
|
||||||
"dev": true,
|
|
||||||
"optional": true,
|
|
||||||
"requires": {
|
|
||||||
"ansi-styles": "^4.1.0",
|
|
||||||
"supports-color": "^7.1.0"
|
|
||||||
}
|
|
||||||
},
|
|
||||||
"color-convert": {
|
|
||||||
"version": "2.0.1",
|
|
||||||
"resolved": "https://registry.npmjs.org/color-convert/-/color-convert-2.0.1.tgz",
|
|
||||||
"integrity": "sha512-RRECPsj7iu/xb5oKYcsFHSppFNnsj/52OVTRKb4zP5onXwVF3zVmmToNcOfGC+CRDpfK/U584fMg38ZHCaElKQ==",
|
|
||||||
"dev": true,
|
|
||||||
"optional": true,
|
|
||||||
"requires": {
|
|
||||||
"color-name": "~1.1.4"
|
|
||||||
}
|
|
||||||
},
|
|
||||||
"color-name": {
|
|
||||||
"version": "1.1.4",
|
|
||||||
"resolved": "https://registry.npmjs.org/color-name/-/color-name-1.1.4.tgz",
|
|
||||||
"integrity": "sha512-dOy+3AuW3a2wNbZHIuMZpTcgjGuLU/uBL/ubcZF9OXbDo8ff4O8yVp5Bf0efS8uEoYo5q4Fx7dY9OgQGXgAsQA==",
|
|
||||||
"dev": true,
|
|
||||||
"optional": true
|
|
||||||
},
|
|
||||||
"has-flag": {
|
|
||||||
"version": "4.0.0",
|
|
||||||
"resolved": "https://registry.npmjs.org/has-flag/-/has-flag-4.0.0.tgz",
|
|
||||||
"integrity": "sha512-EykJT/Q1KjTWctppgIAgfSO0tKVuZUjhgMr17kqTumMl6Afv3EISleU7qZUzoXDFTAHTDC4NOoG/ZxU3EvlMPQ==",
|
|
||||||
"dev": true,
|
|
||||||
"optional": true
|
|
||||||
},
|
|
||||||
"loader-utils": {
|
|
||||||
"version": "2.0.2",
|
|
||||||
"resolved": "https://registry.npmjs.org/loader-utils/-/loader-utils-2.0.2.tgz",
|
|
||||||
"integrity": "sha512-TM57VeHptv569d/GKh6TAYdzKblwDNiumOdkFnejjD0XwTH87K90w3O7AiJRqdQoXygvi1VQTJTLGhJl7WqA7A==",
|
|
||||||
"dev": true,
|
|
||||||
"optional": true,
|
|
||||||
"requires": {
|
|
||||||
"big.js": "^5.2.2",
|
|
||||||
"emojis-list": "^3.0.0",
|
|
||||||
"json5": "^2.1.2"
|
|
||||||
}
|
|
||||||
},
|
|
||||||
"supports-color": {
|
|
||||||
"version": "7.2.0",
|
|
||||||
"resolved": "https://registry.npmjs.org/supports-color/-/supports-color-7.2.0.tgz",
|
|
||||||
"integrity": "sha512-qpCAvRl9stuOHveKsn7HncJRvv501qIacKzQlO/+Lwxc9+0q2wLyv4Dfvt80/DPn2pqOBsJdDiogXGR9+OvwRw==",
|
|
||||||
"dev": true,
|
|
||||||
"optional": true,
|
|
||||||
"requires": {
|
|
||||||
"has-flag": "^4.0.0"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
},
|
|
||||||
"vue-ref": {
|
"vue-ref": {
|
||||||
"version": "2.0.0",
|
"version": "2.0.0",
|
||||||
"resolved": "https://registry.npm.taobao.org/vue-ref/download/vue-ref-2.0.0.tgz",
|
"resolved": "https://registry.npm.taobao.org/vue-ref/download/vue-ref-2.0.0.tgz",
|
||||||
|
|||||||
@@ -203,6 +203,10 @@ export const KafkaClusterApi = {
|
|||||||
url: "/cluster/info/peek",
|
url: "/cluster/info/peek",
|
||||||
method: "get",
|
method: "get",
|
||||||
},
|
},
|
||||||
|
getBrokerApiVersionInfo: {
|
||||||
|
url: "/cluster/info/api/version",
|
||||||
|
method: "get",
|
||||||
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
export const KafkaOpApi = {
|
export const KafkaOpApi = {
|
||||||
@@ -242,6 +246,10 @@ export const KafkaOpApi = {
|
|||||||
url: "/op/replication/reassignments",
|
url: "/op/replication/reassignments",
|
||||||
method: "delete",
|
method: "delete",
|
||||||
},
|
},
|
||||||
|
proposedAssignment: {
|
||||||
|
url: "/op/replication/reassignments/proposed",
|
||||||
|
method: "post",
|
||||||
|
},
|
||||||
};
|
};
|
||||||
export const KafkaMessageApi = {
|
export const KafkaMessageApi = {
|
||||||
searchByTime: {
|
searchByTime: {
|
||||||
|
|||||||
@@ -27,7 +27,7 @@ request.interceptors.request.use((config) => {
|
|||||||
const clusterInfo = getClusterInfo();
|
const clusterInfo = getClusterInfo();
|
||||||
if (clusterInfo) {
|
if (clusterInfo) {
|
||||||
config.headers["X-Cluster-Info-Id"] = clusterInfo.id;
|
config.headers["X-Cluster-Info-Id"] = clusterInfo.id;
|
||||||
config.headers["X-Cluster-Info-Name"] = clusterInfo.clusterName;
|
// config.headers["X-Cluster-Info-Name"] = encodeURIComponent(clusterInfo.clusterName);
|
||||||
}
|
}
|
||||||
return config;
|
return config;
|
||||||
}, errorHandler);
|
}, errorHandler);
|
||||||
|
|||||||
@@ -1,25 +1,66 @@
|
|||||||
<template>
|
<template>
|
||||||
<div class="home">
|
<div class="home">
|
||||||
<a-card title="kafka console 配置" style="width: 100%">
|
<a-card title="控制台默认配置" class="card-style">
|
||||||
<!-- <a slot="extra" href="#">more</a>-->
|
|
||||||
<p v-for="(v, k) in config" :key="k">{{ k }}={{ v }}</p>
|
<p v-for="(v, k) in config" :key="k">{{ k }}={{ v }}</p>
|
||||||
</a-card>
|
</a-card>
|
||||||
|
<p></p>
|
||||||
|
<hr />
|
||||||
|
<h3>kafka API 版本兼容性</h3>
|
||||||
|
<a-spin :spinning="apiVersionInfoLoading">
|
||||||
|
<a-table
|
||||||
|
:columns="columns"
|
||||||
|
:data-source="brokerApiVersionInfo"
|
||||||
|
bordered
|
||||||
|
row-key="brokerId"
|
||||||
|
>
|
||||||
|
<div slot="operation" slot-scope="record">
|
||||||
|
<a-button
|
||||||
|
size="small"
|
||||||
|
href="javascript:;"
|
||||||
|
class="operation-btn"
|
||||||
|
@click="openApiVersionInfoDialog(record)"
|
||||||
|
>详情
|
||||||
|
</a-button>
|
||||||
|
</div>
|
||||||
|
</a-table>
|
||||||
|
</a-spin>
|
||||||
|
<VersionInfo
|
||||||
|
:version-info="apiVersionInfo"
|
||||||
|
:visible="showApiVersionInfoDialog"
|
||||||
|
@closeApiVersionInfoDialog="closeApiVersionInfoDialog"
|
||||||
|
>
|
||||||
|
</VersionInfo>
|
||||||
</div>
|
</div>
|
||||||
</template>
|
</template>
|
||||||
|
|
||||||
<script>
|
<script>
|
||||||
// @ is an alias to /src
|
// @ is an alias to /src
|
||||||
import request from "@/utils/request";
|
import request from "@/utils/request";
|
||||||
import { KafkaConfigApi } from "@/utils/api";
|
import { KafkaConfigApi, KafkaClusterApi } from "@/utils/api";
|
||||||
import notification from "ant-design-vue/lib/notification";
|
import notification from "ant-design-vue/lib/notification";
|
||||||
|
import VersionInfo from "@/views/home/VersionInfo";
|
||||||
export default {
|
export default {
|
||||||
name: "Home",
|
name: "Home",
|
||||||
components: {},
|
components: { VersionInfo },
|
||||||
data() {
|
data() {
|
||||||
return {
|
return {
|
||||||
config: {},
|
config: {},
|
||||||
|
columns,
|
||||||
|
brokerApiVersionInfo: [],
|
||||||
|
showApiVersionInfoDialog: false,
|
||||||
|
apiVersionInfo: [],
|
||||||
|
apiVersionInfoLoading: false,
|
||||||
};
|
};
|
||||||
},
|
},
|
||||||
|
methods: {
|
||||||
|
openApiVersionInfoDialog(record) {
|
||||||
|
this.apiVersionInfo = record.versionInfo;
|
||||||
|
this.showApiVersionInfoDialog = true;
|
||||||
|
},
|
||||||
|
closeApiVersionInfoDialog() {
|
||||||
|
this.showApiVersionInfoDialog = false;
|
||||||
|
},
|
||||||
|
},
|
||||||
|
|
||||||
created() {
|
created() {
|
||||||
request({
|
request({
|
||||||
@@ -35,6 +76,53 @@ export default {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
this.apiVersionInfoLoading = true;
|
||||||
|
request({
|
||||||
|
url: KafkaClusterApi.getBrokerApiVersionInfo.url,
|
||||||
|
method: KafkaClusterApi.getBrokerApiVersionInfo.method,
|
||||||
|
}).then((res) => {
|
||||||
|
this.apiVersionInfoLoading = false;
|
||||||
|
if (res.code == 0) {
|
||||||
|
this.brokerApiVersionInfo = res.data;
|
||||||
|
} else {
|
||||||
|
notification.error({
|
||||||
|
message: "error",
|
||||||
|
description: res.msg,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
});
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
const columns = [
|
||||||
|
{
|
||||||
|
title: "id",
|
||||||
|
dataIndex: "brokerId",
|
||||||
|
key: "brokerId",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
title: "地址",
|
||||||
|
dataIndex: "host",
|
||||||
|
key: "host",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
title: "支持的api数量",
|
||||||
|
dataIndex: "supportNums",
|
||||||
|
key: "supportNums",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
title: "不支持的api数量",
|
||||||
|
dataIndex: "unSupportNums",
|
||||||
|
key: "unSupportNums",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
title: "操作",
|
||||||
|
key: "operation",
|
||||||
|
scopedSlots: { customRender: "operation" },
|
||||||
|
},
|
||||||
|
];
|
||||||
</script>
|
</script>
|
||||||
|
<style scoped>
|
||||||
|
.card-style {
|
||||||
|
width: 100%;
|
||||||
|
}
|
||||||
|
</style>
|
||||||
|
|||||||
61
ui/src/views/home/VersionInfo.vue
Normal file
61
ui/src/views/home/VersionInfo.vue
Normal file
@@ -0,0 +1,61 @@
|
|||||||
|
<template>
|
||||||
|
<a-modal
|
||||||
|
title="API版本信息"
|
||||||
|
:visible="show"
|
||||||
|
:width="600"
|
||||||
|
:mask="false"
|
||||||
|
:destroyOnClose="true"
|
||||||
|
:footer="null"
|
||||||
|
:maskClosable="false"
|
||||||
|
@cancel="handleCancel"
|
||||||
|
>
|
||||||
|
<div>
|
||||||
|
<h3>格式说明</h3>
|
||||||
|
<p>请求类型(1):0 to n(2) [usage: v](3)</p>
|
||||||
|
<ol>
|
||||||
|
<li>表示客户端发出的请求类型</li>
|
||||||
|
<li>该请求在broker中支持的版本号区间</li>
|
||||||
|
<li>
|
||||||
|
表示当前控制台的kafka客户端使用的是v版本,如果是UNSUPPORTED,说明broker版本太老,无法处理控制台的这些请求,可能影响相关功能的使用
|
||||||
|
</li>
|
||||||
|
</ol>
|
||||||
|
|
||||||
|
<hr />
|
||||||
|
<ol>
|
||||||
|
<li v-for="info in versionInfo" v-bind:key="info">{{ info }}</li>
|
||||||
|
</ol>
|
||||||
|
</div>
|
||||||
|
</a-modal>
|
||||||
|
</template>
|
||||||
|
|
||||||
|
<script>
|
||||||
|
export default {
|
||||||
|
name: "APIVersionInfo",
|
||||||
|
props: {
|
||||||
|
versionInfo: {
|
||||||
|
type: Array,
|
||||||
|
},
|
||||||
|
visible: {
|
||||||
|
type: Boolean,
|
||||||
|
default: false,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
data() {
|
||||||
|
return {
|
||||||
|
show: false,
|
||||||
|
};
|
||||||
|
},
|
||||||
|
watch: {
|
||||||
|
visible(v) {
|
||||||
|
this.show = v;
|
||||||
|
},
|
||||||
|
},
|
||||||
|
methods: {
|
||||||
|
handleCancel() {
|
||||||
|
this.$emit("closeApiVersionInfoDialog", {});
|
||||||
|
},
|
||||||
|
},
|
||||||
|
};
|
||||||
|
</script>
|
||||||
|
|
||||||
|
<style scoped></style>
|
||||||
@@ -203,7 +203,7 @@ export default {
|
|||||||
this.$emit("closeDetailDialog", { refresh: false });
|
this.$emit("closeDetailDialog", { refresh: false });
|
||||||
},
|
},
|
||||||
formatTime(time) {
|
formatTime(time) {
|
||||||
return moment(time).format("YYYY-MM-DD HH:mm:ss:SSS");
|
return time == -1 ? -1 : moment(time).format("YYYY-MM-DD HH:mm:ss:SSS");
|
||||||
},
|
},
|
||||||
keyDeserializerChange() {
|
keyDeserializerChange() {
|
||||||
this.getMessageDetail();
|
this.getMessageDetail();
|
||||||
|
|||||||
@@ -9,6 +9,7 @@
|
|||||||
return index;
|
return index;
|
||||||
}
|
}
|
||||||
"
|
"
|
||||||
|
@change="handleChange"
|
||||||
>
|
>
|
||||||
<div slot="operation" slot-scope="record">
|
<div slot="operation" slot-scope="record">
|
||||||
<a-button
|
<a-button
|
||||||
@@ -41,9 +42,9 @@ export default {
|
|||||||
},
|
},
|
||||||
data() {
|
data() {
|
||||||
return {
|
return {
|
||||||
columns: columns,
|
|
||||||
showDetailDialog: false,
|
showDetailDialog: false,
|
||||||
record: {},
|
record: {},
|
||||||
|
sortedInfo: null,
|
||||||
};
|
};
|
||||||
},
|
},
|
||||||
methods: {
|
methods: {
|
||||||
@@ -54,42 +55,56 @@ export default {
|
|||||||
closeDetailDialog() {
|
closeDetailDialog() {
|
||||||
this.showDetailDialog = false;
|
this.showDetailDialog = false;
|
||||||
},
|
},
|
||||||
},
|
handleChange() {
|
||||||
};
|
this.sortedInfo = arguments[2];
|
||||||
const columns = [
|
|
||||||
{
|
|
||||||
title: "topic",
|
|
||||||
dataIndex: "topic",
|
|
||||||
key: "topic",
|
|
||||||
width: 300,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
title: "分区",
|
|
||||||
dataIndex: "partition",
|
|
||||||
key: "partition",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
title: "偏移",
|
|
||||||
dataIndex: "offset",
|
|
||||||
key: "offset",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
title: "时间",
|
|
||||||
dataIndex: "timestamp",
|
|
||||||
key: "timestamp",
|
|
||||||
slots: { title: "timestamp" },
|
|
||||||
scopedSlots: { customRender: "timestamp" },
|
|
||||||
customRender: (text) => {
|
|
||||||
return moment(text).format("YYYY-MM-DD HH:mm:ss:SSS");
|
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
{
|
computed: {
|
||||||
title: "操作",
|
columns() {
|
||||||
key: "operation",
|
let sortedInfo = this.sortedInfo || {};
|
||||||
scopedSlots: { customRender: "operation" },
|
const columns = [
|
||||||
width: 200,
|
{
|
||||||
|
title: "topic",
|
||||||
|
dataIndex: "topic",
|
||||||
|
key: "topic",
|
||||||
|
width: 300,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
title: "分区",
|
||||||
|
dataIndex: "partition",
|
||||||
|
key: "partition",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
title: "偏移",
|
||||||
|
dataIndex: "offset",
|
||||||
|
key: "offset",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
title: "时间",
|
||||||
|
dataIndex: "timestamp",
|
||||||
|
key: "timestamp",
|
||||||
|
slots: { title: "timestamp" },
|
||||||
|
scopedSlots: { customRender: "timestamp" },
|
||||||
|
customRender: (text) => {
|
||||||
|
return text == -1
|
||||||
|
? -1
|
||||||
|
: moment(text).format("YYYY-MM-DD HH:mm:ss:SSS");
|
||||||
|
},
|
||||||
|
sorter: (a, b) => a.timestamp - b.timestamp,
|
||||||
|
sortOrder: sortedInfo.columnKey === "timestamp" && sortedInfo.order,
|
||||||
|
sortDirections: ["ascend", "descend"],
|
||||||
|
},
|
||||||
|
{
|
||||||
|
title: "操作",
|
||||||
|
key: "operation",
|
||||||
|
scopedSlots: { customRender: "operation" },
|
||||||
|
width: 200,
|
||||||
|
},
|
||||||
|
];
|
||||||
|
return columns;
|
||||||
|
},
|
||||||
},
|
},
|
||||||
];
|
};
|
||||||
</script>
|
</script>
|
||||||
|
|
||||||
<style scoped></style>
|
<style scoped></style>
|
||||||
|
|||||||
@@ -69,6 +69,9 @@ sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule require
|
|||||||
import request from "@/utils/request";
|
import request from "@/utils/request";
|
||||||
import { KafkaClusterApi } from "@/utils/api";
|
import { KafkaClusterApi } from "@/utils/api";
|
||||||
import notification from "ant-design-vue/es/notification";
|
import notification from "ant-design-vue/es/notification";
|
||||||
|
import { getClusterInfo } from "@/utils/local-cache";
|
||||||
|
import { mapMutations } from "vuex";
|
||||||
|
import { CLUSTER } from "@/store/mutation-types";
|
||||||
|
|
||||||
export default {
|
export default {
|
||||||
name: "AddClusterInfo",
|
name: "AddClusterInfo",
|
||||||
@@ -124,6 +127,17 @@ export default {
|
|||||||
if (res.code == 0) {
|
if (res.code == 0) {
|
||||||
this.$message.success(res.msg);
|
this.$message.success(res.msg);
|
||||||
this.$emit(this.closeDialogEvent, { refresh: true });
|
this.$emit(this.closeDialogEvent, { refresh: true });
|
||||||
|
if (this.isModify) {
|
||||||
|
let clusterInfo = getClusterInfo();
|
||||||
|
if (
|
||||||
|
clusterInfo &&
|
||||||
|
clusterInfo.id &&
|
||||||
|
clusterInfo.id == this.clusterInfo.id &&
|
||||||
|
clusterInfo.clusterName != data.clusterName
|
||||||
|
) {
|
||||||
|
this.switchCluster(data);
|
||||||
|
}
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
notification.error({
|
notification.error({
|
||||||
message: "error",
|
message: "error",
|
||||||
@@ -138,6 +152,9 @@ export default {
|
|||||||
this.data = [];
|
this.data = [];
|
||||||
this.$emit(this.closeDialogEvent, { refresh: false });
|
this.$emit(this.closeDialogEvent, { refresh: false });
|
||||||
},
|
},
|
||||||
|
...mapMutations({
|
||||||
|
switchCluster: CLUSTER.SWITCH,
|
||||||
|
}),
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
const defaultInfo = { clusterName: "", address: "", properties: "" };
|
const defaultInfo = { clusterName: "", address: "", properties: "" };
|
||||||
|
|||||||
@@ -49,6 +49,15 @@
|
|||||||
<label>说明:</label>
|
<label>说明:</label>
|
||||||
<span>查看正在进行副本变更/重分配的任务,或者将其取消</span>
|
<span>查看正在进行副本变更/重分配的任务,或者将其取消</span>
|
||||||
</p>
|
</p>
|
||||||
|
<p>
|
||||||
|
<a-button type="primary" @click="openReplicaReassignDialog">
|
||||||
|
副本重分配
|
||||||
|
</a-button>
|
||||||
|
<label>说明:</label>
|
||||||
|
<span
|
||||||
|
>副本所在节点重新分配,打个比方,集群有6个节点,分区1的3个副本在节点1、2、3上,现在将它们重新分配到3、4、5上</span
|
||||||
|
>
|
||||||
|
</p>
|
||||||
</a-card>
|
</a-card>
|
||||||
</div>
|
</div>
|
||||||
<!-- 隐藏数据同步相关-->
|
<!-- 隐藏数据同步相关-->
|
||||||
@@ -125,6 +134,11 @@
|
|||||||
:visible="clusterManager.showClusterInfoDialog"
|
:visible="clusterManager.showClusterInfoDialog"
|
||||||
@closeClusterInfoDialog="closeClusterInfoDialog"
|
@closeClusterInfoDialog="closeClusterInfoDialog"
|
||||||
></ClusterInfo>
|
></ClusterInfo>
|
||||||
|
<ReplicaReassign
|
||||||
|
:visible="replicationManager.showReplicaReassignDialog"
|
||||||
|
@closeReplicaReassignDialog="closeReplicaReassignDialog"
|
||||||
|
>
|
||||||
|
</ReplicaReassign>
|
||||||
</div>
|
</div>
|
||||||
</template>
|
</template>
|
||||||
|
|
||||||
@@ -138,6 +152,7 @@ import ConfigThrottle from "@/views/op/ConfigThrottle";
|
|||||||
import RemoveThrottle from "@/views/op/RemoveThrottle";
|
import RemoveThrottle from "@/views/op/RemoveThrottle";
|
||||||
import CurrentReassignments from "@/views/op/CurrentReassignments";
|
import CurrentReassignments from "@/views/op/CurrentReassignments";
|
||||||
import ClusterInfo from "@/views/op/ClusterInfo";
|
import ClusterInfo from "@/views/op/ClusterInfo";
|
||||||
|
import ReplicaReassign from "@/views/op/ReplicaReassign";
|
||||||
export default {
|
export default {
|
||||||
name: "Operation",
|
name: "Operation",
|
||||||
components: {
|
components: {
|
||||||
@@ -150,6 +165,7 @@ export default {
|
|||||||
RemoveThrottle,
|
RemoveThrottle,
|
||||||
CurrentReassignments,
|
CurrentReassignments,
|
||||||
ClusterInfo,
|
ClusterInfo,
|
||||||
|
ReplicaReassign,
|
||||||
},
|
},
|
||||||
data() {
|
data() {
|
||||||
return {
|
return {
|
||||||
@@ -162,6 +178,7 @@ export default {
|
|||||||
replicationManager: {
|
replicationManager: {
|
||||||
showElectPreferredLeaderDialog: false,
|
showElectPreferredLeaderDialog: false,
|
||||||
showCurrentReassignmentsDialog: false,
|
showCurrentReassignmentsDialog: false,
|
||||||
|
showReplicaReassignDialog: false,
|
||||||
},
|
},
|
||||||
brokerManager: {
|
brokerManager: {
|
||||||
showConfigThrottleDialog: false,
|
showConfigThrottleDialog: false,
|
||||||
@@ -227,6 +244,12 @@ export default {
|
|||||||
closeClusterInfoDialog() {
|
closeClusterInfoDialog() {
|
||||||
this.clusterManager.showClusterInfoDialog = false;
|
this.clusterManager.showClusterInfoDialog = false;
|
||||||
},
|
},
|
||||||
|
openReplicaReassignDialog() {
|
||||||
|
this.replicationManager.showReplicaReassignDialog = true;
|
||||||
|
},
|
||||||
|
closeReplicaReassignDialog() {
|
||||||
|
this.replicationManager.showReplicaReassignDialog = false;
|
||||||
|
},
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
</script>
|
</script>
|
||||||
|
|||||||
296
ui/src/views/op/ReplicaReassign.vue
Normal file
296
ui/src/views/op/ReplicaReassign.vue
Normal file
@@ -0,0 +1,296 @@
|
|||||||
|
<template>
|
||||||
|
<a-modal
|
||||||
|
title="副本重分配"
|
||||||
|
:visible="show"
|
||||||
|
:width="800"
|
||||||
|
:mask="false"
|
||||||
|
:destroyOnClose="true"
|
||||||
|
:footer="null"
|
||||||
|
:maskClosable="false"
|
||||||
|
@cancel="handleCancel"
|
||||||
|
>
|
||||||
|
<div>
|
||||||
|
<a-spin :spinning="loading">
|
||||||
|
<a-form
|
||||||
|
:form="form"
|
||||||
|
:label-col="{ span: 5 }"
|
||||||
|
:wrapper-col="{ span: 12 }"
|
||||||
|
@submit="handleSubmit"
|
||||||
|
>
|
||||||
|
<a-form-item label="Topic">
|
||||||
|
<a-select
|
||||||
|
@change="handleTopicChange"
|
||||||
|
show-search
|
||||||
|
option-filter-prop="children"
|
||||||
|
v-decorator="[
|
||||||
|
'topic',
|
||||||
|
{ rules: [{ required: true, message: '请选择一个topic!' }] },
|
||||||
|
]"
|
||||||
|
placeholder="请选择一个topic"
|
||||||
|
>
|
||||||
|
<a-select-option v-for="v in topicList" :key="v" :value="v">
|
||||||
|
{{ v }}
|
||||||
|
</a-select-option>
|
||||||
|
</a-select>
|
||||||
|
</a-form-item>
|
||||||
|
<a-form-item label="分配到Broker">
|
||||||
|
<a-select
|
||||||
|
mode="multiple"
|
||||||
|
option-filter-prop="children"
|
||||||
|
v-decorator="[
|
||||||
|
'brokers',
|
||||||
|
{
|
||||||
|
initialValue: brokers,
|
||||||
|
rules: [{ required: true, message: '请选择一个broker!' }],
|
||||||
|
},
|
||||||
|
]"
|
||||||
|
placeholder="请选择一个broker"
|
||||||
|
>
|
||||||
|
<a-select-option v-for="v in brokers" :key="v" :value="v">
|
||||||
|
<span v-if="v == -1">全部</span> <span v-else>{{ v }}</span>
|
||||||
|
</a-select-option>
|
||||||
|
</a-select>
|
||||||
|
</a-form-item>
|
||||||
|
<a-table
|
||||||
|
bordered
|
||||||
|
:columns="columns"
|
||||||
|
:data-source="currentAssignment"
|
||||||
|
:rowKey="
|
||||||
|
(record, index) => {
|
||||||
|
return index;
|
||||||
|
}
|
||||||
|
"
|
||||||
|
>
|
||||||
|
</a-table>
|
||||||
|
<a-form-item :wrapper-col="{ span: 12, offset: 5 }">
|
||||||
|
<a-button type="primary" html-type="submit">
|
||||||
|
重新生成分配计划
|
||||||
|
</a-button>
|
||||||
|
</a-form-item>
|
||||||
|
</a-form>
|
||||||
|
<hr />
|
||||||
|
<h2>新的分配计划</h2>
|
||||||
|
<a-table
|
||||||
|
bordered
|
||||||
|
:columns="columns"
|
||||||
|
:data-source="proposedAssignmentShow"
|
||||||
|
:rowKey="
|
||||||
|
(record, index) => {
|
||||||
|
return index;
|
||||||
|
}
|
||||||
|
"
|
||||||
|
>
|
||||||
|
</a-table>
|
||||||
|
<a-button type="danger" @click="updateAssignment"> 更新分配 </a-button>
|
||||||
|
</a-spin>
|
||||||
|
<hr />
|
||||||
|
<h4>注意</h4>
|
||||||
|
<ul>
|
||||||
|
<li>
|
||||||
|
副本重分配,可以将副本分配到其它broker上,通过选择上面的broker节点,根据这几个节点生成分配方案
|
||||||
|
</li>
|
||||||
|
<li>
|
||||||
|
选择的broker的节点数量不能少于当前的副本数,比如有3个副本,至少需要3个broker节点
|
||||||
|
</li>
|
||||||
|
<li>
|
||||||
|
数据量太大,考虑设置一下限流,毕竟重新分配后,不同broker之间可能做数据迁移
|
||||||
|
</li>
|
||||||
|
</ul>
|
||||||
|
</div>
|
||||||
|
</a-modal>
|
||||||
|
</template>
|
||||||
|
|
||||||
|
<script>
|
||||||
|
import request from "@/utils/request";
|
||||||
|
import { KafkaTopicApi, KafkaOpApi, KafkaClusterApi } from "@/utils/api";
|
||||||
|
import notification from "ant-design-vue/es/notification";
|
||||||
|
export default {
|
||||||
|
name: "ReplicaReassign",
|
||||||
|
props: {
|
||||||
|
visible: {
|
||||||
|
type: Boolean,
|
||||||
|
default: false,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
data() {
|
||||||
|
return {
|
||||||
|
show: this.visible,
|
||||||
|
data: [],
|
||||||
|
loading: false,
|
||||||
|
form: this.$form.createForm(this, { name: "ReplicaReassignForm" }),
|
||||||
|
topicList: [],
|
||||||
|
partitions: [],
|
||||||
|
brokers: [],
|
||||||
|
currentAssignment: [],
|
||||||
|
proposedAssignment: [],
|
||||||
|
proposedAssignmentShow: [],
|
||||||
|
columns,
|
||||||
|
};
|
||||||
|
},
|
||||||
|
watch: {
|
||||||
|
visible(v) {
|
||||||
|
this.show = v;
|
||||||
|
if (this.show) {
|
||||||
|
this.clearData();
|
||||||
|
this.getTopicNameList();
|
||||||
|
this.getClusterInfo();
|
||||||
|
}
|
||||||
|
},
|
||||||
|
},
|
||||||
|
methods: {
|
||||||
|
handleSubmit(e) {
|
||||||
|
e.preventDefault();
|
||||||
|
this.form.validateFields((err, values) => {
|
||||||
|
if (!err) {
|
||||||
|
this.getProposedAssignment(values);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
},
|
||||||
|
getTopicReplicaInfo(topic) {
|
||||||
|
this.loading = true;
|
||||||
|
request({
|
||||||
|
url: KafkaTopicApi.getCurrentReplicaAssignment.url + "?topic=" + topic,
|
||||||
|
method: KafkaTopicApi.getCurrentReplicaAssignment.method,
|
||||||
|
}).then((res) => {
|
||||||
|
this.loading = false;
|
||||||
|
if (res.code == 0) {
|
||||||
|
this.currentAssignment = res.data.partitions;
|
||||||
|
this.currentAssignment.forEach(
|
||||||
|
(e) => (e.replicas = e.replicas.join(","))
|
||||||
|
);
|
||||||
|
} else {
|
||||||
|
notification.error({
|
||||||
|
message: "error",
|
||||||
|
description: res.msg,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
});
|
||||||
|
},
|
||||||
|
getTopicNameList() {
|
||||||
|
request({
|
||||||
|
url: KafkaTopicApi.getTopicNameList.url,
|
||||||
|
method: KafkaTopicApi.getTopicNameList.method,
|
||||||
|
}).then((res) => {
|
||||||
|
if (res.code == 0) {
|
||||||
|
this.topicList = res.data;
|
||||||
|
} else {
|
||||||
|
notification.error({
|
||||||
|
message: "error",
|
||||||
|
description: res.msg,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
});
|
||||||
|
},
|
||||||
|
getPartitionInfo(topic) {
|
||||||
|
this.loading = true;
|
||||||
|
request({
|
||||||
|
url: KafkaTopicApi.getPartitionInfo.url + "?topic=" + topic,
|
||||||
|
method: KafkaTopicApi.getPartitionInfo.method,
|
||||||
|
}).then((res) => {
|
||||||
|
this.loading = false;
|
||||||
|
if (res.code != 0) {
|
||||||
|
notification.error({
|
||||||
|
message: "error",
|
||||||
|
description: res.msg,
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
this.partitions = res.data.map((v) => v.partition);
|
||||||
|
this.partitions.splice(0, 0, -1);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
},
|
||||||
|
handleTopicChange(topic) {
|
||||||
|
// this.getPartitionInfo(topic);
|
||||||
|
this.clearData();
|
||||||
|
this.getTopicReplicaInfo(topic);
|
||||||
|
},
|
||||||
|
getClusterInfo() {
|
||||||
|
this.loading = true;
|
||||||
|
request({
|
||||||
|
url: KafkaClusterApi.getClusterInfo.url,
|
||||||
|
method: KafkaClusterApi.getClusterInfo.method,
|
||||||
|
}).then((res) => {
|
||||||
|
this.loading = false;
|
||||||
|
this.brokers = [];
|
||||||
|
res.data.nodes.forEach((node) => this.brokers.push(node.id));
|
||||||
|
});
|
||||||
|
},
|
||||||
|
getProposedAssignment(params) {
|
||||||
|
this.loading = true;
|
||||||
|
request({
|
||||||
|
url: KafkaOpApi.proposedAssignment.url,
|
||||||
|
method: KafkaOpApi.proposedAssignment.method,
|
||||||
|
data: params,
|
||||||
|
}).then((res) => {
|
||||||
|
this.loading = false;
|
||||||
|
if (res.code != 0) {
|
||||||
|
notification.error({
|
||||||
|
message: "error",
|
||||||
|
description: res.msg,
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
this.proposedAssignmentShow = res.data;
|
||||||
|
this.proposedAssignment = JSON.parse(
|
||||||
|
JSON.stringify(this.proposedAssignmentShow)
|
||||||
|
);
|
||||||
|
this.proposedAssignmentShow.forEach(
|
||||||
|
(e) => (e.replicas = e.replicas.join(","))
|
||||||
|
);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
},
|
||||||
|
clearData() {
|
||||||
|
this.currentAssignment = [];
|
||||||
|
this.proposedAssignment = [];
|
||||||
|
this.proposedAssignmentShow = [];
|
||||||
|
},
|
||||||
|
handleCancel() {
|
||||||
|
this.data = [];
|
||||||
|
this.$emit("closeReplicaReassignDialog", { refresh: false });
|
||||||
|
},
|
||||||
|
updateAssignment() {
|
||||||
|
this.form.validateFields((err, values) => {
|
||||||
|
if (!err) {
|
||||||
|
if (this.proposedAssignment.length == 0) {
|
||||||
|
this.$message.warn("请先生成分配计划!");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
this.loading = true;
|
||||||
|
request({
|
||||||
|
url: KafkaTopicApi.updateReplicaAssignment.url,
|
||||||
|
method: KafkaTopicApi.updateReplicaAssignment.method,
|
||||||
|
data: { partitions: this.proposedAssignment },
|
||||||
|
}).then((res) => {
|
||||||
|
this.loading = false;
|
||||||
|
if (res.code == 0) {
|
||||||
|
this.$message.success(res.msg);
|
||||||
|
this.handleTopicChange(values.topic);
|
||||||
|
} else {
|
||||||
|
notification.error({
|
||||||
|
message: "error",
|
||||||
|
description: res.msg,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
});
|
||||||
|
},
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
const columns = [
|
||||||
|
{
|
||||||
|
title: "分区",
|
||||||
|
dataIndex: "partition",
|
||||||
|
key: "partition",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
title: "副本所在broker",
|
||||||
|
dataIndex: "replicas",
|
||||||
|
key: "replicas",
|
||||||
|
scopedSlots: { customRender: "replicas" },
|
||||||
|
},
|
||||||
|
];
|
||||||
|
</script>
|
||||||
|
|
||||||
|
<style scoped></style>
|
||||||
Reference in New Issue
Block a user