打包资源文件包含
This commit is contained in:
@@ -1,10 +1,5 @@
|
|||||||
package kafka.console
|
package kafka.console
|
||||||
|
|
||||||
import java.time.Duration
|
|
||||||
import java.util
|
|
||||||
import java.util.concurrent.TimeUnit
|
|
||||||
import java.util.{Collections, Properties, Set}
|
|
||||||
|
|
||||||
import com.xuxd.kafka.console.config.KafkaConfig
|
import com.xuxd.kafka.console.config.KafkaConfig
|
||||||
import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo
|
import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo
|
||||||
import org.apache.kafka.clients.admin._
|
import org.apache.kafka.clients.admin._
|
||||||
@@ -12,6 +7,10 @@ import org.apache.kafka.clients.consumer.{ConsumerConfig, OffsetAndMetadata, Off
|
|||||||
import org.apache.kafka.common.requests.ListOffsetsResponse
|
import org.apache.kafka.common.requests.ListOffsetsResponse
|
||||||
import org.apache.kafka.common.{ConsumerGroupState, TopicPartition}
|
import org.apache.kafka.common.{ConsumerGroupState, TopicPartition}
|
||||||
|
|
||||||
|
import java.time.Duration
|
||||||
|
import java.util
|
||||||
|
import java.util.concurrent.TimeUnit
|
||||||
|
import java.util.{Collections, Properties, Set}
|
||||||
import scala.beans.BeanProperty
|
import scala.beans.BeanProperty
|
||||||
import scala.collection.{Map, Seq, mutable}
|
import scala.collection.{Map, Seq, mutable}
|
||||||
import scala.jdk.CollectionConverters._
|
import scala.jdk.CollectionConverters._
|
||||||
@@ -154,6 +153,7 @@ class ConsumerConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaCon
|
|||||||
strategy match {
|
strategy match {
|
||||||
case OffsetResetStrategy.EARLIEST => consumer.seekToBeginning(partitions.asJava)
|
case OffsetResetStrategy.EARLIEST => consumer.seekToBeginning(partitions.asJava)
|
||||||
case OffsetResetStrategy.LATEST => consumer.seekToEnd(partitions.asJava)
|
case OffsetResetStrategy.LATEST => consumer.seekToEnd(partitions.asJava)
|
||||||
|
case _ =>
|
||||||
}
|
}
|
||||||
partitions.foreach(consumer.position(_))
|
partitions.foreach(consumer.position(_))
|
||||||
consumer.commitSync()
|
consumer.commitSync()
|
||||||
|
|||||||
@@ -85,7 +85,7 @@ class OperationConsole(config: KafkaConfig, topicConsole: TopicConsole,
|
|||||||
}
|
}
|
||||||
(true, "")
|
(true, "")
|
||||||
} catch {
|
} catch {
|
||||||
case ex => {
|
case ex: Throwable => {
|
||||||
log.error("syncConsumerOffset error.", ex)
|
log.error("syncConsumerOffset error.", ex)
|
||||||
(false, ex.getMessage)
|
(false, ex.getMessage)
|
||||||
}
|
}
|
||||||
@@ -155,7 +155,7 @@ class OperationConsole(config: KafkaConfig, topicConsole: TopicConsole,
|
|||||||
}
|
}
|
||||||
(true, "")
|
(true, "")
|
||||||
} catch {
|
} catch {
|
||||||
case ex => {
|
case ex: Throwable => {
|
||||||
log.error("syncConsumerOffset error.", ex)
|
log.error("syncConsumerOffset error.", ex)
|
||||||
(false, ex.getMessage)
|
(false, ex.getMessage)
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user