查看topic分区消息偏移

This commit is contained in:
许晓东
2021-09-24 16:43:30 +08:00
parent e65eba9237
commit fd955d215c
5 changed files with 91 additions and 3 deletions

View File

@@ -23,6 +23,12 @@ public class TopicPartitionVO {
private List<String> isr;
private long beginOffset;
private long endOffset;
private long diff;
public static TopicPartitionVO from(TopicPartitionInfo partitionInfo) {
TopicPartitionVO partitionVO = new TopicPartitionVO();
partitionVO.setPartition(partitionInfo.partition());

View File

@@ -7,13 +7,18 @@ import com.xuxd.kafka.console.beans.vo.TopicPartitionVO;
import com.xuxd.kafka.console.service.TopicService;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import kafka.console.TopicConsole;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import scala.Tuple2;
@@ -78,7 +83,27 @@ public class TopicServiceImpl implements TopicService {
return ResponseData.create().success();
}
TopicDescription topicDescription = list.get(0);
List<TopicPartitionInfo> topicPartitionInfos = topicDescription.partitions();
List<TopicPartitionVO> voList = topicPartitionInfos.stream().map(TopicPartitionVO::from).collect(Collectors.toList());
List<TopicPartition> partitions = topicPartitionInfos.stream().map(p -> new TopicPartition(topic, p.partition())).collect(Collectors.toList());
return ResponseData.create().data(topicDescription.partitions().stream().map(p -> TopicPartitionVO.from(p))).success();
Tuple2<Map<TopicPartition, Object>, Map<TopicPartition, Object>> mapTuple2 = topicConsole.getTopicOffset(topic, partitions);
Map<Integer, Long> beginTable = new HashMap<>(), endTable = new HashMap<>();
mapTuple2._1().forEach((k, v) -> {
beginTable.put(k.partition(), (Long) v);
});
mapTuple2._2().forEach((k, v) -> {
endTable.put(k.partition(), (Long) v);
});
for (TopicPartitionVO partitionVO : voList) {
long begin = beginTable.get(partitionVO.getPartition());
long end = endTable.get(partitionVO.getPartition());
partitionVO.setBeginOffset(begin);
partitionVO.setEndOffset(end);
partitionVO.setDiff(end - begin);
}
return ResponseData.create().data(voList).success();
}
}

View File

@@ -6,7 +6,9 @@ import com.xuxd.kafka.console.config.KafkaConfig
import kafka.zk.{AdminZkClient, KafkaZkClient}
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig}
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.config.SaslConfigs
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import org.apache.kafka.common.utils.Time
/**
@@ -37,6 +39,22 @@ class KafkaConsole(config: KafkaConfig) {
}
}
protected def withConsumerAndCatchError(f: KafkaConsumer[Array[Byte], Array[Byte]] => Any, eh: Exception => Any,
    extra: Properties = new Properties()): Any = {
    // Assemble consumer config: base connection props first, then caller-supplied
    // overrides, then a timestamp-derived client id for this one-shot operation.
    val properties = getProps()
    properties.putAll(extra)
    properties.put(ConsumerConfig.CLIENT_ID_CONFIG, String.valueOf(System.currentTimeMillis()))
    val byteConsumer = new KafkaConsumer(properties, new ByteArrayDeserializer, new ByteArrayDeserializer)
    // Run the callback; on failure hand the exception to the caller's handler.
    // The consumer is closed in all cases so no network resources leak.
    try f(byteConsumer)
    catch {
      case e: Exception => eh(e)
    } finally {
      byteConsumer.close()
    }
  }
protected def withZKClient(f: AdminZkClient => Any): Any = {
val zkClient = KafkaZkClient(config.getZookeeperAddr, false, 30000, 30000, Int.MaxValue, Time.SYSTEM)
val adminZkClient = new AdminZkClient(zkClient)
@@ -48,6 +66,10 @@ class KafkaConsole(config: KafkaConfig) {
}
private def createAdminClient(): Admin = {
Admin.create(getProps())
}
private def getProps(): Properties = {
val props: Properties = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServer)
props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, config.getRequestTimeoutMs())
@@ -56,7 +78,6 @@ class KafkaConsole(config: KafkaConfig) {
props.put(SaslConfigs.SASL_MECHANISM, config.getSaslMechanism())
props.put(SaslConfigs.SASL_JAAS_CONFIG, config.getSaslJaasConfig())
}
Admin.create(props)
props
}
}

View File

@@ -6,6 +6,7 @@ import java.util.{Collections, List, Set}
import com.xuxd.kafka.console.config.KafkaConfig
import org.apache.kafka.clients.admin.{DeleteTopicsOptions, ListTopicsOptions, TopicDescription}
import org.apache.kafka.common.TopicPartition
import scala.jdk.CollectionConverters.{CollectionHasAsScala, SetHasAsJava}
@@ -72,4 +73,24 @@ class TopicConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConfig
(false, e.getMessage)
}).asInstanceOf[(Boolean, String)]
}
/**
 * Get the begin (earliest) and end (latest) offset for every given partition of a topic.
 *
 * @param topic topic name; used only for error logging here.
 * @param partitions topic partition list to query.
 * @return tuple of (partition -> begin offset, partition -> end offset). On any lookup
 *         failure BOTH maps are empty — NOTE(review): callers must tolerate missing
 *         keys rather than assume every requested partition is present.
 */
def getTopicOffset(topic: String,
partitions: List[TopicPartition]): (util.Map[TopicPartition, Long], util.Map[TopicPartition, Long]) = {
withConsumerAndCatchError(consumer => {
// beginningOffsets/endOffsets each query the brokers through the temporary consumer
val beginOffsets = consumer.beginningOffsets(partitions)
val endOffsets = consumer.endOffsets(partitions)
(beginOffsets, endOffsets)
}, e => {
// best-effort fallback: log and return empty maps so the console degrades gracefully
log.error("getTopicOffset error, topic: " + topic, e)
(Collections.emptyMap(), Collections.emptyMap())
// cast is safe by construction: both branches above produce a pair of maps,
// and the element types are erased at runtime anyway
}).asInstanceOf[(util.Map[TopicPartition, Long], util.Map[TopicPartition, Long])]
}
}

View File

@@ -114,6 +114,21 @@ const columns = [
key: "isr",
scopedSlots: { customRender: "isr" },
},
{
title: "最小位点",
dataIndex: "beginOffset",
key: "beginOffset",
},
{
title: "最大位点",
dataIndex: "endOffset",
key: "endOffset",
},
{
title: "消息总数",
dataIndex: "diff",
key: "diff",
},
];
</script>