多集群支持,集群切换

This commit is contained in:
许晓东
2022-01-06 19:31:44 +08:00
parent f5fb2c4f88
commit 6a2d876d50
26 changed files with 317 additions and 209 deletions

View File

@@ -23,4 +23,6 @@ public class KafkaUserDO {
private String password;
private String updateTime;
private Long clusterInfoId;
}

View File

@@ -1,6 +1,7 @@
package com.xuxd.kafka.console.config;
import java.util.Properties;
import org.apache.kafka.clients.CommonClientConfigs;
/**
* kafka-console-ui.
@@ -12,6 +13,10 @@ public class ContextConfig {
public static final int DEFAULT_REQUEST_TIMEOUT_MS = 5000;
private Long clusterInfoId;
private String clusterName;
private String bootstrapServer;
private int requestTimeoutMs = DEFAULT_REQUEST_TIMEOUT_MS;
@@ -27,7 +32,8 @@ public class ContextConfig {
}
public int getRequestTimeoutMs() {
return requestTimeoutMs;
return properties.containsKey(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG) ?
Integer.parseInt(properties.getProperty(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG)) : requestTimeoutMs;
}
public void setRequestTimeoutMs(int requestTimeoutMs) {
@@ -38,6 +44,22 @@ public class ContextConfig {
return properties;
}
public Long getClusterInfoId() {
return clusterInfoId;
}
public void setClusterInfoId(Long clusterInfoId) {
this.clusterInfoId = clusterInfoId;
}
public String getClusterName() {
return clusterName;
}
public void setClusterName(String clusterName) {
this.clusterName = clusterName;
}
public void setProperties(Properties properties) {
this.properties = properties;
}

View File

@@ -16,24 +16,8 @@ public class KafkaConfig {
private String bootstrapServer;
private int requestTimeoutMs;
private String securityProtocol;
private String saslMechanism;
private String saslJaasConfig;
private String adminUsername;
private String adminPassword;
private boolean adminCreate;
private String zookeeperAddr;
private boolean enableAcl;
private Properties properties;
public String getBootstrapServer() {
@@ -44,62 +28,6 @@ public class KafkaConfig {
this.bootstrapServer = bootstrapServer;
}
public int getRequestTimeoutMs() {
return requestTimeoutMs;
}
public void setRequestTimeoutMs(int requestTimeoutMs) {
this.requestTimeoutMs = requestTimeoutMs;
}
public String getSecurityProtocol() {
return securityProtocol;
}
public void setSecurityProtocol(String securityProtocol) {
this.securityProtocol = securityProtocol;
}
public String getSaslMechanism() {
return saslMechanism;
}
public void setSaslMechanism(String saslMechanism) {
this.saslMechanism = saslMechanism;
}
public String getSaslJaasConfig() {
return saslJaasConfig;
}
public void setSaslJaasConfig(String saslJaasConfig) {
this.saslJaasConfig = saslJaasConfig;
}
public String getAdminUsername() {
return adminUsername;
}
public void setAdminUsername(String adminUsername) {
this.adminUsername = adminUsername;
}
public String getAdminPassword() {
return adminPassword;
}
public void setAdminPassword(String adminPassword) {
this.adminPassword = adminPassword;
}
public boolean isAdminCreate() {
return adminCreate;
}
public void setAdminCreate(boolean adminCreate) {
this.adminCreate = adminCreate;
}
public String getZookeeperAddr() {
return zookeeperAddr;
}
@@ -108,14 +36,6 @@ public class KafkaConfig {
this.zookeeperAddr = zookeeperAddr;
}
public boolean isEnableAcl() {
return enableAcl;
}
public void setEnableAcl(boolean enableAcl) {
this.enableAcl = enableAcl;
}
public Properties getProperties() {
return properties;
}

View File

@@ -36,7 +36,7 @@ public class ConfigController {
this.configService = configService;
}
@GetMapping
@GetMapping("/console")
public Object getConfig() {
return ResponseData.create().data(configMap).success();
}

View File

@@ -27,7 +27,7 @@ import org.springframework.http.MediaType;
* @author xuxd
* @date 2022-01-05 19:56:25
**/
@WebFilter(filterName = "context-set-filter", urlPatterns = {"/*"})
@WebFilter(filterName = "context-set-filter", urlPatterns = {"/acl/*","/user/*","/cluster/*","/config/*","/consumer/*","/message/*","/topic/*","/op/*"})
@Slf4j
public class ContextSetFilter implements Filter {
@@ -36,6 +36,7 @@ public class ContextSetFilter implements Filter {
{
excludes.add("/cluster/info/peek");
excludes.add("/cluster/info");
excludes.add("/config/console");
}
@Autowired
@@ -49,16 +50,27 @@ public class ContextSetFilter implements Filter {
if (!excludes.contains(uri)) {
String headerId = request.getHeader(Header.ID);
if (StringUtils.isBlank(headerId)) {
ResponseData failed = ResponseData.create().failed("Cluster id is null.");
response.setContentType(MediaType.APPLICATION_JSON_VALUE);
response.getOutputStream().println(ConvertUtil.toJsonString(failed));
// ResponseData failed = ResponseData.create().failed("Cluster info is null.");
ResponseData failed = ResponseData.create().failed("没有集群信息,请先切换集群");
response.setContentType(MediaType.APPLICATION_JSON_UTF8_VALUE);
response.getWriter().println(ConvertUtil.toJsonString(failed));
return;
} else {
ClusterInfoDO infoDO = clusterInfoMapper.selectById(Long.valueOf(headerId));
if (infoDO == null) {
ResponseData failed = ResponseData.create().failed("该集群找不到信息,请切换一个有效集群");
response.setContentType(MediaType.APPLICATION_JSON_UTF8_VALUE);
response.getWriter().println(ConvertUtil.toJsonString(failed));
return;
}
ContextConfig config = new ContextConfig();
config.setClusterInfoId(infoDO.getId());
config.setClusterName(infoDO.getClusterName());
config.setBootstrapServer(infoDO.getAddress());
config.setProperties(ConvertUtil.toProperties(infoDO.getProperties()));
if (StringUtils.isNotBlank(infoDO.getProperties())) {
config.setProperties(ConvertUtil.toProperties(infoDO.getProperties()));
}
ContextConfigHolder.CONTEXT_CONFIG.set(config);
}
}

View File

@@ -1,9 +1,22 @@
package com.xuxd.kafka.console.schedule;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.xuxd.kafka.console.beans.dos.ClusterInfoDO;
import com.xuxd.kafka.console.beans.dos.KafkaUserDO;
import com.xuxd.kafka.console.config.ContextConfig;
import com.xuxd.kafka.console.config.ContextConfigHolder;
import com.xuxd.kafka.console.dao.ClusterInfoMapper;
import com.xuxd.kafka.console.dao.KafkaUserMapper;
import com.xuxd.kafka.console.utils.ConvertUtil;
import com.xuxd.kafka.console.utils.SaslUtil;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import kafka.console.KafkaConfigConsole;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@@ -21,25 +34,58 @@ public class KafkaAclSchedule {
private final KafkaConfigConsole configConsole;
public KafkaAclSchedule(KafkaUserMapper userMapper, KafkaConfigConsole configConsole) {
this.userMapper = userMapper;
this.configConsole = configConsole;
private final ClusterInfoMapper clusterInfoMapper;
public KafkaAclSchedule(ObjectProvider<KafkaUserMapper> userMapper,
ObjectProvider<KafkaConfigConsole> configConsole, ObjectProvider<ClusterInfoMapper> clusterInfoMapper) {
this.userMapper = userMapper.getIfAvailable();
this.configConsole = configConsole.getIfAvailable();
this.clusterInfoMapper = clusterInfoMapper.getIfAvailable();
}
@Scheduled(cron = "${cron.clear-dirty-user}")
public void clearDirtyKafkaUser() {
log.info("Start clear dirty data for kafka user from database.");
Set<String> userSet = configConsole.getUserList(null);
userMapper.selectList(null).forEach(u -> {
if (!userSet.contains(u.getUsername())) {
log.info("clear user: {} from database.", u.getUsername());
try {
userMapper.deleteById(u.getId());
} catch (Exception e) {
log.error("userMapper.deleteById error, user: " + u, e);
try {
log.info("Start clear dirty data for kafka user from database.");
List<ClusterInfoDO> clusterInfoDOS = clusterInfoMapper.selectList(null);
List<Long> clusterInfoIds = new ArrayList<>();
for (ClusterInfoDO infoDO : clusterInfoDOS) {
ContextConfig config = new ContextConfig();
config.setClusterInfoId(infoDO.getId());
config.setClusterName(infoDO.getClusterName());
config.setBootstrapServer(infoDO.getAddress());
if (StringUtils.isNotBlank(infoDO.getProperties())) {
config.setProperties(ConvertUtil.toProperties(infoDO.getProperties()));
}
ContextConfigHolder.CONTEXT_CONFIG.set(config);
if (SaslUtil.isEnableSasl() && SaslUtil.isEnableScram()) {
log.info("Start clear cluster: {}", infoDO.getClusterName());
Set<String> userSet = configConsole.getUserList(null);
QueryWrapper<KafkaUserDO> queryWrapper = new QueryWrapper<>();
queryWrapper.eq("cluster_info_id", infoDO.getId());
userMapper.selectList(queryWrapper).forEach(u -> {
if (!userSet.contains(u.getUsername())) {
log.info("clear user: {} from database.", u.getUsername());
try {
userMapper.deleteById(u.getId());
} catch (Exception e) {
log.error("userMapper.deleteById error, user: " + u, e);
}
}
});
clusterInfoIds.add(infoDO.getId());
}
}
});
log.info("Clear end.");
if (CollectionUtils.isNotEmpty(clusterInfoIds)) {
log.info("Clear the cluster id {}, which not found.", clusterInfoIds);
QueryWrapper<KafkaUserDO> wrapper = new QueryWrapper<>();
wrapper.notIn("cluster_info_id", clusterInfoIds);
userMapper.delete(wrapper);
}
log.info("Clear end.");
} finally {
ContextConfigHolder.CONTEXT_CONFIG.remove();
}
}
}

View File

@@ -6,29 +6,37 @@ import com.xuxd.kafka.console.beans.CounterMap;
import com.xuxd.kafka.console.beans.ResponseData;
import com.xuxd.kafka.console.beans.dos.KafkaUserDO;
import com.xuxd.kafka.console.beans.vo.KafkaUserDetailVO;
import com.xuxd.kafka.console.config.KafkaConfig;
import com.xuxd.kafka.console.config.ContextConfigHolder;
import com.xuxd.kafka.console.dao.KafkaUserMapper;
import com.xuxd.kafka.console.service.AclService;
import com.xuxd.kafka.console.utils.SaslUtil;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import kafka.console.KafkaAclConsole;
import kafka.console.KafkaConfigConsole;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.ScramMechanism;
import org.apache.kafka.clients.admin.UserScramCredentialsDescription;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import scala.Tuple2;
import static com.xuxd.kafka.console.utils.SaslUtil.isEnableSasl;
import static com.xuxd.kafka.console.utils.SaslUtil.isEnableScram;
/**
* kafka-console-ui.
*
@@ -37,7 +45,7 @@ import scala.Tuple2;
**/
@Slf4j
@Service
public class AclServiceImpl implements AclService, SmartInitializingSingleton {
public class AclServiceImpl implements AclService {
@Autowired
private KafkaConfigConsole configConsole;
@@ -45,9 +53,6 @@ public class AclServiceImpl implements AclService, SmartInitializingSingleton {
@Autowired
private KafkaAclConsole aclConsole;
@Autowired
private KafkaConfig kafkaConfig;
private final KafkaUserMapper kafkaUserMapper;
public AclServiceImpl(ObjectProvider<KafkaUserMapper> kafkaUserMapper) {
@@ -64,15 +69,23 @@ public class AclServiceImpl implements AclService, SmartInitializingSingleton {
}
@Override public ResponseData addOrUpdateUser(String name, String pass) {
if (!isEnableSasl()) {
return ResponseData.create().failed("Only support SASL protocol.");
}
if (!isEnableScram()) {
return ResponseData.create().failed("Only support SASL_SCRAM.");
}
log.info("add or update user, username: {}, password: {}", name, pass);
if (!configConsole.addOrUpdateUser(name, pass)) {
Tuple2<Object, String> tuple2 = configConsole.addOrUpdateUser(name, pass);
if (!(boolean) tuple2._1()) {
log.error("add user to kafka failed.");
return ResponseData.create().failed("add user to kafka failed");
return ResponseData.create().failed("add user to kafka failed: " + tuple2._2());
}
// save user info to database.
KafkaUserDO userDO = new KafkaUserDO();
userDO.setUsername(name);
userDO.setPassword(pass);
userDO.setClusterInfoId(ContextConfigHolder.CONTEXT_CONFIG.get().getClusterInfoId());
try {
Map<String, Object> map = new HashMap<>();
map.put("username", name);
@@ -86,12 +99,24 @@ public class AclServiceImpl implements AclService, SmartInitializingSingleton {
}
@Override public ResponseData deleteUser(String name) {
if (!isEnableSasl()) {
return ResponseData.create().failed("Only support SASL protocol.");
}
if (!isEnableScram()) {
return ResponseData.create().failed("Only support SASL_SCRAM.");
}
log.info("delete user: {}", name);
Tuple2<Object, String> tuple2 = configConsole.deleteUser(name);
return (boolean) tuple2._1() ? ResponseData.create().success() : ResponseData.create().failed(tuple2._2());
}
@Override public ResponseData deleteUserAndAuth(String name) {
if (!isEnableSasl()) {
return ResponseData.create().failed("Only support SASL protocol.");
}
if (!isEnableScram()) {
return ResponseData.create().failed("Only support SASL_SCRAM.");
}
log.info("delete user and authority: {}", name);
AclEntry entry = new AclEntry();
entry.setPrincipal(name);
@@ -120,7 +145,8 @@ public class AclServiceImpl implements AclService, SmartInitializingSingleton {
Map<String, Object> resultMap = new HashMap<>();
entryMap.forEach((k, v) -> {
Map<String, List<AclEntry>> map = v.stream().collect(Collectors.groupingBy(e -> e.getResourceType() + "#" + e.getName()));
if (k.equals(kafkaConfig.getAdminUsername())) {
String username = SaslUtil.findUsername(ContextConfigHolder.CONTEXT_CONFIG.get().getProperties().getProperty(SaslConfigs.SASL_JAAS_CONFIG));
if (k.equals(username)) {
Map<String, Object> map2 = new HashMap<>(map);
Map<String, Object> userMap = new HashMap<>();
userMap.put("role", "admin");
@@ -133,7 +159,8 @@ public class AclServiceImpl implements AclService, SmartInitializingSingleton {
detailList.values().forEach(u -> {
if (!resultMap.containsKey(u.name()) && !u.credentialInfos().isEmpty()) {
if (!u.name().equals(kafkaConfig.getAdminUsername())) {
String username = SaslUtil.findUsername(ContextConfigHolder.CONTEXT_CONFIG.get().getProperties().getProperty(SaslConfigs.SASL_JAAS_CONFIG));
if (!u.name().equals(username)) {
resultMap.put(u.name(), Collections.emptyMap());
} else {
Map<String, Object> map2 = new HashMap<>();
@@ -194,27 +221,29 @@ public class AclServiceImpl implements AclService, SmartInitializingSingleton {
}
Map<String, Object> param = new HashMap<>();
param.put("username", username);
param.put("cluster_info_id", ContextConfigHolder.CONTEXT_CONFIG.get().getClusterInfoId());
List<KafkaUserDO> dos = kafkaUserMapper.selectByMap(param);
if (dos.isEmpty()) {
vo.setConsistencyDescription("Password is null.");
} else {
vo.setPassword(dos.stream().findFirst().get().getPassword());
// check for consistency.
boolean consistent = configConsole.isPassConsistent(username, vo.getPassword());
vo.setConsistencyDescription(consistent ? "Consistent" : "Password is not consistent.");
// boolean consistent = configConsole.isPassConsistent(username, vo.getPassword());
// vo.setConsistencyDescription(consistent ? "Consistent" : "Password is not consistent.");
vo.setConsistencyDescription("Can not check password consistent.");
}
return ResponseData.create().data(vo).success();
}
@Override public void afterSingletonsInstantiated() {
if (kafkaConfig.isEnableAcl() && kafkaConfig.isAdminCreate()) {
log.info("Start create admin user, username: {}, password: {}", kafkaConfig.getAdminUsername(), kafkaConfig.getAdminPassword());
boolean done = configConsole.addOrUpdateUserWithZK(kafkaConfig.getAdminUsername(), kafkaConfig.getAdminPassword());
if (!done) {
log.error("Create admin failed.");
throw new IllegalStateException();
}
}
}
// @Override public void afterSingletonsInstantiated() {
// if (kafkaConfig.isEnableAcl() && kafkaConfig.isAdminCreate()) {
// log.info("Start create admin user, username: {}, password: {}", kafkaConfig.getAdminUsername(), kafkaConfig.getAdminPassword());
// boolean done = configConsole.addOrUpdateUserWithZK(kafkaConfig.getAdminUsername(), kafkaConfig.getAdminPassword());
// if (!done) {
// log.error("Create admin failed.");
// throw new IllegalStateException();
// }
// }
// }
}

View File

@@ -0,0 +1,61 @@
package com.xuxd.kafka.console.utils;
import com.xuxd.kafka.console.config.ContextConfigHolder;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.ScramMechanism;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.security.auth.SecurityProtocol;
/**
* kafka-console-ui.
*
* @author xuxd
* @date 2022-01-06 11:07:41
**/
public class SaslUtil {
public static final Pattern JAAS_PATTERN = Pattern.compile("^.*(username=\"(.*)\"[ \t]+).*$");
private SaslUtil() {
}
public static String findUsername(String saslJaasConfig) {
Matcher matcher = JAAS_PATTERN.matcher(saslJaasConfig);
return matcher.find() ? matcher.group(2) : "";
}
public static boolean isEnableSasl() {
Properties properties = ContextConfigHolder.CONTEXT_CONFIG.get().getProperties();
if (!properties.containsKey(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)) {
return false;
}
String s = properties.getProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG);
SecurityProtocol protocol = SecurityProtocol.valueOf(s);
switch (protocol) {
case SASL_SSL:
case SASL_PLAINTEXT:
return true;
default:
return false;
}
}
public static boolean isEnableScram() {
Properties properties = ContextConfigHolder.CONTEXT_CONFIG.get().getProperties();
if (!properties.containsKey(SaslConfigs.SASL_MECHANISM)) {
return false;
}
String s = properties.getProperty(SaslConfigs.SASL_MECHANISM);
ScramMechanism mechanism = ScramMechanism.fromMechanismName(s);
switch (mechanism) {
case UNKNOWN:
return false;
default:
return true;
}
}
}