缓存连接
This commit is contained in:
@@ -1,18 +1,21 @@
|
||||
package kafka.console
|
||||
|
||||
import com.google.common.cache.{CacheLoader, RemovalListener, RemovalNotification}
|
||||
import com.xuxd.kafka.console.cache.TimeBasedCache
|
||||
import com.xuxd.kafka.console.config.{ContextConfigHolder, KafkaConfig}
|
||||
import kafka.zk.{AdminZkClient, KafkaZkClient}
|
||||
import kafka.zk.AdminZkClient
|
||||
import org.apache.kafka.clients.admin._
|
||||
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer, OffsetAndMetadata}
|
||||
import org.apache.kafka.clients.producer.KafkaProducer
|
||||
import org.apache.kafka.common.TopicPartition
|
||||
import org.apache.kafka.common.requests.ListOffsetsResponse
|
||||
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, StringSerializer}
|
||||
import org.apache.kafka.common.utils.Time
|
||||
import org.slf4j.{Logger, LoggerFactory}
|
||||
|
||||
import java.util.Properties
|
||||
import java.util.concurrent.Executors
|
||||
import scala.collection.{Map, Seq}
|
||||
import scala.concurrent.{ExecutionContext, Future}
|
||||
import scala.jdk.CollectionConverters.{MapHasAsJava, MapHasAsScala}
|
||||
|
||||
/**
|
||||
@@ -27,11 +30,13 @@ class KafkaConsole(config: KafkaConfig) {
|
||||
|
||||
protected def withAdminClient(f: Admin => Any): Any = {
|
||||
|
||||
val admin = createAdminClient()
|
||||
val admin = if (config.isCacheAdminConnection()) AdminCache.cache.get(ContextConfigHolder.CONTEXT_CONFIG.get().getBootstrapServer()) else createAdminClient()
|
||||
try {
|
||||
f(admin)
|
||||
} finally {
|
||||
admin.close()
|
||||
if (!config.isCacheAdminConnection) {
|
||||
admin.close()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -45,17 +50,22 @@ class KafkaConsole(config: KafkaConfig) {
|
||||
|
||||
protected def withConsumerAndCatchError(f: KafkaConsumer[Array[Byte], Array[Byte]] => Any, eh: Exception => Any,
|
||||
extra: Properties = new Properties()): Any = {
|
||||
val props = getProps()
|
||||
props.putAll(extra)
|
||||
props.put(ConsumerConfig.CLIENT_ID_CONFIG, String.valueOf(System.currentTimeMillis()))
|
||||
val consumer = new KafkaConsumer(props, new ByteArrayDeserializer, new ByteArrayDeserializer)
|
||||
// val props = getProps()
|
||||
// props.putAll(extra)
|
||||
// props.put(ConsumerConfig.CLIENT_ID_CONFIG, String.valueOf(System.currentTimeMillis()))
|
||||
// val consumer = new KafkaConsumer(props, new ByteArrayDeserializer, new ByteArrayDeserializer)
|
||||
ConsumerCache.setProperties(extra)
|
||||
val consumer = if (config.isCacheConsumerConnection) ConsumerCache.cache.get(ContextConfigHolder.CONTEXT_CONFIG.get().getBootstrapServer()) else KafkaConsole.createByteArrayKVConsumer(extra)
|
||||
|
||||
try {
|
||||
f(consumer)
|
||||
} catch {
|
||||
case er: Exception => eh(er)
|
||||
}
|
||||
finally {
|
||||
consumer.close()
|
||||
if (!config.isCacheConsumerConnection) {
|
||||
consumer.close()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -113,20 +123,35 @@ class KafkaConsole(config: KafkaConfig) {
|
||||
}
|
||||
|
||||
private def createAdminClient(): Admin = {
|
||||
Admin.create(getProps())
|
||||
KafkaConsole.createAdminClient()
|
||||
}
|
||||
|
||||
private def getProps(): Properties = {
|
||||
KafkaConsole.getProps()
|
||||
}
|
||||
}
|
||||
|
||||
object KafkaConsole {
|
||||
val log: Logger = LoggerFactory.getLogger(this.getClass)
|
||||
|
||||
def createAdminClient(): Admin = {
|
||||
Admin.create(getProps())
|
||||
}
|
||||
|
||||
def createByteArrayKVConsumer(extra: Properties) : KafkaConsumer[Array[Byte], Array[Byte]] = {
|
||||
val props = getProps()
|
||||
props.putAll(extra)
|
||||
props.put(ConsumerConfig.CLIENT_ID_CONFIG, String.valueOf(System.currentTimeMillis()))
|
||||
new KafkaConsumer(props, new ByteArrayDeserializer, new ByteArrayDeserializer)
|
||||
}
|
||||
|
||||
def getProps(): Properties = {
|
||||
val props: Properties = new Properties();
|
||||
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, ContextConfigHolder.CONTEXT_CONFIG.get().getBootstrapServer())
|
||||
props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, ContextConfigHolder.CONTEXT_CONFIG.get().getRequestTimeoutMs())
|
||||
props.putAll(ContextConfigHolder.CONTEXT_CONFIG.get().getProperties())
|
||||
props
|
||||
}
|
||||
}
|
||||
|
||||
object KafkaConsole {
|
||||
val log: Logger = LoggerFactory.getLogger(this.getClass)
|
||||
|
||||
def getCommittedOffsets(admin: Admin, groupId: String,
|
||||
timeoutMs: Integer): Map[TopicPartition, OffsetAndMetadata] = {
|
||||
@@ -177,4 +202,57 @@ object KafkaConsole {
|
||||
}.toMap
|
||||
res
|
||||
}
|
||||
implicit val ec = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(2))
|
||||
}
|
||||
|
||||
object AdminCache {
|
||||
|
||||
private val log: Logger = LoggerFactory.getLogger(this.getClass)
|
||||
|
||||
private val cacheLoader = new CacheLoader[String, Admin] {
|
||||
override def load(key: String): Admin = KafkaConsole.createAdminClient()
|
||||
}
|
||||
|
||||
private val removeListener = new RemovalListener[String, Admin] {
|
||||
override def onRemoval(notification: RemovalNotification[String, Admin]): Unit = {
|
||||
Future {
|
||||
log.warn("Close expired admin connection: {}", notification.getKey)
|
||||
notification.getValue.close()
|
||||
log.warn("Close expired admin connection complete: {}", notification.getKey)
|
||||
}(KafkaConsole.ec)
|
||||
|
||||
}
|
||||
}
|
||||
val cache = new TimeBasedCache[String, Admin](cacheLoader, removeListener)
|
||||
}
|
||||
|
||||
object ConsumerCache {
|
||||
|
||||
private val log: Logger = LoggerFactory.getLogger(this.getClass)
|
||||
|
||||
private val threadLocal = new ThreadLocal[Properties]
|
||||
|
||||
private val cacheLoader = new CacheLoader[String, KafkaConsumer[Array[Byte], Array[Byte]]] {
|
||||
override def load(key: String): KafkaConsumer[Array[Byte], Array[Byte]] = KafkaConsole.createByteArrayKVConsumer(threadLocal.get())
|
||||
}
|
||||
|
||||
private val removeListener = new RemovalListener[String, KafkaConsumer[Array[Byte], Array[Byte]]] {
|
||||
override def onRemoval(notification: RemovalNotification[String, KafkaConsumer[Array[Byte], Array[Byte]]]): Unit = {
|
||||
Future {
|
||||
log.warn("Close expired consumer connection: {}", notification.getKey)
|
||||
notification.getValue.close()
|
||||
log.warn("Close expired consumer connection complete: {}", notification.getKey)
|
||||
}(KafkaConsole.ec)
|
||||
|
||||
}
|
||||
}
|
||||
val cache = new TimeBasedCache[String, KafkaConsumer[Array[Byte], Array[Byte]]](cacheLoader, removeListener)
|
||||
|
||||
def setProperties(props : Properties) : Unit = {
|
||||
threadLocal.set(props)
|
||||
}
|
||||
|
||||
def clear() : Unit = {
|
||||
threadLocal.remove()
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user