From b46492930c6a9e097a244d616f8176d0e0f9e614 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E8=AE=B8=E6=99=93=E4=B8=9C?= <763795151@qq.com>
Date: Tue, 26 Oct 2021 21:16:46 +0800
Subject: [PATCH] =?UTF-8?q?=E9=9B=86=E7=BE=A4=E5=90=8C=E6=AD=A5-=E3=80=8B?=
 =?UTF-8?q?=E6=9C=80=E5=B0=8F=E4=BD=8D=E7=A7=BB=E5=AF=B9=E9=BD=90?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../service/impl/OperationServiceImpl.java    | 23 +++++--
 .../kafka/console/OperationConsole.scala      | 77 +++++++++++++++++++
 2 files changed, 99 insertions(+), 1 deletion(-)

diff --git a/src/main/java/com/xuxd/kafka/console/service/impl/OperationServiceImpl.java b/src/main/java/com/xuxd/kafka/console/service/impl/OperationServiceImpl.java
index b238880..c935dd5 100644
--- a/src/main/java/com/xuxd/kafka/console/service/impl/OperationServiceImpl.java
+++ b/src/main/java/com/xuxd/kafka/console/service/impl/OperationServiceImpl.java
@@ -7,6 +7,7 @@ import com.xuxd.kafka.console.beans.ResponseData;
 import com.xuxd.kafka.console.beans.dos.MinOffsetAlignmentDO;
 import com.xuxd.kafka.console.dao.MinOffsetAlignmentMapper;
 import com.xuxd.kafka.console.service.OperationService;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
 import kafka.console.OperationConsole;
@@ -37,8 +38,28 @@ public class OperationServiceImpl implements OperationService {
     }
 
     @Override public ResponseData syncConsumerOffset(String groupId, String topic, Properties thatProps) {
+        // Load the min-offset snapshot recorded for this group/topic; without it the two clusters cannot be aligned.
+        QueryWrapper<MinOffsetAlignmentDO> wrapper = new QueryWrapper<>();
+        wrapper.eq("group_id", groupId);
+        wrapper.eq("topic", topic);
+        MinOffsetAlignmentDO alignmentDO = minOffsetAlignmentMapper.selectOne(wrapper);
+        if (alignmentDO == null) {
+            return ResponseData.create().failed("No min offset info.");
+        }
 
-        Tuple2 tuple2 = operationConsole.syncConsumerOffset(groupId, topic, thatProps);
+        Map<String, Object> thisOffset = gson.fromJson(alignmentDO.getThisOffset(), Map.class);
+        Map<String, Object> thatOffset = gson.fromJson(alignmentDO.getThatOffset(), Map.class);
+
+        // Snapshot maps are keyed by partition id (string); re-key by TopicPartition for the console API.
+        Map thisMinOffset = new HashMap<>(), thatMinOffset = new HashMap<>();
+        thisOffset.forEach((k, v)-> {
+            thisMinOffset.put(new TopicPartition(topic, Integer.valueOf(k)), Long.valueOf(v.toString()));
+        });
+        thatOffset.forEach((k, v)-> {
+            thatMinOffset.put(new TopicPartition(topic, Integer.valueOf(k)), Long.valueOf(v.toString()));
+        });
+
+        Tuple2 tuple2 = operationConsole.syncConsumerOffset(groupId, topic, thatProps, thisMinOffset, thatMinOffset);
         return (boolean) tuple2._1() ? ResponseData.create().success() : ResponseData.create().failed(tuple2._2());
     }
 
