重置消费位点
This commit is contained in:
@@ -8,7 +8,7 @@ import java.util.{Collections, Properties, Set}
|
||||
import com.xuxd.kafka.console.config.KafkaConfig
|
||||
import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo
|
||||
import org.apache.kafka.clients.admin.{ConsumerGroupDescription, DeleteConsumerGroupsOptions, ListConsumerGroupsOptions, OffsetSpec}
|
||||
import org.apache.kafka.clients.consumer.{ConsumerConfig, OffsetAndMetadata}
|
||||
import org.apache.kafka.clients.consumer.{ConsumerConfig, OffsetAndMetadata, OffsetResetStrategy}
|
||||
import org.apache.kafka.common.{ConsumerGroupState, TopicPartition}
|
||||
|
||||
import scala.beans.BeanProperty
|
||||
@@ -138,20 +138,27 @@ class ConsumerConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaCon
|
||||
}
|
||||
|
||||
def resetOffsetToEarliest(groupId: String, topic: String): (Boolean, String) = {
|
||||
resetOffsetToEndpoint(groupId, topic, OffsetResetStrategy.EARLIEST)
|
||||
}
|
||||
|
||||
def resetOffsetToEndpoint(groupId: String, topic: String, strategy: OffsetResetStrategy): (Boolean, String) = {
|
||||
val props = new Properties()
|
||||
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
|
||||
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
|
||||
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
|
||||
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, strategy.name().toLowerCase);
|
||||
withConsumerAndCatchError(consumer => {
|
||||
consumer.subscribe(Collections.singleton(topic))
|
||||
consumer.poll(0)
|
||||
val partitions = consumer.partitionsFor(topic).asScala.map(p => new TopicPartition(topic, p.partition())).toList
|
||||
consumer.seekToBeginning(partitions.asJava)
|
||||
strategy match {
|
||||
case OffsetResetStrategy.EARLIEST => consumer.seekToBeginning(partitions.asJava)
|
||||
case OffsetResetStrategy.LATEST => consumer.seekToEnd(partitions.asJava)
|
||||
}
|
||||
partitions.foreach(consumer.position(_))
|
||||
consumer.commitSync()
|
||||
(true, "")
|
||||
}, e => {
|
||||
log.error("resetOffsetToEarliest error", e)
|
||||
log.error("resetOffsetToEndpoint error", e)
|
||||
(false, e.getMessage)
|
||||
}, props).asInstanceOf[(Boolean, String)]
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user