diff --git a/src/main/java/com/xuxd/kafka/console/beans/vo/TopicPartitionVO.java b/src/main/java/com/xuxd/kafka/console/beans/vo/TopicPartitionVO.java index 9dcec1c..a8a3f91 100644 --- a/src/main/java/com/xuxd/kafka/console/beans/vo/TopicPartitionVO.java +++ b/src/main/java/com/xuxd/kafka/console/beans/vo/TopicPartitionVO.java @@ -23,6 +23,12 @@ public class TopicPartitionVO { private List<String> isr; + private long beginOffset; + + private long endOffset; + + private long diff; + public static TopicPartitionVO from(TopicPartitionInfo partitionInfo) { TopicPartitionVO partitionVO = new TopicPartitionVO(); partitionVO.setPartition(partitionInfo.partition()); diff --git a/src/main/java/com/xuxd/kafka/console/service/impl/TopicServiceImpl.java b/src/main/java/com/xuxd/kafka/console/service/impl/TopicServiceImpl.java index b9037fa..3e32759 100644 --- a/src/main/java/com/xuxd/kafka/console/service/impl/TopicServiceImpl.java +++ b/src/main/java/com/xuxd/kafka/console/service/impl/TopicServiceImpl.java @@ -7,13 +7,18 @@ import com.xuxd.kafka.console.beans.vo.TopicPartitionVO; import com.xuxd.kafka.console.service.TopicService; import java.util.Collections; import java.util.Comparator; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; import kafka.console.TopicConsole; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.kafka.clients.admin.TopicDescription; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.TopicPartitionInfo; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import scala.Tuple2; @@ -78,7 +83,27 @@ public class TopicServiceImpl implements TopicService { return ResponseData.create().success(); } TopicDescription topicDescription = list.get(0); + List<TopicPartitionInfo> topicPartitionInfos = topicDescription.partitions(); + List<TopicPartitionVO> voList = 
topicPartitionInfos.stream().map(TopicPartitionVO::from).collect(Collectors.toList()); + List<TopicPartition> partitions = topicPartitionInfos.stream().map(p -> new TopicPartition(topic, p.partition())).collect(Collectors.toList()); - return ResponseData.create().data(topicDescription.partitions().stream().map(p -> TopicPartitionVO.from(p))).success(); + Tuple2<Map<TopicPartition, Object>, Map<TopicPartition, Object>> mapTuple2 = topicConsole.getTopicOffset(topic, partitions); + Map<Integer, Long> beginTable = new HashMap<>(), endTable = new HashMap<>(); + + mapTuple2._1().forEach((k, v) -> { + beginTable.put(k.partition(), (Long) v); + }); + mapTuple2._2().forEach((k, v) -> { + endTable.put(k.partition(), (Long) v); + }); + + for (TopicPartitionVO partitionVO : voList) { + long begin = beginTable.get(partitionVO.getPartition()); + long end = endTable.get(partitionVO.getPartition()); + partitionVO.setBeginOffset(begin); + partitionVO.setEndOffset(end); + partitionVO.setDiff(end - begin); + } + return ResponseData.create().data(voList).success(); } } diff --git a/src/main/scala/kafka/console/KafkaConsole.scala b/src/main/scala/kafka/console/KafkaConsole.scala index 5ffea38..cb4e9b2 100644 --- a/src/main/scala/kafka/console/KafkaConsole.scala +++ b/src/main/scala/kafka/console/KafkaConsole.scala @@ -6,7 +6,9 @@ import com.xuxd.kafka.console.config.KafkaConfig import kafka.zk.{AdminZkClient, KafkaZkClient} import org.apache.kafka.clients.CommonClientConfigs import org.apache.kafka.clients.admin.{Admin, AdminClientConfig} +import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer} import org.apache.kafka.common.config.SaslConfigs +import org.apache.kafka.common.serialization.ByteArrayDeserializer import org.apache.kafka.common.utils.Time /** @@ -37,6 +39,22 @@ class KafkaConsole(config: KafkaConfig) { } } + protected def withConsumerAndCatchError(f: KafkaConsumer[Array[Byte], Array[Byte]] => Any, eh: Exception => Any, + extra: Properties = new Properties()): Any = { + val props = getProps() + props.putAll(extra) + 
props.put(ConsumerConfig.CLIENT_ID_CONFIG, String.valueOf(System.currentTimeMillis())) + val consumer = new KafkaConsumer(props, new ByteArrayDeserializer, new ByteArrayDeserializer) + try { + f(consumer) + } catch { + case er: Exception => eh(er) + } + finally { + consumer.close() + } + } + protected def withZKClient(f: AdminZkClient => Any): Any = { val zkClient = KafkaZkClient(config.getZookeeperAddr, false, 30000, 30000, Int.MaxValue, Time.SYSTEM) val adminZkClient = new AdminZkClient(zkClient) @@ -48,6 +66,10 @@ class KafkaConsole(config: KafkaConfig) { } private def createAdminClient(): Admin = { + Admin.create(getProps()) + } + + private def getProps(): Properties = { val props: Properties = new Properties(); props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServer) props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, config.getRequestTimeoutMs()) @@ -56,7 +78,6 @@ class KafkaConsole(config: KafkaConfig) { props.put(SaslConfigs.SASL_MECHANISM, config.getSaslMechanism()) props.put(SaslConfigs.SASL_JAAS_CONFIG, config.getSaslJaasConfig()) } - - Admin.create(props) + props } } diff --git a/src/main/scala/kafka/console/TopicConsole.scala b/src/main/scala/kafka/console/TopicConsole.scala index 19f433b..b731db6 100644 --- a/src/main/scala/kafka/console/TopicConsole.scala +++ b/src/main/scala/kafka/console/TopicConsole.scala @@ -6,6 +6,7 @@ import java.util.{Collections, List, Set} import com.xuxd.kafka.console.config.KafkaConfig import org.apache.kafka.clients.admin.{DeleteTopicsOptions, ListTopicsOptions, TopicDescription} +import org.apache.kafka.common.TopicPartition import scala.jdk.CollectionConverters.{CollectionHasAsScala, SetHasAsJava} @@ -72,4 +73,24 @@ class TopicConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConfig (false, e.getMessage) }).asInstanceOf[(Boolean, String)] } + + /** + * get topic begin offset and end offset. + * + * @param topic topic name. + * @param partitions topic partition info list. 
+ * @return partition -> begin offset and end offset. + */ + def getTopicOffset(topic: String, + partitions: List[TopicPartition]): (util.Map[TopicPartition, Long], util.Map[TopicPartition, Long]) = { + + withConsumerAndCatchError(consumer => { + val beginOffsets = consumer.beginningOffsets(partitions) + val endOffsets = consumer.endOffsets(partitions) + (beginOffsets, endOffsets) + }, e => { + log.error("getTopicOffset error, topic: " + topic, e) + (Collections.emptyMap(), Collections.emptyMap()) + }).asInstanceOf[(util.Map[TopicPartition, Long], util.Map[TopicPartition, Long])] + } } diff --git a/ui/src/views/topic/PartitionInfo.vue b/ui/src/views/topic/PartitionInfo.vue index 2c0e31c..7c3c0fa 100644 --- a/ui/src/views/topic/PartitionInfo.vue +++ b/ui/src/views/topic/PartitionInfo.vue @@ -114,6 +114,21 @@ const columns = [ key: "isr", scopedSlots: { customRender: "isr" }, }, + { + title: "最小位点", + dataIndex: "beginOffset", + key: "beginOffset", + }, + { + title: "最大位点", + dataIndex: "endOffset", + key: "endOffset", + }, + { + title: "消息总数", + dataIndex: "diff", + key: "diff", + }, ];