36 Commits

Author SHA1 Message Date
许晓东
e9f34e1d19 Support batch deletion of topics. 2022-07-24 11:52:48 +08:00
许晓东
ccdcebb24d Update icon. 2022-07-24 11:09:25 +08:00
许晓东
7ddd75e34f Update icon. 2022-07-24 11:09:21 +08:00
许晓东
aebea435fa Update the overview docs. 2022-07-14 21:59:25 +08:00
许晓东
ea788313c6 Update the overview docs. 2022-07-14 21:59:18 +08:00
许晓东
727edfcca8 Support caching producer connections; connection caching is disabled by default 2022-07-09 18:54:20 +08:00
Xiaodong Xu
cc1989a74b Merge pull request #17 from comdotwww/main
Fix CMD path escaping on Windows
2022-07-07 22:46:39 +08:00
comdotwww
0196a90b69 Update start.bat 2022-07-07 22:05:46 +08:00
许晓东
9c3e3988e0 Handle consumer connection properties; update contact info 2022-07-07 20:09:27 +08:00
许晓东
458e13c9e0 Cache connections 2022-07-05 10:19:51 +08:00
许晓东
979859b232 Support deleting messages online 2022-07-04 17:16:00 +08:00
许晓东
b163e5f776 Upgrade Kafka from 2.8.0 to 3.2.0; add Docker Compose deployment notes 2022-06-30 20:11:29 +08:00
Xiaodong Xu
d062e18940 Merge pull request #16 from wdkang123/main
new(md): Docker / Docker Compose deployment
2022-06-30 19:42:14 +08:00
武子康
87c1e7ba4a new(md): Docker / Docker Compose deployment 2022-06-30 19:12:42 +08:00
许晓东
5194c952f2 polish README. 2022-06-29 19:17:21 +08:00
许晓东
c1cc44d32f Fix NPE when the cluster has no active nodes; update README. 2022-06-29 17:22:29 +08:00
许晓东
82fafe980d Fix NPE when the cluster has no active nodes; update README. 2022-06-29 17:20:57 +08:00
许晓东
34752deca2 update wechat contact. 2022-06-17 10:10:00 +08:00
yinuo
9e42e2c72a Update contact info 2022-05-06 11:02:28 +08:00
dongyinuo
e531f5d786 Delete weixin_contact.jpeg 2022-05-06 11:00:56 +08:00
dongyinuo
10e75ac55d Update README.md
Update contact info
2022-05-06 10:58:55 +08:00
yinuo
4a8d09dc89 Update contact info 2022-05-06 10:56:22 +08:00
dongyinuo
116bc100a7 Add files via upload
Replace the WeChat group image
2022-05-06 10:48:00 +08:00
Xiaodong Xu
b1feaad9f7 Merge pull request #14 from dongyinuo/feature/dongyinuo/add/contact
Feature/dongyinuo/add/contact
2022-04-29 17:27:19 +08:00
yinuo
4d372f8374 Add contact info 2022-04-29 17:22:44 +08:00
yinuo
4b2c544c0d Add contact info 2022-04-29 17:21:32 +08:00
许晓东
8131cb1a42 Publish the 1.0.4 package download link 2022-02-16 20:01:06 +08:00
许晓东
1dd6466261 Replica reassignment 2022-02-16 19:50:35 +08:00
许晓东
dda08a2152 Replica reassignment: generate a proposed assignment plan 2022-02-15 20:13:07 +08:00
许晓东
01c7121ee4 Keep the cluster node list ordered 2022-01-22 23:33:13 +08:00
许晓东
d939d7653c Show broker API version compatibility on the home page 2022-01-22 23:07:41 +08:00
许晓东
058cd5a24e Query current reassignments; handle unsupported-version exceptions 2022-01-20 13:44:37 +08:00
许晓东
db3f55ac4a polish README 2022-01-19 19:05:00 +08:00
许晓东
a311a34537 Fix stack overflow bug in partition comparison 2022-01-18 20:42:11 +08:00
许晓东
e8fe2ea1c7 Support Chinese cluster names; selectable time order in message search 2022-01-13 14:19:17 +08:00
许晓东
10302dd39c v1.0.3 package download link 2022-01-09 23:57:00 +08:00
53 changed files with 1841 additions and 124 deletions

View File

@@ -1,13 +1,14 @@
# Kafka visual management console
A lightweight visual management console for Kafka: quick to install and configure, simple and easy to use.
To keep development simple there is no internationalization support; only Chinese display is supported.
To keep development simple there is no internationalization support; the pages are displayed in Chinese only.
If you have used rocketmq-console, the front-end style is fairly similar.
## Page preview
If GitHub can display the images, click [view the menu pages](./document/overview/概览.md) to see what each page looks like
## Cluster migration support
The current main branch and later versions no longer provide a message sync / cluster migration solution; if needed, see: [cluster migration notes](./document/datasync/集群迁移.md)
## ACL notes
If the Kafka cluster has ACL enabled but the ACL menu is not visible in the console, see the [ACL configuration guide](./document/acl/Acl.md)
## Features
* Multi-cluster support
* Cluster information
@@ -21,7 +22,9 @@
![Features](./document/img/功能特性.png)
## Package download
Click to download (v1.0.2): [kafka-console-ui.zip](https://github.com/xxd763795151/kafka-console-ui/releases/download/v1.0.2/kafka-console-ui.zip)
Click to download (v1.0.4): [kafka-console-ui.zip](https://github.com/xxd763795151/kafka-console-ui/releases/download/v1.0.4/kafka-console-ui.zip)
If the package downloads slowly, see the source packaging instructions below and build it from source quickly. Note that the latest main branch has just upgraded Kafka to 3.2.0 and has not been fully tested yet; if you need a stable version, use the 1.0.4-release branch code.
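For reference, a minimal build-from-source sketch, assuming the repository URL above and the package.sh / package.bat scripts described in the source packaging instructions:
```shell
# Sketch: build the package from the stable 1.0.4-release branch
git clone https://github.com/xxd763795151/kafka-console-ui.git
cd kafka-console-ui
git checkout 1.0.4-release
sh package.sh   # on Windows: package.bat
# the archive is generated at target/kafka-console-ui.zip
```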
## Quick start
### Windows
@@ -60,7 +63,7 @@ sh bin/shutdown.sh
When adding a cluster, besides the cluster address you can also enter other cluster properties, such as request timeout and ACL settings. If ACL is enabled, the ACL menu appears in the navigation bar after switching to that cluster and the related operations are available. SASL_SCRAM-based authentication and authorization management is the most completely supported; I have not verified the others. Although I developed this feature, I have not verified every part of it in detail, but the authorization part should be generic.
## Kafka version
* Currently built against Kafka 2.8.0
* Currently built against Kafka 3.2.0
## Monitoring
Only operation and management features are provided. Monitoring and alerting need to be handled by other components; if required, see: https://blog.csdn.net/x763795151/article/details/119705372
@@ -69,3 +72,22 @@ sh bin/shutdown.sh
## Local development
For local development, see the development environment setup: [local development](./document/develop/开发配置.md)
## Login authentication and permissions
The main branch does not currently support login authentication. Thanks to @dongyinuo for developing a version that supports login authentication and the related button-level permissions (two main roles: administrator and ordinary developer).
It is on the branch feature/dongyinuo/20220501/devops.
If you need console login authentication, switch to that branch and build it; see the source packaging instructions for how to package.
Default login account: admin/kafka-console-ui521
## Docker Compose deployment
Thanks to @wdkang123 for sharing this deployment method; if needed, see [Docker Compose deployment](./document/deploy/docker部署.md)
## Contact
+ WeChat group
<img src="./document/contact/weixin_contact.jpg" width="40%"/>
[//]: # (<img src="https://github.com/xxd763795151/kafka-console-ui/blob/main/document/contact/weixin_contact.jpeg" width="40%"/>)
+ If the contact info above is no longer valid, add one of the WeChat IDs below and explain your purpose
- xxd763795151
- wxid_7jy2ezljvebt12

View File

@@ -5,4 +5,4 @@ set JAVA_OPTS=-Xmx512m -Xms512m -Xmn256m -Xss256k
set CONFIG_FILE=../config/application.yml
set TARGET=../lib/kafka-console-ui.jar
set DATA_DIR=..
%JAVA_CMD% -jar %TARGET% --spring.config.location=%CONFIG_FILE% --data.dir=%DATA_DIR%
"%JAVA_CMD%" -jar %TARGET% --spring.config.location=%CONFIG_FILE% --data.dir=%DATA_DIR%

document/acl/Acl.md Normal file
View File

@@ -0,0 +1,36 @@
# ACL configuration guide
## Preface
You may have come here from this article: [How to quickly manage Kafka ACL configuration in a visual way](https://blog.csdn.net/x763795151/article/details/120200119)
That article describes enabling ACL by editing the configuration file application.yml, for example:
```yaml
kafka:
  config:
    # Kafka broker addresses, comma-separated
    bootstrap-server: 'localhost:9092'
    # Whether ACL is enabled on the broker side; if not, the items below can be ignored
    enable-acl: true
    # Only two security protocols are supported: SASL_PLAINTEXT and PLAINTEXT. Set SASL_PLAINTEXT when ACL is enabled; otherwise this setting does not matter
    security-protocol: SASL_PLAINTEXT
    sasl-mechanism: SCRAM-SHA-256
    # Super admin username, already configured as a super user on the broker
    admin-username: admin
    # Super admin password
    admin-password: admin
    # Automatically create the configured super admin user on startup
    admin-create: true
    # ZooKeeper address the broker connects to
    zookeeper-addr: localhost:2181
    sasl-jaas-config: org.apache.kafka.common.security.scram.ScramLoginModule required username="${kafka.config.admin-username}" password="${kafka.config.admin-password}";
```
It stated that the kafka.config.enable-acl item must be set to true.
Note: **this approach is no longer supported.**
## Notes for the new version
Multi-cluster configuration is now supported; see the "configure cluster" introduction on the home page.
So these extra configuration items have been removed.
If ACL is enabled, configure the cluster's ACL-related settings in the properties field when adding the cluster in the UI, as shown: ![Add cluster](./新增集群.png)
If the console detects ACL-related properties, the ACL menu appears automatically after you switch to that cluster.
Note: only SASL is supported.
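For reference only, a minimal sketch of the kind of ACL-related properties to enter for a cluster, assuming a SASL_SCRAM setup (these are standard Kafka client settings; the mechanism and credentials here are placeholders):
```properties
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="admin";
```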

View File

Binary image file added (245 KiB).

View File

Binary image file added (205 KiB).

View File

@@ -0,0 +1,189 @@
# Docker / Docker Compose deployment
# 1. Quick start
## 1.1 Pull the image
```shell
docker pull wdkang/kafka-console-ui
```
## 1.2 List images
```shell
docker images
```
## 1.3 Start the service
Data is not persisted inside the Docker container, so mapping the data directory to the host machine is recommended.
See **2. Data persistence** for details.
```shell
docker run -d -p 7766:7766 wdkang/kafka-console-ui
```
## 1.4 Check the status
```shell
docker ps -a
```
## 1.5 View the logs
```shell
docker logs -f ${containerId}
```
## 1.6 Access the service
```shell
http://localhost:7766
```
# 2. Data persistence
Persisting the data is recommended.
## 2.1 Create directories
```shell
mkdir -p /home/kafka-console-ui/data /home/kafka-console-ui/log
cd /home/kafka-console-ui
```
## 2.2 Start the service
```shell
docker run -d -p 7766:7766 -v $PWD/data:/app/data -v $PWD/log:/app/log wdkang/kafka-console-ui
```
# 3. Build your own image
## 3.1 Build the image
**Prerequisites**
(adjust the Dockerfile to your own situation)
Download the [kafka-console-ui.zip](https://github.com/xxd763795151/kafka-console-ui/releases) package
After unpacking, put the Dockerfile in the root of the extracted folder
**Dockerfile**
```dockerfile
# jdk
FROM openjdk:8-jdk-alpine
# label
LABEL by="https://github.com/xxd763795151/kafka-console-ui"
# root
RUN mkdir -p /app && cd /app
WORKDIR /app
# config log data
RUN mkdir -p /app/config && mkdir -p /app/log && mkdir -p /app/data && mkdir -p /app/lib
# add file
ADD ./lib/kafka-console-ui.jar /app/lib
ADD ./config /app/config
# port
EXPOSE 7766
# start server
CMD java -jar -Xmx512m -Xms512m -Xmn256m -Xss256k /app/lib/kafka-console-ui.jar --spring.config.location="/app/config/" --logging.home="/app/log" --data.dir="/app/data"
```
**Build the image**
In the root directory of the folder
(note the trailing dot at the end of the command)
```shell
docker build -t ${your_docker_hub_addr} .
```
## 3.2 Push the image
```shell
docker push ${your_docker_hub_addr}
```
# 4. Container orchestration
```yaml
# docker-compose definition
version: '3'
services:
  # service name
  kafka-console-ui:
    # container name
    container_name: "kafka-console-ui"
    # ports
    ports:
      - "7766:7766"
    # persistence
    volumes:
      - ./data:/app/data
      - ./log:/app/log
    # avoid file read/write permission problems
    privileged: true
    user: root
    # image
    image: "wdkang/kafka-console-ui"
```
## 4.1 Pull the image
```shell
docker-compose pull kafka-console-ui
```
## 4.2 Build and start
```shell
docker-compose up --detach --build kafka-console-ui
```
## 4.3 Check the status
```shell
docker-compose ps -a
```
## 4.4 Stop the service
```shell
docker-compose down
```

View File

@@ -12,8 +12,11 @@
* scala 2.13
* maven >=3.6+
* webstorm
* Node
Except for WebStorm, which is only the IDE for front-end development and can be replaced as you prefer, the JDK and Scala are required.
During development my local Node version is v14.16.0, downloads: https://nodejs.org/download/release/v14.16.0/ . I have not tested whether much newer or older versions also work.
Scala 2.13 can be downloaded at the bottom of this page: https://www.scala-lang.org/download/scala2.html
## Clone the code
@@ -21,7 +24,8 @@ scala 2.13下载地址在这个页面最下面https://www.scala-lang.org/d
## Back-end setup
1. Open the project in IDEA
2. Open IDEA's Project Structure (Settings) -> Modules -> mark src/main/scala as Sources; by convention src/main/java is the source directory, so an extra source directory has to be added here
3. Open IDEA's Project Structure (Settings) -> Libraries, add a Scala SDK and select the locally downloaded Scala 2.13 directory (if your IDEA offers it directly you can also tick it there instead of downloading it first)
3. Open IDEA's Settings -> Plugins, search for the Scala plugin and install it, then restart IDEA for it to take effect; this step must come before step 4
4. Open IDEA's Project Structure (Settings) -> Libraries, add a Scala SDK and select the locally downloaded Scala 2.13 directory (if your IDEA offers it directly you can also tick it there instead of downloading it first)
## Front end
The front-end code is in the ui directory of the project; open it with a front-end IDE such as WebStorm and develop there.

View File

Binary image file added (99 KiB).

View File

Binary image file changed (99 KiB before, 103 KiB after).

View File

@@ -29,4 +29,6 @@ package.bat
cd kafka-console-ui
# run this on Linux or macOS
sh package.sh
```
```
When packaging finishes, a kafka-console-ui.zip archive is generated in the target directory.
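For reference, a minimal sketch of deploying the generated archive; the bin/start.sh script and the unpacked layout here are assumptions, mirroring the bin/shutdown.sh mentioned in the quick-start section:
```shell
unzip target/kafka-console-ui.zip -d kafka-console-ui
cd kafka-console-ui
sh bin/start.sh       # stop later with: sh bin/shutdown.sh
```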

pom.xml
View File

@@ -10,7 +10,7 @@
</parent>
<groupId>com.xuxd</groupId>
<artifactId>kafka-console-ui</artifactId>
<version>1.0.3</version>
<version>1.0.5</version>
<name>kafka-console-ui</name>
<description>Kafka console manage ui</description>
<properties>
@@ -21,7 +21,7 @@
<ui.path>${project.basedir}/ui</ui.path>
<frontend-maven-plugin.version>1.11.0</frontend-maven-plugin.version>
<compiler.version>1.8</compiler.version>
<kafka.version>2.8.0</kafka.version>
<kafka.version>3.2.0</kafka.version>
<maven.assembly.plugin.version>3.0.0</maven.assembly.plugin.version>
<mybatis-plus-boot-starter.version>3.4.2</mybatis-plus-boot-starter.version>
<scala.version>2.13.6</scala.version>
@@ -76,6 +76,18 @@
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.13</artifactId>
<version>${kafka.version}</version>
<exclusions>
<exclusion>
<groupId>com.typesafe.scala-logging</groupId>
<artifactId>scala-logging_2.13</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.typesafe.scala-logging</groupId>
<artifactId>scala-logging_2.13</artifactId>
<version>3.9.2</version>
</dependency>
<dependency>

View File

@@ -1,18 +1,15 @@
package com.xuxd.kafka.console.beans;
import java.util.Objects;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AccessControlEntryFilter;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.acl.*;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourcePatternFilter;
import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.utils.SecurityUtils;
import java.util.Objects;
/**
* kafka-console-ui.
@@ -41,7 +38,9 @@ public class AclEntry {
entry.setResourceType(binding.pattern().resourceType().name());
entry.setName(binding.pattern().name());
entry.setPatternType(binding.pattern().patternType().name());
entry.setPrincipal(KafkaPrincipal.fromString(binding.entry().principal()).getName());
// entry.setPrincipal(KafkaPrincipal.fromString(binding.entry().principal()).getName());
// Use this method for Kafka 3.x
entry.setPrincipal(SecurityUtils.parseKafkaPrincipal(binding.entry().principal()).getName());
entry.setHost(binding.entry().host());
entry.setOperation(binding.entry().operation().name());
entry.setPermissionType(binding.entry().permissionType().name());

View File

@@ -8,7 +8,7 @@ import org.apache.kafka.common.Node;
* @author xuxd
* @date 2021-10-08 14:03:21
**/
public class BrokerNode {
public class BrokerNode implements Comparable{
private int id;
@@ -80,4 +80,8 @@ public class BrokerNode {
public void setController(boolean controller) {
isController = controller;
}
@Override public int compareTo(Object o) {
return this.id - ((BrokerNode)o).id;
}
}

View File

@@ -21,7 +21,7 @@ public class TopicPartition implements Comparable {
}
TopicPartition other = (TopicPartition) o;
if (!this.topic.equals(other.getTopic())) {
return this.compareTo(other);
return this.topic.compareTo(other.topic);
}
return this.partition - other.partition;

View File

@@ -0,0 +1,18 @@
package com.xuxd.kafka.console.beans.dto;
import java.util.List;
import lombok.Data;
/**
* kafka-console-ui.
*
* @author xuxd
* @date 2022-02-15 19:08:13
**/
@Data
public class ProposedAssignmentDTO {
private String topic;
private List<Integer> brokers;
}

View File

@@ -0,0 +1,24 @@
package com.xuxd.kafka.console.beans.vo;
import java.util.List;
import lombok.Data;
/**
* kafka-console-ui.
*
* @author xuxd
* @date 2022-01-22 16:24:58
**/
@Data
public class BrokerApiVersionVO {
private int brokerId;
private String host;
private int supportNums;
private int unSupportNums;
private List<String> versionInfo;
}

View File

@@ -0,0 +1,33 @@
package com.xuxd.kafka.console.cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.RemovalListener;
import kafka.console.KafkaConsole;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
public class TimeBasedCache<K, V> {
private LoadingCache<K, V> cache;
private KafkaConsole console;
public TimeBasedCache(CacheLoader<K, V> loader, RemovalListener<K, V> listener) {
cache = CacheBuilder.newBuilder()
.maximumSize(50) // at most 50 entries can be cached
.expireAfterAccess(30, TimeUnit.MINUTES) // entries expire 30 minutes after last access
.removalListener(listener)
.build(loader);
}
public V get(K k) {
try {
return cache.get(k);
} catch (ExecutionException e) {
throw new RuntimeException("Get connection from cache error.", e);
}
}
}

View File

@@ -20,6 +20,12 @@ public class KafkaConfig {
private Properties properties;
private boolean cacheAdminConnection;
private boolean cacheProducerConnection;
private boolean cacheConsumerConnection;
public String getBootstrapServer() {
return bootstrapServer;
}
@@ -43,4 +49,28 @@ public class KafkaConfig {
public void setProperties(Properties properties) {
this.properties = properties;
}
public boolean isCacheAdminConnection() {
return cacheAdminConnection;
}
public void setCacheAdminConnection(boolean cacheAdminConnection) {
this.cacheAdminConnection = cacheAdminConnection;
}
public boolean isCacheProducerConnection() {
return cacheProducerConnection;
}
public void setCacheProducerConnection(boolean cacheProducerConnection) {
this.cacheProducerConnection = cacheProducerConnection;
}
public boolean isCacheConsumerConnection() {
return cacheConsumerConnection;
}
public void setCacheConsumerConnection(boolean cacheConsumerConnection) {
this.cacheConsumerConnection = cacheConsumerConnection;
}
}

View File

@@ -53,4 +53,9 @@ public class ClusterController {
public Object peekClusterInfo() {
return clusterService.peekClusterInfo();
}
@GetMapping("/info/api/version")
public Object getBrokerApiVersionInfo() {
return clusterService.getBrokerApiVersionInfo();
}
}

View File

@@ -1,14 +1,15 @@
package com.xuxd.kafka.console.controller;
import com.xuxd.kafka.console.beans.QueryMessage;
import com.xuxd.kafka.console.beans.ResponseData;
import com.xuxd.kafka.console.beans.SendMessage;
import com.xuxd.kafka.console.beans.dto.QueryMessageDTO;
import com.xuxd.kafka.console.service.MessageService;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.bind.annotation.*;
import java.util.List;
/**
* kafka-console-ui.
@@ -52,4 +53,12 @@ public class MessageController {
public Object resend(@RequestBody SendMessage message) {
return messageService.resend(message);
}
@DeleteMapping
public Object delete(@RequestBody List<QueryMessage> messages) {
if (CollectionUtils.isEmpty(messages)) {
return ResponseData.create().failed("params is null");
}
return messageService.delete(messages);
}
}

View File

@@ -2,6 +2,7 @@ package com.xuxd.kafka.console.controller;
import com.xuxd.kafka.console.beans.TopicPartition;
import com.xuxd.kafka.console.beans.dto.BrokerThrottleDTO;
import com.xuxd.kafka.console.beans.dto.ProposedAssignmentDTO;
import com.xuxd.kafka.console.beans.dto.ReplicationDTO;
import com.xuxd.kafka.console.beans.dto.SyncDataDTO;
import com.xuxd.kafka.console.service.OperationService;
@@ -74,4 +75,9 @@ public class OperationController {
public Object cancelReassignment(@RequestBody TopicPartition partition) {
return operationService.cancelReassignment(new org.apache.kafka.common.TopicPartition(partition.getTopic(), partition.getPartition()));
}
@PostMapping("/replication/reassignments/proposed")
public Object proposedAssignments(@RequestBody ProposedAssignmentDTO dto) {
return operationService.proposedAssignments(dto.getTopic(), dto.getBrokers());
}
}

View File

@@ -43,8 +43,8 @@ public class TopicController {
}
@DeleteMapping
public Object deleteTopic(@RequestParam String topic) {
return topicService.deleteTopic(topic);
public Object deleteTopic(@RequestBody List<String> topics) {
return topicService.deleteTopics(topics);
}
@GetMapping("/partition")

View File

@@ -21,4 +21,6 @@ public interface ClusterService {
ResponseData updateClusterInfo(ClusterInfoDO infoDO);
ResponseData peekClusterInfo();
ResponseData getBrokerApiVersionInfo();
}

View File

@@ -4,6 +4,8 @@ import com.xuxd.kafka.console.beans.QueryMessage;
import com.xuxd.kafka.console.beans.ResponseData;
import com.xuxd.kafka.console.beans.SendMessage;
import java.util.List;
/**
* kafka-console-ui.
*
@@ -23,4 +25,6 @@ public interface MessageService {
ResponseData send(SendMessage message);
ResponseData resend(SendMessage message);
ResponseData delete(List<QueryMessage> messages);
}

View File

@@ -30,4 +30,6 @@ public interface OperationService {
ResponseData currentReassignments();
ResponseData cancelReassignment(TopicPartition partition);
ResponseData proposedAssignments(String topic, List<Integer> brokerList);
}

View File

@@ -4,6 +4,8 @@ import com.xuxd.kafka.console.beans.ReplicaAssignment;
import com.xuxd.kafka.console.beans.ResponseData;
import com.xuxd.kafka.console.beans.enums.TopicThrottleSwitch;
import com.xuxd.kafka.console.beans.enums.TopicType;
import java.util.Collection;
import java.util.List;
import org.apache.kafka.clients.admin.NewTopic;
@@ -19,7 +21,7 @@ public interface TopicService {
ResponseData getTopicList(String topic, TopicType type);
ResponseData deleteTopic(String topic);
ResponseData deleteTopics(Collection<String> topics);
ResponseData getTopicPartitionInfo(String topic);

View File

@@ -1,15 +1,23 @@
package com.xuxd.kafka.console.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.xuxd.kafka.console.beans.BrokerNode;
import com.xuxd.kafka.console.beans.ClusterInfo;
import com.xuxd.kafka.console.beans.ResponseData;
import com.xuxd.kafka.console.beans.dos.ClusterInfoDO;
import com.xuxd.kafka.console.beans.vo.BrokerApiVersionVO;
import com.xuxd.kafka.console.beans.vo.ClusterInfoVO;
import com.xuxd.kafka.console.dao.ClusterInfoMapper;
import com.xuxd.kafka.console.service.ClusterService;
import java.util.List;
import java.util.*;
import java.util.stream.Collectors;
import kafka.console.ClusterConsole;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.common.Node;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.stereotype.Service;
@@ -19,6 +27,7 @@ import org.springframework.stereotype.Service;
* @author xuxd
* @date 2021-10-08 14:23:09
**/
@Slf4j
@Service
public class ClusterServiceImpl implements ClusterService {
@@ -33,7 +42,14 @@ public class ClusterServiceImpl implements ClusterService {
}
@Override public ResponseData getClusterInfo() {
return ResponseData.create().data(clusterConsole.clusterInfo()).success();
ClusterInfo clusterInfo = clusterConsole.clusterInfo();
Set<BrokerNode> nodes = clusterInfo.getNodes();
if (nodes == null) {
log.error("集群节点信息为空,集群地址可能不正确或集群内没有活跃节点");
return ResponseData.create().failed("集群节点信息为空,集群地址可能不正确或集群内没有活跃节点");
}
clusterInfo.setNodes(new TreeSet<>(nodes));
return ResponseData.create().data(clusterInfo).success();
}
@Override public ResponseData getClusterInfoList() {
@@ -69,4 +85,29 @@ public class ClusterServiceImpl implements ClusterService {
return ResponseData.create().data(dos.stream().findFirst().map(ClusterInfoVO::from)).success();
}
@Override public ResponseData getBrokerApiVersionInfo() {
HashMap<Node, NodeApiVersions> map = clusterConsole.listBrokerVersionInfo();
List<BrokerApiVersionVO> list = new ArrayList<>(map.size());
map.forEach(((node, versions) -> {
BrokerApiVersionVO vo = new BrokerApiVersionVO();
vo.setBrokerId(node.id());
vo.setHost(node.host() + ":" + node.port());
vo.setSupportNums(versions.allSupportedApiVersions().size());
String versionInfo = versions.toString(true);
int from = 0;
int count = 0;
int index = -1;
while ((index = versionInfo.indexOf("UNSUPPORTED", from)) >= 0 && from < versionInfo.length()) {
count++;
from = index + 1;
}
vo.setUnSupportNums(count);
versionInfo = versionInfo.substring(1, versionInfo.length() - 2);
vo.setVersionInfo(Arrays.asList(StringUtils.split(versionInfo, ",")));
list.add(vo);
}));
Collections.sort(list, Comparator.comparingInt(BrokerApiVersionVO::getBrokerId));
return ResponseData.create().data(list).success();
}
}

View File

@@ -24,6 +24,7 @@ import kafka.console.TopicConsole;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
@@ -242,6 +243,18 @@ public class MessageServiceImpl implements MessageService, ApplicationContextAwa
return success ? ResponseData.create().success("success: " + tuple2._2()) : ResponseData.create().failed(tuple2._2());
}
@Override
public ResponseData delete(List<QueryMessage> messages) {
Map<TopicPartition, RecordsToDelete> params = new HashMap<>(messages.size(), 1f);
messages.forEach(message -> {
params.put(new TopicPartition(message.getTopic(), message.getPartition()), RecordsToDelete.beforeOffset(message.getOffset()));
});
Tuple2<Object, String> tuple2 = messageConsole.delete(params);
boolean success = (boolean) tuple2._1();
return success ? ResponseData.create().success() : ResponseData.create().failed(tuple2._2());
}
private Map<TopicPartition, ConsumerRecord<byte[], byte[]>> searchRecordByOffset(QueryMessage queryMessage) {
Set<TopicPartition> partitions = getPartitions(queryMessage);

View File

@@ -1,6 +1,7 @@
package com.xuxd.kafka.console.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.google.common.collect.Lists;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.xuxd.kafka.console.beans.ResponseData;
@@ -10,6 +11,7 @@ import com.xuxd.kafka.console.beans.vo.OffsetAlignmentVO;
import com.xuxd.kafka.console.dao.MinOffsetAlignmentMapper;
import com.xuxd.kafka.console.service.OperationService;
import com.xuxd.kafka.console.utils.GsonUtil;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -19,6 +21,7 @@ import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import kafka.console.OperationConsole;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.admin.PartitionReassignment;
import org.apache.kafka.common.TopicPartition;
import org.springframework.beans.factory.ObjectProvider;
@@ -162,4 +165,21 @@ public class OperationServiceImpl implements OperationService {
}
return ResponseData.create().success();
}
@Override public ResponseData proposedAssignments(String topic, List<Integer> brokerList) {
Map<String, Object> params = new HashMap<>();
params.put("version", 1);
Map<String, String> topicMap = new HashMap<>(1, 1.0f);
topicMap.put("topic", topic);
params.put("topics", Lists.newArrayList(topicMap));
List<String> list = brokerList.stream().map(String::valueOf).collect(Collectors.toList());
Map<TopicPartition, List<Object>> assignments = operationConsole.proposedAssignments(gson.toJson(params), StringUtils.join(list, ","));
List<CurrentReassignmentVO> res = new ArrayList<>(assignments.size());
assignments.forEach((tp, replicas) -> {
CurrentReassignmentVO vo = new CurrentReassignmentVO(tp.topic(), tp.partition(),
replicas.stream().map(x -> (Integer) x).collect(Collectors.toList()), null, null);
res.add(vo);
});
return ResponseData.create().data(res).success();
}
}

View File

@@ -9,16 +9,6 @@ import com.xuxd.kafka.console.beans.vo.TopicDescriptionVO;
import com.xuxd.kafka.console.beans.vo.TopicPartitionVO;
import com.xuxd.kafka.console.service.TopicService;
import com.xuxd.kafka.console.utils.GsonUtil;
import java.util.Calendar;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import kafka.console.MessageConsole;
import kafka.console.TopicConsole;
import lombok.extern.slf4j.Slf4j;
@@ -33,6 +23,10 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import scala.Tuple2;
import java.util.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
/**
* kafka-console-ui.
*
@@ -87,8 +81,8 @@ public class TopicServiceImpl implements TopicService {
return ResponseData.create().data(topicDescriptions.stream().map(d -> TopicDescriptionVO.from(d))).success();
}
@Override public ResponseData deleteTopic(String topic) {
Tuple2<Object, String> tuple2 = topicConsole.deleteTopic(topic);
@Override public ResponseData deleteTopics(Collection<String> topics) {
Tuple2<Object, String> tuple2 = topicConsole.deleteTopics(topics);
return (Boolean) tuple2._1 ? ResponseData.create().success() : ResponseData.create().failed(tuple2._2);
}

View File

@@ -1,4 +1,4 @@
package com.xuxd.kafka.console.interceptor;
package com.xuxd.kafka.console.service.interceptor;
import com.xuxd.kafka.console.beans.ResponseData;
import com.xuxd.kafka.console.beans.dos.ClusterInfoDO;

View File

@@ -1,4 +1,4 @@
package com.xuxd.kafka.console.interceptor;
package com.xuxd.kafka.console.service.interceptor;
import com.xuxd.kafka.console.beans.ResponseData;
import javax.servlet.http.HttpServletRequest;

View File

@@ -12,6 +12,14 @@ kafka:
# Other cluster property settings
properties:
# request.timeout.ms: 5000
# Connection caching. Without caching, a new connection is created for every request. Even then it is usually fast, but in some cases (ACL enabled) queries can be slow, so caching can be set to true
# You can also set the caches below to true simply to speed up queries
# Cache the admin client connection
cache-admin-connection: false
# Cache the producer connection
cache-producer-connection: false
# Cache the consumer connection
cache-consumer-connection: false
spring:
application:

View File

@@ -0,0 +1,333 @@
package kafka.console
import com.xuxd.kafka.console.config.ContextConfigHolder
import kafka.utils.Implicits.MapExtensionMethods
import kafka.utils.Logging
import org.apache.kafka.clients._
import org.apache.kafka.clients.admin.AdminClientConfig
import org.apache.kafka.clients.consumer.internals.{ConsumerNetworkClient, RequestFuture}
import org.apache.kafka.common.Node
import org.apache.kafka.common.config.ConfigDef.ValidString.in
import org.apache.kafka.common.config.ConfigDef.{Importance, Type}
import org.apache.kafka.common.config.{AbstractConfig, ConfigDef}
import org.apache.kafka.common.errors.AuthenticationException
import org.apache.kafka.common.internals.ClusterResourceListeners
import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionCollection
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.Selector
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests._
import org.apache.kafka.common.utils.{KafkaThread, LogContext, Time}
import org.slf4j.{Logger, LoggerFactory}
import java.io.IOException
import java.util.Properties
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.{ConcurrentLinkedQueue, TimeUnit}
import scala.jdk.CollectionConverters.{ListHasAsScala, MapHasAsJava, PropertiesHasAsScala, SetHasAsScala}
import scala.util.{Failure, Success, Try}
/**
* kafka-console-ui.
*
* Copy from {@link kafka.admin.BrokerApiVersionsCommand}.
*
* @author xuxd
* @date 2022-01-22 15:15:57
* */
object BrokerApiVersion{
protected lazy val log : Logger = LoggerFactory.getLogger(this.getClass)
def listAllBrokerApiVersionInfo(): java.util.HashMap[Node, NodeApiVersions] = {
val res = new java.util.HashMap[Node, NodeApiVersions]()
val adminClient = createAdminClient()
try {
adminClient.awaitBrokers()
val brokerMap = adminClient.listAllBrokerVersionInfo()
brokerMap.forKeyValue {
(broker, versionInfoOrError) =>
versionInfoOrError match {
case Success(v) => {
res.put(broker, v)
}
case Failure(v) => log.error(s"${broker} -> ERROR: ${v}\n")
}
}
} finally {
adminClient.close()
}
res
}
private def createAdminClient(): AdminClient = {
val props = new Properties()
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, ContextConfigHolder.CONTEXT_CONFIG.get().getBootstrapServer())
props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, ContextConfigHolder.CONTEXT_CONFIG.get().getRequestTimeoutMs())
props.putAll(ContextConfigHolder.CONTEXT_CONFIG.get().getProperties())
AdminClient.create(props)
}
// org.apache.kafka.clients.admin.AdminClient doesn't currently expose a way to retrieve the supported api versions.
// We inline the bits we need from kafka.admin.AdminClient so that we can delete it.
private class AdminClient(val time: Time,
val client: ConsumerNetworkClient,
val bootstrapBrokers: List[Node]) extends Logging {
@volatile var running = true
val pendingFutures = new ConcurrentLinkedQueue[RequestFuture[ClientResponse]]()
val networkThread = new KafkaThread("admin-client-network-thread", () => {
try {
while (running)
client.poll(time.timer(Long.MaxValue))
} catch {
case t: Throwable =>
error("admin-client-network-thread exited", t)
} finally {
pendingFutures.forEach { future =>
try {
future.raise(Errors.UNKNOWN_SERVER_ERROR)
} catch {
case _: IllegalStateException => // It is OK if the future has been completed
}
}
pendingFutures.clear()
}
}, true)
networkThread.start()
private def send(target: Node,
request: AbstractRequest.Builder[_ <: AbstractRequest]): AbstractResponse = {
val future = client.send(target, request)
pendingFutures.add(future)
future.awaitDone(Long.MaxValue, TimeUnit.MILLISECONDS)
pendingFutures.remove(future)
if (future.succeeded())
future.value().responseBody()
else
throw future.exception()
}
private def sendAnyNode(request: AbstractRequest.Builder[_ <: AbstractRequest]): AbstractResponse = {
bootstrapBrokers.foreach { broker =>
try {
return send(broker, request)
} catch {
case e: AuthenticationException =>
throw e
case e: Exception =>
debug(s"Request ${request.apiKey()} failed against node $broker", e)
}
}
throw new RuntimeException(s"Request ${request.apiKey()} failed on brokers $bootstrapBrokers")
}
private def getApiVersions(node: Node): ApiVersionCollection = {
val response = send(node, new ApiVersionsRequest.Builder()).asInstanceOf[ApiVersionsResponse]
Errors.forCode(response.data.errorCode).maybeThrow()
response.data.apiKeys
}
/**
* Wait until there is a non-empty list of brokers in the cluster.
*/
def awaitBrokers(): Unit = {
var nodes = List[Node]()
val start = System.currentTimeMillis()
val maxWait = 30 * 1000
do {
nodes = findAllBrokers()
if (nodes.isEmpty) {
Thread.sleep(50)
}
}
while (nodes.isEmpty && (System.currentTimeMillis() - start < maxWait))
}
private def findAllBrokers(): List[Node] = {
val request = MetadataRequest.Builder.allTopics()
val response = sendAnyNode(request).asInstanceOf[MetadataResponse]
val errors = response.errors
if (!errors.isEmpty) {
log.info(s"Metadata request contained errors: $errors")
}
// In Kafka 3.x this method is buildCluster, which replaces cluster()
response.buildCluster.nodes.asScala.toList
// response.cluster().nodes.asScala.toList
}
def listAllBrokerVersionInfo(): Map[Node, Try[NodeApiVersions]] =
findAllBrokers().map { broker =>
broker -> Try[NodeApiVersions](new NodeApiVersions(getApiVersions(broker)))
}.toMap
def close(): Unit = {
running = false
try {
client.close()
} catch {
case e: IOException =>
error("Exception closing nioSelector:", e)
}
}
}
private object AdminClient {
val DefaultConnectionMaxIdleMs = 9 * 60 * 1000
val DefaultRequestTimeoutMs = 5000
val DefaultSocketConnectionSetupMs = CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG
val DefaultSocketConnectionSetupMaxMs = CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG
val DefaultMaxInFlightRequestsPerConnection = 100
val DefaultReconnectBackoffMs = 50
val DefaultReconnectBackoffMax = 50
val DefaultSendBufferBytes = 128 * 1024
val DefaultReceiveBufferBytes = 32 * 1024
val DefaultRetryBackoffMs = 100
val AdminClientIdSequence = new AtomicInteger(1)
val AdminConfigDef = {
val config = new ConfigDef()
.define(
CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
Type.LIST,
Importance.HIGH,
CommonClientConfigs.BOOTSTRAP_SERVERS_DOC)
.define(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG,
Type.STRING,
ClientDnsLookup.USE_ALL_DNS_IPS.toString,
in(ClientDnsLookup.USE_ALL_DNS_IPS.toString,
ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY.toString),
Importance.MEDIUM,
CommonClientConfigs.CLIENT_DNS_LOOKUP_DOC)
.define(
CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
ConfigDef.Type.STRING,
CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL,
ConfigDef.Importance.MEDIUM,
CommonClientConfigs.SECURITY_PROTOCOL_DOC)
.define(
CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG,
ConfigDef.Type.INT,
DefaultRequestTimeoutMs,
ConfigDef.Importance.MEDIUM,
CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC)
.define(
CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG,
ConfigDef.Type.LONG,
CommonClientConfigs.DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MS,
ConfigDef.Importance.MEDIUM,
CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_DOC)
.define(
CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG,
ConfigDef.Type.LONG,
CommonClientConfigs.DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS,
ConfigDef.Importance.MEDIUM,
CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_DOC)
.define(
CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG,
ConfigDef.Type.LONG,
DefaultRetryBackoffMs,
ConfigDef.Importance.MEDIUM,
CommonClientConfigs.RETRY_BACKOFF_MS_DOC)
.withClientSslSupport()
.withClientSaslSupport()
config
}
class AdminConfig(originals: Map[_, _]) extends AbstractConfig(AdminConfigDef, originals.asJava, false)
def create(props: Properties): AdminClient = {
val properties = new Properties()
val names = props.stringPropertyNames()
for (name <- names.asScala.toSet) {
properties.put(name, props.get(name).toString())
}
create(properties.asScala.toMap)
}
def create(props: Map[String, _]): AdminClient = create(new AdminConfig(props))
def create(config: AdminConfig): AdminClient = {
val clientId = "admin-" + AdminClientIdSequence.getAndIncrement()
val logContext = new LogContext(s"[LegacyAdminClient clientId=$clientId] ")
val time = Time.SYSTEM
val metrics = new Metrics(time)
val metadata = new Metadata(100L, 60 * 60 * 1000L, logContext,
new ClusterResourceListeners)
val channelBuilder = ClientUtils.createChannelBuilder(config, time, logContext)
val requestTimeoutMs = config.getInt(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG)
val connectionSetupTimeoutMs = config.getLong(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG)
val connectionSetupTimeoutMaxMs = config.getLong(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG)
val retryBackoffMs = config.getLong(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG)
val brokerUrls = config.getList(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)
val clientDnsLookup = config.getString(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG)
val brokerAddresses = ClientUtils.parseAndValidateAddresses(brokerUrls, clientDnsLookup)
metadata.bootstrap(brokerAddresses)
val selector = new Selector(
DefaultConnectionMaxIdleMs,
metrics,
time,
"admin",
channelBuilder,
logContext)
// The compatibility issue here differs by Kafka version
// Use this constructor for 3.x
val networkClient = new NetworkClient(
selector,
metadata,
clientId,
DefaultMaxInFlightRequestsPerConnection,
DefaultReconnectBackoffMs,
DefaultReconnectBackoffMax,
DefaultSendBufferBytes,
DefaultReceiveBufferBytes,
requestTimeoutMs,
connectionSetupTimeoutMs,
connectionSetupTimeoutMaxMs,
time,
true,
new ApiVersions,
logContext)
// val networkClient = new NetworkClient(
// selector,
// metadata,
// clientId,
// DefaultMaxInFlightRequestsPerConnection,
// DefaultReconnectBackoffMs,
// DefaultReconnectBackoffMax,
// DefaultSendBufferBytes,
// DefaultReceiveBufferBytes,
// requestTimeoutMs,
// connectionSetupTimeoutMs,
// connectionSetupTimeoutMaxMs,
// ClientDnsLookup.USE_ALL_DNS_IPS,
// time,
// true,
// new ApiVersions,
// logContext)
val highLevelClient = new ConsumerNetworkClient(
logContext,
networkClient,
metadata,
time,
retryBackoffMs,
requestTimeoutMs,
Integer.MAX_VALUE)
new AdminClient(
time,
highLevelClient,
metadata.fetch.nodes.asScala.toList)
}
}
}

View File

@@ -1,11 +1,13 @@
package kafka.console
import com.xuxd.kafka.console.beans.{BrokerNode, ClusterInfo}
import com.xuxd.kafka.console.config.{ContextConfigHolder, KafkaConfig}
import org.apache.kafka.clients.NodeApiVersions
import org.apache.kafka.clients.admin.DescribeClusterResult
import org.apache.kafka.common.Node
import java.util.Collections
import java.util.concurrent.TimeUnit
import com.xuxd.kafka.console.beans.{BrokerNode, ClusterInfo}
import com.xuxd.kafka.console.config.{ContextConfigHolder, KafkaConfig}
import org.apache.kafka.clients.admin.DescribeClusterResult
import scala.jdk.CollectionConverters.{CollectionHasAsScala, SetHasAsJava, SetHasAsScala}
/**
@@ -41,4 +43,8 @@ class ClusterConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConf
new ClusterInfo
}).asInstanceOf[ClusterInfo]
}
def listBrokerVersionInfo(): java.util.HashMap[Node, NodeApiVersions] = {
BrokerApiVersion.listAllBrokerApiVersionInfo()
}
}

View File

@@ -1,18 +1,21 @@
package kafka.console
import com.google.common.cache.{CacheLoader, RemovalListener, RemovalNotification}
import com.xuxd.kafka.console.cache.TimeBasedCache
import com.xuxd.kafka.console.config.{ContextConfigHolder, KafkaConfig}
import kafka.zk.{AdminZkClient, KafkaZkClient}
import kafka.zk.AdminZkClient
import org.apache.kafka.clients.admin._
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer, OffsetAndMetadata}
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.requests.ListOffsetsResponse
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, StringSerializer}
import org.apache.kafka.common.utils.Time
import org.slf4j.{Logger, LoggerFactory}
import java.util.Properties
import java.util.concurrent.Executors
import scala.collection.{Map, Seq}
import scala.concurrent.{ExecutionContext, Future}
import scala.jdk.CollectionConverters.{MapHasAsJava, MapHasAsScala}
/**
@@ -27,11 +30,13 @@ class KafkaConsole(config: KafkaConfig) {
protected def withAdminClient(f: Admin => Any): Any = {
val admin = createAdminClient()
val admin = if (config.isCacheAdminConnection()) AdminCache.cache.get(ContextConfigHolder.CONTEXT_CONFIG.get().getBootstrapServer()) else createAdminClient()
try {
f(admin)
} finally {
admin.close()
if (!config.isCacheAdminConnection) {
admin.close()
}
}
}
@@ -45,33 +50,40 @@ class KafkaConsole(config: KafkaConfig) {
protected def withConsumerAndCatchError(f: KafkaConsumer[Array[Byte], Array[Byte]] => Any, eh: Exception => Any,
extra: Properties = new Properties()): Any = {
val props = getProps()
props.putAll(extra)
props.put(ConsumerConfig.CLIENT_ID_CONFIG, String.valueOf(System.currentTimeMillis()))
val consumer = new KafkaConsumer(props, new ByteArrayDeserializer, new ByteArrayDeserializer)
// val props = getProps()
// props.putAll(extra)
// props.put(ConsumerConfig.CLIENT_ID_CONFIG, String.valueOf(System.currentTimeMillis()))
// val consumer = new KafkaConsumer(props, new ByteArrayDeserializer, new ByteArrayDeserializer)
ConsumerCache.setProperties(extra)
val consumer = if (config.isCacheConsumerConnection) ConsumerCache.cache.get(ContextConfigHolder.CONTEXT_CONFIG.get().getBootstrapServer()) else KafkaConsole.createByteArrayKVConsumer(extra)
try {
f(consumer)
} catch {
case er: Exception => eh(er)
}
finally {
consumer.close()
ConsumerCache.clearProperties()
if (!config.isCacheConsumerConnection) {
consumer.close()
}
}
}
protected def withProducerAndCatchError(f: KafkaProducer[String, String] => Any, eh: Exception => Any,
extra: Properties = new Properties()): Any = {
val props = getProps()
props.putAll(extra)
props.put(ConsumerConfig.CLIENT_ID_CONFIG, String.valueOf(System.currentTimeMillis()))
val producer = new KafkaProducer[String, String](props, new StringSerializer, new StringSerializer)
ProducerCache.setProperties(extra)
val producer = if (config.isCacheProducerConnection) ProducerCache.cache.get(ContextConfigHolder.CONTEXT_CONFIG.get().getBootstrapServer) else KafkaConsole.createProducer(extra)
try {
f(producer)
} catch {
case er: Exception => eh(er)
}
finally {
producer.close()
ProducerCache.clearProperties()
if (!config.isCacheProducerConnection) {
producer.close()
}
}
}
@@ -79,7 +91,6 @@ class KafkaConsole(config: KafkaConfig) {
extra: Properties = new Properties()): Any = {
val props = getProps()
props.putAll(extra)
props.put(ConsumerConfig.CLIENT_ID_CONFIG, String.valueOf(System.currentTimeMillis()))
val producer = new KafkaProducer[Array[Byte], Array[Byte]](props, new ByteArraySerializer, new ByteArraySerializer)
try {
f(producer)
@@ -91,14 +102,17 @@ class KafkaConsole(config: KafkaConfig) {
}
}
@Deprecated
protected def withZKClient(f: AdminZkClient => Any): Any = {
val zkClient = KafkaZkClient(config.getZookeeperAddr, false, 30000, 30000, Int.MaxValue, Time.SYSTEM)
val adminZkClient = new AdminZkClient(zkClient)
try {
f(adminZkClient)
} finally {
zkClient.close()
}
// val zkClient = KafkaZkClient(config.getZookeeperAddr, false, 30000, 30000, Int.MaxValue, Time.SYSTEM)
// 3.x
// val zkClient = KafkaZkClient(config.getZookeeperAddr, false, 30000, 30000, Int.MaxValue, Time.SYSTEM, new ZKClientConfig(), "KafkaZkClient")
// val adminZkClient = new AdminZkClient(zkClient)
// try {
// f(adminZkClient)
// } finally {
// zkClient.close()
// }
}
protected def createAdminClient(props: Properties): Admin = {
@@ -110,20 +124,47 @@ class KafkaConsole(config: KafkaConfig) {
}
private def createAdminClient(): Admin = {
Admin.create(getProps())
KafkaConsole.createAdminClient()
}
private def getProps(): Properties = {
KafkaConsole.getProps()
}
}
object KafkaConsole {
val log: Logger = LoggerFactory.getLogger(this.getClass)
def createAdminClient(): Admin = {
Admin.create(getProps())
}
def createByteArrayKVConsumer(extra: Properties) : KafkaConsumer[Array[Byte], Array[Byte]] = {
val props = getProps()
props.putAll(extra)
props.put(ConsumerConfig.CLIENT_ID_CONFIG, String.valueOf(System.currentTimeMillis()))
new KafkaConsumer(props, new ByteArrayDeserializer, new ByteArrayDeserializer)
}
def createProducer(extra: Properties) : KafkaProducer[String, String] = {
val props = getProps()
props.putAll(extra)
new KafkaProducer(props, new StringSerializer, new StringSerializer)
}
def createByteArrayStringProducer(extra: Properties) : KafkaProducer[Array[Byte], Array[Byte]] = {
val props = getProps()
props.putAll(extra)
new KafkaProducer(props, new ByteArraySerializer, new ByteArraySerializer)
}
def getProps(): Properties = {
val props: Properties = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, ContextConfigHolder.CONTEXT_CONFIG.get().getBootstrapServer())
props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, ContextConfigHolder.CONTEXT_CONFIG.get().getRequestTimeoutMs())
props.putAll(ContextConfigHolder.CONTEXT_CONFIG.get().getProperties())
props
}
}
object KafkaConsole {
val log: Logger = LoggerFactory.getLogger(this.getClass)
def getCommittedOffsets(admin: Admin, groupId: String,
timeoutMs: Integer): Map[TopicPartition, OffsetAndMetadata] = {
@@ -174,4 +215,88 @@ object KafkaConsole {
}.toMap
res
}
implicit val ec = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(2))
}
object AdminCache {
private val log: Logger = LoggerFactory.getLogger(this.getClass)
private val cacheLoader = new CacheLoader[String, Admin] {
override def load(key: String): Admin = KafkaConsole.createAdminClient()
}
private val removeListener = new RemovalListener[String, Admin] {
override def onRemoval(notification: RemovalNotification[String, Admin]): Unit = {
Future {
log.warn("Close expired admin connection: {}", notification.getKey)
notification.getValue.close()
log.warn("Close expired admin connection complete: {}", notification.getKey)
}(KafkaConsole.ec)
}
}
val cache = new TimeBasedCache[String, Admin](cacheLoader, removeListener)
}
object ConsumerCache {
private val log: Logger = LoggerFactory.getLogger(this.getClass)
private val threadLocal = new ThreadLocal[Properties]
private val cacheLoader = new CacheLoader[String, KafkaConsumer[Array[Byte], Array[Byte]]] {
override def load(key: String): KafkaConsumer[Array[Byte], Array[Byte]] = KafkaConsole.createByteArrayKVConsumer(threadLocal.get())
}
private val removeListener = new RemovalListener[String, KafkaConsumer[Array[Byte], Array[Byte]]] {
override def onRemoval(notification: RemovalNotification[String, KafkaConsumer[Array[Byte], Array[Byte]]]): Unit = {
Future {
log.warn("Close expired consumer connection: {}", notification.getKey)
notification.getValue.close()
log.warn("Close expired consumer connection complete: {}", notification.getKey)
}(KafkaConsole.ec)
}
}
val cache = new TimeBasedCache[String, KafkaConsumer[Array[Byte], Array[Byte]]](cacheLoader, removeListener)
def setProperties(props : Properties) : Unit = {
threadLocal.set(props)
}
def clearProperties() : Unit = {
threadLocal.remove()
}
}
object ProducerCache {
private val log: Logger = LoggerFactory.getLogger(this.getClass)
private val threadLocal = new ThreadLocal[Properties]
private val cacheLoader = new CacheLoader[String, KafkaProducer[String, String]] {
override def load(key: String): KafkaProducer[String, String] = KafkaConsole.createProducer(threadLocal.get())
}
private val removeListener = new RemovalListener[String, KafkaProducer[String, String]] {
override def onRemoval(notification: RemovalNotification[String, KafkaProducer[String, String]]): Unit = {
Future {
log.warn("Close expired producer connection: {}", notification.getKey)
notification.getValue.close()
log.warn("Close expired producer connection complete: {}", notification.getKey)
}(KafkaConsole.ec)
}
}
val cache = new TimeBasedCache[String, KafkaProducer[String, String]](cacheLoader, removeListener)
def setProperties(props : Properties) : Unit = {
threadLocal.set(props)
}
def clearProperties() : Unit = {
threadLocal.remove()
}
}

View File

@@ -4,13 +4,14 @@ import com.xuxd.kafka.console.beans.MessageFilter
import com.xuxd.kafka.console.beans.enums.FilterType
import com.xuxd.kafka.console.config.{ContextConfigHolder, KafkaConfig}
import org.apache.commons.lang3.StringUtils
import org.apache.kafka.clients.admin.{DeleteRecordsOptions, RecordsToDelete}
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.TopicPartition
import java.time.Duration
import java.util
import java.util.Properties
import java.util.{Properties}
import scala.collection.immutable
import scala.jdk.CollectionConverters.{CollectionHasAsScala, MapHasAsScala, SeqHasAsJava}
@@ -127,7 +128,7 @@ class MessageConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConf
record.offset(),
record.timestamp(),
record.timestampType(),
record.checksum(),
// record.checksum(),
record.serializedKeySize(),
record.serializedValueSize(),
record.key(),
@@ -236,4 +237,14 @@ class MessageConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConf
(false, e.getMessage)
}).asInstanceOf[(Boolean, String)]
}
def delete(recordsToDelete: util.Map[TopicPartition, RecordsToDelete]): (Boolean, String) = {
withAdminClientAndCatchError(admin => {
admin.deleteRecords(recordsToDelete, withTimeoutMs(new DeleteRecordsOptions())).all().get()
(true, "")
}, e => {
log.error("delete message error.", e)
(false, "delete error :" + e.getMessage)
}).asInstanceOf[(Boolean, String)]
}
}

View File

@@ -242,8 +242,8 @@ class OperationConsole(config: KafkaConfig, topicConsole: TopicConsole,
withAdminClientAndCatchError(admin => {
admin.listPartitionReassignments(withTimeoutMs(new ListPartitionReassignmentsOptions)).reassignments().get()
}, e => {
Collections.emptyMap()
log.error("listPartitionReassignments error.", e)
Collections.emptyMap()
}).asInstanceOf[util.Map[TopicPartition, PartitionReassignment]]
}
@@ -256,4 +256,20 @@ class OperationConsole(config: KafkaConfig, topicConsole: TopicConsole,
throw e
}).asInstanceOf[util.Map[TopicPartition, Throwable]]
}
def proposedAssignments(reassignmentJson: String,
brokerListString: String): util.Map[TopicPartition, util.List[Int]] = {
withAdminClientAndCatchError(admin => {
val map = ReassignPartitionsCommand.generateAssignment(admin, reassignmentJson, brokerListString, true)._1
val res = new util.HashMap[TopicPartition, util.List[Int]]()
for (tp <- map.keys) {
res.put(tp, map(tp).asJava)
// res.put(tp, map.getOrElse(tp, Seq.empty).asJava)
}
res
}, e => {
log.error("proposedAssignments error.", e)
throw e
})
}.asInstanceOf[util.Map[TopicPartition, util.List[Int]]]
}

View File

@@ -66,17 +66,17 @@ class TopicConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConfig
/**
* delete topic by topic name.
*
* @param topic topic name.
* @param topics topic name list.
* @return result or : fail message.
*/
def deleteTopic(topic: String): (Boolean, String) = {
def deleteTopics(topics: util.Collection[String]): (Boolean, String) = {
withAdminClientAndCatchError(admin => {
val timeoutMs = ContextConfigHolder.CONTEXT_CONFIG.get().getRequestTimeoutMs()
admin.deleteTopics(Collections.singleton(topic), new DeleteTopicsOptions().retryOnQuotaViolation(false)).all().get(timeoutMs, TimeUnit.MILLISECONDS)
admin.deleteTopics(topics, new DeleteTopicsOptions().retryOnQuotaViolation(false)).all().get(timeoutMs, TimeUnit.MILLISECONDS)
(true, "")
},
e => {
log.error("delete topic error, topic: " + topic, e)
log.error("delete topic error, topic: " + topics, e)
(false, e.getMessage)
}).asInstanceOf[(Boolean, String)]
}

View File

Binary image file changed (4.2 KiB before, 5.4 KiB after).

ui/public/vue.ico Normal file
View File

Binary image file added (4.2 KiB).

View File

@@ -203,6 +203,10 @@ export const KafkaClusterApi = {
url: "/cluster/info/peek",
method: "get",
},
getBrokerApiVersionInfo: {
url: "/cluster/info/api/version",
method: "get",
},
};
export const KafkaOpApi = {
@@ -242,6 +246,10 @@ export const KafkaOpApi = {
url: "/op/replication/reassignments",
method: "delete",
},
proposedAssignment: {
url: "/op/replication/reassignments/proposed",
method: "post",
},
};
export const KafkaMessageApi = {
searchByTime: {
@@ -268,4 +276,8 @@ export const KafkaMessageApi = {
url: "/message/resend",
method: "post",
},
delete: {
url: "/message",
method: "delete",
},
};

View File

@@ -27,7 +27,7 @@ request.interceptors.request.use((config) => {
const clusterInfo = getClusterInfo();
if (clusterInfo) {
config.headers["X-Cluster-Info-Id"] = clusterInfo.id;
config.headers["X-Cluster-Info-Name"] = clusterInfo.clusterName;
// config.headers["X-Cluster-Info-Name"] = encodeURIComponent(clusterInfo.clusterName);
}
return config;
}, errorHandler);

View File

@@ -1,25 +1,66 @@
<template>
<div class="home">
<a-card title="kafka console 配置" style="width: 100%">
<!-- <a slot="extra" href="#">more</a>-->
<a-card title="控制台默认配置" class="card-style">
<p v-for="(v, k) in config" :key="k">{{ k }}={{ v }}</p>
</a-card>
<p></p>
<hr />
<h3>kafka API 版本兼容性</h3>
<a-spin :spinning="apiVersionInfoLoading">
<a-table
:columns="columns"
:data-source="brokerApiVersionInfo"
bordered
row-key="brokerId"
>
<div slot="operation" slot-scope="record">
<a-button
size="small"
href="javascript:;"
class="operation-btn"
@click="openApiVersionInfoDialog(record)"
>详情
</a-button>
</div>
</a-table>
</a-spin>
<VersionInfo
:version-info="apiVersionInfo"
:visible="showApiVersionInfoDialog"
@closeApiVersionInfoDialog="closeApiVersionInfoDialog"
>
</VersionInfo>
</div>
</template>
<script>
// @ is an alias to /src
import request from "@/utils/request";
import { KafkaConfigApi } from "@/utils/api";
import { KafkaConfigApi, KafkaClusterApi } from "@/utils/api";
import notification from "ant-design-vue/lib/notification";
import VersionInfo from "@/views/home/VersionInfo";
export default {
name: "Home",
components: {},
components: { VersionInfo },
data() {
return {
config: {},
columns,
brokerApiVersionInfo: [],
showApiVersionInfoDialog: false,
apiVersionInfo: [],
apiVersionInfoLoading: false,
};
},
methods: {
openApiVersionInfoDialog(record) {
this.apiVersionInfo = record.versionInfo;
this.showApiVersionInfoDialog = true;
},
closeApiVersionInfoDialog() {
this.showApiVersionInfoDialog = false;
},
},
created() {
request({
@@ -35,6 +76,53 @@ export default {
});
}
});
this.apiVersionInfoLoading = true;
request({
url: KafkaClusterApi.getBrokerApiVersionInfo.url,
method: KafkaClusterApi.getBrokerApiVersionInfo.method,
}).then((res) => {
this.apiVersionInfoLoading = false;
if (res.code == 0) {
this.brokerApiVersionInfo = res.data;
} else {
notification.error({
message: "error",
description: res.msg,
});
}
});
},
};
const columns = [
{
title: "id",
dataIndex: "brokerId",
key: "brokerId",
},
{
title: "地址",
dataIndex: "host",
key: "host",
},
{
title: "支持的api数量",
dataIndex: "supportNums",
key: "supportNums",
},
{
title: "不支持的api数量",
dataIndex: "unSupportNums",
key: "unSupportNums",
},
{
title: "操作",
key: "operation",
scopedSlots: { customRender: "operation" },
},
];
</script>
<style scoped>
.card-style {
width: 100%;
}
</style>

View File

@@ -0,0 +1,61 @@
<template>
<a-modal
title="API版本信息"
:visible="show"
:width="600"
:mask="false"
:destroyOnClose="true"
:footer="null"
:maskClosable="false"
@cancel="handleCancel"
>
<div>
<h3>格式说明</h3>
<p>请求类型(1)0 to n(2) [usage: v](3)</p>
<ol>
<li>表示客户端发出的请求类型</li>
<li>该请求在broker中支持的版本号区间</li>
<li>
表示当前控制台的kafka客户端使用的是v版本如果是UNSUPPORTED说明broker版本太老无法处理控制台的这些请求可能影响相关功能的使用
</li>
</ol>
<hr />
<ol>
<li v-for="info in versionInfo" v-bind:key="info">{{ info }}</li>
</ol>
</div>
</a-modal>
</template>
<script>
export default {
name: "APIVersionInfo",
props: {
versionInfo: {
type: Array,
},
visible: {
type: Boolean,
default: false,
},
},
data() {
return {
show: false,
};
},
watch: {
visible(v) {
this.show = v;
},
},
methods: {
handleCancel() {
this.$emit("closeApiVersionInfoDialog", {});
},
},
};
</script>
<style scoped></style>

View File

@@ -0,0 +1,196 @@
<template>
<div class="tab-content">
<a-spin :spinning="loading">
<div>
<h4 class="hint-content">
注意以下删除将删除该分区比该偏移位点小的所有消息不包含该位点
</h4>
<hr />
</div>
<div id="search-offset-form-advanced-search">
<a-form
class="ant-advanced-search-form"
:form="form"
@submit="handleSearch"
>
<a-row :gutter="24">
<a-col :span="9">
<a-form-item label="topic">
<a-select
class="topic-select"
@change="handleTopicChange"
show-search
option-filter-prop="children"
v-decorator="[
'topic',
{
rules: [{ required: true, message: '请选择一个topic!' }],
},
]"
placeholder="请选择一个topic"
>
<a-select-option v-for="v in topicList" :key="v" :value="v">
{{ v }}
</a-select-option>
</a-select>
</a-form-item>
</a-col>
<a-col :span="6">
<a-form-item label="分区">
<a-select
class="type-select"
show-search
option-filter-prop="children"
v-model="selectPartition"
placeholder="请选择一个分区"
>
<a-select-option v-for="v in partitions" :key="v" :value="v">
<span v-if="v == -1">全部</span> <span v-else>{{ v }}</span>
</a-select-option>
</a-select>
</a-form-item>
</a-col>
<a-col :span="7">
<a-form-item label="偏移">
<a-input
v-decorator="[
'offset',
{
rules: [{ required: true, message: '请输入消息偏移!' }],
},
]"
placeholder="消息偏移"
/>
</a-form-item>
</a-col>
<a-col :span="2" :style="{ textAlign: 'right' }">
<a-form-item>
<a-button type="primary" html-type="submit"> 执行删除</a-button>
</a-form-item>
</a-col>
</a-row>
</a-form>
</div>
</a-spin>
</div>
</template>
<script>
import request from "@/utils/request";
import { KafkaMessageApi, KafkaTopicApi } from "@/utils/api";
import notification from "ant-design-vue/lib/notification";
export default {
name: "DeleteMessage",
props: {
topicList: {
type: Array,
},
},
data() {
return {
loading: false,
form: this.$form.createForm(this, { name: "message_search_offset" }),
partitions: [],
selectPartition: undefined,
};
},
methods: {
handleSearch(e) {
e.preventDefault();
this.form.validateFields((err, values) => {
if (!err) {
const data = Object.assign({}, values, {
partition: this.selectPartition,
});
this.loading = true;
request({
url: KafkaMessageApi.delete.url,
method: KafkaMessageApi.delete.method,
data: [data],
}).then((res) => {
this.loading = false;
if (res.code == 0) {
this.$message.success(res.msg);
} else {
notification.error({
message: "error",
description: res.msg,
});
}
});
}
});
},
getPartitionInfo(topic) {
this.loading = true;
request({
url: KafkaTopicApi.getPartitionInfo.url + "?topic=" + topic,
method: KafkaTopicApi.getPartitionInfo.method,
}).then((res) => {
this.loading = false;
if (res.code != 0) {
notification.error({
message: "error",
description: res.msg,
});
} else {
this.partitions = res.data.map((v) => v.partition);
}
});
},
handleTopicChange(topic) {
this.selectPartition =
this.partitions.length > 0 ? this.partitions[0] : 0;
this.getPartitionInfo(topic);
},
},
};
</script>
<style scoped>
.tab-content {
width: 100%;
height: 100%;
}
.ant-advanced-search-form {
padding: 24px;
background: #fbfbfb;
border: 1px solid #d9d9d9;
border-radius: 6px;
}
.ant-advanced-search-form .ant-form-item {
display: flex;
}
.ant-advanced-search-form .ant-form-item-control-wrapper {
flex: 1;
}
#components-form-topic-advanced-search .ant-form {
max-width: none;
margin-bottom: 1%;
}
#search-offset-form-advanced-search .search-result-list {
margin-top: 16px;
border: 1px dashed #e9e9e9;
border-radius: 6px;
background-color: #fafafa;
min-height: 200px;
text-align: center;
padding-top: 80px;
}
.topic-select {
width: 400px !important;
}
.type-select {
width: 200px !important;
}
.hint-content {
color: red;
}
</style>
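
编辑注:作为补充说明,上面 DeleteMessage.vue 的 handleSearch 发送给 KafkaMessageApi.delete 的请求体是一个数组(data: [data]),元素由表单字段加上手动合并的 partition 组成。下面是一段示意(字段取值均为假设,后端契约不在本次 diff 范围内):

```js
// Hypothetical request body posted by handleSearch above.
const deleteRequest = [
  {
    topic: "my-topic", // 选中的 topic
    partition: 0,      // 选中的分区
    offset: "1000",    // 文本输入框的值;小于该偏移的消息将被删除(不含该位点)
  },
];
```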

View File

@@ -11,6 +11,9 @@
<a-tab-pane key="3" tab="在线发送">
<SendMessage :topic-list="topicList"></SendMessage>
</a-tab-pane>
<a-tab-pane key="4" tab="在线删除">
<DeleteMessage :topic-list="topicList"></DeleteMessage>
</a-tab-pane>
</a-tabs>
</a-spin>
</div>
@@ -23,9 +26,10 @@ import request from "@/utils/request";
import { KafkaTopicApi } from "@/utils/api";
import notification from "ant-design-vue/lib/notification";
import SendMessage from "@/views/message/SendMessage";
import DeleteMessage from "./DeleteMessage";
export default {
name: "Message",
components: { SearchByTime, SearchByOffset, SendMessage },
components: { DeleteMessage, SearchByTime, SearchByOffset, SendMessage },
data() {
return {
loading: false,

View File

@@ -203,7 +203,7 @@ export default {
this.$emit("closeDetailDialog", { refresh: false });
},
formatTime(time) {
return moment(time).format("YYYY-MM-DD HH:mm:ss:SSS");
return time == -1 ? -1 : moment(time).format("YYYY-MM-DD HH:mm:ss:SSS");
},
keyDeserializerChange() {
this.getMessageDetail();

View File

@@ -9,6 +9,7 @@
return index;
}
"
@change="handleChange"
>
<div slot="operation" slot-scope="record">
<a-button
@@ -41,9 +42,9 @@ export default {
},
data() {
return {
columns: columns,
showDetailDialog: false,
record: {},
sortedInfo: null,
};
},
methods: {
@@ -54,42 +55,56 @@ export default {
closeDetailDialog() {
this.showDetailDialog = false;
},
},
};
const columns = [
{
title: "topic",
dataIndex: "topic",
key: "topic",
width: 300,
},
{
title: "分区",
dataIndex: "partition",
key: "partition",
},
{
title: "偏移",
dataIndex: "offset",
key: "offset",
},
{
title: "时间",
dataIndex: "timestamp",
key: "timestamp",
slots: { title: "timestamp" },
scopedSlots: { customRender: "timestamp" },
customRender: (text) => {
return moment(text).format("YYYY-MM-DD HH:mm:ss:SSS");
handleChange() {
this.sortedInfo = arguments[2];
},
},
{
title: "操作",
key: "operation",
scopedSlots: { customRender: "operation" },
width: 200,
computed: {
columns() {
let sortedInfo = this.sortedInfo || {};
const columns = [
{
title: "topic",
dataIndex: "topic",
key: "topic",
width: 300,
},
{
title: "分区",
dataIndex: "partition",
key: "partition",
},
{
title: "偏移",
dataIndex: "offset",
key: "offset",
},
{
title: "时间",
dataIndex: "timestamp",
key: "timestamp",
slots: { title: "timestamp" },
scopedSlots: { customRender: "timestamp" },
customRender: (text) => {
return text == -1
? -1
: moment(text).format("YYYY-MM-DD HH:mm:ss:SSS");
},
sorter: (a, b) => a.timestamp - b.timestamp,
sortOrder: sortedInfo.columnKey === "timestamp" && sortedInfo.order,
sortDirections: ["ascend", "descend"],
},
{
title: "操作",
key: "operation",
scopedSlots: { customRender: "operation" },
width: 200,
},
];
return columns;
},
},
];
};
</script>
<style scoped></style>
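
编辑注:上面 handleChange 里的 arguments[2] 之所以是排序信息,是因为 ant-design-vue 1.x 的 <a-table> @change 回调参数为 (pagination, filters, sorter)。下面是这次改动所采用的受控排序模式的一个最小独立示意(组件名、列集合均为示例,非本次提交内容):

```js
// Minimal sketch of the controlled-sort pattern used in the diff above:
// store the sorter from @change, and let the computed columns re-apply
// sortOrder on every re-render.
import moment from "moment";

export default {
  name: "SortedTableSketch", // hypothetical component, not part of this diff
  data() {
    return { sortedInfo: null };
  },
  computed: {
    columns() {
      const sortedInfo = this.sortedInfo || {};
      return [
        {
          title: "时间",
          dataIndex: "timestamp",
          key: "timestamp",
          customRender: (text) =>
            text == -1 ? -1 : moment(text).format("YYYY-MM-DD HH:mm:ss:SSS"),
          sorter: (a, b) => a.timestamp - b.timestamp,
          sortOrder: sortedInfo.columnKey === "timestamp" && sortedInfo.order,
        },
      ];
    },
  },
  methods: {
    handleChange(pagination, filters, sorter) {
      this.sortedInfo = sorter; // 等价于 diff 中的 arguments[2]
    },
  },
};
```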

View File

@@ -69,6 +69,9 @@ sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule require
import request from "@/utils/request";
import { KafkaClusterApi } from "@/utils/api";
import notification from "ant-design-vue/es/notification";
import { getClusterInfo } from "@/utils/local-cache";
import { mapMutations } from "vuex";
import { CLUSTER } from "@/store/mutation-types";
export default {
name: "AddClusterInfo",
@@ -124,6 +127,17 @@ export default {
if (res.code == 0) {
this.$message.success(res.msg);
this.$emit(this.closeDialogEvent, { refresh: true });
if (this.isModify) {
let clusterInfo = getClusterInfo();
if (
clusterInfo &&
clusterInfo.id &&
clusterInfo.id == this.clusterInfo.id &&
clusterInfo.clusterName != data.clusterName
) {
this.switchCluster(data);
}
}
} else {
notification.error({
message: "error",
@@ -138,6 +152,9 @@ export default {
this.data = [];
this.$emit(this.closeDialogEvent, { refresh: false });
},
...mapMutations({
switchCluster: CLUSTER.SWITCH,
}),
},
};
const defaultInfo = { clusterName: "", address: "", properties: "" };

View File

@@ -49,6 +49,15 @@
<label>说明</label>
<span>查看正在进行副本变更/重分配的任务或者将其取消</span>
</p>
<p>
<a-button type="primary" @click="openReplicaReassignDialog">
副本重分配
</a-button>
<label>说明</label>
<span
>副本所在节点重新分配,打个比方,集群有6个节点,分区1的3个副本在节点1、2、3,现在将它们重新分配到3、4、5</span
>
</p>
</a-card>
</div>
<!-- 隐藏数据同步相关-->
@@ -125,6 +134,11 @@
:visible="clusterManager.showClusterInfoDialog"
@closeClusterInfoDialog="closeClusterInfoDialog"
></ClusterInfo>
<ReplicaReassign
:visible="replicationManager.showReplicaReassignDialog"
@closeReplicaReassignDialog="closeReplicaReassignDialog"
>
</ReplicaReassign>
</div>
</template>
@@ -138,6 +152,7 @@ import ConfigThrottle from "@/views/op/ConfigThrottle";
import RemoveThrottle from "@/views/op/RemoveThrottle";
import CurrentReassignments from "@/views/op/CurrentReassignments";
import ClusterInfo from "@/views/op/ClusterInfo";
import ReplicaReassign from "@/views/op/ReplicaReassign";
export default {
name: "Operation",
components: {
@@ -150,6 +165,7 @@ export default {
RemoveThrottle,
CurrentReassignments,
ClusterInfo,
ReplicaReassign,
},
data() {
return {
@@ -162,6 +178,7 @@ export default {
replicationManager: {
showElectPreferredLeaderDialog: false,
showCurrentReassignmentsDialog: false,
showReplicaReassignDialog: false,
},
brokerManager: {
showConfigThrottleDialog: false,
@@ -227,6 +244,12 @@ export default {
closeClusterInfoDialog() {
this.clusterManager.showClusterInfoDialog = false;
},
openReplicaReassignDialog() {
this.replicationManager.showReplicaReassignDialog = true;
},
closeReplicaReassignDialog() {
this.replicationManager.showReplicaReassignDialog = false;
},
},
};
</script>

View File

@@ -0,0 +1,296 @@
<template>
<a-modal
title="副本重分配"
:visible="show"
:width="800"
:mask="false"
:destroyOnClose="true"
:footer="null"
:maskClosable="false"
@cancel="handleCancel"
>
<div>
<a-spin :spinning="loading">
<a-form
:form="form"
:label-col="{ span: 5 }"
:wrapper-col="{ span: 12 }"
@submit="handleSubmit"
>
<a-form-item label="Topic">
<a-select
@change="handleTopicChange"
show-search
option-filter-prop="children"
v-decorator="[
'topic',
{ rules: [{ required: true, message: '请选择一个topic!' }] },
]"
placeholder="请选择一个topic"
>
<a-select-option v-for="v in topicList" :key="v" :value="v">
{{ v }}
</a-select-option>
</a-select>
</a-form-item>
<a-form-item label="分配到Broker">
<a-select
mode="multiple"
option-filter-prop="children"
v-decorator="[
'brokers',
{
initialValue: brokers,
rules: [{ required: true, message: '请选择一个broker!' }],
},
]"
placeholder="请选择一个broker"
>
<a-select-option v-for="v in brokers" :key="v" :value="v">
<span v-if="v == -1">全部</span> <span v-else>{{ v }}</span>
</a-select-option>
</a-select>
</a-form-item>
<a-table
bordered
:columns="columns"
:data-source="currentAssignment"
:rowKey="
(record, index) => {
return index;
}
"
>
</a-table>
<a-form-item :wrapper-col="{ span: 12, offset: 5 }">
<a-button type="primary" html-type="submit">
重新生成分配计划
</a-button>
</a-form-item>
</a-form>
<hr />
<h2>新的分配计划</h2>
<a-table
bordered
:columns="columns"
:data-source="proposedAssignmentShow"
:rowKey="
(record, index) => {
return index;
}
"
>
</a-table>
<a-button type="danger" @click="updateAssignment"> 更新分配 </a-button>
</a-spin>
<hr />
<h4>注意</h4>
<ul>
<li>
副本重分配可以将副本分配到其它broker上,通过选择上面的broker节点,根据这几个节点生成分配方案
</li>
<li>
选择的broker的节点数量不能少于当前的副本数,比如有3个副本,至少需要3个broker节点
</li>
<li>
数据量太大,考虑设置一下限流,毕竟重新分配后不同broker之间可能做数据迁移
</li>
</ul>
</div>
</a-modal>
</template>
<script>
import request from "@/utils/request";
import { KafkaTopicApi, KafkaOpApi, KafkaClusterApi } from "@/utils/api";
import notification from "ant-design-vue/es/notification";
export default {
name: "ReplicaReassign",
props: {
visible: {
type: Boolean,
default: false,
},
},
data() {
return {
show: this.visible,
data: [],
loading: false,
form: this.$form.createForm(this, { name: "ReplicaReassignForm" }),
topicList: [],
partitions: [],
brokers: [],
currentAssignment: [],
proposedAssignment: [],
proposedAssignmentShow: [],
columns,
};
},
watch: {
visible(v) {
this.show = v;
if (this.show) {
this.clearData();
this.getTopicNameList();
this.getClusterInfo();
}
},
},
methods: {
handleSubmit(e) {
e.preventDefault();
this.form.validateFields((err, values) => {
if (!err) {
this.getProposedAssignment(values);
}
});
},
getTopicReplicaInfo(topic) {
this.loading = true;
request({
url: KafkaTopicApi.getCurrentReplicaAssignment.url + "?topic=" + topic,
method: KafkaTopicApi.getCurrentReplicaAssignment.method,
}).then((res) => {
this.loading = false;
if (res.code == 0) {
this.currentAssignment = res.data.partitions;
this.currentAssignment.forEach(
(e) => (e.replicas = e.replicas.join(","))
);
} else {
notification.error({
message: "error",
description: res.msg,
});
}
});
},
getTopicNameList() {
request({
url: KafkaTopicApi.getTopicNameList.url,
method: KafkaTopicApi.getTopicNameList.method,
}).then((res) => {
if (res.code == 0) {
this.topicList = res.data;
} else {
notification.error({
message: "error",
description: res.msg,
});
}
});
},
getPartitionInfo(topic) {
this.loading = true;
request({
url: KafkaTopicApi.getPartitionInfo.url + "?topic=" + topic,
method: KafkaTopicApi.getPartitionInfo.method,
}).then((res) => {
this.loading = false;
if (res.code != 0) {
notification.error({
message: "error",
description: res.msg,
});
} else {
this.partitions = res.data.map((v) => v.partition);
this.partitions.splice(0, 0, -1);
}
});
},
handleTopicChange(topic) {
// this.getPartitionInfo(topic);
this.clearData();
this.getTopicReplicaInfo(topic);
},
getClusterInfo() {
this.loading = true;
request({
url: KafkaClusterApi.getClusterInfo.url,
method: KafkaClusterApi.getClusterInfo.method,
}).then((res) => {
this.loading = false;
this.brokers = [];
res.data.nodes.forEach((node) => this.brokers.push(node.id));
});
},
getProposedAssignment(params) {
this.loading = true;
request({
url: KafkaOpApi.proposedAssignment.url,
method: KafkaOpApi.proposedAssignment.method,
data: params,
}).then((res) => {
this.loading = false;
if (res.code != 0) {
notification.error({
message: "error",
description: res.msg,
});
} else {
this.proposedAssignmentShow = res.data;
this.proposedAssignment = JSON.parse(
JSON.stringify(this.proposedAssignmentShow)
);
this.proposedAssignmentShow.forEach(
(e) => (e.replicas = e.replicas.join(","))
);
}
});
},
clearData() {
this.currentAssignment = [];
this.proposedAssignment = [];
this.proposedAssignmentShow = [];
},
handleCancel() {
this.data = [];
this.$emit("closeReplicaReassignDialog", { refresh: false });
},
updateAssignment() {
this.form.validateFields((err, values) => {
if (!err) {
if (this.proposedAssignment.length == 0) {
this.$message.warn("请先生成分配计划!");
return;
}
this.loading = true;
request({
url: KafkaTopicApi.updateReplicaAssignment.url,
method: KafkaTopicApi.updateReplicaAssignment.method,
data: { partitions: this.proposedAssignment },
}).then((res) => {
this.loading = false;
if (res.code == 0) {
this.$message.success(res.msg);
this.handleTopicChange(values.topic);
} else {
notification.error({
message: "error",
description: res.msg,
});
}
});
}
});
},
},
};
const columns = [
{
title: "分区",
dataIndex: "partition",
key: "partition",
},
{
title: "副本所在broker",
dataIndex: "replicas",
key: "replicas",
scopedSlots: { customRender: "replicas" },
},
];
</script>
<style scoped></style>
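
编辑注:把上面 ReplicaReassign.vue 的一次完整交互串起来,大致经过三个数据包。下面是一段示意(topic 名、broker id、分区数均为假设;结构依据 getProposedAssignment、深拷贝到 proposedAssignment、以及 updateAssignment 的代码推断):

```js
// 1) "重新生成分配计划" 提交的表单值:topic + 目标 broker 列表。
const proposeRequest = { topic: "my-topic", brokers: [3, 4, 5] };

// 2) 一个假设的分配计划,即 this.proposedAssignment 中保存的原始结构——
//    展示用副本 proposedAssignmentShow 会把 replicas 连接成 "3,4,5" 字符串,
//    而这份原始拷贝保留数组,供更新请求使用。
const proposedPlan = [
  { partition: 0, replicas: [3, 4, 5] },
  { partition: 1, replicas: [4, 5, 3] },
];

// 3) "更新分配" 最终提交给 KafkaTopicApi.updateReplicaAssignment 的请求体。
const updateRequest = { partitions: proposedPlan };
```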

View File

@@ -49,10 +49,26 @@
<a-button type="primary" @click="openCreateTopicDialog"
>新增</a-button
>
<a-popconfirm
title="删除这些Topic?"
ok-text="确认"
cancel-text="取消"
@confirm="deleteTopics(selectedRowKeys)"
>
<a-button type="danger" class="btn-left" :disabled="!hasSelected" :loading="loading">
批量删除
</a-button>
</a-popconfirm>
<span style="margin-left: 8px">
<template v-if="hasSelected">
{{ `已选择 ${selectedRowKeys.length} 个Topic` }}
</template>
</span>
</div>
<a-table
:columns="columns"
:data-source="filteredData"
:row-selection="{ selectedRowKeys: selectedRowKeys, onChange: onSelectChange }"
bordered
row-key="name"
>
@@ -225,8 +241,14 @@ export default {
filterTopic: "",
filteredData: [],
type: "normal",
selectedRowKeys: [], // Check here to configure the default column
};
},
computed: {
hasSelected() {
return this.selectedRowKeys.length > 0;
},
},
methods: {
handleSearch(e) {
e.preventDefault();
@@ -256,14 +278,16 @@ export default {
}
});
},
deleteTopic(topic) {
deleteTopics(topics) {
request({
url: KafkaTopicApi.deleteTopic.url + "?topic=" + topic,
url: KafkaTopicApi.deleteTopic.url,
method: KafkaTopicApi.deleteTopic.method,
data: topics,
}).then((res) => {
if (res.code == 0) {
this.$message.success(res.msg);
this.getTopicList();
this.selectedRowKeys = [];
} else {
notification.error({
message: "error",
@@ -272,6 +296,9 @@ export default {
}
});
},
deleteTopic(topic) {
this.deleteTopics([topic]);
},
onTopicUpdate(input) {
this.filterTopic = input.target.value;
this.filter();
@@ -342,9 +369,13 @@ export default {
closeThrottleDialog() {
this.showThrottleDialog = false;
},
onSelectChange(selectedRowKeys) {
this.selectedRowKeys = selectedRowKeys;
},
},
created() {
this.getTopicList();
this.selectedRowKeys = [];
},
};
@@ -431,4 +462,8 @@ const columns = [
.type-select {
width: 200px !important;
}
.btn-left {
margin-left: 1%;
}
</style>
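
编辑注:上面改写后的 deleteTopics 把删除接口从 "?topic=xxx" 查询参数改成了 JSON 数组请求体。下面是一段示意调用(topic 名为假设;selectedRowKeys 即 topic 名称,因为表格使用 row-key="name"):

```js
import request from "@/utils/request";
import { KafkaTopicApi } from "@/utils/api";

// Hypothetical batch delete call as issued by deleteTopics(selectedRowKeys):
// the body is simply the list of selected topic names.
request({
  url: KafkaTopicApi.deleteTopic.url,
  method: KafkaTopicApi.deleteTopic.method,
  data: ["topic-a", "topic-b"],
});
```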