diff --git a/src/main/java/com/xuxd/kafka/console/service/impl/OperationServiceImpl.java b/src/main/java/com/xuxd/kafka/console/service/impl/OperationServiceImpl.java
index b238880..c935dd5 100644
--- a/src/main/java/com/xuxd/kafka/console/service/impl/OperationServiceImpl.java
+++ b/src/main/java/com/xuxd/kafka/console/service/impl/OperationServiceImpl.java
@@ -7,6 +7,7 @@ import com.xuxd.kafka.console.beans.ResponseData;
 import com.xuxd.kafka.console.beans.dos.MinOffsetAlignmentDO;
 import com.xuxd.kafka.console.dao.MinOffsetAlignmentMapper;
 import com.xuxd.kafka.console.service.OperationService;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
 import kafka.console.OperationConsole;
@@ -37,8 +38,26 @@ public class OperationServiceImpl implements OperationService {
     }
 
     @Override
     public ResponseData syncConsumerOffset(String groupId, String topic, Properties thatProps) {
+        QueryWrapper<MinOffsetAlignmentDO> wrapper = new QueryWrapper<>();
+        wrapper.eq("group_id", groupId);
+        wrapper.eq("topic", topic);
+        MinOffsetAlignmentDO alignmentDO = minOffsetAlignmentMapper.selectOne(wrapper);
+        if (alignmentDO == null) {
+            return ResponseData.create().failed("No min offset info.");
+        }
-        Tuple2 tuple2 = operationConsole.syncConsumerOffset(groupId, topic, thatProps);
+        Map<String, Object> thisOffset = gson.fromJson(alignmentDO.getThisOffset(), Map.class);
+        Map<String, Object> thatOffset = gson.fromJson(alignmentDO.getThatOffset(), Map.class);
+
+        Map<TopicPartition, Long> thisMinOffset = new HashMap<>(), thatMinOffset = new HashMap<>();
+        thisOffset.forEach((k, v) -> {
+            thisMinOffset.put(new TopicPartition(topic, Integer.valueOf(k)), Long.valueOf(v.toString()));
+        });
+        thatOffset.forEach((k, v) -> {
+            thatMinOffset.put(new TopicPartition(topic, Integer.valueOf(k)), Long.valueOf(v.toString()));
+        });
+
+        Tuple2 tuple2 = operationConsole.syncConsumerOffset(groupId, topic, thatProps, thisMinOffset, thatMinOffset);
         return (boolean) tuple2._1() ?
             ResponseData.create().success() : ResponseData.create().failed(tuple2._2());
     }
diff --git a/src/main/scala/kafka/console/OperationConsole.scala b/src/main/scala/kafka/console/OperationConsole.scala
index 58bd13b..dd5e6ef 100644
--- a/src/main/scala/kafka/console/OperationConsole.scala
+++ b/src/main/scala/kafka/console/OperationConsole.scala
@@ -95,6 +95,74 @@ class OperationConsole(config: KafkaConfig, topicConsole: TopicConsole,
 
     import java.util
 
+    def syncConsumerOffset(groupId: String, topic: String, props: Properties,
+        thisMinOffset: util.Map[TopicPartition, Long],
+        thatMinOffset: util.Map[TopicPartition, Long]): (Boolean, String) = {
+        val thatAdmin = createAdminClient(props)
+        try {
+            val searchGroupIds = Collections.singleton(groupId)
+            val groupDescriptionList = consumerConsole.getConsumerGroupList(searchGroupIds)
+            if (groupDescriptionList.isEmpty) {
+                throw new IllegalArgumentException("consumer group info is null.")
+            }
+            for (groupDescription <- groupDescriptionList.asScala) {
+                if (groupDescription.members().size() > 0) {
+                    log.error("syncConsumerOffset: kafka exist consumer client.")
+                    throw new UnsupportedOperationException("exist consumer client.")
+                }
+            }
+            val thatGroupDescriptionList = thatAdmin.describeConsumerGroups(searchGroupIds).all().get(timeoutMs, TimeUnit.MILLISECONDS).values()
+            if (thatGroupDescriptionList.isEmpty) {
+                throw new IllegalArgumentException("that consumer group info is null.")
+            }
+            for (thatGroupDescription <- thatGroupDescriptionList.asScala) {
+                if (thatGroupDescription.members().size() > 0) {
+                    log.error("syncConsumerOffset: that kafka exist consumer client.")
+                    throw new UnsupportedOperationException("that kafka exist consumer client.")
+                }
+            }
+            val thisTopicPartitions = consumerConsole.listSubscribeTopics(groupId).get(topic).asScala.sortBy(_.partition())
+
+            val thatTopicPartitionMap = thatAdmin.listConsumerGroupOffsets(
+                groupId
+            ).partitionsToOffsetAndMetadata.get(timeoutMs, TimeUnit.MILLISECONDS).asScala.filter(_._1.topic().equals(topic))
+            val thatTopicPartitions = thatTopicPartitionMap.keySet.toList.sortBy(_.partition())
+            if (thatTopicPartitions != thisTopicPartitions) {
+                throw new IllegalStateException("topic partition inconsistent.")
+            }
+
+            // get from that kafka: min offset of topic and consumer offset for partition
+            val thatCommittedOffset = thatTopicPartitionMap.map(m => (m._1, m._2.offset()))
+            val thatMinTopicOffset = thatMinOffset.asScala
+
+            val thatDiffOffset = thatMinTopicOffset.map(m => {
+                val t = m._1
+                thatCommittedOffset.get(t) match {
+                    case None => throw new IllegalArgumentException("An unknown error occurred calculating the offset difference")
+                    case Some(committedOffset) => (t, committedOffset - m._2)
+                }
+            })
+            // get from this kafka: min offset of topic
+            val thisMinTopicOffset = thisMinOffset.asScala
+
+            for ((t: TopicPartition, o: Long) <- thisMinTopicOffset) {
+                val res = consumerConsole.resetPartitionToTargetOffset(groupId, t, o + thatDiffOffset.get(t).get)
+                if (!res._1) {
+                    log.error(s"reset $t offset failed")
+                    throw new UnsupportedOperationException(s"reset $t offset failed")
+                }
+            }
+            (true, "")
+        } catch {
+            case ex: Throwable => {
+                log.error("syncConsumerOffset error.", ex)
+                (false, ex.getMessage)
+            }
+        } finally {
+            thatAdmin.close()
+        }
+    }
+
     /**
      * check partition consistency and fetch the min offset for the topic.
      *