kafka console initial commit

This commit is contained in:
许晓东
2021-08-30 20:24:24 +08:00
commit 5b36126ea4
30 changed files with 1727 additions and 0 deletions

View File

@@ -0,0 +1,13 @@
package com.xuxd.kafka.console;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class KafkaConsoleUiApplication {
public static void main(String[] args) {
SpringApplication.run(KafkaConsoleUiApplication.class, args);
}
}

View File

@@ -0,0 +1,97 @@
package com.xuxd.kafka.console.beans;
import lombok.Data;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AccessControlEntryFilter;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourcePatternFilter;
import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
/**
* kafka-console-ui.
*
* @author xuxd
* @date 2021-08-28 20:17:27
**/
@Data
public class AclEntry {
private String resourceType;
private String name = null;
private String patternType;
private String principal = null;
private String host;
private String operation;
private String permissionType;
public static AclEntry valueOf(AclBinding binding) {
AclEntry entry = new AclEntry();
entry.setResourceType(binding.pattern().resourceType().name());
entry.setName(binding.pattern().name());
entry.setPatternType(binding.pattern().patternType().name());
entry.setPrincipal(KafkaPrincipal.fromString(binding.entry().principal()).getName());
entry.setHost(binding.entry().host());
entry.setOperation(binding.entry().operation().name());
entry.setPermissionType(binding.entry().permissionType().name());
return entry;
}
public AclBinding toAclBinding() {
ResourceType resourceType = StringUtils.isBlank(this.resourceType) ? ResourceType.UNKNOWN : ResourceType.valueOf(this.resourceType);
String resourceName = StringUtils.isBlank(this.name) ? ResourcePattern.WILDCARD_RESOURCE : this.name;
PatternType patternType = StringUtils.isBlank(this.patternType) ? PatternType.LITERAL : PatternType.valueOf(this.patternType);
String principal = StringUtils.isNotBlank(this.principal) ? new KafkaPrincipal(KafkaPrincipal.USER_TYPE, this.principal).toString() : KafkaPrincipal.ANONYMOUS.toString();
String host = StringUtils.isBlank(this.host) ? ResourcePattern.WILDCARD_RESOURCE : this.host;
AclOperation operation = StringUtils.isBlank(this.operation) ? AclOperation.UNKNOWN : AclOperation.valueOf(this.operation);
AclPermissionType permissionType = StringUtils.isBlank(this.permissionType) ? AclPermissionType.ALLOW : AclPermissionType.valueOf(this.permissionType);
return new AclBinding(new ResourcePattern(resourceType, resourceName, patternType),
new AccessControlEntry(principal, host, operation, permissionType));
}
public AclBindingFilter toAclBindingFilter() {
ResourceType resourceType = StringUtils.isBlank(this.resourceType) ? ResourceType.UNKNOWN : ResourceType.valueOf(this.resourceType);
String resourceName = StringUtils.isBlank(this.name) ? ResourcePattern.WILDCARD_RESOURCE : this.name;
PatternType patternType = StringUtils.isBlank(this.patternType) ? PatternType.LITERAL : PatternType.valueOf(this.patternType);
String principal = StringUtils.isNotBlank(this.principal) ? new KafkaPrincipal(KafkaPrincipal.USER_TYPE, this.principal).toString() : KafkaPrincipal.ANONYMOUS.toString();
String host = StringUtils.isBlank(this.host) ? ResourcePattern.WILDCARD_RESOURCE : this.host;
AclOperation operation = StringUtils.isBlank(this.operation) ? AclOperation.UNKNOWN : AclOperation.valueOf(this.operation);
AclPermissionType permissionType = StringUtils.isBlank(this.permissionType) ? AclPermissionType.ALLOW : AclPermissionType.valueOf(this.permissionType);
AclBindingFilter filter = new AclBindingFilter(new ResourcePatternFilter(resourceType, resourceName, patternType),
new AccessControlEntryFilter(principal, host, operation, permissionType));
return filter;
}
public AclBindingFilter toAclBindingFilter(boolean allResource, boolean allPrincipal, boolean allOperation) {
AclEntry entry = deepClone();
AclBindingFilter filter = new AclBindingFilter(new ResourcePatternFilter(allResource ? ResourceType.ANY : ResourceType.valueOf(entry.resourceType), entry.name, PatternType.LITERAL),
new AccessControlEntryFilter(allPrincipal ? null : entry.principal, entry.host, allOperation ? AclOperation.ALL : AclOperation.valueOf(entry.operation), AclPermissionType.ANY));
System.out.println(filter);
return filter;
}
public AclEntry deepClone() {
AclEntry entry = new AclEntry();
entry.setResourceType(this.resourceType);
entry.setName(this.name);
entry.setPatternType(this.patternType);
entry.setPrincipal(this.principal);
entry.setHost(this.host);
entry.setOperation(this.operation);
entry.setPermissionType(this.permissionType);
return entry;
}
}

