15 Commits

Author SHA1 Message Date
许晓东
f0369c6246 优化linux启停脚本 2025-08-06 20:21:02 +08:00
许晓东
0027f55184 windows启动脚本支持任意路径,增加PowerShell启动脚本. 2025-07-30 20:57:56 +08:00
许晓东
745aa6e9bc 升级v1.0.13版本 2025-07-01 20:34:22 +08:00
许晓东
1e86aa4569 增加消息转发功能. 2025-06-16 20:28:26 +08:00
许晓东
780d05bc06 增加消息转发接口. 2025-06-11 20:12:09 +08:00
许晓东
a755ffb010 消息详情对话框点击模态框的遮罩层关闭模态框. 2025-05-27 20:52:37 +08:00
许晓东
39544327ee 补充TopicConsole一个注释. 2025-05-20 21:07:08 +08:00
许晓东
9b7263241c broker配置mask close默认设置为true 2025-04-24 20:28:33 +08:00
许晓东
680630d372 Topic菜单消费详情支持点击刷新 2025-03-16 19:47:42 +08:00
许晓东
d554053e44 发布v1.0.12版本 2025-02-24 22:53:02 +08:00
许晓东
ba0a12fd5f 升级v1.0.12版本 2025-02-24 22:39:27 +08:00
许晓东
a927dd412e 消费组支持模糊过滤查询. 2025-02-22 20:56:00 +08:00
许晓东
a48812ed0e message提示框显示1秒,最多显示1条. 2025-01-12 22:02:49 +08:00
许晓东
dd2863e51d topic-发送统计: 增加刷新按钮 2024-12-16 20:50:54 +08:00
许晓东
89846018cc 发布1.0.11版本 2024-10-06 12:55:43 +08:00
24 changed files with 629 additions and 45 deletions

View File

