diff --git a/src/main/java/com/xuxd/kafka/console/service/impl/OperationServiceImpl.java b/src/main/java/com/xuxd/kafka/console/service/impl/OperationServiceImpl.java index c935dd5..53b5afe 100644 --- a/src/main/java/com/xuxd/kafka/console/service/impl/OperationServiceImpl.java +++ b/src/main/java/com/xuxd/kafka/console/service/impl/OperationServiceImpl.java @@ -64,12 +64,12 @@ public class OperationServiceImpl implements OperationService { @Override public ResponseData minOffsetAlignment(String groupId, String topic, Properties thatProps) { - Tuple2, Map> tuple2 = operationConsole.checkAndFetchMinOffset(groupId, topic, thatProps); - Map thisMinOffset = tuple2._1(); + Tuple2, Map> tuple2 = operationConsole.checkAndFetchOffset(groupId, topic, thatProps); + Map thisMaxOffset = tuple2._1(); Map thatMinOffset = tuple2._2(); JsonObject thisJson = new JsonObject(), thatJson = new JsonObject(); - thisMinOffset.forEach((k, v) -> { + thisMaxOffset.forEach((k, v) -> { thisJson.addProperty(String.valueOf(k.partition()), v.toString()); }); thatMinOffset.forEach((k, v) -> { diff --git a/src/main/scala/kafka/console/OperationConsole.scala b/src/main/scala/kafka/console/OperationConsole.scala index dd5e6ef..d48afc3 100644 --- a/src/main/scala/kafka/console/OperationConsole.scala +++ b/src/main/scala/kafka/console/OperationConsole.scala @@ -96,7 +96,7 @@ class OperationConsole(config: KafkaConfig, topicConsole: TopicConsole, import java.util def syncConsumerOffset(groupId: String, topic: String, props: Properties, - thisMinOffset: util.Map[TopicPartition, Long], + thisMaxOffset: util.Map[TopicPartition, Long], thatMinOffset: util.Map[TopicPartition, Long]): (Boolean, String) = { val thatAdmin = createAdminClient(props) try { @@ -143,9 +143,9 @@ class OperationConsole(config: KafkaConfig, topicConsole: TopicConsole, } }) // get from this kafka: min offset of topic - val thisMinTopicOffset = thisMinOffset.asScala + val thisMaxTopicOffset = thisMaxOffset.asScala - for ((t: TopicPartition, o: Long) <- thisMinTopicOffset) { + for ((t: TopicPartition, o: Long) <- thisMaxTopicOffset) { val res = consumerConsole.resetPartitionToTargetOffset(groupId, t, o + thatDiffOffset.get(t).get) if (!res._1) { log.error(s"reset $t offset failed") @@ -164,14 +164,14 @@ class OperationConsole(config: KafkaConfig, topicConsole: TopicConsole, } /** - * check partition consistency and fetch the min offset for the topic. + * check partition consistency and fetch the min offset for the that kafka topic, max offset for this kafka. * * @param groupId group id. * @param topic topic. * @param props other kafka cluster config. - * @return _1: this min offset, _2: that min offset. + * @return _1: this max offset, _2: that min offset. */ - def checkAndFetchMinOffset(groupId: String, topic: String, + def checkAndFetchOffset(groupId: String, topic: String, props: Properties): (util.Map[TopicPartition, Long], util.Map[TopicPartition, Long]) = { val thatAdmin = createAdminClient(props) val thatConsumer = new KafkaConsumer(props, new ByteArrayDeserializer, new ByteArrayDeserializer) @@ -186,9 +186,9 @@ class OperationConsole(config: KafkaConfig, topicConsole: TopicConsole, throw new IllegalStateException("topic partition inconsistent.") } val thatMinTopicOffset = thatConsumer.beginningOffsets(thatTopicPartitions.asJava) - val thisMinTopicOffset = topicConsole.getTopicOffset(topic, thisTopicPartitions.asJava)._1 + val thisMaxTopicOffset = topicConsole.getTopicOffset(topic, thisTopicPartitions.asJava)._2 - (thisMinTopicOffset, thatMinTopicOffset).asInstanceOf[(util.Map[TopicPartition, Long], util.Map[TopicPartition, Long])] + (thisMaxTopicOffset, thatMinTopicOffset).asInstanceOf[(util.Map[TopicPartition, Long], util.Map[TopicPartition, Long])] } finally { thatAdmin.close() thatConsumer.close()