支持缓存生产者连接,缓存连接默认关闭

This commit is contained in:
许晓东
2022-07-09 18:54:20 +08:00
parent cc1989a74b
commit 727edfcca8
2 changed files with 56 additions and 9 deletions

View File

@@ -12,10 +12,14 @@ kafka:
# 集群其它属性配置
properties:
# request.timeout.ms: 5000
# 缓存连接
cache-admin-connection: true
# 缓存连接。不缓存的情况下,每次请求都会新建连接。即使每次新建连接通常也很快,但在某些情况下(例如开启了 ACL)查询可能较慢,此时可以将连接缓存设置为 true;
# 如果想提高查询速度,也可以将下面的连接缓存设置为 true
# 缓存 admin client的连接
cache-admin-connection: false
# 缓存 producer的连接
cache-producer-connection: false
cache-consumer-connection: true
# 缓存 consumer的连接
cache-consumer-connection: false
spring:
application:

View File

@@ -72,17 +72,18 @@ class KafkaConsole(config: KafkaConfig) {
protected def withProducerAndCatchError(f: KafkaProducer[String, String] => Any, eh: Exception => Any,
extra: Properties = new Properties()): Any = {
val props = getProps()
props.putAll(extra)
props.put(ConsumerConfig.CLIENT_ID_CONFIG, String.valueOf(System.currentTimeMillis()))
val producer = new KafkaProducer[String, String](props, new StringSerializer, new StringSerializer)
ProducerCache.setProperties(extra)
val producer = if (config.isCacheProducerConnection) ProducerCache.cache.get(ContextConfigHolder.CONTEXT_CONFIG.get().getBootstrapServer) else KafkaConsole.createProducer(extra)
try {
f(producer)
} catch {
case er: Exception => eh(er)
}
finally {
producer.close()
ProducerCache.clearProperties()
if (!config.isCacheProducerConnection) {
producer.close()
}
}
}
@@ -90,7 +91,6 @@ class KafkaConsole(config: KafkaConfig) {
extra: Properties = new Properties()): Any = {
val props = getProps()
props.putAll(extra)
props.put(ConsumerConfig.CLIENT_ID_CONFIG, String.valueOf(System.currentTimeMillis()))
val producer = new KafkaProducer[Array[Byte], Array[Byte]](props, new ByteArraySerializer, new ByteArraySerializer)
try {
f(producer)
@@ -146,6 +146,18 @@ object KafkaConsole {
new KafkaConsumer(props, new ByteArrayDeserializer, new ByteArrayDeserializer)
}
/** Builds a brand-new String/String Kafka producer (no caching at this level).
  *
  * Base connection properties come from `getProps()`, which reads the bootstrap
  * server out of `ContextConfigHolder` (thread-local request context); entries
  * in `extra` are merged on top and therefore override the defaults.
  *
  * NOTE(review): each call constructs a fresh `KafkaProducer`; whoever obtains
  * it (directly, or via `ProducerCache`'s loader) is presumably responsible
  * for closing it — confirm against `withProducerAndCatchError`, which closes
  * the producer only when `cache-producer-connection` is disabled.
  *
  * @param extra additional producer properties layered over the defaults
  * @return a newly constructed `KafkaProducer[String, String]`
  */
def createProducer(extra: Properties) : KafkaProducer[String, String] = {
// Defaults first, then caller-supplied overrides win on key collision.
val props = getProps()
props.putAll(extra)
new KafkaProducer(props, new StringSerializer, new StringSerializer)
}
/** Builds a brand-new byte-array/byte-array Kafka producer (no caching here).
  *
  * Same property layering as `createProducer`: defaults from `getProps()`
  * (bootstrap server taken from the thread-local `ContextConfigHolder`),
  * overridden by any entries in `extra`.
  *
  * NOTE(review): despite the "String" in the method name, BOTH key and value
  * use `ByteArraySerializer` — the type is `KafkaProducer[Array[Byte],
  * Array[Byte]]`. Consider renaming to `createByteArrayProducer` in a
  * follow-up (kept as-is here to avoid breaking callers).
  *
  * @param extra additional producer properties layered over the defaults
  * @return a newly constructed `KafkaProducer[Array[Byte], Array[Byte]]`
  */
def createByteArrayStringProducer(extra: Properties) : KafkaProducer[Array[Byte], Array[Byte]] = {
// Defaults first, then caller-supplied overrides win on key collision.
val props = getProps()
props.putAll(extra)
new KafkaProducer(props, new ByteArraySerializer, new ByteArraySerializer)
}
def getProps(): Properties = {
val props: Properties = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, ContextConfigHolder.CONTEXT_CONFIG.get().getBootstrapServer())
@@ -253,6 +265,37 @@ object ConsumerCache {
threadLocal.set(props)
}
def clearProperties() : Unit = {
threadLocal.remove()
}
}
object ProducerCache {
private val log: Logger = LoggerFactory.getLogger(this.getClass)
private val threadLocal = new ThreadLocal[Properties]
private val cacheLoader = new CacheLoader[String, KafkaProducer[String, String]] {
override def load(key: String): KafkaProducer[String, String] = KafkaConsole.createProducer(threadLocal.get())
}
private val removeListener = new RemovalListener[String, KafkaProducer[String, String]] {
override def onRemoval(notification: RemovalNotification[String, KafkaProducer[String, String]]): Unit = {
Future {
log.warn("Close expired producer connection: {}", notification.getKey)
notification.getValue.close()
log.warn("Close expired producer connection complete: {}", notification.getKey)
}(KafkaConsole.ec)
}
}
val cache = new TimeBasedCache[String, KafkaProducer[String, String]](cacheLoader, removeListener)
def setProperties(props : Properties) : Unit = {
threadLocal.set(props)
}
def clearProperties() : Unit = {
threadLocal.remove()
}