@@ -25,16 +25,16 @@ v1.0.6版本之前如果kafka集群启用了ACL但是控制台没看到Acl
![功能特性](./document/img/功能特性.png)
## 安装包下载
点击下载(v1.0.10版本)[kafka-console-ui.zip](https://github.com/xxd763795151/kafka-console-ui/releases/download/v1.0.10/kafka-console-ui-1.0.10.zip)
点击下载(v1.0.13版本)[kafka-console-ui.zip](https://github.com/xxd763795151/kafka-console-ui/releases/download/v1.0.13/kafka-console-ui-1.0.13.zip)
如果安装包下载的比较慢,可以查看下面的源码打包说明,把代码下载下来,本地快速打包.
github下载慢也可以试试从gitee下载点击下载[gitee来源kafka-console-ui.zip](https://gitee.com/xiaodong_xu/kafka-console-ui/releases/download/v1.0.10/kafka-console-ui-1.0.10.zip)
github下载慢也可以试试从gitee下载点击下载[gitee来源kafka-console-ui.zip](https://gitee.com/xiaodong_xu/kafka-console-ui/releases/download/v1.0.13/kafka-console-ui-1.0.13.zip)
## 快速使用
### Windows
1. 解压缩zip安装包
2. 进入bin目录必须在bin目录下双击执行`start.bat`启动
2. 进入bin目录, 双击执行`start.bat`启动; 如果使用PowerShell, 也可以选择运行`start.ps1`启动
3. 停止:直接关闭启动的命令行窗口即可
### Linux或Mac OS
@@ -68,7 +68,7 @@ sh bin/shutdown.sh
在新增集群的时候除了集群地址还可以输入集群的其它属性配置比如请求超时ACL配置等。如果开启了ACL切换到该集群的时候导航栏上便会出现ACL菜单支持进行相关操作目前是基于SASL_SCRAM认证授权管理支持的最完善其它的我也没验证过虽然是我开发的但是我也没具体全部验证这一块功能授权部分应该是通用的
## kafka版本
* 当前使用的kafka 3.2.0
* 当前使用的kafka 3.5.0
## 监控
仅提供运维管理功能监控、告警需要配合其它组件如有需要建议请查看https://blog.csdn.net/x763795151/article/details/119705372

View File

@@ -23,6 +23,7 @@
<includes>
<include>*.sh</include>
<include>*.bat</include>
<include>*.ps1</include>
</includes>
<outputDirectory>bin</outputDirectory>
<fileMode>0755</fileMode>

View File

@@ -1,13 +1,61 @@
#!/bin/bash
PREFIX='./'
CMD=$0
if [[ $CMD == $PREFIX* ]]; then
CMD=${CMD:2}
# 获取脚本真实路径兼容Linux和macOS
if [ -L "$0" ]; then
# 处理符号链接
if command -v greadlink >/dev/null 2>&1; then
SCRIPT_PATH=$(greadlink -f "$0")
elif command -v readlink >/dev/null 2>&1; then
SCRIPT_PATH=$(readlink -f "$0")
else
# 使用perl作为备选
SCRIPT_PATH=$(perl -e 'use Cwd "abs_path"; print abs_path(shift)' "$0" 2>/dev/null)
fi
else
SCRIPT_PATH="$0"
fi
SCRIPT_DIR=$(dirname "`pwd`/$CMD")
PROJECT_DIR=`dirname "$SCRIPT_DIR"`
# 最终回退方案
if [ -z "$SCRIPT_PATH" ] || [ ! -f "$SCRIPT_PATH" ]; then
SCRIPT_PATH="$0"
fi
# 计算项目根目录
SCRIPT_DIR=$(dirname "$SCRIPT_PATH")
PROJECT_DIR=$(cd "$SCRIPT_DIR" && cd .. && pwd 2>/dev/null)
# 验证项目目录是否存在
if [ ! -d "$PROJECT_DIR" ]; then
echo "ERROR: Failed to determine project directory!" >&2
exit 1
fi
# 不要修改进程标记,作为进程属性关闭使用
PROCESS_FLAG="kafka-console-ui-process-flag:${PROJECT_DIR}"
pkill -f $PROCESS_FLAG
echo 'Stop Kafka-console-ui!'
# 停止前检查进程是否存在
if pgrep -f "$PROCESS_FLAG" >/dev/null; then
# 先尝试正常停止
pkill -f "$PROCESS_FLAG"
# 等待进程退出
TIMEOUT=10
while [ $TIMEOUT -gt 0 ]; do
if ! pgrep -f "$PROCESS_FLAG" >/dev/null; then
break
fi
sleep 1
TIMEOUT=$((TIMEOUT - 1))
done
# 检查是否仍有进程存在
if pgrep -f "$PROCESS_FLAG" >/dev/null; then
# 强制终止
pkill -9 -f "$PROCESS_FLAG"
echo "Stop Kafka-console-ui! (force killed)"
else
echo "Stop Kafka-console-ui! (gracefully stopped)"
fi
else
echo "Kafka-console-ui is not running."
fi

View File

@@ -1,8 +1,38 @@
@echo off
rem MAIN_CLASS=org.springframework.boot.loader.JarLauncher
rem JAVA_HOME=jre1.8.0_66
set JAVA_CMD=%JAVA_HOME%\bin\java
set JAVA_OPTS=-Xmx512m -Xms512m -Xmn256m -Xss256k -Dfile.encoding=utf-8
set CONFIG_FILE=../config/application.yml
set TARGET=../lib/kafka-console-ui.jar
set DATA_DIR=..
"%JAVA_CMD%" -jar %TARGET% --spring.config.location=%CONFIG_FILE% --data.dir=%DATA_DIR%
setlocal enabledelayedexpansion
set "BIN_DIR=%~dp0"
if not "%BIN_DIR:~-1%"=="\" set "BIN_DIR=%BIN_DIR%\"
set "BASE_DIR=%BIN_DIR%.."
for %%I in ("%BASE_DIR%") do set "BASE_DIR=%%~fI"
if defined JAVA_HOME (
set "JAVA_CMD=%JAVA_HOME%\bin\java"
) else (
echo ERROR: JAVA_HOME is not defined
exit /b 1
)
set "JAVA_OPTS=-Xmx512m -Xms512m -Xmn256m -Xss256k -Dfile.encoding=utf-8"
set "CONFIG_FILE=%BASE_DIR%\config\application.yml"
set "TARGET=%BASE_DIR%\lib\kafka-console-ui.jar"
set "DATA_DIR=%BASE_DIR%"
set "LOG_HOME=%BASE_DIR%"
if not exist "%TARGET%" (
echo ERROR: Jar file not found at [%TARGET%]
exit /b 1
)
if not exist "%CONFIG_FILE%" (
echo WARNING: Config file not found at [%CONFIG_FILE%]
)
"%JAVA_CMD%" %JAVA_OPTS% -jar "%TARGET%" --spring.config.location="%CONFIG_FILE%" --data.dir="%DATA_DIR%" --logging.home="%LOG_HOME%"
endlocal

41
bin/start.ps1 Normal file
View File

@@ -0,0 +1,41 @@
# PowerShell
# Set the script execution policy. If necessary, execute this command in PowerShell and then run the script.
# Set-ExecutionPolicy Bypass -Scope Process -Force
param()
$BIN_DIR = $PSScriptRoot
if (-not $BIN_DIR.EndsWith('\')) {
$BIN_DIR += '\'
}
$BASE_DIR = (Get-Item (Join-Path $BIN_DIR "..")).FullName
if (-not $env:JAVA_HOME) {
Write-Error "ERROR: JAVA_HOME is not defined"
exit 1
}
$JAVA_OPTS = "-Xmx512m -Xms512m -Xmn256m -Xss256k -Dfile.encoding=utf-8"
$CONFIG_FILE = Join-Path $BASE_DIR "config\application.yml"
$TARGET = Join-Path $BASE_DIR "lib\kafka-console-ui.jar"
$DATA_DIR = $BASE_DIR
$LOG_HOME = $BASE_DIR
if (-not (Test-Path $TARGET -PathType Leaf)) {
Write-Error "ERROR: Jar file not found at [$TARGET]"
exit 1
}
if (-not (Test-Path $CONFIG_FILE -PathType Leaf)) {
Write-Warning "WARNING: Config file not found at [$CONFIG_FILE]"
}
$javaCmd = Join-Path $env:JAVA_HOME "bin\java.exe"
& $javaCmd $JAVA_OPTS.Split() `
-jar $TARGET `
"--spring.config.location=$CONFIG_FILE" `
"--data.dir=$DATA_DIR" `
"--logging.home=$LOG_HOME"

View File

@@ -1,29 +1,55 @@
#!/bin/bash
# 设置jvm堆大小及栈大小栈大小最少设置为256K不要小于这个值比如设置为128太小了
# 设置jvm堆大小及栈大小
JAVA_MEM_OPTS="-Xmx512m -Xms512m -Xmn256m -Xss256k"
PREFIX='./'
CMD=$0
if [[ $CMD == $PREFIX* ]]; then
CMD=${CMD:2}
# 获取脚本真实路径兼容Linux和macOS
if [ -L "$0" ]; then
# 处理符号链接
if command -v greadlink >/dev/null 2>&1; then
# macOS使用greadlink需brew install coreutils
SCRIPT_PATH=$(greadlink -f "$0")
else
# Linux使用readlink
SCRIPT_PATH=$(readlink -f "$0")
fi
else
SCRIPT_PATH="$0"
fi
SCRIPT_DIR=$(dirname "`pwd`/$CMD")
PROJECT_DIR=`dirname "$SCRIPT_DIR"`
# 如果上述方法失败如macOS无greadlink使用替代方案
if [ -z "$SCRIPT_PATH" ] || [ ! -f "$SCRIPT_PATH" ]; then
# 使用perl跨平台解决方案
SCRIPT_PATH=$(perl -e 'use Cwd "abs_path"; print abs_path(shift)' "$0" 2>/dev/null)
fi
# 最终回退方案
if [ -z "$SCRIPT_PATH" ] || [ ! -f "$SCRIPT_PATH" ]; then
SCRIPT_PATH="$0"
fi
# 计算项目根目录
SCRIPT_DIR=$(dirname "$SCRIPT_PATH")
PROJECT_DIR=$(cd "$SCRIPT_DIR" && cd .. && pwd)
CONF_FILE="$PROJECT_DIR/config/application.yml"
TARGET="$PROJECT_DIR/lib/kafka-console-ui.jar"
#设置h2文件根目录
DATA_DIR=$PROJECT_DIR
# 设置h2文件根目录
DATA_DIR="$PROJECT_DIR"
# 日志目录,默认为当前工程目录下
# 这个是错误输出如果启动命令有误输出到这个文件应用日志不会输出到error.out应用日志输出到上面的kafka-console-ui.log中
ERROR_OUT="$PROJECT_DIR/error.out"
# 不要修改进程标记作为进程属性关闭使用如果要修改请把shutdown.sh里的该属性的值保持一致
# 日志目录
LOG_HOME="$PROJECT_DIR"
PROCESS_FLAG="kafka-console-ui-process-flag:${PROJECT_DIR}"
JAVA_OPTS="$JAVA_OPTS $JAVA_MEM_OPTS -Dfile.encoding=utf-8"
nohup java -jar $JAVA_OPTS $TARGET --spring.config.location="$CONF_FILE" --logging.home="$PROJECT_DIR" --data.dir=$DATA_DIR $PROCESS_FLAG 1>/dev/null 2>$ERROR_OUT &
# 启动应用
nohup java -jar $JAVA_OPTS "$TARGET" \
--spring.config.location="$CONF_FILE" \
--logging.home="$LOG_HOME" \
--data.dir="$DATA_DIR" \
"$PROCESS_FLAG" >/dev/null 2>"$ERROR_OUT" &
echo "Kafka-console-ui Started!"
echo "Kafka-console-ui Started! PID: $!"

View File

@@ -10,7 +10,7 @@
</parent>
<groupId>com.xuxd</groupId>
<artifactId>kafka-console-ui</artifactId>
<version>1.0.11</version>
<version>1.0.13</version>
<name>kafka-console-ui</name>
<description>Kafka console manage ui</description>
<properties>

View File

@@ -0,0 +1,30 @@
package com.xuxd.kafka.console.beans;
import lombok.Data;
/**
* 消息转发请求参数.
*
* @author: xuxd
* @since: 2025/6/5 16:52
**/
@Data
public class ForwardMessage {
private SendMessage message;
/**
* 是否还发到同一个分区.
*/
private boolean samePartition;
/**
* 目标集群id.
*/
private long targetClusterId;
/**
* 目标topic.
*/
private String targetTopic;
}

View File

@@ -2,6 +2,7 @@ package com.xuxd.kafka.console.controller;
import com.xuxd.kafka.console.aspect.annotation.ControllerLog;
import com.xuxd.kafka.console.aspect.annotation.Permission;
import com.xuxd.kafka.console.beans.ForwardMessage;
import com.xuxd.kafka.console.beans.QueryMessage;
import com.xuxd.kafka.console.beans.ResponseData;
import com.xuxd.kafka.console.beans.SendMessage;
@@ -83,4 +84,11 @@ public class MessageController {
}
return messageService.sendStatisticsByTime(dto);
}
@Permission("message:forward")
@ControllerLog("消息转发")
@PostMapping("/forward")
public Object forward(@RequestBody ForwardMessage message) {
return messageService.forward(message);
}
}

View File

@@ -49,6 +49,10 @@ public class ContextSetFilter implements Filter {
String uri = request.getRequestURI();
if (!excludes.contains(uri)) {
String headerId = request.getHeader(Header.ID);
String specificId = request.getHeader(Header.SPECIFIC_ID);
if (StringUtils.isNotBlank(specificId)) {
headerId = specificId;
}
if (StringUtils.isBlank(headerId)) {
// ResponseData failed = ResponseData.create().failed("Cluster info is null.");
ResponseData failed = ResponseData.create().failed("没有集群信息,请先切换集群");
@@ -84,5 +88,6 @@ public class ContextSetFilter implements Filter {
interface Header {
String ID = "X-Cluster-Info-Id";
String NAME = "X-Cluster-Info-Name";
String SPECIFIC_ID = "X-Specific-Cluster-Info-Id";
}
}

View File

@@ -1,5 +1,6 @@
package com.xuxd.kafka.console.service;
import com.xuxd.kafka.console.beans.ForwardMessage;
import com.xuxd.kafka.console.beans.QueryMessage;
import com.xuxd.kafka.console.beans.dto.QuerySendStatisticsDTO;
import com.xuxd.kafka.console.beans.ResponseData;
@@ -32,4 +33,6 @@ public interface MessageService {
ResponseData delete(List<QueryMessage> messages);
ResponseData sendStatisticsByTime(QuerySendStatisticsDTO request);
ResponseData forward(ForwardMessage message);
}

View File

@@ -1,14 +1,15 @@
package com.xuxd.kafka.console.service.impl;
import com.xuxd.kafka.console.beans.MessageFilter;
import com.xuxd.kafka.console.beans.QueryMessage;
import com.xuxd.kafka.console.beans.ResponseData;
import com.xuxd.kafka.console.beans.SendMessage;
import com.xuxd.kafka.console.beans.*;
import com.xuxd.kafka.console.beans.dos.ClusterInfoDO;
import com.xuxd.kafka.console.beans.dto.QuerySendStatisticsDTO;
import com.xuxd.kafka.console.beans.enums.FilterType;
import com.xuxd.kafka.console.beans.vo.ConsumerRecordVO;
import com.xuxd.kafka.console.beans.vo.MessageDetailVO;
import com.xuxd.kafka.console.beans.vo.QuerySendStatisticsVO;
import com.xuxd.kafka.console.config.ContextConfig;
import com.xuxd.kafka.console.config.ContextConfigHolder;
import com.xuxd.kafka.console.dao.ClusterInfoMapper;
import com.xuxd.kafka.console.service.ConsumerService;
import com.xuxd.kafka.console.service.MessageService;
import kafka.console.ConsumerConsole;
@@ -52,6 +53,9 @@ public class MessageServiceImpl implements MessageService, ApplicationContextAwa
@Autowired
private ConsumerConsole consumerConsole;
@Autowired
private ClusterInfoMapper clusterInfoMapper;
private ApplicationContext applicationContext;
private Map<String, Deserializer> deserializerDict = new HashMap<>();
@@ -304,6 +308,47 @@ public class MessageServiceImpl implements MessageService, ApplicationContextAwa
return ResponseData.create().data(vo).success();
}
@Override
public ResponseData forward(ForwardMessage message) {
ClusterInfoDO clusterInfoDO = clusterInfoMapper.selectById(message.getTargetClusterId());
if (clusterInfoDO == null) {
return ResponseData.create().failed("Target cluster not found.");
}
SendMessage sendMessage = message.getMessage();
// first, search message detail
Map<TopicPartition, Object> offsetTable = new HashMap<>();
TopicPartition topicPartition = new TopicPartition(sendMessage.getTopic(), sendMessage.getPartition());
offsetTable.put(topicPartition, sendMessage.getOffset());
Map<TopicPartition, ConsumerRecord<byte[], byte[]>> recordMap = messageConsole.searchBy(offsetTable);
ConsumerRecord<byte[], byte[]> consumerRecord = recordMap.get(topicPartition);
if (consumerRecord == null) {
return ResponseData.create().failed("Source message not found.");
}
String topic = message.getTargetTopic();
if (StringUtils.isEmpty(topic)) {
topic = sendMessage.getTopic();
}
// copy from consumer record.
ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic,
message.isSamePartition() ? consumerRecord.partition() : null,
consumerRecord.key(),
consumerRecord.value(),
consumerRecord.headers());
ContextConfig config = ContextConfigHolder.CONTEXT_CONFIG.get();
config.setClusterInfoId(clusterInfoDO.getId());
config.setClusterName(clusterInfoDO.getClusterName());
config.setBootstrapServer(clusterInfoDO.getAddress());
// send.
Tuple2<Object, String> tuple2 = messageConsole.sendSync(record);
boolean success = (boolean) tuple2._1;
if (!success) {
return ResponseData.create().failed(tuple2._2);
}
return ResponseData.create().success();
}
private Map<TopicPartition, ConsumerRecord<byte[], byte[]>> searchRecordByOffset(QueryMessage queryMessage) {
Set<TopicPartition> partitions = getPartitions(queryMessage);

View File

@@ -15,7 +15,7 @@ kafka:
# 缓存连接,不缓存的情况下,每次请求建立连接. 即使每次请求建立连接其实也很快某些情况下开启ACL查询可能很慢可以设置连接缓存为true
# 或者想提高查询速度也可以设置下面连接缓存为true
# 缓存 admin client的连接
cache-admin-connection: false
cache-admin-connection: true
# 缓存 producer的连接
cache-producer-connection: false
# 缓存 consumer的连接

View File

@@ -43,6 +43,7 @@ insert into t_sys_permission(id, name,type,parent_id,permission) values(65,'在
insert into t_sys_permission(id, name,type,parent_id,permission) values(66,'消息详情',1,61,'message:detail');
insert into t_sys_permission(id, name,type,parent_id,permission) values(67,'重新发送',1,61,'message:resend');
insert into t_sys_permission(id, name,type,parent_id,permission) values(68,'发送统计',1,61,'message:send-statistics');
insert into t_sys_permission(id, name,type,parent_id,permission) values(69,'消息转发',1,61,'message:forward');
insert into t_sys_permission(id, name,type,parent_id,permission) values(80,'限流',0,null,'quota');
insert into t_sys_permission(id, name,type,parent_id,permission) values(81,'用户',1,80,'quota:user');
@@ -102,8 +103,8 @@ insert into t_sys_permission(id, name,type,parent_id,permission) values(171,'取
-- t_sys_permission end--
-- t_sys_role start--
insert into t_sys_role(id, role_name, description, permission_ids) VALUES (1,'超级管理员','超级管理员','12,13,14,22,23,24,25,26,27,28,29,30,34,35,31,32,33,42,43,44,45,46,47,48,49,50,62,63,64,65,66,67,68,81,82,83,84,85,86,87,88,101,102,103,104,105,106,107,108,109,110,111,112,113,114,115,141,142,143,144,145,146,147,148,149,150,151,152,153,161,162,163,164,165,166,167,168,169,171,170');
insert into t_sys_role(id, role_name, description, permission_ids) VALUES (2,'普通管理员','普通管理员,不能更改用户信息','12,13,14,22,23,24,25,26,27,28,29,30,34,35,31,32,33,42,43,44,45,46,47,48,49,50,62,63,64,65,66,67,68,81,82,83,84,85,86,87,88,101,102,103,104,105,106,107,108,109,110,111,112,113,114,115,141,146,149,150,161,162,163,164,165,166,167,168,169,171,170');
insert into t_sys_role(id, role_name, description, permission_ids) VALUES (1,'超级管理员','超级管理员','12,13,14,22,23,24,25,26,27,28,29,30,34,35,31,32,33,42,43,44,45,46,47,48,49,50,62,63,64,65,66,67,68,69,81,82,83,84,85,86,87,88,101,102,103,104,105,106,107,108,109,110,111,112,113,114,115,141,142,143,144,145,146,147,148,149,150,151,152,153,161,162,163,164,165,166,167,168,169,171,170');
insert into t_sys_role(id, role_name, description, permission_ids) VALUES (2,'普通管理员','普通管理员,不能更改用户信息','12,13,14,22,23,24,25,26,27,28,29,30,34,35,31,32,33,42,43,44,45,46,47,48,49,50,62,63,64,65,66,67,68,69,81,82,83,84,85,86,87,88,101,102,103,104,105,106,107,108,109,110,111,112,113,114,115,141,146,149,150,161,162,163,164,165,166,167,168,169,171,170');
-- insert into t_sys_role(id, role_name, description, permission_ids) VALUES (2,'访客','访客','12,13,22,26,29,32,44,45,50,62,63,81,83,85,141,146,149,150,161,163');
-- t_sys_role end--

View File

@@ -15,7 +15,7 @@ import scala.collection.{Map, Seq}
import scala.jdk.CollectionConverters.{CollectionHasAsScala, MapHasAsJava, MapHasAsScala, SeqHasAsJava, SetHasAsJava}
/**
* kafka-console-ui.
* kafka topic console.
*
* @author xuxd
* @date 2021-09-08 19:52:27
@@ -52,6 +52,12 @@ class TopicConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConfig
}).asInstanceOf[Set[String]]
}
/**
* get topic list by topic name list.
*
* @param topics topic name list.
* @return topic list.
*/
def getTopicList(topics: Set[String]): List[TopicDescription] = {
if (topics == null || topics.isEmpty) {
Collections.emptyList()

View File

@@ -17,3 +17,8 @@ new Vue({
store,
render: (h) => h(App),
}).$mount("#app");
Vue.prototype.$message.config({
duration: 1,
maxCount: 1,
});

View File

@@ -300,6 +300,10 @@ export const KafkaMessageApi = {
url: "/message/send/statistics",
method: "post",
},
forward: {
url: "/message/forward",
method: "post",
},
};
export const KafkaClientQuotaApi = {

View File

@@ -6,7 +6,7 @@
:mask="false"
:destroyOnClose="true"
:footer="null"
:maskClosable="false"
:maskClosable="true"
@cancel="handleCancel"
>
<div>

View File

@@ -6,7 +6,7 @@
:mask="false"
:destroyOnClose="true"
:footer="null"
:maskClosable="false"
:maskClosable="true"
@cancel="handleCancel"
>
<div>

View File

@@ -58,6 +58,22 @@
</a-form-item>
</a-col>
</a-row>
<hr class="hr" />
<a-row :gutter="24">
<a-col :span="24">
<a-form-item label="过滤消费组">
<a-input
placeholder="groupId 模糊过滤"
class="input-w"
v-decorator="['filterGroupId']"
@change="onFilterGroupIdUpdate"
/>
<span>
仅过滤当前已查出来的消费组如果要查询服务端最新消费组请点击查询按钮</span
>
</a-form-item>
</a-col>
</a-row>
</a-form>
</div>
<div class="operation-row-button">
@@ -70,7 +86,7 @@
</div>
<a-table
:columns="columns"
:data-source="data"
:data-source="filteredData"
bordered
row-key="groupId"
>
@@ -169,6 +185,7 @@ export default {
return {
queryParam: {},
data: [],
filteredData: [],
columns,
selectRow: {},
form: this.$form.createForm(this, {
@@ -186,6 +203,7 @@ export default {
showConsumerDetailDialog: false,
showAddSubscriptionDialog: false,
showOffsetPartitionDialog: false,
filterGroupId: "",
};
},
methods: {
@@ -209,6 +227,7 @@ export default {
this.loading = false;
if (res.code == 0) {
this.data = res.data.list;
this.filter();
} else {
notification.error({
message: "error",
@@ -268,6 +287,19 @@ export default {
closeOffsetPartitionDialog() {
this.showOffsetPartitionDialog = false;
},
onFilterGroupIdUpdate(input) {
this.filterGroupId = input.target.value;
this.filter();
},
filter() {
if (this.filterGroupId) {
this.filteredData = this.data.filter(
(e) => e.groupId.indexOf(this.filterGroupId) != -1
);
} else {
this.filteredData = this.data;
}
},
},
created() {
this.getConsumerGroupList();
@@ -372,4 +404,9 @@ const columns = [
.type-select {
width: 200px !important;
}
.hr {
height: 1px;
border: none;
border-top: 1px dashed #0066cc;
}
</style>

View File

@@ -0,0 +1,248 @@
<template>
<a-modal
title="转发消息"
:visible="show"
:width="600"
:mask="false"
:destroyOnClose="true"
:footer="null"
:maskClosable="true"
@cancel="handleCancel"
>
<div>
<a-spin :spinning="loading">
<div>
<h4>选择集群</h4>
<hr />
<div class="message-detail" id="message-detail">
<a-form
:form="form"
:label-col="{ span: 5 }"
:wrapper-col="{ span: 18 }"
@submit="handleSubmit"
>
<a-form-item label="集群">
<a-select
class="select-width"
@change="clusterChange"
v-decorator="[
'targetClusterId',
{
rules: [{ required: true, message: '请选择一个集群!' }],
},
]"
placeholder="请选择一个集群"
>
<a-select-option
v-for="v in clusterList"
:key="v.id"
:value="v.id"
>
{{ v.clusterName }}
</a-select-option>
</a-select>
</a-form-item>
<a-form-item label="Topic">
<a-select
class="select-width"
show-search
option-filter-prop="children"
v-decorator="[
'targetTopic',
{
rules: [{ required: true, message: '请选择一个topic!' }],
},
]"
placeholder="请选择一个topic"
>
<a-select-option v-for="v in topicList" :key="v" :value="v">
{{ v }}
</a-select-option>
</a-select>
</a-form-item>
<a-form-item label="相同分区">
<a-radio-group
v-decorator="[
'samePartition',
{
initialValue: 'false',
rules: [{ required: true, message: '请选择!' }],
},
]"
>
<a-radio value="false"> 否</a-radio>
<a-radio value="true"> 是</a-radio>
</a-radio-group>
<span class="mar-left">和原消息保持同一个分区</span>
</a-form-item>
<a-form-item>
<div class="form-footer">
<a-button type="primary" html-type="submit"> 提交</a-button>
</div>
</a-form-item>
</a-form>
</div>
</div>
</a-spin>
</div>
</a-modal>
</template>
<script>
import request from "@/utils/request";
import { KafkaClusterApi, KafkaMessageApi, KafkaTopicApi } from "@/utils/api";
import notification from "ant-design-vue/lib/notification";
import moment from "moment";
export default {
name: "ForwardMessage",
props: {
record: {},
visible: {
type: Boolean,
default: false,
},
},
data() {
return {
show: this.visible,
data: {},
loading: false,
showForwardDialog: false,
targetClusterId: -1,
clusterList: [],
partition: -1,
topicList: [],
form: this.$form.createForm(this, { name: "ForwardMessageForm" }),
};
},
watch: {
visible(v) {
this.show = v;
if (this.show) {
this.getClusterList();
}
},
},
methods: {
getClusterList() {
this.loading = true;
request({
url: KafkaClusterApi.getClusterInfoList.url,
method: KafkaClusterApi.getClusterInfoList.method,
}).then((res) => {
this.loading = false;
if (res.code != 0) {
notification.error({
message: "error",
description: res.msg,
});
} else {
this.clusterList = res.data;
this.targetClusterId = this.clusterList[0].id;
}
});
},
handleSubmit(e) {
e.preventDefault();
this.form.validateFields((err, values) => {
if (!err) {
const params = {
message: Object.assign({}, this.record),
};
this.forward({ ...params, ...values });
}
});
},
handleCancel() {
this.$emit("closeForwardDialog", { refresh: false });
},
formatTime(time) {
return time == -1 ? -1 : moment(time).format("YYYY-MM-DD HH:mm:ss:SSS");
},
clusterChange(e) {
this.getTopicNameList(e);
},
forward(params) {
this.loading = true;
request({
url: KafkaMessageApi.forward.url,
method: KafkaMessageApi.forward.method,
data: params,
}).then((res) => {
this.loading = false;
if (res.code != 0) {
notification.error({
message: "error",
description: res.msg,
});
} else {
this.$message.success(res.msg);
}
});
},
openForwardDialog() {
this.showForwardDialog = true;
},
closeForwardDialog() {
this.showForwardDialog = false;
},
getTopicNameList(clusterInfoId) {
this.loading = true;
request({
url: KafkaTopicApi.getTopicNameList.url,
method: KafkaTopicApi.getTopicNameList.method,
headers: {
"X-Specific-Cluster-Info-Id": clusterInfoId,
},
}).then((res) => {
this.loading = false;
if (res.code == 0) {
this.topicList = res.data;
} else {
notification.error({
message: "error",
description: res.msg,
});
}
});
},
},
};
</script>
<style scoped>
.m-info {
/*text-decoration: underline;*/
}
.title {
width: 15%;
display: inline-block;
text-align: right;
margin-right: 2%;
font-weight: bold;
}
.ant-spin-container #message-detail textarea {
max-width: 80% !important;
vertical-align: top !important;
}
.center {
text-align: center;
}
.mar-left {
margin-left: 1%;
}
.select-width {
width: 80%;
}
.form-footer {
text-align: center;
margin-top: 3%;
}
</style>

View File

@@ -6,7 +6,7 @@
:mask="false"
:destroyOnClose="true"
:footer="null"
:maskClosable="false"
:maskClosable="true"
@cancel="handleCancel"
>
<div>
@@ -120,8 +120,22 @@
重新发送
</a-button>
</a-popconfirm>
<a-button
type="dashed"
class="mar-left"
icon="plus"
v-action:message:forward
@click="openForwardDialog()"
>
转发消息
</a-button>
</div>
</a-spin>
<ForwardMessage
:visible="showForwardDialog"
:record="data"
@closeForwardDialog="closeForwardDialog"
></ForwardMessage>
</div>
</a-modal>
</template>
@@ -131,9 +145,11 @@ import request from "@/utils/request";
import { KafkaMessageApi } from "@/utils/api";
import notification from "ant-design-vue/lib/notification";
import moment from "moment";
import ForwardMessage from "@/views/message/ForwardMessage.vue";
export default {
name: "MessageDetail",
components: { ForwardMessage },
props: {
record: {},
visible: {
@@ -151,6 +167,7 @@ export default {
valueDeserializer: "String",
consumerDetail: [],
columns,
showForwardDialog: false,
};
},
watch: {
@@ -232,6 +249,12 @@ export default {
}
});
},
openForwardDialog() {
this.showForwardDialog = true;
},
closeForwardDialog() {
this.showForwardDialog = false;
},
},
};
const columns = [
@@ -264,4 +287,7 @@ const columns = [
max-width: 80% !important;
vertical-align: top !important;
}
.mar-left {
margin-left: 1%;
}
</style>

View File

@@ -14,6 +14,15 @@
<div v-for="(v, k) in data" :key="k">
<strong>消费组: </strong><span class="color-font">{{ k }}</span
><strong> | 积压: </strong><span class="color-font">{{ v.lag }}</span>
<a-button
type="primary"
icon="reload"
size="small"
style="float: right"
@click="getConsumerDetail"
>
刷新
</a-button>
<hr />
<a-table
:columns="columns"

View File

@@ -11,7 +11,18 @@
>
<div>
<a-spin :spinning="loading">
<h4>今天发送消息数{{ today.total }}</h4>
<h4>
今天发送消息数{{ today.total
}}<a-button
type="primary"
icon="reload"
size="small"
style="float: right"
@click="sendStatus"
>
刷新
</a-button>
</h4>
<a-table
:columns="columns"
:data-source="today.detail"