客户端限流console.
This commit is contained in:
77
src/main/scala/kafka/console/ClientQuotaConsole.scala
Normal file
77
src/main/scala/kafka/console/ClientQuotaConsole.scala
Normal file
@@ -0,0 +1,77 @@
|
||||
package kafka.console
|
||||
|
||||
import com.xuxd.kafka.console.config.KafkaConfig
|
||||
import kafka.server.ConfigType
|
||||
import org.apache.kafka.clients.admin.{Admin, AlterClientQuotasOptions}
|
||||
import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity, ClientQuotaFilter, ClientQuotaFilterComponent}
|
||||
|
||||
import java.util.Collections
|
||||
import java.util.concurrent.TimeUnit
|
||||
import scala.jdk.CollectionConverters.{IterableHasAsJava, ListHasAsScala, SeqHasAsJava}
|
||||
|
||||
/**
|
||||
* client quota console.
|
||||
*
|
||||
* @author xuxd
|
||||
* @date 2022-12-30 10:55:56
|
||||
* */
|
||||
class ClientQuotaConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConfig) with Logging {
|
||||
|
||||
|
||||
def getClientQuotasConfigs(entityTypes: java.util.List[String], entityNames: java.util.List[String]): java.util.Map[ClientQuotaEntity, java.util.Map[String, Double]] = {
|
||||
withAdminClientAndCatchError(admin => getAllClientQuotasConfigs(admin, entityTypes.asScala.toList, entityNames.asScala.toList),
|
||||
e => {
|
||||
log.error("getAllClientQuotasConfigs error.", e)
|
||||
java.util.Collections.emptyMap()
|
||||
})
|
||||
}.asInstanceOf[java.util.Map[ClientQuotaEntity, java.util.Map[String, Double]]]
|
||||
|
||||
private def getAllClientQuotasConfigs(adminClient: Admin, entityTypes: List[String], entityNames: List[String]): java.util.Map[ClientQuotaEntity, java.util.Map[String, Double]] = {
|
||||
val components = entityTypes.map(Some(_)).zipAll(entityNames.map(Some(_)), None, None).map { case (entityTypeOpt, entityNameOpt) =>
|
||||
val entityType = entityTypeOpt match {
|
||||
case Some(ConfigType.User) => ClientQuotaEntity.USER
|
||||
case Some(ConfigType.Client) => ClientQuotaEntity.CLIENT_ID
|
||||
case Some(ConfigType.Ip) => ClientQuotaEntity.IP
|
||||
case Some(_) => throw new IllegalArgumentException(s"Unexpected entity type ${entityTypeOpt.get}")
|
||||
case None => throw new IllegalArgumentException("More entity names specified than entity types")
|
||||
}
|
||||
entityNameOpt match {
|
||||
case Some("") => ClientQuotaFilterComponent.ofDefaultEntity(entityType)
|
||||
case Some(name) => ClientQuotaFilterComponent.ofEntity(entityType, name)
|
||||
case None => ClientQuotaFilterComponent.ofEntityType(entityType)
|
||||
}
|
||||
}
|
||||
|
||||
adminClient.describeClientQuotas(ClientQuotaFilter.containsOnly(components.asJava)).entities.get(30, TimeUnit.SECONDS)
|
||||
}.asInstanceOf[java.util.Map[ClientQuotaEntity, java.util.Map[String, Double]]]
|
||||
|
||||
private def alterQuotaConfigs(adminClient: Admin, entityTypes: List[String], entityNames: List[String], configsToBeAddedMap: Map[String, String], configsToBeDeleted: Seq[String]) = {
|
||||
// handle altering client/user quota configs
|
||||
val oldConfig = getAllClientQuotasConfigs(adminClient, entityTypes, entityNames)
|
||||
|
||||
val alterEntityTypes = entityTypes.map {
|
||||
case ConfigType.User => ClientQuotaEntity.USER
|
||||
case ConfigType.Client => ClientQuotaEntity.CLIENT_ID
|
||||
case ConfigType.Ip => ClientQuotaEntity.IP
|
||||
case entType => throw new IllegalArgumentException(s"Unexpected entity type: $entType")
|
||||
}
|
||||
val alterEntityNames = entityNames.map(en => if (en.nonEmpty) en else null)
|
||||
|
||||
// Explicitly populate a HashMap to ensure nulls are recorded properly.
|
||||
val alterEntityMap = new java.util.HashMap[String, String]
|
||||
alterEntityTypes.zip(alterEntityNames).foreach { case (k, v) => alterEntityMap.put(k, v) }
|
||||
val entity = new ClientQuotaEntity(alterEntityMap)
|
||||
|
||||
val alterOptions = new AlterClientQuotasOptions().validateOnly(false)
|
||||
val alterOps = (configsToBeAddedMap.map { case (key, value) =>
|
||||
val doubleValue = try value.toDouble catch {
|
||||
case _: NumberFormatException =>
|
||||
throw new IllegalArgumentException(s"Cannot parse quota configuration value for $key: $value")
|
||||
}
|
||||
new ClientQuotaAlteration.Op(key, doubleValue)
|
||||
} ++ configsToBeDeleted.map(key => new ClientQuotaAlteration.Op(key, null))).asJavaCollection
|
||||
|
||||
adminClient.alterClientQuotas(Collections.singleton(new ClientQuotaAlteration(entity, alterOps)), alterOptions)
|
||||
.all().get(60, TimeUnit.SECONDS)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,28 @@
|
||||
package com.xuxd.kafka.console.scala;
|
||||
|
||||
import com.xuxd.kafka.console.config.ContextConfig;
|
||||
import com.xuxd.kafka.console.config.ContextConfigHolder;
|
||||
import com.xuxd.kafka.console.config.KafkaConfig;
|
||||
import kafka.console.ClientQuotaConsole;
|
||||
import kafka.server.ConfigType;
|
||||
import org.apache.kafka.common.quota.ClientQuotaEntity;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Map;
|
||||
|
||||
public class ClientQuotaConsoleTest {
|
||||
|
||||
@Test
|
||||
void testGetClientQuotasConfigs() {
|
||||
ClientQuotaConsole console = new ClientQuotaConsole(new KafkaConfig());
|
||||
ContextConfig config = new ContextConfig();
|
||||
config.setBootstrapServer("10.1.18.222:9092");
|
||||
ContextConfigHolder.CONTEXT_CONFIG.set(config);
|
||||
Map<ClientQuotaEntity, Map<String, Object>> configs = console.getClientQuotasConfigs(Arrays.asList(ConfigType.User()), Arrays.asList());
|
||||
configs.forEach((k, v) -> {
|
||||
System.out.println(k);
|
||||
System.out.println(v);
|
||||
});
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user