From bbc35243e7fdb3e3ac77d64e6973eaf114e61f78 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=AE=B8=E6=99=93=E4=B8=9C?= <763795151@qq.com> Date: Mon, 25 Oct 2021 13:00:07 +0800 Subject: [PATCH] =?UTF-8?q?=E9=9B=86=E7=BE=A4=E5=90=8C=E6=AD=A5-=E3=80=8B?= =?UTF-8?q?=E5=90=8C=E6=AD=A5=E6=B6=88=E8=B4=B9=E4=BD=8D=E7=82=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../kafka/console/OperationConsole.scala | 70 +++++++++++++++++-- ui/src/views/op/SyncConsumerOffset.vue | 2 +- 2 files changed, 64 insertions(+), 8 deletions(-) diff --git a/src/main/scala/kafka/console/OperationConsole.scala b/src/main/scala/kafka/console/OperationConsole.scala index 5c85642..38c1e77 100644 --- a/src/main/scala/kafka/console/OperationConsole.scala +++ b/src/main/scala/kafka/console/OperationConsole.scala @@ -1,11 +1,14 @@ package kafka.console -import java.util.Properties import java.util.concurrent.TimeUnit +import java.util.{Collections, Properties} import com.xuxd.kafka.console.config.KafkaConfig +import org.apache.kafka.clients.consumer.KafkaConsumer +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.serialization.ByteArrayDeserializer -import scala.jdk.CollectionConverters.{ListHasAsScala, MapHasAsScala} +import scala.jdk.CollectionConverters.{CollectionHasAsScala, ListHasAsScala, MapHasAsScala, SeqHasAsJava, SetHasAsScala} /** * kafka-console-ui. @@ -19,21 +22,74 @@ class OperationConsole(config: KafkaConfig, topicConsole: TopicConsole, def syncConsumerOffset(groupId: String, topic: String, props: Properties): (Boolean, String) = { val thatAdmin = createAdminClient(props) try { + val searchGroupIds = Collections.singleton(groupId) + val groupDescriptionList = consumerConsole.getConsumerGroupList(searchGroupIds) + if (groupDescriptionList.isEmpty) { + throw new IllegalArgumentException("consumer group info is null.") + } + for (groupDescription <- groupDescriptionList.asScala) { + if (groupDescription.members().size() > 0) { + log.error("syncConsumerOffset: kafka exist consumer client.") + throw new UnsupportedOperationException("exist consumer client.") + } + } + val thatGroupDescriptionList = thatAdmin.describeConsumerGroups(searchGroupIds).all().get(timeoutMs, TimeUnit.MILLISECONDS).values() + if (groupDescriptionList.isEmpty) { + throw new IllegalArgumentException("that consumer group info is null.") + } + for (thatGroupDescription <- thatGroupDescriptionList.asScala) { + if (thatGroupDescription.members().size() > 0) { + log.error("syncConsumerOffset: that kafka exist consumer client.") + throw new UnsupportedOperationException("that kafka exist consumer client.") + } + } val thisTopicPartitions = consumerConsole.listSubscribeTopics(groupId).get(topic).asScala.sortBy(_.partition()) - val thatTopicPartitions = thatAdmin.listConsumerGroupOffsets( + val thatTopicPartitionMap = thatAdmin.listConsumerGroupOffsets( groupId - ).partitionsToOffsetAndMetadata.get(timeoutMs, TimeUnit.MILLISECONDS).asScala.filter(_._1.topic().equals(topic)).keySet.toList.sortBy(_.partition()) + ).partitionsToOffsetAndMetadata.get(timeoutMs, TimeUnit.MILLISECONDS).asScala.filter(_._1.topic().equals(topic)) + val thatTopicPartitions = thatTopicPartitionMap.keySet.toList.sortBy(_.partition()) if (thatTopicPartitions != thisTopicPartitions) { throw new IllegalStateException("topic partition inconsistent.") } + // get from that kafka: min offset of topic and consumer offset for partition + val thatCommittedOffset = thatTopicPartitionMap.map(m => (m._1, m._2.offset())) + val thatMinTopicOffset = { + val consumer = new KafkaConsumer(props, new ByteArrayDeserializer, new ByteArrayDeserializer) + try { + + consumer.beginningOffsets(thatTopicPartitions.asJava).asScala + } finally { + consumer.close() + } + } + + val thatDiffOffset = thatMinTopicOffset.map(m => { + val t = m._1 + thatCommittedOffset.get(t) match { + case None => throw new IllegalArgumentException("An unknown error occurred calculating the offset difference") + case Some(committedOffset) => (t, committedOffset - m._2) + } + }) + // get from this kafka: min offset of topic + val thisMinTopicOffset = topicConsole.getTopicOffset(topic, thisTopicPartitions.asJava)._1.asScala + + for ((t: TopicPartition, o: Long) <- thisMinTopicOffset) { + val res = consumerConsole.resetPartitionToTargetOffset(groupId, t, o + thatDiffOffset.get(t).get) + if (!res._1) { + log.error(s"reset $t offset failed") + throw new UnsupportedOperationException(s"reset $t offset failed") + } + } + (true, "") } catch { - case ex => throw ex + case ex => { + log.error("syncConsumerOffset error.", ex) + (false, ex.getMessage) + } } finally { thatAdmin.close() } - - (true, "") } } diff --git a/ui/src/views/op/SyncConsumerOffset.vue b/ui/src/views/op/SyncConsumerOffset.vue index acc17e1..6c65434 100644 --- a/ui/src/views/op/SyncConsumerOffset.vue +++ b/ui/src/views/op/SyncConsumerOffset.vue @@ -140,7 +140,7 @@ export default { let k = c[0].trim(), v = c[1].trim(); for (let j = 2; j < c.length; j++) { - v += ("=" + c[j]); + v += "=" + c[j]; } if (k && v) { properties[k] = v;