diff --git a/src/main/scala/kafka/console/ClientQuotaConsole.scala b/src/main/scala/kafka/console/ClientQuotaConsole.scala index bcd44cc..c24caac 100644 --- a/src/main/scala/kafka/console/ClientQuotaConsole.scala +++ b/src/main/scala/kafka/console/ClientQuotaConsole.scala @@ -1,13 +1,12 @@ package kafka.console import com.xuxd.kafka.console.config.KafkaConfig -import kafka.server.ConfigType import org.apache.kafka.clients.admin.{Admin, AlterClientQuotasOptions} import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity, ClientQuotaFilter, ClientQuotaFilterComponent} import java.util.Collections import java.util.concurrent.TimeUnit -import scala.jdk.CollectionConverters.{IterableHasAsJava, ListHasAsScala, SeqHasAsJava} +import scala.jdk.CollectionConverters.{IterableHasAsJava, ListHasAsScala, MapHasAsScala, SeqHasAsJava} /** * client quota console. @@ -26,40 +25,46 @@ class ClientQuotaConsole(config: KafkaConfig) extends KafkaConsole(config: Kafka }) }.asInstanceOf[java.util.Map[ClientQuotaEntity, java.util.Map[String, Double]]] + def addQuotaConfigs(entityTypes: java.util.List[String], entityNames: java.util.List[String], configsToBeAddedMap: java.util.Map[String, String]): (Boolean, String) = { + alterQuotaConfigs(entityTypes, entityNames, configsToBeAddedMap, Collections.emptyList()) + } + + def deleteQuotaConfigs(entityTypes: java.util.List[String], entityNames: java.util.List[String], configsToBeDeleted: java.util.List[String]): (Boolean, String) = { + alterQuotaConfigs(entityTypes, entityNames, Collections.emptyMap(), configsToBeDeleted) + } + + def alterQuotaConfigs(entityTypes: java.util.List[String], entityNames: java.util.List[String], configsToBeAddedMap: java.util.Map[String, String], configsToBeDeleted: java.util.List[String]): (Boolean, String) = { + withAdminClientAndCatchError(admin => alterQuotaConfigsInner(admin, entityTypes.asScala.toList, entityNames.asScala.toList, configsToBeAddedMap.asScala.toMap, configsToBeDeleted.asScala.toSeq), + e => { + log.error("getAllClientQuotasConfigs error.", e) + (false, e.getMessage) + }) + (true, "") + } + private def getAllClientQuotasConfigs(adminClient: Admin, entityTypes: List[String], entityNames: List[String]): java.util.Map[ClientQuotaEntity, java.util.Map[String, Double]] = { - val components = entityTypes.map(Some(_)).zipAll(entityNames.map(Some(_)), None, None).map { case (entityTypeOpt, entityNameOpt) => - val entityType = entityTypeOpt match { - case Some(ConfigType.User) => ClientQuotaEntity.USER - case Some(ConfigType.Client) => ClientQuotaEntity.CLIENT_ID - case Some(ConfigType.Ip) => ClientQuotaEntity.IP - case Some(_) => throw new IllegalArgumentException(s"Unexpected entity type ${entityTypeOpt.get}") - case None => throw new IllegalArgumentException("More entity names specified than entity types") - } + val components = entityTypes.map(Some(_)).zipAll(entityNames.map(Some(_)), None, None).map { case (entityType, entityNameOpt) => entityNameOpt match { - case Some("") => ClientQuotaFilterComponent.ofDefaultEntity(entityType) - case Some(name) => ClientQuotaFilterComponent.ofEntity(entityType, name) - case None => ClientQuotaFilterComponent.ofEntityType(entityType) + case Some("") => ClientQuotaFilterComponent.ofDefaultEntity(entityType.get) + case Some(name) => ClientQuotaFilterComponent.ofEntity(entityType.get, name) + case None => ClientQuotaFilterComponent.ofEntityType(entityType.get) } } adminClient.describeClientQuotas(ClientQuotaFilter.containsOnly(components.asJava)).entities.get(30, TimeUnit.SECONDS) }.asInstanceOf[java.util.Map[ClientQuotaEntity, java.util.Map[String, Double]]] - private def alterQuotaConfigs(adminClient: Admin, entityTypes: List[String], entityNames: List[String], configsToBeAddedMap: Map[String, String], configsToBeDeleted: Seq[String]) = { + private def alterQuotaConfigsInner(adminClient: Admin, entityTypes: List[String], entityNames: List[String], configsToBeAddedMap: Map[String, String], configsToBeDeleted: Seq[String]) = { // handle altering client/user quota configs - val oldConfig = getAllClientQuotasConfigs(adminClient, entityTypes, entityNames) + // val oldConfig = getAllClientQuotasConfigs(adminClient, entityTypes, entityNames) + // val invalidConfigs = configsToBeDeleted.filterNot(oldConfig.asScala.toMap.contains) + // if (invalidConfigs.nonEmpty) + // throw new InvalidConfigurationException(s"Invalid config(s): ${invalidConfigs.mkString(",")}") - val alterEntityTypes = entityTypes.map { - case ConfigType.User => ClientQuotaEntity.USER - case ConfigType.Client => ClientQuotaEntity.CLIENT_ID - case ConfigType.Ip => ClientQuotaEntity.IP - case entType => throw new IllegalArgumentException(s"Unexpected entity type: $entType") - } val alterEntityNames = entityNames.map(en => if (en.nonEmpty) en else null) - // Explicitly populate a HashMap to ensure nulls are recorded properly. val alterEntityMap = new java.util.HashMap[String, String] - alterEntityTypes.zip(alterEntityNames).foreach { case (k, v) => alterEntityMap.put(k, v) } + entityTypes.zip(alterEntityNames).foreach { case (k, v) => alterEntityMap.put(k, v) } val entity = new ClientQuotaEntity(alterEntityMap) val alterOptions = new AlterClientQuotasOptions().validateOnly(false) diff --git a/src/test/java/com/xuxd/kafka/console/scala/ClientQuotaConsoleTest.java b/src/test/java/com/xuxd/kafka/console/scala/ClientQuotaConsoleTest.java index 735e8d7..32850c2 100644 --- a/src/test/java/com/xuxd/kafka/console/scala/ClientQuotaConsoleTest.java +++ b/src/test/java/com/xuxd/kafka/console/scala/ClientQuotaConsoleTest.java @@ -4,25 +4,40 @@ import com.xuxd.kafka.console.config.ContextConfig; import com.xuxd.kafka.console.config.ContextConfigHolder; import com.xuxd.kafka.console.config.KafkaConfig; import kafka.console.ClientQuotaConsole; -import kafka.server.ConfigType; +import org.apache.kafka.common.config.internals.QuotaConfigs; import org.apache.kafka.common.quota.ClientQuotaEntity; import org.junit.jupiter.api.Test; import java.util.Arrays; +import java.util.HashMap; import java.util.Map; public class ClientQuotaConsoleTest { + String bootstrapServer = "127.0.0.1:9092"; + @Test void testGetClientQuotasConfigs() { ClientQuotaConsole console = new ClientQuotaConsole(new KafkaConfig()); ContextConfig config = new ContextConfig(); - config.setBootstrapServer("127.0.0.1:9092"); + config.setBootstrapServer(bootstrapServer); ContextConfigHolder.CONTEXT_CONFIG.set(config); - Map> configs = console.getClientQuotasConfigs(Arrays.asList(ConfigType.User()), Arrays.asList()); + Map> configs = console.getClientQuotasConfigs(Arrays.asList(ClientQuotaEntity.USER), Arrays.asList()); configs.forEach((k, v) -> { System.out.println(k); System.out.println(v); }); } + + @Test + void testAlterClientQuotasConfigs() { + ClientQuotaConsole console = new ClientQuotaConsole(new KafkaConfig()); + ContextConfig config = new ContextConfig(); + config.setBootstrapServer(bootstrapServer); + ContextConfigHolder.CONTEXT_CONFIG.set(config); + Map configsToBeAddedMap = new HashMap<>(); + configsToBeAddedMap.put(QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, "102400"); + + console.addQuotaConfigs(Arrays.asList(ClientQuotaEntity.USER), Arrays.asList("user-test"), configsToBeAddedMap); + } }