diff --git a/src/main/scala/kafka/console/OperationConsole.scala b/src/main/scala/kafka/console/OperationConsole.scala
index 58bd13b..dd5e6ef 100644
--- a/src/main/scala/kafka/console/OperationConsole.scala
+++ b/src/main/scala/kafka/console/OperationConsole.scala
@@ -95,6 +95,83 @@ class OperationConsole(config: KafkaConfig, topicConsole: TopicConsole,
 
     import java.util
 
+    /**
+     * Align this cluster's committed offsets for one consumer group/topic with another ("that") cluster.
+     * Refuses to run while either cluster still has live members in the group, and requires both
+     * clusters to expose the same partitions for the topic.
+     *
+     * @param thisMinOffset min offset per partition recorded earlier on this cluster
+     * @param thatMinOffset min offset per partition recorded earlier on that cluster
+     * @return (true, "") on success, otherwise (false, error message)
+     */
+    def syncConsumerOffset(groupId: String, topic: String, props: Properties,
+        thisMinOffset: util.Map[TopicPartition, Long],
+        thatMinOffset: util.Map[TopicPartition, Long]): (Boolean, String) = {
+        val thatAdmin = createAdminClient(props)
+        try {
+            val searchGroupIds = Collections.singleton(groupId)
+            val groupDescriptionList = consumerConsole.getConsumerGroupList(searchGroupIds)
+            if (groupDescriptionList.isEmpty) {
+                throw new IllegalArgumentException("consumer group info is null.")
+            }
+            for (groupDescription <- groupDescriptionList.asScala) {
+                if (groupDescription.members().size() > 0) {
+                    log.error("syncConsumerOffset: kafka exist consumer client.")
+                    throw new UnsupportedOperationException("exist consumer client.")
+                }
+            }
+            val thatGroupDescriptionList = thatAdmin.describeConsumerGroups(searchGroupIds).all().get(timeoutMs, TimeUnit.MILLISECONDS).values()
+            if (thatGroupDescriptionList.isEmpty) {
+                throw new IllegalArgumentException("that consumer group info is null.")
+            }
+            for (thatGroupDescription <- thatGroupDescriptionList.asScala) {
+                if (thatGroupDescription.members().size() > 0) {
+                    log.error("syncConsumerOffset: that kafka exist consumer client.")
+                    throw new UnsupportedOperationException("that kafka exist consumer client.")
+                }
+            }
+            val thisTopicPartitions = consumerConsole.listSubscribeTopics(groupId).get(topic).asScala.sortBy(_.partition())
+
+            val thatTopicPartitionMap = thatAdmin.listConsumerGroupOffsets(
+                groupId
+            ).partitionsToOffsetAndMetadata.get(timeoutMs, TimeUnit.MILLISECONDS).asScala.filter(_._1.topic().equals(topic))
+            val thatTopicPartitions = thatTopicPartitionMap.keySet.toList.sortBy(_.partition())
+            if (thatTopicPartitions != thisTopicPartitions) {
+                throw new IllegalStateException("topic partition inconsistent.")
+            }
+
+            // get from that kafka: min offset of topic and consumer offset for partition
+            val thatCommittedOffset = thatTopicPartitionMap.map(m => (m._1, m._2.offset()))
+            val thatMinTopicOffset = thatMinOffset.asScala
+
+            val thatDiffOffset = thatMinTopicOffset.map(m => {
+                val t = m._1
+                thatCommittedOffset.get(t) match {
+                    case None => throw new IllegalArgumentException("An unknown error occurred calculating the offset difference")
+                    case Some(committedOffset) => (t, committedOffset - m._2)
+                }
+            })
+            // get from this kafka: min offset of topic
+            val thisMinTopicOffset = thisMinOffset.asScala
+
+            for ((t: TopicPartition, o: Long) <- thisMinTopicOffset) {
+                val res = consumerConsole.resetPartitionToTargetOffset(groupId, t, o + thatDiffOffset.get(t).get)
+                if (!res._1) {
+                    log.error(s"reset $t offset failed")
+                    throw new UnsupportedOperationException(s"reset $t offset failed")
+                }
+            }
+            (true, "")
+        } catch {
+            case ex: Throwable => {
+                log.error("syncConsumerOffset error.", ex)
+                (false, ex.getMessage)
+            }
+        } finally {
+            thatAdmin.close()
+        }
+    }
+
 /**
  * check partition consistency and fetch the min offset for the topic.
  *