From 54cd40281029485ea0cd2780709451eedf7d2ac4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=AE=B8=E6=99=93=E4=B8=9C?= <763795151@qq.com> Date: Sun, 12 Dec 2021 18:53:29 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9F=A5=E8=AF=A2=E6=B6=88=E6=81=AF=E8=AF=A6?= =?UTF-8?q?=E6=83=85=E4=BF=A1=E6=81=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../kafka/console/beans/QueryMessage.java | 4 + .../console/beans/dto/QueryMessageDTO.java | 7 + .../console/beans/vo/MessageDetailVO.java | 38 ++++ .../console/controller/MessageController.java | 11 + .../kafka/console/service/MessageService.java | 4 + .../service/impl/MessageServiceImpl.java | 76 ++++++- ui/src/utils/api.js | 8 + ui/src/views/message/MessageDetail.vue | 204 ++++++++++++++++++ ui/src/views/message/MessageList.vue | 33 ++- ui/src/views/message/SearchByOffset.vue | 2 +- 10 files changed, 380 insertions(+), 7 deletions(-) create mode 100644 src/main/java/com/xuxd/kafka/console/beans/vo/MessageDetailVO.java create mode 100644 ui/src/views/message/MessageDetail.vue diff --git a/src/main/java/com/xuxd/kafka/console/beans/QueryMessage.java b/src/main/java/com/xuxd/kafka/console/beans/QueryMessage.java index 49aff5b..f620e82 100644 --- a/src/main/java/com/xuxd/kafka/console/beans/QueryMessage.java +++ b/src/main/java/com/xuxd/kafka/console/beans/QueryMessage.java @@ -20,4 +20,8 @@ public class QueryMessage { private long endTime; private long offset; + + private String keyDeserializer; + + private String valueDeserializer; } diff --git a/src/main/java/com/xuxd/kafka/console/beans/dto/QueryMessageDTO.java b/src/main/java/com/xuxd/kafka/console/beans/dto/QueryMessageDTO.java index 3c83df4..965f8a5 100644 --- a/src/main/java/com/xuxd/kafka/console/beans/dto/QueryMessageDTO.java +++ b/src/main/java/com/xuxd/kafka/console/beans/dto/QueryMessageDTO.java @@ -23,6 +23,10 @@ public class QueryMessageDTO { private Long offset; + private String keyDeserializer; + + private String valueDeserializer; + public QueryMessage toQueryMessage() { QueryMessage queryMessage = new QueryMessage(); queryMessage.setTopic(topic); @@ -38,6 +42,9 @@ public class QueryMessageDTO { queryMessage.setOffset(offset); } + queryMessage.setKeyDeserializer(keyDeserializer); + queryMessage.setValueDeserializer(valueDeserializer); + return queryMessage; } } diff --git a/src/main/java/com/xuxd/kafka/console/beans/vo/MessageDetailVO.java b/src/main/java/com/xuxd/kafka/console/beans/vo/MessageDetailVO.java new file mode 100644 index 0000000..f97fb67 --- /dev/null +++ b/src/main/java/com/xuxd/kafka/console/beans/vo/MessageDetailVO.java @@ -0,0 +1,38 @@ +package com.xuxd.kafka.console.beans.vo; + +import java.util.ArrayList; +import java.util.List; +import lombok.Data; + +/** + * kafka-console-ui. + * + * @author xuxd + * @date 2021-12-12 12:45:23 + **/ +@Data +public class MessageDetailVO { + + private String topic; + + private int partition; + + private long offset; + + private long timestamp; + + private String timestampType; + + private List headers = new ArrayList<>(); + + private Object key; + + private Object value; + + @Data + public static class HeaderVO { + String key; + + String value; + } +} diff --git a/src/main/java/com/xuxd/kafka/console/controller/MessageController.java b/src/main/java/com/xuxd/kafka/console/controller/MessageController.java index 68ad12f..9fae9c6 100644 --- a/src/main/java/com/xuxd/kafka/console/controller/MessageController.java +++ b/src/main/java/com/xuxd/kafka/console/controller/MessageController.java @@ -3,6 +3,7 @@ package com.xuxd.kafka.console.controller; import com.xuxd.kafka.console.beans.dto.QueryMessageDTO; import com.xuxd.kafka.console.service.MessageService; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; @@ -30,4 +31,14 @@ public class MessageController { public Object searchByOffset(@RequestBody QueryMessageDTO dto) { return messageService.searchByOffset(dto.toQueryMessage()); } + + @PostMapping("/search/detail") + public Object searchDetail(@RequestBody QueryMessageDTO dto) { + return messageService.searchDetail(dto.toQueryMessage()); + } + + @GetMapping("/deserializer/list") + public Object deserializerList() { + return messageService.deserializerList(); + } } diff --git a/src/main/java/com/xuxd/kafka/console/service/MessageService.java b/src/main/java/com/xuxd/kafka/console/service/MessageService.java index 4c54de2..e5eebfc 100644 --- a/src/main/java/com/xuxd/kafka/console/service/MessageService.java +++ b/src/main/java/com/xuxd/kafka/console/service/MessageService.java @@ -14,4 +14,8 @@ public interface MessageService { ResponseData searchByTime(QueryMessage queryMessage); ResponseData searchByOffset(QueryMessage queryMessage); + + ResponseData searchDetail(QueryMessage queryMessage); + + ResponseData deserializerList(); } diff --git a/src/main/java/com/xuxd/kafka/console/service/impl/MessageServiceImpl.java b/src/main/java/com/xuxd/kafka/console/service/impl/MessageServiceImpl.java index 830fe6d..c229c2f 100644 --- a/src/main/java/com/xuxd/kafka/console/service/impl/MessageServiceImpl.java +++ b/src/main/java/com/xuxd/kafka/console/service/impl/MessageServiceImpl.java @@ -3,6 +3,7 @@ package com.xuxd.kafka.console.service.impl; import com.xuxd.kafka.console.beans.QueryMessage; import com.xuxd.kafka.console.beans.ResponseData; import com.xuxd.kafka.console.beans.vo.ConsumerRecordVO; +import com.xuxd.kafka.console.beans.vo.MessageDetailVO; import com.xuxd.kafka.console.service.MessageService; import java.util.Collections; import java.util.HashMap; @@ -14,9 +15,15 @@ import java.util.stream.Collectors; import kafka.console.MessageConsole; import kafka.console.TopicConsole; import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.kafka.clients.admin.TopicDescription; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.DoubleDeserializer; +import org.apache.kafka.common.serialization.FloatDeserializer; +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @@ -35,6 +42,17 @@ public class MessageServiceImpl implements MessageService { @Autowired private TopicConsole topicConsole; + private Map deserializerDict = new HashMap<>(); + + { + deserializerDict.put("Integer", new IntegerDeserializer()); + deserializerDict.put("String", new StringDeserializer()); + deserializerDict.put("Float", new FloatDeserializer()); + deserializerDict.put("Double", new DoubleDeserializer()); + } + + public static String defaultDeserializer = "String"; + @Override public ResponseData searchByTime(QueryMessage queryMessage) { int maxNums = 10000; @@ -50,14 +68,68 @@ public class MessageServiceImpl implements MessageService { } @Override public ResponseData searchByOffset(QueryMessage queryMessage) { + Map> recordMap = searchRecordByOffset(queryMessage); + + return ResponseData.create().data(recordMap.values().stream().map(ConsumerRecordVO::fromConsumerRecord).collect(Collectors.toList())).success(); + } + + @Override public ResponseData searchDetail(QueryMessage queryMessage) { + if (queryMessage.getPartition() == -1) { + throw new IllegalArgumentException(); + } + if (StringUtils.isBlank(queryMessage.getKeyDeserializer())) { + queryMessage.setKeyDeserializer(defaultDeserializer); + } + + if (StringUtils.isBlank(queryMessage.getValueDeserializer())) { + queryMessage.setValueDeserializer(defaultDeserializer); + } + + Map> recordMap = searchRecordByOffset(queryMessage); + ConsumerRecord record = recordMap.get(new TopicPartition(queryMessage.getTopic(), queryMessage.getPartition())); + if (record != null) { + MessageDetailVO vo = new MessageDetailVO(); + vo.setTopic(record.topic()); + vo.setPartition(record.partition()); + vo.setOffset(record.offset()); + vo.setTimestamp(record.timestamp()); + vo.setTimestampType(record.timestampType().name()); + try { + vo.setKey(deserializerDict.get(queryMessage.getKeyDeserializer()).deserialize(queryMessage.getTopic(), record.key())); + } catch (Exception e) { + vo.setKey("KeyDeserializer Error: " + e.getMessage()); + } + try { + vo.setValue(deserializerDict.get(queryMessage.getValueDeserializer()).deserialize(queryMessage.getTopic(), record.value())); + } catch (Exception e) { + vo.setValue("ValueDeserializer Error: " + e.getMessage()); + } + + record.headers().forEach(header -> { + MessageDetailVO.HeaderVO headerVO = new MessageDetailVO.HeaderVO(); + headerVO.setKey(header.key()); + headerVO.setValue(new String(header.value())); + vo.getHeaders().add(headerVO); + }); + + return ResponseData.create().data(vo).success(); + } + return ResponseData.create().failed("Not found message detail."); + } + + @Override public ResponseData deserializerList() { + return ResponseData.create().data(deserializerDict.keySet()).success(); + } + + private Map> searchRecordByOffset(QueryMessage queryMessage) { Set partitions = getPartitions(queryMessage); + Map offsetTable = new HashMap<>(); partitions.forEach(tp -> { offsetTable.put(tp, queryMessage.getOffset()); }); Map> recordMap = messageConsole.searchBy(offsetTable); - - return ResponseData.create().data(recordMap.values().stream().map(ConsumerRecordVO::fromConsumerRecord).collect(Collectors.toList())).success(); + return recordMap; } private Set getPartitions(QueryMessage queryMessage) { diff --git a/ui/src/utils/api.js b/ui/src/utils/api.js index 4d5d586..c9fae3e 100644 --- a/ui/src/utils/api.js +++ b/ui/src/utils/api.js @@ -232,4 +232,12 @@ export const KafkaMessageApi = { url: "/message/search/offset", method: "post", }, + searchDetail: { + url: "/message/search/detail", + method: "post", + }, + deserializerList: { + url: "/message/deserializer/list", + method: "get", + }, }; diff --git a/ui/src/views/message/MessageDetail.vue b/ui/src/views/message/MessageDetail.vue new file mode 100644 index 0000000..cbf82a8 --- /dev/null +++ b/ui/src/views/message/MessageDetail.vue @@ -0,0 +1,204 @@ + + + + + diff --git a/ui/src/views/message/MessageList.vue b/ui/src/views/message/MessageList.vue index d6b5835..36f5080 100644 --- a/ui/src/views/message/MessageList.vue +++ b/ui/src/views/message/MessageList.vue @@ -4,22 +4,36 @@ :columns="columns" :data-source="data" bordered - row-key="(record,index)=>{return index}" + :row-key=" + (record, index) => { + return index; + } + " > -
- + 消息详情
+