diff --git a/src/main/java/com/xuxd/kafka/console/beans/AclEntry.java b/src/main/java/com/xuxd/kafka/console/beans/AclEntry.java index 248132e..4f8ef77 100644 --- a/src/main/java/com/xuxd/kafka/console/beans/AclEntry.java +++ b/src/main/java/com/xuxd/kafka/console/beans/AclEntry.java @@ -1,5 +1,6 @@ package com.xuxd.kafka.console.beans; +import java.util.Objects; import lombok.Data; import org.apache.commons.lang3.StringUtils; import org.apache.kafka.common.acl.AccessControlEntry; @@ -49,6 +50,11 @@ public class AclEntry { return entry; } + public boolean isNull() { + return Objects.isNull(resourceType) && Objects.isNull(name) && Objects.isNull(patternType) && Objects.isNull(principal) + && Objects.isNull(host) && Objects.isNull(operation) && Objects.isNull(permissionType); + } + public AclBinding toAclBinding() { ResourceType resourceType = StringUtils.isBlank(this.resourceType) ? ResourceType.UNKNOWN : ResourceType.valueOf(this.resourceType); String resourceName = StringUtils.isBlank(this.name) ? ResourcePattern.WILDCARD_RESOURCE : this.name; @@ -62,12 +68,12 @@ public class AclEntry { } public AclBindingFilter toAclBindingFilter() { - ResourceType resourceType = StringUtils.isBlank(this.resourceType) ? ResourceType.UNKNOWN : ResourceType.valueOf(this.resourceType); + ResourceType resourceType = StringUtils.isBlank(this.resourceType) ? ResourceType.UNKNOWN : ResourceType.valueOf(this.resourceType.toUpperCase()); String resourceName = StringUtils.isBlank(this.name) ? ResourcePattern.WILDCARD_RESOURCE : this.name; PatternType patternType = StringUtils.isBlank(this.patternType) ? PatternType.LITERAL : PatternType.valueOf(this.patternType); String principal = StringUtils.isNotBlank(this.principal) ? new KafkaPrincipal(KafkaPrincipal.USER_TYPE, this.principal).toString() : KafkaPrincipal.ANONYMOUS.toString(); String host = StringUtils.isBlank(this.host) ? ResourcePattern.WILDCARD_RESOURCE : this.host; - AclOperation operation = StringUtils.isBlank(this.operation) ? AclOperation.UNKNOWN : AclOperation.valueOf(this.operation); + AclOperation operation = StringUtils.isBlank(this.operation) ? AclOperation.UNKNOWN : AclOperation.valueOf(this.operation.toUpperCase()); AclPermissionType permissionType = StringUtils.isBlank(this.permissionType) ? AclPermissionType.ALLOW : AclPermissionType.valueOf(this.permissionType); AclBindingFilter filter = new AclBindingFilter(new ResourcePatternFilter(resourceType, resourceName, patternType), diff --git a/src/main/java/com/xuxd/kafka/console/beans/CounterMap.java b/src/main/java/com/xuxd/kafka/console/beans/CounterMap.java new file mode 100644 index 0000000..ba69878 --- /dev/null +++ b/src/main/java/com/xuxd/kafka/console/beans/CounterMap.java @@ -0,0 +1,25 @@ +package com.xuxd.kafka.console.beans; + +import java.util.Map; +import lombok.Getter; + +/** + * kafka-console-ui. + * + * @author xuxd + * @date 2021-08-31 21:05:14 + **/ +public class CounterMap { + + @Getter + private Map map; + + @Getter + private int total; + + public CounterMap(Map map) { + this.map = map; + this.total = this.map != null ? this.map.size() : 0; + } + +} diff --git a/src/main/java/com/xuxd/kafka/console/beans/dto/QueryAclDTO.java b/src/main/java/com/xuxd/kafka/console/beans/dto/QueryAclDTO.java new file mode 100644 index 0000000..9583891 --- /dev/null +++ b/src/main/java/com/xuxd/kafka/console/beans/dto/QueryAclDTO.java @@ -0,0 +1,30 @@ +package com.xuxd.kafka.console.beans.dto; + +import com.xuxd.kafka.console.beans.AclEntry; +import lombok.Data; + +/** + * kafka-console-ui. + * + * @author xuxd + * @date 2021-08-31 17:18:40 + **/ +@Data +public class QueryAclDTO { + + private String resourceType; + + private String username = null; + + private String resourceName = null; + + public AclEntry toEntry() { + AclEntry entry = new AclEntry(); + entry.setPrincipal(username); + entry.setResourceType(resourceType); + entry.setName(resourceName); + + return entry; + } + +} diff --git a/src/main/java/com/xuxd/kafka/console/controller/AclAuthController.java b/src/main/java/com/xuxd/kafka/console/controller/AclAuthController.java index 2832200..efead86 100644 --- a/src/main/java/com/xuxd/kafka/console/controller/AclAuthController.java +++ b/src/main/java/com/xuxd/kafka/console/controller/AclAuthController.java @@ -4,6 +4,7 @@ import com.xuxd.kafka.console.beans.AclEntry; import com.xuxd.kafka.console.beans.dto.ConsumerAuthDTO; import com.xuxd.kafka.console.beans.dto.DeleteAclDTO; import com.xuxd.kafka.console.beans.dto.ProducerAuthDTO; +import com.xuxd.kafka.console.beans.dto.QueryAclDTO; import com.xuxd.kafka.console.service.AclService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.DeleteMapping; @@ -31,6 +32,11 @@ public class AclAuthController { return aclService.getAclList(); } + @PostMapping("/list") + public Object getAclList(@RequestBody QueryAclDTO param) { + return aclService.getAclList(param.toEntry()); + } + @PostMapping public Object addAcl(@RequestBody AclEntry entry) { return aclService.addAcl(entry); diff --git a/src/main/java/com/xuxd/kafka/console/service/AclService.java b/src/main/java/com/xuxd/kafka/console/service/AclService.java index e62b3f5..d011683 100644 --- a/src/main/java/com/xuxd/kafka/console/service/AclService.java +++ b/src/main/java/com/xuxd/kafka/console/service/AclService.java @@ -20,6 +20,8 @@ public interface AclService { ResponseData getAclList(); + ResponseData getAclList(AclEntry entry); + ResponseData deleteAcl(AclEntry entry); ResponseData addAcl(AclEntry entry); diff --git a/src/main/java/com/xuxd/kafka/console/service/impl/AclServiceImpl.java b/src/main/java/com/xuxd/kafka/console/service/impl/AclServiceImpl.java index d20fa34..83d6cb5 100644 --- a/src/main/java/com/xuxd/kafka/console/service/impl/AclServiceImpl.java +++ b/src/main/java/com/xuxd/kafka/console/service/impl/AclServiceImpl.java @@ -2,10 +2,12 @@ package com.xuxd.kafka.console.service.impl; import com.xuxd.kafka.console.beans.AclEntry; import com.xuxd.kafka.console.beans.CounterList; +import com.xuxd.kafka.console.beans.CounterMap; import com.xuxd.kafka.console.beans.ResponseData; import com.xuxd.kafka.console.service.AclService; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.stream.Collectors; import kafka.console.KafkaAclConsole; @@ -49,11 +51,19 @@ public class AclServiceImpl implements AclService { } @Override public ResponseData getAclList() { - List aclBindingList = aclConsole.getAclList(); + List aclBindingList = aclConsole.getAclList(null); return ResponseData.create().data(new CounterList<>(aclBindingList.stream().map(x -> AclEntry.valueOf(x)).collect(Collectors.toList()))).success(); } + @Override public ResponseData getAclList(AclEntry entry) { + List aclBindingList = entry.isNull() ? aclConsole.getAclList(null) : aclConsole.getAclList(entry); + List entryList = aclBindingList.stream().map(x -> AclEntry.valueOf(x)).collect(Collectors.toList()); + Map> entryMap = entryList.stream().collect(Collectors.groupingBy(AclEntry::getPrincipal)); + + return ResponseData.create().data(new CounterMap<>(entryMap)).success(); + } + @Override public ResponseData deleteAcl(AclEntry entry) { return aclConsole.deleteAcl(entry, false, false, false) ? ResponseData.create().success() : ResponseData.create().failed(); } diff --git a/src/main/scala/kafka/console/KafkaAclConsole.scala b/src/main/scala/kafka/console/KafkaAclConsole.scala index a06f38b..8e7c0f1 100644 --- a/src/main/scala/kafka/console/KafkaAclConsole.scala +++ b/src/main/scala/kafka/console/KafkaAclConsole.scala @@ -6,8 +6,10 @@ import java.util.{Collections, List} import com.xuxd.kafka.console.beans.AclEntry import com.xuxd.kafka.console.config.KafkaConfig +import org.apache.commons.lang3.StringUtils import org.apache.kafka.common.acl._ import org.apache.kafka.common.resource.{ResourcePattern, ResourcePatternFilter, ResourceType} +import org.apache.kafka.common.security.auth.KafkaPrincipal import scala.jdk.CollectionConverters.SetHasAsJava @@ -19,8 +21,36 @@ import scala.jdk.CollectionConverters.SetHasAsJava * */ class KafkaAclConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConfig) with Logging { - def getAclList(): List[AclBinding] = { - withAdminClient(adminClient => adminClient.describeAcls(AclBindingFilter.ANY).values().get()).asInstanceOf[List[AclBinding]] + def getAclList(entry: AclEntry): List[AclBinding] = { + + if (entry == null) { + withAdminClient(adminClient => adminClient.describeAcls(AclBindingFilter.ANY).values().get()).asInstanceOf[List[AclBinding]] + } else { + entry.isNull match { + case true => withAdminClient(adminClient => adminClient.describeAcls(AclBindingFilter.ANY).values().get()).asInstanceOf[List[AclBinding]] + case false => { + val f = entry.toAclBindingFilter + var resourceType: ResourceType = ResourceType.ANY + if (f.patternFilter().resourceType() != ResourceType.UNKNOWN) { + resourceType = f.patternFilter().resourceType() + } + + var name: String = null + if (f.patternFilter().name() != ResourcePattern.WILDCARD_RESOURCE) { + name = f.patternFilter().name() + } + + var principal: String = null + if ( StringUtils.isNotBlank(entry.getPrincipal) && !KafkaPrincipal.ANONYMOUS.toString.equalsIgnoreCase(f.entryFilter().principal())) { + principal = f.entryFilter().principal(); + } + val filter = new AclBindingFilter(new ResourcePatternFilter(resourceType, name, f.patternFilter().patternType()), + new AccessControlEntryFilter(principal, f.entryFilter().host(), AclOperation.ANY, AclPermissionType.ANY)) + log.info(filter.toString) + withAdminClient(adminClient => adminClient.describeAcls(filter).values().get()).asInstanceOf[List[AclBinding]] + } + } + } } def addAcl(acls: List[AclBinding]): Boolean = {