Compare commits
15 Commits
v1.0.1
...
single-dat
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b19c6200d2 | ||
|
|
5f6a06c100 | ||
|
|
5930e44fdf | ||
|
|
98f33bb2cc | ||
|
|
0ec3bac6c2 | ||
|
|
bd814d550d | ||
|
|
b9548d1640 | ||
|
|
57a41e087f | ||
|
|
54cd402810 | ||
|
|
c17b0aa4b9 | ||
|
|
8169ddb019 | ||
|
|
5f24c62855 | ||
|
|
3b21fc4cd8 | ||
|
|
d15ec4a2db | ||
|
|
12431db525 |
10
README.md
10
README.md
@@ -1,14 +1,16 @@
|
|||||||
# kafka可视化管理平台
|
# kafka可视化管理平台
|
||||||
一款轻量级的kafka可视化管理平台,安装配置快捷、简单易用。
|
一款轻量级的kafka可视化管理平台,安装配置快捷、简单易用。
|
||||||
为了开发的省事,没有多语言支持,只支持中文展示。
|
为了开发的省事,没有国际化支持,只支持中文展示。
|
||||||
用过rocketmq-console吧,对,前端展示风格跟那个有点类似。
|
用过rocketmq-console吧,对,前端展示风格跟那个有点类似。
|
||||||
## 安装包下载
|
## 安装包下载
|
||||||
* 点击下载:[kafka-console-ui.tar.gz](https://github.com/xxd763795151/kafka-console-ui/releases/download/v1.0.0/kafka-console-ui.tar.gz) 或 [kafka-console-ui.zip](https://github.com/xxd763795151/kafka-console-ui/releases/download/v1.0.0/kafka-console-ui.zip)
|
以下两种方式2选一,直接下载安装包或下载源码,手动打包
|
||||||
* 参考下面的打包部署,下载源码重新打包(最新功能特性)
|
* 点击下载(v1.0.2版本):[kafka-console-ui.tar.gz](https://github.com/xxd763795151/kafka-console-ui/releases/download/v1.0.2/kafka-console-ui.tar.gz) 或 [kafka-console-ui.zip](https://github.com/xxd763795151/kafka-console-ui/releases/download/v1.0.2/kafka-console-ui.zip)
|
||||||
|
* 参考下面的打包部署,下载源码重新打包(提交的最新功能特性)
|
||||||
## 功能支持
|
## 功能支持
|
||||||
* 集群信息
|
* 集群信息
|
||||||
* Topic管理
|
* Topic管理
|
||||||
* 消费组管理
|
* 消费组管理
|
||||||
|
* 消息管理
|
||||||
* 基于SASL_SCRAM认证授权管理
|
* 基于SASL_SCRAM认证授权管理
|
||||||
* 运维
|
* 运维
|
||||||

|

|
||||||
@@ -84,3 +86,5 @@ sh bin/shutdown.sh
|
|||||||

|

|
||||||

|

|
||||||

|

|
||||||
|
增加消息检索页面
|
||||||
|

|
||||||
Binary file not shown.
|
Before Width: | Height: | Size: 204 KiB After Width: | Height: | Size: 492 KiB |
BIN
document/消息.png
Normal file
BIN
document/消息.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 400 KiB |
2
pom.xml
2
pom.xml
@@ -10,7 +10,7 @@
|
|||||||
</parent>
|
</parent>
|
||||||
<groupId>com.xuxd</groupId>
|
<groupId>com.xuxd</groupId>
|
||||||
<artifactId>kafka-console-ui</artifactId>
|
<artifactId>kafka-console-ui</artifactId>
|
||||||
<version>1.0.1</version>
|
<version>1.0.2</version>
|
||||||
<name>kafka-console-ui</name>
|
<name>kafka-console-ui</name>
|
||||||
<description>Kafka console manage ui</description>
|
<description>Kafka console manage ui</description>
|
||||||
<properties>
|
<properties>
|
||||||
|
|||||||
27
src/main/java/com/xuxd/kafka/console/beans/QueryMessage.java
Normal file
27
src/main/java/com/xuxd/kafka/console/beans/QueryMessage.java
Normal file
@@ -0,0 +1,27 @@
|
|||||||
|
package com.xuxd.kafka.console.beans;
|
||||||
|
|
||||||
|
import lombok.Data;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* kafka-console-ui.
|
||||||
|
*
|
||||||
|
* @author xuxd
|
||||||
|
* @date 2021-12-11 09:45:49
|
||||||
|
**/
|
||||||
|
@Data
|
||||||
|
public class QueryMessage {
|
||||||
|
|
||||||
|
private String topic;
|
||||||
|
|
||||||
|
private int partition;
|
||||||
|
|
||||||
|
private long startTime;
|
||||||
|
|
||||||
|
private long endTime;
|
||||||
|
|
||||||
|
private long offset;
|
||||||
|
|
||||||
|
private String keyDeserializer;
|
||||||
|
|
||||||
|
private String valueDeserializer;
|
||||||
|
}
|
||||||
25
src/main/java/com/xuxd/kafka/console/beans/SendMessage.java
Normal file
25
src/main/java/com/xuxd/kafka/console/beans/SendMessage.java
Normal file
@@ -0,0 +1,25 @@
|
|||||||
|
package com.xuxd.kafka.console.beans;
|
||||||
|
|
||||||
|
import lombok.Data;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* kafka-console-ui.
|
||||||
|
*
|
||||||
|
* @author xuxd
|
||||||
|
* @date 2021-12-19 23:28:31
|
||||||
|
**/
|
||||||
|
@Data
|
||||||
|
public class SendMessage {
|
||||||
|
|
||||||
|
private String topic;
|
||||||
|
|
||||||
|
private int partition;
|
||||||
|
|
||||||
|
private String key;
|
||||||
|
|
||||||
|
private String body;
|
||||||
|
|
||||||
|
private int num;
|
||||||
|
|
||||||
|
private long offset;
|
||||||
|
}
|
||||||
@@ -0,0 +1,50 @@
|
|||||||
|
package com.xuxd.kafka.console.beans.dto;
|
||||||
|
|
||||||
|
import com.xuxd.kafka.console.beans.QueryMessage;
|
||||||
|
import java.util.Date;
|
||||||
|
import lombok.Data;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* kafka-console-ui.
|
||||||
|
*
|
||||||
|
* @author xuxd
|
||||||
|
* @date 2021-12-11 09:17:59
|
||||||
|
**/
|
||||||
|
@Data
|
||||||
|
public class QueryMessageDTO {
|
||||||
|
|
||||||
|
private String topic;
|
||||||
|
|
||||||
|
private int partition;
|
||||||
|
|
||||||
|
private Date startTime;
|
||||||
|
|
||||||
|
private Date endTime;
|
||||||
|
|
||||||
|
private Long offset;
|
||||||
|
|
||||||
|
private String keyDeserializer;
|
||||||
|
|
||||||
|
private String valueDeserializer;
|
||||||
|
|
||||||
|
public QueryMessage toQueryMessage() {
|
||||||
|
QueryMessage queryMessage = new QueryMessage();
|
||||||
|
queryMessage.setTopic(topic);
|
||||||
|
queryMessage.setPartition(partition);
|
||||||
|
if (startTime != null) {
|
||||||
|
queryMessage.setStartTime(startTime.getTime());
|
||||||
|
}
|
||||||
|
if (endTime != null) {
|
||||||
|
queryMessage.setEndTime(endTime.getTime());
|
||||||
|
}
|
||||||
|
|
||||||
|
if (offset != null) {
|
||||||
|
queryMessage.setOffset(offset);
|
||||||
|
}
|
||||||
|
|
||||||
|
queryMessage.setKeyDeserializer(keyDeserializer);
|
||||||
|
queryMessage.setValueDeserializer(valueDeserializer);
|
||||||
|
|
||||||
|
return queryMessage;
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,32 @@
|
|||||||
|
package com.xuxd.kafka.console.beans.vo;
|
||||||
|
|
||||||
|
import lombok.Data;
|
||||||
|
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* kafka-console-ui.
|
||||||
|
*
|
||||||
|
* @author xuxd
|
||||||
|
* @date 2021-12-11 14:19:35
|
||||||
|
**/
|
||||||
|
@Data
|
||||||
|
public class ConsumerRecordVO {
|
||||||
|
|
||||||
|
private String topic;
|
||||||
|
|
||||||
|
private int partition;
|
||||||
|
|
||||||
|
private long offset;
|
||||||
|
|
||||||
|
private long timestamp;
|
||||||
|
|
||||||
|
public static ConsumerRecordVO fromConsumerRecord(ConsumerRecord record) {
|
||||||
|
ConsumerRecordVO vo = new ConsumerRecordVO();
|
||||||
|
vo.setTopic(record.topic());
|
||||||
|
vo.setPartition(record.partition());
|
||||||
|
vo.setOffset(record.offset());
|
||||||
|
vo.setTimestamp(record.timestamp());
|
||||||
|
|
||||||
|
return vo;
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,47 @@
|
|||||||
|
package com.xuxd.kafka.console.beans.vo;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import lombok.Data;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* kafka-console-ui.
|
||||||
|
*
|
||||||
|
* @author xuxd
|
||||||
|
* @date 2021-12-12 12:45:23
|
||||||
|
**/
|
||||||
|
@Data
|
||||||
|
public class MessageDetailVO {
|
||||||
|
|
||||||
|
private String topic;
|
||||||
|
|
||||||
|
private int partition;
|
||||||
|
|
||||||
|
private long offset;
|
||||||
|
|
||||||
|
private long timestamp;
|
||||||
|
|
||||||
|
private String timestampType;
|
||||||
|
|
||||||
|
private List<HeaderVO> headers = new ArrayList<>();
|
||||||
|
|
||||||
|
private Object key;
|
||||||
|
|
||||||
|
private Object value;
|
||||||
|
|
||||||
|
private List<ConsumerVO> consumers;
|
||||||
|
|
||||||
|
@Data
|
||||||
|
public static class HeaderVO {
|
||||||
|
String key;
|
||||||
|
|
||||||
|
String value;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Data
|
||||||
|
public static class ConsumerVO {
|
||||||
|
String groupId;
|
||||||
|
|
||||||
|
String status;
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -29,6 +29,10 @@ public class TopicPartitionVO {
|
|||||||
|
|
||||||
private long diff;
|
private long diff;
|
||||||
|
|
||||||
|
private long beginTime;
|
||||||
|
|
||||||
|
private long endTime;
|
||||||
|
|
||||||
public static TopicPartitionVO from(TopicPartitionInfo partitionInfo) {
|
public static TopicPartitionVO from(TopicPartitionInfo partitionInfo) {
|
||||||
TopicPartitionVO partitionVO = new TopicPartitionVO();
|
TopicPartitionVO partitionVO = new TopicPartitionVO();
|
||||||
partitionVO.setPartition(partitionInfo.partition());
|
partitionVO.setPartition(partitionInfo.partition());
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ import kafka.console.ConfigConsole;
|
|||||||
import kafka.console.ConsumerConsole;
|
import kafka.console.ConsumerConsole;
|
||||||
import kafka.console.KafkaAclConsole;
|
import kafka.console.KafkaAclConsole;
|
||||||
import kafka.console.KafkaConfigConsole;
|
import kafka.console.KafkaConfigConsole;
|
||||||
|
import kafka.console.MessageConsole;
|
||||||
import kafka.console.OperationConsole;
|
import kafka.console.OperationConsole;
|
||||||
import kafka.console.TopicConsole;
|
import kafka.console.TopicConsole;
|
||||||
import org.springframework.context.annotation.Bean;
|
import org.springframework.context.annotation.Bean;
|
||||||
@@ -54,4 +55,9 @@ public class KafkaConfiguration {
|
|||||||
ConsumerConsole consumerConsole) {
|
ConsumerConsole consumerConsole) {
|
||||||
return new OperationConsole(config, topicConsole, consumerConsole);
|
return new OperationConsole(config, topicConsole, consumerConsole);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public MessageConsole messageConsole(KafkaConfig config) {
|
||||||
|
return new MessageConsole(config);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,55 @@
|
|||||||
|
package com.xuxd.kafka.console.controller;
|
||||||
|
|
||||||
|
import com.xuxd.kafka.console.beans.SendMessage;
|
||||||
|
import com.xuxd.kafka.console.beans.dto.QueryMessageDTO;
|
||||||
|
import com.xuxd.kafka.console.service.MessageService;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.web.bind.annotation.GetMapping;
|
||||||
|
import org.springframework.web.bind.annotation.PostMapping;
|
||||||
|
import org.springframework.web.bind.annotation.RequestBody;
|
||||||
|
import org.springframework.web.bind.annotation.RequestMapping;
|
||||||
|
import org.springframework.web.bind.annotation.RestController;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* kafka-console-ui.
|
||||||
|
*
|
||||||
|
* @author xuxd
|
||||||
|
* @date 2021-12-11 09:22:19
|
||||||
|
**/
|
||||||
|
@RestController
|
||||||
|
@RequestMapping("/message")
|
||||||
|
public class MessageController {
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private MessageService messageService;
|
||||||
|
|
||||||
|
@PostMapping("/search/time")
|
||||||
|
public Object searchByTime(@RequestBody QueryMessageDTO dto) {
|
||||||
|
return messageService.searchByTime(dto.toQueryMessage());
|
||||||
|
}
|
||||||
|
|
||||||
|
@PostMapping("/search/offset")
|
||||||
|
public Object searchByOffset(@RequestBody QueryMessageDTO dto) {
|
||||||
|
return messageService.searchByOffset(dto.toQueryMessage());
|
||||||
|
}
|
||||||
|
|
||||||
|
@PostMapping("/search/detail")
|
||||||
|
public Object searchDetail(@RequestBody QueryMessageDTO dto) {
|
||||||
|
return messageService.searchDetail(dto.toQueryMessage());
|
||||||
|
}
|
||||||
|
|
||||||
|
@GetMapping("/deserializer/list")
|
||||||
|
public Object deserializerList() {
|
||||||
|
return messageService.deserializerList();
|
||||||
|
}
|
||||||
|
|
||||||
|
@PostMapping("/send")
|
||||||
|
public Object send(@RequestBody SendMessage message) {
|
||||||
|
return messageService.send(message);
|
||||||
|
}
|
||||||
|
|
||||||
|
@PostMapping("/resend")
|
||||||
|
public Object resend(@RequestBody SendMessage message) {
|
||||||
|
return messageService.resend(message);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -38,4 +38,6 @@ public interface ConsumerService {
|
|||||||
ResponseData getTopicSubscribedByGroups(String topic);
|
ResponseData getTopicSubscribedByGroups(String topic);
|
||||||
|
|
||||||
ResponseData getOffsetPartition(String groupId);
|
ResponseData getOffsetPartition(String groupId);
|
||||||
|
|
||||||
|
ResponseData<Set<String>> getSubscribedGroups(String topic);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,26 @@
|
|||||||
|
package com.xuxd.kafka.console.service;
|
||||||
|
|
||||||
|
import com.xuxd.kafka.console.beans.QueryMessage;
|
||||||
|
import com.xuxd.kafka.console.beans.ResponseData;
|
||||||
|
import com.xuxd.kafka.console.beans.SendMessage;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* kafka-console-ui.
|
||||||
|
*
|
||||||
|
* @author xuxd
|
||||||
|
* @date 2021-12-11 09:43:26
|
||||||
|
**/
|
||||||
|
public interface MessageService {
|
||||||
|
|
||||||
|
ResponseData searchByTime(QueryMessage queryMessage);
|
||||||
|
|
||||||
|
ResponseData searchByOffset(QueryMessage queryMessage);
|
||||||
|
|
||||||
|
ResponseData searchDetail(QueryMessage queryMessage);
|
||||||
|
|
||||||
|
ResponseData deserializerList();
|
||||||
|
|
||||||
|
ResponseData send(SendMessage message);
|
||||||
|
|
||||||
|
ResponseData resend(SendMessage message);
|
||||||
|
}
|
||||||
@@ -15,6 +15,7 @@ import java.util.HashSet;
|
|||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.locks.ReentrantLock;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
import kafka.console.ConsumerConsole;
|
import kafka.console.ConsumerConsole;
|
||||||
import kafka.console.TopicConsole;
|
import kafka.console.TopicConsole;
|
||||||
@@ -48,6 +49,8 @@ public class ConsumerServiceImpl implements ConsumerService {
|
|||||||
@Autowired
|
@Autowired
|
||||||
private TopicConsole topicConsole;
|
private TopicConsole topicConsole;
|
||||||
|
|
||||||
|
private ReentrantLock lock = new ReentrantLock();
|
||||||
|
|
||||||
@Override public ResponseData getConsumerGroupList(List<String> groupIds, Set<ConsumerGroupState> states) {
|
@Override public ResponseData getConsumerGroupList(List<String> groupIds, Set<ConsumerGroupState> states) {
|
||||||
String simulateGroup = "inner_xxx_not_exit_group_###" + System.currentTimeMillis();
|
String simulateGroup = "inner_xxx_not_exit_group_###" + System.currentTimeMillis();
|
||||||
Set<String> groupList = new HashSet<>();
|
Set<String> groupList = new HashSet<>();
|
||||||
@@ -167,25 +170,7 @@ public class ConsumerServiceImpl implements ConsumerService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override public ResponseData getTopicSubscribedByGroups(String topic) {
|
@Override public ResponseData getTopicSubscribedByGroups(String topic) {
|
||||||
if (topicSubscribedInfo.isNeedRefresh(topic)) {
|
Set<String> groups = this.getSubscribedGroups(topic).getData();
|
||||||
Set<String> groupIdList = consumerConsole.getConsumerGroupIdList(Collections.emptySet());
|
|
||||||
Map<String, Set<String>> cache = new HashMap<>();
|
|
||||||
Map<String, List<TopicPartition>> subscribeTopics = consumerConsole.listSubscribeTopics(groupIdList);
|
|
||||||
|
|
||||||
subscribeTopics.forEach((groupId, tl) -> {
|
|
||||||
tl.forEach(topicPartition -> {
|
|
||||||
String t = topicPartition.topic();
|
|
||||||
if (!cache.containsKey(t)) {
|
|
||||||
cache.put(t, new HashSet<>());
|
|
||||||
}
|
|
||||||
cache.get(t).add(groupId);
|
|
||||||
});
|
|
||||||
});
|
|
||||||
|
|
||||||
topicSubscribedInfo.refresh(cache);
|
|
||||||
}
|
|
||||||
|
|
||||||
Set<String> groups = topicSubscribedInfo.getSubscribedGroups(topic);
|
|
||||||
|
|
||||||
Map<String, Object> res = new HashMap<>();
|
Map<String, Object> res = new HashMap<>();
|
||||||
Collection<ConsumerConsole.TopicPartitionConsumeInfo> consumerDetail = consumerConsole.getConsumerDetail(groups);
|
Collection<ConsumerConsole.TopicPartitionConsumeInfo> consumerDetail = consumerConsole.getConsumerDetail(groups);
|
||||||
@@ -212,6 +197,34 @@ public class ConsumerServiceImpl implements ConsumerService {
|
|||||||
return ResponseData.create().data(Utils.abs(groupId.hashCode()) % size);
|
return ResponseData.create().data(Utils.abs(groupId.hashCode()) % size);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override public ResponseData<Set<String>> getSubscribedGroups(String topic) {
|
||||||
|
if (topicSubscribedInfo.isNeedRefresh(topic) && !lock.isLocked()) {
|
||||||
|
try {
|
||||||
|
lock.lock();
|
||||||
|
Set<String> groupIdList = consumerConsole.getConsumerGroupIdList(Collections.emptySet());
|
||||||
|
Map<String, Set<String>> cache = new HashMap<>();
|
||||||
|
Map<String, List<TopicPartition>> subscribeTopics = consumerConsole.listSubscribeTopics(groupIdList);
|
||||||
|
|
||||||
|
subscribeTopics.forEach((groupId, tl) -> {
|
||||||
|
tl.forEach(topicPartition -> {
|
||||||
|
String t = topicPartition.topic();
|
||||||
|
if (!cache.containsKey(t)) {
|
||||||
|
cache.put(t, new HashSet<>());
|
||||||
|
}
|
||||||
|
cache.get(t).add(groupId);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
topicSubscribedInfo.refresh(cache);
|
||||||
|
} finally {
|
||||||
|
lock.unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Set<String> groups = topicSubscribedInfo.getSubscribedGroups(topic);
|
||||||
|
return ResponseData.create(Set.class).data(groups).success();
|
||||||
|
}
|
||||||
|
|
||||||
class TopicSubscribedInfo {
|
class TopicSubscribedInfo {
|
||||||
long lastTime = System.currentTimeMillis();
|
long lastTime = System.currentTimeMillis();
|
||||||
|
|
||||||
|
|||||||
@@ -0,0 +1,212 @@
|
|||||||
|
package com.xuxd.kafka.console.service.impl;
|
||||||
|
|
||||||
|
import com.xuxd.kafka.console.beans.QueryMessage;
|
||||||
|
import com.xuxd.kafka.console.beans.ResponseData;
|
||||||
|
import com.xuxd.kafka.console.beans.SendMessage;
|
||||||
|
import com.xuxd.kafka.console.beans.vo.ConsumerRecordVO;
|
||||||
|
import com.xuxd.kafka.console.beans.vo.MessageDetailVO;
|
||||||
|
import com.xuxd.kafka.console.service.ConsumerService;
|
||||||
|
import com.xuxd.kafka.console.service.MessageService;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.LinkedList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
import kafka.console.ConsumerConsole;
|
||||||
|
import kafka.console.MessageConsole;
|
||||||
|
import kafka.console.TopicConsole;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.apache.commons.collections.CollectionUtils;
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
import org.apache.kafka.clients.admin.TopicDescription;
|
||||||
|
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||||
|
import org.apache.kafka.clients.producer.ProducerRecord;
|
||||||
|
import org.apache.kafka.common.TopicPartition;
|
||||||
|
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
|
||||||
|
import org.apache.kafka.common.serialization.BytesDeserializer;
|
||||||
|
import org.apache.kafka.common.serialization.Deserializer;
|
||||||
|
import org.apache.kafka.common.serialization.DoubleDeserializer;
|
||||||
|
import org.apache.kafka.common.serialization.FloatDeserializer;
|
||||||
|
import org.apache.kafka.common.serialization.IntegerDeserializer;
|
||||||
|
import org.apache.kafka.common.serialization.StringDeserializer;
|
||||||
|
import org.springframework.beans.BeansException;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.context.ApplicationContext;
|
||||||
|
import org.springframework.context.ApplicationContextAware;
|
||||||
|
import org.springframework.stereotype.Service;
|
||||||
|
import scala.Tuple2;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* kafka-console-ui.
|
||||||
|
*
|
||||||
|
* @author xuxd
|
||||||
|
* @date 2021-12-11 09:43:44
|
||||||
|
**/
|
||||||
|
@Slf4j
|
||||||
|
@Service
|
||||||
|
public class MessageServiceImpl implements MessageService, ApplicationContextAware {
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private MessageConsole messageConsole;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private TopicConsole topicConsole;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private ConsumerConsole consumerConsole;
|
||||||
|
|
||||||
|
private ApplicationContext applicationContext;
|
||||||
|
|
||||||
|
private Map<String, Deserializer> deserializerDict = new HashMap<>();
|
||||||
|
|
||||||
|
{
|
||||||
|
deserializerDict.put("ByteArray", new ByteArrayDeserializer());
|
||||||
|
deserializerDict.put("Integer", new IntegerDeserializer());
|
||||||
|
deserializerDict.put("String", new StringDeserializer());
|
||||||
|
deserializerDict.put("Float", new FloatDeserializer());
|
||||||
|
deserializerDict.put("Double", new DoubleDeserializer());
|
||||||
|
deserializerDict.put("Byte", new BytesDeserializer());
|
||||||
|
}
|
||||||
|
|
||||||
|
public static String defaultDeserializer = "String";
|
||||||
|
|
||||||
|
@Override public ResponseData searchByTime(QueryMessage queryMessage) {
|
||||||
|
int maxNums = 10000;
|
||||||
|
|
||||||
|
Set<TopicPartition> partitions = getPartitions(queryMessage);
|
||||||
|
long startTime = System.currentTimeMillis();
|
||||||
|
List<ConsumerRecord<byte[], byte[]>> records = messageConsole.searchBy(partitions, queryMessage.getStartTime(), queryMessage.getEndTime(), maxNums);
|
||||||
|
log.info("search message by time, cost time: {}", (System.currentTimeMillis() - startTime));
|
||||||
|
List<ConsumerRecordVO> vos = records.stream().filter(record -> record.timestamp() <= queryMessage.getEndTime())
|
||||||
|
.map(ConsumerRecordVO::fromConsumerRecord).collect(Collectors.toList());
|
||||||
|
Map<String, Object> res = new HashMap<>();
|
||||||
|
res.put("maxNum", maxNums);
|
||||||
|
res.put("realNum", vos.size());
|
||||||
|
res.put("data", vos.subList(0, Math.min(maxNums, vos.size())));
|
||||||
|
return ResponseData.create().data(res).success();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public ResponseData searchByOffset(QueryMessage queryMessage) {
|
||||||
|
Map<TopicPartition, ConsumerRecord<byte[], byte[]>> recordMap = searchRecordByOffset(queryMessage);
|
||||||
|
|
||||||
|
return ResponseData.create().data(recordMap.values().stream().map(ConsumerRecordVO::fromConsumerRecord).collect(Collectors.toList())).success();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public ResponseData searchDetail(QueryMessage queryMessage) {
|
||||||
|
if (queryMessage.getPartition() == -1) {
|
||||||
|
throw new IllegalArgumentException();
|
||||||
|
}
|
||||||
|
if (StringUtils.isBlank(queryMessage.getKeyDeserializer())) {
|
||||||
|
queryMessage.setKeyDeserializer(defaultDeserializer);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (StringUtils.isBlank(queryMessage.getValueDeserializer())) {
|
||||||
|
queryMessage.setValueDeserializer(defaultDeserializer);
|
||||||
|
}
|
||||||
|
|
||||||
|
Map<TopicPartition, ConsumerRecord<byte[], byte[]>> recordMap = searchRecordByOffset(queryMessage);
|
||||||
|
ConsumerRecord<byte[], byte[]> record = recordMap.get(new TopicPartition(queryMessage.getTopic(), queryMessage.getPartition()));
|
||||||
|
if (record != null) {
|
||||||
|
MessageDetailVO vo = new MessageDetailVO();
|
||||||
|
vo.setTopic(record.topic());
|
||||||
|
vo.setPartition(record.partition());
|
||||||
|
vo.setOffset(record.offset());
|
||||||
|
vo.setTimestamp(record.timestamp());
|
||||||
|
vo.setTimestampType(record.timestampType().name());
|
||||||
|
try {
|
||||||
|
vo.setKey(deserializerDict.get(queryMessage.getKeyDeserializer()).deserialize(queryMessage.getTopic(), record.key()));
|
||||||
|
} catch (Exception e) {
|
||||||
|
vo.setKey("KeyDeserializer Error: " + e.getMessage());
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
vo.setValue(deserializerDict.get(queryMessage.getValueDeserializer()).deserialize(queryMessage.getTopic(), record.value()));
|
||||||
|
} catch (Exception e) {
|
||||||
|
vo.setValue("ValueDeserializer Error: " + e.getMessage());
|
||||||
|
}
|
||||||
|
|
||||||
|
record.headers().forEach(header -> {
|
||||||
|
MessageDetailVO.HeaderVO headerVO = new MessageDetailVO.HeaderVO();
|
||||||
|
headerVO.setKey(header.key());
|
||||||
|
headerVO.setValue(new String(header.value()));
|
||||||
|
vo.getHeaders().add(headerVO);
|
||||||
|
});
|
||||||
|
|
||||||
|
// 为了尽量保持代码好看,不直接注入另一个service层的实现类了
|
||||||
|
Set<String> groupIds = applicationContext.getBean(ConsumerService.class).getSubscribedGroups(record.topic()).getData();
|
||||||
|
Collection<ConsumerConsole.TopicPartitionConsumeInfo> consumerDetail = consumerConsole.getConsumerDetail(groupIds);
|
||||||
|
|
||||||
|
List<MessageDetailVO.ConsumerVO> consumerVOS = new LinkedList<>();
|
||||||
|
consumerDetail.forEach(consumerInfo -> {
|
||||||
|
if (consumerInfo.topicPartition().equals(new TopicPartition(record.topic(), record.partition()))) {
|
||||||
|
MessageDetailVO.ConsumerVO consumerVO = new MessageDetailVO.ConsumerVO();
|
||||||
|
consumerVO.setGroupId(consumerInfo.getGroupId());
|
||||||
|
consumerVO.setStatus(consumerInfo.getConsumerOffset() <= record.offset() ? "unconsume" : "consumed");
|
||||||
|
consumerVOS.add(consumerVO);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
vo.setConsumers(consumerVOS);
|
||||||
|
return ResponseData.create().data(vo).success();
|
||||||
|
}
|
||||||
|
return ResponseData.create().failed("Not found message detail.");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public ResponseData deserializerList() {
|
||||||
|
return ResponseData.create().data(deserializerDict.keySet()).success();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public ResponseData send(SendMessage message) {
|
||||||
|
messageConsole.send(message.getTopic(), message.getPartition(), message.getKey(), message.getBody(), message.getNum());
|
||||||
|
return ResponseData.create().success();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public ResponseData resend(SendMessage message) {
|
||||||
|
TopicPartition partition = new TopicPartition(message.getTopic(), message.getPartition());
|
||||||
|
Map<TopicPartition, Object> offsetTable = new HashMap<>(1, 1.0f);
|
||||||
|
offsetTable.put(partition, message.getOffset());
|
||||||
|
Map<TopicPartition, ConsumerRecord<byte[], byte[]>> recordMap = messageConsole.searchBy(offsetTable);
|
||||||
|
if (recordMap.isEmpty()) {
|
||||||
|
return ResponseData.create().failed("Get message failed.");
|
||||||
|
}
|
||||||
|
ConsumerRecord<byte[], byte[]> record = recordMap.get(partition);
|
||||||
|
ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>(record.topic(), record.partition(), record.key(), record.value(), record.headers());
|
||||||
|
Tuple2<Object, String> tuple2 = messageConsole.sendSync(producerRecord);
|
||||||
|
boolean success = (boolean) tuple2._1();
|
||||||
|
return success ? ResponseData.create().success("success: " + tuple2._2()) : ResponseData.create().failed(tuple2._2());
|
||||||
|
}
|
||||||
|
|
||||||
|
private Map<TopicPartition, ConsumerRecord<byte[], byte[]>> searchRecordByOffset(QueryMessage queryMessage) {
|
||||||
|
Set<TopicPartition> partitions = getPartitions(queryMessage);
|
||||||
|
|
||||||
|
Map<TopicPartition, Object> offsetTable = new HashMap<>();
|
||||||
|
partitions.forEach(tp -> {
|
||||||
|
offsetTable.put(tp, queryMessage.getOffset());
|
||||||
|
});
|
||||||
|
Map<TopicPartition, ConsumerRecord<byte[], byte[]>> recordMap = messageConsole.searchBy(offsetTable);
|
||||||
|
return recordMap;
|
||||||
|
}
|
||||||
|
|
||||||
|
private Set<TopicPartition> getPartitions(QueryMessage queryMessage) {
|
||||||
|
Set<TopicPartition> partitions = new HashSet<>();
|
||||||
|
if (queryMessage.getPartition() != -1) {
|
||||||
|
partitions.add(new TopicPartition(queryMessage.getTopic(), queryMessage.getPartition()));
|
||||||
|
} else {
|
||||||
|
List<TopicDescription> list = topicConsole.getTopicList(Collections.singleton(queryMessage.getTopic()));
|
||||||
|
if (CollectionUtils.isEmpty(list)) {
|
||||||
|
throw new IllegalArgumentException("Can not find topic info.");
|
||||||
|
}
|
||||||
|
Set<TopicPartition> set = list.get(0).partitions().stream()
|
||||||
|
.map(tp -> new TopicPartition(queryMessage.getTopic(), tp.partition())).collect(Collectors.toSet());
|
||||||
|
partitions.addAll(set);
|
||||||
|
}
|
||||||
|
return partitions;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public void setApplicationContext(ApplicationContext context) throws BeansException {
|
||||||
|
this.applicationContext = context;
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -19,12 +19,14 @@ import java.util.Map;
|
|||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
import kafka.console.MessageConsole;
|
||||||
import kafka.console.TopicConsole;
|
import kafka.console.TopicConsole;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.apache.kafka.clients.admin.NewPartitions;
|
import org.apache.kafka.clients.admin.NewPartitions;
|
||||||
import org.apache.kafka.clients.admin.NewTopic;
|
import org.apache.kafka.clients.admin.NewTopic;
|
||||||
import org.apache.kafka.clients.admin.TopicDescription;
|
import org.apache.kafka.clients.admin.TopicDescription;
|
||||||
|
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||||
import org.apache.kafka.common.TopicPartition;
|
import org.apache.kafka.common.TopicPartition;
|
||||||
import org.apache.kafka.common.TopicPartitionInfo;
|
import org.apache.kafka.common.TopicPartitionInfo;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
@@ -44,6 +46,9 @@ public class TopicServiceImpl implements TopicService {
|
|||||||
@Autowired
|
@Autowired
|
||||||
private TopicConsole topicConsole;
|
private TopicConsole topicConsole;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private MessageConsole messageConsole;
|
||||||
|
|
||||||
private Gson gson = GsonUtil.INSTANCE.get();
|
private Gson gson = GsonUtil.INSTANCE.get();
|
||||||
|
|
||||||
@Override public ResponseData getTopicNameList(boolean internal) {
|
@Override public ResponseData getTopicNameList(boolean internal) {
|
||||||
@@ -106,6 +111,10 @@ public class TopicServiceImpl implements TopicService {
|
|||||||
mapTuple2._2().forEach((k, v) -> {
|
mapTuple2._2().forEach((k, v) -> {
|
||||||
endTable.put(k.partition(), (Long) v);
|
endTable.put(k.partition(), (Long) v);
|
||||||
});
|
});
|
||||||
|
// computer the valid time range.
|
||||||
|
Map<TopicPartition, Object> beginOffsetTable = new HashMap<>();
|
||||||
|
Map<TopicPartition, Object> endOffsetTable = new HashMap<>();
|
||||||
|
Map<Integer, TopicPartition> partitionCache = new HashMap<>();
|
||||||
|
|
||||||
for (TopicPartitionVO partitionVO : voList) {
|
for (TopicPartitionVO partitionVO : voList) {
|
||||||
long begin = beginTable.get(partitionVO.getPartition());
|
long begin = beginTable.get(partitionVO.getPartition());
|
||||||
@@ -113,7 +122,29 @@ public class TopicServiceImpl implements TopicService {
|
|||||||
partitionVO.setBeginOffset(begin);
|
partitionVO.setBeginOffset(begin);
|
||||||
partitionVO.setEndOffset(end);
|
partitionVO.setEndOffset(end);
|
||||||
partitionVO.setDiff(end - begin);
|
partitionVO.setDiff(end - begin);
|
||||||
|
|
||||||
|
if (begin != end) {
|
||||||
|
TopicPartition partition = new TopicPartition(topic, partitionVO.getPartition());
|
||||||
|
partitionCache.put(partitionVO.getPartition(), partition);
|
||||||
|
beginOffsetTable.put(partition, begin);
|
||||||
|
endOffsetTable.put(partition, end - 1); // end must < endOff
|
||||||
|
} else {
|
||||||
|
partitionVO.setBeginTime(-1L);
|
||||||
|
partitionVO.setEndTime(-1L);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Map<TopicPartition, ConsumerRecord<byte[], byte[]>> beginRecordMap = messageConsole.searchBy(beginOffsetTable);
|
||||||
|
Map<TopicPartition, ConsumerRecord<byte[], byte[]>> endRecordMap = messageConsole.searchBy(endOffsetTable);
|
||||||
|
|
||||||
|
for (TopicPartitionVO partitionVO : voList) {
|
||||||
|
if (partitionVO.getBeginTime() != -1L) {
|
||||||
|
TopicPartition partition = partitionCache.get(partitionVO.getPartition());
|
||||||
|
partitionVO.setBeginTime(beginRecordMap.containsKey(partition) ? beginRecordMap.get(partition).timestamp() : -1L);
|
||||||
|
partitionVO.setEndTime(endRecordMap.containsKey(partition) ? endRecordMap.get(partition).timestamp() : -1L);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return ResponseData.create().data(voList).success();
|
return ResponseData.create().data(voList).success();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -5,10 +5,11 @@ import kafka.zk.{AdminZkClient, KafkaZkClient}
|
|||||||
import org.apache.kafka.clients.CommonClientConfigs
|
import org.apache.kafka.clients.CommonClientConfigs
|
||||||
import org.apache.kafka.clients.admin._
|
import org.apache.kafka.clients.admin._
|
||||||
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer, OffsetAndMetadata}
|
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer, OffsetAndMetadata}
|
||||||
|
import org.apache.kafka.clients.producer.KafkaProducer
|
||||||
import org.apache.kafka.common.TopicPartition
|
import org.apache.kafka.common.TopicPartition
|
||||||
import org.apache.kafka.common.config.SaslConfigs
|
import org.apache.kafka.common.config.SaslConfigs
|
||||||
import org.apache.kafka.common.requests.ListOffsetsResponse
|
import org.apache.kafka.common.requests.ListOffsetsResponse
|
||||||
import org.apache.kafka.common.serialization.ByteArrayDeserializer
|
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, StringSerializer}
|
||||||
import org.apache.kafka.common.utils.Time
|
import org.apache.kafka.common.utils.Time
|
||||||
import org.slf4j.{Logger, LoggerFactory}
|
import org.slf4j.{Logger, LoggerFactory}
|
||||||
|
|
||||||
@@ -60,6 +61,38 @@ class KafkaConsole(config: KafkaConfig) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected def withProducerAndCatchError(f: KafkaProducer[String, String] => Any, eh: Exception => Any,
|
||||||
|
extra: Properties = new Properties()): Any = {
|
||||||
|
val props = getProps()
|
||||||
|
props.putAll(extra)
|
||||||
|
props.put(ConsumerConfig.CLIENT_ID_CONFIG, String.valueOf(System.currentTimeMillis()))
|
||||||
|
val producer = new KafkaProducer[String, String](props, new StringSerializer, new StringSerializer)
|
||||||
|
try {
|
||||||
|
f(producer)
|
||||||
|
} catch {
|
||||||
|
case er: Exception => eh(er)
|
||||||
|
}
|
||||||
|
finally {
|
||||||
|
producer.close()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected def withByteProducerAndCatchError(f: KafkaProducer[Array[Byte], Array[Byte]] => Any, eh: Exception => Any,
|
||||||
|
extra: Properties = new Properties()): Any = {
|
||||||
|
val props = getProps()
|
||||||
|
props.putAll(extra)
|
||||||
|
props.put(ConsumerConfig.CLIENT_ID_CONFIG, String.valueOf(System.currentTimeMillis()))
|
||||||
|
val producer = new KafkaProducer[Array[Byte], Array[Byte]](props, new ByteArraySerializer, new ByteArraySerializer)
|
||||||
|
try {
|
||||||
|
f(producer)
|
||||||
|
} catch {
|
||||||
|
case er: Exception => eh(er)
|
||||||
|
}
|
||||||
|
finally {
|
||||||
|
producer.close()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
protected def withZKClient(f: AdminZkClient => Any): Any = {
|
protected def withZKClient(f: AdminZkClient => Any): Any = {
|
||||||
val zkClient = KafkaZkClient(config.getZookeeperAddr, false, 30000, 30000, Int.MaxValue, Time.SYSTEM)
|
val zkClient = KafkaZkClient(config.getZookeeperAddr, false, 30000, 30000, Int.MaxValue, Time.SYSTEM)
|
||||||
val adminZkClient = new AdminZkClient(zkClient)
|
val adminZkClient = new AdminZkClient(zkClient)
|
||||||
|
|||||||
196
src/main/scala/kafka/console/MessageConsole.scala
Normal file
196
src/main/scala/kafka/console/MessageConsole.scala
Normal file
@@ -0,0 +1,196 @@
|
|||||||
|
package kafka.console
|
||||||
|
|
||||||
|
import com.xuxd.kafka.console.config.KafkaConfig
|
||||||
|
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
|
||||||
|
import org.apache.kafka.clients.producer.ProducerRecord
|
||||||
|
import org.apache.kafka.common.TopicPartition
|
||||||
|
|
||||||
|
import java.time.Duration
|
||||||
|
import java.util
|
||||||
|
import java.util.Properties
|
||||||
|
import scala.collection.immutable
|
||||||
|
import scala.jdk.CollectionConverters.{CollectionHasAsScala, MapHasAsScala, SeqHasAsJava}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* kafka-console-ui.
|
||||||
|
*
|
||||||
|
* @author xuxd
|
||||||
|
* @date 2021-12-11 09:39:40
|
||||||
|
* */
|
||||||
|
class MessageConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConfig) with Logging {
|
||||||
|
|
||||||
|
def searchBy(partitions: util.Collection[TopicPartition], startTime: Long, endTime: Long,
|
||||||
|
maxNums: Int): util.List[ConsumerRecord[Array[Byte], Array[Byte]]] = {
|
||||||
|
var startOffTable: immutable.Map[TopicPartition, Long] = Map.empty
|
||||||
|
var endOffTable: immutable.Map[TopicPartition, Long] = Map.empty
|
||||||
|
withAdminClientAndCatchError(admin => {
|
||||||
|
val startTable = KafkaConsole.getLogTimestampOffsets(admin, partitions.asScala.toSeq, startTime, timeoutMs)
|
||||||
|
startOffTable = startTable.map(t2 => (t2._1, t2._2.offset())).toMap
|
||||||
|
|
||||||
|
endOffTable = KafkaConsole.getLogTimestampOffsets(admin, partitions.asScala.toSeq, endTime, timeoutMs)
|
||||||
|
.map(t2 => (t2._1, t2._2.offset())).toMap
|
||||||
|
}, e => {
|
||||||
|
log.error("getLogTimestampOffsets error.", e)
|
||||||
|
throw new RuntimeException("getLogTimestampOffsets error", e)
|
||||||
|
})
|
||||||
|
var terminate: Boolean = (startOffTable == endOffTable)
|
||||||
|
val res = new util.LinkedList[ConsumerRecord[Array[Byte], Array[Byte]]]()
|
||||||
|
// 如果最小和最大偏移一致,就结束
|
||||||
|
if (!terminate) {
|
||||||
|
|
||||||
|
val arrive = new util.HashSet[TopicPartition](partitions)
|
||||||
|
val props = new Properties()
|
||||||
|
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
|
||||||
|
withConsumerAndCatchError(consumer => {
|
||||||
|
consumer.assign(partitions)
|
||||||
|
for ((tp, off) <- startOffTable) {
|
||||||
|
consumer.seek(tp, off)
|
||||||
|
}
|
||||||
|
|
||||||
|
// 终止条件
|
||||||
|
// 1.所有查询分区达都到最大偏移的时候
|
||||||
|
while (!terminate) {
|
||||||
|
// 达到查询的最大条数
|
||||||
|
if (res.size() >= maxNums) {
|
||||||
|
terminate = true
|
||||||
|
} else {
|
||||||
|
val records = consumer.poll(Duration.ofMillis(timeoutMs))
|
||||||
|
|
||||||
|
if (records.isEmpty) {
|
||||||
|
terminate = true
|
||||||
|
} else {
|
||||||
|
for ((tp, endOff) <- endOffTable) {
|
||||||
|
if (!terminate) {
|
||||||
|
var recordList = records.records(tp)
|
||||||
|
if (!recordList.isEmpty) {
|
||||||
|
val first = recordList.get(0)
|
||||||
|
if (first.offset() >= endOff) {
|
||||||
|
arrive.remove(tp)
|
||||||
|
} else {
|
||||||
|
//
|
||||||
|
// (String topic,
|
||||||
|
// int partition,
|
||||||
|
// long offset,
|
||||||
|
// long timestamp,
|
||||||
|
// TimestampType timestampType,
|
||||||
|
// Long checksum,
|
||||||
|
// int serializedKeySize,
|
||||||
|
// int serializedValueSize,
|
||||||
|
// K key,
|
||||||
|
// V value,
|
||||||
|
// Headers headers,
|
||||||
|
// Optional<Integer> leaderEpoch)
|
||||||
|
val nullVList = recordList.asScala.map(record => new ConsumerRecord[Array[Byte], Array[Byte]](record.topic(),
|
||||||
|
record.partition(),
|
||||||
|
record.offset(),
|
||||||
|
record.timestamp(),
|
||||||
|
record.timestampType(),
|
||||||
|
record.checksum(),
|
||||||
|
record.serializedKeySize(),
|
||||||
|
record.serializedValueSize(),
|
||||||
|
record.key(),
|
||||||
|
null,
|
||||||
|
record.headers(),
|
||||||
|
record.leaderEpoch())).toSeq.asJava
|
||||||
|
res.addAll(nullVList)
|
||||||
|
if (recordList.get(recordList.size() - 1).offset() >= endOff) {
|
||||||
|
arrive.remove(tp)
|
||||||
|
}
|
||||||
|
if (recordList != null) {
|
||||||
|
recordList = null
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (arrive.isEmpty) {
|
||||||
|
terminate = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}, e => {
|
||||||
|
log.error("searchBy time error.", e)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
res
|
||||||
|
}
|
||||||
|
|
||||||
|
def searchBy(
|
||||||
|
tp2o: util.Map[TopicPartition, Long]): util.Map[TopicPartition, ConsumerRecord[Array[Byte], Array[Byte]]] = {
|
||||||
|
val props = new Properties()
|
||||||
|
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
|
||||||
|
val res = new util.HashMap[TopicPartition, ConsumerRecord[Array[Byte], Array[Byte]]]()
|
||||||
|
withConsumerAndCatchError(consumer => {
|
||||||
|
var tpSet = tp2o.keySet()
|
||||||
|
val tpSetCopy = new util.HashSet[TopicPartition](tpSet)
|
||||||
|
val endOffsets = consumer.endOffsets(tpSet)
|
||||||
|
val beginOffsets = consumer.beginningOffsets(tpSet)
|
||||||
|
for ((tp, off) <- tp2o.asScala) {
|
||||||
|
val endOff = endOffsets.get(tp)
|
||||||
|
// if (endOff <= off) {
|
||||||
|
// consumer.seek(tp, endOff)
|
||||||
|
// tpSetCopy.remove(tp)
|
||||||
|
// } else {
|
||||||
|
// consumer.seek(tp, off)
|
||||||
|
// }
|
||||||
|
val beginOff = beginOffsets.get(tp)
|
||||||
|
if (off < beginOff || off >= endOff) {
|
||||||
|
tpSetCopy.remove(tp)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
tpSet = tpSetCopy
|
||||||
|
consumer.assign(tpSet)
|
||||||
|
tpSet.asScala.foreach(tp => {
|
||||||
|
consumer.seek(tp, tp2o.get(tp))
|
||||||
|
})
|
||||||
|
|
||||||
|
var terminate = tpSet.isEmpty
|
||||||
|
while (!terminate) {
|
||||||
|
val records = consumer.poll(Duration.ofMillis(timeoutMs))
|
||||||
|
val tps = new util.HashSet(tpSet).asScala
|
||||||
|
for (tp <- tps) {
|
||||||
|
if (!res.containsKey(tp)) {
|
||||||
|
val recordList = records.records(tp)
|
||||||
|
if (!recordList.isEmpty) {
|
||||||
|
val record = recordList.get(0)
|
||||||
|
res.put(tp, record)
|
||||||
|
tpSet.remove(tp)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (tpSet.isEmpty) {
|
||||||
|
terminate = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}, e => {
|
||||||
|
log.error("searchBy offset error.", e)
|
||||||
|
})
|
||||||
|
res
|
||||||
|
}
|
||||||
|
|
||||||
|
def send(topic: String, partition: Int, key: String, value: String, num: Int): Unit = {
|
||||||
|
withProducerAndCatchError(producer => {
|
||||||
|
val nullKey = if (key != null && key.trim().length() == 0) null else key
|
||||||
|
for (a <- 1 to num) {
|
||||||
|
val record = if (partition != -1) new ProducerRecord[String, String](topic, partition, nullKey, value)
|
||||||
|
else new ProducerRecord[String, String](topic, nullKey, value)
|
||||||
|
producer.send(record)
|
||||||
|
}
|
||||||
|
}, e => log.error("send error.", e))
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
def sendSync(record: ProducerRecord[Array[Byte], Array[Byte]]): (Boolean, String) = {
|
||||||
|
withByteProducerAndCatchError(producer => {
|
||||||
|
val metadata = producer.send(record).get()
|
||||||
|
(true, metadata.toString())
|
||||||
|
}, e => {
|
||||||
|
log.error("send error.", e)
|
||||||
|
(false, e.getMessage)
|
||||||
|
}).asInstanceOf[(Boolean, String)]
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,6 +1,7 @@
|
|||||||
<template>
|
<template>
|
||||||
<div id="app">
|
<div id="app">
|
||||||
<div id="nav">
|
<div id="nav">
|
||||||
|
<h2 class="logo">Kafka 控制台</h2>
|
||||||
<router-link to="/" class="pad-l-r">主页</router-link>
|
<router-link to="/" class="pad-l-r">主页</router-link>
|
||||||
<span>|</span
|
<span>|</span
|
||||||
><router-link to="/cluster-page" class="pad-l-r">集群</router-link>
|
><router-link to="/cluster-page" class="pad-l-r">集群</router-link>
|
||||||
@@ -8,6 +9,8 @@
|
|||||||
><router-link to="/topic-page" class="pad-l-r">Topic</router-link>
|
><router-link to="/topic-page" class="pad-l-r">Topic</router-link>
|
||||||
<span>|</span
|
<span>|</span
|
||||||
><router-link to="/group-page" class="pad-l-r">消费组</router-link>
|
><router-link to="/group-page" class="pad-l-r">消费组</router-link>
|
||||||
|
<span>|</span
|
||||||
|
><router-link to="/message-page" class="pad-l-r">消息</router-link>
|
||||||
<span v-show="config.enableAcl">|</span
|
<span v-show="config.enableAcl">|</span
|
||||||
><router-link to="/acl-page" class="pad-l-r" v-show="config.enableAcl"
|
><router-link to="/acl-page" class="pad-l-r" v-show="config.enableAcl"
|
||||||
>Acl</router-link
|
>Acl</router-link
|
||||||
@@ -44,7 +47,6 @@ export default {
|
|||||||
font-family: Avenir, Helvetica, Arial, sans-serif;
|
font-family: Avenir, Helvetica, Arial, sans-serif;
|
||||||
-webkit-font-smoothing: antialiased;
|
-webkit-font-smoothing: antialiased;
|
||||||
-moz-osx-font-smoothing: grayscale;
|
-moz-osx-font-smoothing: grayscale;
|
||||||
text-align: center;
|
|
||||||
color: #2c3e50;
|
color: #2c3e50;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -59,6 +61,7 @@ export default {
|
|||||||
padding-top: 1%;
|
padding-top: 1%;
|
||||||
padding-bottom: 1%;
|
padding-bottom: 1%;
|
||||||
margin-bottom: 1%;
|
margin-bottom: 1%;
|
||||||
|
text-align: center;
|
||||||
}
|
}
|
||||||
|
|
||||||
#nav a {
|
#nav a {
|
||||||
@@ -81,4 +84,10 @@ export default {
|
|||||||
height: 90%;
|
height: 90%;
|
||||||
width: 100%;
|
width: 100%;
|
||||||
}
|
}
|
||||||
|
.logo {
|
||||||
|
float: left;
|
||||||
|
left: 1%;
|
||||||
|
top: 1%;
|
||||||
|
position: absolute;
|
||||||
|
}
|
||||||
</style>
|
</style>
|
||||||
|
|||||||
@@ -43,6 +43,12 @@ const routes = [
|
|||||||
component: () =>
|
component: () =>
|
||||||
import(/* webpackChunkName: "cluster" */ "../views/cluster/Cluster.vue"),
|
import(/* webpackChunkName: "cluster" */ "../views/cluster/Cluster.vue"),
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
path: "/message-page",
|
||||||
|
name: "Message",
|
||||||
|
component: () =>
|
||||||
|
import(/* webpackChunkName: "cluster" */ "../views/message/Message.vue"),
|
||||||
|
},
|
||||||
];
|
];
|
||||||
|
|
||||||
const router = new VueRouter({
|
const router = new VueRouter({
|
||||||
|
|||||||
@@ -223,3 +223,29 @@ export const KafkaOpApi = {
|
|||||||
method: "delete",
|
method: "delete",
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
export const KafkaMessageApi = {
|
||||||
|
searchByTime: {
|
||||||
|
url: "/message/search/time",
|
||||||
|
method: "post",
|
||||||
|
},
|
||||||
|
searchByOffset: {
|
||||||
|
url: "/message/search/offset",
|
||||||
|
method: "post",
|
||||||
|
},
|
||||||
|
searchDetail: {
|
||||||
|
url: "/message/search/detail",
|
||||||
|
method: "post",
|
||||||
|
},
|
||||||
|
deserializerList: {
|
||||||
|
url: "/message/deserializer/list",
|
||||||
|
method: "get",
|
||||||
|
},
|
||||||
|
send: {
|
||||||
|
url: "/message/send",
|
||||||
|
method: "post",
|
||||||
|
},
|
||||||
|
resend: {
|
||||||
|
url: "/message/resend",
|
||||||
|
method: "post",
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|||||||
@@ -6,7 +6,7 @@ import { VueAxios } from "./axios";
|
|||||||
const request = axios.create({
|
const request = axios.create({
|
||||||
// API 请求的默认前缀
|
// API 请求的默认前缀
|
||||||
baseURL: process.env.VUE_APP_API_BASE_URL,
|
baseURL: process.env.VUE_APP_API_BASE_URL,
|
||||||
timeout: 30000, // 请求超时时间
|
timeout: 120000, // 请求超时时间
|
||||||
});
|
});
|
||||||
|
|
||||||
// 异常拦截处理器
|
// 异常拦截处理器
|
||||||
|
|||||||
58
ui/src/views/message/Message.vue
Normal file
58
ui/src/views/message/Message.vue
Normal file
@@ -0,0 +1,58 @@
|
|||||||
|
<template>
|
||||||
|
<div class="content">
|
||||||
|
<a-spin :spinning="loading">
|
||||||
|
<a-tabs default-active-key="1" size="large" tabPosition="top">
|
||||||
|
<a-tab-pane key="1" tab="根据时间查询消息">
|
||||||
|
<SearchByTime :topic-list="topicList"></SearchByTime>
|
||||||
|
</a-tab-pane>
|
||||||
|
<a-tab-pane key="2" tab="根据偏移查询消息">
|
||||||
|
<SearchByOffset :topic-list="topicList"></SearchByOffset>
|
||||||
|
</a-tab-pane>
|
||||||
|
<a-tab-pane key="3" tab="在线发送">
|
||||||
|
<SendMessage :topic-list="topicList"></SendMessage>
|
||||||
|
</a-tab-pane>
|
||||||
|
</a-tabs>
|
||||||
|
</a-spin>
|
||||||
|
</div>
|
||||||
|
</template>
|
||||||
|
|
||||||
|
<script>
|
||||||
|
import SearchByTime from "@/views/message/SearchByTime";
|
||||||
|
import SearchByOffset from "@/views/message/SearchByOffset";
|
||||||
|
import request from "@/utils/request";
|
||||||
|
import { KafkaTopicApi } from "@/utils/api";
|
||||||
|
import notification from "ant-design-vue/lib/notification";
|
||||||
|
import SendMessage from "@/views/message/SendMessage";
|
||||||
|
export default {
|
||||||
|
name: "Message",
|
||||||
|
components: { SearchByTime, SearchByOffset, SendMessage },
|
||||||
|
data() {
|
||||||
|
return {
|
||||||
|
loading: false,
|
||||||
|
topicList: [],
|
||||||
|
};
|
||||||
|
},
|
||||||
|
methods: {
|
||||||
|
getTopicNameList() {
|
||||||
|
request({
|
||||||
|
url: KafkaTopicApi.getTopicNameList.url,
|
||||||
|
method: KafkaTopicApi.getTopicNameList.method,
|
||||||
|
}).then((res) => {
|
||||||
|
if (res.code == 0) {
|
||||||
|
this.topicList = res.data;
|
||||||
|
} else {
|
||||||
|
notification.error({
|
||||||
|
message: "error",
|
||||||
|
description: res.msg,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
});
|
||||||
|
},
|
||||||
|
},
|
||||||
|
created() {
|
||||||
|
this.getTopicNameList();
|
||||||
|
},
|
||||||
|
};
|
||||||
|
</script>
|
||||||
|
|
||||||
|
<style scoped></style>
|
||||||
265
ui/src/views/message/MessageDetail.vue
Normal file
265
ui/src/views/message/MessageDetail.vue
Normal file
@@ -0,0 +1,265 @@
|
|||||||
|
<template>
|
||||||
|
<a-modal
|
||||||
|
title="消息详情"
|
||||||
|
:visible="show"
|
||||||
|
:width="800"
|
||||||
|
:mask="false"
|
||||||
|
:destroyOnClose="true"
|
||||||
|
:footer="null"
|
||||||
|
:maskClosable="false"
|
||||||
|
@cancel="handleCancel"
|
||||||
|
>
|
||||||
|
<div>
|
||||||
|
<a-spin :spinning="loading">
|
||||||
|
<div>
|
||||||
|
<h4>消息信息</h4>
|
||||||
|
<hr />
|
||||||
|
<div class="message-detail" id="message-detail">
|
||||||
|
<p>
|
||||||
|
<label class="title">Topic: </label>
|
||||||
|
<span class="m-info">{{ data.topic }}</span>
|
||||||
|
</p>
|
||||||
|
<p>
|
||||||
|
<label class="title">分区: </label>
|
||||||
|
<span class="m-info">{{ data.partition }}</span>
|
||||||
|
</p>
|
||||||
|
<p>
|
||||||
|
<label class="title">偏移: </label>
|
||||||
|
<span class="m-info">{{ data.offset }}</span>
|
||||||
|
</p>
|
||||||
|
<p>
|
||||||
|
<label class="title">消息头: </label>
|
||||||
|
<span class="m-info">{{ data.headers }}</span>
|
||||||
|
</p>
|
||||||
|
<p>
|
||||||
|
<label class="title">时间类型: </label>
|
||||||
|
<span class="m-info"
|
||||||
|
>{{
|
||||||
|
data.timestampType
|
||||||
|
}}(表示下面的时间是哪种类型:消息创建、写入日志亦或其它)</span
|
||||||
|
>
|
||||||
|
</p>
|
||||||
|
<p>
|
||||||
|
<label class="title">时间: </label>
|
||||||
|
<span class="m-info">{{ formatTime(data.timestamp) }}</span>
|
||||||
|
</p>
|
||||||
|
<p>
|
||||||
|
<label class="title">Key反序列化: </label>
|
||||||
|
<a-select
|
||||||
|
style="width: 120px"
|
||||||
|
v-model="keyDeserializer"
|
||||||
|
@change="keyDeserializerChange"
|
||||||
|
>
|
||||||
|
<a-select-option
|
||||||
|
v-for="v in deserializerList"
|
||||||
|
:key="v"
|
||||||
|
:value="v"
|
||||||
|
>
|
||||||
|
{{ v }}
|
||||||
|
</a-select-option>
|
||||||
|
</a-select>
|
||||||
|
<span>选一个合适反序列化器,要不可能乱码了</span>
|
||||||
|
</p>
|
||||||
|
<p>
|
||||||
|
<label class="title">Key: </label>
|
||||||
|
<span class="m-info">{{ data.key }}</span>
|
||||||
|
</p>
|
||||||
|
<p>
|
||||||
|
<label class="title">消息体反序列化: </label>
|
||||||
|
<a-select
|
||||||
|
v-model="valueDeserializer"
|
||||||
|
style="width: 120px"
|
||||||
|
@change="valueDeserializerChange"
|
||||||
|
>
|
||||||
|
<a-select-option
|
||||||
|
v-for="v in deserializerList"
|
||||||
|
:key="v"
|
||||||
|
:value="v"
|
||||||
|
>
|
||||||
|
{{ v }}
|
||||||
|
</a-select-option>
|
||||||
|
</a-select>
|
||||||
|
<span>选一个合适反序列化器,要不可能乱码了</span>
|
||||||
|
</p>
|
||||||
|
<p>
|
||||||
|
<label class="title">消息体: </label>
|
||||||
|
<a-textarea
|
||||||
|
type="textarea"
|
||||||
|
:value="data.value"
|
||||||
|
:rows="5"
|
||||||
|
:read-only="true"
|
||||||
|
></a-textarea>
|
||||||
|
</p>
|
||||||
|
</div>
|
||||||
|
</div>
|
||||||
|
<div>
|
||||||
|
<h4>消费信息</h4>
|
||||||
|
<hr />
|
||||||
|
<a-table
|
||||||
|
:columns="columns"
|
||||||
|
:data-source="data.consumers"
|
||||||
|
bordered
|
||||||
|
row-key="groupId"
|
||||||
|
>
|
||||||
|
<div slot="status" slot-scope="text">
|
||||||
|
<span v-if="text == 'consumed'">已消费</span
|
||||||
|
><span v-else style="color: red">未消费</span>
|
||||||
|
</div>
|
||||||
|
</a-table>
|
||||||
|
</div>
|
||||||
|
<div>
|
||||||
|
<h4>操作</h4>
|
||||||
|
<hr />
|
||||||
|
<a-popconfirm
|
||||||
|
title="确定将当前这条消息重新发回broker?"
|
||||||
|
ok-text="确认"
|
||||||
|
cancel-text="取消"
|
||||||
|
@confirm="resend"
|
||||||
|
>
|
||||||
|
<a-button type="primary" icon="reload"> 重新发送 </a-button>
|
||||||
|
</a-popconfirm>
|
||||||
|
</div>
|
||||||
|
</a-spin>
|
||||||
|
</div>
|
||||||
|
</a-modal>
|
||||||
|
</template>
|
||||||
|
|
||||||
|
<script>
|
||||||
|
import request from "@/utils/request";
|
||||||
|
import { KafkaMessageApi } from "@/utils/api";
|
||||||
|
import notification from "ant-design-vue/lib/notification";
|
||||||
|
import moment from "moment";
|
||||||
|
|
||||||
|
export default {
|
||||||
|
name: "MessageDetail",
|
||||||
|
props: {
|
||||||
|
record: {},
|
||||||
|
visible: {
|
||||||
|
type: Boolean,
|
||||||
|
default: false,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
data() {
|
||||||
|
return {
|
||||||
|
show: this.visible,
|
||||||
|
data: {},
|
||||||
|
loading: false,
|
||||||
|
deserializerList: [],
|
||||||
|
keyDeserializer: "String",
|
||||||
|
valueDeserializer: "String",
|
||||||
|
consumerDetail: [],
|
||||||
|
columns,
|
||||||
|
};
|
||||||
|
},
|
||||||
|
watch: {
|
||||||
|
visible(v) {
|
||||||
|
this.show = v;
|
||||||
|
if (this.show) {
|
||||||
|
this.getMessageDetail();
|
||||||
|
this.getDeserializerList();
|
||||||
|
}
|
||||||
|
},
|
||||||
|
},
|
||||||
|
methods: {
|
||||||
|
getMessageDetail() {
|
||||||
|
this.loading = true;
|
||||||
|
const params = Object.assign({}, this.record, {
|
||||||
|
keyDeserializer: this.keyDeserializer,
|
||||||
|
valueDeserializer: this.valueDeserializer,
|
||||||
|
});
|
||||||
|
request({
|
||||||
|
url: KafkaMessageApi.searchDetail.url,
|
||||||
|
method: KafkaMessageApi.searchDetail.method,
|
||||||
|
data: params,
|
||||||
|
}).then((res) => {
|
||||||
|
this.loading = false;
|
||||||
|
if (res.code != 0) {
|
||||||
|
notification.error({
|
||||||
|
message: "error",
|
||||||
|
description: res.msg,
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
this.data = res.data;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
},
|
||||||
|
getDeserializerList() {
|
||||||
|
request({
|
||||||
|
url: KafkaMessageApi.deserializerList.url,
|
||||||
|
method: KafkaMessageApi.deserializerList.method,
|
||||||
|
}).then((res) => {
|
||||||
|
if (res.code != 0) {
|
||||||
|
notification.error({
|
||||||
|
message: "error",
|
||||||
|
description: res.msg,
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
this.deserializerList = res.data;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
},
|
||||||
|
handleCancel() {
|
||||||
|
this.data = {};
|
||||||
|
this.$emit("closeDetailDialog", { refresh: false });
|
||||||
|
},
|
||||||
|
formatTime(time) {
|
||||||
|
return moment(time).format("YYYY-MM-DD HH:mm:ss:SSS");
|
||||||
|
},
|
||||||
|
keyDeserializerChange() {
|
||||||
|
this.getMessageDetail();
|
||||||
|
},
|
||||||
|
valueDeserializerChange() {
|
||||||
|
this.getMessageDetail();
|
||||||
|
},
|
||||||
|
resend() {
|
||||||
|
const params = Object.assign({}, this.data);
|
||||||
|
this.loading = true;
|
||||||
|
request({
|
||||||
|
url: KafkaMessageApi.resend.url,
|
||||||
|
method: KafkaMessageApi.resend.method,
|
||||||
|
data: params,
|
||||||
|
}).then((res) => {
|
||||||
|
this.loading = false;
|
||||||
|
if (res.code != 0) {
|
||||||
|
notification.error({
|
||||||
|
message: "error",
|
||||||
|
description: res.msg,
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
this.$message.success(res.msg);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
},
|
||||||
|
},
|
||||||
|
};
|
||||||
|
const columns = [
|
||||||
|
{
|
||||||
|
title: "消费组",
|
||||||
|
dataIndex: "groupId",
|
||||||
|
key: "groupId",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
title: "消费情况",
|
||||||
|
dataIndex: "status",
|
||||||
|
key: "status",
|
||||||
|
scopedSlots: { customRender: "status" },
|
||||||
|
},
|
||||||
|
];
|
||||||
|
</script>
|
||||||
|
|
||||||
|
<style scoped>
|
||||||
|
.m-info {
|
||||||
|
/*text-decoration: underline;*/
|
||||||
|
}
|
||||||
|
.title {
|
||||||
|
width: 15%;
|
||||||
|
display: inline-block;
|
||||||
|
text-align: right;
|
||||||
|
margin-right: 2%;
|
||||||
|
font-weight: bold;
|
||||||
|
}
|
||||||
|
.ant-spin-container #message-detail textarea {
|
||||||
|
max-width: 80% !important;
|
||||||
|
vertical-align: top !important;
|
||||||
|
}
|
||||||
|
</style>
|
||||||
95
ui/src/views/message/MessageList.vue
Normal file
95
ui/src/views/message/MessageList.vue
Normal file
@@ -0,0 +1,95 @@
|
|||||||
|
<template>
|
||||||
|
<div>
|
||||||
|
<a-table
|
||||||
|
:columns="columns"
|
||||||
|
:data-source="data"
|
||||||
|
bordered
|
||||||
|
:row-key="
|
||||||
|
(record, index) => {
|
||||||
|
return index;
|
||||||
|
}
|
||||||
|
"
|
||||||
|
>
|
||||||
|
<div slot="operation" slot-scope="record">
|
||||||
|
<a-button
|
||||||
|
size="small"
|
||||||
|
href="javascript:;"
|
||||||
|
class="operation-btn"
|
||||||
|
@click="openDetailDialog(record)"
|
||||||
|
>消息详情
|
||||||
|
</a-button>
|
||||||
|
</div>
|
||||||
|
</a-table>
|
||||||
|
<MessageDetail
|
||||||
|
:visible="showDetailDialog"
|
||||||
|
:record="record"
|
||||||
|
@closeDetailDialog="closeDetailDialog"
|
||||||
|
></MessageDetail>
|
||||||
|
</div>
|
||||||
|
</template>
|
||||||
|
|
||||||
|
<script>
|
||||||
|
import moment from "moment";
|
||||||
|
import MessageDetail from "@/views/message/MessageDetail";
|
||||||
|
export default {
|
||||||
|
name: "MessageList",
|
||||||
|
components: { MessageDetail },
|
||||||
|
props: {
|
||||||
|
data: {
|
||||||
|
type: Array,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
data() {
|
||||||
|
return {
|
||||||
|
columns: columns,
|
||||||
|
showDetailDialog: false,
|
||||||
|
record: {},
|
||||||
|
};
|
||||||
|
},
|
||||||
|
methods: {
|
||||||
|
openDetailDialog(record) {
|
||||||
|
this.record = record;
|
||||||
|
this.showDetailDialog = true;
|
||||||
|
},
|
||||||
|
closeDetailDialog() {
|
||||||
|
this.showDetailDialog = false;
|
||||||
|
},
|
||||||
|
},
|
||||||
|
};
|
||||||
|
const columns = [
|
||||||
|
{
|
||||||
|
title: "topic",
|
||||||
|
dataIndex: "topic",
|
||||||
|
key: "topic",
|
||||||
|
width: 300,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
title: "分区",
|
||||||
|
dataIndex: "partition",
|
||||||
|
key: "partition",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
title: "偏移",
|
||||||
|
dataIndex: "offset",
|
||||||
|
key: "offset",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
title: "时间",
|
||||||
|
dataIndex: "timestamp",
|
||||||
|
key: "timestamp",
|
||||||
|
slots: { title: "timestamp" },
|
||||||
|
scopedSlots: { customRender: "timestamp" },
|
||||||
|
customRender: (text) => {
|
||||||
|
return moment(text).format("YYYY-MM-DD HH:mm:ss:SSS");
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
title: "操作",
|
||||||
|
key: "operation",
|
||||||
|
scopedSlots: { customRender: "operation" },
|
||||||
|
width: 200,
|
||||||
|
},
|
||||||
|
];
|
||||||
|
</script>
|
||||||
|
|
||||||
|
<style scoped></style>
|
||||||
195
ui/src/views/message/SearchByOffset.vue
Normal file
195
ui/src/views/message/SearchByOffset.vue
Normal file
@@ -0,0 +1,195 @@
|
|||||||
|
<template>
|
||||||
|
<div class="tab-content">
|
||||||
|
<a-spin :spinning="loading">
|
||||||
|
<div id="search-offset-form-advanced-search">
|
||||||
|
<a-form
|
||||||
|
class="ant-advanced-search-form"
|
||||||
|
:form="form"
|
||||||
|
@submit="handleSearch"
|
||||||
|
>
|
||||||
|
<a-row :gutter="24">
|
||||||
|
<a-col :span="9">
|
||||||
|
<a-form-item label="topic">
|
||||||
|
<a-select
|
||||||
|
class="topic-select"
|
||||||
|
@change="handleTopicChange"
|
||||||
|
show-search
|
||||||
|
option-filter-prop="children"
|
||||||
|
v-decorator="[
|
||||||
|
'topic',
|
||||||
|
{
|
||||||
|
rules: [{ required: true, message: '请选择一个topic!' }],
|
||||||
|
},
|
||||||
|
]"
|
||||||
|
placeholder="请选择一个topic"
|
||||||
|
>
|
||||||
|
<a-select-option v-for="v in topicList" :key="v" :value="v">
|
||||||
|
{{ v }}
|
||||||
|
</a-select-option>
|
||||||
|
</a-select>
|
||||||
|
</a-form-item>
|
||||||
|
</a-col>
|
||||||
|
<a-col :span="6">
|
||||||
|
<a-form-item label="分区">
|
||||||
|
<a-select
|
||||||
|
class="type-select"
|
||||||
|
show-search
|
||||||
|
option-filter-prop="children"
|
||||||
|
v-model="selectPartition"
|
||||||
|
placeholder="请选择一个分区"
|
||||||
|
>
|
||||||
|
<a-select-option v-for="v in partitions" :key="v" :value="v">
|
||||||
|
<span v-if="v == -1">全部</span> <span v-else>{{ v }}</span>
|
||||||
|
</a-select-option>
|
||||||
|
</a-select>
|
||||||
|
</a-form-item>
|
||||||
|
</a-col>
|
||||||
|
<a-col :span="7">
|
||||||
|
<a-form-item label="偏移">
|
||||||
|
<a-input
|
||||||
|
v-decorator="[
|
||||||
|
'offset',
|
||||||
|
{
|
||||||
|
rules: [{ required: true, message: '请输入消息偏移!' }],
|
||||||
|
},
|
||||||
|
]"
|
||||||
|
placeholder="消息偏移"
|
||||||
|
/>
|
||||||
|
</a-form-item>
|
||||||
|
</a-col>
|
||||||
|
<a-col :span="2" :style="{ textAlign: 'right' }">
|
||||||
|
<a-form-item>
|
||||||
|
<a-button type="primary" html-type="submit"> 搜索</a-button>
|
||||||
|
</a-form-item>
|
||||||
|
</a-col>
|
||||||
|
</a-row>
|
||||||
|
</a-form>
|
||||||
|
</div>
|
||||||
|
<MessageList :data="data"></MessageList>
|
||||||
|
</a-spin>
|
||||||
|
</div>
|
||||||
|
</template>
|
||||||
|
|
||||||
|
<script>
|
||||||
|
import request from "@/utils/request";
|
||||||
|
import { KafkaMessageApi, KafkaTopicApi } from "@/utils/api";
|
||||||
|
import notification from "ant-design-vue/lib/notification";
|
||||||
|
import MessageList from "@/views/message/MessageList";
|
||||||
|
|
||||||
|
export default {
|
||||||
|
name: "SearchByOffset",
|
||||||
|
components: { MessageList },
|
||||||
|
props: {
|
||||||
|
topicList: {
|
||||||
|
type: Array,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
data() {
|
||||||
|
return {
|
||||||
|
loading: false,
|
||||||
|
form: this.$form.createForm(this, { name: "message_search_offset" }),
|
||||||
|
partitions: [],
|
||||||
|
selectPartition: undefined,
|
||||||
|
rangeConfig: {
|
||||||
|
rules: [{ type: "array", required: true, message: "请选择时间!" }],
|
||||||
|
},
|
||||||
|
data: defaultData,
|
||||||
|
};
|
||||||
|
},
|
||||||
|
methods: {
|
||||||
|
handleSearch(e) {
|
||||||
|
e.preventDefault();
|
||||||
|
this.form.validateFields((err, values) => {
|
||||||
|
if (!err) {
|
||||||
|
const data = Object.assign({}, values, {
|
||||||
|
partition: this.selectPartition,
|
||||||
|
});
|
||||||
|
this.loading = true;
|
||||||
|
request({
|
||||||
|
url: KafkaMessageApi.searchByOffset.url,
|
||||||
|
method: KafkaMessageApi.searchByOffset.method,
|
||||||
|
data: data,
|
||||||
|
}).then((res) => {
|
||||||
|
this.loading = false;
|
||||||
|
if (res.code == 0) {
|
||||||
|
this.$message.success(res.msg);
|
||||||
|
this.data = res.data;
|
||||||
|
} else {
|
||||||
|
notification.error({
|
||||||
|
message: "error",
|
||||||
|
description: res.msg,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
});
|
||||||
|
},
|
||||||
|
getPartitionInfo(topic) {
|
||||||
|
this.loading = true;
|
||||||
|
request({
|
||||||
|
url: KafkaTopicApi.getPartitionInfo.url + "?topic=" + topic,
|
||||||
|
method: KafkaTopicApi.getPartitionInfo.method,
|
||||||
|
}).then((res) => {
|
||||||
|
this.loading = false;
|
||||||
|
if (res.code != 0) {
|
||||||
|
notification.error({
|
||||||
|
message: "error",
|
||||||
|
description: res.msg,
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
this.partitions = res.data.map((v) => v.partition);
|
||||||
|
this.partitions.splice(0, 0, -1);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
},
|
||||||
|
handleTopicChange(topic) {
|
||||||
|
this.selectPartition = -1;
|
||||||
|
this.getPartitionInfo(topic);
|
||||||
|
},
|
||||||
|
},
|
||||||
|
};
|
||||||
|
const defaultData = [];
|
||||||
|
</script>
|
||||||
|
|
||||||
|
<style scoped>
|
||||||
|
.tab-content {
|
||||||
|
width: 100%;
|
||||||
|
height: 100%;
|
||||||
|
}
|
||||||
|
|
||||||
|
.ant-advanced-search-form {
|
||||||
|
padding: 24px;
|
||||||
|
background: #fbfbfb;
|
||||||
|
border: 1px solid #d9d9d9;
|
||||||
|
border-radius: 6px;
|
||||||
|
}
|
||||||
|
|
||||||
|
.ant-advanced-search-form .ant-form-item {
|
||||||
|
display: flex;
|
||||||
|
}
|
||||||
|
|
||||||
|
.ant-advanced-search-form .ant-form-item-control-wrapper {
|
||||||
|
flex: 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
#components-form-topic-advanced-search .ant-form {
|
||||||
|
max-width: none;
|
||||||
|
margin-bottom: 1%;
|
||||||
|
}
|
||||||
|
|
||||||
|
#search-offset-form-advanced-search .search-result-list {
|
||||||
|
margin-top: 16px;
|
||||||
|
border: 1px dashed #e9e9e9;
|
||||||
|
border-radius: 6px;
|
||||||
|
background-color: #fafafa;
|
||||||
|
min-height: 200px;
|
||||||
|
text-align: center;
|
||||||
|
padding-top: 80px;
|
||||||
|
}
|
||||||
|
.topic-select {
|
||||||
|
width: 400px !important;
|
||||||
|
}
|
||||||
|
.type-select {
|
||||||
|
width: 200px !important;
|
||||||
|
}
|
||||||
|
</style>
|
||||||
201
ui/src/views/message/SearchByTime.vue
Normal file
201
ui/src/views/message/SearchByTime.vue
Normal file
@@ -0,0 +1,201 @@
|
|||||||
|
<template>
|
||||||
|
<div class="tab-content">
|
||||||
|
<a-spin :spinning="loading">
|
||||||
|
<div id="search-time-form-advanced-search">
|
||||||
|
<a-form
|
||||||
|
class="ant-advanced-search-form"
|
||||||
|
:form="form"
|
||||||
|
@submit="handleSearch"
|
||||||
|
>
|
||||||
|
<a-row :gutter="24">
|
||||||
|
<a-col :span="9">
|
||||||
|
<a-form-item label="topic">
|
||||||
|
<a-select
|
||||||
|
class="topic-select"
|
||||||
|
@change="handleTopicChange"
|
||||||
|
show-search
|
||||||
|
option-filter-prop="children"
|
||||||
|
v-decorator="[
|
||||||
|
'topic',
|
||||||
|
{
|
||||||
|
rules: [{ required: true, message: '请选择一个topic!' }],
|
||||||
|
},
|
||||||
|
]"
|
||||||
|
placeholder="请选择一个topic"
|
||||||
|
>
|
||||||
|
<a-select-option v-for="v in topicList" :key="v" :value="v">
|
||||||
|
{{ v }}
|
||||||
|
</a-select-option>
|
||||||
|
</a-select>
|
||||||
|
</a-form-item>
|
||||||
|
</a-col>
|
||||||
|
<a-col :span="5">
|
||||||
|
<a-form-item label="分区">
|
||||||
|
<a-select
|
||||||
|
class="type-select"
|
||||||
|
show-search
|
||||||
|
option-filter-prop="children"
|
||||||
|
v-model="selectPartition"
|
||||||
|
placeholder="请选择一个分区"
|
||||||
|
>
|
||||||
|
<a-select-option v-for="v in partitions" :key="v" :value="v">
|
||||||
|
<span v-if="v == -1">全部</span> <span v-else>{{ v }}</span>
|
||||||
|
</a-select-option>
|
||||||
|
</a-select>
|
||||||
|
</a-form-item>
|
||||||
|
</a-col>
|
||||||
|
<a-col :span="8">
|
||||||
|
<a-form-item label="时间">
|
||||||
|
<a-range-picker
|
||||||
|
v-decorator="['time', rangeConfig]"
|
||||||
|
show-time
|
||||||
|
format="YYYY-MM-DD HH:mm:ss"
|
||||||
|
/>
|
||||||
|
</a-form-item>
|
||||||
|
</a-col>
|
||||||
|
<a-col :span="2" :style="{ textAlign: 'right' }">
|
||||||
|
<a-form-item>
|
||||||
|
<a-button type="primary" html-type="submit"> 搜索</a-button>
|
||||||
|
</a-form-item>
|
||||||
|
</a-col>
|
||||||
|
</a-row>
|
||||||
|
</a-form>
|
||||||
|
</div>
|
||||||
|
<p style="margin-top: 1%">
|
||||||
|
<strong
|
||||||
|
>检索条数:{{ data.realNum }},允许返回的最大条数:{{
|
||||||
|
data.maxNum
|
||||||
|
}}</strong
|
||||||
|
>
|
||||||
|
</p>
|
||||||
|
<MessageList :data="data.data"></MessageList>
|
||||||
|
</a-spin>
|
||||||
|
</div>
|
||||||
|
</template>
|
||||||
|
|
||||||
|
<script>
|
||||||
|
import request from "@/utils/request";
|
||||||
|
import { KafkaMessageApi, KafkaTopicApi } from "@/utils/api";
|
||||||
|
import notification from "ant-design-vue/lib/notification";
|
||||||
|
import MessageList from "@/views/message/MessageList";
|
||||||
|
|
||||||
|
export default {
|
||||||
|
name: "SearchByTime",
|
||||||
|
components: { MessageList },
|
||||||
|
props: {
|
||||||
|
topicList: {
|
||||||
|
type: Array,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
data() {
|
||||||
|
return {
|
||||||
|
loading: false,
|
||||||
|
form: this.$form.createForm(this, { name: "message_search_time" }),
|
||||||
|
partitions: [],
|
||||||
|
selectPartition: undefined,
|
||||||
|
rangeConfig: {
|
||||||
|
rules: [{ type: "array", required: true, message: "请选择时间!" }],
|
||||||
|
},
|
||||||
|
data: defaultData,
|
||||||
|
};
|
||||||
|
},
|
||||||
|
methods: {
|
||||||
|
handleSearch(e) {
|
||||||
|
e.preventDefault();
|
||||||
|
this.form.validateFields((err, values) => {
|
||||||
|
if (!err) {
|
||||||
|
const data = Object.assign({}, values, {
|
||||||
|
partition: this.selectPartition,
|
||||||
|
});
|
||||||
|
data.startTime = values.time[0].valueOf();
|
||||||
|
data.endTime = values.time[1];
|
||||||
|
this.loading = true;
|
||||||
|
request({
|
||||||
|
url: KafkaMessageApi.searchByTime.url,
|
||||||
|
method: KafkaMessageApi.searchByTime.method,
|
||||||
|
data: data,
|
||||||
|
}).then((res) => {
|
||||||
|
this.loading = false;
|
||||||
|
if (res.code == 0) {
|
||||||
|
this.$message.success(res.msg);
|
||||||
|
this.data = res.data;
|
||||||
|
} else {
|
||||||
|
notification.error({
|
||||||
|
message: "error",
|
||||||
|
description: res.msg,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
});
|
||||||
|
},
|
||||||
|
getPartitionInfo(topic) {
|
||||||
|
this.loading = true;
|
||||||
|
request({
|
||||||
|
url: KafkaTopicApi.getPartitionInfo.url + "?topic=" + topic,
|
||||||
|
method: KafkaTopicApi.getPartitionInfo.method,
|
||||||
|
}).then((res) => {
|
||||||
|
this.loading = false;
|
||||||
|
if (res.code != 0) {
|
||||||
|
notification.error({
|
||||||
|
message: "error",
|
||||||
|
description: res.msg,
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
this.partitions = res.data.map((v) => v.partition);
|
||||||
|
this.partitions.splice(0, 0, -1);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
},
|
||||||
|
handleTopicChange(topic) {
|
||||||
|
this.selectPartition = -1;
|
||||||
|
this.getPartitionInfo(topic);
|
||||||
|
},
|
||||||
|
},
|
||||||
|
};
|
||||||
|
const defaultData = { realNum: 0, maxNum: 0 };
|
||||||
|
</script>
|
||||||
|
|
||||||
|
<style scoped>
|
||||||
|
.tab-content {
|
||||||
|
width: 100%;
|
||||||
|
height: 100%;
|
||||||
|
}
|
||||||
|
|
||||||
|
.ant-advanced-search-form {
|
||||||
|
padding: 24px;
|
||||||
|
background: #fbfbfb;
|
||||||
|
border: 1px solid #d9d9d9;
|
||||||
|
border-radius: 6px;
|
||||||
|
}
|
||||||
|
|
||||||
|
.ant-advanced-search-form .ant-form-item {
|
||||||
|
display: flex;
|
||||||
|
}
|
||||||
|
|
||||||
|
.ant-advanced-search-form .ant-form-item-control-wrapper {
|
||||||
|
flex: 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
#components-form-topic-advanced-search .ant-form {
|
||||||
|
max-width: none;
|
||||||
|
margin-bottom: 1%;
|
||||||
|
}
|
||||||
|
|
||||||
|
#search-time-form-advanced-search .search-result-list {
|
||||||
|
margin-top: 16px;
|
||||||
|
border: 1px dashed #e9e9e9;
|
||||||
|
border-radius: 6px;
|
||||||
|
background-color: #fafafa;
|
||||||
|
min-height: 200px;
|
||||||
|
text-align: center;
|
||||||
|
padding-top: 80px;
|
||||||
|
}
|
||||||
|
.topic-select {
|
||||||
|
width: 400px !important;
|
||||||
|
}
|
||||||
|
|
||||||
|
.type-select {
|
||||||
|
width: 150px !important;
|
||||||
|
}
|
||||||
|
</style>
|
||||||
173
ui/src/views/message/SendMessage.vue
Normal file
173
ui/src/views/message/SendMessage.vue
Normal file
@@ -0,0 +1,173 @@
|
|||||||
|
<template>
|
||||||
|
<div class="content">
|
||||||
|
<a-spin :spinning="loading">
|
||||||
|
<a-form :form="form" @submit="handleSubmit">
|
||||||
|
<a-form-item label="Topic">
|
||||||
|
<a-select
|
||||||
|
class="topic-select"
|
||||||
|
@change="handleTopicChange"
|
||||||
|
show-search
|
||||||
|
option-filter-prop="children"
|
||||||
|
v-decorator="[
|
||||||
|
'topic',
|
||||||
|
{
|
||||||
|
rules: [{ required: true, message: '请选择一个topic!' }],
|
||||||
|
},
|
||||||
|
]"
|
||||||
|
placeholder="请选择一个topic"
|
||||||
|
>
|
||||||
|
<a-select-option v-for="v in topicList" :key="v" :value="v">
|
||||||
|
{{ v }}
|
||||||
|
</a-select-option>
|
||||||
|
</a-select>
|
||||||
|
</a-form-item>
|
||||||
|
<a-form-item label="分区">
|
||||||
|
<a-select
|
||||||
|
class="type-select"
|
||||||
|
show-search
|
||||||
|
option-filter-prop="children"
|
||||||
|
v-model="selectPartition"
|
||||||
|
placeholder="请选择一个分区"
|
||||||
|
>
|
||||||
|
<a-select-option v-for="v in partitions" :key="v" :value="v">
|
||||||
|
<span v-if="v == -1">默认</span> <span v-else>{{ v }}</span>
|
||||||
|
</a-select-option>
|
||||||
|
</a-select>
|
||||||
|
</a-form-item>
|
||||||
|
<a-form-item label="消息Key">
|
||||||
|
<a-input v-decorator="['key', { initialValue: 'key' }]" />
|
||||||
|
</a-form-item>
|
||||||
|
<a-form-item label="消息体" has-feedback>
|
||||||
|
<a-textarea
|
||||||
|
v-decorator="[
|
||||||
|
'body',
|
||||||
|
{
|
||||||
|
rules: [
|
||||||
|
{
|
||||||
|
required: true,
|
||||||
|
message: '输入消息体!',
|
||||||
|
},
|
||||||
|
],
|
||||||
|
},
|
||||||
|
]"
|
||||||
|
placeholder="输入消息体!"
|
||||||
|
/>
|
||||||
|
</a-form-item>
|
||||||
|
<a-form-item label="发送的消息数">
|
||||||
|
<a-input-number
|
||||||
|
v-decorator="[
|
||||||
|
'num',
|
||||||
|
{
|
||||||
|
initialValue: 1,
|
||||||
|
rules: [
|
||||||
|
{
|
||||||
|
required: true,
|
||||||
|
message: '输入消息数!',
|
||||||
|
},
|
||||||
|
],
|
||||||
|
},
|
||||||
|
]"
|
||||||
|
:min="1"
|
||||||
|
:max="32"
|
||||||
|
/>
|
||||||
|
</a-form-item>
|
||||||
|
<a-form-item :wrapper-col="{ span: 12, offset: 5 }">
|
||||||
|
<a-button type="primary" html-type="submit"> 提交 </a-button>
|
||||||
|
</a-form-item>
|
||||||
|
</a-form>
|
||||||
|
</a-spin>
|
||||||
|
</div>
|
||||||
|
</template>
|
||||||
|
|
||||||
|
<script>
|
||||||
|
import request from "@/utils/request";
|
||||||
|
import { KafkaTopicApi, KafkaMessageApi } from "@/utils/api";
|
||||||
|
import notification from "ant-design-vue/lib/notification";
|
||||||
|
export default {
|
||||||
|
name: "SendMessage",
|
||||||
|
components: {},
|
||||||
|
props: {
|
||||||
|
topicList: {
|
||||||
|
type: Array,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
data() {
|
||||||
|
return {
|
||||||
|
form: this.$form.createForm(this, { name: "message_send" }),
|
||||||
|
loading: false,
|
||||||
|
partitions: [],
|
||||||
|
selectPartition: undefined,
|
||||||
|
};
|
||||||
|
},
|
||||||
|
methods: {
|
||||||
|
getTopicNameList() {
|
||||||
|
request({
|
||||||
|
url: KafkaTopicApi.getTopicNameList.url,
|
||||||
|
method: KafkaTopicApi.getTopicNameList.method,
|
||||||
|
}).then((res) => {
|
||||||
|
if (res.code == 0) {
|
||||||
|
this.topicList = res.data;
|
||||||
|
} else {
|
||||||
|
notification.error({
|
||||||
|
message: "error",
|
||||||
|
description: res.msg,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
});
|
||||||
|
},
|
||||||
|
getPartitionInfo(topic) {
|
||||||
|
this.loading = true;
|
||||||
|
request({
|
||||||
|
url: KafkaTopicApi.getPartitionInfo.url + "?topic=" + topic,
|
||||||
|
method: KafkaTopicApi.getPartitionInfo.method,
|
||||||
|
}).then((res) => {
|
||||||
|
this.loading = false;
|
||||||
|
if (res.code != 0) {
|
||||||
|
notification.error({
|
||||||
|
message: "error",
|
||||||
|
description: res.msg,
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
this.partitions = res.data.map((v) => v.partition);
|
||||||
|
this.partitions.splice(0, 0, -1);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
},
|
||||||
|
handleTopicChange(topic) {
|
||||||
|
this.selectPartition = -1;
|
||||||
|
this.getPartitionInfo(topic);
|
||||||
|
},
|
||||||
|
handleSubmit(e) {
|
||||||
|
e.preventDefault();
|
||||||
|
this.form.validateFields((err, values) => {
|
||||||
|
if (!err) {
|
||||||
|
const param = Object.assign({}, values, {
|
||||||
|
partition: this.selectPartition,
|
||||||
|
});
|
||||||
|
this.loading = true;
|
||||||
|
request({
|
||||||
|
url: KafkaMessageApi.send.url,
|
||||||
|
method: KafkaMessageApi.send.method,
|
||||||
|
data: param,
|
||||||
|
}).then((res) => {
|
||||||
|
this.loading = false;
|
||||||
|
if (res.code == 0) {
|
||||||
|
this.$message.success(res.msg);
|
||||||
|
} else {
|
||||||
|
notification.error({
|
||||||
|
message: "error",
|
||||||
|
description: res.msg,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
});
|
||||||
|
},
|
||||||
|
},
|
||||||
|
created() {
|
||||||
|
this.getTopicNameList();
|
||||||
|
},
|
||||||
|
};
|
||||||
|
</script>
|
||||||
|
|
||||||
|
<style scoped></style>
|
||||||
@@ -49,7 +49,15 @@
|
|||||||
</a-button>
|
</a-button>
|
||||||
</a-popconfirm>
|
</a-popconfirm>
|
||||||
</div>
|
</div>
|
||||||
|
<p slot="expandedRowRender" slot-scope="record" style="margin: 0">
|
||||||
|
有效消息的时间范围:<span class="red-font">{{
|
||||||
|
formatTime(record.beginTime)
|
||||||
|
}}</span>
|
||||||
|
~
|
||||||
|
<span class="red-font">{{ formatTime(record.endTime) }}</span>
|
||||||
|
</p>
|
||||||
</a-table>
|
</a-table>
|
||||||
|
<p>友情提示:点击+号展开,可以查看当前分区的有效消息的时间范围</p>
|
||||||
</a-spin>
|
</a-spin>
|
||||||
</div>
|
</div>
|
||||||
</a-modal>
|
</a-modal>
|
||||||
@@ -59,6 +67,7 @@
|
|||||||
import request from "@/utils/request";
|
import request from "@/utils/request";
|
||||||
import { KafkaOpApi, KafkaTopicApi } from "@/utils/api";
|
import { KafkaOpApi, KafkaTopicApi } from "@/utils/api";
|
||||||
import notification from "ant-design-vue/es/notification";
|
import notification from "ant-design-vue/es/notification";
|
||||||
|
import moment from "moment";
|
||||||
export default {
|
export default {
|
||||||
name: "PartitionInfo",
|
name: "PartitionInfo",
|
||||||
props: {
|
props: {
|
||||||
@@ -128,6 +137,11 @@ export default {
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
},
|
},
|
||||||
|
formatTime(timestamp) {
|
||||||
|
return timestamp != -1
|
||||||
|
? moment(timestamp).format("YYYY-MM-DD HH:mm:ss:SSS")
|
||||||
|
: timestamp;
|
||||||
|
},
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -169,6 +183,26 @@ const columns = [
|
|||||||
dataIndex: "diff",
|
dataIndex: "diff",
|
||||||
key: "diff",
|
key: "diff",
|
||||||
},
|
},
|
||||||
|
// {
|
||||||
|
// title: "有效消息起始时间",
|
||||||
|
// dataIndex: "beginTime",
|
||||||
|
// key: "beginTime",
|
||||||
|
// slots: { title: "beginTime" },
|
||||||
|
// scopedSlots: { customRender: "internal" },
|
||||||
|
// customRender: (text) => {
|
||||||
|
// return text != -1 ? moment(text).format("YYYY-MM-DD HH:mm:ss:SSS") : text;
|
||||||
|
// },
|
||||||
|
// },
|
||||||
|
// {
|
||||||
|
// title: "有效消息结束时间",
|
||||||
|
// dataIndex: "endTime",
|
||||||
|
// key: "endTime",
|
||||||
|
// slots: { title: "endTime" },
|
||||||
|
// scopedSlots: { customRender: "internal" },
|
||||||
|
// customRender: (text) => {
|
||||||
|
// return text != -1 ? moment(text).format("YYYY-MM-DD HH:mm:ss:SSS") : text;
|
||||||
|
// },
|
||||||
|
// },
|
||||||
{
|
{
|
||||||
title: "操作",
|
title: "操作",
|
||||||
key: "operation",
|
key: "operation",
|
||||||
@@ -177,4 +211,11 @@ const columns = [
|
|||||||
];
|
];
|
||||||
</script>
|
</script>
|
||||||
|
|
||||||
<style scoped></style>
|
<style scoped>
|
||||||
|
.red-font {
|
||||||
|
color: red;
|
||||||
|
}
|
||||||
|
.green-font {
|
||||||
|
color: green;
|
||||||
|
}
|
||||||
|
</style>
|
||||||
|
|||||||
Reference in New Issue
Block a user