23 Commits

Author SHA1 Message Date
许晓东
8131cb1a42 Publish v1.0.4 package download link 2022-02-16 20:01:06 +08:00
许晓东
1dd6466261 Replica reassignment 2022-02-16 19:50:35 +08:00
许晓东
dda08a2152 Replica reassignment -> generate reassignment plan 2022-02-15 20:13:07 +08:00
许晓东
01c7121ee4 Order the cluster node list 2022-01-22 23:33:13 +08:00
许晓东
d939d7653c Show broker API version compatibility info on the home page 2022-01-22 23:07:41 +08:00
许晓东
058cd5a24e Query current reassignments; handle unsupported-version exception 2022-01-20 13:44:37 +08:00
许晓东
db3f55ac4a polish README 2022-01-19 19:05:00 +08:00
许晓东
a311a34537 Fix stack-overflow bug in partition comparison 2022-01-18 20:42:11 +08:00
许晓东
e8fe2ea1c7 Support Chinese cluster names; allow choosing time order in message query 2022-01-13 14:19:17 +08:00
许晓东
10302dd39c v1.0.3 package download link 2022-01-09 23:57:00 +08:00
许晓东
55a4483fcc Build only the zip package 2022-01-09 23:45:39 +08:00
许晓东
4dd2412b78 polish README 2022-01-09 23:40:20 +08:00
许晓东
387c714072 polish README 2022-01-09 23:27:23 +08:00
许晓东
6a2d876d50 Multi-cluster support, cluster switching 2022-01-06 19:31:44 +08:00
许晓东
f5fb2c4f88 Cluster switching 2022-01-05 21:19:46 +08:00
许晓东
6f9676e259 Cluster list, add cluster 2022-01-04 21:06:50 +08:00
许晓东
2427ce2c1e Prepare for multi-cluster support 2022-01-03 22:02:03 +08:00
许晓东
02abe67fce Message query filtering 2021-12-30 14:17:47 +08:00
许晓东
ad39f4e82c Message query filtering 2021-12-29 21:15:56 +08:00
许晓东
243c89b459 Topic fuzzy search; message filter page configuration 2021-12-28 20:39:07 +08:00
许晓东
11418cd6e0 Remove the message-sync solution entry 2021-12-27 20:41:19 +08:00
许晓东
b19c6200d2 polish README 2021-12-21 14:57:16 +08:00
许晓东
5f6a06c100 v1.0.2 package download link 2021-12-21 14:44:55 +08:00
92 changed files with 2987 additions and 363 deletions

README.md

@@ -2,88 +2,71 @@
A lightweight Kafka visual management platform: quick to install and configure, simple and easy to use.
To keep development simple there is no internationalization; the UI is Chinese only.
If you have used rocketmq-console, the front-end style is quite similar.
## Package download
Choose one of the following two options: download a release package directly, or download the source code and build it yourself.
* Click to download (v1.0.1): [kafka-console-ui.tar.gz](https://github.com/xxd763795151/kafka-console-ui/releases/download/v1.0.1/kafka-console-ui.tar.gz) or [kafka-console-ui.zip](https://github.com/xxd763795151/kafka-console-ui/releases/download/v1.0.1/kafka-console-ui.zip)
* Or follow the packaging and deployment steps below to download the source and rebuild (includes the latest committed features)
## Page preview
If GitHub displays the images for you, click [menu page preview](./document/overview/概览.md) to see what each page looks like.
## Cluster migration support
The current main branch and future versions no longer ship the message-sync / cluster-migration solution. If you need it, see: [cluster migration notes](./document/datasync/集群迁移.md)
## ACL notes
If the Kafka cluster has ACL enabled but the console does not show the ACL menu, see the [ACL configuration guide](./document/acl/Acl.md)
## Features
* Multi-cluster support
* Cluster info
* Topic management
* Consumer group management
* Authentication and authorization management based on SASL_SCRAM
* Message management
* ACL
* Operations
![Features](./document/功能特性.png)
## Tech stack
* spring boot
* java, scala
* kafka
* h2
* vue
## Kafka version
* Currently built against Kafka 2.8.0
## Monitoring
Only operations and management features are provided; monitoring and alerting require other components. See https://blog.csdn.net/x763795151/article/details/119705372
# Packaging and deployment
## Packaging
Requirements:
* maven 3.6+
* jdk 8
* git
See this mind map for the detailed feature list:
![Features](./document/img/功能特性.png)
## Package download
Click to download (v1.0.4): [kafka-console-ui.zip](https://github.com/xxd763795151/kafka-console-ui/releases/download/v1.0.4/kafka-console-ui.zip)
## Quick start
### Windows
1. Unzip the package
2. Enter the bin directory (it must be run from the bin directory) and double-click `start.bat` to start
3. Stop: just close the command-line window that was opened
### Linux or macOS
```
git clone https://github.com/xxd763795151/kafka-console-ui.git
cd kafka-console-ui
# run on Linux or macOS
sh package.sh
# run on Windows
package.bat
```
After a successful build, the output files are (two archive types):
* target/kafka-console-ui.tar.gz
* target/kafka-console-ui.zip
## Deployment
### macOS or Linux
```
# extract (tar.gz for example)
tar -zxvf kafka-console-ui.tar.gz
# extract
unzip kafka-console-ui.zip
# enter the extracted directory
cd kafka-console-ui
# edit the configuration
vim config/application.yml
# start
sh bin/start.sh
# stop
sh bin/shutdown.sh
```
### Windows
1. Unzip the package
2. Edit the configuration file `config/application.yml`
3. Enter the bin directory (it must be run from the bin directory) and run `start.bat` to start
### Access address
Once started, visit http://127.0.0.1:7766
# Development environment
* jdk 8
* idea
* scala 2.13
* maven >= 3.6
* webstorm
Except for WebStorm, the front-end IDE that you can replace as you like, the rest are needed; JDK and Scala are a must.
# Local development setup
Taking my own setup as an example: get the tools of the development environment ready, then clone the code locally.
## Back-end setup
1. Open the project with IDEA
2. Open IDEA's Project Structure (Settings) -> Modules -> mark src/main/scala as Sources (by convention src/main/java is the source directory, so this extra source root has to be added)
3. Open IDEA's Project Structure (Settings) -> Libraries, add a Scala SDK and select the locally downloaded Scala 2.13 directory to add it
## Front end
The front-end code lives in the project's ui directory; open it with any front-end IDE to develop.
## Note
The front end and back end are separate. If you start only the back end without building the front end, there are no pages. Run `sh package.sh` once to build the front end and then start from IDEA, or run the front end separately.
# Page examples
If ACL is not enabled in the configuration, the ACL menu pages are not shown, so there is no Acl item on the navigation bar.
### Configure a cluster
On first startup, because no Kafka cluster info has been configured yet, the top-right corner of the page may show an error such as "No Cluster Info" or a hint like "no cluster info, please switch cluster first".
![Cluster](./document/集群.png)
![Topic](./document/Topic.png)
![Consumer groups](./document/消费组.png)
![Operations](./document/运维.png)
Added a message search page:
![Messages](./document/消息.png)
Configure a cluster as follows:
1. Click the [Ops] menu on the top navigation bar
2. Click the [Switch Cluster] button under cluster management
3. In the dialog, click [Add Cluster]
4. Enter the Kafka cluster address and a name (any name will do)
5. Click submit and the cluster is added
6. Once added, the dialog shows this cluster; click the [Switch] button on the right to make it the current cluster
Later you can add more clusters the same way. To switch to a cluster, click its switch button; the top-right corner of the page shows which cluster is currently in use (refresh the page if unsure).
When adding a cluster, besides the address you can also enter additional cluster properties, such as the request timeout or ACL settings; a sketch of the format follows below. If ACL is enabled, the ACL menu appears on the navigation bar after switching to that cluster and the related operations become available. SASL_SCRAM-based authentication and authorization management is the most complete; I have not fully verified the other mechanisms (even though I wrote this part), though the authorization part should be generic.
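As a rough sketch (my illustration, not from the original README): the properties field accepts standard Kafka client settings, one `key=value` per line, for example a request-timeout override:
```
# optional extra cluster properties entered in the Add Cluster dialog (example value)
request.timeout.ms=5000
```
ACL-related properties are described in the [ACL configuration guide](./document/acl/Acl.md).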
## Kafka version
* Currently built against Kafka 2.8.0
## Monitoring
Only operations and management features are provided; monitoring and alerting require other components. If needed, see https://blog.csdn.net/x763795151/article/details/119705372
## Building from source
To build from the source code, see: [source build guide](./document/package/源码打包.md)
## Local development
For local development and environment setup, see: [local development](./document/develop/开发配置.md)


@@ -4,7 +4,7 @@
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
<id>rocketmq-reput</id>
<formats>
<format>tar.gz</format>
<!-- <format>tar.gz</format>-->
<format>zip</format>
</formats>


document/acl/Acl.md (new file)

@@ -0,0 +1,36 @@
# ACL configuration guide
## Preface
You may have come here from this article: [how to manage Kafka ACL configuration quickly through a UI](https://blog.csdn.net/x763795151/article/details/120200119)
That article described enabling ACL by editing the application.yml configuration file, for example:
```yaml
kafka:
  config:
    # kafka broker addresses, comma separated
    bootstrap-server: 'localhost:9092'
    # whether the brokers enable ACL; if not, the items below can all be ignored
    enable-acl: true
    # only two security protocols are supported: SASL_PLAINTEXT and PLAINTEXT; set SASL_PLAINTEXT when ACL is enabled, otherwise ignore this
    security-protocol: SASL_PLAINTEXT
    sasl-mechanism: SCRAM-SHA-256
    # super admin username (already configured as a super user on the brokers)
    admin-username: admin
    # super admin password
    admin-password: admin
    # automatically create the configured super admin user at startup
    admin-create: true
    # ZooKeeper address the brokers connect to
    zookeeper-addr: localhost:2181
    sasl-jaas-config: org.apache.kafka.common.security.scram.ScramLoginModule required username="${kafka.config.admin-username}" password="${kafka.config.admin-password}";
```
It stated that kafka.config.enable-acl had to be true.
Note: **this approach is no longer supported.**
## New version notes
The console now supports multiple clusters; see the "Configure a cluster" section of the main README for how clusters are configured.
These extra configuration items have therefore been removed.
If ACL is enabled, configure the cluster's ACL-related settings in the properties field when adding the cluster on the page, as below: ![Add cluster](./新增集群.png)
If the console detects ACL-related properties, the ACL menu appears automatically after switching to that cluster.
Note: only SASL is supported. A sketch of the properties follows.
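As a rough illustration (my sketch, not from the original doc), based on the old application.yml example above and on standard Kafka client settings, the properties entered for a SASL_SCRAM cluster might look like:
```
# assumed example values; use your own admin account
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="admin";
```
The console inspects the cluster's security.protocol and sasl.mechanism properties to decide whether to show the ACL menu.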


document/datasync/集群迁移.md (new file)

@@ -0,0 +1,11 @@
# Cluster migration
You may have arrived here out of curiosity after reading my earlier post on migrating clusters between on-cloud and off-cloud environments.
The original article is here: [smooth migration between old and new Kafka clusters](https://blog.csdn.net/x763795151/article/details/121070563)
However, since those features carry business-specific concerns, they have been removed in the new versions.
The current main branch and future versions no longer provide the message-sync / cluster-migration solution. If you need it, use the code on the single-data-sync branch, or download the historical v1.0.2 release directly: [kafka-console-ui.zip](https://github.com/xxd763795151/kafka-console-ui/releases/download/v1.0.2/kafka-console-ui.zip)
v1.0.2 and earlier only support a single cluster, but their SASL_SCRAM authentication and authorization management is quite complete.
Later versions support multi-cluster management and remove or refine some pre-v1.0.2 features, aiming to stay a sufficiently lightweight management tool without other business concerns.

document/develop/开发配置.md (new file)

@@ -0,0 +1,31 @@
# Local development setup
## Tech stack
* spring boot
* java, scala
* kafka
* h2
* vue
## Development environment
* jdk 8
* idea
* scala 2.13
* maven >= 3.6
* webstorm
Except for WebStorm, the front-end IDE that you can replace as you like, JDK and Scala are required.
Scala 2.13 downloads are at the bottom of this page: https://www.scala-lang.org/download/scala2.html
## Clone the code
Taking my own setup as an example: get the tools of the development environment ready, then clone the code locally.
## Back-end setup
1. Open the project with IDEA
2. Open IDEA's Project Structure (Settings) -> Modules -> mark src/main/scala as Sources (by convention src/main/java is the source directory, so this extra source root has to be added)
3. Open IDEA's Project Structure (Settings) -> Libraries, add a Scala SDK and select the locally downloaded Scala 2.13 directory to add it (in IDEA you can also just tick the SDK directly instead of downloading it locally first)
## Front end
The front-end code lives in the project's ui directory; open it with a front-end IDE such as WebStorm to develop.
## Note
The front end and back end are separate. If you start only the back-end project, there may be no static files under src/main/resources, so the browser will show no pages.
You can build the front-end files first, e.g. run `sh package.sh`, and then start from IDEA; or start the back end from IDEA and open the front-end project under the ui directory with a front-end IDE such as WebStorm and run it separately (see the sketch below).
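A minimal sketch of the separate front-end workflow, assuming the ui directory is a standard Vue CLI project (check ui/package.json for the actual script names):
```
cd ui
# install dependencies
npm install
# start the dev server; "serve" is the usual Vue CLI script name and is assumed here
npm run serve
```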


document/overview/概览.md (new file)

@@ -0,0 +1,32 @@
# Menu preview
If ACL is not enabled in the configuration, the ACL menu pages are not shown, so there is no Acl item on the navigation bar.
## Cluster
* Cluster list
![Cluster](./img/集群.png)
* View or modify cluster configuration
![Broker config](./img/Broker配置.png)
## Topic
* Topic list
![Topic](./img/Topic.png)
## Consumer groups
* Consumer group list
![Consumer groups](./img/消费组.png)
## Messages
* Search or filter messages by time
![Message time search](./img/消息时间查询.png)
* Message detail
![Message detail](./img/消息详情.png)
## Operations
* Operations page
![Operations](./img/运维.png)
* Cluster switching
![Cluster switching](./img/集群切换.png)

document/package/源码打包.md (new file)

@@ -0,0 +1,32 @@
# Building from source
You can download the latest code and build it yourself; compared with the released packages it may include the newest features.
## Requirements
* maven 3.6+
* jdk 8
* git (optional)
Maven >= 3.6 is recommended. I have not tried 3.4.x or 3.5.x; with 3.3.x on Windows the build has a bug and Spring Boot's application.yml may not end up in the right directory.
If a 3.6+ Maven also fails on macOS with the same problem, try the latest version.
## Download the source
```
git clone https://github.com/xxd763795151/kafka-console-ui.git
```
Or download the source directly from the project page.
## Build
A simple build script is provided; just run it.
### Windows
```
cd kafka-console-ui
# run on Windows
package.bat
```
### Linux or macOS
```
cd kafka-console-ui
# run on Linux or macOS
sh package.sh
```



@@ -10,7 +10,7 @@
</parent>
<groupId>com.xuxd</groupId>
<artifactId>kafka-console-ui</artifactId>
<version>1.0.2</version>
<version>1.0.4</version>
<name>kafka-console-ui</name>
<description>Kafka console manage ui</description>
<properties>


@@ -3,11 +3,13 @@ package com.xuxd.kafka.console;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.web.servlet.ServletComponentScan;
import org.springframework.scheduling.annotation.EnableScheduling;
@MapperScan("com.xuxd.kafka.console.dao")
@SpringBootApplication
@EnableScheduling
@ServletComponentScan
public class KafkaConsoleUiApplication {
public static void main(String[] args) {


@@ -8,7 +8,7 @@ import org.apache.kafka.common.Node;
* @author xuxd
* @date 2021-10-08 14:03:21
**/
public class BrokerNode {
public class BrokerNode implements Comparable{
private int id;
@@ -80,4 +80,8 @@ public class BrokerNode {
public void setController(boolean controller) {
isController = controller;
}
@Override public int compareTo(Object o) {
return this.id - ((BrokerNode)o).id;
}
}


@@ -0,0 +1,73 @@
package com.xuxd.kafka.console.beans;
import com.xuxd.kafka.console.beans.enums.FilterType;
import org.apache.kafka.common.serialization.Deserializer;
/**
* kafka-console-ui.
*
* @author xuxd
* @date 2021-12-29 15:30:08
**/
public class MessageFilter {
private FilterType filterType = FilterType.NONE;
private Object searchContent = null;
private String headerKey = null;
private String headerValue = null;
private Deserializer deserializer = null;
private boolean isContainsValue = false;
public FilterType getFilterType() {
return filterType;
}
public void setFilterType(FilterType filterType) {
this.filterType = filterType;
}
public Object getSearchContent() {
return searchContent;
}
public void setSearchContent(Object searchContent) {
this.searchContent = searchContent;
}
public String getHeaderKey() {
return headerKey;
}
public void setHeaderKey(String headerKey) {
this.headerKey = headerKey;
}
public String getHeaderValue() {
return headerValue;
}
public void setHeaderValue(String headerValue) {
this.headerValue = headerValue;
}
public Deserializer getDeserializer() {
return deserializer;
}
public void setDeserializer(Deserializer deserializer) {
this.deserializer = deserializer;
}
public boolean isContainsValue() {
return isContainsValue;
}
public void setContainsValue(boolean containsValue) {
isContainsValue = containsValue;
}
}


@@ -1,5 +1,6 @@
package com.xuxd.kafka.console.beans;
import com.xuxd.kafka.console.beans.enums.FilterType;
import lombok.Data;
/**
@@ -24,4 +25,12 @@ public class QueryMessage {
private String keyDeserializer;
private String valueDeserializer;
private FilterType filter;
private String value;
private String headerKey;
private String headerValue;
}


@@ -21,7 +21,7 @@ public class TopicPartition implements Comparable {
}
TopicPartition other = (TopicPartition) o;
if (!this.topic.equals(other.getTopic())) {
return this.compareTo(other);
return this.topic.compareTo(other.topic);
}
return this.partition - other.partition;


@@ -0,0 +1,28 @@
package com.xuxd.kafka.console.beans.dos;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
/**
* kafka-console-ui.
*
* @author xuxd
* @date 2021-12-31 09:54:24
**/
@Data
@TableName("t_cluster_info")
public class ClusterInfoDO {
@TableId(type = IdType.AUTO)
private Long id;
private String clusterName;
private String address;
private String properties;
private String updateTime;
}


@@ -23,4 +23,6 @@ public class KafkaUserDO {
private String password;
private String updateTime;
private Long clusterInfoId;
}


@@ -0,0 +1,38 @@
package com.xuxd.kafka.console.beans.dto;
import com.xuxd.kafka.console.beans.dos.ClusterInfoDO;
import com.xuxd.kafka.console.utils.ConvertUtil;
import lombok.Data;
import org.apache.commons.lang3.StringUtils;
/**
* kafka-console-ui.
*
* @author xuxd
* @date 2022-01-04 20:19:03
**/
@Data
public class ClusterInfoDTO {
private Long id;
private String clusterName;
private String address;
private String properties;
private String updateTime;
public ClusterInfoDO to() {
ClusterInfoDO infoDO = new ClusterInfoDO();
infoDO.setId(id);
infoDO.setClusterName(clusterName);
infoDO.setAddress(address);
if (StringUtils.isNotBlank(properties)) {
infoDO.setProperties(ConvertUtil.propertiesStr2JsonStr(properties));
}
return infoDO;
}
}


@@ -0,0 +1,18 @@
package com.xuxd.kafka.console.beans.dto;
import java.util.List;
import lombok.Data;
/**
* kafka-console-ui.
*
* @author xuxd
* @date 2022-02-15 19:08:13
**/
@Data
public class ProposedAssignmentDTO {
private String topic;
private List<Integer> brokers;
}


@@ -1,8 +1,10 @@
package com.xuxd.kafka.console.beans.dto;
import com.xuxd.kafka.console.beans.QueryMessage;
import com.xuxd.kafka.console.beans.enums.FilterType;
import java.util.Date;
import lombok.Data;
import org.apache.commons.lang3.StringUtils;
/**
* kafka-console-ui.
@@ -27,6 +29,14 @@ public class QueryMessageDTO {
private String valueDeserializer;
private String filter;
private String value;
private String headerKey;
private String headerValue;
public QueryMessage toQueryMessage() {
QueryMessage queryMessage = new QueryMessage();
queryMessage.setTopic(topic);
@@ -45,6 +55,21 @@ public class QueryMessageDTO {
queryMessage.setKeyDeserializer(keyDeserializer);
queryMessage.setValueDeserializer(valueDeserializer);
if (StringUtils.isNotBlank(filter)) {
queryMessage.setFilter(FilterType.valueOf(filter.toUpperCase()));
} else {
queryMessage.setFilter(FilterType.NONE);
}
if (StringUtils.isNotBlank(value)) {
queryMessage.setValue(value.trim());
}
if (StringUtils.isNotBlank(headerKey)) {
queryMessage.setHeaderKey(headerKey.trim());
}
if (StringUtils.isNotBlank(headerValue)) {
queryMessage.setHeaderValue(headerValue.trim());
}
return queryMessage;
}
}


@@ -0,0 +1,11 @@
package com.xuxd.kafka.console.beans.enums;
/**
* kafka-console-ui.
*
* @author xuxd
* @date 2021-12-29 14:36:01
**/
public enum FilterType {
NONE, BODY, HEADER
}


@@ -0,0 +1,24 @@
package com.xuxd.kafka.console.beans.vo;
import java.util.List;
import lombok.Data;
/**
* kafka-console-ui.
*
* @author xuxd
* @date 2022-01-22 16:24:58
**/
@Data
public class BrokerApiVersionVO {
private int brokerId;
private String host;
private int supportNums;
private int unSupportNums;
private List<String> versionInfo;
}


@@ -0,0 +1,40 @@
package com.xuxd.kafka.console.beans.vo;
import com.xuxd.kafka.console.beans.dos.ClusterInfoDO;
import com.xuxd.kafka.console.utils.ConvertUtil;
import java.util.List;
import lombok.Data;
import org.apache.commons.lang3.StringUtils;
/**
* kafka-console-ui.
*
* @author xuxd
* @date 2022-01-04 19:16:11
**/
@Data
public class ClusterInfoVO {
private Long id;
private String clusterName;
private String address;
private List<String> properties;
private String updateTime;
public static ClusterInfoVO from(ClusterInfoDO infoDO) {
ClusterInfoVO vo = new ClusterInfoVO();
vo.setId(infoDO.getId());
vo.setClusterName(infoDO.getClusterName());
vo.setAddress(infoDO.getAddress());
vo.setUpdateTime(infoDO.getUpdateTime());
if (StringUtils.isNotBlank(infoDO.getProperties())) {
vo.setProperties(ConvertUtil.jsonStr2List(infoDO.getProperties()));
}
return vo;
}
}


@@ -0,0 +1,66 @@
package com.xuxd.kafka.console.boot;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.xuxd.kafka.console.beans.dos.ClusterInfoDO;
import com.xuxd.kafka.console.config.KafkaConfig;
import com.xuxd.kafka.console.dao.ClusterInfoMapper;
import com.xuxd.kafka.console.utils.ConvertUtil;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.stereotype.Component;
/**
* kafka-console-ui.
*
* @author xuxd
* @date 2021-12-30 19:16:50
**/
@Slf4j
@Component
public class Bootstrap implements SmartInitializingSingleton {
public static final String DEFAULT_CLUSTER_NAME = "default";
private final KafkaConfig config;
private final ClusterInfoMapper clusterInfoMapper;
public Bootstrap(KafkaConfig config, ObjectProvider<ClusterInfoMapper> clusterInfoMapper) {
this.config = config;
this.clusterInfoMapper = clusterInfoMapper.getIfAvailable();
}
private void initialize() {
loadDefaultClusterConfig();
}
private void loadDefaultClusterConfig() {
log.info("load default kafka config.");
if (StringUtils.isBlank(config.getBootstrapServer())) {
return;
}
QueryWrapper<ClusterInfoDO> clusterInfoDOQueryWrapper = new QueryWrapper<>();
clusterInfoDOQueryWrapper.eq("cluster_name", DEFAULT_CLUSTER_NAME);
List<Object> objects = clusterInfoMapper.selectObjs(clusterInfoDOQueryWrapper);
if (CollectionUtils.isNotEmpty(objects)) {
log.warn("default kafka cluster config has existed[any of cluster name or address].");
return;
}
ClusterInfoDO infoDO = new ClusterInfoDO();
infoDO.setClusterName(DEFAULT_CLUSTER_NAME);
infoDO.setAddress(config.getBootstrapServer().trim());
infoDO.setProperties(ConvertUtil.toJsonString(config.getProperties()));
clusterInfoMapper.insert(infoDO);
log.info("Insert default config: {}", infoDO);
}
@Override public void afterSingletonsInstantiated() {
initialize();
}
}


@@ -0,0 +1,74 @@
package com.xuxd.kafka.console.config;
import java.util.Properties;
import org.apache.kafka.clients.CommonClientConfigs;
/**
* kafka-console-ui.
*
* @author xuxd
* @date 2021-12-30 15:46:55
**/
public class ContextConfig {
public static final int DEFAULT_REQUEST_TIMEOUT_MS = 5000;
private Long clusterInfoId;
private String clusterName;
private String bootstrapServer;
private int requestTimeoutMs = DEFAULT_REQUEST_TIMEOUT_MS;
private Properties properties = new Properties();
public String getBootstrapServer() {
return bootstrapServer;
}
public void setBootstrapServer(String bootstrapServer) {
this.bootstrapServer = bootstrapServer;
}
public int getRequestTimeoutMs() {
return properties.containsKey(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG) ?
Integer.parseInt(properties.getProperty(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG)) : requestTimeoutMs;
}
public void setRequestTimeoutMs(int requestTimeoutMs) {
this.requestTimeoutMs = requestTimeoutMs;
}
public Properties getProperties() {
return properties;
}
public Long getClusterInfoId() {
return clusterInfoId;
}
public void setClusterInfoId(Long clusterInfoId) {
this.clusterInfoId = clusterInfoId;
}
public String getClusterName() {
return clusterName;
}
public void setClusterName(String clusterName) {
this.clusterName = clusterName;
}
public void setProperties(Properties properties) {
this.properties = properties;
}
@Override public String toString() {
return "KafkaContextConfig{" +
"bootstrapServer='" + bootstrapServer + '\'' +
", requestTimeoutMs=" + requestTimeoutMs +
", properties=" + properties +
'}';
}
}


@@ -0,0 +1,12 @@
package com.xuxd.kafka.console.config;
/**
* kafka-console-ui.
*
* @author xuxd
* @date 2021-12-30 18:55:28
**/
public class ContextConfigHolder {
public static final ThreadLocal<ContextConfig> CONTEXT_CONFIG = new ThreadLocal<>();
}


@@ -1,5 +1,6 @@
package com.xuxd.kafka.console.config;
import java.util.Properties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
@@ -15,23 +16,9 @@ public class KafkaConfig {
private String bootstrapServer;
private int requestTimeoutMs;
private String securityProtocol;
private String saslMechanism;
private String saslJaasConfig;
private String adminUsername;
private String adminPassword;
private boolean adminCreate;
private String zookeeperAddr;
private boolean enableAcl;
private Properties properties;
public String getBootstrapServer() {
return bootstrapServer;
@@ -41,62 +28,6 @@ public class KafkaConfig {
this.bootstrapServer = bootstrapServer;
}
public int getRequestTimeoutMs() {
return requestTimeoutMs;
}
public void setRequestTimeoutMs(int requestTimeoutMs) {
this.requestTimeoutMs = requestTimeoutMs;
}
public String getSecurityProtocol() {
return securityProtocol;
}
public void setSecurityProtocol(String securityProtocol) {
this.securityProtocol = securityProtocol;
}
public String getSaslMechanism() {
return saslMechanism;
}
public void setSaslMechanism(String saslMechanism) {
this.saslMechanism = saslMechanism;
}
public String getSaslJaasConfig() {
return saslJaasConfig;
}
public void setSaslJaasConfig(String saslJaasConfig) {
this.saslJaasConfig = saslJaasConfig;
}
public String getAdminUsername() {
return adminUsername;
}
public void setAdminUsername(String adminUsername) {
this.adminUsername = adminUsername;
}
public String getAdminPassword() {
return adminPassword;
}
public void setAdminPassword(String adminPassword) {
this.adminPassword = adminPassword;
}
public boolean isAdminCreate() {
return adminCreate;
}
public void setAdminCreate(boolean adminCreate) {
this.adminCreate = adminCreate;
}
public String getZookeeperAddr() {
return zookeeperAddr;
}
@@ -105,11 +36,11 @@ public class KafkaConfig {
this.zookeeperAddr = zookeeperAddr;
}
public boolean isEnableAcl() {
return enableAcl;
public Properties getProperties() {
return properties;
}
public void setEnableAcl(boolean enableAcl) {
this.enableAcl = enableAcl;
public void setProperties(Properties properties) {
this.properties = properties;
}
}


@@ -1,8 +1,13 @@
package com.xuxd.kafka.console.controller;
import com.xuxd.kafka.console.beans.dto.ClusterInfoDTO;
import com.xuxd.kafka.console.service.ClusterService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@@ -23,4 +28,34 @@ public class ClusterController {
public Object getClusterInfo() {
return clusterService.getClusterInfo();
}
@GetMapping("/info")
public Object getClusterInfoList() {
return clusterService.getClusterInfoList();
}
@PostMapping("/info")
public Object addClusterInfo(@RequestBody ClusterInfoDTO dto) {
return clusterService.addClusterInfo(dto.to());
}
@DeleteMapping("/info")
public Object deleteClusterInfo(@RequestBody ClusterInfoDTO dto) {
return clusterService.deleteClusterInfo(dto.getId());
}
@PutMapping("/info")
public Object updateClusterInfo(@RequestBody ClusterInfoDTO dto) {
return clusterService.updateClusterInfo(dto.to());
}
@GetMapping("/info/peek")
public Object peekClusterInfo() {
return clusterService.peekClusterInfo();
}
@GetMapping("/info/api/version")
public Object getBrokerApiVersionInfo() {
return clusterService.getBrokerApiVersionInfo();
}
}


@@ -36,7 +36,7 @@ public class ConfigController {
this.configService = configService;
}
@GetMapping
@GetMapping("/console")
public Object getConfig() {
return ResponseData.create().data(configMap).success();
}


@@ -2,6 +2,7 @@ package com.xuxd.kafka.console.controller;
import com.xuxd.kafka.console.beans.TopicPartition;
import com.xuxd.kafka.console.beans.dto.BrokerThrottleDTO;
import com.xuxd.kafka.console.beans.dto.ProposedAssignmentDTO;
import com.xuxd.kafka.console.beans.dto.ReplicationDTO;
import com.xuxd.kafka.console.beans.dto.SyncDataDTO;
import com.xuxd.kafka.console.service.OperationService;
@@ -74,4 +75,9 @@ public class OperationController {
public Object cancelReassignment(@RequestBody TopicPartition partition) {
return operationService.cancelReassignment(new org.apache.kafka.common.TopicPartition(partition.getTopic(), partition.getPartition()));
}
@PostMapping("/replication/reassignments/proposed")
public Object proposedAssignments(@RequestBody ProposedAssignmentDTO dto) {
return operationService.proposedAssignments(dto.getTopic(), dto.getBrokers());
}
}


@@ -0,0 +1,13 @@
package com.xuxd.kafka.console.dao;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.xuxd.kafka.console.beans.dos.ClusterInfoDO;
/**
* kafka-console-ui.
*
* @author xuxd
* @date 2021-12-31 09:58:52
**/
public interface ClusterInfoMapper extends BaseMapper<ClusterInfoDO> {
}


@@ -0,0 +1,87 @@
package com.xuxd.kafka.console.interceptor;
import com.xuxd.kafka.console.beans.ResponseData;
import com.xuxd.kafka.console.beans.dos.ClusterInfoDO;
import com.xuxd.kafka.console.config.ContextConfig;
import com.xuxd.kafka.console.config.ContextConfigHolder;
import com.xuxd.kafka.console.dao.ClusterInfoMapper;
import com.xuxd.kafka.console.utils.ConvertUtil;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import javax.servlet.Filter;
import javax.servlet.FilterChain;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.annotation.WebFilter;
import javax.servlet.http.HttpServletRequest;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
/**
* kafka-console-ui.
*
* @author xuxd
* @date 2022-01-05 19:56:25
**/
@WebFilter(filterName = "context-set-filter", urlPatterns = {"/acl/*","/user/*","/cluster/*","/config/*","/consumer/*","/message/*","/topic/*","/op/*"})
@Slf4j
public class ContextSetFilter implements Filter {
private Set<String> excludes = new HashSet<>();
{
excludes.add("/cluster/info/peek");
excludes.add("/cluster/info");
excludes.add("/config/console");
}
@Autowired
private ClusterInfoMapper clusterInfoMapper;
@Override public void doFilter(ServletRequest req, ServletResponse response,
FilterChain chain) throws IOException, ServletException {
try {
HttpServletRequest request = (HttpServletRequest) req;
String uri = request.getRequestURI();
if (!excludes.contains(uri)) {
String headerId = request.getHeader(Header.ID);
if (StringUtils.isBlank(headerId)) {
// ResponseData failed = ResponseData.create().failed("Cluster info is null.");
ResponseData failed = ResponseData.create().failed("没有集群信息,请先切换集群");
response.setContentType(MediaType.APPLICATION_JSON_UTF8_VALUE);
response.getWriter().println(ConvertUtil.toJsonString(failed));
return;
} else {
ClusterInfoDO infoDO = clusterInfoMapper.selectById(Long.valueOf(headerId));
if (infoDO == null) {
ResponseData failed = ResponseData.create().failed("该集群找不到信息,请切换一个有效集群");
response.setContentType(MediaType.APPLICATION_JSON_UTF8_VALUE);
response.getWriter().println(ConvertUtil.toJsonString(failed));
return;
}
ContextConfig config = new ContextConfig();
config.setClusterInfoId(infoDO.getId());
config.setClusterName(infoDO.getClusterName());
config.setBootstrapServer(infoDO.getAddress());
if (StringUtils.isNotBlank(infoDO.getProperties())) {
config.setProperties(ConvertUtil.toProperties(infoDO.getProperties()));
}
ContextConfigHolder.CONTEXT_CONFIG.set(config);
}
}
chain.doFilter(req, response);
} finally {
ContextConfigHolder.CONTEXT_CONFIG.remove();
}
}
interface Header {
String ID = "X-Cluster-Info-Id";
String NAME = "X-Cluster-Info-Name";
}
}


@@ -1,9 +1,22 @@
package com.xuxd.kafka.console.schedule;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.xuxd.kafka.console.beans.dos.ClusterInfoDO;
import com.xuxd.kafka.console.beans.dos.KafkaUserDO;
import com.xuxd.kafka.console.config.ContextConfig;
import com.xuxd.kafka.console.config.ContextConfigHolder;
import com.xuxd.kafka.console.dao.ClusterInfoMapper;
import com.xuxd.kafka.console.dao.KafkaUserMapper;
import com.xuxd.kafka.console.utils.ConvertUtil;
import com.xuxd.kafka.console.utils.SaslUtil;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import kafka.console.KafkaConfigConsole;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@@ -21,25 +34,58 @@ public class KafkaAclSchedule {
private final KafkaConfigConsole configConsole;
public KafkaAclSchedule(KafkaUserMapper userMapper, KafkaConfigConsole configConsole) {
this.userMapper = userMapper;
this.configConsole = configConsole;
private final ClusterInfoMapper clusterInfoMapper;
public KafkaAclSchedule(ObjectProvider<KafkaUserMapper> userMapper,
ObjectProvider<KafkaConfigConsole> configConsole, ObjectProvider<ClusterInfoMapper> clusterInfoMapper) {
this.userMapper = userMapper.getIfAvailable();
this.configConsole = configConsole.getIfAvailable();
this.clusterInfoMapper = clusterInfoMapper.getIfAvailable();
}
@Scheduled(cron = "${cron.clear-dirty-user}")
public void clearDirtyKafkaUser() {
log.info("Start clear dirty data for kafka user from database.");
Set<String> userSet = configConsole.getUserList(null);
userMapper.selectList(null).forEach(u -> {
if (!userSet.contains(u.getUsername())) {
log.info("clear user: {} from database.", u.getUsername());
try {
userMapper.deleteById(u.getId());
} catch (Exception e) {
log.error("userMapper.deleteById error, user: " + u, e);
try {
log.info("Start clear dirty data for kafka user from database.");
List<ClusterInfoDO> clusterInfoDOS = clusterInfoMapper.selectList(null);
List<Long> clusterInfoIds = new ArrayList<>();
for (ClusterInfoDO infoDO : clusterInfoDOS) {
ContextConfig config = new ContextConfig();
config.setClusterInfoId(infoDO.getId());
config.setClusterName(infoDO.getClusterName());
config.setBootstrapServer(infoDO.getAddress());
if (StringUtils.isNotBlank(infoDO.getProperties())) {
config.setProperties(ConvertUtil.toProperties(infoDO.getProperties()));
}
ContextConfigHolder.CONTEXT_CONFIG.set(config);
if (SaslUtil.isEnableSasl() && SaslUtil.isEnableScram()) {
log.info("Start clear cluster: {}", infoDO.getClusterName());
Set<String> userSet = configConsole.getUserList(null);
QueryWrapper<KafkaUserDO> queryWrapper = new QueryWrapper<>();
queryWrapper.eq("cluster_info_id", infoDO.getId());
userMapper.selectList(queryWrapper).forEach(u -> {
if (!userSet.contains(u.getUsername())) {
log.info("clear user: {} from database.", u.getUsername());
try {
userMapper.deleteById(u.getId());
} catch (Exception e) {
log.error("userMapper.deleteById error, user: " + u, e);
}
}
});
clusterInfoIds.add(infoDO.getId());
}
}
});
log.info("Clear end.");
if (CollectionUtils.isNotEmpty(clusterInfoIds)) {
log.info("Clear the cluster id {}, which not found.", clusterInfoIds);
QueryWrapper<KafkaUserDO> wrapper = new QueryWrapper<>();
wrapper.notIn("cluster_info_id", clusterInfoIds);
userMapper.delete(wrapper);
}
log.info("Clear end.");
} finally {
ContextConfigHolder.CONTEXT_CONFIG.remove();
}
}
}


@@ -0,0 +1,10 @@
package com.xuxd.kafka.console.service;
/**
* kafka-console-ui.
*
* @author xuxd
* @date 2021-12-31 11:42:43
**/
public interface ClusterInfoService {
}


@@ -1,6 +1,7 @@
package com.xuxd.kafka.console.service;
import com.xuxd.kafka.console.beans.ResponseData;
import com.xuxd.kafka.console.beans.dos.ClusterInfoDO;
/**
* kafka-console-ui.
@@ -10,4 +11,16 @@ import com.xuxd.kafka.console.beans.ResponseData;
**/
public interface ClusterService {
ResponseData getClusterInfo();
ResponseData getClusterInfoList();
ResponseData addClusterInfo(ClusterInfoDO infoDO);
ResponseData deleteClusterInfo(Long id);
ResponseData updateClusterInfo(ClusterInfoDO infoDO);
ResponseData peekClusterInfo();
ResponseData getBrokerApiVersionInfo();
}


@@ -30,4 +30,6 @@ public interface OperationService {
ResponseData currentReassignments();
ResponseData cancelReassignment(TopicPartition partition);
ResponseData proposedAssignments(String topic, List<Integer> brokerList);
}


@@ -6,29 +6,37 @@ import com.xuxd.kafka.console.beans.CounterMap;
import com.xuxd.kafka.console.beans.ResponseData;
import com.xuxd.kafka.console.beans.dos.KafkaUserDO;
import com.xuxd.kafka.console.beans.vo.KafkaUserDetailVO;
import com.xuxd.kafka.console.config.KafkaConfig;
import com.xuxd.kafka.console.config.ContextConfigHolder;
import com.xuxd.kafka.console.dao.KafkaUserMapper;
import com.xuxd.kafka.console.service.AclService;
import com.xuxd.kafka.console.utils.SaslUtil;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import kafka.console.KafkaAclConsole;
import kafka.console.KafkaConfigConsole;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.ScramMechanism;
import org.apache.kafka.clients.admin.UserScramCredentialsDescription;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import scala.Tuple2;
import static com.xuxd.kafka.console.utils.SaslUtil.isEnableSasl;
import static com.xuxd.kafka.console.utils.SaslUtil.isEnableScram;
/**
* kafka-console-ui.
*
@@ -37,7 +45,7 @@ import scala.Tuple2;
**/
@Slf4j
@Service
public class AclServiceImpl implements AclService, SmartInitializingSingleton {
public class AclServiceImpl implements AclService {
@Autowired
private KafkaConfigConsole configConsole;
@@ -45,9 +53,6 @@ public class AclServiceImpl implements AclService, SmartInitializingSingleton {
@Autowired
private KafkaAclConsole aclConsole;
@Autowired
private KafkaConfig kafkaConfig;
private final KafkaUserMapper kafkaUserMapper;
public AclServiceImpl(ObjectProvider<KafkaUserMapper> kafkaUserMapper) {
@@ -64,15 +69,23 @@ public class AclServiceImpl implements AclService, SmartInitializingSingleton {
}
@Override public ResponseData addOrUpdateUser(String name, String pass) {
if (!isEnableSasl()) {
return ResponseData.create().failed("Only support SASL protocol.");
}
if (!isEnableScram()) {
return ResponseData.create().failed("Only support SASL_SCRAM.");
}
log.info("add or update user, username: {}, password: {}", name, pass);
if (!configConsole.addOrUpdateUser(name, pass)) {
Tuple2<Object, String> tuple2 = configConsole.addOrUpdateUser(name, pass);
if (!(boolean) tuple2._1()) {
log.error("add user to kafka failed.");
return ResponseData.create().failed("add user to kafka failed");
return ResponseData.create().failed("add user to kafka failed: " + tuple2._2());
}
// save user info to database.
KafkaUserDO userDO = new KafkaUserDO();
userDO.setUsername(name);
userDO.setPassword(pass);
userDO.setClusterInfoId(ContextConfigHolder.CONTEXT_CONFIG.get().getClusterInfoId());
try {
Map<String, Object> map = new HashMap<>();
map.put("username", name);
@@ -86,12 +99,24 @@ public class AclServiceImpl implements AclService, SmartInitializingSingleton {
}
@Override public ResponseData deleteUser(String name) {
if (!isEnableSasl()) {
return ResponseData.create().failed("Only support SASL protocol.");
}
if (!isEnableScram()) {
return ResponseData.create().failed("Only support SASL_SCRAM.");
}
log.info("delete user: {}", name);
Tuple2<Object, String> tuple2 = configConsole.deleteUser(name);
return (boolean) tuple2._1() ? ResponseData.create().success() : ResponseData.create().failed(tuple2._2());
}
@Override public ResponseData deleteUserAndAuth(String name) {
if (!isEnableSasl()) {
return ResponseData.create().failed("Only support SASL protocol.");
}
if (!isEnableScram()) {
return ResponseData.create().failed("Only support SASL_SCRAM.");
}
log.info("delete user and authority: {}", name);
AclEntry entry = new AclEntry();
entry.setPrincipal(name);
@@ -120,7 +145,8 @@ public class AclServiceImpl implements AclService, SmartInitializingSingleton {
Map<String, Object> resultMap = new HashMap<>();
entryMap.forEach((k, v) -> {
Map<String, List<AclEntry>> map = v.stream().collect(Collectors.groupingBy(e -> e.getResourceType() + "#" + e.getName()));
if (k.equals(kafkaConfig.getAdminUsername())) {
String username = SaslUtil.findUsername(ContextConfigHolder.CONTEXT_CONFIG.get().getProperties().getProperty(SaslConfigs.SASL_JAAS_CONFIG));
if (k.equals(username)) {
Map<String, Object> map2 = new HashMap<>(map);
Map<String, Object> userMap = new HashMap<>();
userMap.put("role", "admin");
@@ -133,7 +159,8 @@ public class AclServiceImpl implements AclService, SmartInitializingSingleton {
detailList.values().forEach(u -> {
if (!resultMap.containsKey(u.name()) && !u.credentialInfos().isEmpty()) {
if (!u.name().equals(kafkaConfig.getAdminUsername())) {
String username = SaslUtil.findUsername(ContextConfigHolder.CONTEXT_CONFIG.get().getProperties().getProperty(SaslConfigs.SASL_JAAS_CONFIG));
if (!u.name().equals(username)) {
resultMap.put(u.name(), Collections.emptyMap());
} else {
Map<String, Object> map2 = new HashMap<>();
@@ -194,27 +221,29 @@ public class AclServiceImpl implements AclService, SmartInitializingSingleton {
}
Map<String, Object> param = new HashMap<>();
param.put("username", username);
param.put("cluster_info_id", ContextConfigHolder.CONTEXT_CONFIG.get().getClusterInfoId());
List<KafkaUserDO> dos = kafkaUserMapper.selectByMap(param);
if (dos.isEmpty()) {
vo.setConsistencyDescription("Password is null.");
} else {
vo.setPassword(dos.stream().findFirst().get().getPassword());
// check for consistency.
boolean consistent = configConsole.isPassConsistent(username, vo.getPassword());
vo.setConsistencyDescription(consistent ? "Consistent" : "Password is not consistent.");
// boolean consistent = configConsole.isPassConsistent(username, vo.getPassword());
// vo.setConsistencyDescription(consistent ? "Consistent" : "Password is not consistent.");
vo.setConsistencyDescription("Can not check password consistent.");
}
return ResponseData.create().data(vo).success();
}
@Override public void afterSingletonsInstantiated() {
if (kafkaConfig.isEnableAcl() && kafkaConfig.isAdminCreate()) {
log.info("Start create admin user, username: {}, password: {}", kafkaConfig.getAdminUsername(), kafkaConfig.getAdminPassword());
boolean done = configConsole.addOrUpdateUserWithZK(kafkaConfig.getAdminUsername(), kafkaConfig.getAdminPassword());
if (!done) {
log.error("Create admin failed.");
throw new IllegalStateException();
}
}
}
// @Override public void afterSingletonsInstantiated() {
// if (kafkaConfig.isEnableAcl() && kafkaConfig.isAdminCreate()) {
// log.info("Start create admin user, username: {}, password: {}", kafkaConfig.getAdminUsername(), kafkaConfig.getAdminPassword());
// boolean done = configConsole.addOrUpdateUserWithZK(kafkaConfig.getAdminUsername(), kafkaConfig.getAdminPassword());
// if (!done) {
// log.error("Create admin failed.");
// throw new IllegalStateException();
// }
// }
// }
}


@@ -0,0 +1,14 @@
package com.xuxd.kafka.console.service.impl;
import com.xuxd.kafka.console.service.ClusterInfoService;
import org.springframework.stereotype.Service;
/**
* kafka-console-ui.
*
* @author xuxd
* @date 2021-12-31 11:42:59
**/
@Service
public class ClusterInfoServiceImpl implements ClusterInfoService {
}


@@ -1,9 +1,27 @@
package com.xuxd.kafka.console.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.xuxd.kafka.console.beans.ClusterInfo;
import com.xuxd.kafka.console.beans.ResponseData;
import com.xuxd.kafka.console.beans.dos.ClusterInfoDO;
import com.xuxd.kafka.console.beans.vo.BrokerApiVersionVO;
import com.xuxd.kafka.console.beans.vo.ClusterInfoVO;
import com.xuxd.kafka.console.dao.ClusterInfoMapper;
import com.xuxd.kafka.console.service.ClusterService;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.TreeSet;
import java.util.stream.Collectors;
import kafka.console.ClusterConsole;
import org.springframework.beans.factory.annotation.Autowired;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.common.Node;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.stereotype.Service;
/**
@@ -15,10 +33,78 @@ import org.springframework.stereotype.Service;
@Service
public class ClusterServiceImpl implements ClusterService {
@Autowired
private ClusterConsole clusterConsole;
private final ClusterConsole clusterConsole;
private final ClusterInfoMapper clusterInfoMapper;
public ClusterServiceImpl(ObjectProvider<ClusterConsole> clusterConsole,
ObjectProvider<ClusterInfoMapper> clusterInfoMapper) {
this.clusterConsole = clusterConsole.getIfAvailable();
this.clusterInfoMapper = clusterInfoMapper.getIfAvailable();
}
@Override public ResponseData getClusterInfo() {
return ResponseData.create().data(clusterConsole.clusterInfo()).success();
ClusterInfo clusterInfo = clusterConsole.clusterInfo();
clusterInfo.setNodes(new TreeSet<>(clusterInfo.getNodes()));
return ResponseData.create().data(clusterInfo).success();
}
@Override public ResponseData getClusterInfoList() {
return ResponseData.create().data(clusterInfoMapper.selectList(null)
.stream().map(ClusterInfoVO::from).collect(Collectors.toList())).success();
}
@Override public ResponseData addClusterInfo(ClusterInfoDO infoDO) {
QueryWrapper<ClusterInfoDO> queryWrapper = new QueryWrapper<>();
queryWrapper.eq("cluster_name", infoDO.getClusterName());
if (clusterInfoMapper.selectCount(queryWrapper) > 0) {
return ResponseData.create().failed("cluster name exist.");
}
clusterInfoMapper.insert(infoDO);
return ResponseData.create().success();
}
@Override public ResponseData deleteClusterInfo(Long id) {
clusterInfoMapper.deleteById(id);
return ResponseData.create().success();
}
@Override public ResponseData updateClusterInfo(ClusterInfoDO infoDO) {
clusterInfoMapper.updateById(infoDO);
return ResponseData.create().success();
}
@Override public ResponseData peekClusterInfo() {
List<ClusterInfoDO> dos = clusterInfoMapper.selectList(null);
if (CollectionUtils.isEmpty(dos)) {
return ResponseData.create().failed("No Cluster Info.");
}
return ResponseData.create().data(dos.stream().findFirst().map(ClusterInfoVO::from)).success();
}
@Override public ResponseData getBrokerApiVersionInfo() {
HashMap<Node, NodeApiVersions> map = clusterConsole.listBrokerVersionInfo();
List<BrokerApiVersionVO> list = new ArrayList<>(map.size());
map.forEach(((node, versions) -> {
BrokerApiVersionVO vo = new BrokerApiVersionVO();
vo.setBrokerId(node.id());
vo.setHost(node.host() + ":" + node.port());
vo.setSupportNums(versions.allSupportedApiVersions().size());
String versionInfo = versions.toString(true);
int from = 0;
int count = 0;
int index = -1;
while ((index = versionInfo.indexOf("UNSUPPORTED", from)) >= 0 && from < versionInfo.length()) {
count++;
from = index + 1;
}
vo.setUnSupportNums(count);
versionInfo = versionInfo.substring(1, versionInfo.length() - 2);
vo.setVersionInfo(Arrays.asList(StringUtils.split(versionInfo, ",")));
list.add(vo);
}));
Collections.sort(list, Comparator.comparingInt(BrokerApiVersionVO::getBrokerId));
return ResponseData.create().data(list).success();
}
}


@@ -1,8 +1,10 @@
package com.xuxd.kafka.console.service.impl;
import com.xuxd.kafka.console.beans.MessageFilter;
import com.xuxd.kafka.console.beans.QueryMessage;
import com.xuxd.kafka.console.beans.ResponseData;
import com.xuxd.kafka.console.beans.SendMessage;
import com.xuxd.kafka.console.beans.enums.FilterType;
import com.xuxd.kafka.console.beans.vo.ConsumerRecordVO;
import com.xuxd.kafka.console.beans.vo.MessageDetailVO;
import com.xuxd.kafka.console.service.ConsumerService;
@@ -32,6 +34,7 @@ import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.DoubleDeserializer;
import org.apache.kafka.common.serialization.FloatDeserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
@@ -70,23 +73,83 @@ public class MessageServiceImpl implements MessageService, ApplicationContextAwa
deserializerDict.put("Float", new FloatDeserializer());
deserializerDict.put("Double", new DoubleDeserializer());
deserializerDict.put("Byte", new BytesDeserializer());
deserializerDict.put("Long", new LongDeserializer());
}
public static String defaultDeserializer = "String";
@Override public ResponseData searchByTime(QueryMessage queryMessage) {
int maxNums = 10000;
int maxNums = 5000;
Object searchContent = null;
String headerKey = null;
String headerValue = null;
MessageFilter filter = new MessageFilter();
switch (queryMessage.getFilter()) {
case BODY:
if (StringUtils.isBlank(queryMessage.getValue())) {
queryMessage.setFilter(FilterType.NONE);
} else {
if (StringUtils.isBlank(queryMessage.getValueDeserializer())) {
queryMessage.setValueDeserializer(defaultDeserializer);
}
switch (queryMessage.getValueDeserializer()) {
case "String":
searchContent = String.valueOf(queryMessage.getValue());
filter.setContainsValue(true);
break;
case "Integer":
searchContent = Integer.valueOf(queryMessage.getValue());
break;
case "Float":
searchContent = Float.valueOf(queryMessage.getValue());
break;
case "Double":
searchContent = Double.valueOf(queryMessage.getValue());
break;
case "Long":
searchContent = Long.valueOf(queryMessage.getValue());
break;
default:
throw new IllegalArgumentException("Message body type not support.");
}
}
break;
case HEADER:
headerKey = queryMessage.getHeaderKey();
if (StringUtils.isBlank(headerKey)) {
queryMessage.setFilter(FilterType.NONE);
} else {
if (StringUtils.isNotBlank(queryMessage.getHeaderValue())) {
headerValue = String.valueOf(queryMessage.getHeaderValue());
}
}
break;
default:
break;
}
FilterType filterType = queryMessage.getFilter();
Deserializer deserializer = deserializerDict.get(queryMessage.getValueDeserializer());
filter.setFilterType(filterType);
filter.setSearchContent(searchContent);
filter.setDeserializer(deserializer);
filter.setHeaderKey(headerKey);
filter.setHeaderValue(headerValue);
Set<TopicPartition> partitions = getPartitions(queryMessage);
long startTime = System.currentTimeMillis();
List<ConsumerRecord<byte[], byte[]>> records = messageConsole.searchBy(partitions, queryMessage.getStartTime(), queryMessage.getEndTime(), maxNums);
Tuple2<List<ConsumerRecord<byte[], byte[]>>, Object> tuple2 = messageConsole.searchBy(partitions, queryMessage.getStartTime(), queryMessage.getEndTime(), maxNums, filter);
List<ConsumerRecord<byte[], byte[]>> records = tuple2._1();
log.info("search message by time, cost time: {}", (System.currentTimeMillis() - startTime));
List<ConsumerRecordVO> vos = records.stream().filter(record -> record.timestamp() <= queryMessage.getEndTime())
.map(ConsumerRecordVO::fromConsumerRecord).collect(Collectors.toList());
Map<String, Object> res = new HashMap<>();
vos = vos.subList(0, Math.min(maxNums, vos.size()));
res.put("maxNum", maxNums);
res.put("realNum", vos.size());
res.put("data", vos.subList(0, Math.min(maxNums, vos.size())));
res.put("searchNum", tuple2._2());
res.put("data", vos);
return ResponseData.create().data(res).success();
}


@@ -1,6 +1,7 @@
package com.xuxd.kafka.console.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.google.common.collect.Lists;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.xuxd.kafka.console.beans.ResponseData;
@@ -10,6 +11,7 @@ import com.xuxd.kafka.console.beans.vo.OffsetAlignmentVO;
import com.xuxd.kafka.console.dao.MinOffsetAlignmentMapper;
import com.xuxd.kafka.console.service.OperationService;
import com.xuxd.kafka.console.utils.GsonUtil;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -19,6 +21,7 @@ import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import kafka.console.OperationConsole;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.admin.PartitionReassignment;
import org.apache.kafka.common.TopicPartition;
import org.springframework.beans.factory.ObjectProvider;
@@ -162,4 +165,21 @@ public class OperationServiceImpl implements OperationService {
}
return ResponseData.create().success();
}
@Override public ResponseData proposedAssignments(String topic, List<Integer> brokerList) {
Map<String, Object> params = new HashMap<>();
params.put("version", 1);
Map<String, String> topicMap = new HashMap<>(1, 1.0f);
topicMap.put("topic", topic);
params.put("topics", Lists.newArrayList(topicMap));
List<String> list = brokerList.stream().map(String::valueOf).collect(Collectors.toList());
Map<TopicPartition, List<Object>> assignments = operationConsole.proposedAssignments(gson.toJson(params), StringUtils.join(list, ","));
List<CurrentReassignmentVO> res = new ArrayList<>(assignments.size());
assignments.forEach((tp, replicas) -> {
CurrentReassignmentVO vo = new CurrentReassignmentVO(tp.topic(), tp.partition(),
replicas.stream().map(x -> (Integer) x).collect(Collectors.toList()), null, null);
res.add(vo);
});
return ResponseData.create().data(res).success();
}
}


@@ -1,9 +1,15 @@
package com.xuxd.kafka.console.utils;
import com.google.common.base.Preconditions;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.ClassUtils;
@@ -35,6 +41,52 @@ public class ConvertUtil {
}
});
}
Iterator<Map.Entry<String, Object>> iterator = res.entrySet().iterator();
while (iterator.hasNext()) {
if (iterator.next().getValue() == null) {
iterator.remove();
}
}
return res;
}
public static String toJsonString(Object src) {
return GsonUtil.INSTANCE.get().toJson(src);
}
public static Properties toProperties(String jsonStr) {
return GsonUtil.INSTANCE.get().fromJson(jsonStr, Properties.class);
}
public static String jsonStr2PropertiesStr(String jsonStr) {
StringBuilder sb = new StringBuilder();
Map<String, Object> map = GsonUtil.INSTANCE.get().fromJson(jsonStr, Map.class);
map.keySet().forEach(k -> {
sb.append(k).append("=").append(map.get(k).toString()).append(System.lineSeparator());
});
return sb.toString();
}
public static List<String> jsonStr2List(String jsonStr) {
List<String> res = new LinkedList<>();
Map<String, Object> map = GsonUtil.INSTANCE.get().fromJson(jsonStr, Map.class);
map.forEach((k, v) -> {
res.add(k + "=" + v);
});
return res;
}
public static String propertiesStr2JsonStr(String propertiesStr) {
String res = "{}";
try (ByteArrayInputStream baos = new ByteArrayInputStream(propertiesStr.getBytes())) {
Properties properties = new Properties();
properties.load(baos);
res = toJsonString(properties);
} catch (IOException e) {
log.error("propertiesStr2JsonStr error.", e);
}
return res;
}
}


@@ -0,0 +1,61 @@
package com.xuxd.kafka.console.utils;
import com.xuxd.kafka.console.config.ContextConfigHolder;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.ScramMechanism;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.security.auth.SecurityProtocol;
/**
* kafka-console-ui.
*
* @author xuxd
* @date 2022-01-06 11:07:41
**/
public class SaslUtil {
public static final Pattern JAAS_PATTERN = Pattern.compile("^.*(username=\"(.*)\"[ \t]+).*$");
private SaslUtil() {
}
public static String findUsername(String saslJaasConfig) {
Matcher matcher = JAAS_PATTERN.matcher(saslJaasConfig);
return matcher.find() ? matcher.group(2) : "";
}
public static boolean isEnableSasl() {
Properties properties = ContextConfigHolder.CONTEXT_CONFIG.get().getProperties();
if (!properties.containsKey(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)) {
return false;
}
String s = properties.getProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG);
SecurityProtocol protocol = SecurityProtocol.valueOf(s);
switch (protocol) {
case SASL_SSL:
case SASL_PLAINTEXT:
return true;
default:
return false;
}
}
public static boolean isEnableScram() {
Properties properties = ContextConfigHolder.CONTEXT_CONFIG.get().getProperties();
if (!properties.containsKey(SaslConfigs.SASL_MECHANISM)) {
return false;
}
String s = properties.getProperty(SaslConfigs.SASL_MECHANISM);
ScramMechanism mechanism = ScramMechanism.fromMechanismName(s);
switch (mechanism) {
case UNKNOWN:
return false;
default:
return true;
}
}
}


@@ -6,23 +6,12 @@ server:
kafka:
config:
# kafka broker addresses, comma separated
bootstrap-server: 'localhost:9092'
request-timeout-ms: 5000
# whether the brokers enable ACL; if not, ignore all the settings below and only the cluster address above is needed
enable-acl: false
# only two security protocols are supported: SASL_PLAINTEXT and PLAINTEXT; set SASL_PLAINTEXT when ACL is enabled, otherwise ignore this
security-protocol: SASL_PLAINTEXT
sasl-mechanism: SCRAM-SHA-256
# super admin username (already configured as a super user on the brokers)
admin-username: admin
# super admin password
admin-password: admin
# automatically create the configured super admin user at startup
admin-create: false
# ZooKeeper address the brokers connect to; required if the super admin user is created automatically at startup, otherwise ignored
zookeeper-addr: 'localhost:2181'
sasl-jaas-config: org.apache.kafka.common.security.scram.ScramLoginModule required username="${kafka.config.admin-username}" password="${kafka.config.admin-password}";
# if no 'default' cluster exists yet, this one is loaded at startup (when an address is configured here); if it already exists, it is not loaded
# kafka broker addresses, comma separated; not required here, clusters can also be added on the page after startup
bootstrap-server:
# other cluster properties
properties:
# request.timeout.ms: 5000
spring:
application:
@@ -46,6 +35,7 @@ spring:
logging:
home: ./
# for SCRAM-based ACL, created users and passwords are recorded here; a scheduled scan removes entries from the DB for users that no longer exist in the cluster
cron:
# clear-dirty-user: 0 * * * * ?
clear-dirty-user: 0 0 1 * * ?


@@ -1,23 +1,37 @@
-- DROP TABLE IF EXISTS T_KAKFA_USER;
-- users created for Kafka ACL when SASL_SCRAM is enabled
CREATE TABLE IF NOT EXISTS T_KAFKA_USER
(
ID IDENTITY NOT NULL COMMENT '主键ID',
USERNAME VARCHAR(50) NOT NULL DEFAULT '' COMMENT '',
PASSWORD VARCHAR(50) NOT NULL DEFAULT '' COMMENT '年龄',
UPDATE_TIME TIMESTAMP NOT NULL DEFAULT NOW() COMMENT '更新时间',
ID IDENTITY NOT NULL COMMENT '主键ID',
USERNAME VARCHAR(50) NOT NULL DEFAULT '' COMMENT '用户',
PASSWORD VARCHAR(50) NOT NULL DEFAULT '' COMMENT '密码',
UPDATE_TIME TIMESTAMP NOT NULL DEFAULT NOW() COMMENT '更新时间',
CLUSTER_INFO_ID BIGINT NOT NULL COMMENT '集群信息里的集群ID',
PRIMARY KEY (ID),
UNIQUE (USERNAME)
);
-- 消息同步解决方案中使用的位点对齐信息
CREATE TABLE IF NOT EXISTS T_MIN_OFFSET_ALIGNMENT
(
ID IDENTITY NOT NULL COMMENT '主键ID',
GROUP_ID VARCHAR(128) NOT NULL DEFAULT '' COMMENT 'groupId',
TOPIC VARCHAR(128) NOT NULL DEFAULT '' COMMENT 'topic',
THAT_OFFSET VARCHAR(512) NOT NULL DEFAULT '' COMMENT 'min offset for that kafka cluster',
THIS_OFFSET VARCHAR(512) NOT NULL DEFAULT '' COMMENT 'min offset for this kafka cluster',
UPDATE_TIME TIMESTAMP NOT NULL DEFAULT NOW() COMMENT '更新时间',
ID IDENTITY NOT NULL COMMENT '主键ID',
GROUP_ID VARCHAR(128) NOT NULL DEFAULT '' COMMENT 'groupId',
TOPIC VARCHAR(128) NOT NULL DEFAULT '' COMMENT 'topic',
THAT_OFFSET VARCHAR(512) NOT NULL DEFAULT '' COMMENT 'min offset for that kafka cluster',
THIS_OFFSET VARCHAR(512) NOT NULL DEFAULT '' COMMENT 'min offset for this kafka cluster',
UPDATE_TIME TIMESTAMP NOT NULL DEFAULT NOW() COMMENT '更新时间',
PRIMARY KEY (ID),
UNIQUE (GROUP_ID, TOPIC)
);
-- 多集群管理,每个集群的配置信息
CREATE TABLE IF NOT EXISTS T_CLUSTER_INFO
(
ID IDENTITY NOT NULL COMMENT '主键ID',
CLUSTER_NAME VARCHAR(128) NOT NULL DEFAULT '' COMMENT '集群名',
ADDRESS VARCHAR(256) NOT NULL DEFAULT '' COMMENT '集群地址',
PROPERTIES VARCHAR(512) NOT NULL DEFAULT '' COMMENT '集群的其它属性配置',
UPDATE_TIME TIMESTAMP NOT NULL DEFAULT NOW() COMMENT '更新时间',
PRIMARY KEY (ID),
UNIQUE (CLUSTER_NAME)
);

View File

@@ -0,0 +1,330 @@
package kafka.console
import com.xuxd.kafka.console.config.ContextConfigHolder
import kafka.utils.Implicits.MapExtensionMethods
import kafka.utils.Logging
import org.apache.kafka.clients._
import org.apache.kafka.clients.admin.AdminClientConfig
import org.apache.kafka.clients.consumer.internals.{ConsumerNetworkClient, RequestFuture}
import org.apache.kafka.common.Node
import org.apache.kafka.common.config.ConfigDef.ValidString.in
import org.apache.kafka.common.config.ConfigDef.{Importance, Type}
import org.apache.kafka.common.config.{AbstractConfig, ConfigDef}
import org.apache.kafka.common.errors.AuthenticationException
import org.apache.kafka.common.internals.ClusterResourceListeners
import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionCollection
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.Selector
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests._
import org.apache.kafka.common.utils.{KafkaThread, LogContext, Time}
import java.io.IOException
import java.util.Properties
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.{ConcurrentLinkedQueue, TimeUnit}
import scala.jdk.CollectionConverters.{ListHasAsScala, MapHasAsJava, PropertiesHasAsScala, SetHasAsScala}
import scala.util.{Failure, Success, Try}
/**
* kafka-console-ui.
*
* Copy from {@link kafka.admin.BrokerApiVersionsCommand}.
*
* @author xuxd
* @date 2022-01-22 15:15:57
* */
object BrokerApiVersion extends Logging {
def listAllBrokerApiVersionInfo(): java.util.HashMap[Node, NodeApiVersions] = {
val res = new java.util.HashMap[Node, NodeApiVersions]()
val adminClient = createAdminClient()
try {
adminClient.awaitBrokers()
val brokerMap = adminClient.listAllBrokerVersionInfo()
brokerMap.forKeyValue {
(broker, versionInfoOrError) =>
versionInfoOrError match {
case Success(v) => {
res.put(broker, v)
}
case Failure(v) => logger.error(s"${broker} -> ERROR: ${v}\n")
}
}
} finally {
adminClient.close()
}
res
}
private def createAdminClient(): AdminClient = {
val props = new Properties()
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, ContextConfigHolder.CONTEXT_CONFIG.get().getBootstrapServer())
props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, ContextConfigHolder.CONTEXT_CONFIG.get().getRequestTimeoutMs())
props.putAll(ContextConfigHolder.CONTEXT_CONFIG.get().getProperties())
AdminClient.create(props)
}
// org.apache.kafka.clients.admin.AdminClient doesn't currently expose a way to retrieve the supported api versions.
// We inline the bits we need from kafka.admin.AdminClient so that we can delete it.
private class AdminClient(val time: Time,
val client: ConsumerNetworkClient,
val bootstrapBrokers: List[Node]) extends Logging {
@volatile var running = true
val pendingFutures = new ConcurrentLinkedQueue[RequestFuture[ClientResponse]]()
val networkThread = new KafkaThread("admin-client-network-thread", () => {
try {
while (running)
client.poll(time.timer(Long.MaxValue))
} catch {
case t: Throwable =>
error("admin-client-network-thread exited", t)
} finally {
pendingFutures.forEach { future =>
try {
future.raise(Errors.UNKNOWN_SERVER_ERROR)
} catch {
case _: IllegalStateException => // It is OK if the future has been completed
}
}
pendingFutures.clear()
}
}, true)
networkThread.start()
private def send(target: Node,
request: AbstractRequest.Builder[_ <: AbstractRequest]): AbstractResponse = {
val future = client.send(target, request)
pendingFutures.add(future)
future.awaitDone(Long.MaxValue, TimeUnit.MILLISECONDS)
pendingFutures.remove(future)
if (future.succeeded())
future.value().responseBody()
else
throw future.exception()
}
private def sendAnyNode(request: AbstractRequest.Builder[_ <: AbstractRequest]): AbstractResponse = {
bootstrapBrokers.foreach { broker =>
try {
return send(broker, request)
} catch {
case e: AuthenticationException =>
throw e
case e: Exception =>
debug(s"Request ${request.apiKey()} failed against node $broker", e)
}
}
throw new RuntimeException(s"Request ${request.apiKey()} failed on brokers $bootstrapBrokers")
}
private def getApiVersions(node: Node): ApiVersionCollection = {
val response = send(node, new ApiVersionsRequest.Builder()).asInstanceOf[ApiVersionsResponse]
Errors.forCode(response.data.errorCode).maybeThrow()
response.data.apiKeys
}
/**
* Wait until there is a non-empty list of brokers in the cluster.
*/
def awaitBrokers(): Unit = {
var nodes = List[Node]()
val start = System.currentTimeMillis()
val maxWait = 30 * 1000
do {
nodes = findAllBrokers()
if (nodes.isEmpty) {
Thread.sleep(50)
}
}
while (nodes.isEmpty && (System.currentTimeMillis() - start < maxWait))
}
private def findAllBrokers(): List[Node] = {
val request = MetadataRequest.Builder.allTopics()
val response = sendAnyNode(request).asInstanceOf[MetadataResponse]
val errors = response.errors
if (!errors.isEmpty) {
logger.info(s"Metadata request contained errors: $errors")
}
// 在3.x版本中这个方法是buildCluster 代替cluster()了
// response.buildCluster.nodes.asScala.toList
response.cluster().nodes.asScala.toList
}
def listAllBrokerVersionInfo(): Map[Node, Try[NodeApiVersions]] =
findAllBrokers().map { broker =>
broker -> Try[NodeApiVersions](new NodeApiVersions(getApiVersions(broker)))
}.toMap
def close(): Unit = {
running = false
try {
client.close()
} catch {
case e: IOException =>
error("Exception closing nioSelector:", e)
}
}
}
private object AdminClient {
val DefaultConnectionMaxIdleMs = 9 * 60 * 1000
val DefaultRequestTimeoutMs = 5000
val DefaultSocketConnectionSetupMs = CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG
val DefaultSocketConnectionSetupMaxMs = CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG
val DefaultMaxInFlightRequestsPerConnection = 100
val DefaultReconnectBackoffMs = 50
val DefaultReconnectBackoffMax = 50
val DefaultSendBufferBytes = 128 * 1024
val DefaultReceiveBufferBytes = 32 * 1024
val DefaultRetryBackoffMs = 100
val AdminClientIdSequence = new AtomicInteger(1)
val AdminConfigDef = {
val config = new ConfigDef()
.define(
CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
Type.LIST,
Importance.HIGH,
CommonClientConfigs.BOOTSTRAP_SERVERS_DOC)
.define(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG,
Type.STRING,
ClientDnsLookup.USE_ALL_DNS_IPS.toString,
in(ClientDnsLookup.USE_ALL_DNS_IPS.toString,
ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY.toString),
Importance.MEDIUM,
CommonClientConfigs.CLIENT_DNS_LOOKUP_DOC)
.define(
CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
ConfigDef.Type.STRING,
CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL,
ConfigDef.Importance.MEDIUM,
CommonClientConfigs.SECURITY_PROTOCOL_DOC)
.define(
CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG,
ConfigDef.Type.INT,
DefaultRequestTimeoutMs,
ConfigDef.Importance.MEDIUM,
CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC)
.define(
CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG,
ConfigDef.Type.LONG,
CommonClientConfigs.DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MS,
ConfigDef.Importance.MEDIUM,
CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_DOC)
.define(
CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG,
ConfigDef.Type.LONG,
CommonClientConfigs.DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS,
ConfigDef.Importance.MEDIUM,
CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_DOC)
.define(
CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG,
ConfigDef.Type.LONG,
DefaultRetryBackoffMs,
ConfigDef.Importance.MEDIUM,
CommonClientConfigs.RETRY_BACKOFF_MS_DOC)
.withClientSslSupport()
.withClientSaslSupport()
config
}
class AdminConfig(originals: Map[_, _]) extends AbstractConfig(AdminConfigDef, originals.asJava, false)
def create(props: Properties): AdminClient = {
val properties = new Properties()
val names = props.stringPropertyNames()
for (name <- names.asScala.toSet) {
properties.put(name, props.get(name).toString())
}
create(properties.asScala.toMap)
}
def create(props: Map[String, _]): AdminClient = create(new AdminConfig(props))
def create(config: AdminConfig): AdminClient = {
val clientId = "admin-" + AdminClientIdSequence.getAndIncrement()
val logContext = new LogContext(s"[LegacyAdminClient clientId=$clientId] ")
val time = Time.SYSTEM
val metrics = new Metrics(time)
val metadata = new Metadata(100L, 60 * 60 * 1000L, logContext,
new ClusterResourceListeners)
val channelBuilder = ClientUtils.createChannelBuilder(config, time, logContext)
val requestTimeoutMs = config.getInt(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG)
val connectionSetupTimeoutMs = config.getLong(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG)
val connectionSetupTimeoutMaxMs = config.getLong(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG)
val retryBackoffMs = config.getLong(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG)
val brokerUrls = config.getList(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)
val clientDnsLookup = config.getString(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG)
val brokerAddresses = ClientUtils.parseAndValidateAddresses(brokerUrls, clientDnsLookup)
metadata.bootstrap(brokerAddresses)
val selector = new Selector(
DefaultConnectionMaxIdleMs,
metrics,
time,
"admin",
channelBuilder,
logContext)
// 版本不一样,这个地方的兼容性问题也不一样了
// 3.x版本用这个
// val networkClient = new NetworkClient(
// selector,
// metadata,
// clientId,
// DefaultMaxInFlightRequestsPerConnection,
// DefaultReconnectBackoffMs,
// DefaultReconnectBackoffMax,
// DefaultSendBufferBytes,
// DefaultReceiveBufferBytes,
// requestTimeoutMs,
// connectionSetupTimeoutMs,
// connectionSetupTimeoutMaxMs,
// time,
// true,
// new ApiVersions,
// logContext)
val networkClient = new NetworkClient(
selector,
metadata,
clientId,
DefaultMaxInFlightRequestsPerConnection,
DefaultReconnectBackoffMs,
DefaultReconnectBackoffMax,
DefaultSendBufferBytes,
DefaultReceiveBufferBytes,
requestTimeoutMs,
connectionSetupTimeoutMs,
connectionSetupTimeoutMaxMs,
ClientDnsLookup.USE_ALL_DNS_IPS,
time,
true,
new ApiVersions,
logContext)
val highLevelClient = new ConsumerNetworkClient(
logContext,
networkClient,
metadata,
time,
retryBackoffMs,
requestTimeoutMs,
Integer.MAX_VALUE)
new AdminClient(
time,
highLevelClient,
metadata.fetch.nodes.asScala.toList)
}
}
}
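
For context, a hedged sketch of how the helper above can be driven from Java, assuming `ContextConfigHolder.CONTEXT_CONFIG` has already been populated for the target cluster (as the web layer does per request); the example simply walks the returned node/version map:

```java
import java.util.Map;

import kafka.console.BrokerApiVersion;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.common.Node;

public class BrokerApiVersionExample {
    public static void main(String[] args) {
        // Queries every broker in the current cluster for its supported API version ranges.
        Map<Node, NodeApiVersions> versions = BrokerApiVersion.listAllBrokerApiVersionInfo();
        versions.forEach((node, apiVersions) ->
                System.out.println("broker " + node.idString() + " @ " + node.host() + ":" + node.port()));
    }
}
```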

View File

@@ -1,12 +1,13 @@
package kafka.console
import com.xuxd.kafka.console.beans.{BrokerNode, ClusterInfo}
import com.xuxd.kafka.console.config.{ContextConfigHolder, KafkaConfig}
import org.apache.kafka.clients.NodeApiVersions
import org.apache.kafka.clients.admin.DescribeClusterResult
import org.apache.kafka.common.Node
import java.util.Collections
import java.util.concurrent.TimeUnit
import com.xuxd.kafka.console.beans.{BrokerNode, ClusterInfo}
import com.xuxd.kafka.console.config.KafkaConfig
import org.apache.kafka.clients.admin.DescribeClusterResult
import scala.jdk.CollectionConverters.{CollectionHasAsScala, SetHasAsJava, SetHasAsScala}
/**
@@ -19,6 +20,7 @@ class ClusterConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConf
def clusterInfo(): ClusterInfo = {
withAdminClientAndCatchError(admin => {
val timeoutMs = ContextConfigHolder.CONTEXT_CONFIG.get().getRequestTimeoutMs()
val clusterResult: DescribeClusterResult = admin.describeCluster()
val clusterInfo = new ClusterInfo
clusterInfo.setClusterId(clusterResult.clusterId().get(timeoutMs, TimeUnit.MILLISECONDS))
@@ -41,4 +43,8 @@ class ClusterConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConf
new ClusterInfo
}).asInstanceOf[ClusterInfo]
}
def listBrokerVersionInfo(): java.util.HashMap[Node, NodeApiVersions] = {
BrokerApiVersion.listAllBrokerApiVersionInfo()
}
}

View File

@@ -3,8 +3,7 @@ package kafka.console
import java.util
import java.util.Collections
import java.util.concurrent.TimeUnit
import com.xuxd.kafka.console.config.KafkaConfig
import com.xuxd.kafka.console.config.{ContextConfigHolder, KafkaConfig}
import kafka.console.ConfigConsole.BrokerLoggerConfigType
import kafka.server.ConfigType
import org.apache.kafka.clients.admin.{AlterConfigOp, Config, ConfigEntry, DescribeConfigsOptions}
@@ -69,6 +68,7 @@ class ConfigConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConfi
val configResource = new ConfigResource(getResourceTypeAndValidate(entityType, entityName), entityName)
val config = Map(configResource -> Collections.singletonList(new AlterConfigOp(entry, opType)).asInstanceOf[util.Collection[AlterConfigOp]])
val timeoutMs = ContextConfigHolder.CONTEXT_CONFIG.get().getRequestTimeoutMs()
admin.incrementalAlterConfigs(config.asJava).all().get(timeoutMs, TimeUnit.MILLISECONDS)
(true, "")
}, e => {

View File

@@ -1,6 +1,6 @@
package kafka.console
import com.xuxd.kafka.console.config.KafkaConfig
import com.xuxd.kafka.console.config.{ContextConfigHolder, KafkaConfig}
import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo
import org.apache.kafka.clients.admin._
import org.apache.kafka.clients.consumer.{ConsumerConfig, OffsetAndMetadata, OffsetResetStrategy}
@@ -75,6 +75,7 @@ class ConsumerConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaCon
val endOffsets = commitOffsets.keySet.map { topicPartition =>
topicPartition -> OffsetSpec.latest
}.toMap
val timeoutMs = ContextConfigHolder.CONTEXT_CONFIG.get().getRequestTimeoutMs()
admin.listOffsets(endOffsets.asJava).all().get(timeoutMs, TimeUnit.MILLISECONDS)
}, e => {
log.error("listOffsets error.", e)
@@ -166,6 +167,7 @@ class ConsumerConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaCon
def resetPartitionToTargetOffset(groupId: String, partition: TopicPartition, offset: Long): (Boolean, String) = {
withAdminClientAndCatchError(admin => {
val timeoutMs = ContextConfigHolder.CONTEXT_CONFIG.get().getRequestTimeoutMs()
admin.alterConsumerGroupOffsets(groupId, Map(partition -> new OffsetAndMetadata(offset)).asJava).all().get(timeoutMs, TimeUnit.MILLISECONDS)
(true, "")
}, e => {
@@ -178,7 +180,7 @@ class ConsumerConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaCon
timestamp: java.lang.Long): (Boolean, String) = {
withAdminClientAndCatchError(admin => {
val logOffsets = getLogTimestampOffsets(admin, groupId, topicPartitions.asScala, timestamp)
val timeoutMs = ContextConfigHolder.CONTEXT_CONFIG.get().getRequestTimeoutMs()
admin.alterConsumerGroupOffsets(groupId, logOffsets.asJava).all().get(timeoutMs, TimeUnit.MILLISECONDS)
(true, "")
}, e => {
@@ -256,6 +258,7 @@ class ConsumerConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaCon
val timestampOffsets = topicPartitions.map { topicPartition =>
topicPartition -> OffsetSpec.forTimestamp(timestamp)
}.toMap
val timeoutMs = ContextConfigHolder.CONTEXT_CONFIG.get().getRequestTimeoutMs()
val offsets = admin.listOffsets(
timestampOffsets.asJava,
new ListOffsetsOptions().timeoutMs(timeoutMs)
@@ -280,6 +283,7 @@ class ConsumerConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaCon
val endOffsets = topicPartitions.map { topicPartition =>
topicPartition -> OffsetSpec.latest
}.toMap
val timeoutMs = ContextConfigHolder.CONTEXT_CONFIG.get().getRequestTimeoutMs()
val offsets = admin.listOffsets(
endOffsets.asJava,
new ListOffsetsOptions().timeoutMs(timeoutMs)

View File

@@ -3,9 +3,8 @@ package kafka.console
import java.util
import java.util.concurrent.TimeUnit
import java.util.{Collections, List}
import com.xuxd.kafka.console.beans.AclEntry
import com.xuxd.kafka.console.config.KafkaConfig
import com.xuxd.kafka.console.config.{ContextConfigHolder, KafkaConfig}
import org.apache.commons.lang3.StringUtils
import org.apache.kafka.common.acl._
import org.apache.kafka.common.resource.{ResourcePattern, ResourcePatternFilter, ResourceType}
@@ -58,6 +57,7 @@ class KafkaAclConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaCon
def addAcl(acls: List[AclBinding]): Boolean = {
withAdminClient(adminClient => {
try {
val timeoutMs = ContextConfigHolder.CONTEXT_CONFIG.get().getRequestTimeoutMs()
adminClient.createAcls(acls).all().get(timeoutMs, TimeUnit.MILLISECONDS)
true
} catch {
@@ -100,6 +100,7 @@ class KafkaAclConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaCon
def deleteAcl(entry: AclEntry, allResource: Boolean, allPrincipal: Boolean, allOperation: Boolean): Boolean = {
withAdminClient(adminClient => {
try {
val timeoutMs = ContextConfigHolder.CONTEXT_CONFIG.get().getRequestTimeoutMs()
val result = adminClient.deleteAcls(Collections.singleton(entry.toAclBindingFilter(allResource, allPrincipal, allOperation))).all().get(timeoutMs, TimeUnit.MILLISECONDS)
log.info("delete acl: {}", result)
true
@@ -113,6 +114,7 @@ class KafkaAclConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaCon
def deleteAcl(filters: util.Collection[AclBindingFilter]): Boolean = {
withAdminClient(adminClient => {
try {
val timeoutMs = ContextConfigHolder.CONTEXT_CONFIG.get().getRequestTimeoutMs()
val result = adminClient.deleteAcls(filters).all().get(timeoutMs, TimeUnit.MILLISECONDS)
log.info("delete acl: {}", result)
true

View File

@@ -1,16 +1,16 @@
package kafka.console
import com.xuxd.kafka.console.config.{ContextConfigHolder, KafkaConfig}
import kafka.server.ConfigType
import kafka.utils.Implicits.PropertiesOps
import org.apache.kafka.clients.admin._
import org.apache.kafka.common.config.SaslConfigs
import org.apache.kafka.common.security.scram.internals.{ScramCredentialUtils, ScramFormatter}
import java.security.MessageDigest
import java.util
import java.util.concurrent.TimeUnit
import java.util.{Properties, Set}
import com.xuxd.kafka.console.config.KafkaConfig
import kafka.server.ConfigType
import kafka.utils.Implicits.PropertiesOps
import org.apache.kafka.clients.admin._
import org.apache.kafka.common.security.scram.internals.{ScramCredentialUtils, ScramFormatter}
import scala.jdk.CollectionConverters.{CollectionHasAsScala, DictionaryHasAsScala, SeqHasAsJava}
/**
@@ -35,31 +35,32 @@ class KafkaConfigConsole(config: KafkaConfig) extends KafkaConsole(config: Kafka
}).asInstanceOf[util.Map[String, UserScramCredentialsDescription]]
}
def addOrUpdateUser(name: String, pass: String): Boolean = {
def addOrUpdateUser(name: String, pass: String): (Boolean, String) = {
withAdminClient(adminClient => {
try {
adminClient.alterUserScramCredentials(util.Arrays.asList(
new UserScramCredentialUpsertion(name,
new ScramCredentialInfo(ScramMechanism.fromMechanismName(config.getSaslMechanism), defaultIterations), pass)))
.all().get(timeoutMs, TimeUnit.MILLISECONDS)
true
val timeoutMs = ContextConfigHolder.CONTEXT_CONFIG.get().getRequestTimeoutMs()
val mechanisms = ContextConfigHolder.CONTEXT_CONFIG.get().getProperties().getProperty(SaslConfigs.SASL_MECHANISM).split(",").toSeq
val scrams = mechanisms.map(m => new UserScramCredentialUpsertion(name,
new ScramCredentialInfo(ScramMechanism.fromMechanismName(m), defaultIterations), pass))
adminClient.alterUserScramCredentials(scrams.asInstanceOf[Seq[UserScramCredentialAlteration]].asJava).all().get(timeoutMs, TimeUnit.MILLISECONDS)
(true, "")
} catch {
case ex: Exception => log.error("addOrUpdateUser error", ex)
false
(false, ex.getMessage)
}
}).asInstanceOf[Boolean]
}).asInstanceOf[(Boolean, String)]
}
def addOrUpdateUserWithZK(name: String, pass: String): Boolean = {
withZKClient(adminZkClient => {
try {
val credential = new ScramFormatter(org.apache.kafka.common.security.scram.internals.ScramMechanism.forMechanismName(config.getSaslMechanism))
val credential = new ScramFormatter(org.apache.kafka.common.security.scram.internals.ScramMechanism.forMechanismName(ContextConfigHolder.CONTEXT_CONFIG.get().getProperties().getProperty(SaslConfigs.SASL_MECHANISM)))
.generateCredential(pass, defaultIterations)
val credentialStr = ScramCredentialUtils.credentialToString(credential)
val userConfig: Properties = new Properties()
userConfig.put(config.getSaslMechanism, credentialStr)
userConfig.put(ContextConfigHolder.CONTEXT_CONFIG.get().getProperties().getProperty(SaslConfigs.SASL_MECHANISM), credentialStr)
val configs = adminZkClient.fetchEntityConfig(ConfigType.User, name)
userConfig ++= configs
@@ -101,6 +102,7 @@ class KafkaConfigConsole(config: KafkaConfig) extends KafkaConsole(config: Kafka
// .all().get(timeoutMs, TimeUnit.MILLISECONDS)
// all delete
val userDetail = getUserDetailList(util.Collections.singletonList(name))
val timeoutMs = ContextConfigHolder.CONTEXT_CONFIG.get().getRequestTimeoutMs()
userDetail.values().asScala.foreach(u => {
adminClient.alterUserScramCredentials(u.credentialInfos().asScala.map(s => new UserScramCredentialDeletion(u.name(), s.mechanism())
.asInstanceOf[UserScramCredentialAlteration]).toList.asJava)

View File

@@ -1,13 +1,11 @@
package kafka.console
import com.xuxd.kafka.console.config.KafkaConfig
import com.xuxd.kafka.console.config.{ContextConfigHolder, KafkaConfig}
import kafka.zk.{AdminZkClient, KafkaZkClient}
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.admin._
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer, OffsetAndMetadata}
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.config.SaslConfigs
import org.apache.kafka.common.requests.ListOffsetsResponse
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, StringSerializer}
import org.apache.kafka.common.utils.Time
@@ -25,7 +23,7 @@ import scala.jdk.CollectionConverters.{MapHasAsJava, MapHasAsScala}
* */
class KafkaConsole(config: KafkaConfig) {
protected val timeoutMs: Int = config.getRequestTimeoutMs
// protected val timeoutMs: Int = config.getRequestTimeoutMs
protected def withAdminClient(f: Admin => Any): Any = {
@@ -108,7 +106,7 @@ class KafkaConsole(config: KafkaConfig) {
}
protected def withTimeoutMs[T <: AbstractOptions[T]](options: T) = {
options.timeoutMs(timeoutMs)
options.timeoutMs(ContextConfigHolder.CONTEXT_CONFIG.get().getRequestTimeoutMs())
}
private def createAdminClient(): Admin = {
@@ -117,13 +115,9 @@ class KafkaConsole(config: KafkaConfig) {
private def getProps(): Properties = {
val props: Properties = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServer)
props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, config.getRequestTimeoutMs())
if (config.isEnableAcl) {
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, config.getSecurityProtocol())
props.put(SaslConfigs.SASL_MECHANISM, config.getSaslMechanism())
props.put(SaslConfigs.SASL_JAAS_CONFIG, config.getSaslJaasConfig())
}
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, ContextConfigHolder.CONTEXT_CONFIG.get().getBootstrapServer())
props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, ContextConfigHolder.CONTEXT_CONFIG.get().getRequestTimeoutMs())
props.putAll(ContextConfigHolder.CONTEXT_CONFIG.get().getProperties())
props
}
}
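
In other words, `getProps()` now builds the client configuration entirely from the per-request cluster context instead of the static `KafkaConfig`. A rough Java equivalent, with illustrative values for a SASL/SCRAM cluster (the literal properties are examples, not defaults):

```java
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class AdminClientPropsExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Bootstrap address and request timeout come from the selected cluster.
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 5000);
        // Any extra per-cluster properties (e.g. entered on the add-cluster form)
        // are merged on top, mirroring props.putAll(...) above.
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "SCRAM-SHA-256");
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.scram.ScramLoginModule required "
                        + "username=\"admin\" password=\"admin\";");
        try (Admin admin = Admin.create(props)) {
            System.out.println(admin.describeCluster().clusterId().get());
        }
    }
}
```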

View File

@@ -1,6 +1,9 @@
package kafka.console
import com.xuxd.kafka.console.config.KafkaConfig
import com.xuxd.kafka.console.beans.MessageFilter
import com.xuxd.kafka.console.beans.enums.FilterType
import com.xuxd.kafka.console.config.{ContextConfigHolder, KafkaConfig}
import org.apache.commons.lang3.StringUtils
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.TopicPartition
@@ -20,10 +23,11 @@ import scala.jdk.CollectionConverters.{CollectionHasAsScala, MapHasAsScala, SeqH
class MessageConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConfig) with Logging {
def searchBy(partitions: util.Collection[TopicPartition], startTime: Long, endTime: Long,
maxNums: Int): util.List[ConsumerRecord[Array[Byte], Array[Byte]]] = {
maxNums: Int, filter: MessageFilter): (util.List[ConsumerRecord[Array[Byte], Array[Byte]]], Int) = {
var startOffTable: immutable.Map[TopicPartition, Long] = Map.empty
var endOffTable: immutable.Map[TopicPartition, Long] = Map.empty
withAdminClientAndCatchError(admin => {
val timeoutMs = ContextConfigHolder.CONTEXT_CONFIG.get().getRequestTimeoutMs()
val startTable = KafkaConsole.getLogTimestampOffsets(admin, partitions.asScala.toSeq, startTime, timeoutMs)
startOffTable = startTable.map(t2 => (t2._1, t2._2.offset())).toMap
@@ -33,8 +37,44 @@ class MessageConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConf
log.error("getLogTimestampOffsets error.", e)
throw new RuntimeException("getLogTimestampOffsets error", e)
})
val headerValueBytes = if (StringUtils.isNotEmpty(filter.getHeaderValue())) filter.getHeaderValue().getBytes() else None
def filterMessage(record: ConsumerRecord[Array[Byte], Array[Byte]]): Boolean = {
filter.getFilterType() match {
case FilterType.BODY => {
val body = filter.getDeserializer().deserialize(record.topic(), record.value())
var contains = false
if (filter.isContainsValue) {
contains = body.asInstanceOf[String].contains(filter.getSearchContent().asInstanceOf[String])
} else {
contains = body.equals(filter.getSearchContent)
}
contains
}
case FilterType.HEADER => {
if (StringUtils.isNotEmpty(filter.getHeaderKey()) && StringUtils.isNotEmpty(filter.getHeaderValue())) {
val iterator = record.headers().headers(filter.getHeaderKey()).iterator()
var contains = false
while (iterator.hasNext() && !contains) {
val next = iterator.next().value()
contains = (next.sameElements(headerValueBytes.asInstanceOf[Array[Byte]]))
}
contains
} else if (StringUtils.isNotEmpty(filter.getHeaderKey()) && StringUtils.isEmpty(filter.getHeaderValue())) {
record.headers().headers(filter.getHeaderKey()).iterator().hasNext()
} else {
true
}
}
case FilterType.NONE => true
}
}
var terminate: Boolean = (startOffTable == endOffTable)
val res = new util.LinkedList[ConsumerRecord[Array[Byte], Array[Byte]]]()
// 检索的消息条数
var searchNums = 0
// 如果最小和最大偏移一致,就结束
if (!terminate) {
@@ -51,9 +91,10 @@ class MessageConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConf
// 1.所有查询分区达都到最大偏移的时候
while (!terminate) {
// 达到查询的最大条数
if (res.size() >= maxNums) {
if (searchNums >= maxNums) {
terminate = true
} else {
val timeoutMs = ContextConfigHolder.CONTEXT_CONFIG.get().getRequestTimeoutMs()
val records = consumer.poll(Duration.ofMillis(timeoutMs))
if (records.isEmpty) {
@@ -67,6 +108,7 @@ class MessageConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConf
if (first.offset() >= endOff) {
arrive.remove(tp)
} else {
searchNums += recordList.size()
//
// (String topic,
// int partition,
@@ -80,7 +122,7 @@ class MessageConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConf
// V value,
// Headers headers,
// Optional<Integer> leaderEpoch)
val nullVList = recordList.asScala.map(record => new ConsumerRecord[Array[Byte], Array[Byte]](record.topic(),
val nullVList = recordList.asScala.filter(filterMessage(_)).map(record => new ConsumerRecord[Array[Byte], Array[Byte]](record.topic(),
record.partition(),
record.offset(),
record.timestamp(),
@@ -114,7 +156,7 @@ class MessageConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConf
})
}
res
(res, searchNums)
}
def searchBy(
@@ -149,6 +191,7 @@ class MessageConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConf
var terminate = tpSet.isEmpty
while (!terminate) {
val timeoutMs = ContextConfigHolder.CONTEXT_CONFIG.get().getRequestTimeoutMs()
val records = consumer.poll(Duration.ofMillis(timeoutMs))
val tps = new util.HashSet(tpSet).asScala
for (tp <- tps) {

View File

@@ -1,6 +1,6 @@
package kafka.console
import com.xuxd.kafka.console.config.KafkaConfig
import com.xuxd.kafka.console.config.{ContextConfigHolder, KafkaConfig}
import kafka.admin.ReassignPartitionsCommand
import org.apache.kafka.clients.admin.{ElectLeadersOptions, ListPartitionReassignmentsOptions, PartitionReassignment}
import org.apache.kafka.clients.consumer.KafkaConsumer
@@ -34,6 +34,7 @@ class OperationConsole(config: KafkaConfig, topicConsole: TopicConsole,
throw new UnsupportedOperationException("exist consumer client.")
}
}
val timeoutMs = ContextConfigHolder.CONTEXT_CONFIG.get().getRequestTimeoutMs()
val thatGroupDescriptionList = thatAdmin.describeConsumerGroups(searchGroupIds).all().get(timeoutMs, TimeUnit.MILLISECONDS).values()
if (groupDescriptionList.isEmpty) {
throw new IllegalArgumentException("that consumer group info is null.")
@@ -101,6 +102,7 @@ class OperationConsole(config: KafkaConfig, topicConsole: TopicConsole,
thatMinOffset: util.Map[TopicPartition, Long]): (Boolean, String) = {
val thatAdmin = createAdminClient(props)
try {
val timeoutMs = ContextConfigHolder.CONTEXT_CONFIG.get().getRequestTimeoutMs()
val searchGroupIds = Collections.singleton(groupId)
val groupDescriptionList = consumerConsole.getConsumerGroupList(searchGroupIds)
if (groupDescriptionList.isEmpty) {
@@ -178,6 +180,7 @@ class OperationConsole(config: KafkaConfig, topicConsole: TopicConsole,
val thatConsumer = new KafkaConsumer(props, new ByteArrayDeserializer, new ByteArrayDeserializer)
try {
val timeoutMs = ContextConfigHolder.CONTEXT_CONFIG.get().getRequestTimeoutMs()
val thisTopicPartitions = consumerConsole.listSubscribeTopics(groupId).get(topic).asScala.sortBy(_.partition())
val thatTopicPartitionMap = thatAdmin.listConsumerGroupOffsets(
groupId
@@ -239,8 +242,8 @@ class OperationConsole(config: KafkaConfig, topicConsole: TopicConsole,
withAdminClientAndCatchError(admin => {
admin.listPartitionReassignments(withTimeoutMs(new ListPartitionReassignmentsOptions)).reassignments().get()
}, e => {
Collections.emptyMap()
log.error("listPartitionReassignments error.", e)
Collections.emptyMap()
}).asInstanceOf[util.Map[TopicPartition, PartitionReassignment]]
}
@@ -253,4 +256,20 @@ class OperationConsole(config: KafkaConfig, topicConsole: TopicConsole,
throw e
}).asInstanceOf[util.Map[TopicPartition, Throwable]]
}
def proposedAssignments(reassignmentJson: String,
brokerListString: String): util.Map[TopicPartition, util.List[Int]] = {
withAdminClientAndCatchError(admin => {
val map = ReassignPartitionsCommand.generateAssignment(admin, reassignmentJson, brokerListString, true)._1
val res = new util.HashMap[TopicPartition, util.List[Int]]()
for (tp <- map.keys) {
res.put(tp, map(tp).asJava)
// res.put(tp, map.getOrElse(tp, Seq.empty).asJava)
}
res
}, e => {
log.error("proposedAssignments error.", e)
throw e
})
}.asInstanceOf[util.Map[TopicPartition, util.List[Int]]]
}

View File

@@ -1,6 +1,6 @@
package kafka.console
import com.xuxd.kafka.console.config.KafkaConfig
import com.xuxd.kafka.console.config.{ContextConfigHolder, KafkaConfig}
import kafka.admin.ReassignPartitionsCommand._
import kafka.utils.Json
import org.apache.kafka.clients.admin._
@@ -28,6 +28,7 @@ class TopicConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConfig
* @return all topic name set.
*/
def getTopicNameList(internal: Boolean = true): Set[String] = {
val timeoutMs = ContextConfigHolder.CONTEXT_CONFIG.get().getRequestTimeoutMs()
withAdminClientAndCatchError(admin => admin.listTopics(new ListTopicsOptions().listInternal(internal)).names()
.get(timeoutMs, TimeUnit.MILLISECONDS),
e => {
@@ -42,6 +43,7 @@ class TopicConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConfig
* @return internal topic name set.
*/
def getInternalTopicNameList(): Set[String] = {
val timeoutMs = ContextConfigHolder.CONTEXT_CONFIG.get().getRequestTimeoutMs()
withAdminClientAndCatchError(admin => admin.listTopics(new ListTopicsOptions().listInternal(true)).listings()
.get(timeoutMs, TimeUnit.MILLISECONDS).asScala.filter(_.isInternal).map(_.name()).toSet[String].asJava,
e => {
@@ -69,6 +71,7 @@ class TopicConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConfig
*/
def deleteTopic(topic: String): (Boolean, String) = {
withAdminClientAndCatchError(admin => {
val timeoutMs = ContextConfigHolder.CONTEXT_CONFIG.get().getRequestTimeoutMs()
admin.deleteTopics(Collections.singleton(topic), new DeleteTopicsOptions().retryOnQuotaViolation(false)).all().get(timeoutMs, TimeUnit.MILLISECONDS)
(true, "")
},
@@ -103,6 +106,7 @@ class TopicConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConfig
*/
def createTopic(topic: NewTopic): (Boolean, String) = {
withAdminClientAndCatchError(admin => {
val timeoutMs = ContextConfigHolder.CONTEXT_CONFIG.get().getRequestTimeoutMs()
val createResult = admin.createTopics(Collections.singleton(topic), new CreateTopicsOptions().retryOnQuotaViolation(false))
createResult.all().get(timeoutMs, TimeUnit.MILLISECONDS)
(true, "")
@@ -117,6 +121,7 @@ class TopicConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConfig
*/
def createPartitions(newPartitions: util.Map[String, NewPartitions]): (Boolean, String) = {
withAdminClientAndCatchError(admin => {
val timeoutMs = ContextConfigHolder.CONTEXT_CONFIG.get().getRequestTimeoutMs()
admin.createPartitions(newPartitions,
new CreatePartitionsOptions().retryOnQuotaViolation(false)).all().get(timeoutMs, TimeUnit.MILLISECONDS)
(true, "")
@@ -241,6 +246,7 @@ class TopicConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConfig
.asScala.map(info => new TopicPartition(topic, info.partition())).toSeq
case None => throw new IllegalArgumentException("topic is not exist.")
}
val timeoutMs = ContextConfigHolder.CONTEXT_CONFIG.get().getRequestTimeoutMs()
val offsetMap = KafkaConsole.getLogTimestampOffsets(admin, partitions, timestamp, timeoutMs)
offsetMap.map(tuple2 => (tuple2._1, tuple2._2.offset())).toMap.asJava
}, e => {

ui/package-lock.json (generated)
View File

@@ -1866,9 +1866,9 @@
"optional": true
},
"loader-utils": {
"version": "2.0.0",
"resolved": "https://registry.npmjs.org/loader-utils/-/loader-utils-2.0.0.tgz",
"integrity": "sha512-rP4F0h2RaWSvPEkD7BLDFQnvSf+nK+wr3ESUjNTyAGobqrijmW92zc+SO6d4p4B1wh7+B/Jg1mkQe5NYUEHtHQ==",
"version": "2.0.2",
"resolved": "https://registry.npmjs.org/loader-utils/-/loader-utils-2.0.2.tgz",
"integrity": "sha512-TM57VeHptv569d/GKh6TAYdzKblwDNiumOdkFnejjD0XwTH87K90w3O7AiJRqdQoXygvi1VQTJTLGhJl7WqA7A==",
"dev": true,
"optional": true,
"requires": {
@@ -1897,9 +1897,9 @@
}
},
"vue-loader-v16": {
"version": "npm:vue-loader@16.5.0",
"resolved": "https://registry.npmjs.org/vue-loader/-/vue-loader-16.5.0.tgz",
"integrity": "sha512-WXh+7AgFxGTgb5QAkQtFeUcHNIEq3PGVQ8WskY5ZiFbWBkOwcCPRs4w/2tVyTbh2q6TVRlO3xfvIukUtjsu62A==",
"version": "npm:vue-loader@16.8.3",
"resolved": "https://registry.npmjs.org/vue-loader/-/vue-loader-16.8.3.tgz",
"integrity": "sha512-7vKN45IxsKxe5GcVCbc2qFU5aWzyiLrYJyUuMz4BQLKctCj/fmCa0w6fGiiQ2cLFetNcek1ppGJQDCup0c1hpA==",
"dev": true,
"optional": true,
"requires": {

View File

@@ -11,19 +11,24 @@
><router-link to="/group-page" class="pad-l-r">消费组</router-link>
<span>|</span
><router-link to="/message-page" class="pad-l-r">消息</router-link>
<span v-show="config.enableAcl">|</span
><router-link to="/acl-page" class="pad-l-r" v-show="config.enableAcl"
<span v-show="enableSasl">|</span
><router-link to="/acl-page" class="pad-l-r" v-show="enableSasl"
>Acl</router-link
>
<span>|</span
><router-link to="/op-page" class="pad-l-r">运维</router-link>
<span class="right">集群{{ clusterName }}</span>
</div>
<router-view class="content" />
</div>
</template>
<script>
import { KafkaConfigApi } from "@/utils/api";
import { KafkaClusterApi } from "@/utils/api";
import request from "@/utils/request";
import { mapMutations, mapState } from "vuex";
import { getClusterInfo } from "@/utils/local-cache";
import notification from "ant-design-vue/lib/notification";
import { CLUSTER } from "@/store/mutation-types";
export default {
data() {
@@ -32,12 +37,35 @@ export default {
};
},
created() {
request({
url: KafkaConfigApi.getConfig.url,
method: KafkaConfigApi.getConfig.method,
}).then((res) => {
this.config = res.data;
});
const clusterInfo = getClusterInfo();
if (!clusterInfo) {
request({
url: KafkaClusterApi.peekClusterInfo.url,
method: KafkaClusterApi.peekClusterInfo.method,
}).then((res) => {
if (res.code == 0) {
this.switchCluster(res.data);
} else {
notification.error({
message: "error",
description: res.msg,
});
}
});
} else {
this.switchCluster(clusterInfo);
}
},
computed: {
...mapState({
clusterName: (state) => state.clusterInfo.clusterName,
enableSasl: (state) => state.clusterInfo.enableSasl,
}),
},
methods: {
...mapMutations({
switchCluster: CLUSTER.SWITCH,
}),
},
};
</script>
@@ -90,4 +118,10 @@ export default {
top: 1%;
position: absolute;
}
.right {
float: right;
right: 1%;
top: 2%;
position: absolute;
}
</style>

View File

@@ -1,11 +1,34 @@
import Vue from "vue";
import Vuex from "vuex";
import { CLUSTER } from "@/store/mutation-types";
import { setClusterInfo } from "@/utils/local-cache";
Vue.use(Vuex);
export default new Vuex.Store({
state: {},
mutations: {},
state: {
clusterInfo: {
id: undefined,
clusterName: undefined,
enableSasl: false,
},
},
mutations: {
[CLUSTER.SWITCH](state, clusterInfo) {
state.clusterInfo.id = clusterInfo.id;
state.clusterInfo.clusterName = clusterInfo.clusterName;
let enableSasl = false;
for (let p in clusterInfo.properties) {
if (enableSasl) {
break;
}
enableSasl =
clusterInfo.properties[p].indexOf("security.protocol=SASL") != -1;
}
state.clusterInfo.enableSasl = enableSasl;
setClusterInfo(clusterInfo);
},
},
actions: {},
modules: {},
});

View File

@@ -0,0 +1,3 @@
export const CLUSTER = {
SWITCH: "switchCluster",
};

View File

@@ -51,7 +51,7 @@ export const KafkaAclApi = {
export const KafkaConfigApi = {
getConfig: {
url: "/config",
url: "/config/console",
method: "get",
},
getTopicConfig: {
@@ -183,6 +183,30 @@ export const KafkaClusterApi = {
url: "/cluster",
method: "get",
},
getClusterInfoList: {
url: "/cluster/info",
method: "get",
},
addClusterInfo: {
url: "/cluster/info",
method: "post",
},
deleteClusterInfo: {
url: "/cluster/info",
method: "delete",
},
updateClusterInfo: {
url: "/cluster/info",
method: "put",
},
peekClusterInfo: {
url: "/cluster/info/peek",
method: "get",
},
getBrokerApiVersionInfo: {
url: "/cluster/info/api/version",
method: "get",
},
};
export const KafkaOpApi = {
@@ -222,6 +246,10 @@ export const KafkaOpApi = {
url: "/op/replication/reassignments",
method: "delete",
},
proposedAssignment: {
url: "/op/replication/reassignments/proposed",
method: "post",
},
};
export const KafkaMessageApi = {
searchByTime: {

View File

@@ -1,3 +1,7 @@
export const ConstantEvent = {
updateUserDialogData: "updateUserDialogData",
};
export const Cache = {
clusterInfo: "clusterInfo",
};

View File

@@ -0,0 +1,10 @@
import { Cache } from "@/utils/constants";
export function setClusterInfo(clusterInfo) {
localStorage.setItem(Cache.clusterInfo, JSON.stringify(clusterInfo));
}
export function getClusterInfo() {
const str = localStorage.getItem(Cache.clusterInfo);
return str ? JSON.parse(str) : undefined;
}

View File

@@ -1,6 +1,7 @@
import axios from "axios";
import notification from "ant-design-vue/es/notification";
import { VueAxios } from "./axios";
import { getClusterInfo } from "@/utils/local-cache";
// 创建 axios 实例
const request = axios.create({
@@ -22,10 +23,14 @@ const errorHandler = (error) => {
};
// request interceptor
// request.interceptors.request.use(config => {
//
// return config
// }, errorHandler)
request.interceptors.request.use((config) => {
const clusterInfo = getClusterInfo();
if (clusterInfo) {
config.headers["X-Cluster-Info-Id"] = clusterInfo.id;
// config.headers["X-Cluster-Info-Name"] = encodeURIComponent(clusterInfo.clusterName);
}
return config;
}, errorHandler);
// response interceptor
request.interceptors.response.use((response) => {

View File

@@ -1,32 +1,128 @@
<template>
<div class="home">
<a-card title="kafka console 配置" style="width: 100%">
<!-- <a slot="extra" href="#">more</a>-->
<a-card title="控制台默认配置" class="card-style">
<p v-for="(v, k) in config" :key="k">{{ k }}={{ v }}</p>
</a-card>
<p></p>
<hr />
<h3>kafka API 版本兼容性</h3>
<a-spin :spinning="apiVersionInfoLoading">
<a-table
:columns="columns"
:data-source="brokerApiVersionInfo"
bordered
row-key="brokerId"
>
<div slot="operation" slot-scope="record">
<a-button
size="small"
href="javascript:;"
class="operation-btn"
@click="openApiVersionInfoDialog(record)"
>详情
</a-button>
</div>
</a-table>
</a-spin>
<VersionInfo
:version-info="apiVersionInfo"
:visible="showApiVersionInfoDialog"
@closeApiVersionInfoDialog="closeApiVersionInfoDialog"
>
</VersionInfo>
</div>
</template>
<script>
// @ is an alias to /src
import request from "@/utils/request";
import { KafkaConfigApi } from "@/utils/api";
import { KafkaConfigApi, KafkaClusterApi } from "@/utils/api";
import notification from "ant-design-vue/lib/notification";
import VersionInfo from "@/views/home/VersionInfo";
export default {
name: "Home",
components: {},
components: { VersionInfo },
data() {
return {
config: {},
columns,
brokerApiVersionInfo: [],
showApiVersionInfoDialog: false,
apiVersionInfo: [],
apiVersionInfoLoading: false,
};
},
methods: {
openApiVersionInfoDialog(record) {
this.apiVersionInfo = record.versionInfo;
this.showApiVersionInfoDialog = true;
},
closeApiVersionInfoDialog() {
this.showApiVersionInfoDialog = false;
},
},
created() {
request({
url: KafkaConfigApi.getConfig.url,
method: KafkaConfigApi.getConfig.method,
}).then((res) => {
this.config = res.data;
if (res.code == 0) {
this.config = res.data;
} else {
notification.error({
message: "error",
description: res.msg,
});
}
});
this.apiVersionInfoLoading = true;
request({
url: KafkaClusterApi.getBrokerApiVersionInfo.url,
method: KafkaClusterApi.getBrokerApiVersionInfo.method,
}).then((res) => {
this.apiVersionInfoLoading = false;
if (res.code == 0) {
this.brokerApiVersionInfo = res.data;
} else {
notification.error({
message: "error",
description: res.msg,
});
}
});
},
};
const columns = [
{
title: "id",
dataIndex: "brokerId",
key: "brokerId",
},
{
title: "地址",
dataIndex: "host",
key: "host",
},
{
title: "支持的api数量",
dataIndex: "supportNums",
key: "supportNums",
},
{
title: "不支持的api数量",
dataIndex: "unSupportNums",
key: "unSupportNums",
},
{
title: "操作",
key: "operation",
scopedSlots: { customRender: "operation" },
},
];
</script>
<style scoped>
.card-style {
width: 100%;
}
</style>

View File

@@ -232,11 +232,13 @@ export default {
}
},
onDeleteUser(row) {
this.loading = true;
request({
url: KafkaAclApi.deleteKafkaUser.url,
method: KafkaAclApi.deleteKafkaUser.method,
data: { username: row.username },
}).then((res) => {
this.loading = false;
this.getAclList();
if (res.code == 0) {
this.$message.success(res.msg);

View File

@@ -8,20 +8,22 @@
:footer="null"
@cancel="handleCancel"
>
<a-form :form="form" :label-col="{ span: 5 }" :wrapper-col="{ span: 12 }">
<a-form-item label="用户名">
<span>{{ user.username }}</span>
</a-form-item>
<a-form-item label="密码">
<span>{{ user.password }}</span>
</a-form-item>
<a-form-item label="凭证信息">
<span>{{ user.credentialInfos }}</span>
</a-form-item>
<a-form-item label="数据一致性说明">
<strong>{{ user.consistencyDescription }}</strong>
</a-form-item>
</a-form>
<a-spin :spinning="loading">
<a-form :form="form" :label-col="{ span: 5 }" :wrapper-col="{ span: 12 }">
<a-form-item label="用户名">
<span>{{ user.username }}</span>
</a-form-item>
<a-form-item label="密码">
<span>{{ user.password }}</span>
</a-form-item>
<a-form-item label="凭证信息">
<span>{{ user.credentialInfos }}</span>
</a-form-item>
<a-form-item label="数据一致性说明">
<strong>{{ user.consistencyDescription }}</strong>
</a-form-item>
</a-form>
</a-spin>
</a-modal>
</template>
@@ -47,6 +49,7 @@ export default {
show: this.visible,
form: this.$form.createForm(this, { name: "UserDetailForm" }),
user: {},
loading: false,
};
},
watch: {
@@ -63,11 +66,13 @@ export default {
},
getUserDetail() {
const api = KafkaAclApi.getKafkaUserDetail;
this.loading = true;
request({
url: api.url,
method: api.method,
params: { username: this.username },
}).then((res) => {
this.loading = false;
if (res.code != 0) {
this.$message.error(res.msg);
} else {

View File

@@ -45,6 +45,7 @@
import request from "@/utils/request";
import { KafkaClusterApi } from "@/utils/api";
import BrokerConfig from "@/views/cluster/BrokerConfig";
import notification from "ant-design-vue/lib/notification";
export default {
name: "Topic",
@@ -68,8 +69,15 @@ export default {
method: KafkaClusterApi.getClusterInfo.method,
}).then((res) => {
this.loading = false;
this.data = res.data.nodes;
this.clusterId = res.data.clusterId;
if (res.code == 0) {
this.data = res.data.nodes;
this.clusterId = res.data.clusterId;
} else {
notification.error({
message: "error",
description: res.msg,
});
}
});
},
openBrokerConfigDialog(record, isLoggerConfig) {

View File

@@ -196,7 +196,14 @@ export default {
data: this.queryParam,
}).then((res) => {
this.loading = false;
this.data = res.data.list;
if (res.code == 0) {
this.data = res.data.list;
} else {
notification.error({
message: "error",
description: res.msg,
});
}
});
},
deleteGroup(group) {

View File

@@ -0,0 +1,61 @@
<template>
<a-modal
title="API版本信息"
:visible="show"
:width="600"
:mask="false"
:destroyOnClose="true"
:footer="null"
:maskClosable="false"
@cancel="handleCancel"
>
<div>
<h3>格式说明</h3>
<p>请求类型(1)0 to n(2) [usage: v](3)</p>
<ol>
<li>表示客户端发出的请求类型</li>
<li>该请求在broker中支持的版本号区间</li>
<li>
表示当前控制台的kafka客户端使用的是v版本如果是UNSUPPORTED说明broker版本太老无法处理控制台的这些请求可能影响相关功能的使用
</li>
</ol>
<hr />
<ol>
<li v-for="info in versionInfo" v-bind:key="info">{{ info }}</li>
</ol>
</div>
</a-modal>
</template>
<script>
export default {
name: "APIVersionInfo",
props: {
versionInfo: {
type: Array,
},
visible: {
type: Boolean,
default: false,
},
},
data() {
return {
show: false,
};
},
watch: {
visible(v) {
this.show = v;
},
},
methods: {
handleCancel() {
this.$emit("closeApiVersionInfoDialog", {});
},
},
};
</script>
<style scoped></style>

View File

@@ -203,7 +203,7 @@ export default {
this.$emit("closeDetailDialog", { refresh: false });
},
formatTime(time) {
return moment(time).format("YYYY-MM-DD HH:mm:ss:SSS");
return time == -1 ? -1 : moment(time).format("YYYY-MM-DD HH:mm:ss:SSS");
},
keyDeserializerChange() {
this.getMessageDetail();

View File

@@ -9,6 +9,7 @@
return index;
}
"
@change="handleChange"
>
<div slot="operation" slot-scope="record">
<a-button
@@ -41,9 +42,9 @@ export default {
},
data() {
return {
columns: columns,
showDetailDialog: false,
record: {},
sortedInfo: null,
};
},
methods: {
@@ -54,42 +55,56 @@ export default {
closeDetailDialog() {
this.showDetailDialog = false;
},
},
};
const columns = [
{
title: "topic",
dataIndex: "topic",
key: "topic",
width: 300,
},
{
title: "分区",
dataIndex: "partition",
key: "partition",
},
{
title: "偏移",
dataIndex: "offset",
key: "offset",
},
{
title: "时间",
dataIndex: "timestamp",
key: "timestamp",
slots: { title: "timestamp" },
scopedSlots: { customRender: "timestamp" },
customRender: (text) => {
return moment(text).format("YYYY-MM-DD HH:mm:ss:SSS");
handleChange() {
this.sortedInfo = arguments[2];
},
},
{
title: "操作",
key: "operation",
scopedSlots: { customRender: "operation" },
width: 200,
computed: {
columns() {
let sortedInfo = this.sortedInfo || {};
const columns = [
{
title: "topic",
dataIndex: "topic",
key: "topic",
width: 300,
},
{
title: "分区",
dataIndex: "partition",
key: "partition",
},
{
title: "偏移",
dataIndex: "offset",
key: "offset",
},
{
title: "时间",
dataIndex: "timestamp",
key: "timestamp",
slots: { title: "timestamp" },
scopedSlots: { customRender: "timestamp" },
customRender: (text) => {
return text == -1
? -1
: moment(text).format("YYYY-MM-DD HH:mm:ss:SSS");
},
sorter: (a, b) => a.timestamp - b.timestamp,
sortOrder: sortedInfo.columnKey === "timestamp" && sortedInfo.order,
sortDirections: ["ascend", "descend"],
},
{
title: "操作",
key: "operation",
scopedSlots: { customRender: "operation" },
width: 200,
},
];
return columns;
},
},
];
};
</script>
<style scoped></style>

View File

@@ -59,13 +59,90 @@
</a-form-item>
</a-col>
</a-row>
<hr class="hr" />
<a-row :gutter="24">
<a-col :span="5">
<a-form-item label="消息过滤">
<a-select
class="filter-select"
option-filter-prop="children"
v-decorator="['filter', { initialValue: 'none' }]"
@change="onFilterChange"
>
<a-select-option value="none"> 不启用过滤 </a-select-option>
<a-select-option value="body">
根据消息体过滤
</a-select-option>
<a-select-option value="header">
根据消息头过滤
</a-select-option>
</a-select>
</a-form-item>
</a-col>
<div v-show="showBodyFilter">
<a-col :span="8">
<a-form-item label="消息内容">
<a-input
class="msg-body"
v-decorator="['value']"
placeholder="请输入消息内容"
/>
</a-form-item>
</a-col>
<a-col :span="8">
<a-form-item label="消息类型">
<a-select
v-decorator="[
'valueDeserializer',
{ initialValue: 'String' },
]"
class="body-type"
>
<a-select-option
v-for="v in deserializerList"
:key="v"
:value="v"
>
{{ v }}
</a-select-option>
</a-select>
<span class="hint"
>String类型模糊匹配数字类型绝对匹配其它不支持</span
>
</a-form-item>
</a-col>
</div>
<div v-show="showHeaderFilter">
<a-col :span="5">
<a-form-item label="Key">
<a-input
v-decorator="['headerKey']"
placeholder="消息头的key"
/>
</a-form-item>
</a-col>
<a-col :span="11">
<a-form-item label="Value">
<a-input
v-decorator="['headerValue']"
placeholder="消息头对应key的value"
/>
<span class="hint"
>消息头的value不是字符串类型就不要输入value用来过滤了可以只输入消息头的key过滤存在该key的消息</span
>
</a-form-item>
</a-col>
</div>
</a-row>
</a-form>
</div>
<p style="margin-top: 1%">
<strong
>检索条数:{{ data.realNum }}允许返回的最大条数:{{
>检索消息条数:{{ data.searchNum }}实际返回条数:{{
data.realNum
}},允许返回的最大条数:{{
data.maxNum
}}</strong
}},如果当前时间段消息量太大,可以缩小查询时间范围或指定某一个分区进行查询</strong
>
</p>
<MessageList :data="data.data"></MessageList>
@@ -97,6 +174,9 @@ export default {
rules: [{ type: "array", required: true, message: "请选择时间!" }],
},
data: defaultData,
deserializerList: [],
showBodyFilter: false,
showHeaderFilter: false,
};
},
methods: {
@@ -151,9 +231,43 @@ export default {
this.selectPartition = -1;
this.getPartitionInfo(topic);
},
onFilterChange(e) {
switch (e) {
case "body":
this.showBodyFilter = true;
this.showHeaderFilter = false;
break;
case "header":
this.showHeaderFilter = true;
this.showBodyFilter = false;
break;
default:
this.showBodyFilter = false;
this.showHeaderFilter = false;
break;
}
},
getDeserializerList() {
request({
url: KafkaMessageApi.deserializerList.url,
method: KafkaMessageApi.deserializerList.method,
}).then((res) => {
if (res.code != 0) {
notification.error({
message: "error",
description: res.msg,
});
} else {
this.deserializerList = res.data;
}
});
},
},
created() {
this.getDeserializerList();
},
};
const defaultData = { realNum: 0, maxNum: 0 };
const defaultData = { realNum: 0, maxNum: 0, searchNum: 0 };
</script>
<style scoped>
@@ -195,7 +309,31 @@ const defaultData = { realNum: 0, maxNum: 0 };
width: 400px !important;
}
.filter-select {
width: 160px !important;
}
.body-type {
width: 120px;
}
.msg-body {
width: 400px;
}
.type-select {
width: 150px !important;
}
.hint {
font-size: smaller;
color: green;
}
.ant-advanced-search-form {
padding-bottom: 0px;
}
.hr {
height: 1px;
border: none;
border-top: 1px dashed #0066cc;
}
</style>

View File

@@ -0,0 +1,163 @@
<template>
<a-modal
title="增加集群配置"
:visible="show"
:width="1000"
:mask="false"
:destroyOnClose="true"
:footer="null"
:maskClosable="false"
@cancel="handleCancel"
>
<div>
<a-spin :spinning="loading">
<a-form
:form="form"
:label-col="{ span: 5 }"
:wrapper-col="{ span: 12 }"
@submit="handleSubmit"
>
<a-form-item label="集群名称">
<a-input
v-decorator="[
'clusterName',
{
rules: [{ required: true, message: '输入集群名称!' }],
initialValue: clusterInfo.clusterName,
},
]"
placeholder="输入集群名称"
/>
</a-form-item>
<a-form-item label="集群地址">
<a-input
v-decorator="[
'address',
{
rules: [{ required: true, message: '输入集群地址!' }],
initialValue: clusterInfo.address,
},
]"
placeholder="输入集群地址"
/>
</a-form-item>
<a-form-item label="属性">
<a-textarea
rows="5"
placeholder='可选参数,集群其它属性配置:
request.timeout.ms=10000
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="name" password="password";
'
v-decorator="[
'properties',
{ initialValue: clusterInfo.properties },
]"
/>
</a-form-item>
<a-form-item :wrapper-col="{ span: 12, offset: 5 }">
<a-button type="primary" html-type="submit"> 提交</a-button>
</a-form-item>
</a-form>
</a-spin>
</div>
</a-modal>
</template>
<script>
import request from "@/utils/request";
import { KafkaClusterApi } from "@/utils/api";
import notification from "ant-design-vue/es/notification";
import { getClusterInfo } from "@/utils/local-cache";
import { mapMutations } from "vuex";
import { CLUSTER } from "@/store/mutation-types";
export default {
name: "AddClusterInfo",
props: {
visible: {
type: Boolean,
default: false,
},
isModify: {
type: Boolean,
default: false,
},
clusterInfo: {
type: Object,
default: () => defaultInfo,
},
closeDialogEvent: {
type: String,
default: "closeAddClusterInfoDialog",
},
},
data() {
return {
show: this.visible,
data: [],
loading: false,
form: this.$form.createForm(this, { name: "AddClusterInfoForm" }),
};
},
watch: {
visible(v) {
this.show = v;
},
},
methods: {
handleSubmit(e) {
e.preventDefault();
this.form.validateFields((err, values) => {
if (!err) {
this.loading = true;
const api = this.isModify
? KafkaClusterApi.updateClusterInfo
: KafkaClusterApi.addClusterInfo;
const data = this.isModify
? Object.assign({}, this.clusterInfo, values)
: Object.assign({}, values);
request({
url: api.url,
method: api.method,
data: data,
}).then((res) => {
this.loading = false;
if (res.code == 0) {
this.$message.success(res.msg);
this.$emit(this.closeDialogEvent, { refresh: true });
if (this.isModify) {
let clusterInfo = getClusterInfo();
if (
clusterInfo &&
clusterInfo.id &&
clusterInfo.id == this.clusterInfo.id &&
clusterInfo.clusterName != data.clusterName
) {
this.switchCluster(data);
}
}
} else {
notification.error({
message: "error",
description: res.msg,
});
}
});
}
});
},
handleCancel() {
this.data = [];
this.$emit(this.closeDialogEvent, { refresh: false });
},
...mapMutations({
switchCluster: CLUSTER.SWITCH,
}),
},
};
const defaultInfo = { clusterName: "", address: "", properties: "" };
</script>
<style scoped></style>

View File

@@ -0,0 +1,219 @@
<template>
<a-modal
title="集群信息"
:visible="show"
:width="1200"
:mask="false"
:destroyOnClose="true"
:footer="null"
:maskClosable="false"
@cancel="handleCancel"
>
<div>
<a-spin :spinning="loading">
<div>
<a-button
type="primary"
href="javascript:;"
class="operation-btn"
@click="openAddClusterInfoDialog"
>新增集群
</a-button>
<br /><br />
</div>
<a-table
:columns="columns"
:data-source="data"
bordered
:rowKey="(record) => record.id"
>
<div slot="properties" slot-scope="record">
<div v-for="p in record" :key="p">{{ p }}</div>
</div>
<div slot="operation" slot-scope="record">
<a-button
type="primary"
size="small"
href="javascript:;"
class="operation-btn"
@click="switchCluster(record)"
>切换
</a-button>
<a-button
size="small"
href="javascript:;"
class="operation-btn"
@click="openUpdateClusterInfoDialog(record)"
>编辑
</a-button>
<a-popconfirm
:title="'删除: ' + record.clusterName + ''"
ok-text="确认"
cancel-text="取消"
@confirm="deleteClusterInfo(record)"
>
<a-button
size="small"
href="javascript:;"
class="operation-btn"
type="danger"
>删除
</a-button>
</a-popconfirm>
</div>
</a-table>
<AddClusterInfo
:visible="showAddClusterInfoDialog"
@closeAddClusterInfoDialog="closeAddClusterInfoDialog"
>
</AddClusterInfo>
<AddClusterInfo
:visible="showUpdateClusterInfoDialog"
closeDialogEvent="closeUpdateClusterInfoDialog"
@closeUpdateClusterInfoDialog="closeUpdateClusterInfoDialog"
:cluster-info="select"
:is-modify="true"
>
</AddClusterInfo>
</a-spin>
</div>
</a-modal>
</template>
<script>
import request from "@/utils/request";
import { KafkaClusterApi } from "@/utils/api";
import AddClusterInfo from "@/views/op/AddClusterInfo";
import notification from "ant-design-vue/lib/notification";
import { mapMutations } from "vuex";
import { CLUSTER } from "@/store/mutation-types";
export default {
name: "Cluster",
components: { AddClusterInfo },
props: {
visible: {
type: Boolean,
default: false,
},
},
data() {
return {
columns: columns,
show: this.visible,
data: [],
loading: false,
showAddClusterInfoDialog: false,
showUpdateClusterInfoDialog: false,
select: {},
};
},
watch: {
visible(v) {
this.show = v;
if (this.show) {
this.getClusterInfoList();
}
},
},
methods: {
getClusterInfoList() {
this.loading = true;
request({
url: KafkaClusterApi.getClusterInfoList.url,
method: KafkaClusterApi.getClusterInfoList.method,
}).then((res) => {
this.loading = false;
this.data = res.data;
});
},
deleteClusterInfo(record) {
this.loading = true;
request({
url: KafkaClusterApi.deleteClusterInfo.url,
method: KafkaClusterApi.deleteClusterInfo.method,
data: Object.assign({}, { id: record.id }),
}).then((res) => {
this.loading = false;
if (res.code == 0) {
this.$message.success(res.msg);
this.getClusterInfoList();
} else {
notification.error({
message: "error",
description: res.msg,
});
}
});
},
handleCancel() {
this.data = [];
this.$emit("closeClusterInfoDialog", {});
},
openAddClusterInfoDialog() {
this.showAddClusterInfoDialog = true;
},
closeAddClusterInfoDialog(res) {
this.showAddClusterInfoDialog = false;
if (res.refresh) {
this.getClusterInfoList();
}
},
openUpdateClusterInfoDialog(record) {
this.showUpdateClusterInfoDialog = true;
const r = Object.assign({}, record);
if (r.properties) {
let str = "";
r.properties.forEach((e) => {
str = str + e + "\r\n";
});
r.properties = str;
}
this.select = r;
},
closeUpdateClusterInfoDialog(res) {
this.showUpdateClusterInfoDialog = false;
if (res.refresh) {
this.getClusterInfoList();
}
},
...mapMutations({
switchCluster: CLUSTER.SWITCH,
}),
},
};
const columns = [
{
title: "集群名称",
dataIndex: "clusterName",
key: "clusterName",
},
{
title: "地址",
dataIndex: "address",
key: "address",
width: 400,
},
{
title: "属性",
dataIndex: "properties",
key: "properties",
scopedSlots: { customRender: "properties" },
width: 300,
},
{
title: "操作",
key: "operation",
scopedSlots: { customRender: "operation" },
width: 200,
},
];
</script>
<style scoped>
.operation-btn {
margin-right: 3%;
}
</style>
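Both cluster components call request({ url, method, data }) and then read res.code, res.msg, and res.data from the resolved value, so the wrapper evidently unwraps the HTTP response into a { code, msg, data } envelope. A minimal axios-based sketch of such a wrapper, assuming that envelope; the base URL, timeout, and error handling are placeholders, not the project's actual @/utils/request implementation.

// @/utils/request — a sketch only; the real wrapper may differ.
import axios from "axios";
import notification from "ant-design-vue/es/notification";

const service = axios.create({
  baseURL: "/",   // assumed prefix
  timeout: 30000, // assumed timeout
});

// Unwrap the HTTP response so callers receive the { code, msg, data } envelope directly.
service.interceptors.response.use(
  (response) => response.data,
  (error) => {
    notification.error({ message: "error", description: error.message });
    return Promise.reject(error);
  }
);

export default service;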

View File

@@ -1,5 +1,18 @@
<template>
<div class="content">
<div class="content-module">
<a-card title="集群管理" style="width: 100%; text-align: left">
<p>
<a-button type="primary" @click="openClusterInfoDialog">
集群切换
</a-button>
<label>说明</label>
<span
>多集群管理：增加、删除集群配置，切换选中集群为当前操作集群</span
>
</p>
</a-card>
</div>
<div class="content-module">
<a-card title="Broker管理" style="width: 100%; text-align: left">
<p>
@@ -36,9 +49,19 @@
<label>说明</label>
<span>查看正在进行副本变更/重分配的任务，或者将其取消</span>
</p>
<p>
<a-button type="primary" @click="openReplicaReassignDialog">
副本重分配
</a-button>
<label>说明</label>
<span
>副本所在节点重新分配。打个比方：集群有6个节点，分区1的3个副本在节点1、2、3，现在将它们重新分配到3、4、5</span
>
</p>
</a-card>
</div>
<div class="content-module">
<!-- 隐藏数据同步相关-->
<div class="content-module" v-show="false">
<a-card title="数据同步" style="width: 100%; text-align: left">
<p v-show="true">
<a-button type="primary" @click="openDataSyncSchemeDialog">
@@ -107,6 +130,15 @@
:visible="replicationManager.showCurrentReassignmentsDialog"
@closeCurrentReassignmentsDialog="closeCurrentReassignmentsDialog"
></CurrentReassignments>
<ClusterInfo
:visible="clusterManager.showClusterInfoDialog"
@closeClusterInfoDialog="closeClusterInfoDialog"
></ClusterInfo>
<ReplicaReassign
:visible="replicationManager.showReplicaReassignDialog"
@closeReplicaReassignDialog="closeReplicaReassignDialog"
>
</ReplicaReassign>
</div>
</template>
@@ -119,6 +151,8 @@ import DataSyncScheme from "@/views/op/DataSyncScheme";
import ConfigThrottle from "@/views/op/ConfigThrottle";
import RemoveThrottle from "@/views/op/RemoveThrottle";
import CurrentReassignments from "@/views/op/CurrentReassignments";
import ClusterInfo from "@/views/op/ClusterInfo";
import ReplicaReassign from "@/views/op/ReplicaReassign";
export default {
name: "Operation",
components: {
@@ -130,6 +164,8 @@ export default {
ConfigThrottle,
RemoveThrottle,
CurrentReassignments,
ClusterInfo,
ReplicaReassign,
},
data() {
return {
@@ -142,11 +178,15 @@ export default {
replicationManager: {
showElectPreferredLeaderDialog: false,
showCurrentReassignmentsDialog: false,
showReplicaReassignDialog: false,
},
brokerManager: {
showConfigThrottleDialog: false,
showRemoveThrottleDialog: false,
},
clusterManager: {
showClusterInfoDialog: false,
},
};
},
methods: {
@@ -198,6 +238,18 @@ export default {
closeCurrentReassignmentsDialog() {
this.replicationManager.showCurrentReassignmentsDialog = false;
},
openClusterInfoDialog() {
this.clusterManager.showClusterInfoDialog = true;
},
closeClusterInfoDialog() {
this.clusterManager.showClusterInfoDialog = false;
},
openReplicaReassignDialog() {
this.replicationManager.showReplicaReassignDialog = true;
},
closeReplicaReassignDialog() {
this.replicationManager.showReplicaReassignDialog = false;
},
},
};
</script>
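The 副本重分配 card above illustrates moving partition 1's replicas from brokers 1, 2, 3 to 3, 4, 5. Expressed in Kafka's standard reassignment JSON (the format accepted by kafka-reassign-partitions.sh), that plan would look roughly like the object below. Whether this console uses exactly this format internally is not shown here, so treat it as a concept illustration; "my-topic" is a placeholder name.

// Illustrative reassignment plan for the example described in the card above.
// "my-topic" is a placeholder; the console's own API payload may differ.
const reassignmentPlan = {
  version: 1,
  partitions: [
    { topic: "my-topic", partition: 1, replicas: [3, 4, 5] }, // previously [1, 2, 3]
  ],
};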

View File

@@ -0,0 +1,296 @@
<template>
<a-modal
title="副本重分配"
:visible="show"
:width="800"
:mask="false"
:destroyOnClose="true"
:footer="null"
:maskClosable="false"
@cancel="handleCancel"
>
<div>
<a-spin :spinning="loading">
<a-form
:form="form"
:label-col="{ span: 5 }"
:wrapper-col="{ span: 12 }"
@submit="handleSubmit"
>
<a-form-item label="Topic">
<a-select
@change="handleTopicChange"
show-search
option-filter-prop="children"
v-decorator="[
'topic',
{ rules: [{ required: true, message: '请选择一个topic!' }] },
]"
placeholder="请选择一个topic"
>
<a-select-option v-for="v in topicList" :key="v" :value="v">
{{ v }}
</a-select-option>
</a-select>
</a-form-item>
<a-form-item label="分配到Broker">
<a-select
mode="multiple"
option-filter-prop="children"
v-decorator="[
'brokers',
{
initialValue: brokers,
rules: [{ required: true, message: '请选择一个broker!' }],
},
]"
placeholder="请选择一个broker"
>
<a-select-option v-for="v in brokers" :key="v" :value="v">
<span v-if="v == -1">全部</span> <span v-else>{{ v }}</span>
</a-select-option>
</a-select>
</a-form-item>
<a-table
bordered
:columns="columns"
:data-source="currentAssignment"
:rowKey="
(record, index) => {
return index;
}
"
>
</a-table>
<a-form-item :wrapper-col="{ span: 12, offset: 5 }">
<a-button type="primary" html-type="submit">
重新生成分配计划
</a-button>
</a-form-item>
</a-form>
<hr />
<h2>新的分配计划</h2>
<a-table
bordered
:columns="columns"
:data-source="proposedAssignmentShow"
:rowKey="
(record, index) => {
return index;
}
"
>
</a-table>
<a-button type="danger" @click="updateAssignment"> 更新分配 </a-button>
</a-spin>
<hr />
<h4>注意</h4>
<ul>
<li>
副本重分配可以将副本分配到其它broker上：通过选择上面的broker节点，根据这几个节点生成分配方案
</li>
<li>
选择的broker的节点数量不能少于当前的副本数，比如有3个副本，至少需要3个broker节点
</li>
<li>
数据量太大，考虑设置一下限流，毕竟重新分配后不同broker之间可能做数据迁移
</li>
</ul>
</div>
</a-modal>
</template>
<script>
import request from "@/utils/request";
import { KafkaTopicApi, KafkaOpApi, KafkaClusterApi } from "@/utils/api";
import notification from "ant-design-vue/es/notification";
export default {
name: "ReplicaReassign",
props: {
visible: {
type: Boolean,
default: false,
},
},
data() {
return {
show: this.visible,
data: [],
loading: false,
form: this.$form.createForm(this, { name: "ReplicaReassignForm" }),
topicList: [],
partitions: [],
brokers: [],
currentAssignment: [],
proposedAssignment: [],
proposedAssignmentShow: [],
columns,
};
},
watch: {
visible(v) {
this.show = v;
if (this.show) {
this.clearData();
this.getTopicNameList();
this.getClusterInfo();
}
},
},
methods: {
handleSubmit(e) {
e.preventDefault();
this.form.validateFields((err, values) => {
if (!err) {
this.getProposedAssignment(values);
}
});
},
getTopicReplicaInfo(topic) {
this.loading = true;
request({
url: KafkaTopicApi.getCurrentReplicaAssignment.url + "?topic=" + topic,
method: KafkaTopicApi.getCurrentReplicaAssignment.method,
}).then((res) => {
this.loading = false;
if (res.code == 0) {
this.currentAssignment = res.data.partitions;
this.currentAssignment.forEach(
(e) => (e.replicas = e.replicas.join(","))
);
} else {
notification.error({
message: "error",
description: res.msg,
});
}
});
},
getTopicNameList() {
request({
url: KafkaTopicApi.getTopicNameList.url,
method: KafkaTopicApi.getTopicNameList.method,
}).then((res) => {
if (res.code == 0) {
this.topicList = res.data;
} else {
notification.error({
message: "error",
description: res.msg,
});
}
});
},
getPartitionInfo(topic) {
this.loading = true;
request({
url: KafkaTopicApi.getPartitionInfo.url + "?topic=" + topic,
method: KafkaTopicApi.getPartitionInfo.method,
}).then((res) => {
this.loading = false;
if (res.code != 0) {
notification.error({
message: "error",
description: res.msg,
});
} else {
this.partitions = res.data.map((v) => v.partition);
this.partitions.splice(0, 0, -1);
}
});
},
handleTopicChange(topic) {
// this.getPartitionInfo(topic);
this.clearData();
this.getTopicReplicaInfo(topic);
},
getClusterInfo() {
this.loading = true;
request({
url: KafkaClusterApi.getClusterInfo.url,
method: KafkaClusterApi.getClusterInfo.method,
}).then((res) => {
this.loading = false;
this.brokers = [];
res.data.nodes.forEach((node) => this.brokers.push(node.id));
});
},
getProposedAssignment(params) {
this.loading = true;
request({
url: KafkaOpApi.proposedAssignment.url,
method: KafkaOpApi.proposedAssignment.method,
data: params,
}).then((res) => {
this.loading = false;
if (res.code != 0) {
notification.error({
message: "error",
description: res.msg,
});
} else {
this.proposedAssignmentShow = res.data;
this.proposedAssignment = JSON.parse(
JSON.stringify(this.proposedAssignmentShow)
);
this.proposedAssignmentShow.forEach(
(e) => (e.replicas = e.replicas.join(","))
);
}
});
},
clearData() {
this.currentAssignment = [];
this.proposedAssignment = [];
this.proposedAssignmentShow = [];
},
handleCancel() {
this.data = [];
this.$emit("closeReplicaReassignDialog", { refresh: false });
},
updateAssignment() {
this.form.validateFields((err, values) => {
if (!err) {
if (this.proposedAssignment.length == 0) {
this.$message.warn("请先生成分配计划!");
return;
}
this.loading = true;
request({
url: KafkaTopicApi.updateReplicaAssignment.url,
method: KafkaTopicApi.updateReplicaAssignment.method,
data: { partitions: this.proposedAssignment },
}).then((res) => {
this.loading = false;
if (res.code == 0) {
this.$message.success(res.msg);
this.handleTopicChange(values.topic);
} else {
notification.error({
message: "error",
description: res.msg,
});
}
});
}
});
},
},
};
const columns = [
{
title: "分区",
dataIndex: "partition",
key: "partition",
},
{
title: "副本所在broker",
dataIndex: "replicas",
key: "replicas",
scopedSlots: { customRender: "replicas" },
},
];
</script>
<style scoped></style>
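Pieced together from the component above, the data exchanged with the backend looks roughly like this: getProposedAssignment() posts the selected topic and target brokers, the response carries one { partition, replicas } entry per partition, and updateAssignment() posts those entries back unchanged under a partitions key. The literals below are only a sketch of that flow; field names and values beyond the ones visible in the code are not assumed.

// Shapes implied by getProposedAssignment() and updateAssignment() above (sketch only).
const proposedAssignmentRequest = { topic: "my-topic", brokers: [3, 4, 5] }; // placeholder values

// res.data from KafkaOpApi.proposedAssignment — one entry per partition.
const proposedAssignmentResponse = [
  { partition: 0, replicas: [3, 4, 5] },
  { partition: 1, replicas: [4, 5, 3] },
];

// Body posted to KafkaTopicApi.updateReplicaAssignment.
const updateRequest = { partitions: proposedAssignmentResponse };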

View File

@@ -54,7 +54,7 @@
formatTime(record.beginTime)
}}</span>
~
<span class="red-font">{{ formatTime(record.endTime) }}</span>
<span class="green-font">{{ formatTime(record.endTime) }}</span>
</p>
</a-table>
<p>友情提示：点击+号展开，可以查看当前分区的有效消息的时间范围</p>

View File

@@ -15,6 +15,7 @@
placeholder="topic"
class="input-w"
v-decorator="['topic']"
@change="onTopicUpdate"
/>
</a-form-item>
</a-col>
@@ -22,8 +23,9 @@
<a-form-item :label="`类型`">
<a-select
class="type-select"
v-decorator="['type', { initialValue: 'normal' }]"
placeholder="Please select a country"
v-model="type"
placeholder="选择类型"
@change="getTopicList"
>
<a-select-option value="all"> 所有</a-select-option>
<a-select-option value="normal"> 普通</a-select-option>
@@ -34,10 +36,10 @@
<a-col :span="8" :style="{ textAlign: 'right' }">
<a-form-item>
<a-button type="primary" html-type="submit"> 搜索</a-button>
<a-button :style="{ marginLeft: '8px' }" @click="handleReset">
重置
</a-button>
<a-button type="primary" html-type="submit"> 刷新</a-button>
<!-- <a-button :style="{ marginLeft: '8px' }" @click="handleReset">-->
<!-- 重置-->
<!-- </a-button>-->
</a-form-item>
</a-col>
</a-row>
@@ -48,7 +50,12 @@
>新增</a-button
>
</div>
<a-table :columns="columns" :data-source="data" bordered row-key="name">
<a-table
:columns="columns"
:data-source="filteredData"
bordered
row-key="name"
>
<div slot="partitions" slot-scope="text, record">
<a href="#" @click="openPartitionInfoDialog(record.name)"
>{{ text }}
@@ -215,6 +222,9 @@ export default {
showUpdateReplicaDialog: false,
showThrottleDialog: false,
showSendStatsDialog: false,
filterTopic: "",
filteredData: [],
type: "normal",
};
},
methods: {
@@ -222,13 +232,12 @@ export default {
e.preventDefault();
this.getTopicList();
},
handleReset() {
this.form.resetFields();
},
getTopicList() {
Object.assign(this.queryParam, this.form.getFieldsValue());
Object.assign(this.queryParam, { type: this.type });
// delete this.queryParam.topic;
this.loading = true;
request({
url: KafkaTopicApi.getTopicList.url,
@@ -236,7 +245,15 @@ export default {
params: this.queryParam,
}).then((res) => {
this.loading = false;
this.data = res.data;
if (res.code == 0) {
this.data = res.data;
this.filter();
} else {
notification.error({
message: "error",
description: res.msg,
});
}
});
},
deleteTopic(topic) {
@@ -255,6 +272,15 @@ export default {
}
});
},
onTopicUpdate(input) {
this.filterTopic = input.target.value;
this.filter();
},
filter() {
this.filteredData = this.data.filter(
(e) => e.name.indexOf(this.filterTopic) != -1
);
},
openPartitionInfoDialog(topic) {
this.selectDetail.resourceName = topic;
this.showPartitionInfo = true;