From 3b01f28446318dad77b7166b4cde1dd8e26a6fec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=AE=B8=E6=99=93=E4=B8=9C?= <763795151@qq.com> Date: Wed, 27 Oct 2021 16:49:04 +0800 Subject: [PATCH] =?UTF-8?q?=E9=9B=86=E7=BE=A4=E5=90=8C=E6=AD=A5-=E3=80=8B?= =?UTF-8?q?=E5=90=8C=E6=AD=A5=E6=B6=88=E8=B4=B9=E4=BD=8D=E7=A7=BB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../service/impl/OperationServiceImpl.java | 6 +++--- .../scala/kafka/console/OperationConsole.scala | 16 ++++++++-------- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/src/main/java/com/xuxd/kafka/console/service/impl/OperationServiceImpl.java b/src/main/java/com/xuxd/kafka/console/service/impl/OperationServiceImpl.java index c935dd5..53b5afe 100644 --- a/src/main/java/com/xuxd/kafka/console/service/impl/OperationServiceImpl.java +++ b/src/main/java/com/xuxd/kafka/console/service/impl/OperationServiceImpl.java @@ -64,12 +64,12 @@ public class OperationServiceImpl implements OperationService { @Override public ResponseData minOffsetAlignment(String groupId, String topic, Properties thatProps) { - Tuple2, Map> tuple2 = operationConsole.checkAndFetchMinOffset(groupId, topic, thatProps); - Map thisMinOffset = tuple2._1(); + Tuple2, Map> tuple2 = operationConsole.checkAndFetchOffset(groupId, topic, thatProps); + Map thisMaxOffset = tuple2._1(); Map thatMinOffset = tuple2._2(); JsonObject thisJson = new JsonObject(), thatJson = new JsonObject(); - thisMinOffset.forEach((k, v) -> { + thisMaxOffset.forEach((k, v) -> { thisJson.addProperty(String.valueOf(k.partition()), v.toString()); }); thatMinOffset.forEach((k, v) -> { diff --git a/src/main/scala/kafka/console/OperationConsole.scala b/src/main/scala/kafka/console/OperationConsole.scala index dd5e6ef..d48afc3 100644 --- a/src/main/scala/kafka/console/OperationConsole.scala +++ b/src/main/scala/kafka/console/OperationConsole.scala @@ -96,7 +96,7 @@ class OperationConsole(config: KafkaConfig, topicConsole: TopicConsole, import java.util def syncConsumerOffset(groupId: String, topic: String, props: Properties, - thisMinOffset: util.Map[TopicPartition, Long], + thisMaxOffset: util.Map[TopicPartition, Long], thatMinOffset: util.Map[TopicPartition, Long]): (Boolean, String) = { val thatAdmin = createAdminClient(props) try { @@ -143,9 +143,9 @@ class OperationConsole(config: KafkaConfig, topicConsole: TopicConsole, } }) // get from this kafka: min offset of topic - val thisMinTopicOffset = thisMinOffset.asScala + val thisMaxTopicOffset = thisMaxOffset.asScala - for ((t: TopicPartition, o: Long) <- thisMinTopicOffset) { + for ((t: TopicPartition, o: Long) <- thisMaxTopicOffset) { val res = consumerConsole.resetPartitionToTargetOffset(groupId, t, o + thatDiffOffset.get(t).get) if (!res._1) { log.error(s"reset $t offset failed") @@ -164,14 +164,14 @@ class OperationConsole(config: KafkaConfig, topicConsole: TopicConsole, } /** - * check partition consistency and fetch the min offset for the topic. + * check partition consistency and fetch the min offset for the that kafka topic, max offset for this kafka. * * @param groupId group id. * @param topic topic. * @param props other kafka cluster config. - * @return _1: this min offset, _2: that min offset. + * @return _1: this max offset, _2: that min offset. */ - def checkAndFetchMinOffset(groupId: String, topic: String, + def checkAndFetchOffset(groupId: String, topic: String, props: Properties): (util.Map[TopicPartition, Long], util.Map[TopicPartition, Long]) = { val thatAdmin = createAdminClient(props) val thatConsumer = new KafkaConsumer(props, new ByteArrayDeserializer, new ByteArrayDeserializer) @@ -186,9 +186,9 @@ class OperationConsole(config: KafkaConfig, topicConsole: TopicConsole, throw new IllegalStateException("topic partition inconsistent.") } val thatMinTopicOffset = thatConsumer.beginningOffsets(thatTopicPartitions.asJava) - val thisMinTopicOffset = topicConsole.getTopicOffset(topic, thisTopicPartitions.asJava)._1 + val thisMaxTopicOffset = topicConsole.getTopicOffset(topic, thisTopicPartitions.asJava)._2 - (thisMinTopicOffset, thatMinTopicOffset).asInstanceOf[(util.Map[TopicPartition, Long], util.Map[TopicPartition, Long])] + (thisMaxTopicOffset, thatMinTopicOffset).asInstanceOf[(util.Map[TopicPartition, Long], util.Map[TopicPartition, Long])] } finally { thatAdmin.close() thatConsumer.close()