diff --git a/src/main/java/com/xuxd/kafka/console/service/impl/ConsumerServiceImpl.java b/src/main/java/com/xuxd/kafka/console/service/impl/ConsumerServiceImpl.java index 20bbaf7..92c2798 100644 --- a/src/main/java/com/xuxd/kafka/console/service/impl/ConsumerServiceImpl.java +++ b/src/main/java/com/xuxd/kafka/console/service/impl/ConsumerServiceImpl.java @@ -111,9 +111,14 @@ public class ConsumerServiceImpl implements ConsumerService { } // consumer message and commit offset. - consumerConsole.consumeMessageDoNothing(groupId, topic); + Tuple2 tuple21 = consumerConsole.consumeMessageDoNothing(groupId, topic); + if (!(boolean) tuple21._1()) { + return ResponseData.create().failed(tuple21._2()); + } - // reset consume offset to 0. - return ResponseData.create().success(); + // reset consume offset to earliest. + Tuple2 tuple2 = consumerConsole.resetOffsetToEarliest(groupId, topic); + + return (boolean) tuple2._1() ? ResponseData.create().success() : ResponseData.create().failed(tuple2._2()); } } diff --git a/src/main/scala/kafka/console/ConsumerConsole.scala b/src/main/scala/kafka/console/ConsumerConsole.scala index 7f71d4a..3df49ae 100644 --- a/src/main/scala/kafka/console/ConsumerConsole.scala +++ b/src/main/scala/kafka/console/ConsumerConsole.scala @@ -84,10 +84,16 @@ class ConsumerConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaCon val topicPartitionConsumeInfoMap = commitOffsets.keySet.map(topicPartition => { val t = new TopicPartitionConsumeInfo t.topicPartition = topicPartition - t.logEndOffset = endOffsets.get(t.topicPartition).get.offset() - t.consumerOffset = getPartitionOffset(t.topicPartition).get - t.lag = t.logEndOffset - t.consumerOffset t.groupId = consumerGroup.groupId() + t.consumerOffset = getPartitionOffset(t.topicPartition).get + endOffsets.get(t.topicPartition) match { + case None => t.lag = -1 + case Some(v) => { + t.logEndOffset = v.offset() + t.lag = t.logEndOffset - t.consumerOffset + } + } + t.lag = t.logEndOffset - t.consumerOffset (topicPartition, t) }).toMap @@ -113,18 +119,41 @@ class ConsumerConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaCon res } - def consumeMessageDoNothing(groupId: String, topic: String): Unit = { + def consumeMessageDoNothing(groupId: String, topic: String): (Boolean, String) = { val props = new Properties() props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId) props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") withConsumerAndCatchError(consumer => { consumer.subscribe(Collections.singletonList(topic)) - consumer.poll(Duration.ofSeconds(1)) + for (i <- 1 to 2) { + consumer.poll(Duration.ofSeconds(1)) + } consumer.commitSync() - }, e=> { + (true, "") + }, e => { log.error("subscribe error", e) - }, props) + (false, e.getMessage) + }, props).asInstanceOf[(Boolean, String)] + } + + def resetOffsetToEarliest(groupId: String, topic: String): (Boolean, String) = { + val props = new Properties() + props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId) + props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + withConsumerAndCatchError(consumer => { + consumer.subscribe(Collections.singleton(topic)) + consumer.poll(0) + val partitions = consumer.partitionsFor(topic).asScala.map(p => new TopicPartition(topic, p.partition())).toList + consumer.seekToBeginning(partitions.asJava) + partitions.foreach(consumer.position(_)) + consumer.commitSync() + (true, "") + }, e => { + log.error("resetOffsetToEarliest error", e) + (false, e.getMessage) + }, props).asInstanceOf[(Boolean, String)] } private def describeConsumerGroups(groupIds: util.Set[String]): mutable.Map[String, ConsumerGroupDescription] = {