集群同步-》最小位移对齐

This commit is contained in:
许晓东
2021-10-26 21:16:46 +08:00
parent a06b6dbb5f
commit b46492930c
2 changed files with 88 additions and 1 deletions

View File

@@ -95,6 +95,74 @@ class OperationConsole(config: KafkaConfig, topicConsole: TopicConsole,
import java.util
def syncConsumerOffset(groupId: String, topic: String, props: Properties,
thisMinOffset: util.Map[TopicPartition, Long],
thatMinOffset: util.Map[TopicPartition, Long]): (Boolean, String) = {
val thatAdmin = createAdminClient(props)
try {
val searchGroupIds = Collections.singleton(groupId)
val groupDescriptionList = consumerConsole.getConsumerGroupList(searchGroupIds)
if (groupDescriptionList.isEmpty) {
throw new IllegalArgumentException("consumer group info is null.")
}
for (groupDescription <- groupDescriptionList.asScala) {
if (groupDescription.members().size() > 0) {
log.error("syncConsumerOffset: kafka exist consumer client.")
throw new UnsupportedOperationException("exist consumer client.")
}
}
val thatGroupDescriptionList = thatAdmin.describeConsumerGroups(searchGroupIds).all().get(timeoutMs, TimeUnit.MILLISECONDS).values()
if (groupDescriptionList.isEmpty) {
throw new IllegalArgumentException("that consumer group info is null.")
}
for (thatGroupDescription <- thatGroupDescriptionList.asScala) {
if (thatGroupDescription.members().size() > 0) {
log.error("syncConsumerOffset: that kafka exist consumer client.")
throw new UnsupportedOperationException("that kafka exist consumer client.")
}
}
val thisTopicPartitions = consumerConsole.listSubscribeTopics(groupId).get(topic).asScala.sortBy(_.partition())
val thatTopicPartitionMap = thatAdmin.listConsumerGroupOffsets(
groupId
).partitionsToOffsetAndMetadata.get(timeoutMs, TimeUnit.MILLISECONDS).asScala.filter(_._1.topic().equals(topic))
val thatTopicPartitions = thatTopicPartitionMap.keySet.toList.sortBy(_.partition())
if (thatTopicPartitions != thisTopicPartitions) {
throw new IllegalStateException("topic partition inconsistent.")
}
// get from that kafka: min offset of topic and consumer offset for partition
val thatCommittedOffset = thatTopicPartitionMap.map(m => (m._1, m._2.offset()))
val thatMinTopicOffset = thatMinOffset.asScala
val thatDiffOffset = thatMinTopicOffset.map(m => {
val t = m._1
thatCommittedOffset.get(t) match {
case None => throw new IllegalArgumentException("An unknown error occurred calculating the offset difference")
case Some(committedOffset) => (t, committedOffset - m._2)
}
})
// get from this kafka: min offset of topic
val thisMinTopicOffset = thisMinOffset.asScala
for ((t: TopicPartition, o: Long) <- thisMinTopicOffset) {
val res = consumerConsole.resetPartitionToTargetOffset(groupId, t, o + thatDiffOffset.get(t).get)
if (!res._1) {
log.error(s"reset $t offset failed")
throw new UnsupportedOperationException(s"reset $t offset failed")
}
}
(true, "")
} catch {
case ex => {
log.error("syncConsumerOffset error.", ex)
(false, ex.getMessage)
}
} finally {
thatAdmin.close()
}
}
/**
* check partition consistency and fetch the min offset for the topic.
*