新建订阅

This commit is contained in:
许晓东
2021-10-21 21:51:57 +08:00
parent 00eb51eb2e
commit 3664c7349b
3 changed files with 22 additions and 4 deletions

View File

@@ -1,13 +1,14 @@
package kafka.console
import java.time.Duration
import java.util
import java.util.concurrent.TimeUnit
import java.util.{Collections, Set}
import java.util.{Collections, Properties, Set}
import com.xuxd.kafka.console.config.KafkaConfig
import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo
import org.apache.kafka.clients.admin.{ConsumerGroupDescription, DeleteConsumerGroupsOptions, ListConsumerGroupsOptions, OffsetSpec}
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.clients.consumer.{ConsumerConfig, OffsetAndMetadata}
import org.apache.kafka.common.{ConsumerGroupState, TopicPartition}
import scala.beans.BeanProperty
@@ -112,6 +113,20 @@ class ConsumerConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaCon
res
}
def consumeMessageDoNothing(groupId: String, topic: String): Unit = {
val props = new Properties()
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
withConsumerAndCatchError(consumer => {
consumer.subscribe(Collections.singletonList(topic))
consumer.poll(Duration.ofSeconds(1))
consumer.commitSync()
}, e=> {
log.error("subscribe error", e)
}, props)
}
private def describeConsumerGroups(groupIds: util.Set[String]): mutable.Map[String, ConsumerGroupDescription] = {
withAdminClientAndCatchError(admin => {
admin.describeConsumerGroups(groupIds).describedGroups().asScala.map {