View File

@@ -0,0 +1,17 @@
package com.xuxd.kafka.console.beans;
import lombok.Data;
/**
* kafka-console-ui.
*
* @author xuxd
* @date 2021-08-28 19:43:26
**/
@Data
public class AclUser {
private String username;
private String password;
}

View File

@@ -0,0 +1,26 @@
package com.xuxd.kafka.console.beans;
import java.util.List;
import lombok.Getter;
/**
* kafka-console-ui.
*
* @author xuxd
* @date 2021-08-30 20:10:07
**/
public class CounterList<T> {
@Getter
private List<T> list;
private int total;
public CounterList(List<T> list) {
this.list = list;
}
public int getTotal() {
return list != null ? list.size() : 0;
}
}

View File

@@ -0,0 +1,74 @@
package com.xuxd.kafka.console.beans;
import lombok.Getter;
import lombok.Setter;
/**
* kafka-console-ui.
*
* @author xuxd
* @date 2021-08-28 19:29:11
**/
public class ResponseData<T> {
public static final int SUCCESS_CODE = 0, FAILED_CODE = -9999;
public static final String SUCCESS_MSG = "success", FAILED_MSG = "failed";
@Getter
@Setter
private int code = SUCCESS_CODE;
@Getter
@Setter
private String msg = SUCCESS_MSG;
@Getter
@Setter
private T data;
public static ResponseData create() {
return new ResponseData();
}
public static <T> ResponseData create(Class<T> cls) {
return new ResponseData<T>();
}
public ResponseData<T> data(T t) {
this.data = t;
return this;
}
public ResponseData<T> success() {
this.code = SUCCESS_CODE;
this.msg = SUCCESS_MSG;
return this;
}
public ResponseData<T> success(String msg) {
this.code = SUCCESS_CODE;
this.msg = msg;
return this;
}
public ResponseData<T> failed() {
this.code = FAILED_CODE;
this.msg = FAILED_MSG;
return this;
}
public ResponseData<T> failed(String msg) {
this.code = FAILED_CODE;
this.msg = msg;
return this;
}
@Override public String toString() {
return "ResponseData{" +
"code=" + code +
", msg='" + msg + '\'' +
", data=" + data +
'}';
}
}

View File

@@ -0,0 +1,37 @@
package com.xuxd.kafka.console.beans.dto;
import com.xuxd.kafka.console.beans.AclEntry;
import lombok.Data;
import org.apache.kafka.common.resource.ResourceType;
/**
* kafka-console-ui.
*
* @author xuxd
* @date 2021-08-30 16:28:47
**/
@Data
public class ConsumerAuthDTO {
private String topic;
private String groupId;
private String username;
public AclEntry toTopicEntry() {
AclEntry entry = new AclEntry();
entry.setPrincipal(username);
entry.setName(topic);
entry.setResourceType(ResourceType.TOPIC.name());
return entry;
}
public AclEntry toGroupEntry() {
AclEntry entry = new AclEntry();
entry.setPrincipal(username);
entry.setName(groupId);
entry.setResourceType(ResourceType.GROUP.name());
return entry;
}
}

View File

@@ -0,0 +1,23 @@
package com.xuxd.kafka.console.beans.dto;
import com.xuxd.kafka.console.beans.AclEntry;
import lombok.Data;
/**
* kafka-console-ui.
*
* @author xuxd
* @date 2021-08-30 19:37:47
**/
@Data
public class DeleteAclDTO {
private String username;
public AclEntry toUserEntry() {
AclEntry entry = new AclEntry();
entry.setPrincipal(username);
return entry;
}
}

View File

@@ -0,0 +1,27 @@
package com.xuxd.kafka.console.beans.dto;
import com.xuxd.kafka.console.beans.AclEntry;
import lombok.Data;
import org.apache.kafka.common.resource.ResourceType;
/**
* kafka-console-ui.
*
* @author xuxd
* @date 2021-08-30 16:28:23
**/
@Data
public class ProducerAuthDTO {
private String topic;
private String username;
public AclEntry toEntry() {
AclEntry entry = new AclEntry();
entry.setPrincipal(username);
entry.setName(topic);
entry.setResourceType(ResourceType.TOPIC.name());
return entry;
}
}

View File

