分区信息里展示当前分区的有效时间范围

This commit is contained in:
许晓东
2021-12-20 20:29:34 +08:00
parent 0ec3bac6c2
commit 98f33bb2cc
6 changed files with 76 additions and 3 deletions

View File

@@ -29,6 +29,10 @@ public class TopicPartitionVO {
private long diff;
private long beginTime;
private long endTime;
public static TopicPartitionVO from(TopicPartitionInfo partitionInfo) {
TopicPartitionVO partitionVO = new TopicPartitionVO();
partitionVO.setPartition(partitionInfo.partition());

View File

@@ -19,12 +19,14 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import kafka.console.MessageConsole;
import kafka.console.TopicConsole;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.springframework.beans.factory.annotation.Autowired;
@@ -44,6 +46,9 @@ public class TopicServiceImpl implements TopicService {
@Autowired
private TopicConsole topicConsole;
@Autowired
private MessageConsole messageConsole;
private Gson gson = GsonUtil.INSTANCE.get();
@Override public ResponseData getTopicNameList(boolean internal) {
@@ -106,6 +111,10 @@ public class TopicServiceImpl implements TopicService {
mapTuple2._2().forEach((k, v) -> {
endTable.put(k.partition(), (Long) v);
});
// computer the valid time range.
Map<TopicPartition, Object> beginOffsetTable = new HashMap<>();
Map<TopicPartition, Object> endOffsetTable = new HashMap<>();
Map<Integer, TopicPartition> partitionCache = new HashMap<>();
for (TopicPartitionVO partitionVO : voList) {
long begin = beginTable.get(partitionVO.getPartition());
@@ -113,7 +122,29 @@ public class TopicServiceImpl implements TopicService {
partitionVO.setBeginOffset(begin);
partitionVO.setEndOffset(end);
partitionVO.setDiff(end - begin);
if (begin != end) {
TopicPartition partition = new TopicPartition(topic, partitionVO.getPartition());
partitionCache.put(partitionVO.getPartition(), partition);
beginOffsetTable.put(partition, begin);
endOffsetTable.put(partition, end - 1); // end must < endOff
} else {
partitionVO.setBeginTime(-1L);
partitionVO.setEndTime(-1L);
}
}
Map<TopicPartition, ConsumerRecord<byte[], byte[]>> beginRecordMap = messageConsole.searchBy(beginOffsetTable);
Map<TopicPartition, ConsumerRecord<byte[], byte[]>> endRecordMap = messageConsole.searchBy(endOffsetTable);
for (TopicPartitionVO partitionVO : voList) {
if (partitionVO.getBeginTime() != -1L) {
TopicPartition partition = partitionCache.get(partitionVO.getPartition());
partitionVO.setBeginTime(beginRecordMap.containsKey(partition) ? beginRecordMap.get(partition).timestamp() : -1L);
partitionVO.setEndTime(endRecordMap.containsKey(partition) ? endRecordMap.get(partition).timestamp() : -1L);
}
}
return ResponseData.create().data(voList).success();
}