客户端限流console
This commit is contained in:
@@ -1,13 +1,12 @@
|
|||||||
package kafka.console
|
package kafka.console
|
||||||
|
|
||||||
import com.xuxd.kafka.console.config.KafkaConfig
|
import com.xuxd.kafka.console.config.KafkaConfig
|
||||||
import kafka.server.ConfigType
|
|
||||||
import org.apache.kafka.clients.admin.{Admin, AlterClientQuotasOptions}
|
import org.apache.kafka.clients.admin.{Admin, AlterClientQuotasOptions}
|
||||||
import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity, ClientQuotaFilter, ClientQuotaFilterComponent}
|
import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity, ClientQuotaFilter, ClientQuotaFilterComponent}
|
||||||
|
|
||||||
import java.util.Collections
|
import java.util.Collections
|
||||||
import java.util.concurrent.TimeUnit
|
import java.util.concurrent.TimeUnit
|
||||||
import scala.jdk.CollectionConverters.{IterableHasAsJava, ListHasAsScala, SeqHasAsJava}
|
import scala.jdk.CollectionConverters.{IterableHasAsJava, ListHasAsScala, MapHasAsScala, SeqHasAsJava}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* client quota console.
|
* client quota console.
|
||||||
@@ -26,40 +25,46 @@ class ClientQuotaConsole(config: KafkaConfig) extends KafkaConsole(config: Kafka
|
|||||||
})
|
})
|
||||||
}.asInstanceOf[java.util.Map[ClientQuotaEntity, java.util.Map[String, Double]]]
|
}.asInstanceOf[java.util.Map[ClientQuotaEntity, java.util.Map[String, Double]]]
|
||||||
|
|
||||||
|
def addQuotaConfigs(entityTypes: java.util.List[String], entityNames: java.util.List[String], configsToBeAddedMap: java.util.Map[String, String]): (Boolean, String) = {
|
||||||
|
alterQuotaConfigs(entityTypes, entityNames, configsToBeAddedMap, Collections.emptyList())
|
||||||
|
}
|
||||||
|
|
||||||
|
def deleteQuotaConfigs(entityTypes: java.util.List[String], entityNames: java.util.List[String], configsToBeDeleted: java.util.List[String]): (Boolean, String) = {
|
||||||
|
alterQuotaConfigs(entityTypes, entityNames, Collections.emptyMap(), configsToBeDeleted)
|
||||||
|
}
|
||||||
|
|
||||||
|
def alterQuotaConfigs(entityTypes: java.util.List[String], entityNames: java.util.List[String], configsToBeAddedMap: java.util.Map[String, String], configsToBeDeleted: java.util.List[String]): (Boolean, String) = {
|
||||||
|
withAdminClientAndCatchError(admin => alterQuotaConfigsInner(admin, entityTypes.asScala.toList, entityNames.asScala.toList, configsToBeAddedMap.asScala.toMap, configsToBeDeleted.asScala.toSeq),
|
||||||
|
e => {
|
||||||
|
log.error("getAllClientQuotasConfigs error.", e)
|
||||||
|
(false, e.getMessage)
|
||||||
|
})
|
||||||
|
(true, "")
|
||||||
|
}
|
||||||
|
|
||||||
private def getAllClientQuotasConfigs(adminClient: Admin, entityTypes: List[String], entityNames: List[String]): java.util.Map[ClientQuotaEntity, java.util.Map[String, Double]] = {
|
private def getAllClientQuotasConfigs(adminClient: Admin, entityTypes: List[String], entityNames: List[String]): java.util.Map[ClientQuotaEntity, java.util.Map[String, Double]] = {
|
||||||
val components = entityTypes.map(Some(_)).zipAll(entityNames.map(Some(_)), None, None).map { case (entityTypeOpt, entityNameOpt) =>
|
val components = entityTypes.map(Some(_)).zipAll(entityNames.map(Some(_)), None, None).map { case (entityType, entityNameOpt) =>
|
||||||
val entityType = entityTypeOpt match {
|
|
||||||
case Some(ConfigType.User) => ClientQuotaEntity.USER
|
|
||||||
case Some(ConfigType.Client) => ClientQuotaEntity.CLIENT_ID
|
|
||||||
case Some(ConfigType.Ip) => ClientQuotaEntity.IP
|
|
||||||
case Some(_) => throw new IllegalArgumentException(s"Unexpected entity type ${entityTypeOpt.get}")
|
|
||||||
case None => throw new IllegalArgumentException("More entity names specified than entity types")
|
|
||||||
}
|
|
||||||
entityNameOpt match {
|
entityNameOpt match {
|
||||||
case Some("") => ClientQuotaFilterComponent.ofDefaultEntity(entityType)
|
case Some("") => ClientQuotaFilterComponent.ofDefaultEntity(entityType.get)
|
||||||
case Some(name) => ClientQuotaFilterComponent.ofEntity(entityType, name)
|
case Some(name) => ClientQuotaFilterComponent.ofEntity(entityType.get, name)
|
||||||
case None => ClientQuotaFilterComponent.ofEntityType(entityType)
|
case None => ClientQuotaFilterComponent.ofEntityType(entityType.get)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
adminClient.describeClientQuotas(ClientQuotaFilter.containsOnly(components.asJava)).entities.get(30, TimeUnit.SECONDS)
|
adminClient.describeClientQuotas(ClientQuotaFilter.containsOnly(components.asJava)).entities.get(30, TimeUnit.SECONDS)
|
||||||
}.asInstanceOf[java.util.Map[ClientQuotaEntity, java.util.Map[String, Double]]]
|
}.asInstanceOf[java.util.Map[ClientQuotaEntity, java.util.Map[String, Double]]]
|
||||||
|
|
||||||
private def alterQuotaConfigs(adminClient: Admin, entityTypes: List[String], entityNames: List[String], configsToBeAddedMap: Map[String, String], configsToBeDeleted: Seq[String]) = {
|
private def alterQuotaConfigsInner(adminClient: Admin, entityTypes: List[String], entityNames: List[String], configsToBeAddedMap: Map[String, String], configsToBeDeleted: Seq[String]) = {
|
||||||
// handle altering client/user quota configs
|
// handle altering client/user quota configs
|
||||||
val oldConfig = getAllClientQuotasConfigs(adminClient, entityTypes, entityNames)
|
// val oldConfig = getAllClientQuotasConfigs(adminClient, entityTypes, entityNames)
|
||||||
|
// val invalidConfigs = configsToBeDeleted.filterNot(oldConfig.asScala.toMap.contains)
|
||||||
|
// if (invalidConfigs.nonEmpty)
|
||||||
|
// throw new InvalidConfigurationException(s"Invalid config(s): ${invalidConfigs.mkString(",")}")
|
||||||
|
|
||||||
val alterEntityTypes = entityTypes.map {
|
|
||||||
case ConfigType.User => ClientQuotaEntity.USER
|
|
||||||
case ConfigType.Client => ClientQuotaEntity.CLIENT_ID
|
|
||||||
case ConfigType.Ip => ClientQuotaEntity.IP
|
|
||||||
case entType => throw new IllegalArgumentException(s"Unexpected entity type: $entType")
|
|
||||||
}
|
|
||||||
val alterEntityNames = entityNames.map(en => if (en.nonEmpty) en else null)
|
val alterEntityNames = entityNames.map(en => if (en.nonEmpty) en else null)
|
||||||
|
|
||||||
// Explicitly populate a HashMap to ensure nulls are recorded properly.
|
// Explicitly populate a HashMap to ensure nulls are recorded properly.
|
||||||
val alterEntityMap = new java.util.HashMap[String, String]
|
val alterEntityMap = new java.util.HashMap[String, String]
|
||||||
alterEntityTypes.zip(alterEntityNames).foreach { case (k, v) => alterEntityMap.put(k, v) }
|
entityTypes.zip(alterEntityNames).foreach { case (k, v) => alterEntityMap.put(k, v) }
|
||||||
val entity = new ClientQuotaEntity(alterEntityMap)
|
val entity = new ClientQuotaEntity(alterEntityMap)
|
||||||
|
|
||||||
val alterOptions = new AlterClientQuotasOptions().validateOnly(false)
|
val alterOptions = new AlterClientQuotasOptions().validateOnly(false)
|
||||||
|
|||||||
@@ -4,25 +4,40 @@ import com.xuxd.kafka.console.config.ContextConfig;
|
|||||||
import com.xuxd.kafka.console.config.ContextConfigHolder;
|
import com.xuxd.kafka.console.config.ContextConfigHolder;
|
||||||
import com.xuxd.kafka.console.config.KafkaConfig;
|
import com.xuxd.kafka.console.config.KafkaConfig;
|
||||||
import kafka.console.ClientQuotaConsole;
|
import kafka.console.ClientQuotaConsole;
|
||||||
import kafka.server.ConfigType;
|
import org.apache.kafka.common.config.internals.QuotaConfigs;
|
||||||
import org.apache.kafka.common.quota.ClientQuotaEntity;
|
import org.apache.kafka.common.quota.ClientQuotaEntity;
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
public class ClientQuotaConsoleTest {
|
public class ClientQuotaConsoleTest {
|
||||||
|
|
||||||
|
String bootstrapServer = "127.0.0.1:9092";
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
void testGetClientQuotasConfigs() {
|
void testGetClientQuotasConfigs() {
|
||||||
ClientQuotaConsole console = new ClientQuotaConsole(new KafkaConfig());
|
ClientQuotaConsole console = new ClientQuotaConsole(new KafkaConfig());
|
||||||
ContextConfig config = new ContextConfig();
|
ContextConfig config = new ContextConfig();
|
||||||
config.setBootstrapServer("127.0.0.1:9092");
|
config.setBootstrapServer(bootstrapServer);
|
||||||
ContextConfigHolder.CONTEXT_CONFIG.set(config);
|
ContextConfigHolder.CONTEXT_CONFIG.set(config);
|
||||||
Map<ClientQuotaEntity, Map<String, Object>> configs = console.getClientQuotasConfigs(Arrays.asList(ConfigType.User()), Arrays.asList());
|
Map<ClientQuotaEntity, Map<String, Object>> configs = console.getClientQuotasConfigs(Arrays.asList(ClientQuotaEntity.USER), Arrays.asList());
|
||||||
configs.forEach((k, v) -> {
|
configs.forEach((k, v) -> {
|
||||||
System.out.println(k);
|
System.out.println(k);
|
||||||
System.out.println(v);
|
System.out.println(v);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testAlterClientQuotasConfigs() {
|
||||||
|
ClientQuotaConsole console = new ClientQuotaConsole(new KafkaConfig());
|
||||||
|
ContextConfig config = new ContextConfig();
|
||||||
|
config.setBootstrapServer(bootstrapServer);
|
||||||
|
ContextConfigHolder.CONTEXT_CONFIG.set(config);
|
||||||
|
Map<String, String> configsToBeAddedMap = new HashMap<>();
|
||||||
|
configsToBeAddedMap.put(QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, "102400");
|
||||||
|
|
||||||
|
console.addQuotaConfigs(Arrays.asList(ClientQuotaEntity.USER), Arrays.asList("user-test"), configsToBeAddedMap);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user