@@ -0,0 +1,65 @@
package com.xuxd.kafka.console.config;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
/**
* kafka-console-ui.
*
* @author xuxd
* @date 2021-08-28 11:21:12
**/
@Configuration
@ConfigurationProperties(prefix = "kafka.config")
public class KafkaConfig {
private String bootstrapServer;
private int requestTimeoutMs;
private String securityProtocol;
private String saslMechanism;
private String saslJaasConfig;
public String getBootstrapServer() {
return bootstrapServer;
}
public void setBootstrapServer(String bootstrapServer) {
this.bootstrapServer = bootstrapServer;
}
public int getRequestTimeoutMs() {
return requestTimeoutMs;
}
public void setRequestTimeoutMs(int requestTimeoutMs) {
this.requestTimeoutMs = requestTimeoutMs;
}
public String getSecurityProtocol() {
return securityProtocol;
}
public void setSecurityProtocol(String securityProtocol) {
this.securityProtocol = securityProtocol;
}
public String getSaslMechanism() {
return saslMechanism;
}
public void setSaslMechanism(String saslMechanism) {
this.saslMechanism = saslMechanism;
}
public String getSaslJaasConfig() {
return saslJaasConfig;
}
public void setSaslJaasConfig(String saslJaasConfig) {
this.saslJaasConfig = saslJaasConfig;
}
}

View File

@@ -0,0 +1,26 @@
package com.xuxd.kafka.console.config;
import kafka.console.KafkaAclConsole;
import kafka.console.KafkaConfigConsole;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* kafka-console-ui.
*
* @author xuxd
* @date 2021-08-28 11:45:26
**/
@Configuration
public class KafkaConfiguration {
@Bean
public KafkaConfigConsole kafkaConfigConsole(KafkaConfig config) {
return new KafkaConfigConsole(config);
}
@Bean
public KafkaAclConsole kafkaAclConsole(KafkaConfig config) {
return new KafkaAclConsole(config);
}
}

View File

@@ -0,0 +1,98 @@
package com.xuxd.kafka.console.controller;
import com.xuxd.kafka.console.beans.AclEntry;
import com.xuxd.kafka.console.beans.dto.ConsumerAuthDTO;
import com.xuxd.kafka.console.beans.dto.DeleteAclDTO;
import com.xuxd.kafka.console.beans.dto.ProducerAuthDTO;
import com.xuxd.kafka.console.service.AclService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* kafka-console-ui.
*
* @author xuxd
* @date 2021-08-28 11:47:48
**/
@RestController
@RequestMapping("/acl")
public class AclAuthController {
@Autowired
private AclService aclService;
@GetMapping
public Object getAclList() {
return aclService.getAclList();
}
@PostMapping
public Object addAcl(@RequestBody AclEntry entry) {
return aclService.addAcl(entry);
}
/**
* add producer acl.
*
* @param param entry.topic && entry.username must.
* @return
*/
@PostMapping("/producer")
public Object addProducerAcl(@RequestBody ProducerAuthDTO param) {
return aclService.addProducerAcl(param.toEntry());
}
/**
* add consumer acl.
*
* @param param entry.topic && entry.groupId entry.username must.
* @return
*/
@PostMapping("/consumer")
public Object addConsumerAcl(@RequestBody ConsumerAuthDTO param) {
return aclService.addConsumerAcl(param.toTopicEntry(), param.toGroupEntry());
}
/**
* delete user acl .
*
* @param param entry.username
* @return
*/
@DeleteMapping("/user")
public Object deleteAclByUser(@RequestBody DeleteAclDTO param) {
return aclService.deleteUserAcl(param.toUserEntry());
}
/**
* add producer acl.
*
* @param param entry.topic && entry.username must.
* @return
*/
@DeleteMapping("/producer")
public Object deleteProducerAcl(@RequestBody ProducerAuthDTO param) {
return aclService.deleteProducerAcl(param.toEntry());
}
/**
* add consumer acl.
*
* @param param entry.topic && entry.groupId entry.username must.
* @return
*/
@DeleteMapping("/consumer")
public Object deleteConsumerAcl(@RequestBody ConsumerAuthDTO param) {
return aclService.deleteConsumerAcl(param.toTopicEntry(), param.toGroupEntry());
}
}

View File

@@ -0,0 +1,40 @@
package com.xuxd.kafka.console.controller;
import com.xuxd.kafka.console.beans.AclUser;
import com.xuxd.kafka.console.service.AclService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* kafka-console-ui.
*
* @author xuxd
* @date 2021-08-28 21:13:05
**/
@RestController
@RequestMapping("/user")
public class AclUserController {
@Autowired
private AclService aclService;
@GetMapping
public Object getUserList() {
return aclService.getUserList();
}
@PostMapping
public Object addOrUpdateUser(@RequestBody AclUser user) {
return aclService.addOrUpdateUser(user.getUsername(), user.getPassword());
}
@DeleteMapping
public Object deleteUser(@RequestBody AclUser user) {
return aclService.deleteUser(user.getUsername());
}
}

