Compare commits
27 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e9f34e1d19 | ||
|
|
ccdcebb24d | ||
|
|
7ddd75e34f | ||
|
|
aebea435fa | ||
|
|
ea788313c6 | ||
|
|
727edfcca8 | ||
|
|
cc1989a74b | ||
|
|
0196a90b69 | ||
|
|
9c3e3988e0 | ||
|
|
458e13c9e0 | ||
|
|
979859b232 | ||
|
|
b163e5f776 | ||
|
|
d062e18940 | ||
|
|
87c1e7ba4a | ||
|
|
5194c952f2 | ||
|
|
c1cc44d32f | ||
|
|
82fafe980d | ||
|
|
34752deca2 | ||
|
|
9e42e2c72a | ||
|
|
e531f5d786 | ||
|
|
10e75ac55d | ||
|
|
4a8d09dc89 | ||
|
|
116bc100a7 | ||
|
|
b1feaad9f7 | ||
|
|
4d372f8374 | ||
|
|
4b2c544c0d | ||
|
|
8131cb1a42 |
27
README.md
@@ -1,6 +1,6 @@
|
||||
# kafka可视化管理平台
|
||||
一款轻量级的kafka可视化管理平台,安装配置快捷、简单易用。
|
||||
为了开发的省事,没有国际化支持,只支持中文展示。
|
||||
为了开发的省事,没有国际化支持,页面只支持中文展示。
|
||||
用过rocketmq-console吧,对,前端展示风格跟那个有点类似。
|
||||
|
||||
## 页面预览
|
||||
@@ -22,7 +22,9 @@ acl配置说明,如果kafka集群启用了ACL,但是控制台没看到Acl菜
|
||||

