From e5d780d6b085c3082be1241d7501ed8b4dc3eb9d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=AE=B8=E6=99=93=E4=B8=9C?= <763795151@qq.com> Date: Tue, 31 Aug 2021 21:27:22 +0800 Subject: [PATCH] add query acl list by group API --- .../xuxd/kafka/console/beans/AclEntry.java | 10 ++++-- .../xuxd/kafka/console/beans/CounterMap.java | 25 ++++++++++++++ .../kafka/console/beans/dto/QueryAclDTO.java | 30 ++++++++++++++++ .../console/controller/AclAuthController.java | 6 ++++ .../kafka/console/service/AclService.java | 2 ++ .../console/service/impl/AclServiceImpl.java | 12 ++++++- .../scala/kafka/console/KafkaAclConsole.scala | 34 +++++++++++++++++-- 7 files changed, 114 insertions(+), 5 deletions(-) create mode 100644 src/main/java/com/xuxd/kafka/console/beans/CounterMap.java create mode 100644 src/main/java/com/xuxd/kafka/console/beans/dto/QueryAclDTO.java diff --git a/src/main/java/com/xuxd/kafka/console/beans/AclEntry.java b/src/main/java/com/xuxd/kafka/console/beans/AclEntry.java index 248132e..4f8ef77 100644 --- a/src/main/java/com/xuxd/kafka/console/beans/AclEntry.java +++ b/src/main/java/com/xuxd/kafka/console/beans/AclEntry.java @@ -1,5 +1,6 @@ package com.xuxd.kafka.console.beans; +import java.util.Objects; import lombok.Data; import org.apache.commons.lang3.StringUtils; import org.apache.kafka.common.acl.AccessControlEntry; @@ -49,6 +50,11 @@ public class AclEntry { return entry; } + public boolean isNull() { + return Objects.isNull(resourceType) && Objects.isNull(name) && Objects.isNull(patternType) && Objects.isNull(principal) + && Objects.isNull(host) && Objects.isNull(operation) && Objects.isNull(permissionType); + } + public AclBinding toAclBinding() { ResourceType resourceType = StringUtils.isBlank(this.resourceType) ? ResourceType.UNKNOWN : ResourceType.valueOf(this.resourceType); String resourceName = StringUtils.isBlank(this.name) ? ResourcePattern.WILDCARD_RESOURCE : this.name; @@ -62,12 +68,12 @@ public class AclEntry { } public AclBindingFilter toAclBindingFilter() { - ResourceType resourceType = StringUtils.isBlank(this.resourceType) ? ResourceType.UNKNOWN : ResourceType.valueOf(this.resourceType); + ResourceType resourceType = StringUtils.isBlank(this.resourceType) ? ResourceType.UNKNOWN : ResourceType.valueOf(this.resourceType.toUpperCase()); String resourceName = StringUtils.isBlank(this.name) ? ResourcePattern.WILDCARD_RESOURCE : this.name; PatternType patternType = StringUtils.isBlank(this.patternType) ? PatternType.LITERAL : PatternType.valueOf(this.patternType); String principal = StringUtils.isNotBlank(this.principal) ? new KafkaPrincipal(KafkaPrincipal.USER_TYPE, this.principal).toString() : KafkaPrincipal.ANONYMOUS.toString(); String host = StringUtils.isBlank(this.host) ? ResourcePattern.WILDCARD_RESOURCE : this.host; - AclOperation operation = StringUtils.isBlank(this.operation) ? AclOperation.UNKNOWN : AclOperation.valueOf(this.operation); + AclOperation operation = StringUtils.isBlank(this.operation) ? AclOperation.UNKNOWN : AclOperation.valueOf(this.operation.toUpperCase()); AclPermissionType permissionType = StringUtils.isBlank(this.permissionType) ? AclPermissionType.ALLOW : AclPermissionType.valueOf(this.permissionType); AclBindingFilter filter = new AclBindingFilter(new ResourcePatternFilter(resourceType, resourceName, patternType), diff --git a/src/main/java/com/xuxd/kafka/console/beans/CounterMap.java b/src/main/java/com/xuxd/kafka/console/beans/CounterMap.java new file mode 100644 index 0000000..ba69878 --- /dev/null +++ b/src/main/java/com/xuxd/kafka/console/beans/CounterMap.java @@ -0,0 +1,25 @@ +package com.xuxd.kafka.console.beans; + +import java.util.Map; +import lombok.Getter; + +/** + * kafka-console-ui. + * + * @author xuxd + * @date 2021-08-31 21:05:14 + **/ +public class CounterMap { + + @Getter + private Map map; + + @Getter + private int total; + + public CounterMap(Map map) { + this.map = map; + this.total = this.map != null ? this.map.size() : 0; + } + +} diff --git a/src/main/java/com/xuxd/kafka/console/beans/dto/QueryAclDTO.java b/src/main/java/com/xuxd/kafka/console/beans/dto/QueryAclDTO.java new file mode 100644 index 0000000..9583891 --- /dev/null +++ b/src/main/java/com/xuxd/kafka/console/beans/dto/QueryAclDTO.java @@ -0,0 +1,30 @@ +package com.xuxd.kafka.console.beans.dto; + +import com.xuxd.kafka.console.beans.AclEntry; +import lombok.Data; + +/** + * kafka-console-ui. + * + * @author xuxd + * @date 2021-08-31 17:18:40 + **/ +@Data +public class QueryAclDTO { + + private String resourceType; + + private String username = null; + + private String resourceName = null; + + public AclEntry toEntry() { + AclEntry entry = new AclEntry(); + entry.setPrincipal(username); + entry.setResourceType(resourceType); + entry.setName(resourceName); + + return entry; + } + +} diff --git a/src/main/java/com/xuxd/kafka/console/controller/AclAuthController.java b/src/main/java/com/xuxd/kafka/console/controller/AclAuthController.java index 2832200..efead86 100644 --- a/src/main/java/com/xuxd/kafka/console/controller/AclAuthController.java +++ b/src/main/java/com/xuxd/kafka/console/controller/AclAuthController.java @@ -4,6 +4,7 @@ import com.xuxd.kafka.console.beans.AclEntry; import com.xuxd.kafka.console.beans.dto.ConsumerAuthDTO; import com.xuxd.kafka.console.beans.dto.DeleteAclDTO; import com.xuxd.kafka.console.beans.dto.ProducerAuthDTO; +import com.xuxd.kafka.console.beans.dto.QueryAclDTO; import com.xuxd.kafka.console.service.AclService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.DeleteMapping; @@ -31,6 +32,11 @@ public class AclAuthController { return aclService.getAclList(); } + @PostMapping("/list") + public Object getAclList(@RequestBody QueryAclDTO param) { + return aclService.getAclList(param.toEntry()); + } + @PostMapping public Object addAcl(@RequestBody AclEntry entry) { return aclService.addAcl(entry); diff --git a/src/main/java/com/xuxd/kafka/console/service/AclService.java b/src/main/java/com/xuxd/kafka/console/service/AclService.java index e62b3f5..d011683 100644 --- a/src/main/java/com/xuxd/kafka/console/service/AclService.java +++ b/src/main/java/com/xuxd/kafka/console/service/AclService.java @@ -20,6 +20,8 @@ public interface AclService { ResponseData getAclList(); + ResponseData getAclList(AclEntry entry); + ResponseData deleteAcl(AclEntry entry); ResponseData addAcl(AclEntry entry); diff --git a/src/main/java/com/xuxd/kafka/console/service/impl/AclServiceImpl.java b/src/main/java/com/xuxd/kafka/console/service/impl/AclServiceImpl.java index d20fa34..83d6cb5 100644 --- a/src/main/java/com/xuxd/kafka/console/service/impl/AclServiceImpl.java +++ b/src/main/java/com/xuxd/kafka/console/service/impl/AclServiceImpl.java @@ -2,10 +2,12 @@ package com.xuxd.kafka.console.service.impl; import com.xuxd.kafka.console.beans.AclEntry; import com.xuxd.kafka.console.beans.CounterList; +import com.xuxd.kafka.console.beans.CounterMap; import com.xuxd.kafka.console.beans.ResponseData; import com.xuxd.kafka.console.service.AclService; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.stream.Collectors; import kafka.console.KafkaAclConsole; @@ -49,11 +51,19 @@ public class AclServiceImpl implements AclService { } @Override public ResponseData getAclList() { - List aclBindingList = aclConsole.getAclList(); + List aclBindingList = aclConsole.getAclList(null); return ResponseData.create().data(new CounterList<>(aclBindingList.stream().map(x -> AclEntry.valueOf(x)).collect(Collectors.toList()))).success(); } + @Override public ResponseData getAclList(AclEntry entry) { + List aclBindingList = entry.isNull() ? aclConsole.getAclList(null) : aclConsole.getAclList(entry); + List entryList = aclBindingList.stream().map(x -> AclEntry.valueOf(x)).collect(Collectors.toList()); + Map> entryMap = entryList.stream().collect(Collectors.groupingBy(AclEntry::getPrincipal)); + + return ResponseData.create().data(new CounterMap<>(entryMap)).success(); + } + @Override public ResponseData deleteAcl(AclEntry entry) { return aclConsole.deleteAcl(entry, false, false, false) ? ResponseData.create().success() : ResponseData.create().failed(); } diff --git a/src/main/scala/kafka/console/KafkaAclConsole.scala b/src/main/scala/kafka/console/KafkaAclConsole.scala index a06f38b..8e7c0f1 100644 --- a/src/main/scala/kafka/console/KafkaAclConsole.scala +++ b/src/main/scala/kafka/console/KafkaAclConsole.scala @@ -6,8 +6,10 @@ import java.util.{Collections, List} import com.xuxd.kafka.console.beans.AclEntry import com.xuxd.kafka.console.config.KafkaConfig +import org.apache.commons.lang3.StringUtils import org.apache.kafka.common.acl._ import org.apache.kafka.common.resource.{ResourcePattern, ResourcePatternFilter, ResourceType} +import org.apache.kafka.common.security.auth.KafkaPrincipal import scala.jdk.CollectionConverters.SetHasAsJava @@ -19,8 +21,36 @@ import scala.jdk.CollectionConverters.SetHasAsJava * */ class KafkaAclConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConfig) with Logging { - def getAclList(): List[AclBinding] = { - withAdminClient(adminClient => adminClient.describeAcls(AclBindingFilter.ANY).values().get()).asInstanceOf[List[AclBinding]] + def getAclList(entry: AclEntry): List[AclBinding] = { + + if (entry == null) { + withAdminClient(adminClient => adminClient.describeAcls(AclBindingFilter.ANY).values().get()).asInstanceOf[List[AclBinding]] + } else { + entry.isNull match { + case true => withAdminClient(adminClient => adminClient.describeAcls(AclBindingFilter.ANY).values().get()).asInstanceOf[List[AclBinding]] + case false => { + val f = entry.toAclBindingFilter + var resourceType: ResourceType = ResourceType.ANY + if (f.patternFilter().resourceType() != ResourceType.UNKNOWN) { + resourceType = f.patternFilter().resourceType() + } + + var name: String = null + if (f.patternFilter().name() != ResourcePattern.WILDCARD_RESOURCE) { + name = f.patternFilter().name() + } + + var principal: String = null + if ( StringUtils.isNotBlank(entry.getPrincipal) && !KafkaPrincipal.ANONYMOUS.toString.equalsIgnoreCase(f.entryFilter().principal())) { + principal = f.entryFilter().principal(); + } + val filter = new AclBindingFilter(new ResourcePatternFilter(resourceType, name, f.patternFilter().patternType()), + new AccessControlEntryFilter(principal, f.entryFilter().host(), AclOperation.ANY, AclPermissionType.ANY)) + log.info(filter.toString) + withAdminClient(adminClient => adminClient.describeAcls(filter).values().get()).asInstanceOf[List[AclBinding]] + } + } + } } def addAcl(acls: List[AclBinding]): Boolean = {