ACL的认证和授权管理功能分离.

This commit is contained in:
许晓东
2022-08-28 21:39:08 +08:00
parent 4c3fe5230c
commit 3bd14a35d6
16 changed files with 1160 additions and 502 deletions

View File

@@ -118,4 +118,14 @@ public class AclAuthController {
return aclService.deleteConsumerAcl(param.toTopicEntry(), param.toGroupEntry());
}
/**
* clear principal acls.
*
* @param param acl principal.
* @return true or false.
*/
@DeleteMapping("/clear")
public Object clearAcl(@RequestBody DeleteAclDTO param) {
return aclService.clearAcl(param.toUserEntry());
}
}

View File

@@ -1,7 +1,9 @@
package com.xuxd.kafka.console.controller;
import com.xuxd.kafka.console.beans.AclEntry;
import com.xuxd.kafka.console.beans.AclUser;
import com.xuxd.kafka.console.service.AclService;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
@@ -12,7 +14,7 @@ import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
/**
* kafka-console-ui.
* kafka-console-ui. sasl scram user.
*
* @author xuxd
* @date 2021-08-28 21:13:05
@@ -49,4 +51,11 @@ public class AclUserController {
public Object getUserDetail(@RequestParam String username) {
return aclService.getUserDetail(username);
}
@GetMapping("/scram")
public Object getSaslScramUserList(@RequestParam(required = false) String username) {
AclEntry entry = new AclEntry();
entry.setPrincipal(StringUtils.isNotBlank(username) ? username : null);
return aclService.getSaslScramUserList(entry);
}
}

View File

@@ -72,6 +72,7 @@ public class ContextSetFilter implements Filter {
config.setProperties(ConvertUtil.toProperties(infoDO.getProperties()));
}
ContextConfigHolder.CONTEXT_CONFIG.set(config);
// log.info("current kafka config: {}", config);
}
}
chain.doFilter(req, response);

View File

@@ -42,4 +42,7 @@ public interface AclService {
ResponseData getUserDetail(String username);
ResponseData clearAcl(AclEntry entry);
ResponseData getSaslScramUserList(AclEntry entry);
}

View File

@@ -10,30 +10,23 @@ import com.xuxd.kafka.console.config.ContextConfigHolder;
import com.xuxd.kafka.console.dao.KafkaUserMapper;
import com.xuxd.kafka.console.service.AclService;
import com.xuxd.kafka.console.utils.SaslUtil;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import kafka.console.KafkaAclConsole;
import kafka.console.KafkaConfigConsole;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.ScramMechanism;
import org.apache.kafka.clients.admin.UserScramCredentialsDescription;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.errors.SecurityDisabledException;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import scala.Tuple2;
import java.util.*;
import java.util.stream.Collectors;
import static com.xuxd.kafka.console.utils.SaslUtil.isEnableSasl;
import static com.xuxd.kafka.console.utils.SaslUtil.isEnableScram;
@@ -139,39 +132,52 @@ public class AclServiceImpl implements AclService {
}
@Override public ResponseData getAclList(AclEntry entry) {
List<AclBinding> aclBindingList = entry.isNull() ? aclConsole.getAclList(null) : aclConsole.getAclList(entry);
List<AclBinding> aclBindingList = Collections.emptyList();
try {
aclBindingList = entry.isNull() ? aclConsole.getAclList(null) : aclConsole.getAclList(entry);
}catch (Exception ex) {
if (ex.getCause() instanceof SecurityDisabledException) {
Throwable e = ex.getCause();
log.info("SecurityDisabledException: {}", e.getMessage());
Map<String, String> hint = new HashMap<>(2);
hint.put("hint", "Security Disabled: " + e.getMessage());
return ResponseData.create().data(hint).success();
}
throw new RuntimeException(ex.getCause());
}
// List<AclBinding> aclBindingList = entry.isNull() ? aclConsole.getAclList(null) : aclConsole.getAclList(entry);
List<AclEntry> entryList = aclBindingList.stream().map(x -> AclEntry.valueOf(x)).collect(Collectors.toList());
Map<String, List<AclEntry>> entryMap = entryList.stream().collect(Collectors.groupingBy(AclEntry::getPrincipal));
Map<String, Object> resultMap = new HashMap<>();
entryMap.forEach((k, v) -> {
Map<String, List<AclEntry>> map = v.stream().collect(Collectors.groupingBy(e -> e.getResourceType() + "#" + e.getName()));
String username = SaslUtil.findUsername(ContextConfigHolder.CONTEXT_CONFIG.get().getProperties().getProperty(SaslConfigs.SASL_JAAS_CONFIG));
if (k.equals(username)) {
Map<String, Object> map2 = new HashMap<>(map);
Map<String, Object> userMap = new HashMap<>();
userMap.put("role", "admin");
map2.put("USER", userMap);
}
// String username = SaslUtil.findUsername(ContextConfigHolder.CONTEXT_CONFIG.get().getProperties().getProperty(SaslConfigs.SASL_JAAS_CONFIG));
// if (k.equals(username)) {
// Map<String, Object> map2 = new HashMap<>(map);
// Map<String, Object> userMap = new HashMap<>();
// userMap.put("role", "admin");
// map2.put("USER", userMap);
// }
resultMap.put(k, map);
});
if (entry.isNull() || StringUtils.isNotBlank(entry.getPrincipal())) {
Map<String, UserScramCredentialsDescription> detailList = configConsole.getUserDetailList(StringUtils.isNotBlank(entry.getPrincipal()) ? Collections.singletonList(entry.getPrincipal()) : null);
detailList.values().forEach(u -> {
if (!resultMap.containsKey(u.name()) && !u.credentialInfos().isEmpty()) {
String username = SaslUtil.findUsername(ContextConfigHolder.CONTEXT_CONFIG.get().getProperties().getProperty(SaslConfigs.SASL_JAAS_CONFIG));
if (!u.name().equals(username)) {
resultMap.put(u.name(), Collections.emptyMap());
} else {
Map<String, Object> map2 = new HashMap<>();
Map<String, Object> userMap = new HashMap<>();
userMap.put("role", "admin");
map2.put("USER", userMap);
resultMap.put(u.name(), map2);
}
}
});
}
// if (entry.isNull() || StringUtils.isNotBlank(entry.getPrincipal())) {
// Map<String, UserScramCredentialsDescription> detailList = configConsole.getUserDetailList(StringUtils.isNotBlank(entry.getPrincipal()) ? Collections.singletonList(entry.getPrincipal()) : null);
//
// detailList.values().forEach(u -> {
// if (!resultMap.containsKey(u.name()) && !u.credentialInfos().isEmpty()) {
// String username = SaslUtil.findUsername(ContextConfigHolder.CONTEXT_CONFIG.get().getProperties().getProperty(SaslConfigs.SASL_JAAS_CONFIG));
// if (!u.name().equals(username)) {
// resultMap.put(u.name(), Collections.emptyMap());
// } else {
// Map<String, Object> map2 = new HashMap<>();
// Map<String, Object> userMap = new HashMap<>();
// userMap.put("role", "admin");
// map2.put("USER", userMap);
// resultMap.put(u.name(), map2);
// }
// }
// });
// }
return ResponseData.create().data(new CounterMap<>(resultMap)).success();
}
@@ -236,6 +242,37 @@ public class AclServiceImpl implements AclService {
return ResponseData.create().data(vo).success();
}
@Override
public ResponseData clearAcl(AclEntry entry) {
log.info("Start clear acl, principal: {}", entry);
return aclConsole.deleteUserAcl(entry) ? ResponseData.create().success() : ResponseData.create().failed("操作失败");
}
@Override
public ResponseData getSaslScramUserList(AclEntry entry) {
Map<String, Object> resultMap = new HashMap<>();
if (entry.isNull() || StringUtils.isNotBlank(entry.getPrincipal())) {
Map<String, UserScramCredentialsDescription> detailList = configConsole.getUserDetailList(StringUtils.isNotBlank(entry.getPrincipal()) ? Collections.singletonList(entry.getPrincipal()) : null);
detailList.values().forEach(u -> {
if (!resultMap.containsKey(u.name()) && !u.credentialInfos().isEmpty()) {
String username = SaslUtil.findUsername(ContextConfigHolder.CONTEXT_CONFIG.get().getProperties().getProperty(SaslConfigs.SASL_JAAS_CONFIG));
if (!u.name().equals(username)) {
resultMap.put(u.name(), Collections.emptyMap());
} else {
Map<String, Object> map2 = new HashMap<>();
Map<String, Object> userMap = new HashMap<>();
userMap.put("role", "admin");
map2.put("USER", userMap);
resultMap.put(u.name(), map2);
}
}
});
}
return ResponseData.create().data(new CounterMap<>(resultMap)).success();
}
// @Override public void afterSingletonsInstantiated() {
// if (kafkaConfig.isEnableAcl() && kafkaConfig.isAdminCreate()) {
// log.info("Start create admin user, username: {}, password: {}", kafkaConfig.getAdminUsername(), kafkaConfig.getAdminPassword());

View File

@@ -73,6 +73,10 @@ public class ClusterServiceImpl implements ClusterService {
}
@Override public ResponseData updateClusterInfo(ClusterInfoDO infoDO) {
if (infoDO.getProperties() == null) {
// null 的话不更新这个是bug设置为空字符串解决
infoDO.setProperties("");
}
clusterInfoMapper.updateById(infoDO);
return ResponseData.create().success();
}