diff --git a/README.md b/README.md index e1f18c6..af03827 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # kafka可视化管理平台 一款轻量级的kafka可视化管理平台,安装配置快捷、简单易用。 -为了开发的省事,没有多语言支持,只支持中文展示。 +为了开发的省事,没有国际化支持,只支持中文展示。 用过rocketmq-console吧,对,前端展示风格跟那个有点类似。 ## 安装包下载 以下两种方式2选一,直接下载安装包或下载源码,手动打包 diff --git a/document/功能特性.png b/document/功能特性.png index ff12bec..ba5a806 100644 Binary files a/document/功能特性.png and b/document/功能特性.png differ diff --git a/pom.xml b/pom.xml index cb57ca4..d0c13a8 100644 --- a/pom.xml +++ b/pom.xml @@ -10,7 +10,7 @@ com.xuxd kafka-console-ui - 1.0.1 + 1.0.2 kafka-console-ui Kafka console manage ui diff --git a/src/main/java/com/xuxd/kafka/console/beans/vo/TopicPartitionVO.java b/src/main/java/com/xuxd/kafka/console/beans/vo/TopicPartitionVO.java index c6e8300..2dfa548 100644 --- a/src/main/java/com/xuxd/kafka/console/beans/vo/TopicPartitionVO.java +++ b/src/main/java/com/xuxd/kafka/console/beans/vo/TopicPartitionVO.java @@ -29,6 +29,10 @@ public class TopicPartitionVO { private long diff; + private long beginTime; + + private long endTime; + public static TopicPartitionVO from(TopicPartitionInfo partitionInfo) { TopicPartitionVO partitionVO = new TopicPartitionVO(); partitionVO.setPartition(partitionInfo.partition()); diff --git a/src/main/java/com/xuxd/kafka/console/service/impl/TopicServiceImpl.java b/src/main/java/com/xuxd/kafka/console/service/impl/TopicServiceImpl.java index f74b97d..ccf5e4c 100644 --- a/src/main/java/com/xuxd/kafka/console/service/impl/TopicServiceImpl.java +++ b/src/main/java/com/xuxd/kafka/console/service/impl/TopicServiceImpl.java @@ -19,12 +19,14 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; +import kafka.console.MessageConsole; import kafka.console.TopicConsole; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.kafka.clients.admin.NewPartitions; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.admin.TopicDescription; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartitionInfo; import org.springframework.beans.factory.annotation.Autowired; @@ -44,6 +46,9 @@ public class TopicServiceImpl implements TopicService { @Autowired private TopicConsole topicConsole; + @Autowired + private MessageConsole messageConsole; + private Gson gson = GsonUtil.INSTANCE.get(); @Override public ResponseData getTopicNameList(boolean internal) { @@ -106,6 +111,10 @@ public class TopicServiceImpl implements TopicService { mapTuple2._2().forEach((k, v) -> { endTable.put(k.partition(), (Long) v); }); + // computer the valid time range. + Map beginOffsetTable = new HashMap<>(); + Map endOffsetTable = new HashMap<>(); + Map partitionCache = new HashMap<>(); for (TopicPartitionVO partitionVO : voList) { long begin = beginTable.get(partitionVO.getPartition()); @@ -113,7 +122,29 @@ public class TopicServiceImpl implements TopicService { partitionVO.setBeginOffset(begin); partitionVO.setEndOffset(end); partitionVO.setDiff(end - begin); + + if (begin != end) { + TopicPartition partition = new TopicPartition(topic, partitionVO.getPartition()); + partitionCache.put(partitionVO.getPartition(), partition); + beginOffsetTable.put(partition, begin); + endOffsetTable.put(partition, end - 1); // end must < endOff + } else { + partitionVO.setBeginTime(-1L); + partitionVO.setEndTime(-1L); + } } + + Map> beginRecordMap = messageConsole.searchBy(beginOffsetTable); + Map> endRecordMap = messageConsole.searchBy(endOffsetTable); + + for (TopicPartitionVO partitionVO : voList) { + if (partitionVO.getBeginTime() != -1L) { + TopicPartition partition = partitionCache.get(partitionVO.getPartition()); + partitionVO.setBeginTime(beginRecordMap.containsKey(partition) ? beginRecordMap.get(partition).timestamp() : -1L); + partitionVO.setEndTime(endRecordMap.containsKey(partition) ? endRecordMap.get(partition).timestamp() : -1L); + } + } + return ResponseData.create().data(voList).success(); } diff --git a/ui/src/views/topic/PartitionInfo.vue b/ui/src/views/topic/PartitionInfo.vue index 3028c1f..bc28d86 100644 --- a/ui/src/views/topic/PartitionInfo.vue +++ b/ui/src/views/topic/PartitionInfo.vue @@ -49,7 +49,15 @@ +

+ 有效消息的时间范围:{{ + formatTime(record.beginTime) + }} + ~ + {{ formatTime(record.endTime) }} +

+

友情提示:点击+号展开,可以查看当前分区的有效消息的时间范围

@@ -59,6 +67,7 @@ import request from "@/utils/request"; import { KafkaOpApi, KafkaTopicApi } from "@/utils/api"; import notification from "ant-design-vue/es/notification"; +import moment from "moment"; export default { name: "PartitionInfo", props: { @@ -128,6 +137,11 @@ export default { } }); }, + formatTime(timestamp) { + return timestamp != -1 + ? moment(timestamp).format("YYYY-MM-DD HH:mm:ss:SSS") + : timestamp; + }, }, }; @@ -169,6 +183,26 @@ const columns = [ dataIndex: "diff", key: "diff", }, + // { + // title: "有效消息起始时间", + // dataIndex: "beginTime", + // key: "beginTime", + // slots: { title: "beginTime" }, + // scopedSlots: { customRender: "internal" }, + // customRender: (text) => { + // return text != -1 ? moment(text).format("YYYY-MM-DD HH:mm:ss:SSS") : text; + // }, + // }, + // { + // title: "有效消息结束时间", + // dataIndex: "endTime", + // key: "endTime", + // slots: { title: "endTime" }, + // scopedSlots: { customRender: "internal" }, + // customRender: (text) => { + // return text != -1 ? moment(text).format("YYYY-MM-DD HH:mm:ss:SSS") : text; + // }, + // }, { title: "操作", key: "operation", @@ -177,4 +211,8 @@ const columns = [ ]; - +