15 Commits

Author | SHA1 | Message | Date
许晓东 | b19c6200d2 | Polish README | 2021-12-21 14:57:16 +08:00
许晓东 | 5f6a06c100 | Download link for the 1.0.2 installation package | 2021-12-21 14:44:55 +08:00
许晓东 | 5930e44fdf | Support resending from the message detail view | 2021-12-21 14:08:36 +08:00
许晓东 | 98f33bb2cc | Show each partition's valid message time range in the partition info | 2021-12-20 20:29:34 +08:00
许晓东 | 0ec3bac6c2 | Send messages online | 2021-12-20 00:09:20 +08:00
许晓东 | bd814d550d | Release memory promptly when searching messages by time | 2021-12-17 20:06:23 +08:00
许晓东 | b9548d1640 | Fix a bug in searching messages by time; lengthen the page request timeout | 2021-12-13 19:05:57 +08:00
许晓东 | 57a41e087f | Show consumption status when viewing message details | 2021-12-12 23:35:17 +08:00
许晓东 | 54cd402810 | Query message detail info | 2021-12-12 18:53:29 +08:00
许晓东 | c17b0aa4b9 | Search messages by offset | 2021-12-11 23:56:18 +08:00
Xiaodong Xu | 8169ddb019 | Merge pull request #5 from xxd763795151/master: search messages by time | 2021-12-11 14:55:07 +08:00
许晓东 | 5f24c62855 | Search messages by time | 2021-12-11 14:53:54 +08:00
许晓东 | 3b21fc4cd8 | Add a message page | 2021-12-05 23:18:51 +08:00
许晓东 | d15ec4a2db | Add a logo to the top-left corner of the page | 2021-12-04 14:47:32 +08:00
许晓东 | 12431db525 | Update the download link for the latest installation package | 2021-11-30 20:20:03 +08:00
30 changed files with 1860 additions and 28 deletions


@@ -1,14 +1,16 @@
# Kafka visual management platform
A lightweight Kafka visual management platform: quick to install and configure, simple to use.
-To keep development simple there is no multi-language support; only Chinese is displayed.
+To keep development simple there is no internationalization (i18n) support; only Chinese is displayed.
If you have used rocketmq-console, the front-end display style is quite similar.
## Package download
-* Download: [kafka-console-ui.tar.gz](https://github.com/xxd763795151/kafka-console-ui/releases/download/v1.0.0/kafka-console-ui.tar.gz) or [kafka-console-ui.zip](https://github.com/xxd763795151/kafka-console-ui/releases/download/v1.0.0/kafka-console-ui.zip)
-* Rebuild from source as described under packaging and deployment below (latest features)
+Choose one of the following two options: download the prebuilt package directly, or download the source and build it manually.
+* Download (v1.0.2): [kafka-console-ui.tar.gz](https://github.com/xxd763795151/kafka-console-ui/releases/download/v1.0.2/kafka-console-ui.tar.gz) or [kafka-console-ui.zip](https://github.com/xxd763795151/kafka-console-ui/releases/download/v1.0.2/kafka-console-ui.zip)
+* Rebuild from source as described under packaging and deployment below (latest committed features)
## Features
* Cluster info
* Topic management
* Consumer group management
+* Message management
* SASL_SCRAM-based authentication and authorization management
* Operations
![Features](./document/功能特性.png)
@@ -83,4 +85,6 @@ sh bin/shutdown.sh
![Cluster](./document/集群.png)
![Topic](./document/Topic.png)
![Consumer groups](./document/消费组.png)
-![Operations](./document/运维.png)
+![Operations](./document/运维.png)
+Added a message search page:
+![Messages](./document/消息.png)

Binary file not shown (image updated; size before: 204 KiB, after: 492 KiB).

document/消息.png: new binary file, 400 KiB (not shown).


@@ -10,7 +10,7 @@
</parent>
<groupId>com.xuxd</groupId>
<artifactId>kafka-console-ui</artifactId>
-<version>1.0.1</version>
+<version>1.0.2</version>
<name>kafka-console-ui</name>
<description>Kafka console manage ui</description>
<properties>


@@ -0,0 +1,27 @@
package com.xuxd.kafka.console.beans;
import lombok.Data;
/**
* kafka-console-ui.
*
* @author xuxd
* @date 2021-12-11 09:45:49
**/
@Data
public class QueryMessage {
private String topic;
private int partition;
private long startTime;
private long endTime;
private long offset;
private String keyDeserializer;
private String valueDeserializer;
}


@@ -0,0 +1,25 @@
package com.xuxd.kafka.console.beans;
import lombok.Data;
/**
* kafka-console-ui.
*
* @author xuxd
* @date 2021-12-19 23:28:31
**/
@Data
public class SendMessage {
private String topic;
private int partition;
private String key;
private String body;
private int num;
private long offset;
}


@@ -0,0 +1,50 @@
package com.xuxd.kafka.console.beans.dto;
import com.xuxd.kafka.console.beans.QueryMessage;
import java.util.Date;
import lombok.Data;
/**
* kafka-console-ui.
*
* @author xuxd
* @date 2021-12-11 09:17:59
**/
@Data
public class QueryMessageDTO {
private String topic;
private int partition;
private Date startTime;
private Date endTime;
private Long offset;
private String keyDeserializer;
private String valueDeserializer;
public QueryMessage toQueryMessage() {
QueryMessage queryMessage = new QueryMessage();
queryMessage.setTopic(topic);
queryMessage.setPartition(partition);
if (startTime != null) {
queryMessage.setStartTime(startTime.getTime());
}
if (endTime != null) {
queryMessage.setEndTime(endTime.getTime());
}
if (offset != null) {
queryMessage.setOffset(offset);
}
queryMessage.setKeyDeserializer(keyDeserializer);
queryMessage.setValueDeserializer(valueDeserializer);
return queryMessage;
}
}
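For reference, a small hypothetical sketch of this conversion: the frontend posts dates, and the DTO turns them into the epoch-millis fields that QueryMessage carries (all values below are made up):

```java
import java.util.Date;

import com.xuxd.kafka.console.beans.QueryMessage;
import com.xuxd.kafka.console.beans.dto.QueryMessageDTO;

public class QueryMessageDTOExample {
    public static void main(String[] args) {
        QueryMessageDTO dto = new QueryMessageDTO();  // setters come from Lombok @Data
        dto.setTopic("test-topic");                   // hypothetical topic name
        dto.setPartition(-1);                         // -1 means "all partitions" in the search services
        dto.setStartTime(new Date(1639180800000L));   // dates arrive from the frontend as epoch millis
        dto.setEndTime(new Date(1639267200000L));
        // offset left null: toQueryMessage() keeps QueryMessage.offset at its long default, 0
        QueryMessage query = dto.toQueryMessage();
        System.out.println(query.getStartTime());     // 1639180800000
    }
}
```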


@@ -0,0 +1,32 @@
package com.xuxd.kafka.console.beans.vo;
import lombok.Data;
import org.apache.kafka.clients.consumer.ConsumerRecord;
/**
* kafka-console-ui.
*
* @author xuxd
* @date 2021-12-11 14:19:35
**/
@Data
public class ConsumerRecordVO {
private String topic;
private int partition;
private long offset;
private long timestamp;
public static ConsumerRecordVO fromConsumerRecord(ConsumerRecord record) {
ConsumerRecordVO vo = new ConsumerRecordVO();
vo.setTopic(record.topic());
vo.setPartition(record.partition());
vo.setOffset(record.offset());
vo.setTimestamp(record.timestamp());
return vo;
}
}


@@ -0,0 +1,47 @@
package com.xuxd.kafka.console.beans.vo;
import java.util.ArrayList;
import java.util.List;
import lombok.Data;
/**
* kafka-console-ui.
*
* @author xuxd
* @date 2021-12-12 12:45:23
**/
@Data
public class MessageDetailVO {
private String topic;
private int partition;
private long offset;
private long timestamp;
private String timestampType;
private List<HeaderVO> headers = new ArrayList<>();
private Object key;
private Object value;
private List<ConsumerVO> consumers;
@Data
public static class HeaderVO {
String key;
String value;
}
@Data
public static class ConsumerVO {
String groupId;
String status;
}
}


@@ -29,6 +29,10 @@ public class TopicPartitionVO {
private long diff;
private long beginTime;
private long endTime;
public static TopicPartitionVO from(TopicPartitionInfo partitionInfo) {
TopicPartitionVO partitionVO = new TopicPartitionVO();
partitionVO.setPartition(partitionInfo.partition());


@@ -5,6 +5,7 @@ import kafka.console.ConfigConsole;
import kafka.console.ConsumerConsole;
import kafka.console.KafkaAclConsole;
import kafka.console.KafkaConfigConsole;
import kafka.console.MessageConsole;
import kafka.console.OperationConsole;
import kafka.console.TopicConsole;
import org.springframework.context.annotation.Bean;
@@ -54,4 +55,9 @@ public class KafkaConfiguration {
ConsumerConsole consumerConsole) {
return new OperationConsole(config, topicConsole, consumerConsole);
}
@Bean
public MessageConsole messageConsole(KafkaConfig config) {
return new MessageConsole(config);
}
}


@@ -0,0 +1,55 @@
package com.xuxd.kafka.console.controller;
import com.xuxd.kafka.console.beans.SendMessage;
import com.xuxd.kafka.console.beans.dto.QueryMessageDTO;
import com.xuxd.kafka.console.service.MessageService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* kafka-console-ui.
*
* @author xuxd
* @date 2021-12-11 09:22:19
**/
@RestController
@RequestMapping("/message")
public class MessageController {
@Autowired
private MessageService messageService;
@PostMapping("/search/time")
public Object searchByTime(@RequestBody QueryMessageDTO dto) {
return messageService.searchByTime(dto.toQueryMessage());
}
@PostMapping("/search/offset")
public Object searchByOffset(@RequestBody QueryMessageDTO dto) {
return messageService.searchByOffset(dto.toQueryMessage());
}
@PostMapping("/search/detail")
public Object searchDetail(@RequestBody QueryMessageDTO dto) {
return messageService.searchDetail(dto.toQueryMessage());
}
@GetMapping("/deserializer/list")
public Object deserializerList() {
return messageService.deserializerList();
}
@PostMapping("/send")
public Object send(@RequestBody SendMessage message) {
return messageService.send(message);
}
@PostMapping("/resend")
public Object resend(@RequestBody SendMessage message) {
return messageService.resend(message);
}
}
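The controller adds six JSON endpoints under /message. A hedged usage sketch with Java 11's HttpClient: the base URL and payload values are assumptions, only the paths and the QueryMessageDTO field names come from this diff.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class MessageApiExample {
    public static void main(String[] args) throws Exception {
        // Assumed base URL; adjust to wherever kafka-console-ui is deployed.
        String base = "http://localhost:8080";
        // Field names follow QueryMessageDTO; topic and offset values are hypothetical.
        String body = "{\"topic\":\"test-topic\",\"partition\":-1,\"offset\":42}";

        HttpRequest request = HttpRequest.newBuilder(URI.create(base + "/message/search/offset"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();

        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.body()); // ResponseData JSON with the matched records
    }
}
```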


@@ -38,4 +38,6 @@ public interface ConsumerService {
ResponseData getTopicSubscribedByGroups(String topic);
ResponseData getOffsetPartition(String groupId);
ResponseData<Set<String>> getSubscribedGroups(String topic);
}


@@ -0,0 +1,26 @@
package com.xuxd.kafka.console.service;
import com.xuxd.kafka.console.beans.QueryMessage;
import com.xuxd.kafka.console.beans.ResponseData;
import com.xuxd.kafka.console.beans.SendMessage;
/**
* kafka-console-ui.
*
* @author xuxd
* @date 2021-12-11 09:43:26
**/
public interface MessageService {
ResponseData searchByTime(QueryMessage queryMessage);
ResponseData searchByOffset(QueryMessage queryMessage);
ResponseData searchDetail(QueryMessage queryMessage);
ResponseData deserializerList();
ResponseData send(SendMessage message);
ResponseData resend(SendMessage message);
}


@@ -15,6 +15,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import kafka.console.ConsumerConsole;
import kafka.console.TopicConsole;
@@ -48,6 +49,8 @@ public class ConsumerServiceImpl implements ConsumerService {
@Autowired
private TopicConsole topicConsole;
private ReentrantLock lock = new ReentrantLock();
@Override public ResponseData getConsumerGroupList(List<String> groupIds, Set<ConsumerGroupState> states) {
String simulateGroup = "inner_xxx_not_exit_group_###" + System.currentTimeMillis();
Set<String> groupList = new HashSet<>();
@@ -167,25 +170,7 @@ public class ConsumerServiceImpl implements ConsumerService {
}
@Override public ResponseData getTopicSubscribedByGroups(String topic) {
-if (topicSubscribedInfo.isNeedRefresh(topic)) {
-Set<String> groupIdList = consumerConsole.getConsumerGroupIdList(Collections.emptySet());
-Map<String, Set<String>> cache = new HashMap<>();
-Map<String, List<TopicPartition>> subscribeTopics = consumerConsole.listSubscribeTopics(groupIdList);
-subscribeTopics.forEach((groupId, tl) -> {
-tl.forEach(topicPartition -> {
-String t = topicPartition.topic();
-if (!cache.containsKey(t)) {
-cache.put(t, new HashSet<>());
-}
-cache.get(t).add(groupId);
-});
-});
-topicSubscribedInfo.refresh(cache);
-}
-Set<String> groups = topicSubscribedInfo.getSubscribedGroups(topic);
+Set<String> groups = this.getSubscribedGroups(topic).getData();
Map<String, Object> res = new HashMap<>();
Collection<ConsumerConsole.TopicPartitionConsumeInfo> consumerDetail = consumerConsole.getConsumerDetail(groups);
@@ -212,6 +197,34 @@ public class ConsumerServiceImpl implements ConsumerService {
return ResponseData.create().data(Utils.abs(groupId.hashCode()) % size);
}
@Override public ResponseData<Set<String>> getSubscribedGroups(String topic) {
if (topicSubscribedInfo.isNeedRefresh(topic) && !lock.isLocked()) {
try {
lock.lock();
Set<String> groupIdList = consumerConsole.getConsumerGroupIdList(Collections.emptySet());
Map<String, Set<String>> cache = new HashMap<>();
Map<String, List<TopicPartition>> subscribeTopics = consumerConsole.listSubscribeTopics(groupIdList);
subscribeTopics.forEach((groupId, tl) -> {
tl.forEach(topicPartition -> {
String t = topicPartition.topic();
if (!cache.containsKey(t)) {
cache.put(t, new HashSet<>());
}
cache.get(t).add(groupId);
});
});
topicSubscribedInfo.refresh(cache);
} finally {
lock.unlock();
}
}
Set<String> groups = topicSubscribedInfo.getSubscribedGroups(topic);
return ResponseData.create(Set.class).data(groups).success();
}
class TopicSubscribedInfo {
long lastTime = System.currentTimeMillis();
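The new getSubscribedGroups gates the expensive group listing behind a ReentrantLock so at most one caller refreshes the cache, while concurrent callers serve the previous, possibly stale, mapping. A minimal sketch of that intent using tryLock (an alternative formulation, not this repository's code):

```java
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

// Sketch of the "single refresher, others serve stale data" pattern.
class GuardedCache<T> {
    private final ReentrantLock lock = new ReentrantLock();
    private volatile T cached;

    T get(Supplier<T> refresher, boolean needRefresh) {
        if (needRefresh && lock.tryLock()) {   // only one thread refreshes at a time
            try {
                cached = refresher.get();      // expensive call, e.g. listing every consumer group
            } finally {
                lock.unlock();
            }
        }
        return cached;                         // losers return the previous (possibly stale) value
    }
}
```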


@@ -0,0 +1,212 @@
package com.xuxd.kafka.console.service.impl;
import com.xuxd.kafka.console.beans.QueryMessage;
import com.xuxd.kafka.console.beans.ResponseData;
import com.xuxd.kafka.console.beans.SendMessage;
import com.xuxd.kafka.console.beans.vo.ConsumerRecordVO;
import com.xuxd.kafka.console.beans.vo.MessageDetailVO;
import com.xuxd.kafka.console.service.ConsumerService;
import com.xuxd.kafka.console.service.MessageService;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import kafka.console.ConsumerConsole;
import kafka.console.MessageConsole;
import kafka.console.TopicConsole;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.BytesDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.DoubleDeserializer;
import org.apache.kafka.common.serialization.FloatDeserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Service;
import scala.Tuple2;
/**
* kafka-console-ui.
*
* @author xuxd
* @date 2021-12-11 09:43:44
**/
@Slf4j
@Service
public class MessageServiceImpl implements MessageService, ApplicationContextAware {
@Autowired
private MessageConsole messageConsole;
@Autowired
private TopicConsole topicConsole;
@Autowired
private ConsumerConsole consumerConsole;
private ApplicationContext applicationContext;
private Map<String, Deserializer> deserializerDict = new HashMap<>();
{
deserializerDict.put("ByteArray", new ByteArrayDeserializer());
deserializerDict.put("Integer", new IntegerDeserializer());
deserializerDict.put("String", new StringDeserializer());
deserializerDict.put("Float", new FloatDeserializer());
deserializerDict.put("Double", new DoubleDeserializer());
deserializerDict.put("Byte", new BytesDeserializer());
}
public static String defaultDeserializer = "String";
@Override public ResponseData searchByTime(QueryMessage queryMessage) {
int maxNums = 10000;
Set<TopicPartition> partitions = getPartitions(queryMessage);
long startTime = System.currentTimeMillis();
List<ConsumerRecord<byte[], byte[]>> records = messageConsole.searchBy(partitions, queryMessage.getStartTime(), queryMessage.getEndTime(), maxNums);
log.info("search message by time, cost time: {}", (System.currentTimeMillis() - startTime));
List<ConsumerRecordVO> vos = records.stream().filter(record -> record.timestamp() <= queryMessage.getEndTime())
.map(ConsumerRecordVO::fromConsumerRecord).collect(Collectors.toList());
Map<String, Object> res = new HashMap<>();
res.put("maxNum", maxNums);
res.put("realNum", vos.size());
res.put("data", vos.subList(0, Math.min(maxNums, vos.size())));
return ResponseData.create().data(res).success();
}
@Override public ResponseData searchByOffset(QueryMessage queryMessage) {
Map<TopicPartition, ConsumerRecord<byte[], byte[]>> recordMap = searchRecordByOffset(queryMessage);
return ResponseData.create().data(recordMap.values().stream().map(ConsumerRecordVO::fromConsumerRecord).collect(Collectors.toList())).success();
}
@Override public ResponseData searchDetail(QueryMessage queryMessage) {
if (queryMessage.getPartition() == -1) {
throw new IllegalArgumentException();
}
if (StringUtils.isBlank(queryMessage.getKeyDeserializer())) {
queryMessage.setKeyDeserializer(defaultDeserializer);
}
if (StringUtils.isBlank(queryMessage.getValueDeserializer())) {
queryMessage.setValueDeserializer(defaultDeserializer);
}
Map<TopicPartition, ConsumerRecord<byte[], byte[]>> recordMap = searchRecordByOffset(queryMessage);
ConsumerRecord<byte[], byte[]> record = recordMap.get(new TopicPartition(queryMessage.getTopic(), queryMessage.getPartition()));
if (record != null) {
MessageDetailVO vo = new MessageDetailVO();
vo.setTopic(record.topic());
vo.setPartition(record.partition());
vo.setOffset(record.offset());
vo.setTimestamp(record.timestamp());
vo.setTimestampType(record.timestampType().name());
try {
vo.setKey(deserializerDict.get(queryMessage.getKeyDeserializer()).deserialize(queryMessage.getTopic(), record.key()));
} catch (Exception e) {
vo.setKey("KeyDeserializer Error: " + e.getMessage());
}
try {
vo.setValue(deserializerDict.get(queryMessage.getValueDeserializer()).deserialize(queryMessage.getTopic(), record.value()));
} catch (Exception e) {
vo.setValue("ValueDeserializer Error: " + e.getMessage());
}
record.headers().forEach(header -> {
MessageDetailVO.HeaderVO headerVO = new MessageDetailVO.HeaderVO();
headerVO.setKey(header.key());
headerVO.setValue(new String(header.value()));
vo.getHeaders().add(headerVO);
});
// To keep the code tidy, look up the other service through the context instead of injecting its implementation directly
Set<String> groupIds = applicationContext.getBean(ConsumerService.class).getSubscribedGroups(record.topic()).getData();
Collection<ConsumerConsole.TopicPartitionConsumeInfo> consumerDetail = consumerConsole.getConsumerDetail(groupIds);
List<MessageDetailVO.ConsumerVO> consumerVOS = new LinkedList<>();
consumerDetail.forEach(consumerInfo -> {
if (consumerInfo.topicPartition().equals(new TopicPartition(record.topic(), record.partition()))) {
MessageDetailVO.ConsumerVO consumerVO = new MessageDetailVO.ConsumerVO();
consumerVO.setGroupId(consumerInfo.getGroupId());
consumerVO.setStatus(consumerInfo.getConsumerOffset() <= record.offset() ? "unconsume" : "consumed");
consumerVOS.add(consumerVO);
}
});
vo.setConsumers(consumerVOS);
return ResponseData.create().data(vo).success();
}
return ResponseData.create().failed("Not found message detail.");
}
@Override public ResponseData deserializerList() {
return ResponseData.create().data(deserializerDict.keySet()).success();
}
@Override public ResponseData send(SendMessage message) {
messageConsole.send(message.getTopic(), message.getPartition(), message.getKey(), message.getBody(), message.getNum());
return ResponseData.create().success();
}
@Override public ResponseData resend(SendMessage message) {
TopicPartition partition = new TopicPartition(message.getTopic(), message.getPartition());
Map<TopicPartition, Object> offsetTable = new HashMap<>(1, 1.0f);
offsetTable.put(partition, message.getOffset());
Map<TopicPartition, ConsumerRecord<byte[], byte[]>> recordMap = messageConsole.searchBy(offsetTable);
if (recordMap.isEmpty()) {
return ResponseData.create().failed("Get message failed.");
}
ConsumerRecord<byte[], byte[]> record = recordMap.get(partition);
ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>(record.topic(), record.partition(), record.key(), record.value(), record.headers());
Tuple2<Object, String> tuple2 = messageConsole.sendSync(producerRecord);
boolean success = (boolean) tuple2._1();
return success ? ResponseData.create().success("success: " + tuple2._2()) : ResponseData.create().failed(tuple2._2());
}
private Map<TopicPartition, ConsumerRecord<byte[], byte[]>> searchRecordByOffset(QueryMessage queryMessage) {
Set<TopicPartition> partitions = getPartitions(queryMessage);
Map<TopicPartition, Object> offsetTable = new HashMap<>();
partitions.forEach(tp -> {
offsetTable.put(tp, queryMessage.getOffset());
});
Map<TopicPartition, ConsumerRecord<byte[], byte[]>> recordMap = messageConsole.searchBy(offsetTable);
return recordMap;
}
private Set<TopicPartition> getPartitions(QueryMessage queryMessage) {
Set<TopicPartition> partitions = new HashSet<>();
if (queryMessage.getPartition() != -1) {
partitions.add(new TopicPartition(queryMessage.getTopic(), queryMessage.getPartition()));
} else {
List<TopicDescription> list = topicConsole.getTopicList(Collections.singleton(queryMessage.getTopic()));
if (CollectionUtils.isEmpty(list)) {
throw new IllegalArgumentException("Can not find topic info.");
}
Set<TopicPartition> set = list.get(0).partitions().stream()
.map(tp -> new TopicPartition(queryMessage.getTopic(), tp.partition())).collect(Collectors.toSet());
partitions.addAll(set);
}
return partitions;
}
@Override public void setApplicationContext(ApplicationContext context) throws BeansException {
this.applicationContext = context;
}
}
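The resend path re-reads the raw record by offset and republishes it unchanged, headers included. A minimal standalone sketch of the republish step, assuming a plain byte-array producer and a hypothetical broker address (the real code goes through MessageConsole.sendSync):

```java
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class ResendSketch {
    static RecordMetadata resend(ConsumerRecord<byte[], byte[]> record) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        try (KafkaProducer<byte[], byte[]> producer =
                 new KafkaProducer<>(props, new ByteArraySerializer(), new ByteArraySerializer())) {
            // Rebuild the record byte-for-byte, keeping partition and headers.
            ProducerRecord<byte[], byte[]> copy = new ProducerRecord<>(
                record.topic(), record.partition(), record.key(), record.value(), record.headers());
            return producer.send(copy).get(); // synchronous send, mirroring sendSync in MessageConsole
        }
    }
}
```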


@@ -19,12 +19,14 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import kafka.console.MessageConsole;
import kafka.console.TopicConsole;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.springframework.beans.factory.annotation.Autowired;
@@ -44,6 +46,9 @@ public class TopicServiceImpl implements TopicService {
@Autowired
private TopicConsole topicConsole;
@Autowired
private MessageConsole messageConsole;
private Gson gson = GsonUtil.INSTANCE.get();
@Override public ResponseData getTopicNameList(boolean internal) {
@@ -106,6 +111,10 @@ public class TopicServiceImpl implements TopicService {
mapTuple2._2().forEach((k, v) -> {
endTable.put(k.partition(), (Long) v);
});
// compute the valid time range.
Map<TopicPartition, Object> beginOffsetTable = new HashMap<>();
Map<TopicPartition, Object> endOffsetTable = new HashMap<>();
Map<Integer, TopicPartition> partitionCache = new HashMap<>();
for (TopicPartitionVO partitionVO : voList) {
long begin = beginTable.get(partitionVO.getPartition());
@@ -113,7 +122,29 @@ public class TopicServiceImpl implements TopicService {
partitionVO.setBeginOffset(begin);
partitionVO.setEndOffset(end);
partitionVO.setDiff(end - begin);
if (begin != end) {
TopicPartition partition = new TopicPartition(topic, partitionVO.getPartition());
partitionCache.put(partitionVO.getPartition(), partition);
beginOffsetTable.put(partition, begin);
endOffsetTable.put(partition, end - 1); // the end offset is one past the last record, so look up end - 1
} else {
partitionVO.setBeginTime(-1L);
partitionVO.setEndTime(-1L);
}
}
Map<TopicPartition, ConsumerRecord<byte[], byte[]>> beginRecordMap = messageConsole.searchBy(beginOffsetTable);
Map<TopicPartition, ConsumerRecord<byte[], byte[]>> endRecordMap = messageConsole.searchBy(endOffsetTable);
for (TopicPartitionVO partitionVO : voList) {
if (partitionVO.getBeginTime() != -1L) {
TopicPartition partition = partitionCache.get(partitionVO.getPartition());
partitionVO.setBeginTime(beginRecordMap.containsKey(partition) ? beginRecordMap.get(partition).timestamp() : -1L);
partitionVO.setEndTime(endRecordMap.containsKey(partition) ? endRecordMap.get(partition).timestamp() : -1L);
}
}
return ResponseData.create().data(voList).success();
}
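The valid time range is derived by fetching the record at each partition's begin offset and at end - 1, then reading their timestamps. A minimal sketch of that single-record lookup with a raw consumer, assuming props carries at least bootstrap.servers:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class TimestampAtOffsetSketch {
    /** Returns the timestamp of the record at the given offset, or -1 if none is readable. */
    static long timestampAt(Properties props, TopicPartition tp, long offset) {
        try (KafkaConsumer<byte[], byte[]> consumer =
                 new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
            consumer.assign(Collections.singleton(tp)); // no group needed for manual assignment
            consumer.seek(tp, offset);                  // jump straight to the offset
            List<ConsumerRecord<byte[], byte[]>> records =
                consumer.poll(Duration.ofSeconds(5)).records(tp);
            return records.isEmpty() ? -1L : records.get(0).timestamp();
        }
    }
}
```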


@@ -5,10 +5,11 @@ import kafka.zk.{AdminZkClient, KafkaZkClient}
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.admin._
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer, OffsetAndMetadata}
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.config.SaslConfigs
import org.apache.kafka.common.requests.ListOffsetsResponse
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, StringSerializer}
import org.apache.kafka.common.utils.Time
import org.slf4j.{Logger, LoggerFactory}
@@ -60,6 +61,38 @@ class KafkaConsole(config: KafkaConfig) {
}
}
protected def withProducerAndCatchError(f: KafkaProducer[String, String] => Any, eh: Exception => Any,
extra: Properties = new Properties()): Any = {
val props = getProps()
props.putAll(extra)
props.put(ConsumerConfig.CLIENT_ID_CONFIG, String.valueOf(System.currentTimeMillis()))
val producer = new KafkaProducer[String, String](props, new StringSerializer, new StringSerializer)
try {
f(producer)
} catch {
case er: Exception => eh(er)
}
finally {
producer.close()
}
}
protected def withByteProducerAndCatchError(f: KafkaProducer[Array[Byte], Array[Byte]] => Any, eh: Exception => Any,
extra: Properties = new Properties()): Any = {
val props = getProps()
props.putAll(extra)
props.put(ConsumerConfig.CLIENT_ID_CONFIG, String.valueOf(System.currentTimeMillis()))
val producer = new KafkaProducer[Array[Byte], Array[Byte]](props, new ByteArraySerializer, new ByteArraySerializer)
try {
f(producer)
} catch {
case er: Exception => eh(er)
}
finally {
producer.close()
}
}
protected def withZKClient(f: AdminZkClient => Any): Any = {
val zkClient = KafkaZkClient(config.getZookeeperAddr, false, 30000, 30000, Int.MaxValue, Time.SYSTEM)
val adminZkClient = new AdminZkClient(zkClient)
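Both new helpers follow a loan pattern: build the producer, lend it to the caller's function, route exceptions to the error handler, and always close it. A rough Java equivalent of the pattern (a sketch, not project code; props must include bootstrap.servers and serializer settings):

```java
import java.util.Properties;
import java.util.function.Consumer;
import java.util.function.Function;

import org.apache.kafka.clients.producer.KafkaProducer;

// Loan pattern sketch: the helper owns the producer's lifecycle,
// the caller only borrows it for the duration of `work`.
class ProducerLoan {
    static <R> R withProducer(Properties props,
                              Function<KafkaProducer<String, String>, R> work,
                              Consumer<Exception> onError) {
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        try {
            return work.apply(producer);
        } catch (Exception e) {
            onError.accept(e);
            return null;
        } finally {
            producer.close(); // always released, mirroring the Scala helper's finally block
        }
    }
}
```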


@@ -0,0 +1,196 @@
package kafka.console
import com.xuxd.kafka.console.config.KafkaConfig
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.TopicPartition
import java.time.Duration
import java.util
import java.util.Properties
import scala.collection.immutable
import scala.jdk.CollectionConverters.{CollectionHasAsScala, MapHasAsScala, SeqHasAsJava}
/**
* kafka-console-ui.
*
* @author xuxd
* @date 2021-12-11 09:39:40
* */
class MessageConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConfig) with Logging {
def searchBy(partitions: util.Collection[TopicPartition], startTime: Long, endTime: Long,
maxNums: Int): util.List[ConsumerRecord[Array[Byte], Array[Byte]]] = {
var startOffTable: immutable.Map[TopicPartition, Long] = Map.empty
var endOffTable: immutable.Map[TopicPartition, Long] = Map.empty
withAdminClientAndCatchError(admin => {
val startTable = KafkaConsole.getLogTimestampOffsets(admin, partitions.asScala.toSeq, startTime, timeoutMs)
startOffTable = startTable.map(t2 => (t2._1, t2._2.offset())).toMap
endOffTable = KafkaConsole.getLogTimestampOffsets(admin, partitions.asScala.toSeq, endTime, timeoutMs)
.map(t2 => (t2._1, t2._2.offset())).toMap
}, e => {
log.error("getLogTimestampOffsets error.", e)
throw new RuntimeException("getLogTimestampOffsets error", e)
})
var terminate: Boolean = (startOffTable == endOffTable)
val res = new util.LinkedList[ConsumerRecord[Array[Byte], Array[Byte]]]()
// if the start and end offsets are identical there is nothing to fetch, so finish immediately
if (!terminate) {
val arrive = new util.HashSet[TopicPartition](partitions)
val props = new Properties()
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
withConsumerAndCatchError(consumer => {
consumer.assign(partitions)
for ((tp, off) <- startOffTable) {
consumer.seek(tp, off)
}
// Termination conditions:
// 1. every queried partition has reached its end offset
while (!terminate) {
// 2. the maximum number of records has been fetched
if (res.size() >= maxNums) {
terminate = true
} else {
val records = consumer.poll(Duration.ofMillis(timeoutMs))
if (records.isEmpty) {
terminate = true
} else {
for ((tp, endOff) <- endOffTable) {
if (!terminate) {
var recordList = records.records(tp)
if (!recordList.isEmpty) {
val first = recordList.get(0)
if (first.offset() >= endOff) {
arrive.remove(tp)
} else {
//
// (String topic,
// int partition,
// long offset,
// long timestamp,
// TimestampType timestampType,
// Long checksum,
// int serializedKeySize,
// int serializedValueSize,
// K key,
// V value,
// Headers headers,
// Optional<Integer> leaderEpoch)
val nullVList = recordList.asScala.map(record => new ConsumerRecord[Array[Byte], Array[Byte]](record.topic(),
record.partition(),
record.offset(),
record.timestamp(),
record.timestampType(),
record.checksum(),
record.serializedKeySize(),
record.serializedValueSize(),
record.key(),
null,
record.headers(),
record.leaderEpoch())).toSeq.asJava
res.addAll(nullVList)
if (recordList.get(recordList.size() - 1).offset() >= endOff) {
arrive.remove(tp)
}
recordList = null // drop the batch reference promptly so it can be GCed
}
}
if (arrive.isEmpty) {
terminate = true
}
}
}
}
}
}
}, e => {
log.error("searchBy time error.", e)
})
}
res
}
def searchBy(
tp2o: util.Map[TopicPartition, Long]): util.Map[TopicPartition, ConsumerRecord[Array[Byte], Array[Byte]]] = {
val props = new Properties()
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
val res = new util.HashMap[TopicPartition, ConsumerRecord[Array[Byte], Array[Byte]]]()
withConsumerAndCatchError(consumer => {
var tpSet = tp2o.keySet()
val tpSetCopy = new util.HashSet[TopicPartition](tpSet)
val endOffsets = consumer.endOffsets(tpSet)
val beginOffsets = consumer.beginningOffsets(tpSet)
for ((tp, off) <- tp2o.asScala) {
val endOff = endOffsets.get(tp)
// if (endOff <= off) {
// consumer.seek(tp, endOff)
// tpSetCopy.remove(tp)
// } else {
// consumer.seek(tp, off)
// }
val beginOff = beginOffsets.get(tp)
if (off < beginOff || off >= endOff) {
tpSetCopy.remove(tp)
}
}
tpSet = tpSetCopy
consumer.assign(tpSet)
tpSet.asScala.foreach(tp => {
consumer.seek(tp, tp2o.get(tp))
})
var terminate = tpSet.isEmpty
while (!terminate) {
val records = consumer.poll(Duration.ofMillis(timeoutMs))
val tps = new util.HashSet(tpSet).asScala
for (tp <- tps) {
if (!res.containsKey(tp)) {
val recordList = records.records(tp)
if (!recordList.isEmpty) {
val record = recordList.get(0)
res.put(tp, record)
tpSet.remove(tp)
}
}
if (tpSet.isEmpty) {
terminate = true
}
}
}
}, e => {
log.error("searchBy offset error.", e)
})
res
}
def send(topic: String, partition: Int, key: String, value: String, num: Int): Unit = {
withProducerAndCatchError(producer => {
val nullKey = if (key != null && key.trim().length() == 0) null else key
for (a <- 1 to num) {
val record = if (partition != -1) new ProducerRecord[String, String](topic, partition, nullKey, value)
else new ProducerRecord[String, String](topic, nullKey, value)
producer.send(record)
}
}, e => log.error("send error.", e))
}
def sendSync(record: ProducerRecord[Array[Byte], Array[Byte]]): (Boolean, String) = {
withByteProducerAndCatchError(producer => {
val metadata = producer.send(record).get()
(true, metadata.toString())
}, e => {
log.error("send error.", e)
(false, e.getMessage)
}).asInstanceOf[(Boolean, String)]
}
}
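The time search first resolves both timestamps to offsets with offsetsForTimes, seeks to the start offsets, then polls until every partition's batch passes its end offset or the record cap is hit. A compact Java sketch of the same scan (consumer construction and settings assumed; the Scala code above additionally nulls out message values to save memory):

```java
import java.time.Duration;
import java.util.*;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;

public class TimeRangeScanSketch {
    static List<ConsumerRecord<byte[], byte[]>> scan(KafkaConsumer<byte[], byte[]> consumer,
            Set<TopicPartition> parts, long startMs, long endMs, int maxNums) {
        Map<TopicPartition, Long> startQ = new HashMap<>(), endQ = new HashMap<>();
        parts.forEach(tp -> { startQ.put(tp, startMs); endQ.put(tp, endMs); });
        // offsetsForTimes: earliest offset whose timestamp is >= the given time (null value if none)
        Map<TopicPartition, OffsetAndTimestamp> starts = consumer.offsetsForTimes(startQ);
        Map<TopicPartition, OffsetAndTimestamp> ends = consumer.offsetsForTimes(endQ);
        consumer.assign(parts);
        Set<TopicPartition> pending = new HashSet<>();
        for (TopicPartition tp : parts) {
            OffsetAndTimestamp s = starts.get(tp), e = ends.get(tp);
            if (s != null && (e == null || s.offset() < e.offset())) {
                consumer.seek(tp, s.offset()); // begin reading at the first record inside the range
                pending.add(tp);
            }
        }
        List<ConsumerRecord<byte[], byte[]>> out = new ArrayList<>();
        while (!pending.isEmpty() && out.size() < maxNums) {
            ConsumerRecords<byte[], byte[]> batch = consumer.poll(Duration.ofSeconds(5));
            if (batch.isEmpty()) break;                    // nothing more to read
            for (TopicPartition tp : new HashSet<>(pending)) {
                for (ConsumerRecord<byte[], byte[]> r : batch.records(tp)) {
                    OffsetAndTimestamp e = ends.get(tp);
                    if (e != null && r.offset() >= e.offset()) { pending.remove(tp); break; }
                    out.add(r);
                }
            }
        }
        return out;
    }
}
```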


@@ -1,6 +1,7 @@
<template>
<div id="app">
<div id="nav">
<h2 class="logo">Kafka 控制台</h2>
<router-link to="/" class="pad-l-r">主页</router-link>
<span>|</span
><router-link to="/cluster-page" class="pad-l-r">集群</router-link>
@@ -8,6 +9,8 @@
><router-link to="/topic-page" class="pad-l-r">Topic</router-link>
<span>|</span
><router-link to="/group-page" class="pad-l-r">消费组</router-link>
<span>|</span
><router-link to="/message-page" class="pad-l-r">消息</router-link>
<span v-show="config.enableAcl">|</span
><router-link to="/acl-page" class="pad-l-r" v-show="config.enableAcl"
>Acl</router-link
@@ -44,7 +47,6 @@ export default {
font-family: Avenir, Helvetica, Arial, sans-serif;
-webkit-font-smoothing: antialiased;
-moz-osx-font-smoothing: grayscale;
-text-align: center;
color: #2c3e50;
}
@@ -59,6 +61,7 @@ export default {
padding-top: 1%;
padding-bottom: 1%;
margin-bottom: 1%;
+text-align: center;
}
#nav a {
@@ -81,4 +84,10 @@ export default {
height: 90%;
width: 100%;
}
.logo {
float: left;
left: 1%;
top: 1%;
position: absolute;
}
</style>


@@ -43,6 +43,12 @@ const routes = [
component: () =>
import(/* webpackChunkName: "cluster" */ "../views/cluster/Cluster.vue"),
},
{
path: "/message-page",
name: "Message",
component: () =>
import(/* webpackChunkName: "cluster" */ "../views/message/Message.vue"),
},
];
const router = new VueRouter({


@@ -223,3 +223,29 @@ export const KafkaOpApi = {
method: "delete",
},
};
export const KafkaMessageApi = {
searchByTime: {
url: "/message/search/time",
method: "post",
},
searchByOffset: {
url: "/message/search/offset",
method: "post",
},
searchDetail: {
url: "/message/search/detail",
method: "post",
},
deserializerList: {
url: "/message/deserializer/list",
method: "get",
},
send: {
url: "/message/send",
method: "post",
},
resend: {
url: "/message/resend",
method: "post",
},
};


@@ -6,7 +6,7 @@ import { VueAxios } from "./axios";
const request = axios.create({
// default prefix for API requests
baseURL: process.env.VUE_APP_API_BASE_URL,
-timeout: 30000, // request timeout (ms)
+timeout: 120000, // request timeout (ms)
});
// error interceptor


@@ -0,0 +1,58 @@
<template>
<div class="content">
<a-spin :spinning="loading">
<a-tabs default-active-key="1" size="large" tabPosition="top">
<a-tab-pane key="1" tab="根据时间查询消息">
<SearchByTime :topic-list="topicList"></SearchByTime>
</a-tab-pane>
<a-tab-pane key="2" tab="根据偏移查询消息">
<SearchByOffset :topic-list="topicList"></SearchByOffset>
</a-tab-pane>
<a-tab-pane key="3" tab="在线发送">
<SendMessage :topic-list="topicList"></SendMessage>
</a-tab-pane>
</a-tabs>
</a-spin>
</div>
</template>
<script>
import SearchByTime from "@/views/message/SearchByTime";
import SearchByOffset from "@/views/message/SearchByOffset";
import request from "@/utils/request";
import { KafkaTopicApi } from "@/utils/api";
import notification from "ant-design-vue/lib/notification";
import SendMessage from "@/views/message/SendMessage";
export default {
name: "Message",
components: { SearchByTime, SearchByOffset, SendMessage },
data() {
return {
loading: false,
topicList: [],
};
},
methods: {
getTopicNameList() {
request({
url: KafkaTopicApi.getTopicNameList.url,
method: KafkaTopicApi.getTopicNameList.method,
}).then((res) => {
if (res.code == 0) {
this.topicList = res.data;
} else {
notification.error({
message: "error",
description: res.msg,
});
}
});
},
},
created() {
this.getTopicNameList();
},
};
</script>
<style scoped></style>


@@ -0,0 +1,265 @@
<template>
<a-modal
title="消息详情"
:visible="show"
:width="800"
:mask="false"
:destroyOnClose="true"
:footer="null"
:maskClosable="false"
@cancel="handleCancel"
>
<div>
<a-spin :spinning="loading">
<div>
<h4>Message Info</h4>
<hr />
<div class="message-detail" id="message-detail">
<p>
<label class="title">Topic: </label>
<span class="m-info">{{ data.topic }}</span>
</p>
<p>
<label class="title">分区: </label>
<span class="m-info">{{ data.partition }}</span>
</p>
<p>
<label class="title">偏移: </label>
<span class="m-info">{{ data.offset }}</span>
</p>
<p>
<label class="title">消息头: </label>
<span class="m-info">{{ data.headers }}</span>
</p>
<p>
<label class="title">时间类型: </label>
<span class="m-info"
>{{
data.timestampType
}} (indicates which kind of time the value below is: message create time, log append time, or otherwise)</span
>
</p>
<p>
<label class="title">时间: </label>
<span class="m-info">{{ formatTime(data.timestamp) }}</span>
</p>
<p>
<label class="title">Key反序列化: </label>
<a-select
style="width: 120px"
v-model="keyDeserializer"
@change="keyDeserializerChange"
>
<a-select-option
v-for="v in deserializerList"
:key="v"
:value="v"
>
{{ v }}
</a-select-option>
</a-select>
<span>Pick a suitable deserializer, otherwise the output may be garbled</span>
</p>
<p>
<label class="title">Key: </label>
<span class="m-info">{{ data.key }}</span>
</p>
<p>
<label class="title">消息体反序列化: </label>
<a-select
v-model="valueDeserializer"
style="width: 120px"
@change="valueDeserializerChange"
>
<a-select-option
v-for="v in deserializerList"
:key="v"
:value="v"
>
{{ v }}
</a-select-option>
</a-select>
<span>Pick a suitable deserializer, otherwise the output may be garbled</span>
</p>
<p>
<label class="title">消息体: </label>
<a-textarea
type="textarea"
:value="data.value"
:rows="5"
:read-only="true"
></a-textarea>
</p>
</div>
</div>
<div>
<h4>Consumption Info</h4>
<hr />
<a-table
:columns="columns"
:data-source="data.consumers"
bordered
row-key="groupId"
>
<div slot="status" slot-scope="text">
<span v-if="text == 'consumed'">已消费</span
><span v-else style="color: red">未消费</span>
</div>
</a-table>
</div>
<div>
<h4>Actions</h4>
<hr />
<a-popconfirm
title="确定将当前这条消息重新发回broker"
ok-text="确认"
cancel-text="取消"
@confirm="resend"
>
<a-button type="primary" icon="reload"> 重新发送 </a-button>
</a-popconfirm>
</div>
</a-spin>
</div>
</a-modal>
</template>
<script>
import request from "@/utils/request";
import { KafkaMessageApi } from "@/utils/api";
import notification from "ant-design-vue/lib/notification";
import moment from "moment";
export default {
name: "MessageDetail",
props: {
record: {},
visible: {
type: Boolean,
default: false,
},
},
data() {
return {
show: this.visible,
data: {},
loading: false,
deserializerList: [],
keyDeserializer: "String",
valueDeserializer: "String",
consumerDetail: [],
columns,
};
},
watch: {
visible(v) {
this.show = v;
if (this.show) {
this.getMessageDetail();
this.getDeserializerList();
}
},
},
methods: {
getMessageDetail() {
this.loading = true;
const params = Object.assign({}, this.record, {
keyDeserializer: this.keyDeserializer,
valueDeserializer: this.valueDeserializer,
});
request({
url: KafkaMessageApi.searchDetail.url,
method: KafkaMessageApi.searchDetail.method,
data: params,
}).then((res) => {
this.loading = false;
if (res.code != 0) {
notification.error({
message: "error",
description: res.msg,
});
} else {
this.data = res.data;
}
});
},
getDeserializerList() {
request({
url: KafkaMessageApi.deserializerList.url,
method: KafkaMessageApi.deserializerList.method,
}).then((res) => {
if (res.code != 0) {
notification.error({
message: "error",
description: res.msg,
});
} else {
this.deserializerList = res.data;
}
});
},
handleCancel() {
this.data = {};
this.$emit("closeDetailDialog", { refresh: false });
},
formatTime(time) {
return moment(time).format("YYYY-MM-DD HH:mm:ss:SSS");
},
keyDeserializerChange() {
this.getMessageDetail();
},
valueDeserializerChange() {
this.getMessageDetail();
},
resend() {
const params = Object.assign({}, this.data);
this.loading = true;
request({
url: KafkaMessageApi.resend.url,
method: KafkaMessageApi.resend.method,
data: params,
}).then((res) => {
this.loading = false;
if (res.code != 0) {
notification.error({
message: "error",
description: res.msg,
});
} else {
this.$message.success(res.msg);
}
});
},
},
};
const columns = [
{
title: "消费组",
dataIndex: "groupId",
key: "groupId",
},
{
title: "消费情况",
dataIndex: "status",
key: "status",
scopedSlots: { customRender: "status" },
},
];
</script>
<style scoped>
.m-info {
/*text-decoration: underline;*/
}
.title {
width: 15%;
display: inline-block;
text-align: right;
margin-right: 2%;
font-weight: bold;
}
.ant-spin-container #message-detail textarea {
max-width: 80% !important;
vertical-align: top !important;
}
</style>


@@ -0,0 +1,95 @@
<template>
<div>
<a-table
:columns="columns"
:data-source="data"
bordered
:row-key="
(record, index) => {
return index;
}
"
>
<div slot="operation" slot-scope="record">
<a-button
size="small"
href="javascript:;"
class="operation-btn"
@click="openDetailDialog(record)"
>Message detail
</a-button>
</div>
</a-table>
<MessageDetail
:visible="showDetailDialog"
:record="record"
@closeDetailDialog="closeDetailDialog"
></MessageDetail>
</div>
</template>
<script>
import moment from "moment";
import MessageDetail from "@/views/message/MessageDetail";
export default {
name: "MessageList",
components: { MessageDetail },
props: {
data: {
type: Array,
},
},
data() {
return {
columns: columns,
showDetailDialog: false,
record: {},
};
},
methods: {
openDetailDialog(record) {
this.record = record;
this.showDetailDialog = true;
},
closeDetailDialog() {
this.showDetailDialog = false;
},
},
};
const columns = [
{
title: "topic",
dataIndex: "topic",
key: "topic",
width: 300,
},
{
title: "分区",
dataIndex: "partition",
key: "partition",
},
{
title: "偏移",
dataIndex: "offset",
key: "offset",
},
{
title: "时间",
dataIndex: "timestamp",
key: "timestamp",
slots: { title: "timestamp" },
scopedSlots: { customRender: "timestamp" },
customRender: (text) => {
return moment(text).format("YYYY-MM-DD HH:mm:ss:SSS");
},
},
{
title: "操作",
key: "operation",
scopedSlots: { customRender: "operation" },
width: 200,
},
];
</script>
<style scoped></style>


@@ -0,0 +1,195 @@
<template>
<div class="tab-content">
<a-spin :spinning="loading">
<div id="search-offset-form-advanced-search">
<a-form
class="ant-advanced-search-form"
:form="form"
@submit="handleSearch"
>
<a-row :gutter="24">
<a-col :span="9">
<a-form-item label="topic">
<a-select
class="topic-select"
@change="handleTopicChange"
show-search
option-filter-prop="children"
v-decorator="[
'topic',
{
rules: [{ required: true, message: 'Please select a topic!' }],
},
]"
placeholder="请选择一个topic"
>
<a-select-option v-for="v in topicList" :key="v" :value="v">
{{ v }}
</a-select-option>
</a-select>
</a-form-item>
</a-col>
<a-col :span="6">
<a-form-item label="分区">
<a-select
class="type-select"
show-search
option-filter-prop="children"
v-model="selectPartition"
placeholder="请选择一个分区"
>
<a-select-option v-for="v in partitions" :key="v" :value="v">
<span v-if="v == -1">全部</span> <span v-else>{{ v }}</span>
</a-select-option>
</a-select>
</a-form-item>
</a-col>
<a-col :span="7">
<a-form-item label="偏移">
<a-input
v-decorator="[
'offset',
{
rules: [{ required: true, message: 'Please enter a message offset!' }],
},
]"
placeholder="消息偏移"
/>
</a-form-item>
</a-col>
<a-col :span="2" :style="{ textAlign: 'right' }">
<a-form-item>
<a-button type="primary" html-type="submit"> 搜索</a-button>
</a-form-item>
</a-col>
</a-row>
</a-form>
</div>
<MessageList :data="data"></MessageList>
</a-spin>
</div>
</template>
<script>
import request from "@/utils/request";
import { KafkaMessageApi, KafkaTopicApi } from "@/utils/api";
import notification from "ant-design-vue/lib/notification";
import MessageList from "@/views/message/MessageList";
export default {
name: "SearchByOffset",
components: { MessageList },
props: {
topicList: {
type: Array,
},
},
data() {
return {
loading: false,
form: this.$form.createForm(this, { name: "message_search_offset" }),
partitions: [],
selectPartition: undefined,
rangeConfig: {
rules: [{ type: "array", required: true, message: "请选择时间!" }],
},
data: defaultData,
};
},
methods: {
handleSearch(e) {
e.preventDefault();
this.form.validateFields((err, values) => {
if (!err) {
const data = Object.assign({}, values, {
partition: this.selectPartition,
});
this.loading = true;
request({
url: KafkaMessageApi.searchByOffset.url,
method: KafkaMessageApi.searchByOffset.method,
data: data,
}).then((res) => {
this.loading = false;
if (res.code == 0) {
this.$message.success(res.msg);
this.data = res.data;
} else {
notification.error({
message: "error",
description: res.msg,
});
}
});
}
});
},
getPartitionInfo(topic) {
this.loading = true;
request({
url: KafkaTopicApi.getPartitionInfo.url + "?topic=" + topic,
method: KafkaTopicApi.getPartitionInfo.method,
}).then((res) => {
this.loading = false;
if (res.code != 0) {
notification.error({
message: "error",
description: res.msg,
});
} else {
this.partitions = res.data.map((v) => v.partition);
this.partitions.splice(0, 0, -1);
}
});
},
handleTopicChange(topic) {
this.selectPartition = -1;
this.getPartitionInfo(topic);
},
},
};
const defaultData = [];
</script>
<style scoped>
.tab-content {
width: 100%;
height: 100%;
}
.ant-advanced-search-form {
padding: 24px;
background: #fbfbfb;
border: 1px solid #d9d9d9;
border-radius: 6px;
}
.ant-advanced-search-form .ant-form-item {
display: flex;
}
.ant-advanced-search-form .ant-form-item-control-wrapper {
flex: 1;
}
#components-form-topic-advanced-search .ant-form {
max-width: none;
margin-bottom: 1%;
}
#search-offset-form-advanced-search .search-result-list {
margin-top: 16px;
border: 1px dashed #e9e9e9;
border-radius: 6px;
background-color: #fafafa;
min-height: 200px;
text-align: center;
padding-top: 80px;
}
.topic-select {
width: 400px !important;
}
.type-select {
width: 200px !important;
}
</style>


@@ -0,0 +1,201 @@
<template>
<div class="tab-content">
<a-spin :spinning="loading">
<div id="search-time-form-advanced-search">
<a-form
class="ant-advanced-search-form"
:form="form"
@submit="handleSearch"
>
<a-row :gutter="24">
<a-col :span="9">
<a-form-item label="topic">
<a-select
class="topic-select"
@change="handleTopicChange"
show-search
option-filter-prop="children"
v-decorator="[
'topic',
{
rules: [{ required: true, message: 'Please select a topic!' }],
},
]"
placeholder="请选择一个topic"
>
<a-select-option v-for="v in topicList" :key="v" :value="v">
{{ v }}
</a-select-option>
</a-select>
</a-form-item>
</a-col>
<a-col :span="5">
<a-form-item label="分区">
<a-select
class="type-select"
show-search
option-filter-prop="children"
v-model="selectPartition"
placeholder="请选择一个分区"
>
<a-select-option v-for="v in partitions" :key="v" :value="v">
<span v-if="v == -1">全部</span> <span v-else>{{ v }}</span>
</a-select-option>
</a-select>
</a-form-item>
</a-col>
<a-col :span="8">
<a-form-item label="时间">
<a-range-picker
v-decorator="['time', rangeConfig]"
show-time
format="YYYY-MM-DD HH:mm:ss"
/>
</a-form-item>
</a-col>
<a-col :span="2" :style="{ textAlign: 'right' }">
<a-form-item>
<a-button type="primary" html-type="submit"> 搜索</a-button>
</a-form-item>
</a-col>
</a-row>
</a-form>
</div>
<p style="margin-top: 1%">
<strong
>Records returned: {{ data.realNum }}; maximum allowed: {{
data.maxNum
}}</strong
>
</p>
<MessageList :data="data.data"></MessageList>
</a-spin>
</div>
</template>
<script>
import request from "@/utils/request";
import { KafkaMessageApi, KafkaTopicApi } from "@/utils/api";
import notification from "ant-design-vue/lib/notification";
import MessageList from "@/views/message/MessageList";
export default {
name: "SearchByTime",
components: { MessageList },
props: {
topicList: {
type: Array,
},
},
data() {
return {
loading: false,
form: this.$form.createForm(this, { name: "message_search_time" }),
partitions: [],
selectPartition: undefined,
rangeConfig: {
rules: [{ type: "array", required: true, message: "请选择时间!" }],
},
data: defaultData,
};
},
methods: {
handleSearch(e) {
e.preventDefault();
this.form.validateFields((err, values) => {
if (!err) {
const data = Object.assign({}, values, {
partition: this.selectPartition,
});
data.startTime = values.time[0].valueOf(); // epoch millis for the backend Date fields
data.endTime = values.time[1].valueOf();
this.loading = true;
request({
url: KafkaMessageApi.searchByTime.url,
method: KafkaMessageApi.searchByTime.method,
data: data,
}).then((res) => {
this.loading = false;
if (res.code == 0) {
this.$message.success(res.msg);
this.data = res.data;
} else {
notification.error({
message: "error",
description: res.msg,
});
}
});
}
});
},
getPartitionInfo(topic) {
this.loading = true;
request({
url: KafkaTopicApi.getPartitionInfo.url + "?topic=" + topic,
method: KafkaTopicApi.getPartitionInfo.method,
}).then((res) => {
this.loading = false;
if (res.code != 0) {
notification.error({
message: "error",
description: res.msg,
});
} else {
this.partitions = res.data.map((v) => v.partition);
this.partitions.splice(0, 0, -1);
}
});
},
handleTopicChange(topic) {
this.selectPartition = -1;
this.getPartitionInfo(topic);
},
},
};
const defaultData = { realNum: 0, maxNum: 0 };
</script>
<style scoped>
.tab-content {
width: 100%;
height: 100%;
}
.ant-advanced-search-form {
padding: 24px;
background: #fbfbfb;
border: 1px solid #d9d9d9;
border-radius: 6px;
}
.ant-advanced-search-form .ant-form-item {
display: flex;
}
.ant-advanced-search-form .ant-form-item-control-wrapper {
flex: 1;
}
#components-form-topic-advanced-search .ant-form {
max-width: none;
margin-bottom: 1%;
}
#search-time-form-advanced-search .search-result-list {
margin-top: 16px;
border: 1px dashed #e9e9e9;
border-radius: 6px;
background-color: #fafafa;
min-height: 200px;
text-align: center;
padding-top: 80px;
}
.topic-select {
width: 400px !important;
}
.type-select {
width: 150px !important;
}
</style>


@@ -0,0 +1,173 @@
<template>
<div class="content">
<a-spin :spinning="loading">
<a-form :form="form" @submit="handleSubmit">
<a-form-item label="Topic">
<a-select
class="topic-select"
@change="handleTopicChange"
show-search
option-filter-prop="children"
v-decorator="[
'topic',
{
rules: [{ required: true, message: 'Please select a topic!' }],
},
]"
placeholder="请选择一个topic"
>
<a-select-option v-for="v in topicList" :key="v" :value="v">
{{ v }}
</a-select-option>
</a-select>
</a-form-item>
<a-form-item label="分区">
<a-select
class="type-select"
show-search
option-filter-prop="children"
v-model="selectPartition"
placeholder="请选择一个分区"
>
<a-select-option v-for="v in partitions" :key="v" :value="v">
<span v-if="v == -1">默认</span> <span v-else>{{ v }}</span>
</a-select-option>
</a-select>
</a-form-item>
<a-form-item label="消息Key">
<a-input v-decorator="['key', { initialValue: 'key' }]" />
</a-form-item>
<a-form-item label="消息体" has-feedback>
<a-textarea
v-decorator="[
'body',
{
rules: [
{
required: true,
message: 'Enter a message body!',
},
],
},
]"
placeholder="输入消息体!"
/>
</a-form-item>
<a-form-item label="发送的消息数">
<a-input-number
v-decorator="[
'num',
{
initialValue: 1,
rules: [
{
required: true,
message: 'Enter a message count!',
},
],
},
]"
:min="1"
:max="32"
/>
</a-form-item>
<a-form-item :wrapper-col="{ span: 12, offset: 5 }">
<a-button type="primary" html-type="submit"> 提交 </a-button>
</a-form-item>
</a-form>
</a-spin>
</div>
</template>
<script>
import request from "@/utils/request";
import { KafkaTopicApi, KafkaMessageApi } from "@/utils/api";
import notification from "ant-design-vue/lib/notification";
export default {
name: "SendMessage",
components: {},
props: {
topicList: {
type: Array,
},
},
data() {
return {
form: this.$form.createForm(this, { name: "message_send" }),
loading: false,
partitions: [],
selectPartition: undefined,
};
},
methods: {
getTopicNameList() {
request({
url: KafkaTopicApi.getTopicNameList.url,
method: KafkaTopicApi.getTopicNameList.method,
}).then((res) => {
if (res.code == 0) {
this.topicList = res.data;
} else {
notification.error({
message: "error",
description: res.msg,
});
}
});
},
getPartitionInfo(topic) {
this.loading = true;
request({
url: KafkaTopicApi.getPartitionInfo.url + "?topic=" + topic,
method: KafkaTopicApi.getPartitionInfo.method,
}).then((res) => {
this.loading = false;
if (res.code != 0) {
notification.error({
message: "error",
description: res.msg,
});
} else {
this.partitions = res.data.map((v) => v.partition);
this.partitions.splice(0, 0, -1);
}
});
},
handleTopicChange(topic) {
this.selectPartition = -1;
this.getPartitionInfo(topic);
},
handleSubmit(e) {
e.preventDefault();
this.form.validateFields((err, values) => {
if (!err) {
const param = Object.assign({}, values, {
partition: this.selectPartition,
});
this.loading = true;
request({
url: KafkaMessageApi.send.url,
method: KafkaMessageApi.send.method,
data: param,
}).then((res) => {
this.loading = false;
if (res.code == 0) {
this.$message.success(res.msg);
} else {
notification.error({
message: "error",
description: res.msg,
});
}
});
}
});
},
},
created() {
this.getTopicNameList();
},
};
</script>
<style scoped></style>


@@ -49,7 +49,15 @@
</a-button>
</a-popconfirm>
</div>
<p slot="expandedRowRender" slot-scope="record" style="margin: 0">
Valid message time range: <span class="red-font">{{
formatTime(record.beginTime)
}}</span>
~
<span class="red-font">{{ formatTime(record.endTime) }}</span>
</p>
</a-table>
<p>Tip: click the + to expand a row and view that partition's valid message time range</p>
</a-spin>
</div>
</a-modal>
@@ -59,6 +67,7 @@
import request from "@/utils/request";
import { KafkaOpApi, KafkaTopicApi } from "@/utils/api";
import notification from "ant-design-vue/es/notification";
import moment from "moment";
export default {
name: "PartitionInfo",
props: {
@@ -128,6 +137,11 @@ export default {
}
});
},
formatTime(timestamp) {
return timestamp != -1
? moment(timestamp).format("YYYY-MM-DD HH:mm:ss:SSS")
: timestamp;
},
},
};
@@ -169,6 +183,26 @@ const columns = [
dataIndex: "diff",
key: "diff",
},
// {
// title: "有效消息起始时间",
// dataIndex: "beginTime",
// key: "beginTime",
// slots: { title: "beginTime" },
// scopedSlots: { customRender: "internal" },
// customRender: (text) => {
// return text != -1 ? moment(text).format("YYYY-MM-DD HH:mm:ss:SSS") : text;
// },
// },
// {
// title: "有效消息结束时间",
// dataIndex: "endTime",
// key: "endTime",
// slots: { title: "endTime" },
// scopedSlots: { customRender: "internal" },
// customRender: (text) => {
// return text != -1 ? moment(text).format("YYYY-MM-DD HH:mm:ss:SSS") : text;
// },
// },
{
title: "操作",
key: "operation",
@@ -177,4 +211,11 @@ const columns = [
];
</script>
-<style scoped></style>
+<style scoped>
.red-font {
color: red;
}
.green-font {
color: green;
}
</style>