集群同步-》最小位移对齐
This commit is contained in:
@@ -92,4 +92,38 @@ class OperationConsole(config: KafkaConfig, topicConsole: TopicConsole,
|
||||
thatAdmin.close()
|
||||
}
|
||||
}
|
||||
|
||||
import java.util
|
||||
|
||||
/**
|
||||
* check partition consistency and fetch the min offset for the topic.
|
||||
*
|
||||
* @param groupId group id.
|
||||
* @param topic topic.
|
||||
* @param props other kafka cluster config.
|
||||
* @return _1: this min offset, _2: that min offset.
|
||||
*/
|
||||
def checkAndFetchMinOffset(groupId: String, topic: String,
|
||||
props: Properties): (util.Map[TopicPartition, Long], util.Map[TopicPartition, Long]) = {
|
||||
val thatAdmin = createAdminClient(props)
|
||||
val thatConsumer = new KafkaConsumer(props, new ByteArrayDeserializer, new ByteArrayDeserializer)
|
||||
|
||||
try {
|
||||
val thisTopicPartitions = consumerConsole.listSubscribeTopics(groupId).get(topic).asScala.sortBy(_.partition())
|
||||
val thatTopicPartitionMap = thatAdmin.listConsumerGroupOffsets(
|
||||
groupId
|
||||
).partitionsToOffsetAndMetadata.get(timeoutMs, TimeUnit.MILLISECONDS).asScala.filter(_._1.topic().equals(topic))
|
||||
val thatTopicPartitions = thatTopicPartitionMap.keySet.toList.sortBy(_.partition())
|
||||
if (thatTopicPartitions != thisTopicPartitions) {
|
||||
throw new IllegalStateException("topic partition inconsistent.")
|
||||
}
|
||||
val thatMinTopicOffset = thatConsumer.beginningOffsets(thatTopicPartitions.asJava)
|
||||
val thisMinTopicOffset = topicConsole.getTopicOffset(topic, thisTopicPartitions.asJava)._1
|
||||
|
||||
(thisMinTopicOffset, thatMinTopicOffset).asInstanceOf[(util.Map[TopicPartition, Long], util.Map[TopicPartition, Long])]
|
||||
} finally {
|
||||
thatAdmin.close()
|
||||
thatConsumer.close()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user