|
||||
|
||||
## 安装包下载
|
||||
点击下载(v1.0.3版本):[kafka-console-ui.zip](https://github.com/xxd763795151/kafka-console-ui/releases/download/v1.0.3/kafka-console-ui.zip)
|
||||
点击下载(v1.0.4版本):[kafka-console-ui.zip](https://github.com/xxd763795151/kafka-console-ui/releases/download/v1.0.4/kafka-console-ui.zip)
|
||||
|
||||
如果安装包下载的比较慢,可以查看下面的源码打包说明,把代码下载下来,快速打包,不过最新main分支代码刚升级了kafka版本到3.2.0,还没有充分测试,如果需要稳定版本,可以下载 1.0.4-release分支代码
|
||||
|
||||
## 快速使用
|
||||
### Windows
|
||||
@@ -61,7 +63,7 @@ sh bin/shutdown.sh
|
||||
在新增集群的时候,除了集群地址还可以输入集群的其它属性配置,比如请求超时,ACL配置等。如果开启了ACL,切换到该集群的时候,导航栏上便会出现ACL菜单,支持进行相关操作(目前是基于SASL_SCRAM认证授权管理支持的最完善,其它的我也没验证过,虽然是我开发的,但是我也没具体全部验证这一块功能,授权部分应该是通用的)
|
||||
|
||||
## kafka版本
|
||||
* 当前使用的kafka 2.8.0
|
||||
* 当前使用的kafka 3.2.0
|
||||
## 监控
|
||||
仅提供运维管理功能,监控、告警需要配合其它组件,如有需要,建议请查看:https://blog.csdn.net/x763795151/article/details/119705372
|
||||
|
||||
@@ -70,3 +72,22 @@ sh bin/shutdown.sh
|
||||
|
||||
## 本地开发
|
||||
如果需要本地开发,开发环境配置查看:[本地开发](./document/develop/开发配置.md)
|
||||
|
||||
## 登录认证和权限
|
||||
目前主分支不支持登录认证,感谢@dongyinuo 同学开发了一版支持登录认证,及相关的按钮权限(主要有两个角色:管理员和普通开发人员)。
|
||||
在分支:feature/dongyinuo/20220501/devops 上。
|
||||
如果有需要使用管理台登录认证的,可以切换到这个分支上进行打包,打包方式看 源码打包 说明。
|
||||
默认登录账户:admin/kafka-console-ui521
|
||||
|
||||
## DockerCompose部署
|
||||
感谢@wdkang123 同学分享的部署方式,如果有需要请查看[DockerCompose部署方式](./document/deploy/docker部署.md)
|
||||
|
||||
## 联系方式
|
||||
+ 微信群
|
||||
<img src="./document/contact/weixin_contact.jpg" width="40%"/>
|
||||
|
||||
[//]: # (<img src="https://github.com/xxd763795151/kafka-console-ui/blob/main/document/contact/weixin_contact.jpeg" width="40%"/>)
|
||||
|
||||
+ 若联系方式失效, 请联系加一下微信, 说明意图
|
||||
- xxd763795151
|
||||
- wxid_7jy2ezljvebt12
|
||||
|
||||
@@ -5,4 +5,4 @@ set JAVA_OPTS=-Xmx512m -Xms512m -Xmn256m -Xss256k
|
||||
set CONFIG_FILE=../config/application.yml
|
||||
set TARGET=../lib/kafka-console-ui.jar
|
||||
set DATA_DIR=..
|
||||
%JAVA_CMD% -jar %TARGET% --spring.config.location=%CONFIG_FILE% --data.dir=%DATA_DIR%
|
||||
"%JAVA_CMD%" -jar %TARGET% --spring.config.location=%CONFIG_FILE% --data.dir=%DATA_DIR%
|
||||
|
||||
BIN
document/contact/weixin_contact.jpg
Normal file
|
After Width: | Height: | Size: 205 KiB |
189
document/deploy/docker部署.md
Normal file
@@ -0,0 +1,189 @@
|
||||
# Docker/DockerCompose部署
|
||||
|
||||
# 1.快速上手
|
||||
|
||||
## 1.1 镜像拉取
|
||||
|
||||
```shell
|
||||
docker pull wdkang/kafka-console-ui
|
||||
```
|
||||
|
||||
|
||||
|
||||
## 1.2 查看镜像
|
||||
|
||||
```shell
|
||||
docker images
|
||||
```
|
||||
|
||||
|
||||
|
||||
## 1.3 启动服务
|
||||
|
||||
由于Docker内不会对数据进行持久化 所以这里推荐将数据目录映射到实体机中
|
||||
|
||||
详见 **2.数据持久**
|
||||
|
||||
```shell
|
||||
docker run -d -p 7766:7766 wdkang/kafka-console-ui
|
||||
```
|
||||
|
||||
|
||||
|
||||
## 1.4 查看状态
|
||||
|
||||
```shell
|
||||
docker ps -a
|
||||
```
|
||||
|
||||
|
||||
|
||||
## 1.5 查看日志
|
||||
|
||||
```shell
|
||||
docker logs -f ${containerId}
|
||||
```
|
||||
|
||||
|
||||
|
||||
## 1.6 访问服务
|
||||
|
||||
```shell
|
||||
http://localhost:7766
|
||||
```
|
||||
|
||||
|
||||
|
||||
# 2. 数据持久
|
||||
|
||||
推荐对数据进行持久化
|
||||
|
||||
## 2.1 新建目录
|
||||
|
||||
```shell
|
||||
mkdir -p /home/kafka-console-ui/data /home/kafka-console-ui/log
|
||||
cd /home/kafka-console-ui
|
||||
```
|
||||
|
||||
## 2.2 启动服务
|
||||
|
||||
```shell
|
||||
docker run -d -p 7766:7766 -v $PWD/data:/app/data -v $PWD/log:/app/log wdkang/kafka-console-ui
|
||||
```
|
||||
|
||||
|
||||
|
||||
# 3.自主打包
|
||||
|
||||
## 3.1 构建镜像
|
||||
|
||||
**前置需求**
|
||||
|
||||
(可根据自身情况修改Dockerfile)
|
||||
|
||||
下载[kafka-console-ui.zip](https://github.com/xxd763795151/kafka-console-ui/releases)包
|
||||
|
||||
解压后 将Dockerfile放入文件夹的根目录
|
||||
|
||||
**Dockefile**
|
||||
|
||||
```dockerfile
|
||||
# jdk
|
||||
FROM openjdk:8-jdk-alpine
|
||||
# label
|
||||
LABEL by="https://github.com/xxd763795151/kafka-console-ui"
|
||||
# root
|
||||
RUN mkdir -p /app && cd /app
|
||||
WORKDIR /app
|
||||
# config log data
|
||||
RUN mkdir -p /app/config && mkdir -p /app/log && mkdir -p /app/data && mkdir -p /app/lib
|
||||
# add file
|
||||
ADD ./lib/kafka-console-ui.jar /app/lib
|
||||
ADD ./config /app/config
|
||||
# port
|
||||
EXPOSE 7766
|
||||
# start server
|
||||
CMD java -jar -Xmx512m -Xms512m -Xmn256m -Xss256k /app/lib/kafka-console-ui.jar --spring.config.location="/app/config/" --logging.home="/app/log" --data.dir="/app/data"
|
||||
|
||||
```
|
||||
|
||||
**进行打包**
|
||||
|
||||
在文件夹根目录下
|
||||
|
||||
(注意末尾有个点)
|
||||
|
||||
```shell
|
||||
docker build -t ${your_docker_hub_addr} .
|
||||
```
|
||||
|
||||
|
||||
|
||||
## 3.2 上传镜像
|
||||
|
||||
```shell
|
||||
docker push ${your_docker_hub_addr}
|
||||
```
|
||||
|
||||
|
||||
|
||||
# 4.容器编排
|
||||
|
||||
```dockerfile
|
||||
# docker-compose 编排
|
||||
version: '3'
|
||||
services:
|
||||
# 服务名
|
||||
kafka-console-ui:
|
||||
# 容器名
|
||||
container_name: "kafka-console-ui"
|
||||
# 端口
|
||||
ports:
|
||||
- "7766:7766"
|
||||
# 持久化
|
||||
volumes:
|
||||
- ./data:/app/data
|
||||
- ./log:/app/log
|
||||
# 防止读写文件有问题
|
||||
privileged: true
|
||||
user: root
|
||||
# 镜像地址
|
||||
image: "wdkang/kafka-console-ui"
|
||||
```
|
||||
|
||||
|
||||
|
||||
## 4.1 拉取镜像
|
||||
|
||||
```shell
|
||||
docker-compose pull kafka-console-ui
|
||||
```
|
||||
|
||||
|
||||
|
||||
## 4.2 构建启动
|
||||
|
||||
```shell
|
||||
docker-compose up --detach --build kafka-console-ui
|
||||
```
|
||||
|
||||
|
||||
|
||||
## 4.3 查看状态
|
||||
|
||||
```shell
|
||||
docker-compose ps -a
|
||||
```
|
||||
|
||||
|
||||
|
||||
## 4.3 停止服务
|
||||
|
||||
```shell
|
||||
docker-compose down
|
||||
```
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
@@ -12,8 +12,11 @@
|
||||
* scala 2.13
|
||||
* maven >=3.6+
|
||||
* webstorm
|
||||
* Node
|
||||
|
||||
除了webstorm是开发前端的ide可以根据自己需要代替,jdk scala是必须有的。
|
||||
|
||||
开发的时候,我本地用的node版本是v14.16.0,下载目录:https://nodejs.org/download/release/v14.16.0/ . 过高或过低版本是否适用,我也没测试过。
|
||||
|
||||
scala 2.13下载地址,在这个页面最下面:https://www.scala-lang.org/download/scala2.html
|
||||
## 克隆代码
|
||||
@@ -21,7 +24,8 @@ scala 2.13下载地址,在这个页面最下面:https://www.scala-lang.org/d
|
||||
## 后端配置
|
||||
1. 用idea打开项目
|
||||
2. 打开idea的Project Structure(Settings) -> Modules -> 设置src/main/scala为Sources,因为约定src/main/java是源码目录,所以这里要再加一个源码目录
|
||||
3. 打开idea的Project Structure(Settings) -> Libraries 添加scala sdk,然后选择本地下载的scala 2.13的目录,确定添加进来(如果使用的idea可以直接勾选,也可以不用先下载到本地)
|
||||
3. 打开idea的Settings -> plugins 搜索scala plugin并安装,然后应该是要重启idea生效,这一步必须在第4步之前
|
||||
4. 打开idea的Project Structure(Settings) -> Libraries 添加scala sdk,然后选择本地下载的scala 2.13的目录,确定添加进来(如果使用的idea可以直接勾选,也可以不用先下载到本地)
|
||||
## 前端
|
||||
前端代码在工程的ui目录下,找个前端开发的ide如web storm打开进行开发即可。
|
||||
|
||||
|
||||
BIN
document/img/功能特性-old.png
Normal file
|
After Width: | Height: | Size: 99 KiB |
|
Before Width: | Height: | Size: 99 KiB After Width: | Height: | Size: 103 KiB |
@@ -29,4 +29,6 @@ package.bat
|
||||
cd kafka-console-ui
|
||||
# linux或mac执行
|
||||
sh package.sh
|
||||
```
|
||||
```
|
||||
|
||||
打包完成,会在target目录下生成一个kafka-console-ui.zip的安装包
|
||||
16
pom.xml
@@ -10,7 +10,7 @@
|
||||
</parent>
|
||||
<groupId>com.xuxd</groupId>
|
||||
<artifactId>kafka-console-ui</artifactId>
|
||||
<version>1.0.4</version>
|
||||
<version>1.0.5</version>
|
||||
<name>kafka-console-ui</name>
|
||||
<description>Kafka console manage ui</description>
|
||||
<properties>
|
||||
@@ -21,7 +21,7 @@
|
||||
<ui.path>${project.basedir}/ui</ui.path>
|
||||
<frontend-maven-plugin.version>1.11.0</frontend-maven-plugin.version>
|
||||
<compiler.version>1.8</compiler.version>
|
||||
<kafka.version>2.8.0</kafka.version>
|
||||
<kafka.version>3.2.0</kafka.version>
|
||||
<maven.assembly.plugin.version>3.0.0</maven.assembly.plugin.version>
|
||||
<mybatis-plus-boot-starter.version>3.4.2</mybatis-plus-boot-starter.version>
|
||||
<scala.version>2.13.6</scala.version>
|
||||
@@ -76,6 +76,18 @@
|
||||
<groupId>org.apache.kafka</groupId>
|
||||
<artifactId>kafka_2.13</artifactId>
|
||||
<version>${kafka.version}</version>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>com.typesafe.scala-logging</groupId>
|
||||
<artifactId>scala-logging_2.13</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.typesafe.scala-logging</groupId>
|
||||
<artifactId>scala-logging_2.13</artifactId>
|
||||
<version>3.9.2</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
|
||||
@@ -1,18 +1,15 @@
|
||||
package com.xuxd.kafka.console.beans;
|
||||
|
||||
import java.util.Objects;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.kafka.common.acl.AccessControlEntry;
|
||||
import org.apache.kafka.common.acl.AccessControlEntryFilter;
|
||||
import org.apache.kafka.common.acl.AclBinding;
|
||||
import org.apache.kafka.common.acl.AclBindingFilter;
|
||||
import org.apache.kafka.common.acl.AclOperation;
|
||||
import org.apache.kafka.common.acl.AclPermissionType;
|
||||
import org.apache.kafka.common.acl.*;
|
||||
import org.apache.kafka.common.resource.PatternType;
|
||||
import org.apache.kafka.common.resource.ResourcePattern;
|
||||
import org.apache.kafka.common.resource.ResourcePatternFilter;
|
||||
import org.apache.kafka.common.resource.ResourceType;
|
||||
import org.apache.kafka.common.security.auth.KafkaPrincipal;
|
||||
import org.apache.kafka.common.utils.SecurityUtils;
|
||||
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* kafka-console-ui.
|
||||
@@ -41,7 +38,9 @@ public class AclEntry {
|
||||
entry.setResourceType(binding.pattern().resourceType().name());
|
||||
entry.setName(binding.pattern().name());
|
||||
entry.setPatternType(binding.pattern().patternType().name());
|
||||
entry.setPrincipal(KafkaPrincipal.fromString(binding.entry().principal()).getName());
|
||||
// entry.setPrincipal(KafkaPrincipal.fromString(binding.entry().principal()).getName());
|
||||
// 3.x版本使用该方法
|
||||
entry.setPrincipal(SecurityUtils.parseKafkaPrincipal(binding.entry().principal()).getName());
|
||||
entry.setHost(binding.entry().host());
|
||||
entry.setOperation(binding.entry().operation().name());
|
||||
entry.setPermissionType(binding.entry().permissionType().name());
|
||||
|
||||
33
src/main/java/com/xuxd/kafka/console/cache/TimeBasedCache.java
vendored
Normal file
@@ -0,0 +1,33 @@
|
||||
package com.xuxd.kafka.console.cache;
|
||||
|
||||
import com.google.common.cache.CacheBuilder;
|
||||
import com.google.common.cache.CacheLoader;
|
||||
import com.google.common.cache.LoadingCache;
|
||||
import com.google.common.cache.RemovalListener;
|
||||
import kafka.console.KafkaConsole;
|
||||
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class TimeBasedCache<K, V> {
|
||||
private LoadingCache<K, V> cache;
|
||||
|
||||
private KafkaConsole console;
|
||||
|
||||
public TimeBasedCache(CacheLoader<K, V> loader, RemovalListener<K, V> listener) {
|
||||
cache = CacheBuilder.newBuilder()
|
||||
.maximumSize(50) // maximum 100 records can be cached
|
||||
.expireAfterAccess(30, TimeUnit.MINUTES) // cache will expire after 30 minutes of access
|
||||
.removalListener(listener)
|
||||
.build(loader);
|
||||
|
||||
}
|
||||
|
||||
public V get(K k) {
|
||||
try {
|
||||
return cache.get(k);
|
||||
} catch (ExecutionException e) {
|
||||
throw new RuntimeException("Get connection from cache error.", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -20,6 +20,12 @@ public class KafkaConfig {
|
||||
|
||||
private Properties properties;
|
||||
|
||||
private boolean cacheAdminConnection;
|
||||
|
||||
private boolean cacheProducerConnection;
|
||||
|
||||
private boolean cacheConsumerConnection;
|
||||
|
||||
public String getBootstrapServer() {
|
||||
return bootstrapServer;
|
||||
}
|
||||
@@ -43,4 +49,28 @@ public class KafkaConfig {
|
||||
public void setProperties(Properties properties) {
|
||||
this.properties = properties;
|
||||
}
|
||||
|
||||
public boolean isCacheAdminConnection() {
|
||||
return cacheAdminConnection;
|
||||
}
|
||||
|
||||
public void setCacheAdminConnection(boolean cacheAdminConnection) {
|
||||
this.cacheAdminConnection = cacheAdminConnection;
|
||||
}
|
||||
|
||||
public boolean isCacheProducerConnection() {
|
||||
return cacheProducerConnection;
|
||||
}
|
||||
|
||||
public void setCacheProducerConnection(boolean cacheProducerConnection) {
|
||||
this.cacheProducerConnection = cacheProducerConnection;
|
||||
}
|
||||
|
||||
public boolean isCacheConsumerConnection() {
|
||||
return cacheConsumerConnection;
|
||||
}
|
||||
|
||||
public void setCacheConsumerConnection(boolean cacheConsumerConnection) {
|
||||
this.cacheConsumerConnection = cacheConsumerConnection;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,14 +1,15 @@
|
||||
package com.xuxd.kafka.console.controller;
|
||||
|
||||
import com.xuxd.kafka.console.beans.QueryMessage;
|
||||
import com.xuxd.kafka.console.beans.ResponseData;
|
||||
import com.xuxd.kafka.console.beans.SendMessage;
|
||||
import com.xuxd.kafka.console.beans.dto.QueryMessageDTO;
|
||||
import com.xuxd.kafka.console.service.MessageService;
|
||||
import org.apache.commons.collections.CollectionUtils;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.web.bind.annotation.GetMapping;
|
||||
import org.springframework.web.bind.annotation.PostMapping;
|
||||
import org.springframework.web.bind.annotation.RequestBody;
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
import org.springframework.web.bind.annotation.*;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* kafka-console-ui.
|
||||
@@ -52,4 +53,12 @@ public class MessageController {
|
||||
public Object resend(@RequestBody SendMessage message) {
|
||||
return messageService.resend(message);
|
||||
}
|
||||
|
||||
@DeleteMapping
|
||||
public Object delete(@RequestBody List<QueryMessage> messages) {
|
||||
if (CollectionUtils.isEmpty(messages)) {
|
||||
return ResponseData.create().failed("params is null");
|
||||
}
|
||||
return messageService.delete(messages);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -43,8 +43,8 @@ public class TopicController {
|
||||
}
|
||||
|
||||
@DeleteMapping
|
||||
public Object deleteTopic(@RequestParam String topic) {
|
||||
return topicService.deleteTopic(topic);
|
||||
public Object deleteTopic(@RequestBody List<String> topics) {
|
||||
return topicService.deleteTopics(topics);
|
||||
}
|
||||
|
||||
@GetMapping("/partition")
|
||||
|
||||
@@ -4,6 +4,8 @@ import com.xuxd.kafka.console.beans.QueryMessage;
|
||||
import com.xuxd.kafka.console.beans.ResponseData;
|
||||
import com.xuxd.kafka.console.beans.SendMessage;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* kafka-console-ui.
|
||||
*
|
||||
@@ -23,4 +25,6 @@ public interface MessageService {
|
||||
ResponseData send(SendMessage message);
|
||||
|
||||
ResponseData resend(SendMessage message);
|
||||
|
||||
ResponseData delete(List<QueryMessage> messages);
|
||||
}
|
||||
|
||||
@@ -4,6 +4,8 @@ import com.xuxd.kafka.console.beans.ReplicaAssignment;
|
||||
import com.xuxd.kafka.console.beans.ResponseData;
|
||||
import com.xuxd.kafka.console.beans.enums.TopicThrottleSwitch;
|
||||
import com.xuxd.kafka.console.beans.enums.TopicType;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import org.apache.kafka.clients.admin.NewTopic;
|
||||
|
||||
@@ -19,7 +21,7 @@ public interface TopicService {
|
||||
|
||||
ResponseData getTopicList(String topic, TopicType type);
|
||||
|
||||
ResponseData deleteTopic(String topic);
|
||||
ResponseData deleteTopics(Collection<String> topics);
|
||||
|
||||
ResponseData getTopicPartitionInfo(String topic);
|
||||
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package com.xuxd.kafka.console.service.impl;
|
||||
|
||||
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
|
||||
import com.xuxd.kafka.console.beans.BrokerNode;
|
||||
import com.xuxd.kafka.console.beans.ClusterInfo;
|
||||
import com.xuxd.kafka.console.beans.ResponseData;
|
||||
import com.xuxd.kafka.console.beans.dos.ClusterInfoDO;
|
||||
@@ -8,15 +9,11 @@ import com.xuxd.kafka.console.beans.vo.BrokerApiVersionVO;
|
||||
import com.xuxd.kafka.console.beans.vo.ClusterInfoVO;
|
||||
import com.xuxd.kafka.console.dao.ClusterInfoMapper;
|
||||
import com.xuxd.kafka.console.service.ClusterService;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.TreeSet;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.stream.Collectors;
|
||||
import kafka.console.ClusterConsole;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.collections.CollectionUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.kafka.clients.NodeApiVersions;
|
||||
@@ -30,6 +27,7 @@ import org.springframework.stereotype.Service;
|
||||
* @author xuxd
|
||||
* @date 2021-10-08 14:23:09
|
||||
**/
|
||||
@Slf4j
|
||||
@Service
|
||||
public class ClusterServiceImpl implements ClusterService {
|
||||
|
||||
@@ -45,7 +43,12 @@ public class ClusterServiceImpl implements ClusterService {
|
||||
|
||||
@Override public ResponseData getClusterInfo() {
|
||||
ClusterInfo clusterInfo = clusterConsole.clusterInfo();
|
||||
clusterInfo.setNodes(new TreeSet<>(clusterInfo.getNodes()));
|
||||
Set<BrokerNode> nodes = clusterInfo.getNodes();
|
||||
if (nodes == null) {
|
||||
log.error("集群节点信息为空,集群地址可能不正确或集群内没有活跃节点");
|
||||
return ResponseData.create().failed("集群节点信息为空,集群地址可能不正确或集群内没有活跃节点");
|
||||
}
|
||||
clusterInfo.setNodes(new TreeSet<>(nodes));
|
||||
return ResponseData.create().data(clusterInfo).success();
|
||||
}
|
||||
|
||||
|
||||
@@ -24,6 +24,7 @@ import kafka.console.TopicConsole;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.collections.CollectionUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.kafka.clients.admin.RecordsToDelete;
|
||||
import org.apache.kafka.clients.admin.TopicDescription;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||
import org.apache.kafka.clients.producer.ProducerRecord;
|
||||
@@ -242,6 +243,18 @@ public class MessageServiceImpl implements MessageService, ApplicationContextAwa
|
||||
return success ? ResponseData.create().success("success: " + tuple2._2()) : ResponseData.create().failed(tuple2._2());
|
||||
}
|
||||
|
||||
@Override
|
||||
public ResponseData delete(List<QueryMessage> messages) {
|
||||
Map<TopicPartition, RecordsToDelete> params = new HashMap<>(messages.size(), 1f);
|
||||
|
||||
messages.forEach(message -> {
|
||||
params.put(new TopicPartition(message.getTopic(), message.getPartition()), RecordsToDelete.beforeOffset(message.getOffset()));
|
||||
});
|
||||
Tuple2<Object, String> tuple2 = messageConsole.delete(params);
|
||||
boolean success = (boolean) tuple2._1();
|
||||
return success ? ResponseData.create().success() : ResponseData.create().failed(tuple2._2());
|
||||
}
|
||||
|
||||
private Map<TopicPartition, ConsumerRecord<byte[], byte[]>> searchRecordByOffset(QueryMessage queryMessage) {
|
||||
Set<TopicPartition> partitions = getPartitions(queryMessage);
|
||||
|
||||
|
||||
@@ -9,16 +9,6 @@ import com.xuxd.kafka.console.beans.vo.TopicDescriptionVO;
|
||||
import com.xuxd.kafka.console.beans.vo.TopicPartitionVO;
|
||||
import com.xuxd.kafka.console.service.TopicService;
|
||||
import com.xuxd.kafka.console.utils.GsonUtil;
|
||||
import java.util.Calendar;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.stream.Collectors;
|
||||
import kafka.console.MessageConsole;
|
||||
import kafka.console.TopicConsole;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
@@ -33,6 +23,10 @@ import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
import scala.Tuple2;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* kafka-console-ui.
|
||||
*
|
||||
@@ -87,8 +81,8 @@ public class TopicServiceImpl implements TopicService {
|
||||
return ResponseData.create().data(topicDescriptions.stream().map(d -> TopicDescriptionVO.from(d))).success();
|
||||
}
|
||||
|
||||
@Override public ResponseData deleteTopic(String topic) {
|
||||
Tuple2<Object, String> tuple2 = topicConsole.deleteTopic(topic);
|
||||
@Override public ResponseData deleteTopics(Collection<String> topics) {
|
||||
Tuple2<Object, String> tuple2 = topicConsole.deleteTopics(topics);
|
||||
return (Boolean) tuple2._1 ? ResponseData.create().success() : ResponseData.create().failed(tuple2._2);
|
||||
}
|
||||
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
package com.xuxd.kafka.console.interceptor;
|
||||
package com.xuxd.kafka.console.service.interceptor;
|
||||
|
||||
import com.xuxd.kafka.console.beans.ResponseData;
|
||||
import com.xuxd.kafka.console.beans.dos.ClusterInfoDO;
|
||||
@@ -1,4 +1,4 @@
|
||||
package com.xuxd.kafka.console.interceptor;
|
||||
package com.xuxd.kafka.console.service.interceptor;
|
||||
|
||||
import com.xuxd.kafka.console.beans.ResponseData;
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
@@ -12,6 +12,14 @@ kafka:
|
||||
# 集群其它属性配置
|
||||
properties:
|
||||
# request.timeout.ms: 5000
|
||||
# 缓存连接,不缓存的情况下,每次请求建立连接. 即使每次请求建立连接,其实也很快,某些情况下开启ACL,查询可能很慢,可以设置连接缓存为true,
|
||||
# 或者想提高查询速度,也可以设置下面连接缓存为true
|
||||
# 缓存 admin client的连接
|
||||
cache-admin-connection: false
|
||||
# 缓存 producer的连接
|
||||
cache-producer-connection: false
|
||||
# 缓存 consumer的连接
|
||||
cache-consumer-connection: false
|
||||
|
||||
spring:
|
||||
application:
|
||||
|
||||
@@ -18,6 +18,7 @@ import org.apache.kafka.common.network.Selector
|
||||
import org.apache.kafka.common.protocol.Errors
|
||||
import org.apache.kafka.common.requests._
|
||||
import org.apache.kafka.common.utils.{KafkaThread, LogContext, Time}
|
||||
import org.slf4j.{Logger, LoggerFactory}
|
||||
|
||||
import java.io.IOException
|
||||
import java.util.Properties
|
||||
@@ -34,7 +35,9 @@ import scala.util.{Failure, Success, Try}
|
||||
* @author xuxd
|
||||
* @date 2022-01-22 15:15:57
|
||||
* */
|
||||
object BrokerApiVersion extends Logging {
|
||||
object BrokerApiVersion{
|
||||
|
||||
protected lazy val log : Logger = LoggerFactory.getLogger(this.getClass)
|
||||
|
||||
def listAllBrokerApiVersionInfo(): java.util.HashMap[Node, NodeApiVersions] = {
|
||||
val res = new java.util.HashMap[Node, NodeApiVersions]()
|
||||
@@ -48,7 +51,7 @@ object BrokerApiVersion extends Logging {
|
||||
case Success(v) => {
|
||||
res.put(broker, v)
|
||||
}
|
||||
case Failure(v) => logger.error(s"${broker} -> ERROR: ${v}\n")
|
||||
case Failure(v) => log.error(s"${broker} -> ERROR: ${v}\n")
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
@@ -149,12 +152,12 @@ object BrokerApiVersion extends Logging {
|
||||
val response = sendAnyNode(request).asInstanceOf[MetadataResponse]
|
||||
val errors = response.errors
|
||||
if (!errors.isEmpty) {
|
||||
logger.info(s"Metadata request contained errors: $errors")
|
||||
log.info(s"Metadata request contained errors: $errors")
|
||||
}
|
||||
|
||||
// 在3.x版本中这个方法是buildCluster 代替cluster()了
|
||||
// response.buildCluster.nodes.asScala.toList
|
||||
response.cluster().nodes.asScala.toList
|
||||
response.buildCluster.nodes.asScala.toList
|
||||
// response.cluster().nodes.asScala.toList
|
||||
}
|
||||
|
||||
def listAllBrokerVersionInfo(): Map[Node, Try[NodeApiVersions]] =
|
||||
@@ -277,40 +280,40 @@ object BrokerApiVersion extends Logging {
|
||||
|
||||
// 版本不一样,这个地方的兼容性问题也不一样了
|
||||
// 3.x版本用这个
|
||||
// val networkClient = new NetworkClient(
|
||||
// selector,
|
||||
// metadata,
|
||||
// clientId,
|
||||
// DefaultMaxInFlightRequestsPerConnection,
|
||||
// DefaultReconnectBackoffMs,
|
||||
// DefaultReconnectBackoffMax,
|
||||
// DefaultSendBufferBytes,
|
||||
// DefaultReceiveBufferBytes,
|
||||
// requestTimeoutMs,
|
||||
// connectionSetupTimeoutMs,
|
||||
// connectionSetupTimeoutMaxMs,
|
||||
// time,
|
||||
// true,
|
||||
// new ApiVersions,
|
||||
// logContext)
|
||||
val networkClient = new NetworkClient(
|
||||
selector,
|
||||
metadata,
|
||||
clientId,
|
||||
DefaultMaxInFlightRequestsPerConnection,
|
||||
DefaultReconnectBackoffMs,
|
||||
DefaultReconnectBackoffMax,
|
||||
DefaultSendBufferBytes,
|
||||
DefaultReceiveBufferBytes,
|
||||
requestTimeoutMs,
|
||||
connectionSetupTimeoutMs,
|
||||
connectionSetupTimeoutMaxMs,
|
||||
time,
|
||||
true,
|
||||
new ApiVersions,
|
||||
logContext)
|
||||
|
||||
val networkClient = new NetworkClient(
|
||||
selector,
|
||||
metadata,
|
||||
clientId,
|
||||
DefaultMaxInFlightRequestsPerConnection,
|
||||
DefaultReconnectBackoffMs,
|
||||
DefaultReconnectBackoffMax,
|
||||
DefaultSendBufferBytes,
|
||||
DefaultReceiveBufferBytes,
|
||||
requestTimeoutMs,
|
||||
connectionSetupTimeoutMs,
|
||||
connectionSetupTimeoutMaxMs,
|
||||
ClientDnsLookup.USE_ALL_DNS_IPS,
|
||||
time,
|
||||
true,
|
||||
new ApiVersions,
|
||||
logContext)
|
||||
// val networkClient = new NetworkClient(
|
||||
// selector,
|
||||
// metadata,
|
||||
// clientId,
|
||||
// DefaultMaxInFlightRequestsPerConnection,
|
||||
// DefaultReconnectBackoffMs,
|
||||
// DefaultReconnectBackoffMax,
|
||||
// DefaultSendBufferBytes,
|
||||
// DefaultReceiveBufferBytes,
|
||||
// requestTimeoutMs,
|
||||
// connectionSetupTimeoutMs,
|
||||
// connectionSetupTimeoutMaxMs,
|
||||
// ClientDnsLookup.USE_ALL_DNS_IPS,
|
||||
// time,
|
||||
// true,
|
||||
// new ApiVersions,
|
||||
// logContext)
|
||||
|
||||
val highLevelClient = new ConsumerNetworkClient(
|
||||
logContext,
|
||||
|
||||
@@ -1,18 +1,21 @@
|
||||
package kafka.console
|
||||
|
||||
import com.google.common.cache.{CacheLoader, RemovalListener, RemovalNotification}
|
||||
import com.xuxd.kafka.console.cache.TimeBasedCache
|
||||
import com.xuxd.kafka.console.config.{ContextConfigHolder, KafkaConfig}
|
||||
import kafka.zk.{AdminZkClient, KafkaZkClient}
|
||||
import kafka.zk.AdminZkClient
|
||||
import org.apache.kafka.clients.admin._
|
||||
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer, OffsetAndMetadata}
|
||||
import org.apache.kafka.clients.producer.KafkaProducer
|
||||
import org.apache.kafka.common.TopicPartition
|
||||
import org.apache.kafka.common.requests.ListOffsetsResponse
|
||||
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, StringSerializer}
|
||||
import org.apache.kafka.common.utils.Time
|
||||
import org.slf4j.{Logger, LoggerFactory}
|
||||
|
||||
import java.util.Properties
|
||||
import java.util.concurrent.Executors
|
||||
import scala.collection.{Map, Seq}
|
||||
import scala.concurrent.{ExecutionContext, Future}
|
||||
import scala.jdk.CollectionConverters.{MapHasAsJava, MapHasAsScala}
|
||||
|
||||
/**
|
||||
@@ -27,11 +30,13 @@ class KafkaConsole(config: KafkaConfig) {
|
||||
|
||||
protected def withAdminClient(f: Admin => Any): Any = {
|
||||
|
||||
val admin = createAdminClient()
|
||||
val admin = if (config.isCacheAdminConnection()) AdminCache.cache.get(ContextConfigHolder.CONTEXT_CONFIG.get().getBootstrapServer()) else createAdminClient()
|
||||
try {
|
||||
f(admin)
|
||||
} finally {
|
||||
admin.close()
|
||||
if (!config.isCacheAdminConnection) {
|
||||
admin.close()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -45,33 +50,40 @@ class KafkaConsole(config: KafkaConfig) {
|
||||
|
||||
protected def withConsumerAndCatchError(f: KafkaConsumer[Array[Byte], Array[Byte]] => Any, eh: Exception => Any,
|
||||
extra: Properties = new Properties()): Any = {
|
||||
val props = getProps()
|
||||
props.putAll(extra)
|
||||
props.put(ConsumerConfig.CLIENT_ID_CONFIG, String.valueOf(System.currentTimeMillis()))
|
||||
val consumer = new KafkaConsumer(props, new ByteArrayDeserializer, new ByteArrayDeserializer)
|
||||
// val props = getProps()
|
||||
// props.putAll(extra)
|
||||
// props.put(ConsumerConfig.CLIENT_ID_CONFIG, String.valueOf(System.currentTimeMillis()))
|
||||
// val consumer = new KafkaConsumer(props, new ByteArrayDeserializer, new ByteArrayDeserializer)
|
||||
ConsumerCache.setProperties(extra)
|
||||
val consumer = if (config.isCacheConsumerConnection) ConsumerCache.cache.get(ContextConfigHolder.CONTEXT_CONFIG.get().getBootstrapServer()) else KafkaConsole.createByteArrayKVConsumer(extra)
|
||||
|
||||
try {
|
||||
f(consumer)
|
||||
} catch {
|
||||
case er: Exception => eh(er)
|
||||
}
|
||||
finally {
|
||||
consumer.close()
|
||||
ConsumerCache.clearProperties()
|
||||
if (!config.isCacheConsumerConnection) {
|
||||
consumer.close()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected def withProducerAndCatchError(f: KafkaProducer[String, String] => Any, eh: Exception => Any,
|
||||
extra: Properties = new Properties()): Any = {
|
||||
val props = getProps()
|
||||
props.putAll(extra)
|
||||
props.put(ConsumerConfig.CLIENT_ID_CONFIG, String.valueOf(System.currentTimeMillis()))
|
||||
val producer = new KafkaProducer[String, String](props, new StringSerializer, new StringSerializer)
|
||||
ProducerCache.setProperties(extra)
|
||||
val producer = if (config.isCacheProducerConnection) ProducerCache.cache.get(ContextConfigHolder.CONTEXT_CONFIG.get().getBootstrapServer) else KafkaConsole.createProducer(extra)
|
||||
try {
|
||||
f(producer)
|
||||
} catch {
|
||||
case er: Exception => eh(er)
|
||||
}
|
||||
finally {
|
||||
producer.close()
|
||||
ProducerCache.clearProperties()
|
||||
if (!config.isCacheProducerConnection) {
|
||||
producer.close()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -79,7 +91,6 @@ class KafkaConsole(config: KafkaConfig) {
|
||||
extra: Properties = new Properties()): Any = {
|
||||
val props = getProps()
|
||||
props.putAll(extra)
|
||||
props.put(ConsumerConfig.CLIENT_ID_CONFIG, String.valueOf(System.currentTimeMillis()))
|
||||
val producer = new KafkaProducer[Array[Byte], Array[Byte]](props, new ByteArraySerializer, new ByteArraySerializer)
|
||||
try {
|
||||
f(producer)
|
||||
@@ -91,14 +102,17 @@ class KafkaConsole(config: KafkaConfig) {
|
||||
}
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
protected def withZKClient(f: AdminZkClient => Any): Any = {
|
||||
val zkClient = KafkaZkClient(config.getZookeeperAddr, false, 30000, 30000, Int.MaxValue, Time.SYSTEM)
|
||||
val adminZkClient = new AdminZkClient(zkClient)
|
||||
try {
|
||||
f(adminZkClient)
|
||||
} finally {
|
||||
zkClient.close()
|
||||
}
|
||||
// val zkClient = KafkaZkClient(config.getZookeeperAddr, false, 30000, 30000, Int.MaxValue, Time.SYSTEM)
|
||||
// 3.x
|
||||
// val zkClient = KafkaZkClient(config.getZookeeperAddr, false, 30000, 30000, Int.MaxValue, Time.SYSTEM, new ZKClientConfig(), "KafkaZkClient")
|
||||
// val adminZkClient = new AdminZkClient(zkClient)
|
||||
// try {
|
||||
// f(adminZkClient)
|
||||
// } finally {
|
||||
// zkClient.close()
|
||||
// }
|
||||
}
|
||||
|
||||
protected def createAdminClient(props: Properties): Admin = {
|
||||
@@ -110,20 +124,47 @@ class KafkaConsole(config: KafkaConfig) {
|
||||
}
|
||||
|
||||
private def createAdminClient(): Admin = {
|
||||
Admin.create(getProps())
|
||||
KafkaConsole.createAdminClient()
|
||||
}
|
||||
|
||||
private def getProps(): Properties = {
|
||||
KafkaConsole.getProps()
|
||||
}
|
||||
}
|
||||
|
||||
object KafkaConsole {
|
||||
val log: Logger = LoggerFactory.getLogger(this.getClass)
|
||||
|
||||
def createAdminClient(): Admin = {
|
||||
Admin.create(getProps())
|
||||
}
|
||||
|
||||
def createByteArrayKVConsumer(extra: Properties) : KafkaConsumer[Array[Byte], Array[Byte]] = {
|
||||
val props = getProps()
|
||||
props.putAll(extra)
|
||||
props.put(ConsumerConfig.CLIENT_ID_CONFIG, String.valueOf(System.currentTimeMillis()))
|
||||
new KafkaConsumer(props, new ByteArrayDeserializer, new ByteArrayDeserializer)
|
||||
}
|
||||
|
||||
def createProducer(extra: Properties) : KafkaProducer[String, String] = {
|
||||
val props = getProps()
|
||||
props.putAll(extra)
|
||||
new KafkaProducer(props, new StringSerializer, new StringSerializer)
|
||||
}
|
||||
|
||||
def createByteArrayStringProducer(extra: Properties) : KafkaProducer[Array[Byte], Array[Byte]] = {
|
||||
val props = getProps()
|
||||
props.putAll(extra)
|
||||
new KafkaProducer(props, new ByteArraySerializer, new ByteArraySerializer)
|
||||
}
|
||||
|
||||
def getProps(): Properties = {
|
||||
val props: Properties = new Properties();
|
||||
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, ContextConfigHolder.CONTEXT_CONFIG.get().getBootstrapServer())
|
||||
props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, ContextConfigHolder.CONTEXT_CONFIG.get().getRequestTimeoutMs())
|
||||
props.putAll(ContextConfigHolder.CONTEXT_CONFIG.get().getProperties())
|
||||
props
|
||||
}
|
||||
}
|
||||
|
||||
object KafkaConsole {
|
||||
val log: Logger = LoggerFactory.getLogger(this.getClass)
|
||||
|
||||
def getCommittedOffsets(admin: Admin, groupId: String,
|
||||
timeoutMs: Integer): Map[TopicPartition, OffsetAndMetadata] = {
|
||||
@@ -174,4 +215,88 @@ object KafkaConsole {
|
||||
}.toMap
|
||||
res
|
||||
}
|
||||
implicit val ec = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(2))
|
||||
}
|
||||
|
||||
object AdminCache {
|
||||
|
||||
private val log: Logger = LoggerFactory.getLogger(this.getClass)
|
||||
|
||||
private val cacheLoader = new CacheLoader[String, Admin] {
|
||||
override def load(key: String): Admin = KafkaConsole.createAdminClient()
|
||||
}
|
||||
|
||||
private val removeListener = new RemovalListener[String, Admin] {
|
||||
override def onRemoval(notification: RemovalNotification[String, Admin]): Unit = {
|
||||
Future {
|
||||
log.warn("Close expired admin connection: {}", notification.getKey)
|
||||
notification.getValue.close()
|
||||
log.warn("Close expired admin connection complete: {}", notification.getKey)
|
||||
}(KafkaConsole.ec)
|
||||
|
||||
}
|
||||
}
|
||||
val cache = new TimeBasedCache[String, Admin](cacheLoader, removeListener)
|
||||
}
|
||||
|
||||
object ConsumerCache {
|
||||
|
||||
private val log: Logger = LoggerFactory.getLogger(this.getClass)
|
||||
|
||||
private val threadLocal = new ThreadLocal[Properties]
|
||||
|
||||
private val cacheLoader = new CacheLoader[String, KafkaConsumer[Array[Byte], Array[Byte]]] {
|
||||
override def load(key: String): KafkaConsumer[Array[Byte], Array[Byte]] = KafkaConsole.createByteArrayKVConsumer(threadLocal.get())
|
||||
}
|
||||
|
||||
private val removeListener = new RemovalListener[String, KafkaConsumer[Array[Byte], Array[Byte]]] {
|
||||
override def onRemoval(notification: RemovalNotification[String, KafkaConsumer[Array[Byte], Array[Byte]]]): Unit = {
|
||||
Future {
|
||||
log.warn("Close expired consumer connection: {}", notification.getKey)
|
||||
notification.getValue.close()
|
||||
log.warn("Close expired consumer connection complete: {}", notification.getKey)
|
||||
}(KafkaConsole.ec)
|
||||
|
||||
}
|
||||
}
|
||||
val cache = new TimeBasedCache[String, KafkaConsumer[Array[Byte], Array[Byte]]](cacheLoader, removeListener)
|
||||
|
||||
def setProperties(props : Properties) : Unit = {
|
||||
threadLocal.set(props)
|
||||
}
|
||||
|
||||
def clearProperties() : Unit = {
|
||||
threadLocal.remove()
|
||||
}
|
||||
}
|
||||
|
||||
object ProducerCache {
|
||||
|
||||
private val log: Logger = LoggerFactory.getLogger(this.getClass)
|
||||
|
||||
private val threadLocal = new ThreadLocal[Properties]
|
||||
|
||||
private val cacheLoader = new CacheLoader[String, KafkaProducer[String, String]] {
|
||||
override def load(key: String): KafkaProducer[String, String] = KafkaConsole.createProducer(threadLocal.get())
|
||||
}
|
||||
|
||||
private val removeListener = new RemovalListener[String, KafkaProducer[String, String]] {
|
||||
override def onRemoval(notification: RemovalNotification[String, KafkaProducer[String, String]]): Unit = {
|
||||
Future {
|
||||
log.warn("Close expired producer connection: {}", notification.getKey)
|
||||
notification.getValue.close()
|
||||
log.warn("Close expired producer connection complete: {}", notification.getKey)
|
||||
}(KafkaConsole.ec)
|
||||
|
||||
}
|
||||
}
|
||||
val cache = new TimeBasedCache[String, KafkaProducer[String, String]](cacheLoader, removeListener)
|
||||
|
||||
def setProperties(props : Properties) : Unit = {
|
||||
threadLocal.set(props)
|
||||
}
|
||||
|
||||
def clearProperties() : Unit = {
|
||||
threadLocal.remove()
|
||||
}
|
||||
}
|
||||
@@ -4,13 +4,14 @@ import com.xuxd.kafka.console.beans.MessageFilter
|
||||
import com.xuxd.kafka.console.beans.enums.FilterType
|
||||
import com.xuxd.kafka.console.config.{ContextConfigHolder, KafkaConfig}
|
||||
import org.apache.commons.lang3.StringUtils
|
||||
import org.apache.kafka.clients.admin.{DeleteRecordsOptions, RecordsToDelete}
|
||||
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
|
||||
import org.apache.kafka.clients.producer.ProducerRecord
|
||||
import org.apache.kafka.common.TopicPartition
|
||||
|
||||
import java.time.Duration
|
||||
import java.util
|
||||
import java.util.Properties
|
||||
import java.util.{Properties}
|
||||
import scala.collection.immutable
|
||||
import scala.jdk.CollectionConverters.{CollectionHasAsScala, MapHasAsScala, SeqHasAsJava}
|
||||
|
||||
@@ -127,7 +128,7 @@ class MessageConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConf
|
||||
record.offset(),
|
||||
record.timestamp(),
|
||||
record.timestampType(),
|
||||
record.checksum(),
|
||||
// record.checksum(),
|
||||
record.serializedKeySize(),
|
||||
record.serializedValueSize(),
|
||||
record.key(),
|
||||
@@ -236,4 +237,14 @@ class MessageConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConf
|
||||
(false, e.getMessage)
|
||||
}).asInstanceOf[(Boolean, String)]
|
||||
}
|
||||
|
||||
def delete(recordsToDelete: util.Map[TopicPartition, RecordsToDelete]): (Boolean, String) = {
|
||||
withAdminClientAndCatchError(admin => {
|
||||
admin.deleteRecords(recordsToDelete, withTimeoutMs(new DeleteRecordsOptions())).all().get()
|
||||
(true, "")
|
||||
}, e => {
|
||||
log.error("delete message error.", e)
|
||||
(false, "delete error :" + e.getMessage)
|
||||
}).asInstanceOf[(Boolean, String)]
|
||||
}
|
||||
}
|
||||
|
||||
@@ -66,17 +66,17 @@ class TopicConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConfig
|
||||
/**
|
||||
* delete topic by topic name.
|
||||
*
|
||||
* @param topic topic name.
|
||||
* @param topics topic name list.
|
||||
* @return result or : fail message.
|
||||
*/
|
||||
def deleteTopic(topic: String): (Boolean, String) = {
|
||||
def deleteTopics(topics: util.Collection[String]): (Boolean, String) = {
|
||||
withAdminClientAndCatchError(admin => {
|
||||
val timeoutMs = ContextConfigHolder.CONTEXT_CONFIG.get().getRequestTimeoutMs()
|
||||
admin.deleteTopics(Collections.singleton(topic), new DeleteTopicsOptions().retryOnQuotaViolation(false)).all().get(timeoutMs, TimeUnit.MILLISECONDS)
|
||||
admin.deleteTopics(topics, new DeleteTopicsOptions().retryOnQuotaViolation(false)).all().get(timeoutMs, TimeUnit.MILLISECONDS)
|
||||
(true, "")
|
||||
},
|
||||
e => {
|
||||
log.error("delete topic error, topic: " + topic, e)
|
||||
log.error("delete topic error, topic: " + topics, e)
|
||||
(false, e.getMessage)
|
||||
}).asInstanceOf[(Boolean, String)]
|
||||
}
|
||||
|
||||
160
ui/package-lock.json
generated
@@ -1820,63 +1820,6 @@
|
||||
"integrity": "sha1-/q7SVZc9LndVW4PbwIhRpsY1IPo=",
|
||||
"dev": true
|
||||
},
|
||||
"ansi-styles": {
|
||||
"version": "4.3.0",
|
||||
"resolved": "https://registry.npmjs.org/ansi-styles/-/ansi-styles-4.3.0.tgz",
|
||||
"integrity": "sha512-zbB9rCJAT1rbjiVDb2hqKFHNYLxgtk8NURxZ3IZwD3F6NtxbXZQCnnSi1Lkx+IDohdPlFp222wVALIheZJQSEg==",
|
||||
"dev": true,
|
||||
"optional": true,
|
||||
"requires": {
|
||||
"color-convert": "^2.0.1"
|
||||
}
|
||||
},
|
||||
"chalk": {
|
||||
"version": "4.1.2",
|
||||
"resolved": "https://registry.npmjs.org/chalk/-/chalk-4.1.2.tgz",
|
||||
"integrity": "sha512-oKnbhFyRIXpUuez8iBMmyEa4nbj4IOQyuhc/wy9kY7/WVPcwIO9VA668Pu8RkO7+0G76SLROeyw9CpQ061i4mA==",
|
||||
"dev": true,
|
||||
"optional": true,
|
||||
"requires": {
|
||||
"ansi-styles": "^4.1.0",
|
||||
"supports-color": "^7.1.0"
|
||||
}
|
||||
},
|
||||
"color-convert": {
|
||||
"version": "2.0.1",
|
||||
"resolved": "https://registry.npmjs.org/color-convert/-/color-convert-2.0.1.tgz",
|
||||
"integrity": "sha512-RRECPsj7iu/xb5oKYcsFHSppFNnsj/52OVTRKb4zP5onXwVF3zVmmToNcOfGC+CRDpfK/U584fMg38ZHCaElKQ==",
|
||||
"dev": true,
|
||||
"optional": true,
|
||||
"requires": {
|
||||
"color-name": "~1.1.4"
|
||||
}
|
||||
},
|
||||
"color-name": {
|
||||
"version": "1.1.4",
|
||||
"resolved": "https://registry.npmjs.org/color-name/-/color-name-1.1.4.tgz",
|
||||
"integrity": "sha512-dOy+3AuW3a2wNbZHIuMZpTcgjGuLU/uBL/ubcZF9OXbDo8ff4O8yVp5Bf0efS8uEoYo5q4Fx7dY9OgQGXgAsQA==",
|
||||
"dev": true,
|
||||
"optional": true
|
||||
},
|
||||
"has-flag": {
|
||||
"version": "4.0.0",
|
||||
"resolved": "https://registry.npmjs.org/has-flag/-/has-flag-4.0.0.tgz",
|
||||
"integrity": "sha512-EykJT/Q1KjTWctppgIAgfSO0tKVuZUjhgMr17kqTumMl6Afv3EISleU7qZUzoXDFTAHTDC4NOoG/ZxU3EvlMPQ==",
|
||||
"dev": true,
|
||||
"optional": true
|
||||
},
|
||||
"loader-utils": {
|
||||
"version": "2.0.2",
|
||||
"resolved": "https://registry.npmjs.org/loader-utils/-/loader-utils-2.0.2.tgz",
|
||||
"integrity": "sha512-TM57VeHptv569d/GKh6TAYdzKblwDNiumOdkFnejjD0XwTH87K90w3O7AiJRqdQoXygvi1VQTJTLGhJl7WqA7A==",
|
||||
"dev": true,
|
||||
"optional": true,
|
||||
"requires": {
|
||||
"big.js": "^5.2.2",
|
||||
"emojis-list": "^3.0.0",
|
||||
"json5": "^2.1.2"
|
||||
}
|
||||
},
|
||||
"ssri": {
|
||||
"version": "8.0.1",
|
||||
"resolved": "https://registry.nlark.com/ssri/download/ssri-8.0.1.tgz",
|
||||
@@ -1885,28 +1828,6 @@
|
||||
"requires": {
|
||||
"minipass": "^3.1.1"
|
||||
}
|
||||
},
|
||||
"supports-color": {
|
||||
"version": "7.2.0",
|
||||
"resolved": "https://registry.npmjs.org/supports-color/-/supports-color-7.2.0.tgz",
|
||||
"integrity": "sha512-qpCAvRl9stuOHveKsn7HncJRvv501qIacKzQlO/+Lwxc9+0q2wLyv4Dfvt80/DPn2pqOBsJdDiogXGR9+OvwRw==",
|
||||
"dev": true,
|
||||
"optional": true,
|
||||
"requires": {
|
||||
"has-flag": "^4.0.0"
|
||||
}
|
||||
},
|
||||
"vue-loader-v16": {
|
||||
"version": "npm:vue-loader@16.8.3",
|
||||
"resolved": "https://registry.npmjs.org/vue-loader/-/vue-loader-16.8.3.tgz",
|
||||
"integrity": "sha512-7vKN45IxsKxe5GcVCbc2qFU5aWzyiLrYJyUuMz4BQLKctCj/fmCa0w6fGiiQ2cLFetNcek1ppGJQDCup0c1hpA==",
|
||||
"dev": true,
|
||||
"optional": true,
|
||||
"requires": {
|
||||
"chalk": "^4.1.0",
|
||||
"hash-sum": "^2.0.0",
|
||||
"loader-utils": "^2.0.0"
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
@@ -12176,6 +12097,87 @@
|
||||
}
|
||||
}
|
||||
},
|
||||
"vue-loader-v16": {
|
||||
"version": "npm:vue-loader@16.8.3",
|
||||
"resolved": "https://registry.npmjs.org/vue-loader/-/vue-loader-16.8.3.tgz",
|
||||
"integrity": "sha512-7vKN45IxsKxe5GcVCbc2qFU5aWzyiLrYJyUuMz4BQLKctCj/fmCa0w6fGiiQ2cLFetNcek1ppGJQDCup0c1hpA==",
|
||||
"dev": true,
|
||||
"optional": true,
|
||||
"requires": {
|
||||
"chalk": "^4.1.0",
|
||||
"hash-sum": "^2.0.0",
|
||||
"loader-utils": "^2.0.0"
|
||||
},
|
||||
"dependencies": {
|
||||
"ansi-styles": {
|
||||
"version": "4.3.0",
|
||||
"resolved": "https://registry.npmjs.org/ansi-styles/-/ansi-styles-4.3.0.tgz",
|
||||
"integrity": "sha512-zbB9rCJAT1rbjiVDb2hqKFHNYLxgtk8NURxZ3IZwD3F6NtxbXZQCnnSi1Lkx+IDohdPlFp222wVALIheZJQSEg==",
|
||||
"dev": true,
|
||||
"optional": true,
|
||||
"requires": {
|
||||
"color-convert": "^2.0.1"
|
||||
}
|
||||
},
|
||||
"chalk": {
|
||||
"version": "4.1.2",
|
||||
"resolved": "https://registry.npmjs.org/chalk/-/chalk-4.1.2.tgz",
|
||||
"integrity": "sha512-oKnbhFyRIXpUuez8iBMmyEa4nbj4IOQyuhc/wy9kY7/WVPcwIO9VA668Pu8RkO7+0G76SLROeyw9CpQ061i4mA==",
|
||||
"dev": true,
|
||||
"optional": true,
|
||||
"requires": {
|
||||
"ansi-styles": "^4.1.0",
|
||||
"supports-color": "^7.1.0"
|
||||
}
|
||||
},
|
||||
"color-convert": {
|
||||
"version": "2.0.1",
|
||||
"resolved": "https://registry.npmjs.org/color-convert/-/color-convert-2.0.1.tgz",
|
||||
"integrity": "sha512-RRECPsj7iu/xb5oKYcsFHSppFNnsj/52OVTRKb4zP5onXwVF3zVmmToNcOfGC+CRDpfK/U584fMg38ZHCaElKQ==",
|
||||
"dev": true,
|
||||
"optional": true,
|
||||
"requires": {
|
||||
"color-name": "~1.1.4"
|
||||
}
|
||||
},
|
||||
"color-name": {
|
||||
"version": "1.1.4",
|
||||
"resolved": "https://registry.npmjs.org/color-name/-/color-name-1.1.4.tgz",
|
||||
"integrity": "sha512-dOy+3AuW3a2wNbZHIuMZpTcgjGuLU/uBL/ubcZF9OXbDo8ff4O8yVp5Bf0efS8uEoYo5q4Fx7dY9OgQGXgAsQA==",
|
||||
"dev": true,
|
||||
"optional": true
|
||||
},
|
||||
"has-flag": {
|
||||
"version": "4.0.0",
|
||||
"resolved": "https://registry.npmjs.org/has-flag/-/has-flag-4.0.0.tgz",
|
||||
"integrity": "sha512-EykJT/Q1KjTWctppgIAgfSO0tKVuZUjhgMr17kqTumMl6Afv3EISleU7qZUzoXDFTAHTDC4NOoG/ZxU3EvlMPQ==",
|
||||
"dev": true,
|
||||
"optional": true
|
||||
},
|
||||
"loader-utils": {
|
||||
"version": "2.0.2",
|
||||
"resolved": "https://registry.npmjs.org/loader-utils/-/loader-utils-2.0.2.tgz",
|
||||
"integrity": "sha512-TM57VeHptv569d/GKh6TAYdzKblwDNiumOdkFnejjD0XwTH87K90w3O7AiJRqdQoXygvi1VQTJTLGhJl7WqA7A==",
|
||||
"dev": true,
|
||||
"optional": true,
|
||||
"requires": {
|
||||
"big.js": "^5.2.2",
|
||||
"emojis-list": "^3.0.0",
|
||||
"json5": "^2.1.2"
|
||||
}
|
||||
},
|
||||
"supports-color": {
|
||||
"version": "7.2.0",
|
||||
"resolved": "https://registry.npmjs.org/supports-color/-/supports-color-7.2.0.tgz",
|
||||
"integrity": "sha512-qpCAvRl9stuOHveKsn7HncJRvv501qIacKzQlO/+Lwxc9+0q2wLyv4Dfvt80/DPn2pqOBsJdDiogXGR9+OvwRw==",
|
||||
"dev": true,
|
||||
"optional": true,
|
||||
"requires": {
|
||||
"has-flag": "^4.0.0"
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"vue-ref": {
|
||||
"version": "2.0.0",
|
||||
"resolved": "https://registry.npm.taobao.org/vue-ref/download/vue-ref-2.0.0.tgz",
|
||||
|
||||
|
Before Width: | Height: | Size: 4.2 KiB After Width: | Height: | Size: 5.4 KiB |
BIN
ui/public/vue.ico
Normal file
|
After Width: | Height: | Size: 4.2 KiB |
@@ -276,4 +276,8 @@ export const KafkaMessageApi = {
|
||||
url: "/message/resend",
|
||||
method: "post",
|
||||
},
|
||||
delete: {
|
||||
url: "/message",
|
||||
method: "delete",
|
||||
},
|
||||
};
|
||||
|
||||
196
ui/src/views/message/DeleteMessage.vue
Normal file
@@ -0,0 +1,196 @@
|
||||
<template>
|
||||
<div class="tab-content">
|
||||
<a-spin :spinning="loading">
|
||||
<div>
|
||||
<h4 class="hint-content">
|
||||
注意:以下删除,将删除该分区比该偏移位点小的所有消息(不包含该位点)
|
||||
</h4>
|
||||
<hr />
|
||||
</div>
|
||||
<div id="search-offset-form-advanced-search">
|
||||
<a-form
|
||||
class="ant-advanced-search-form"
|
||||
:form="form"
|
||||
@submit="handleSearch"
|
||||
>
|
||||
<a-row :gutter="24">
|
||||
<a-col :span="9">
|
||||
<a-form-item label="topic">
|
||||
<a-select
|
||||
class="topic-select"
|
||||
@change="handleTopicChange"
|
||||
show-search
|
||||
option-filter-prop="children"
|
||||
v-decorator="[
|
||||
'topic',
|
||||
{
|
||||
rules: [{ required: true, message: '请选择一个topic!' }],
|
||||
},
|
||||
]"
|
||||
placeholder="请选择一个topic"
|
||||
>
|
||||
<a-select-option v-for="v in topicList" :key="v" :value="v">
|
||||
{{ v }}
|
||||
</a-select-option>
|
||||
</a-select>
|
||||
</a-form-item>
|
||||
</a-col>
|
||||
<a-col :span="6">
|
||||
<a-form-item label="分区">
|
||||
<a-select
|
||||
class="type-select"
|
||||
show-search
|
||||
option-filter-prop="children"
|
||||
v-model="selectPartition"
|
||||
placeholder="请选择一个分区"
|
||||
>
|
||||
<a-select-option v-for="v in partitions" :key="v" :value="v">
|
||||
<span v-if="v == -1">全部</span> <span v-else>{{ v }}</span>
|
||||
</a-select-option>
|
||||
</a-select>
|
||||
</a-form-item>
|
||||
</a-col>
|
||||
<a-col :span="7">
|
||||
<a-form-item label="偏移">
|
||||
<a-input
|
||||
v-decorator="[
|
||||
'offset',
|
||||
{
|
||||
rules: [{ required: true, message: '请输入消息偏移!' }],
|
||||
},
|
||||
]"
|
||||
placeholder="消息偏移"
|
||||
/>
|
||||
</a-form-item>
|
||||
</a-col>
|
||||
<a-col :span="2" :style="{ textAlign: 'right' }">
|
||||
<a-form-item>
|
||||
<a-button type="primary" html-type="submit"> 执行删除</a-button>
|
||||
</a-form-item>
|
||||
</a-col>
|
||||
</a-row>
|
||||
</a-form>
|
||||
</div>
|
||||
</a-spin>
|
||||
</div>
|
||||
</template>
|
||||
|
||||
<script>
|
||||
import request from "@/utils/request";
|
||||
import { KafkaMessageApi, KafkaTopicApi } from "@/utils/api";
|
||||
import notification from "ant-design-vue/lib/notification";
|
||||
|
||||
export default {
|
||||
name: "DeleteMessage",
|
||||
props: {
|
||||
topicList: {
|
||||
type: Array,
|
||||
},
|
||||
},
|
||||
data() {
|
||||
return {
|
||||
loading: false,
|
||||
form: this.$form.createForm(this, { name: "message_search_offset" }),
|
||||
partitions: [],
|
||||
selectPartition: undefined,
|
||||
};
|
||||
},
|
||||
methods: {
|
||||
handleSearch(e) {
|
||||
e.preventDefault();
|
||||
this.form.validateFields((err, values) => {
|
||||
if (!err) {
|
||||
const data = Object.assign({}, values, {
|
||||
partition: this.selectPartition,
|
||||
});
|
||||
this.loading = true;
|
||||
request({
|
||||
url: KafkaMessageApi.delete.url,
|
||||
method: KafkaMessageApi.delete.method,
|
||||
data: [data],
|
||||
}).then((res) => {
|
||||
this.loading = false;
|
||||
if (res.code == 0) {
|
||||
this.$message.success(res.msg);
|
||||
} else {
|
||||
notification.error({
|
||||
message: "error",
|
||||
description: res.msg,
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
},
|
||||
getPartitionInfo(topic) {
|
||||
this.loading = true;
|
||||
request({
|
||||
url: KafkaTopicApi.getPartitionInfo.url + "?topic=" + topic,
|
||||
method: KafkaTopicApi.getPartitionInfo.method,
|
||||
}).then((res) => {
|
||||
this.loading = false;
|
||||
if (res.code != 0) {
|
||||
notification.error({
|
||||
message: "error",
|
||||
description: res.msg,
|
||||
});
|
||||
} else {
|
||||
this.partitions = res.data.map((v) => v.partition);
|
||||
}
|
||||
});
|
||||
},
|
||||
handleTopicChange(topic) {
|
||||
this.selectPartition =
|
||||
this.partitions.length > 0 ? this.partitions[0] : 0;
|
||||
this.getPartitionInfo(topic);
|
||||
},
|
||||
},
|
||||
};
|
||||
</script>
|
||||
|
||||
<style scoped>
|
||||
.tab-content {
|
||||
width: 100%;
|
||||
height: 100%;
|
||||
}
|
||||
|
||||
.ant-advanced-search-form {
|
||||
padding: 24px;
|
||||
background: #fbfbfb;
|
||||
border: 1px solid #d9d9d9;
|
||||
border-radius: 6px;
|
||||
}
|
||||
|
||||
.ant-advanced-search-form .ant-form-item {
|
||||
display: flex;
|
||||
}
|
||||
|
||||
.ant-advanced-search-form .ant-form-item-control-wrapper {
|
||||
flex: 1;
|
||||
}
|
||||
|
||||
#components-form-topic-advanced-search .ant-form {
|
||||
max-width: none;
|
||||
margin-bottom: 1%;
|
||||
}
|
||||
|
||||
#search-offset-form-advanced-search .search-result-list {
|
||||
margin-top: 16px;
|
||||
border: 1px dashed #e9e9e9;
|
||||
border-radius: 6px;
|
||||
background-color: #fafafa;
|
||||
min-height: 200px;
|
||||
text-align: center;
|
||||
padding-top: 80px;
|
||||
}
|
||||
.topic-select {
|
||||
width: 400px !important;
|
||||
}
|
||||
.type-select {
|
||||
width: 200px !important;
|
||||
}
|
||||
|
||||
.hint-content {
|
||||
color: red;
|
||||
}
|
||||
</style>
|
||||
@@ -11,6 +11,9 @@
|
||||
<a-tab-pane key="3" tab="在线发送">
|
||||
<SendMessage :topic-list="topicList"></SendMessage>
|
||||
</a-tab-pane>
|
||||
<a-tab-pane key="4" tab="在线删除">
|
||||
<DeleteMessage :topic-list="topicList"></DeleteMessage>
|
||||
</a-tab-pane>
|
||||
</a-tabs>
|
||||
</a-spin>
|
||||
</div>
|
||||
@@ -23,9 +26,10 @@ import request from "@/utils/request";
|
||||
import { KafkaTopicApi } from "@/utils/api";
|
||||
import notification from "ant-design-vue/lib/notification";
|
||||
import SendMessage from "@/views/message/SendMessage";
|
||||
import DeleteMessage from "./DeleteMessage";
|
||||
export default {
|
||||
name: "Message",
|
||||
components: { SearchByTime, SearchByOffset, SendMessage },
|
||||
components: { DeleteMessage, SearchByTime, SearchByOffset, SendMessage },
|
||||
data() {
|
||||
return {
|
||||
loading: false,
|
||||
|
||||
@@ -49,10 +49,26 @@
|
||||
<a-button type="primary" @click="openCreateTopicDialog"
|
||||
>新增</a-button
|
||||
>
|
||||
<a-popconfirm
|
||||
title="删除这些Topic?"
|
||||
ok-text="确认"
|
||||
cancel-text="取消"
|
||||
@confirm="deleteTopics(selectedRowKeys)"
|
||||
>
|
||||
<a-button type="danger" class="btn-left" :disabled="!hasSelected" :loading="loading">
|
||||
批量删除
|
||||
</a-button>
|
||||
</a-popconfirm>
|
||||
<span style="margin-left: 8px">
|
||||
<template v-if="hasSelected">
|
||||
{{ `已选择 ${selectedRowKeys.length} 个Topic` }}
|
||||
</template>
|
||||
</span>
|
||||
</div>
|
||||
<a-table
|
||||
:columns="columns"
|
||||
:data-source="filteredData"
|
||||
:row-selection="{ selectedRowKeys: selectedRowKeys, onChange: onSelectChange }"
|
||||
bordered
|
||||
row-key="name"
|
||||
>
|
||||
@@ -225,8 +241,14 @@ export default {
|
||||
filterTopic: "",
|
||||
filteredData: [],
|
||||
type: "normal",
|
||||
selectedRowKeys: [], // Check here to configure the default column
|
||||
};
|
||||
},
|
||||
computed: {
|
||||
hasSelected() {
|
||||
return this.selectedRowKeys.length > 0;
|
||||
},
|
||||
},
|
||||
methods: {
|
||||
handleSearch(e) {
|
||||
e.preventDefault();
|
||||
@@ -256,14 +278,16 @@ export default {
|
||||
}
|
||||
});
|
||||
},
|
||||
deleteTopic(topic) {
|
||||
deleteTopics(topics) {
|
||||
request({
|
||||
url: KafkaTopicApi.deleteTopic.url + "?topic=" + topic,
|
||||
url: KafkaTopicApi.deleteTopic.url,
|
||||
method: KafkaTopicApi.deleteTopic.method,
|
||||
data: topics
|
||||
}).then((res) => {
|
||||
if (res.code == 0) {
|
||||
this.$message.success(res.msg);
|
||||
this.getTopicList();
|
||||
this.selectedRowKeys = [];
|
||||
} else {
|
||||
notification.error({
|
||||
message: "error",
|
||||
@@ -272,6 +296,9 @@ export default {
|
||||
}
|
||||
});
|
||||
},
|
||||
deleteTopic(topic) {
|
||||
this.deleteTopics([topic])
|
||||
},
|
||||
onTopicUpdate(input) {
|
||||
this.filterTopic = input.target.value;
|
||||
this.filter();
|
||||
@@ -342,9 +369,13 @@ export default {
|
||||
closeThrottleDialog() {
|
||||
this.showThrottleDialog = false;
|
||||
},
|
||||
onSelectChange(selectedRowKeys) {
|
||||
this.selectedRowKeys = selectedRowKeys;
|
||||
},
|
||||
},
|
||||
created() {
|
||||
this.getTopicList();
|
||||
this.selectedRowKeys = [];
|
||||
},
|
||||
};
|
||||
|
||||
@@ -431,4 +462,8 @@ const columns = [
|
||||
.type-select {
|
||||
width: 200px !important;
|
||||
}
|
||||
|
||||
.btn-left {
|
||||
margin-left: 1%;
|
||||
}
|
||||
</style>
|
||||
|
||||