Compare commits
15 Commits
v1.0.1
...
single-dat
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b19c6200d2 | ||
|
|
5f6a06c100 | ||
|
|
5930e44fdf | ||
|
|
98f33bb2cc | ||
|
|
0ec3bac6c2 | ||
|
|
bd814d550d | ||
|
|
b9548d1640 | ||
|
|
57a41e087f | ||
|
|
54cd402810 | ||
|
|
c17b0aa4b9 | ||
|
|
8169ddb019 | ||
|
|
5f24c62855 | ||
|
|
3b21fc4cd8 | ||
|
|
d15ec4a2db | ||
|
|
12431db525 |
12
README.md
12
README.md
@@ -1,14 +1,16 @@
|
||||
# kafka可视化管理平台
|
||||
一款轻量级的kafka可视化管理平台,安装配置快捷、简单易用。
|
||||
为了开发的省事,没有多语言支持,只支持中文展示。
|
||||
为了开发的省事,没有国际化支持,只支持中文展示。
|
||||
用过rocketmq-console吧,对,前端展示风格跟那个有点类似。
|
||||
## 安装包下载
|
||||
* 点击下载:[kafka-console-ui.tar.gz](https://github.com/xxd763795151/kafka-console-ui/releases/download/v1.0.0/kafka-console-ui.tar.gz) 或 [kafka-console-ui.zip](https://github.com/xxd763795151/kafka-console-ui/releases/download/v1.0.0/kafka-console-ui.zip)
|
||||
* 参考下面的打包部署,下载源码重新打包(最新功能特性)
|
||||
以下两种方式2选一,直接下载安装包或下载源码,手动打包
|
||||
* 点击下载(v1.0.2版本):[kafka-console-ui.tar.gz](https://github.com/xxd763795151/kafka-console-ui/releases/download/v1.0.2/kafka-console-ui.tar.gz) 或 [kafka-console-ui.zip](https://github.com/xxd763795151/kafka-console-ui/releases/download/v1.0.2/kafka-console-ui.zip)
|
||||
* 参考下面的打包部署,下载源码重新打包(提交的最新功能特性)
|
||||
## 功能支持
|
||||
* 集群信息
|
||||
* Topic管理
|
||||
* 消费组管理
|
||||
* 消息管理
|
||||
* 基于SASL_SCRAM认证授权管理
|
||||
* 运维
|
||||

|
||||
@@ -83,4 +85,6 @@ sh bin/shutdown.sh
|
||||

|
||||

|
||||

|
||||

|
||||

|
||||
增加消息检索页面
|
||||

|
||||
Binary file not shown.
|
Before Width: | Height: | Size: 204 KiB After Width: | Height: | Size: 492 KiB |
BIN
document/消息.png
Normal file
BIN
document/消息.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 400 KiB |
2
pom.xml
2
pom.xml
@@ -10,7 +10,7 @@
|
||||
</parent>
|
||||
<groupId>com.xuxd</groupId>
|
||||
<artifactId>kafka-console-ui</artifactId>
|
||||
<version>1.0.1</version>
|
||||
<version>1.0.2</version>
|
||||
<name>kafka-console-ui</name>
|
||||
<description>Kafka console manage ui</description>
|
||||
<properties>
|
||||
|
||||
27
src/main/java/com/xuxd/kafka/console/beans/QueryMessage.java
Normal file
27
src/main/java/com/xuxd/kafka/console/beans/QueryMessage.java
Normal file
@@ -0,0 +1,27 @@
|
||||
package com.xuxd.kafka.console.beans;
|
||||
|
||||
import lombok.Data;
|
||||
|
||||
/**
|
||||
* kafka-console-ui.
|
||||
*
|
||||
* @author xuxd
|
||||
* @date 2021-12-11 09:45:49
|
||||
**/
|
||||
@Data
|
||||
public class QueryMessage {
|
||||
|
||||
private String topic;
|
||||
|
||||
private int partition;
|
||||
|
||||
private long startTime;
|
||||
|
||||
private long endTime;
|
||||
|
||||
private long offset;
|
||||
|
||||
private String keyDeserializer;
|
||||
|
||||
private String valueDeserializer;
|
||||
}
|
||||
25
src/main/java/com/xuxd/kafka/console/beans/SendMessage.java
Normal file
25
src/main/java/com/xuxd/kafka/console/beans/SendMessage.java
Normal file
@@ -0,0 +1,25 @@
|
||||
package com.xuxd.kafka.console.beans;
|
||||
|
||||
import lombok.Data;
|
||||
|
||||
/**
|
||||
* kafka-console-ui.
|
||||
*
|
||||
* @author xuxd
|
||||
* @date 2021-12-19 23:28:31
|
||||
**/
|
||||
@Data
|
||||
public class SendMessage {
|
||||
|
||||
private String topic;
|
||||
|
||||
private int partition;
|
||||
|
||||
private String key;
|
||||
|
||||
private String body;
|
||||
|
||||
private int num;
|
||||
|
||||
private long offset;
|
||||
}
|
||||
@@ -0,0 +1,50 @@
|
||||
package com.xuxd.kafka.console.beans.dto;
|
||||
|
||||
import com.xuxd.kafka.console.beans.QueryMessage;
|
||||
import java.util.Date;
|
||||
import lombok.Data;
|
||||
|
||||
/**
|
||||
* kafka-console-ui.
|
||||
*
|
||||
* @author xuxd
|
||||
* @date 2021-12-11 09:17:59
|
||||
**/
|
||||
@Data
|
||||
public class QueryMessageDTO {
|
||||
|
||||
private String topic;
|
||||
|
||||
private int partition;
|
||||
|
||||
private Date startTime;
|
||||
|
||||
private Date endTime;
|
||||
|
||||
private Long offset;
|
||||
|
||||
private String keyDeserializer;
|
||||
|
||||
private String valueDeserializer;
|
||||
|
||||
public QueryMessage toQueryMessage() {
|
||||
QueryMessage queryMessage = new QueryMessage();
|
||||
queryMessage.setTopic(topic);
|
||||
queryMessage.setPartition(partition);
|
||||
if (startTime != null) {
|
||||
queryMessage.setStartTime(startTime.getTime());
|
||||
}
|
||||
if (endTime != null) {
|
||||
queryMessage.setEndTime(endTime.getTime());
|
||||
}
|
||||
|
||||
if (offset != null) {
|
||||
queryMessage.setOffset(offset);
|
||||
}
|
||||
|
||||
queryMessage.setKeyDeserializer(keyDeserializer);
|
||||
queryMessage.setValueDeserializer(valueDeserializer);
|
||||
|
||||
return queryMessage;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,32 @@
|
||||
package com.xuxd.kafka.console.beans.vo;
|
||||
|
||||
import lombok.Data;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||
|
||||
/**
|
||||
* kafka-console-ui.
|
||||
*
|
||||
* @author xuxd
|
||||
* @date 2021-12-11 14:19:35
|
||||
**/
|
||||
@Data
|
||||
public class ConsumerRecordVO {
|
||||
|
||||
private String topic;
|
||||
|
||||
private int partition;
|
||||
|
||||
private long offset;
|
||||
|
||||
private long timestamp;
|
||||
|
||||
public static ConsumerRecordVO fromConsumerRecord(ConsumerRecord record) {
|
||||
ConsumerRecordVO vo = new ConsumerRecordVO();
|
||||
vo.setTopic(record.topic());
|
||||
vo.setPartition(record.partition());
|
||||
vo.setOffset(record.offset());
|
||||
vo.setTimestamp(record.timestamp());
|
||||
|
||||
return vo;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,47 @@
|
||||
package com.xuxd.kafka.console.beans.vo;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import lombok.Data;
|
||||
|
||||
/**
|
||||
* kafka-console-ui.
|
||||
*
|
||||
* @author xuxd
|
||||
* @date 2021-12-12 12:45:23
|
||||
**/
|
||||
@Data
|
||||
public class MessageDetailVO {
|
||||
|
||||
private String topic;
|
||||
|
||||
private int partition;
|
||||
|
||||
private long offset;
|
||||
|
||||
private long timestamp;
|
||||
|
||||
private String timestampType;
|
||||
|
||||
private List<HeaderVO> headers = new ArrayList<>();
|
||||
|
||||
private Object key;
|
||||
|
||||
private Object value;
|
||||
|
||||
private List<ConsumerVO> consumers;
|
||||
|
||||
@Data
|
||||
public static class HeaderVO {
|
||||
String key;
|
||||
|
||||
String value;
|
||||
}
|
||||
|
||||
@Data
|
||||
public static class ConsumerVO {
|
||||
String groupId;
|
||||
|
||||
String status;
|
||||
}
|
||||
}
|
||||
@@ -29,6 +29,10 @@ public class TopicPartitionVO {
|
||||
|
||||
private long diff;
|
||||
|
||||
private long beginTime;
|
||||
|
||||
private long endTime;
|
||||
|
||||
public static TopicPartitionVO from(TopicPartitionInfo partitionInfo) {
|
||||
TopicPartitionVO partitionVO = new TopicPartitionVO();
|
||||
partitionVO.setPartition(partitionInfo.partition());
|
||||
|
||||
@@ -5,6 +5,7 @@ import kafka.console.ConfigConsole;
|
||||
import kafka.console.ConsumerConsole;
|
||||
import kafka.console.KafkaAclConsole;
|
||||
import kafka.console.KafkaConfigConsole;
|
||||
import kafka.console.MessageConsole;
|
||||
import kafka.console.OperationConsole;
|
||||
import kafka.console.TopicConsole;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
@@ -54,4 +55,9 @@ public class KafkaConfiguration {
|
||||
ConsumerConsole consumerConsole) {
|
||||
return new OperationConsole(config, topicConsole, consumerConsole);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public MessageConsole messageConsole(KafkaConfig config) {
|
||||
return new MessageConsole(config);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,55 @@
|
||||
package com.xuxd.kafka.console.controller;
|
||||
|
||||
import com.xuxd.kafka.console.beans.SendMessage;
|
||||
import com.xuxd.kafka.console.beans.dto.QueryMessageDTO;
|
||||
import com.xuxd.kafka.console.service.MessageService;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.web.bind.annotation.GetMapping;
|
||||
import org.springframework.web.bind.annotation.PostMapping;
|
||||
import org.springframework.web.bind.annotation.RequestBody;
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
|
||||
/**
|
||||
* kafka-console-ui.
|
||||
*
|
||||
* @author xuxd
|
||||
* @date 2021-12-11 09:22:19
|
||||
**/
|
||||
@RestController
|
||||
@RequestMapping("/message")
|
||||
public class MessageController {
|
||||
|
||||
@Autowired
|
||||
private MessageService messageService;
|
||||
|
||||
@PostMapping("/search/time")
|
||||
public Object searchByTime(@RequestBody QueryMessageDTO dto) {
|
||||
return messageService.searchByTime(dto.toQueryMessage());
|
||||
}
|
||||
|
||||
@PostMapping("/search/offset")
|
||||
public Object searchByOffset(@RequestBody QueryMessageDTO dto) {
|
||||
return messageService.searchByOffset(dto.toQueryMessage());
|
||||
}
|
||||
|
||||
@PostMapping("/search/detail")
|
||||
public Object searchDetail(@RequestBody QueryMessageDTO dto) {
|
||||
return messageService.searchDetail(dto.toQueryMessage());
|
||||
}
|
||||
|
||||
@GetMapping("/deserializer/list")
|
||||
public Object deserializerList() {
|
||||
return messageService.deserializerList();
|
||||
}
|
||||
|
||||
@PostMapping("/send")
|
||||
public Object send(@RequestBody SendMessage message) {
|
||||
return messageService.send(message);
|
||||
}
|
||||
|
||||
@PostMapping("/resend")
|
||||
public Object resend(@RequestBody SendMessage message) {
|
||||
return messageService.resend(message);
|
||||
}
|
||||
}
|
||||
@@ -38,4 +38,6 @@ public interface ConsumerService {
|
||||
ResponseData getTopicSubscribedByGroups(String topic);
|
||||
|
||||
ResponseData getOffsetPartition(String groupId);
|
||||
|
||||
ResponseData<Set<String>> getSubscribedGroups(String topic);
|
||||
}
|
||||
|
||||
@@ -0,0 +1,26 @@
|
||||
package com.xuxd.kafka.console.service;
|
||||
|
||||
import com.xuxd.kafka.console.beans.QueryMessage;
|
||||
import com.xuxd.kafka.console.beans.ResponseData;
|
||||
import com.xuxd.kafka.console.beans.SendMessage;
|
||||
|
||||
/**
|
||||
* kafka-console-ui.
|
||||
*
|
||||
* @author xuxd
|
||||
* @date 2021-12-11 09:43:26
|
||||
**/
|
||||
public interface MessageService {
|
||||
|
||||
ResponseData searchByTime(QueryMessage queryMessage);
|
||||
|
||||
ResponseData searchByOffset(QueryMessage queryMessage);
|
||||
|
||||
ResponseData searchDetail(QueryMessage queryMessage);
|
||||
|
||||
ResponseData deserializerList();
|
||||
|
||||
ResponseData send(SendMessage message);
|
||||
|
||||
ResponseData resend(SendMessage message);
|
||||
}
|
||||
@@ -15,6 +15,7 @@ import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
import java.util.stream.Collectors;
|
||||
import kafka.console.ConsumerConsole;
|
||||
import kafka.console.TopicConsole;
|
||||
@@ -48,6 +49,8 @@ public class ConsumerServiceImpl implements ConsumerService {
|
||||
@Autowired
|
||||
private TopicConsole topicConsole;
|
||||
|
||||
private ReentrantLock lock = new ReentrantLock();
|
||||
|
||||
@Override public ResponseData getConsumerGroupList(List<String> groupIds, Set<ConsumerGroupState> states) {
|
||||
String simulateGroup = "inner_xxx_not_exit_group_###" + System.currentTimeMillis();
|
||||
Set<String> groupList = new HashSet<>();
|
||||
@@ -167,25 +170,7 @@ public class ConsumerServiceImpl implements ConsumerService {
|
||||
}
|
||||
|
||||
@Override public ResponseData getTopicSubscribedByGroups(String topic) {
|
||||
if (topicSubscribedInfo.isNeedRefresh(topic)) {
|
||||
Set<String> groupIdList = consumerConsole.getConsumerGroupIdList(Collections.emptySet());
|
||||
Map<String, Set<String>> cache = new HashMap<>();
|
||||
Map<String, List<TopicPartition>> subscribeTopics = consumerConsole.listSubscribeTopics(groupIdList);
|
||||
|
||||
subscribeTopics.forEach((groupId, tl) -> {
|
||||
tl.forEach(topicPartition -> {
|
||||
String t = topicPartition.topic();
|
||||
if (!cache.containsKey(t)) {
|
||||
cache.put(t, new HashSet<>());
|
||||
}
|
||||
cache.get(t).add(groupId);
|
||||
});
|
||||
});
|
||||
|
||||
topicSubscribedInfo.refresh(cache);
|
||||
}
|
||||
|
||||
Set<String> groups = topicSubscribedInfo.getSubscribedGroups(topic);
|
||||
Set<String> groups = this.getSubscribedGroups(topic).getData();
|
||||
|
||||
Map<String, Object> res = new HashMap<>();
|
||||
Collection<ConsumerConsole.TopicPartitionConsumeInfo> consumerDetail = consumerConsole.getConsumerDetail(groups);
|
||||
@@ -212,6 +197,34 @@ public class ConsumerServiceImpl implements ConsumerService {
|
||||
return ResponseData.create().data(Utils.abs(groupId.hashCode()) % size);
|
||||
}
|
||||
|
||||
@Override public ResponseData<Set<String>> getSubscribedGroups(String topic) {
|
||||
if (topicSubscribedInfo.isNeedRefresh(topic) && !lock.isLocked()) {
|
||||
try {
|
||||
lock.lock();
|
||||
Set<String> groupIdList = consumerConsole.getConsumerGroupIdList(Collections.emptySet());
|
||||
Map<String, Set<String>> cache = new HashMap<>();
|
||||
Map<String, List<TopicPartition>> subscribeTopics = consumerConsole.listSubscribeTopics(groupIdList);
|
||||
|
||||
subscribeTopics.forEach((groupId, tl) -> {
|
||||
tl.forEach(topicPartition -> {
|
||||
String t = topicPartition.topic();
|
||||
if (!cache.containsKey(t)) {
|
||||
cache.put(t, new HashSet<>());
|
||||
}
|
||||
cache.get(t).add(groupId);
|
||||
});
|
||||
});
|
||||
|
||||
topicSubscribedInfo.refresh(cache);
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
Set<String> groups = topicSubscribedInfo.getSubscribedGroups(topic);
|
||||
return ResponseData.create(Set.class).data(groups).success();
|
||||
}
|
||||
|
||||
class TopicSubscribedInfo {
|
||||
long lastTime = System.currentTimeMillis();
|
||||
|
||||
|
||||
@@ -0,0 +1,212 @@
|
||||
package com.xuxd.kafka.console.service.impl;
|
||||
|
||||
import com.xuxd.kafka.console.beans.QueryMessage;
|
||||
import com.xuxd.kafka.console.beans.ResponseData;
|
||||
import com.xuxd.kafka.console.beans.SendMessage;
|
||||
import com.xuxd.kafka.console.beans.vo.ConsumerRecordVO;
|
||||
import com.xuxd.kafka.console.beans.vo.MessageDetailVO;
|
||||
import com.xuxd.kafka.console.service.ConsumerService;
|
||||
import com.xuxd.kafka.console.service.MessageService;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
import kafka.console.ConsumerConsole;
|
||||
import kafka.console.MessageConsole;
|
||||
import kafka.console.TopicConsole;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.collections.CollectionUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.kafka.clients.admin.TopicDescription;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||
import org.apache.kafka.clients.producer.ProducerRecord;
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
|
||||
import org.apache.kafka.common.serialization.BytesDeserializer;
|
||||
import org.apache.kafka.common.serialization.Deserializer;
|
||||
import org.apache.kafka.common.serialization.DoubleDeserializer;
|
||||
import org.apache.kafka.common.serialization.FloatDeserializer;
|
||||
import org.apache.kafka.common.serialization.IntegerDeserializer;
|
||||
import org.apache.kafka.common.serialization.StringDeserializer;
|
||||
import org.springframework.beans.BeansException;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
import org.springframework.context.ApplicationContextAware;
|
||||
import org.springframework.stereotype.Service;
|
||||
import scala.Tuple2;
|
||||
|
||||
/**
|
||||
* kafka-console-ui.
|
||||
*
|
||||
* @author xuxd
|
||||
* @date 2021-12-11 09:43:44
|
||||
**/
|
||||
@Slf4j
|
||||
@Service
|
||||
public class MessageServiceImpl implements MessageService, ApplicationContextAware {
|
||||
|
||||
@Autowired
|
||||
private MessageConsole messageConsole;
|
||||
|
||||
@Autowired
|
||||
private TopicConsole topicConsole;
|
||||
|
||||
@Autowired
|
||||
private ConsumerConsole consumerConsole;
|
||||
|
||||
private ApplicationContext applicationContext;
|
||||
|
||||
private Map<String, Deserializer> deserializerDict = new HashMap<>();
|
||||
|
||||
{
|
||||
deserializerDict.put("ByteArray", new ByteArrayDeserializer());
|
||||
deserializerDict.put("Integer", new IntegerDeserializer());
|
||||
deserializerDict.put("String", new StringDeserializer());
|
||||
deserializerDict.put("Float", new FloatDeserializer());
|
||||
deserializerDict.put("Double", new DoubleDeserializer());
|
||||
deserializerDict.put("Byte", new BytesDeserializer());
|
||||
}
|
||||
|
||||
public static String defaultDeserializer = "String";
|
||||
|
||||
@Override public ResponseData searchByTime(QueryMessage queryMessage) {
|
||||
int maxNums = 10000;
|
||||
|
||||
Set<TopicPartition> partitions = getPartitions(queryMessage);
|
||||
long startTime = System.currentTimeMillis();
|
||||
List<ConsumerRecord<byte[], byte[]>> records = messageConsole.searchBy(partitions, queryMessage.getStartTime(), queryMessage.getEndTime(), maxNums);
|
||||
log.info("search message by time, cost time: {}", (System.currentTimeMillis() - startTime));
|
||||
List<ConsumerRecordVO> vos = records.stream().filter(record -> record.timestamp() <= queryMessage.getEndTime())
|
||||
.map(ConsumerRecordVO::fromConsumerRecord).collect(Collectors.toList());
|
||||
Map<String, Object> res = new HashMap<>();
|
||||
res.put("maxNum", maxNums);
|
||||
res.put("realNum", vos.size());
|
||||
res.put("data", vos.subList(0, Math.min(maxNums, vos.size())));
|
||||
return ResponseData.create().data(res).success();
|
||||
}
|
||||
|
||||
@Override public ResponseData searchByOffset(QueryMessage queryMessage) {
|
||||
Map<TopicPartition, ConsumerRecord<byte[], byte[]>> recordMap = searchRecordByOffset(queryMessage);
|
||||
|
||||
return ResponseData.create().data(recordMap.values().stream().map(ConsumerRecordVO::fromConsumerRecord).collect(Collectors.toList())).success();
|
||||
}
|
||||
|
||||
@Override public ResponseData searchDetail(QueryMessage queryMessage) {
|
||||
if (queryMessage.getPartition() == -1) {
|
||||
throw new IllegalArgumentException();
|
||||
}
|
||||
if (StringUtils.isBlank(queryMessage.getKeyDeserializer())) {
|
||||
queryMessage.setKeyDeserializer(defaultDeserializer);
|
||||
}
|
||||
|
||||
if (StringUtils.isBlank(queryMessage.getValueDeserializer())) {
|
||||
queryMessage.setValueDeserializer(defaultDeserializer);
|
||||
}
|
||||
|
||||
Map<TopicPartition, ConsumerRecord<byte[], byte[]>> recordMap = searchRecordByOffset(queryMessage);
|
||||
ConsumerRecord<byte[], byte[]> record = recordMap.get(new TopicPartition(queryMessage.getTopic(), queryMessage.getPartition()));
|
||||
if (record != null) {
|
||||
MessageDetailVO vo = new MessageDetailVO();
|
||||
vo.setTopic(record.topic());
|
||||
vo.setPartition(record.partition());
|
||||
vo.setOffset(record.offset());
|
||||
vo.setTimestamp(record.timestamp());
|
||||
vo.setTimestampType(record.timestampType().name());
|
||||
try {
|
||||
vo.setKey(deserializerDict.get(queryMessage.getKeyDeserializer()).deserialize(queryMessage.getTopic(), record.key()));
|
||||
} catch (Exception e) {
|
||||
vo.setKey("KeyDeserializer Error: " + e.getMessage());
|
||||
}
|
||||
try {
|
||||
vo.setValue(deserializerDict.get(queryMessage.getValueDeserializer()).deserialize(queryMessage.getTopic(), record.value()));
|
||||
} catch (Exception e) {
|
||||
vo.setValue("ValueDeserializer Error: " + e.getMessage());
|
||||
}
|
||||
|
||||
record.headers().forEach(header -> {
|
||||
MessageDetailVO.HeaderVO headerVO = new MessageDetailVO.HeaderVO();
|
||||
headerVO.setKey(header.key());
|
||||
headerVO.setValue(new String(header.value()));
|
||||
vo.getHeaders().add(headerVO);
|
||||
});
|
||||
|
||||
// 为了尽量保持代码好看,不直接注入另一个service层的实现类了
|
||||
Set<String> groupIds = applicationContext.getBean(ConsumerService.class).getSubscribedGroups(record.topic()).getData();
|
||||
Collection<ConsumerConsole.TopicPartitionConsumeInfo> consumerDetail = consumerConsole.getConsumerDetail(groupIds);
|
||||
|
||||
List<MessageDetailVO.ConsumerVO> consumerVOS = new LinkedList<>();
|
||||
consumerDetail.forEach(consumerInfo -> {
|
||||
if (consumerInfo.topicPartition().equals(new TopicPartition(record.topic(), record.partition()))) {
|
||||
MessageDetailVO.ConsumerVO consumerVO = new MessageDetailVO.ConsumerVO();
|
||||
consumerVO.setGroupId(consumerInfo.getGroupId());
|
||||
consumerVO.setStatus(consumerInfo.getConsumerOffset() <= record.offset() ? "unconsume" : "consumed");
|
||||
consumerVOS.add(consumerVO);
|
||||
}
|
||||
});
|
||||
|
||||
vo.setConsumers(consumerVOS);
|
||||
return ResponseData.create().data(vo).success();
|
||||
}
|
||||
return ResponseData.create().failed("Not found message detail.");
|
||||
}
|
||||
|
||||
@Override public ResponseData deserializerList() {
|
||||
return ResponseData.create().data(deserializerDict.keySet()).success();
|
||||
}
|
||||
|
||||
@Override public ResponseData send(SendMessage message) {
|
||||
messageConsole.send(message.getTopic(), message.getPartition(), message.getKey(), message.getBody(), message.getNum());
|
||||
return ResponseData.create().success();
|
||||
}
|
||||
|
||||
@Override public ResponseData resend(SendMessage message) {
|
||||
TopicPartition partition = new TopicPartition(message.getTopic(), message.getPartition());
|
||||
Map<TopicPartition, Object> offsetTable = new HashMap<>(1, 1.0f);
|
||||
offsetTable.put(partition, message.getOffset());
|
||||
Map<TopicPartition, ConsumerRecord<byte[], byte[]>> recordMap = messageConsole.searchBy(offsetTable);
|
||||
if (recordMap.isEmpty()) {
|
||||
return ResponseData.create().failed("Get message failed.");
|
||||
}
|
||||
ConsumerRecord<byte[], byte[]> record = recordMap.get(partition);
|
||||
ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>(record.topic(), record.partition(), record.key(), record.value(), record.headers());
|
||||
Tuple2<Object, String> tuple2 = messageConsole.sendSync(producerRecord);
|
||||
boolean success = (boolean) tuple2._1();
|
||||
return success ? ResponseData.create().success("success: " + tuple2._2()) : ResponseData.create().failed(tuple2._2());
|
||||
}
|
||||
|
||||
private Map<TopicPartition, ConsumerRecord<byte[], byte[]>> searchRecordByOffset(QueryMessage queryMessage) {
|
||||
Set<TopicPartition> partitions = getPartitions(queryMessage);
|
||||
|
||||
Map<TopicPartition, Object> offsetTable = new HashMap<>();
|
||||
partitions.forEach(tp -> {
|
||||
offsetTable.put(tp, queryMessage.getOffset());
|
||||
});
|
||||
Map<TopicPartition, ConsumerRecord<byte[], byte[]>> recordMap = messageConsole.searchBy(offsetTable);
|
||||
return recordMap;
|
||||
}
|
||||
|
||||
private Set<TopicPartition> getPartitions(QueryMessage queryMessage) {
|
||||
Set<TopicPartition> partitions = new HashSet<>();
|
||||
if (queryMessage.getPartition() != -1) {
|
||||
partitions.add(new TopicPartition(queryMessage.getTopic(), queryMessage.getPartition()));
|
||||
} else {
|
||||
List<TopicDescription> list = topicConsole.getTopicList(Collections.singleton(queryMessage.getTopic()));
|
||||
if (CollectionUtils.isEmpty(list)) {
|
||||
throw new IllegalArgumentException("Can not find topic info.");
|
||||
}
|
||||
Set<TopicPartition> set = list.get(0).partitions().stream()
|
||||
.map(tp -> new TopicPartition(queryMessage.getTopic(), tp.partition())).collect(Collectors.toSet());
|
||||
partitions.addAll(set);
|
||||
}
|
||||
return partitions;
|
||||
}
|
||||
|
||||
@Override public void setApplicationContext(ApplicationContext context) throws BeansException {
|
||||
this.applicationContext = context;
|
||||
}
|
||||
}
|
||||
@@ -19,12 +19,14 @@ import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.stream.Collectors;
|
||||
import kafka.console.MessageConsole;
|
||||
import kafka.console.TopicConsole;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.kafka.clients.admin.NewPartitions;
|
||||
import org.apache.kafka.clients.admin.NewTopic;
|
||||
import org.apache.kafka.clients.admin.TopicDescription;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
import org.apache.kafka.common.TopicPartitionInfo;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
@@ -44,6 +46,9 @@ public class TopicServiceImpl implements TopicService {
|
||||
@Autowired
|
||||
private TopicConsole topicConsole;
|
||||
|
||||
@Autowired
|
||||
private MessageConsole messageConsole;
|
||||
|
||||
private Gson gson = GsonUtil.INSTANCE.get();
|
||||
|
||||
@Override public ResponseData getTopicNameList(boolean internal) {
|
||||
@@ -106,6 +111,10 @@ public class TopicServiceImpl implements TopicService {
|
||||
mapTuple2._2().forEach((k, v) -> {
|
||||
endTable.put(k.partition(), (Long) v);
|
||||
});
|
||||
// computer the valid time range.
|
||||
Map<TopicPartition, Object> beginOffsetTable = new HashMap<>();
|
||||
Map<TopicPartition, Object> endOffsetTable = new HashMap<>();
|
||||
Map<Integer, TopicPartition> partitionCache = new HashMap<>();
|
||||
|
||||
for (TopicPartitionVO partitionVO : voList) {
|
||||
long begin = beginTable.get(partitionVO.getPartition());
|
||||
@@ -113,7 +122,29 @@ public class TopicServiceImpl implements TopicService {
|
||||
partitionVO.setBeginOffset(begin);
|
||||
partitionVO.setEndOffset(end);
|
||||
partitionVO.setDiff(end - begin);
|
||||
|
||||
if (begin != end) {
|
||||
TopicPartition partition = new TopicPartition(topic, partitionVO.getPartition());
|
||||
partitionCache.put(partitionVO.getPartition(), partition);
|
||||
beginOffsetTable.put(partition, begin);
|
||||
endOffsetTable.put(partition, end - 1); // end must < endOff
|
||||
} else {
|
||||
partitionVO.setBeginTime(-1L);
|
||||
partitionVO.setEndTime(-1L);
|
||||
}
|
||||
}
|
||||
|
||||
Map<TopicPartition, ConsumerRecord<byte[], byte[]>> beginRecordMap = messageConsole.searchBy(beginOffsetTable);
|
||||
Map<TopicPartition, ConsumerRecord<byte[], byte[]>> endRecordMap = messageConsole.searchBy(endOffsetTable);
|
||||
|
||||
for (TopicPartitionVO partitionVO : voList) {
|
||||
if (partitionVO.getBeginTime() != -1L) {
|
||||
TopicPartition partition = partitionCache.get(partitionVO.getPartition());
|
||||
partitionVO.setBeginTime(beginRecordMap.containsKey(partition) ? beginRecordMap.get(partition).timestamp() : -1L);
|
||||
partitionVO.setEndTime(endRecordMap.containsKey(partition) ? endRecordMap.get(partition).timestamp() : -1L);
|
||||
}
|
||||
}
|
||||
|
||||
return ResponseData.create().data(voList).success();
|
||||
}
|
||||
|
||||
|
||||
@@ -5,10 +5,11 @@ import kafka.zk.{AdminZkClient, KafkaZkClient}
|
||||
import org.apache.kafka.clients.CommonClientConfigs
|
||||
import org.apache.kafka.clients.admin._
|
||||
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer, OffsetAndMetadata}
|
||||
import org.apache.kafka.clients.producer.KafkaProducer
|
||||
import org.apache.kafka.common.TopicPartition
|
||||
import org.apache.kafka.common.config.SaslConfigs
|
||||
import org.apache.kafka.common.requests.ListOffsetsResponse
|
||||
import org.apache.kafka.common.serialization.ByteArrayDeserializer
|
||||
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, StringSerializer}
|
||||
import org.apache.kafka.common.utils.Time
|
||||
import org.slf4j.{Logger, LoggerFactory}
|
||||
|
||||
@@ -60,6 +61,38 @@ class KafkaConsole(config: KafkaConfig) {
|
||||
}
|
||||
}
|
||||
|
||||
protected def withProducerAndCatchError(f: KafkaProducer[String, String] => Any, eh: Exception => Any,
|
||||
extra: Properties = new Properties()): Any = {
|
||||
val props = getProps()
|
||||
props.putAll(extra)
|
||||
props.put(ConsumerConfig.CLIENT_ID_CONFIG, String.valueOf(System.currentTimeMillis()))
|
||||
val producer = new KafkaProducer[String, String](props, new StringSerializer, new StringSerializer)
|
||||
try {
|
||||
f(producer)
|
||||
} catch {
|
||||
case er: Exception => eh(er)
|
||||
}
|
||||
finally {
|
||||
producer.close()
|
||||
}
|
||||
}
|
||||
|
||||
protected def withByteProducerAndCatchError(f: KafkaProducer[Array[Byte], Array[Byte]] => Any, eh: Exception => Any,
|
||||
extra: Properties = new Properties()): Any = {
|
||||
val props = getProps()
|
||||
props.putAll(extra)
|
||||
props.put(ConsumerConfig.CLIENT_ID_CONFIG, String.valueOf(System.currentTimeMillis()))
|
||||
val producer = new KafkaProducer[Array[Byte], Array[Byte]](props, new ByteArraySerializer, new ByteArraySerializer)
|
||||
try {
|
||||
f(producer)
|
||||
} catch {
|
||||
case er: Exception => eh(er)
|
||||
}
|
||||
finally {
|
||||
producer.close()
|
||||
}
|
||||
}
|
||||
|
||||
protected def withZKClient(f: AdminZkClient => Any): Any = {
|
||||
val zkClient = KafkaZkClient(config.getZookeeperAddr, false, 30000, 30000, Int.MaxValue, Time.SYSTEM)
|
||||
val adminZkClient = new AdminZkClient(zkClient)
|
||||
|
||||
196
src/main/scala/kafka/console/MessageConsole.scala
Normal file
196
src/main/scala/kafka/console/MessageConsole.scala
Normal file
@@ -0,0 +1,196 @@
|
||||
package kafka.console
|
||||
|
||||
import com.xuxd.kafka.console.config.KafkaConfig
|
||||
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
|
||||
import org.apache.kafka.clients.producer.ProducerRecord
|
||||
import org.apache.kafka.common.TopicPartition
|
||||
|
||||
import java.time.Duration
|
||||
import java.util
|
||||
import java.util.Properties
|
||||
import scala.collection.immutable
|
||||
import scala.jdk.CollectionConverters.{CollectionHasAsScala, MapHasAsScala, SeqHasAsJava}
|
||||
|
||||
/**
|
||||
* kafka-console-ui.
|
||||
*
|
||||
* @author xuxd
|
||||
* @date 2021-12-11 09:39:40
|
||||
* */
|
||||
class MessageConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConfig) with Logging {
|
||||
|
||||
def searchBy(partitions: util.Collection[TopicPartition], startTime: Long, endTime: Long,
|
||||
maxNums: Int): util.List[ConsumerRecord[Array[Byte], Array[Byte]]] = {
|
||||
var startOffTable: immutable.Map[TopicPartition, Long] = Map.empty
|
||||
var endOffTable: immutable.Map[TopicPartition, Long] = Map.empty
|
||||
withAdminClientAndCatchError(admin => {
|
||||
val startTable = KafkaConsole.getLogTimestampOffsets(admin, partitions.asScala.toSeq, startTime, timeoutMs)
|
||||
startOffTable = startTable.map(t2 => (t2._1, t2._2.offset())).toMap
|
||||
|
||||
endOffTable = KafkaConsole.getLogTimestampOffsets(admin, partitions.asScala.toSeq, endTime, timeoutMs)
|
||||
.map(t2 => (t2._1, t2._2.offset())).toMap
|
||||
}, e => {
|
||||
log.error("getLogTimestampOffsets error.", e)
|
||||
throw new RuntimeException("getLogTimestampOffsets error", e)
|
||||
})
|
||||
var terminate: Boolean = (startOffTable == endOffTable)
|
||||
val res = new util.LinkedList[ConsumerRecord[Array[Byte], Array[Byte]]]()
|
||||
// 如果最小和最大偏移一致,就结束
|
||||
if (!terminate) {
|
||||
|
||||
val arrive = new util.HashSet[TopicPartition](partitions)
|
||||
val props = new Properties()
|
||||
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
|
||||
withConsumerAndCatchError(consumer => {
|
||||
consumer.assign(partitions)
|
||||
for ((tp, off) <- startOffTable) {
|
||||
consumer.seek(tp, off)
|
||||
}
|
||||
|
||||
// 终止条件
|
||||
// 1.所有查询分区达都到最大偏移的时候
|
||||
while (!terminate) {
|
||||
// 达到查询的最大条数
|
||||
if (res.size() >= maxNums) {
|
||||
terminate = true
|
||||
} else {
|
||||
val records = consumer.poll(Duration.ofMillis(timeoutMs))
|
||||
|
||||
if (records.isEmpty) {
|
||||
terminate = true
|
||||
} else {
|
||||
for ((tp, endOff) <- endOffTable) {
|
||||
if (!terminate) {
|
||||
var recordList = records.records(tp)
|
||||
if (!recordList.isEmpty) {
|
||||
val first = recordList.get(0)
|
||||
if (first.offset() >= endOff) {
|
||||
arrive.remove(tp)
|
||||
} else {
|
||||
//
|
||||
// (String topic,
|
||||
// int partition,
|
||||
// long offset,
|
||||
// long timestamp,
|
||||
// TimestampType timestampType,
|
||||
// Long checksum,
|
||||
// int serializedKeySize,
|
||||
// int serializedValueSize,
|
||||
// K key,
|
||||
// V value,
|
||||
// Headers headers,
|
||||
// Optional<Integer> leaderEpoch)
|
||||
val nullVList = recordList.asScala.map(record => new ConsumerRecord[Array[Byte], Array[Byte]](record.topic(),
|
||||
record.partition(),
|
||||
record.offset(),
|
||||
record.timestamp(),
|
||||
record.timestampType(),
|
||||
record.checksum(),
|
||||
record.serializedKeySize(),
|
||||
record.serializedValueSize(),
|
||||
record.key(),
|
||||
null,
|
||||
record.headers(),
|
||||
record.leaderEpoch())).toSeq.asJava
|
||||
res.addAll(nullVList)
|
||||
if (recordList.get(recordList.size() - 1).offset() >= endOff) {
|
||||
arrive.remove(tp)
|
||||
}
|
||||
if (recordList != null) {
|
||||
recordList = null
|
||||
}
|
||||
}
|
||||
}
|
||||
if (arrive.isEmpty) {
|
||||
terminate = true
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}, e => {
|
||||
log.error("searchBy time error.", e)
|
||||
})
|
||||
}
|
||||
|
||||
res
|
||||
}
|
||||
|
||||
def searchBy(
|
||||
tp2o: util.Map[TopicPartition, Long]): util.Map[TopicPartition, ConsumerRecord[Array[Byte], Array[Byte]]] = {
|
||||
val props = new Properties()
|
||||
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
|
||||
val res = new util.HashMap[TopicPartition, ConsumerRecord[Array[Byte], Array[Byte]]]()
|
||||
withConsumerAndCatchError(consumer => {
|
||||
var tpSet = tp2o.keySet()
|
||||
val tpSetCopy = new util.HashSet[TopicPartition](tpSet)
|
||||
val endOffsets = consumer.endOffsets(tpSet)
|
||||
val beginOffsets = consumer.beginningOffsets(tpSet)
|
||||
for ((tp, off) <- tp2o.asScala) {
|
||||
val endOff = endOffsets.get(tp)
|
||||
// if (endOff <= off) {
|
||||
// consumer.seek(tp, endOff)
|
||||
// tpSetCopy.remove(tp)
|
||||
// } else {
|
||||
// consumer.seek(tp, off)
|
||||
// }
|
||||
val beginOff = beginOffsets.get(tp)
|
||||
if (off < beginOff || off >= endOff) {
|
||||
tpSetCopy.remove(tp)
|
||||
}
|
||||
}
|
||||
|
||||
tpSet = tpSetCopy
|
||||
consumer.assign(tpSet)
|
||||
tpSet.asScala.foreach(tp => {
|
||||
consumer.seek(tp, tp2o.get(tp))
|
||||
})
|
||||
|
||||
var terminate = tpSet.isEmpty
|
||||
while (!terminate) {
|
||||
val records = consumer.poll(Duration.ofMillis(timeoutMs))
|
||||
val tps = new util.HashSet(tpSet).asScala
|
||||
for (tp <- tps) {
|
||||
if (!res.containsKey(tp)) {
|
||||
val recordList = records.records(tp)
|
||||
if (!recordList.isEmpty) {
|
||||
val record = recordList.get(0)
|
||||
res.put(tp, record)
|
||||
tpSet.remove(tp)
|
||||
}
|
||||
}
|
||||
if (tpSet.isEmpty) {
|
||||
terminate = true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}, e => {
|
||||
log.error("searchBy offset error.", e)
|
||||
})
|
||||
res
|
||||
}
|
||||
|
||||
def send(topic: String, partition: Int, key: String, value: String, num: Int): Unit = {
|
||||
withProducerAndCatchError(producer => {
|
||||
val nullKey = if (key != null && key.trim().length() == 0) null else key
|
||||
for (a <- 1 to num) {
|
||||
val record = if (partition != -1) new ProducerRecord[String, String](topic, partition, nullKey, value)
|
||||
else new ProducerRecord[String, String](topic, nullKey, value)
|
||||
producer.send(record)
|
||||
}
|
||||
}, e => log.error("send error.", e))
|
||||
|
||||
}
|
||||
|
||||
def sendSync(record: ProducerRecord[Array[Byte], Array[Byte]]): (Boolean, String) = {
|
||||
withByteProducerAndCatchError(producer => {
|
||||
val metadata = producer.send(record).get()
|
||||
(true, metadata.toString())
|
||||
}, e => {
|
||||
log.error("send error.", e)
|
||||
(false, e.getMessage)
|
||||
}).asInstanceOf[(Boolean, String)]
|
||||
}
|
||||
}
|
||||
@@ -1,6 +1,7 @@
|
||||
<template>
|
||||
<div id="app">
|
||||
<div id="nav">
|
||||
<h2 class="logo">Kafka 控制台</h2>
|
||||
<router-link to="/" class="pad-l-r">主页</router-link>
|
||||
<span>|</span
|
||||
><router-link to="/cluster-page" class="pad-l-r">集群</router-link>
|
||||
@@ -8,6 +9,8 @@
|
||||
><router-link to="/topic-page" class="pad-l-r">Topic</router-link>
|
||||
<span>|</span
|
||||
><router-link to="/group-page" class="pad-l-r">消费组</router-link>
|
||||
<span>|</span
|
||||
><router-link to="/message-page" class="pad-l-r">消息</router-link>
|
||||
<span v-show="config.enableAcl">|</span
|
||||
><router-link to="/acl-page" class="pad-l-r" v-show="config.enableAcl"
|
||||
>Acl</router-link
|
||||
@@ -44,7 +47,6 @@ export default {
|
||||
font-family: Avenir, Helvetica, Arial, sans-serif;
|
||||
-webkit-font-smoothing: antialiased;
|
||||
-moz-osx-font-smoothing: grayscale;
|
||||
text-align: center;
|
||||
color: #2c3e50;
|
||||
}
|
||||
|
||||
@@ -59,6 +61,7 @@ export default {
|
||||
padding-top: 1%;
|
||||
padding-bottom: 1%;
|
||||
margin-bottom: 1%;
|
||||
text-align: center;
|
||||
}
|
||||
|
||||
#nav a {
|
||||
@@ -81,4 +84,10 @@ export default {
|
||||
height: 90%;
|
||||
width: 100%;
|
||||
}
|
||||
.logo {
|
||||
float: left;
|
||||
left: 1%;
|
||||
top: 1%;
|
||||
position: absolute;
|
||||
}
|
||||
</style>
|
||||
|
||||
@@ -43,6 +43,12 @@ const routes = [
|
||||
component: () =>
|
||||
import(/* webpackChunkName: "cluster" */ "../views/cluster/Cluster.vue"),
|
||||
},
|
||||
{
|
||||
path: "/message-page",
|
||||
name: "Message",
|
||||
component: () =>
|
||||
import(/* webpackChunkName: "cluster" */ "../views/message/Message.vue"),
|
||||
},
|
||||
];
|
||||
|
||||
const router = new VueRouter({
|
||||
|
||||
@@ -223,3 +223,29 @@ export const KafkaOpApi = {
|
||||
method: "delete",
|
||||
},
|
||||
};
|
||||
export const KafkaMessageApi = {
|
||||
searchByTime: {
|
||||
url: "/message/search/time",
|
||||
method: "post",
|
||||
},
|
||||
searchByOffset: {
|
||||
url: "/message/search/offset",
|
||||
method: "post",
|
||||
},
|
||||
searchDetail: {
|
||||
url: "/message/search/detail",
|
||||
method: "post",
|
||||
},
|
||||
deserializerList: {
|
||||
url: "/message/deserializer/list",
|
||||
method: "get",
|
||||
},
|
||||
send: {
|
||||
url: "/message/send",
|
||||
method: "post",
|
||||
},
|
||||
resend: {
|
||||
url: "/message/resend",
|
||||
method: "post",
|
||||
},
|
||||
};
|
||||
|
||||
@@ -6,7 +6,7 @@ import { VueAxios } from "./axios";
|
||||
const request = axios.create({
|
||||
// API 请求的默认前缀
|
||||
baseURL: process.env.VUE_APP_API_BASE_URL,
|
||||
timeout: 30000, // 请求超时时间
|
||||
timeout: 120000, // 请求超时时间
|
||||
});
|
||||
|
||||
// 异常拦截处理器
|
||||
|
||||
58
ui/src/views/message/Message.vue
Normal file
58
ui/src/views/message/Message.vue
Normal file
@@ -0,0 +1,58 @@
|
||||
<template>
|
||||
<div class="content">
|
||||
<a-spin :spinning="loading">
|
||||
<a-tabs default-active-key="1" size="large" tabPosition="top">
|
||||
<a-tab-pane key="1" tab="根据时间查询消息">
|
||||
<SearchByTime :topic-list="topicList"></SearchByTime>
|
||||
</a-tab-pane>
|
||||
<a-tab-pane key="2" tab="根据偏移查询消息">
|
||||
<SearchByOffset :topic-list="topicList"></SearchByOffset>
|
||||
</a-tab-pane>
|
||||
<a-tab-pane key="3" tab="在线发送">
|
||||
<SendMessage :topic-list="topicList"></SendMessage>
|
||||
</a-tab-pane>
|
||||
</a-tabs>
|
||||
</a-spin>
|
||||
</div>
|
||||
</template>
|
||||
|
||||
<script>
|
||||
import SearchByTime from "@/views/message/SearchByTime";
|
||||
import SearchByOffset from "@/views/message/SearchByOffset";
|
||||
import request from "@/utils/request";
|
||||
import { KafkaTopicApi } from "@/utils/api";
|
||||
import notification from "ant-design-vue/lib/notification";
|
||||
import SendMessage from "@/views/message/SendMessage";
|
||||
export default {
|
||||
name: "Message",
|
||||
components: { SearchByTime, SearchByOffset, SendMessage },
|
||||
data() {
|
||||
return {
|
||||
loading: false,
|
||||
topicList: [],
|
||||
};
|
||||
},
|
||||
methods: {
|
||||
getTopicNameList() {
|
||||
request({
|
||||
url: KafkaTopicApi.getTopicNameList.url,
|
||||
method: KafkaTopicApi.getTopicNameList.method,
|
||||
}).then((res) => {
|
||||
if (res.code == 0) {
|
||||
this.topicList = res.data;
|
||||
} else {
|
||||
notification.error({
|
||||
message: "error",
|
||||
description: res.msg,
|
||||
});
|
||||
}
|
||||
});
|
||||
},
|
||||
},
|
||||
created() {
|
||||
this.getTopicNameList();
|
||||
},
|
||||
};
|
||||
</script>
|
||||
|
||||
<style scoped></style>
|
||||
265
ui/src/views/message/MessageDetail.vue
Normal file
265
ui/src/views/message/MessageDetail.vue
Normal file
@@ -0,0 +1,265 @@
|
||||
<template>
|
||||
<a-modal
|
||||
title="消息详情"
|
||||
:visible="show"
|
||||
:width="800"
|
||||
:mask="false"
|
||||
:destroyOnClose="true"
|
||||
:footer="null"
|
||||
:maskClosable="false"
|
||||
@cancel="handleCancel"
|
||||
>
|
||||
<div>
|
||||
<a-spin :spinning="loading">
|
||||
<div>
|
||||
<h4>消息信息</h4>
|
||||
<hr />
|
||||
<div class="message-detail" id="message-detail">
|
||||
<p>
|
||||
<label class="title">Topic: </label>
|
||||
<span class="m-info">{{ data.topic }}</span>
|
||||
</p>
|
||||
<p>
|
||||
<label class="title">分区: </label>
|
||||
<span class="m-info">{{ data.partition }}</span>
|
||||
</p>
|
||||
<p>
|
||||
<label class="title">偏移: </label>
|
||||
<span class="m-info">{{ data.offset }}</span>
|
||||
</p>
|
||||
<p>
|
||||
<label class="title">消息头: </label>
|
||||
<span class="m-info">{{ data.headers }}</span>
|
||||
</p>
|
||||
<p>
|
||||
<label class="title">时间类型: </label>
|
||||
<span class="m-info"
|
||||
>{{
|
||||
data.timestampType
|
||||
}}(表示下面的时间是哪种类型:消息创建、写入日志亦或其它)</span
|
||||
>
|
||||
</p>
|
||||
<p>
|
||||
<label class="title">时间: </label>
|
||||
<span class="m-info">{{ formatTime(data.timestamp) }}</span>
|
||||
</p>
|
||||
<p>
|
||||
<label class="title">Key反序列化: </label>
|
||||
<a-select
|
||||
style="width: 120px"
|
||||
v-model="keyDeserializer"
|
||||
@change="keyDeserializerChange"
|
||||
>
|
||||
<a-select-option
|
||||
v-for="v in deserializerList"
|
||||
:key="v"
|
||||
:value="v"
|
||||
>
|
||||
{{ v }}
|
||||
</a-select-option>
|
||||
</a-select>
|
||||
<span>选一个合适反序列化器,要不可能乱码了</span>
|
||||
</p>
|
||||
<p>
|
||||
<label class="title">Key: </label>
|
||||
<span class="m-info">{{ data.key }}</span>
|
||||
</p>
|
||||
<p>
|
||||
<label class="title">消息体反序列化: </label>
|
||||
<a-select
|
||||
v-model="valueDeserializer"
|
||||
style="width: 120px"
|
||||
@change="valueDeserializerChange"
|
||||
>
|
||||
<a-select-option
|
||||
v-for="v in deserializerList"
|
||||
:key="v"
|
||||
:value="v"
|
||||
>
|
||||
{{ v }}
|
||||
</a-select-option>
|
||||
</a-select>
|
||||
<span>选一个合适反序列化器,要不可能乱码了</span>
|
||||
</p>
|
||||
<p>
|
||||
<label class="title">消息体: </label>
|
||||
<a-textarea
|
||||
type="textarea"
|
||||
:value="data.value"
|
||||
:rows="5"
|
||||
:read-only="true"
|
||||
></a-textarea>
|
||||
</p>
|
||||
</div>
|
||||
</div>
|
||||
<div>
|
||||
<h4>消费信息</h4>
|
||||
<hr />
|
||||
<a-table
|
||||
:columns="columns"
|
||||
:data-source="data.consumers"
|
||||
bordered
|
||||
row-key="groupId"
|
||||
>
|
||||
<div slot="status" slot-scope="text">
|
||||
<span v-if="text == 'consumed'">已消费</span
|
||||
><span v-else style="color: red">未消费</span>
|
||||
</div>
|
||||
</a-table>
|
||||
</div>
|
||||
<div>
|
||||
<h4>操作</h4>
|
||||
<hr />
|
||||
<a-popconfirm
|
||||
title="确定将当前这条消息重新发回broker?"
|
||||
ok-text="确认"
|
||||
cancel-text="取消"
|
||||
@confirm="resend"
|
||||
>
|
||||
<a-button type="primary" icon="reload"> 重新发送 </a-button>
|
||||
</a-popconfirm>
|
||||
</div>
|
||||
</a-spin>
|
||||
</div>
|
||||
</a-modal>
|
||||
</template>
|
||||
|
||||
<script>
|
||||
import request from "@/utils/request";
|
||||
import { KafkaMessageApi } from "@/utils/api";
|
||||
import notification from "ant-design-vue/lib/notification";
|
||||
import moment from "moment";
|
||||
|
||||
export default {
|
||||
name: "MessageDetail",
|
||||
props: {
|
||||
record: {},
|
||||
visible: {
|
||||
type: Boolean,
|
||||
default: false,
|
||||
},
|
||||
},
|
||||
data() {
|
||||
return {
|
||||
show: this.visible,
|
||||
data: {},
|
||||
loading: false,
|
||||
deserializerList: [],
|
||||
keyDeserializer: "String",
|
||||
valueDeserializer: "String",
|
||||
consumerDetail: [],
|
||||
columns,
|
||||
};
|
||||
},
|
||||
watch: {
|
||||
visible(v) {
|
||||
this.show = v;
|
||||
if (this.show) {
|
||||
this.getMessageDetail();
|
||||
this.getDeserializerList();
|
||||
}
|
||||
},
|
||||
},
|
||||
methods: {
|
||||
getMessageDetail() {
|
||||
this.loading = true;
|
||||
const params = Object.assign({}, this.record, {
|
||||
keyDeserializer: this.keyDeserializer,
|
||||
valueDeserializer: this.valueDeserializer,
|
||||
});
|
||||
request({
|
||||
url: KafkaMessageApi.searchDetail.url,
|
||||
method: KafkaMessageApi.searchDetail.method,
|
||||
data: params,
|
||||
}).then((res) => {
|
||||
this.loading = false;
|
||||
if (res.code != 0) {
|
||||
notification.error({
|
||||
message: "error",
|
||||
description: res.msg,
|
||||
});
|
||||
} else {
|
||||
this.data = res.data;
|
||||
}
|
||||
});
|
||||
},
|
||||
getDeserializerList() {
|
||||
request({
|
||||
url: KafkaMessageApi.deserializerList.url,
|
||||
method: KafkaMessageApi.deserializerList.method,
|
||||
}).then((res) => {
|
||||
if (res.code != 0) {
|
||||
notification.error({
|
||||
message: "error",
|
||||
description: res.msg,
|
||||
});
|
||||
} else {
|
||||
this.deserializerList = res.data;
|
||||
}
|
||||
});
|
||||
},
|
||||
handleCancel() {
|
||||
this.data = {};
|
||||
this.$emit("closeDetailDialog", { refresh: false });
|
||||
},
|
||||
formatTime(time) {
|
||||
return moment(time).format("YYYY-MM-DD HH:mm:ss:SSS");
|
||||
},
|
||||
keyDeserializerChange() {
|
||||
this.getMessageDetail();
|
||||
},
|
||||
valueDeserializerChange() {
|
||||
this.getMessageDetail();
|
||||
},
|
||||
resend() {
|
||||
const params = Object.assign({}, this.data);
|
||||
this.loading = true;
|
||||
request({
|
||||
url: KafkaMessageApi.resend.url,
|
||||
method: KafkaMessageApi.resend.method,
|
||||
data: params,
|
||||
}).then((res) => {
|
||||
this.loading = false;
|
||||
if (res.code != 0) {
|
||||
notification.error({
|
||||
message: "error",
|
||||
description: res.msg,
|
||||
});
|
||||
} else {
|
||||
this.$message.success(res.msg);
|
||||
}
|
||||
});
|
||||
},
|
||||
},
|
||||
};
|
||||
const columns = [
|
||||
{
|
||||
title: "消费组",
|
||||
dataIndex: "groupId",
|
||||
key: "groupId",
|
||||
},
|
||||
{
|
||||
title: "消费情况",
|
||||
dataIndex: "status",
|
||||
key: "status",
|
||||
scopedSlots: { customRender: "status" },
|
||||
},
|
||||
];
|
||||
</script>
|
||||
|
||||
<style scoped>
|
||||
.m-info {
|
||||
/*text-decoration: underline;*/
|
||||
}
|
||||
.title {
|
||||
width: 15%;
|
||||
display: inline-block;
|
||||
text-align: right;
|
||||
margin-right: 2%;
|
||||
font-weight: bold;
|
||||
}
|
||||
.ant-spin-container #message-detail textarea {
|
||||
max-width: 80% !important;
|
||||
vertical-align: top !important;
|
||||
}
|
||||
</style>
|
||||
95
ui/src/views/message/MessageList.vue
Normal file
95
ui/src/views/message/MessageList.vue
Normal file
@@ -0,0 +1,95 @@
|
||||
<template>
|
||||
<div>
|
||||
<a-table
|
||||
:columns="columns"
|
||||
:data-source="data"
|
||||
bordered
|
||||
:row-key="
|
||||
(record, index) => {
|
||||
return index;
|
||||
}
|
||||
"
|
||||
>
|
||||
<div slot="operation" slot-scope="record">
|
||||
<a-button
|
||||
size="small"
|
||||
href="javascript:;"
|
||||
class="operation-btn"
|
||||
@click="openDetailDialog(record)"
|
||||
>消息详情
|
||||
</a-button>
|
||||
</div>
|
||||
</a-table>
|
||||
<MessageDetail
|
||||
:visible="showDetailDialog"
|
||||
:record="record"
|
||||
@closeDetailDialog="closeDetailDialog"
|
||||
></MessageDetail>
|
||||
</div>
|
||||
</template>
|
||||
|
||||
<script>
|
||||
import moment from "moment";
|
||||
import MessageDetail from "@/views/message/MessageDetail";
|
||||
export default {
|
||||
name: "MessageList",
|
||||
components: { MessageDetail },
|
||||
props: {
|
||||
data: {
|
||||
type: Array,
|
||||
},
|
||||
},
|
||||
data() {
|
||||
return {
|
||||
columns: columns,
|
||||
showDetailDialog: false,
|
||||
record: {},
|
||||
};
|
||||
},
|
||||
methods: {
|
||||
openDetailDialog(record) {
|
||||
this.record = record;
|
||||
this.showDetailDialog = true;
|
||||
},
|
||||
closeDetailDialog() {
|
||||
this.showDetailDialog = false;
|
||||
},
|
||||
},
|
||||
};
|
||||
const columns = [
|
||||
{
|
||||
title: "topic",
|
||||
dataIndex: "topic",
|
||||
key: "topic",
|
||||
width: 300,
|
||||
},
|
||||
{
|
||||
title: "分区",
|
||||
dataIndex: "partition",
|
||||
key: "partition",
|
||||
},
|
||||
{
|
||||
title: "偏移",
|
||||
dataIndex: "offset",
|
||||
key: "offset",
|
||||
},
|
||||
{
|
||||
title: "时间",
|
||||
dataIndex: "timestamp",
|
||||
key: "timestamp",
|
||||
slots: { title: "timestamp" },
|
||||
scopedSlots: { customRender: "timestamp" },
|
||||
customRender: (text) => {
|
||||
return moment(text).format("YYYY-MM-DD HH:mm:ss:SSS");
|
||||
},
|
||||
},
|
||||
{
|
||||
title: "操作",
|
||||
key: "operation",
|
||||
scopedSlots: { customRender: "operation" },
|
||||
width: 200,
|
||||
},
|
||||
];
|
||||
</script>
|
||||
|
||||
<style scoped></style>
|
||||
195
ui/src/views/message/SearchByOffset.vue
Normal file
195
ui/src/views/message/SearchByOffset.vue
Normal file
@@ -0,0 +1,195 @@
|
||||
<template>
|
||||
<div class="tab-content">
|
||||
<a-spin :spinning="loading">
|
||||
<div id="search-offset-form-advanced-search">
|
||||
<a-form
|
||||
class="ant-advanced-search-form"
|
||||
:form="form"
|
||||
@submit="handleSearch"
|
||||
>
|
||||
<a-row :gutter="24">
|
||||
<a-col :span="9">
|
||||
<a-form-item label="topic">
|
||||
<a-select
|
||||
class="topic-select"
|
||||
@change="handleTopicChange"
|
||||
show-search
|
||||
option-filter-prop="children"
|
||||
v-decorator="[
|
||||
'topic',
|
||||
{
|
||||
rules: [{ required: true, message: '请选择一个topic!' }],
|
||||
},
|
||||
]"
|
||||
placeholder="请选择一个topic"
|
||||
>
|
||||
<a-select-option v-for="v in topicList" :key="v" :value="v">
|
||||
{{ v }}
|
||||
</a-select-option>
|
||||
</a-select>
|
||||
</a-form-item>
|
||||
</a-col>
|
||||
<a-col :span="6">
|
||||
<a-form-item label="分区">
|
||||
<a-select
|
||||
class="type-select"
|
||||
show-search
|
||||
option-filter-prop="children"
|
||||
v-model="selectPartition"
|
||||
placeholder="请选择一个分区"
|
||||
>
|
||||
<a-select-option v-for="v in partitions" :key="v" :value="v">
|
||||
<span v-if="v == -1">全部</span> <span v-else>{{ v }}</span>
|
||||
</a-select-option>
|
||||
</a-select>
|
||||
</a-form-item>
|
||||
</a-col>
|
||||
<a-col :span="7">
|
||||
<a-form-item label="偏移">
|
||||
<a-input
|
||||
v-decorator="[
|
||||
'offset',
|
||||
{
|
||||
rules: [{ required: true, message: '请输入消息偏移!' }],
|
||||
},
|
||||
]"
|
||||
placeholder="消息偏移"
|
||||
/>
|
||||
</a-form-item>
|
||||
</a-col>
|
||||
<a-col :span="2" :style="{ textAlign: 'right' }">
|
||||
<a-form-item>
|
||||
<a-button type="primary" html-type="submit"> 搜索</a-button>
|
||||
</a-form-item>
|
||||
</a-col>
|
||||
</a-row>
|
||||
</a-form>
|
||||
</div>
|
||||
<MessageList :data="data"></MessageList>
|
||||
</a-spin>
|
||||
</div>
|
||||
</template>
|
||||
|
||||
<script>
|
||||
import request from "@/utils/request";
|
||||
import { KafkaMessageApi, KafkaTopicApi } from "@/utils/api";
|
||||
import notification from "ant-design-vue/lib/notification";
|
||||
import MessageList from "@/views/message/MessageList";
|
||||
|
||||
export default {
|
||||
name: "SearchByOffset",
|
||||
components: { MessageList },
|
||||
props: {
|
||||
topicList: {
|
||||
type: Array,
|
||||
},
|
||||
},
|
||||
data() {
|
||||
return {
|
||||
loading: false,
|
||||
form: this.$form.createForm(this, { name: "message_search_offset" }),
|
||||
partitions: [],
|
||||
selectPartition: undefined,
|
||||
rangeConfig: {
|
||||
rules: [{ type: "array", required: true, message: "请选择时间!" }],
|
||||
},
|
||||
data: defaultData,
|
||||
};
|
||||
},
|
||||
methods: {
|
||||
handleSearch(e) {
|
||||
e.preventDefault();
|
||||
this.form.validateFields((err, values) => {
|
||||
if (!err) {
|
||||
const data = Object.assign({}, values, {
|
||||
partition: this.selectPartition,
|
||||
});
|
||||
this.loading = true;
|
||||
request({
|
||||
url: KafkaMessageApi.searchByOffset.url,
|
||||
method: KafkaMessageApi.searchByOffset.method,
|
||||
data: data,
|
||||
}).then((res) => {
|
||||
this.loading = false;
|
||||
if (res.code == 0) {
|
||||
this.$message.success(res.msg);
|
||||
this.data = res.data;
|
||||
} else {
|
||||
notification.error({
|
||||
message: "error",
|
||||
description: res.msg,
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
},
|
||||
getPartitionInfo(topic) {
|
||||
this.loading = true;
|
||||
request({
|
||||
url: KafkaTopicApi.getPartitionInfo.url + "?topic=" + topic,
|
||||
method: KafkaTopicApi.getPartitionInfo.method,
|
||||
}).then((res) => {
|
||||
this.loading = false;
|
||||
if (res.code != 0) {
|
||||
notification.error({
|
||||
message: "error",
|
||||
description: res.msg,
|
||||
});
|
||||
} else {
|
||||
this.partitions = res.data.map((v) => v.partition);
|
||||
this.partitions.splice(0, 0, -1);
|
||||
}
|
||||
});
|
||||
},
|
||||
handleTopicChange(topic) {
|
||||
this.selectPartition = -1;
|
||||
this.getPartitionInfo(topic);
|
||||
},
|
||||
},
|
||||
};
|
||||
const defaultData = [];
|
||||
</script>
|
||||
|
||||
<style scoped>
|
||||
.tab-content {
|
||||
width: 100%;
|
||||
height: 100%;
|
||||
}
|
||||
|
||||
.ant-advanced-search-form {
|
||||
padding: 24px;
|
||||
background: #fbfbfb;
|
||||
border: 1px solid #d9d9d9;
|
||||
border-radius: 6px;
|
||||
}
|
||||
|
||||
.ant-advanced-search-form .ant-form-item {
|
||||
display: flex;
|
||||
}
|
||||
|
||||
.ant-advanced-search-form .ant-form-item-control-wrapper {
|
||||
flex: 1;
|
||||
}
|
||||
|
||||
#components-form-topic-advanced-search .ant-form {
|
||||
max-width: none;
|
||||
margin-bottom: 1%;
|
||||
}
|
||||
|
||||
#search-offset-form-advanced-search .search-result-list {
|
||||
margin-top: 16px;
|
||||
border: 1px dashed #e9e9e9;
|
||||
border-radius: 6px;
|
||||
background-color: #fafafa;
|
||||
min-height: 200px;
|
||||
text-align: center;
|
||||
padding-top: 80px;
|
||||
}
|
||||
.topic-select {
|
||||
width: 400px !important;
|
||||
}
|
||||
.type-select {
|
||||
width: 200px !important;
|
||||
}
|
||||
</style>
|
||||
201
ui/src/views/message/SearchByTime.vue
Normal file
201
ui/src/views/message/SearchByTime.vue
Normal file
@@ -0,0 +1,201 @@
|
||||
<template>
|
||||
<div class="tab-content">
|
||||
<a-spin :spinning="loading">
|
||||
<div id="search-time-form-advanced-search">
|
||||
<a-form
|
||||
class="ant-advanced-search-form"
|
||||
:form="form"
|
||||
@submit="handleSearch"
|
||||
>
|
||||
<a-row :gutter="24">
|
||||
<a-col :span="9">
|
||||
<a-form-item label="topic">
|
||||
<a-select
|
||||
class="topic-select"
|
||||
@change="handleTopicChange"
|
||||
show-search
|
||||
option-filter-prop="children"
|
||||
v-decorator="[
|
||||
'topic',
|
||||
{
|
||||
rules: [{ required: true, message: '请选择一个topic!' }],
|
||||
},
|
||||
]"
|
||||
placeholder="请选择一个topic"
|
||||
>
|
||||
<a-select-option v-for="v in topicList" :key="v" :value="v">
|
||||
{{ v }}
|
||||
</a-select-option>
|
||||
</a-select>
|
||||
</a-form-item>
|
||||
</a-col>
|
||||
<a-col :span="5">
|
||||
<a-form-item label="分区">
|
||||
<a-select
|
||||
class="type-select"
|
||||
show-search
|
||||
option-filter-prop="children"
|
||||
v-model="selectPartition"
|
||||
placeholder="请选择一个分区"
|
||||
>
|
||||
<a-select-option v-for="v in partitions" :key="v" :value="v">
|
||||
<span v-if="v == -1">全部</span> <span v-else>{{ v }}</span>
|
||||
</a-select-option>
|
||||
</a-select>
|
||||
</a-form-item>
|
||||
</a-col>
|
||||
<a-col :span="8">
|
||||
<a-form-item label="时间">
|
||||
<a-range-picker
|
||||
v-decorator="['time', rangeConfig]"
|
||||
show-time
|
||||
format="YYYY-MM-DD HH:mm:ss"
|
||||
/>
|
||||
</a-form-item>
|
||||
</a-col>
|
||||
<a-col :span="2" :style="{ textAlign: 'right' }">
|
||||
<a-form-item>
|
||||
<a-button type="primary" html-type="submit"> 搜索</a-button>
|
||||
</a-form-item>
|
||||
</a-col>
|
||||
</a-row>
|
||||
</a-form>
|
||||
</div>
|
||||
<p style="margin-top: 1%">
|
||||
<strong
|
||||
>检索条数:{{ data.realNum }},允许返回的最大条数:{{
|
||||
data.maxNum
|
||||
}}</strong
|
||||
>
|
||||
</p>
|
||||
<MessageList :data="data.data"></MessageList>
|
||||
</a-spin>
|
||||
</div>
|
||||
</template>
|
||||
|
||||
<script>
|
||||
import request from "@/utils/request";
|
||||
import { KafkaMessageApi, KafkaTopicApi } from "@/utils/api";
|
||||
import notification from "ant-design-vue/lib/notification";
|
||||
import MessageList from "@/views/message/MessageList";
|
||||
|
||||
export default {
|
||||
name: "SearchByTime",
|
||||
components: { MessageList },
|
||||
props: {
|
||||
topicList: {
|
||||
type: Array,
|
||||
},
|
||||
},
|
||||
data() {
|
||||
return {
|
||||
loading: false,
|
||||
form: this.$form.createForm(this, { name: "message_search_time" }),
|
||||
partitions: [],
|
||||
selectPartition: undefined,
|
||||
rangeConfig: {
|
||||
rules: [{ type: "array", required: true, message: "请选择时间!" }],
|
||||
},
|
||||
data: defaultData,
|
||||
};
|
||||
},
|
||||
methods: {
|
||||
handleSearch(e) {
|
||||
e.preventDefault();
|
||||
this.form.validateFields((err, values) => {
|
||||
if (!err) {
|
||||
const data = Object.assign({}, values, {
|
||||
partition: this.selectPartition,
|
||||
});
|
||||
data.startTime = values.time[0].valueOf();
|
||||
data.endTime = values.time[1];
|
||||
this.loading = true;
|
||||
request({
|
||||
url: KafkaMessageApi.searchByTime.url,
|
||||
method: KafkaMessageApi.searchByTime.method,
|
||||
data: data,
|
||||
}).then((res) => {
|
||||
this.loading = false;
|
||||
if (res.code == 0) {
|
||||
this.$message.success(res.msg);
|
||||
this.data = res.data;
|
||||
} else {
|
||||
notification.error({
|
||||
message: "error",
|
||||
description: res.msg,
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
},
|
||||
getPartitionInfo(topic) {
|
||||
this.loading = true;
|
||||
request({
|
||||
url: KafkaTopicApi.getPartitionInfo.url + "?topic=" + topic,
|
||||
method: KafkaTopicApi.getPartitionInfo.method,
|
||||
}).then((res) => {
|
||||
this.loading = false;
|
||||
if (res.code != 0) {
|
||||
notification.error({
|
||||
message: "error",
|
||||
description: res.msg,
|
||||
});
|
||||
} else {
|
||||
this.partitions = res.data.map((v) => v.partition);
|
||||
this.partitions.splice(0, 0, -1);
|
||||
}
|
||||
});
|
||||
},
|
||||
handleTopicChange(topic) {
|
||||
this.selectPartition = -1;
|
||||
this.getPartitionInfo(topic);
|
||||
},
|
||||
},
|
||||
};
|
||||
const defaultData = { realNum: 0, maxNum: 0 };
|
||||
</script>
|
||||
|
||||
<style scoped>
|
||||
.tab-content {
|
||||
width: 100%;
|
||||
height: 100%;
|
||||
}
|
||||
|
||||
.ant-advanced-search-form {
|
||||
padding: 24px;
|
||||
background: #fbfbfb;
|
||||
border: 1px solid #d9d9d9;
|
||||
border-radius: 6px;
|
||||
}
|
||||
|
||||
.ant-advanced-search-form .ant-form-item {
|
||||
display: flex;
|
||||
}
|
||||
|
||||
.ant-advanced-search-form .ant-form-item-control-wrapper {
|
||||
flex: 1;
|
||||
}
|
||||
|
||||
#components-form-topic-advanced-search .ant-form {
|
||||
max-width: none;
|
||||
margin-bottom: 1%;
|
||||
}
|
||||
|
||||
#search-time-form-advanced-search .search-result-list {
|
||||
margin-top: 16px;
|
||||
border: 1px dashed #e9e9e9;
|
||||
border-radius: 6px;
|
||||
background-color: #fafafa;
|
||||
min-height: 200px;
|
||||
text-align: center;
|
||||
padding-top: 80px;
|
||||
}
|
||||
.topic-select {
|
||||
width: 400px !important;
|
||||
}
|
||||
|
||||
.type-select {
|
||||
width: 150px !important;
|
||||
}
|
||||
</style>
|
||||
173
ui/src/views/message/SendMessage.vue
Normal file
173
ui/src/views/message/SendMessage.vue
Normal file
@@ -0,0 +1,173 @@
|
||||
<template>
|
||||
<div class="content">
|
||||
<a-spin :spinning="loading">
|
||||
<a-form :form="form" @submit="handleSubmit">
|
||||
<a-form-item label="Topic">
|
||||
<a-select
|
||||
class="topic-select"
|
||||
@change="handleTopicChange"
|
||||
show-search
|
||||
option-filter-prop="children"
|
||||
v-decorator="[
|
||||
'topic',
|
||||
{
|
||||
rules: [{ required: true, message: '请选择一个topic!' }],
|
||||
},
|
||||
]"
|
||||
placeholder="请选择一个topic"
|
||||
>
|
||||
<a-select-option v-for="v in topicList" :key="v" :value="v">
|
||||
{{ v }}
|
||||
</a-select-option>
|
||||
</a-select>
|
||||
</a-form-item>
|
||||
<a-form-item label="分区">
|
||||
<a-select
|
||||
class="type-select"
|
||||
show-search
|
||||
option-filter-prop="children"
|
||||
v-model="selectPartition"
|
||||
placeholder="请选择一个分区"
|
||||
>
|
||||
<a-select-option v-for="v in partitions" :key="v" :value="v">
|
||||
<span v-if="v == -1">默认</span> <span v-else>{{ v }}</span>
|
||||
</a-select-option>
|
||||
</a-select>
|
||||
</a-form-item>
|
||||
<a-form-item label="消息Key">
|
||||
<a-input v-decorator="['key', { initialValue: 'key' }]" />
|
||||
</a-form-item>
|
||||
<a-form-item label="消息体" has-feedback>
|
||||
<a-textarea
|
||||
v-decorator="[
|
||||
'body',
|
||||
{
|
||||
rules: [
|
||||
{
|
||||
required: true,
|
||||
message: '输入消息体!',
|
||||
},
|
||||
],
|
||||
},
|
||||
]"
|
||||
placeholder="输入消息体!"
|
||||
/>
|
||||
</a-form-item>
|
||||
<a-form-item label="发送的消息数">
|
||||
<a-input-number
|
||||
v-decorator="[
|
||||
'num',
|
||||
{
|
||||
initialValue: 1,
|
||||
rules: [
|
||||
{
|
||||
required: true,
|
||||
message: '输入消息数!',
|
||||
},
|
||||
],
|
||||
},
|
||||
]"
|
||||
:min="1"
|
||||
:max="32"
|
||||
/>
|
||||
</a-form-item>
|
||||
<a-form-item :wrapper-col="{ span: 12, offset: 5 }">
|
||||
<a-button type="primary" html-type="submit"> 提交 </a-button>
|
||||
</a-form-item>
|
||||
</a-form>
|
||||
</a-spin>
|
||||
</div>
|
||||
</template>
|
||||
|
||||
<script>
|
||||
import request from "@/utils/request";
|
||||
import { KafkaTopicApi, KafkaMessageApi } from "@/utils/api";
|
||||
import notification from "ant-design-vue/lib/notification";
|
||||
export default {
|
||||
name: "SendMessage",
|
||||
components: {},
|
||||
props: {
|
||||
topicList: {
|
||||
type: Array,
|
||||
},
|
||||
},
|
||||
data() {
|
||||
return {
|
||||
form: this.$form.createForm(this, { name: "message_send" }),
|
||||
loading: false,
|
||||
partitions: [],
|
||||
selectPartition: undefined,
|
||||
};
|
||||
},
|
||||
methods: {
|
||||
getTopicNameList() {
|
||||
request({
|
||||
url: KafkaTopicApi.getTopicNameList.url,
|
||||
method: KafkaTopicApi.getTopicNameList.method,
|
||||
}).then((res) => {
|
||||
if (res.code == 0) {
|
||||
this.topicList = res.data;
|
||||
} else {
|
||||
notification.error({
|
||||
message: "error",
|
||||
description: res.msg,
|
||||
});
|
||||
}
|
||||
});
|
||||
},
|
||||
getPartitionInfo(topic) {
|
||||
this.loading = true;
|
||||
request({
|
||||
url: KafkaTopicApi.getPartitionInfo.url + "?topic=" + topic,
|
||||
method: KafkaTopicApi.getPartitionInfo.method,
|
||||
}).then((res) => {
|
||||
this.loading = false;
|
||||
if (res.code != 0) {
|
||||
notification.error({
|
||||
message: "error",
|
||||
description: res.msg,
|
||||
});
|
||||
} else {
|
||||
this.partitions = res.data.map((v) => v.partition);
|
||||
this.partitions.splice(0, 0, -1);
|
||||
}
|
||||
});
|
||||
},
|
||||
handleTopicChange(topic) {
|
||||
this.selectPartition = -1;
|
||||
this.getPartitionInfo(topic);
|
||||
},
|
||||
handleSubmit(e) {
|
||||
e.preventDefault();
|
||||
this.form.validateFields((err, values) => {
|
||||
if (!err) {
|
||||
const param = Object.assign({}, values, {
|
||||
partition: this.selectPartition,
|
||||
});
|
||||
this.loading = true;
|
||||
request({
|
||||
url: KafkaMessageApi.send.url,
|
||||
method: KafkaMessageApi.send.method,
|
||||
data: param,
|
||||
}).then((res) => {
|
||||
this.loading = false;
|
||||
if (res.code == 0) {
|
||||
this.$message.success(res.msg);
|
||||
} else {
|
||||
notification.error({
|
||||
message: "error",
|
||||
description: res.msg,
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
},
|
||||
},
|
||||
created() {
|
||||
this.getTopicNameList();
|
||||
},
|
||||
};
|
||||
</script>
|
||||
|
||||
<style scoped></style>
|
||||
@@ -49,7 +49,15 @@
|
||||
</a-button>
|
||||
</a-popconfirm>
|
||||
</div>
|
||||
<p slot="expandedRowRender" slot-scope="record" style="margin: 0">
|
||||
有效消息的时间范围:<span class="red-font">{{
|
||||
formatTime(record.beginTime)
|
||||
}}</span>
|
||||
~
|
||||
<span class="red-font">{{ formatTime(record.endTime) }}</span>
|
||||
</p>
|
||||
</a-table>
|
||||
<p>友情提示:点击+号展开,可以查看当前分区的有效消息的时间范围</p>
|
||||
</a-spin>
|
||||
</div>
|
||||
</a-modal>
|
||||
@@ -59,6 +67,7 @@
|
||||
import request from "@/utils/request";
|
||||
import { KafkaOpApi, KafkaTopicApi } from "@/utils/api";
|
||||
import notification from "ant-design-vue/es/notification";
|
||||
import moment from "moment";
|
||||
export default {
|
||||
name: "PartitionInfo",
|
||||
props: {
|
||||
@@ -128,6 +137,11 @@ export default {
|
||||
}
|
||||
});
|
||||
},
|
||||
formatTime(timestamp) {
|
||||
return timestamp != -1
|
||||
? moment(timestamp).format("YYYY-MM-DD HH:mm:ss:SSS")
|
||||
: timestamp;
|
||||
},
|
||||
},
|
||||
};
|
||||
|
||||
@@ -169,6 +183,26 @@ const columns = [
|
||||
dataIndex: "diff",
|
||||
key: "diff",
|
||||
},
|
||||
// {
|
||||
// title: "有效消息起始时间",
|
||||
// dataIndex: "beginTime",
|
||||
// key: "beginTime",
|
||||
// slots: { title: "beginTime" },
|
||||
// scopedSlots: { customRender: "internal" },
|
||||
// customRender: (text) => {
|
||||
// return text != -1 ? moment(text).format("YYYY-MM-DD HH:mm:ss:SSS") : text;
|
||||
// },
|
||||
// },
|
||||
// {
|
||||
// title: "有效消息结束时间",
|
||||
// dataIndex: "endTime",
|
||||
// key: "endTime",
|
||||
// slots: { title: "endTime" },
|
||||
// scopedSlots: { customRender: "internal" },
|
||||
// customRender: (text) => {
|
||||
// return text != -1 ? moment(text).format("YYYY-MM-DD HH:mm:ss:SSS") : text;
|
||||
// },
|
||||
// },
|
||||
{
|
||||
title: "操作",
|
||||
key: "operation",
|
||||
@@ -177,4 +211,11 @@ const columns = [
|
||||
];
|
||||
</script>
|
||||
|
||||
<style scoped></style>
|
||||
<style scoped>
|
||||
.red-font {
|
||||
color: red;
|
||||
}
|
||||
.green-font {
|
||||
color: green;
|
||||
}
|
||||
</style>
|
||||
|
||||
Reference in New Issue
Block a user