View File

@@ -0,0 +1,19 @@
package com.xuxd.kafka.console.controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* kafka-console-ui.
*
* @author xuxd
* @date 2021-08-28 10:54:58
**/
@RestController
public class IndexController {
@RequestMapping("/")
public String index() {
return "hello world";
}
}

View File

@@ -0,0 +1,37 @@
package com.xuxd.kafka.console.service;
import com.xuxd.kafka.console.beans.AclEntry;
import com.xuxd.kafka.console.beans.ResponseData;
import java.util.Set;
/**
* kafka-console-ui.
*
* @author xuxd
* @date 2021-08-28 11:44:06
**/
public interface AclService {
ResponseData<Set<String>> getUserList();
ResponseData addOrUpdateUser(String name, String pass);
ResponseData deleteUser(String name);
ResponseData getAclList();
ResponseData deleteAcl(AclEntry entry);
ResponseData addAcl(AclEntry entry);
ResponseData addProducerAcl(AclEntry entry);
ResponseData addConsumerAcl(AclEntry topic, AclEntry group);
ResponseData deleteProducerAcl(AclEntry entry);
ResponseData deleteConsumerAcl(AclEntry topic, AclEntry group);
ResponseData deleteUserAcl(AclEntry entry);
}

View File

@@ -0,0 +1,84 @@
package com.xuxd.kafka.console.service.impl;
import com.xuxd.kafka.console.beans.AclEntry;
import com.xuxd.kafka.console.beans.CounterList;
import com.xuxd.kafka.console.beans.ResponseData;
import com.xuxd.kafka.console.service.AclService;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import kafka.console.KafkaAclConsole;
import kafka.console.KafkaConfigConsole;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.acl.AclBinding;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* kafka-console-ui.
*
* @author xuxd
* @date 2021-08-28 11:44:40
**/
@Slf4j
@Service
public class AclServiceImpl implements AclService {
@Autowired
private KafkaConfigConsole configConsole;
@Autowired
private KafkaAclConsole aclConsole;
@Override public ResponseData<Set<String>> getUserList() {
try {
return ResponseData.create(Set.class).data(configConsole.getUserList()).success();
} catch (Exception e) {
log.error("getUserList error.", e);
return ResponseData.create().failed();
}
}
@Override public ResponseData addOrUpdateUser(String name, String pass) {
return configConsole.addOrUpdateUser(name, pass) ? ResponseData.create().success() : ResponseData.create().failed();
}
@Override public ResponseData deleteUser(String name) {
return configConsole.deleteUser(name) ? ResponseData.create().success() : ResponseData.create().failed();
}
@Override public ResponseData getAclList() {
List<AclBinding> aclBindingList = aclConsole.getAclList();
return ResponseData.create().data(new CounterList<>(aclBindingList.stream().map(x -> AclEntry.valueOf(x)).collect(Collectors.toList()))).success();
}
@Override public ResponseData deleteAcl(AclEntry entry) {
return aclConsole.deleteAcl(entry, false, false, false) ? ResponseData.create().success() : ResponseData.create().failed();
}
@Override public ResponseData addAcl(AclEntry entry) {
return aclConsole.addAcl(Collections.singletonList(entry.toAclBinding())) ? ResponseData.create().success() : ResponseData.create().failed();
}
@Override public ResponseData addProducerAcl(AclEntry entry) {
return aclConsole.addProducerAcl(entry) ? ResponseData.create().success() : ResponseData.create().failed();
}
@Override public ResponseData addConsumerAcl(AclEntry topic, AclEntry group) {
return aclConsole.addConsumerAcl(topic, group) ? ResponseData.create().success() : ResponseData.create().failed();
}
@Override public ResponseData deleteProducerAcl(AclEntry entry) {
return aclConsole.deleteProducerAcl(entry) ? ResponseData.create().success() : ResponseData.create().failed();
}
@Override public ResponseData deleteConsumerAcl(AclEntry topic, AclEntry group) {
return aclConsole.deleteConsumerAcl(topic, group) ? ResponseData.create().success() : ResponseData.create().failed();
}
@Override public ResponseData deleteUserAcl(AclEntry entry) {
return aclConsole.deleteUserAcl(entry) ? ResponseData.create().success() : ResponseData.create().failed();
}
}