diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 111f947..8400628 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -12,10 +12,14 @@ kafka: # 集群其它属性配置 properties: # request.timeout.ms: 5000 - # 缓存连接 - cache-admin-connection: true + # 缓存连接,不缓存的情况下,每次请求建立连接. 即使每次请求建立连接,其实也很快,某些情况下开启ACL,查询可能很慢,可以设置连接缓存为true, + # 或者想提高查询速度,也可以设置下面连接缓存为true + # 缓存 admin client的连接 + cache-admin-connection: false + # 缓存 producer的连接 cache-producer-connection: false - cache-consumer-connection: true + # 缓存 consumer的连接 + cache-consumer-connection: false spring: application: diff --git a/src/main/scala/kafka/console/KafkaConsole.scala b/src/main/scala/kafka/console/KafkaConsole.scala index cd8414c..74b6949 100644 --- a/src/main/scala/kafka/console/KafkaConsole.scala +++ b/src/main/scala/kafka/console/KafkaConsole.scala @@ -72,17 +72,18 @@ class KafkaConsole(config: KafkaConfig) { protected def withProducerAndCatchError(f: KafkaProducer[String, String] => Any, eh: Exception => Any, extra: Properties = new Properties()): Any = { - val props = getProps() - props.putAll(extra) - props.put(ConsumerConfig.CLIENT_ID_CONFIG, String.valueOf(System.currentTimeMillis())) - val producer = new KafkaProducer[String, String](props, new StringSerializer, new StringSerializer) + ProducerCache.setProperties(extra) + val producer = if (config.isCacheProducerConnection) ProducerCache.cache.get(ContextConfigHolder.CONTEXT_CONFIG.get().getBootstrapServer) else KafkaConsole.createProducer(extra) try { f(producer) } catch { case er: Exception => eh(er) } finally { - producer.close() + ProducerCache.clearProperties() + if (!config.isCacheProducerConnection) { + producer.close() + } } } @@ -90,7 +91,6 @@ class KafkaConsole(config: KafkaConfig) { extra: Properties = new Properties()): Any = { val props = getProps() props.putAll(extra) - props.put(ConsumerConfig.CLIENT_ID_CONFIG, String.valueOf(System.currentTimeMillis())) val producer = new KafkaProducer[Array[Byte], Array[Byte]](props, new ByteArraySerializer, new ByteArraySerializer) try { f(producer) @@ -146,6 +146,18 @@ object KafkaConsole { new KafkaConsumer(props, new ByteArrayDeserializer, new ByteArrayDeserializer) } + def createProducer(extra: Properties) : KafkaProducer[String, String] = { + val props = getProps() + props.putAll(extra) + new KafkaProducer(props, new StringSerializer, new StringSerializer) + } + + def createByteArrayStringProducer(extra: Properties) : KafkaProducer[Array[Byte], Array[Byte]] = { + val props = getProps() + props.putAll(extra) + new KafkaProducer(props, new ByteArraySerializer, new ByteArraySerializer) + } + def getProps(): Properties = { val props: Properties = new Properties(); props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, ContextConfigHolder.CONTEXT_CONFIG.get().getBootstrapServer()) @@ -253,6 +265,37 @@ object ConsumerCache { threadLocal.set(props) } + def clearProperties() : Unit = { + threadLocal.remove() + } +} + +object ProducerCache { + + private val log: Logger = LoggerFactory.getLogger(this.getClass) + + private val threadLocal = new ThreadLocal[Properties] + + private val cacheLoader = new CacheLoader[String, KafkaProducer[String, String]] { + override def load(key: String): KafkaProducer[String, String] = KafkaConsole.createProducer(threadLocal.get()) + } + + private val removeListener = new RemovalListener[String, KafkaProducer[String, String]] { + override def onRemoval(notification: RemovalNotification[String, KafkaProducer[String, String]]): Unit = { + Future { + log.warn("Close expired producer connection: {}", notification.getKey) + notification.getValue.close() + log.warn("Close expired producer connection complete: {}", notification.getKey) + }(KafkaConsole.ec) + + } + } + val cache = new TimeBasedCache[String, KafkaProducer[String, String]](cacheLoader, removeListener) + + def setProperties(props : Properties) : Unit = { + threadLocal.set(props) + } + def clearProperties() : Unit = { threadLocal.remove() }