diff --git a/src/main/java/com/xuxd/kafka/console/cache/TimeBasedCache.java b/src/main/java/com/xuxd/kafka/console/cache/TimeBasedCache.java new file mode 100644 index 0000000..12c9f99 --- /dev/null +++ b/src/main/java/com/xuxd/kafka/console/cache/TimeBasedCache.java @@ -0,0 +1,33 @@ +package com.xuxd.kafka.console.cache; + +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import com.google.common.cache.RemovalListener; +import kafka.console.KafkaConsole; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +public class TimeBasedCache<K, V> { + private LoadingCache<K, V> cache; + + private KafkaConsole console; + + public TimeBasedCache(CacheLoader<K, V> loader, RemovalListener<K, V> listener) { + cache = CacheBuilder.newBuilder() + .maximumSize(50) // maximum 50 records can be cached + .expireAfterAccess(30, TimeUnit.MINUTES) // cache will expire after 30 minutes of access + .removalListener(listener) + .build(loader); + + } + + public V get(K k) { + try { + return cache.get(k); + } catch (ExecutionException e) { + throw new RuntimeException("Get connection from cache error.", e); + } + } +} diff --git a/src/main/java/com/xuxd/kafka/console/config/KafkaConfig.java b/src/main/java/com/xuxd/kafka/console/config/KafkaConfig.java index f3a0592..4bd28cf 100644 --- a/src/main/java/com/xuxd/kafka/console/config/KafkaConfig.java +++ b/src/main/java/com/xuxd/kafka/console/config/KafkaConfig.java @@ -20,6 +20,12 @@ public class KafkaConfig { private Properties properties; + private boolean cacheAdminConnection; + + private boolean cacheProducerConnection; + + private boolean cacheConsumerConnection; + public String getBootstrapServer() { return bootstrapServer; } @@ -43,4 +49,28 @@ public class KafkaConfig { public void setProperties(Properties properties) { this.properties = properties; } + + public boolean isCacheAdminConnection() { + return cacheAdminConnection; + } + 
+ public void setCacheAdminConnection(boolean cacheAdminConnection) { + this.cacheAdminConnection = cacheAdminConnection; + } + + public boolean isCacheProducerConnection() { + return cacheProducerConnection; + } + + public void setCacheProducerConnection(boolean cacheProducerConnection) { + this.cacheProducerConnection = cacheProducerConnection; + } + + public boolean isCacheConsumerConnection() { + return cacheConsumerConnection; + } + + public void setCacheConsumerConnection(boolean cacheConsumerConnection) { + this.cacheConsumerConnection = cacheConsumerConnection; + } } diff --git a/src/main/java/com/xuxd/kafka/console/interceptor/ContextSetFilter.java b/src/main/java/com/xuxd/kafka/console/service/interceptor/ContextSetFilter.java similarity index 98% rename from src/main/java/com/xuxd/kafka/console/interceptor/ContextSetFilter.java rename to src/main/java/com/xuxd/kafka/console/service/interceptor/ContextSetFilter.java index 43421c2..57725b1 100644 --- a/src/main/java/com/xuxd/kafka/console/interceptor/ContextSetFilter.java +++ b/src/main/java/com/xuxd/kafka/console/service/interceptor/ContextSetFilter.java @@ -1,4 +1,4 @@ -package com.xuxd.kafka.console.interceptor; +package com.xuxd.kafka.console.service.interceptor; import com.xuxd.kafka.console.beans.ResponseData; import com.xuxd.kafka.console.beans.dos.ClusterInfoDO; diff --git a/src/main/java/com/xuxd/kafka/console/interceptor/GlobalExceptionHandler.java b/src/main/java/com/xuxd/kafka/console/service/interceptor/GlobalExceptionHandler.java similarity index 93% rename from src/main/java/com/xuxd/kafka/console/interceptor/GlobalExceptionHandler.java rename to src/main/java/com/xuxd/kafka/console/service/interceptor/GlobalExceptionHandler.java index 7715f45..1e6c17d 100644 --- a/src/main/java/com/xuxd/kafka/console/interceptor/GlobalExceptionHandler.java +++ b/src/main/java/com/xuxd/kafka/console/service/interceptor/GlobalExceptionHandler.java @@ -1,4 +1,4 @@ -package com.xuxd.kafka.console.interceptor; 
+package com.xuxd.kafka.console.service.interceptor; import com.xuxd.kafka.console.beans.ResponseData; import javax.servlet.http.HttpServletRequest; diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 63f6641..111f947 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -12,6 +12,10 @@ kafka: # 集群其它属性配置 properties: # request.timeout.ms: 5000 + # 缓存连接 + cache-admin-connection: true + cache-producer-connection: false + cache-consumer-connection: true spring: application: diff --git a/src/main/scala/kafka/console/KafkaConsole.scala b/src/main/scala/kafka/console/KafkaConsole.scala index 3b64314..33d1345 100644 --- a/src/main/scala/kafka/console/KafkaConsole.scala +++ b/src/main/scala/kafka/console/KafkaConsole.scala @@ -1,18 +1,21 @@ package kafka.console +import com.google.common.cache.{CacheLoader, RemovalListener, RemovalNotification} +import com.xuxd.kafka.console.cache.TimeBasedCache import com.xuxd.kafka.console.config.{ContextConfigHolder, KafkaConfig} -import kafka.zk.{AdminZkClient, KafkaZkClient} +import kafka.zk.AdminZkClient import org.apache.kafka.clients.admin._ import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer, OffsetAndMetadata} import org.apache.kafka.clients.producer.KafkaProducer import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.requests.ListOffsetsResponse import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, StringSerializer} -import org.apache.kafka.common.utils.Time import org.slf4j.{Logger, LoggerFactory} import java.util.Properties +import java.util.concurrent.Executors import scala.collection.{Map, Seq} +import scala.concurrent.{ExecutionContext, Future} import scala.jdk.CollectionConverters.{MapHasAsJava, MapHasAsScala} /** @@ -27,11 +30,13 @@ class KafkaConsole(config: KafkaConfig) { protected def withAdminClient(f: Admin => Any): Any = { - val admin = createAdminClient() + val 
admin = if (config.isCacheAdminConnection()) AdminCache.cache.get(ContextConfigHolder.CONTEXT_CONFIG.get().getBootstrapServer()) else createAdminClient() try { f(admin) } finally { - admin.close() + if (!config.isCacheAdminConnection) { + admin.close() + } } } @@ -45,17 +50,22 @@ class KafkaConsole(config: KafkaConfig) { protected def withConsumerAndCatchError(f: KafkaConsumer[Array[Byte], Array[Byte]] => Any, eh: Exception => Any, extra: Properties = new Properties()): Any = { - val props = getProps() - props.putAll(extra) - props.put(ConsumerConfig.CLIENT_ID_CONFIG, String.valueOf(System.currentTimeMillis())) - val consumer = new KafkaConsumer(props, new ByteArrayDeserializer, new ByteArrayDeserializer) +// val props = getProps() +// props.putAll(extra) +// props.put(ConsumerConfig.CLIENT_ID_CONFIG, String.valueOf(System.currentTimeMillis())) +// val consumer = new KafkaConsumer(props, new ByteArrayDeserializer, new ByteArrayDeserializer) + ConsumerCache.setProperties(extra) + val consumer = if (config.isCacheConsumerConnection) ConsumerCache.cache.get(ContextConfigHolder.CONTEXT_CONFIG.get().getBootstrapServer()) else KafkaConsole.createByteArrayKVConsumer(extra) + try { f(consumer) } catch { case er: Exception => eh(er) } finally { - consumer.close() + if (!config.isCacheConsumerConnection) { + consumer.close() + } } } @@ -113,20 +123,35 @@ class KafkaConsole(config: KafkaConfig) { } private def createAdminClient(): Admin = { - Admin.create(getProps()) + KafkaConsole.createAdminClient() } private def getProps(): Properties = { + KafkaConsole.getProps() + } +} + +object KafkaConsole { + val log: Logger = LoggerFactory.getLogger(this.getClass) + + def createAdminClient(): Admin = { + Admin.create(getProps()) + } + + def createByteArrayKVConsumer(extra: Properties) : KafkaConsumer[Array[Byte], Array[Byte]] = { + val props = getProps() + props.putAll(extra) + props.put(ConsumerConfig.CLIENT_ID_CONFIG, String.valueOf(System.currentTimeMillis())) + new 
KafkaConsumer(props, new ByteArrayDeserializer, new ByteArrayDeserializer) + } + + def getProps(): Properties = { val props: Properties = new Properties(); props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, ContextConfigHolder.CONTEXT_CONFIG.get().getBootstrapServer()) props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, ContextConfigHolder.CONTEXT_CONFIG.get().getRequestTimeoutMs()) props.putAll(ContextConfigHolder.CONTEXT_CONFIG.get().getProperties()) props } -} - -object KafkaConsole { - val log: Logger = LoggerFactory.getLogger(this.getClass) def getCommittedOffsets(admin: Admin, groupId: String, timeoutMs: Integer): Map[TopicPartition, OffsetAndMetadata] = { @@ -177,4 +202,57 @@ object KafkaConsole { }.toMap res } + implicit val ec = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(2)) } + +object AdminCache { + + private val log: Logger = LoggerFactory.getLogger(this.getClass) + + private val cacheLoader = new CacheLoader[String, Admin] { + override def load(key: String): Admin = KafkaConsole.createAdminClient() + } + + private val removeListener = new RemovalListener[String, Admin] { + override def onRemoval(notification: RemovalNotification[String, Admin]): Unit = { + Future { + log.warn("Close expired admin connection: {}", notification.getKey) + notification.getValue.close() + log.warn("Close expired admin connection complete: {}", notification.getKey) + }(KafkaConsole.ec) + + } + } + val cache = new TimeBasedCache[String, Admin](cacheLoader, removeListener) +} + +object ConsumerCache { + + private val log: Logger = LoggerFactory.getLogger(this.getClass) + + private val threadLocal = new ThreadLocal[Properties] + + private val cacheLoader = new CacheLoader[String, KafkaConsumer[Array[Byte], Array[Byte]]] { + override def load(key: String): KafkaConsumer[Array[Byte], Array[Byte]] = KafkaConsole.createByteArrayKVConsumer(threadLocal.get()) + } + + private val removeListener = new RemovalListener[String, KafkaConsumer[Array[Byte], 
Array[Byte]]] { + override def onRemoval(notification: RemovalNotification[String, KafkaConsumer[Array[Byte], Array[Byte]]]): Unit = { + Future { + log.warn("Close expired consumer connection: {}", notification.getKey) + notification.getValue.close() + log.warn("Close expired consumer connection complete: {}", notification.getKey) + }(KafkaConsole.ec) + + } + } + val cache = new TimeBasedCache[String, KafkaConsumer[Array[Byte], Array[Byte]]](cacheLoader, removeListener) + + def setProperties(props : Properties) : Unit = { + threadLocal.set(props) + } + + def clear() : Unit = { + threadLocal.remove() + } +} \ No newline at end of file