diff --git a/src/main/scala/kafka/console/ConsumerConsole.scala b/src/main/scala/kafka/console/ConsumerConsole.scala index e9ca8fb..d13d415 100644 --- a/src/main/scala/kafka/console/ConsumerConsole.scala +++ b/src/main/scala/kafka/console/ConsumerConsole.scala @@ -1,10 +1,5 @@ package kafka.console -import java.time.Duration -import java.util -import java.util.concurrent.TimeUnit -import java.util.{Collections, Properties, Set} - import com.xuxd.kafka.console.config.KafkaConfig import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo import org.apache.kafka.clients.admin._ @@ -12,6 +7,10 @@ import org.apache.kafka.clients.consumer.{ConsumerConfig, OffsetAndMetadata, Off import org.apache.kafka.common.requests.ListOffsetsResponse import org.apache.kafka.common.{ConsumerGroupState, TopicPartition} +import java.time.Duration +import java.util +import java.util.concurrent.TimeUnit +import java.util.{Collections, Properties, Set} import scala.beans.BeanProperty import scala.collection.{Map, Seq, mutable} import scala.jdk.CollectionConverters._ @@ -154,6 +153,7 @@ class ConsumerConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaCon strategy match { case OffsetResetStrategy.EARLIEST => consumer.seekToBeginning(partitions.asJava) case OffsetResetStrategy.LATEST => consumer.seekToEnd(partitions.asJava) + case _ => } partitions.foreach(consumer.position(_)) consumer.commitSync() diff --git a/src/main/scala/kafka/console/OperationConsole.scala b/src/main/scala/kafka/console/OperationConsole.scala index 53d0858..5d8ad35 100644 --- a/src/main/scala/kafka/console/OperationConsole.scala +++ b/src/main/scala/kafka/console/OperationConsole.scala @@ -85,7 +85,7 @@ class OperationConsole(config: KafkaConfig, topicConsole: TopicConsole, } (true, "") } catch { - case ex => { + case ex: Throwable => { log.error("syncConsumerOffset error.", ex) (false, ex.getMessage) } @@ -155,7 +155,7 @@ class OperationConsole(config: KafkaConfig, topicConsole: TopicConsole, } (true, "") } catch { - case ex => { + case ex: Throwable => { log.error("syncConsumerOffset error.", ex) (false, ex.getMessage) }