没有编辑权限,隐藏集群属性;打印全部操作日志.
This commit is contained in:
@@ -1,6 +1,9 @@
|
||||
package com.xuxd.kafka.console.aspect;
|
||||
|
||||
import com.xuxd.kafka.console.aspect.annotation.ControllerLog;
|
||||
import com.xuxd.kafka.console.beans.Credentials;
|
||||
import com.xuxd.kafka.console.config.LogConfig;
|
||||
import com.xuxd.kafka.console.filter.CredentialsContext;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.aspectj.lang.ProceedingJoinPoint;
|
||||
@@ -12,6 +15,7 @@ import org.springframework.stereotype.Component;
|
||||
|
||||
import java.lang.reflect.Method;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
@@ -28,6 +32,12 @@ public class ControllerLogAspect {
|
||||
|
||||
private ReentrantLock lock = new ReentrantLock();
|
||||
|
||||
private final LogConfig logConfig;
|
||||
|
||||
public ControllerLogAspect(LogConfig logConfig) {
|
||||
this.logConfig = logConfig;
|
||||
}
|
||||
|
||||
@Pointcut("@annotation(com.xuxd.kafka.console.aspect.annotation.ControllerLog)")
|
||||
private void pointcut() {
|
||||
|
||||
@@ -35,6 +45,9 @@ public class ControllerLogAspect {
|
||||
|
||||
@Around("pointcut()")
|
||||
public Object around(ProceedingJoinPoint joinPoint) throws Throwable {
|
||||
if (!logConfig.isPrintControllerLog()) {
|
||||
return joinPoint.proceed();
|
||||
}
|
||||
StringBuilder params = new StringBuilder("[");
|
||||
try {
|
||||
String methodName = getMethodFullName(joinPoint.getTarget().getClass().getName(), joinPoint.getSignature().getName());
|
||||
@@ -56,6 +69,10 @@ public class ControllerLogAspect {
|
||||
String resStr = "[" + (res != null ? res.toString() : "") + "]";
|
||||
|
||||
StringBuilder sb = new StringBuilder();
|
||||
Credentials credentials = CredentialsContext.get();
|
||||
if (credentials != null) {
|
||||
sb.append("[").append(credentials.getUsername()).append("] ");
|
||||
}
|
||||
String shortMethodName = descMap.getOrDefault(methodName, ".-");
|
||||
shortMethodName = shortMethodName.substring(shortMethodName.lastIndexOf(".") + 1);
|
||||
sb.append("[").append(shortMethodName)
|
||||
@@ -85,6 +102,9 @@ public class ControllerLogAspect {
|
||||
Class<?>[] clzArr = new Class[args.length];
|
||||
for (int i = 0; i < args.length; i++) {
|
||||
clzArr[i] = args[i].getClass();
|
||||
if (List.class.isAssignableFrom(clzArr[i])) {
|
||||
clzArr[i] = List.class;
|
||||
}
|
||||
}
|
||||
method = aClass.getDeclaredMethod(methodName, clzArr);
|
||||
|
||||
|
||||
@@ -101,17 +101,27 @@ public class PermissionAspect {
|
||||
throw new UnAuthorizedException(credentials.getUsername() + ":" + allowPermSet);
|
||||
}
|
||||
|
||||
boolean unauthorized = true;
|
||||
boolean notFoundHideProperty = true;
|
||||
String roleIds = userDO.getRoleIds();
|
||||
List<Long> roleIdList = Arrays.stream(roleIds.split(",")).map(String::trim).filter(StringUtils::isNotEmpty).map(Long::valueOf).collect(Collectors.toList());
|
||||
for (Long roleId : roleIdList) {
|
||||
Set<String> permSet = rolePermCache.getRolePermCache().getOrDefault(roleId, Collections.emptySet());
|
||||
for (String p : allowPermSet) {
|
||||
if (permSet.contains(p)) {
|
||||
return;
|
||||
unauthorized = false;
|
||||
}
|
||||
}
|
||||
if (permSet.contains(authConfig.getHideClusterPropertyPerm())) {
|
||||
notFoundHideProperty = false;
|
||||
}
|
||||
}
|
||||
if (unauthorized) {
|
||||
throw new UnAuthorizedException(credentials.getUsername() + ":" + allowPermSet);
|
||||
}
|
||||
if (authConfig.isHideClusterProperty() && notFoundHideProperty) {
|
||||
credentials.setHideClusterProperty(true);
|
||||
}
|
||||
throw new UnAuthorizedException(credentials.getUsername() + ":" + allowPermSet);
|
||||
}
|
||||
|
||||
private Map<String, Set<String>> checkPermMap(String methodName, String[] value) {
|
||||
|
||||
@@ -15,6 +15,11 @@ public class Credentials {
|
||||
|
||||
private long expiration;
|
||||
|
||||
/**
|
||||
* 是否隐藏集群属性
|
||||
*/
|
||||
private boolean hideClusterProperty;
|
||||
|
||||
public boolean isInvalid() {
|
||||
return this == INVALID;
|
||||
}
|
||||
|
||||
@@ -18,4 +18,8 @@ public class AuthConfig {
|
||||
private String secret = "kafka-console-ui-default-secret";
|
||||
|
||||
private long expireHours;
|
||||
|
||||
private boolean hideClusterProperty;
|
||||
|
||||
private String hideClusterPropertyPerm = "op:cluster-switch:edit";
|
||||
}
|
||||
|
||||
23
src/main/java/com/xuxd/kafka/console/config/LogConfig.java
Normal file
23
src/main/java/com/xuxd/kafka/console/config/LogConfig.java
Normal file
@@ -0,0 +1,23 @@
|
||||
package com.xuxd.kafka.console.config;
|
||||
|
||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
/**
|
||||
* @author: xuxd
|
||||
* @since: 2023/8/20 20:00
|
||||
**/
|
||||
@Configuration
|
||||
@ConfigurationProperties(prefix = "log")
|
||||
public class LogConfig {
|
||||
|
||||
private boolean printControllerLog = true;
|
||||
|
||||
public boolean isPrintControllerLog() {
|
||||
return printControllerLog;
|
||||
}
|
||||
|
||||
public void setPrintControllerLog(boolean printControllerLog) {
|
||||
this.printControllerLog = printControllerLog;
|
||||
}
|
||||
}
|
||||
@@ -1,5 +1,6 @@
|
||||
package com.xuxd.kafka.console.controller;
|
||||
|
||||
import com.xuxd.kafka.console.aspect.annotation.ControllerLog;
|
||||
import com.xuxd.kafka.console.aspect.annotation.Permission;
|
||||
import com.xuxd.kafka.console.beans.AclEntry;
|
||||
import com.xuxd.kafka.console.beans.dto.AddAuthDTO;
|
||||
@@ -46,6 +47,7 @@ public class AclAuthController {
|
||||
return aclService.getAclList(param.toEntry());
|
||||
}
|
||||
|
||||
@ControllerLog("增加Acl")
|
||||
@Permission({"acl:authority:add-principal", "acl:authority:add", "acl:sasl-scram:add-auth"})
|
||||
@PostMapping
|
||||
public Object addAcl(@RequestBody AddAuthDTO param) {
|
||||
@@ -58,6 +60,7 @@ public class AclAuthController {
|
||||
* @param param entry.topic && entry.username must.
|
||||
* @return
|
||||
*/
|
||||
@ControllerLog("增加ProducerAcl")
|
||||
@Permission({"acl:authority:producer", "acl:sasl-scram:producer"})
|
||||
@PostMapping("/producer")
|
||||
public Object addProducerAcl(@RequestBody ProducerAuthDTO param) {
|
||||
@@ -71,6 +74,7 @@ public class AclAuthController {
|
||||
* @param param entry.topic && entry.groupId entry.username must.
|
||||
* @return
|
||||
*/
|
||||
@ControllerLog("增加ConsumerAcl")
|
||||
@Permission({"acl:authority:consumer", "acl:sasl-scram:consumer"})
|
||||
@PostMapping("/consumer")
|
||||
public Object addConsumerAcl(@RequestBody ConsumerAuthDTO param) {
|
||||
@@ -84,6 +88,7 @@ public class AclAuthController {
|
||||
* @param entry entry
|
||||
* @return
|
||||
*/
|
||||
@ControllerLog("删除Acl")
|
||||
@Permission({"acl:authority:clean", "acl:sasl-scram:pure"})
|
||||
@DeleteMapping
|
||||
public Object deleteAclByUser(@RequestBody AclEntry entry) {
|
||||
@@ -96,6 +101,7 @@ public class AclAuthController {
|
||||
* @param param entry.username
|
||||
* @return
|
||||
*/
|
||||
@ControllerLog("删除Acl")
|
||||
@Permission({"acl:authority:clean", "acl:sasl-scram:pure"})
|
||||
@DeleteMapping("/user")
|
||||
public Object deleteAclByUser(@RequestBody DeleteAclDTO param) {
|
||||
@@ -103,11 +109,12 @@ public class AclAuthController {
|
||||
}
|
||||
|
||||
/**
|
||||
* add producer acl.
|
||||
* delete producer acl.
|
||||
*
|
||||
* @param param entry.topic && entry.username must.
|
||||
* @return
|
||||
*/
|
||||
@ControllerLog("删除ProducerAcl")
|
||||
@Permission({"acl:authority:clean", "acl:sasl-scram:pure"})
|
||||
@DeleteMapping("/producer")
|
||||
public Object deleteProducerAcl(@RequestBody ProducerAuthDTO param) {
|
||||
@@ -116,11 +123,12 @@ public class AclAuthController {
|
||||
}
|
||||
|
||||
/**
|
||||
* add consumer acl.
|
||||
* delete consumer acl.
|
||||
*
|
||||
* @param param entry.topic && entry.groupId entry.username must.
|
||||
* @return
|
||||
*/
|
||||
@ControllerLog("删除ConsumerAcl")
|
||||
@Permission({"acl:authority:clean", "acl:sasl-scram:pure"})
|
||||
@DeleteMapping("/consumer")
|
||||
public Object deleteConsumerAcl(@RequestBody ConsumerAuthDTO param) {
|
||||
@@ -134,6 +142,7 @@ public class AclAuthController {
|
||||
* @param param acl principal.
|
||||
* @return true or false.
|
||||
*/
|
||||
@ControllerLog("清除Acl")
|
||||
@Permission({"acl:authority:clean", "acl:sasl-scram:pure"})
|
||||
@DeleteMapping("/clear")
|
||||
public Object clearAcl(@RequestBody DeleteAclDTO param) {
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
package com.xuxd.kafka.console.controller;
|
||||
|
||||
import com.xuxd.kafka.console.aspect.annotation.ControllerLog;
|
||||
import com.xuxd.kafka.console.aspect.annotation.Permission;
|
||||
import com.xuxd.kafka.console.beans.AclEntry;
|
||||
import com.xuxd.kafka.console.beans.AclUser;
|
||||
@@ -33,12 +34,14 @@ public class AclUserController {
|
||||
return aclService.getUserList();
|
||||
}
|
||||
|
||||
@ControllerLog("增加SaslUser")
|
||||
@Permission({"acl:sasl-scram:add-update", "acl:sasl-scram:add-auth"})
|
||||
@PostMapping
|
||||
public Object addOrUpdateUser(@RequestBody AclUser user) {
|
||||
return aclService.addOrUpdateUser(user.getUsername(), user.getPassword());
|
||||
}
|
||||
|
||||
@ControllerLog("删除SaslUser")
|
||||
@Permission({"acl:sasl-scram:del", "acl:sasl-scram:pure"})
|
||||
@DeleteMapping
|
||||
public Object deleteUser(@RequestBody AclUser user) {
|
||||
@@ -46,6 +49,7 @@ public class AclUserController {
|
||||
}
|
||||
|
||||
|
||||
@ControllerLog("删除SaslUser和Acl")
|
||||
@Permission({"acl:sasl-scram:del", "acl:sasl-scram:pure"})
|
||||
@DeleteMapping("/auth")
|
||||
public Object deleteUserAndAuth(@RequestBody AclUser user) {
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
package com.xuxd.kafka.console.controller;
|
||||
|
||||
import com.xuxd.kafka.console.aspect.annotation.ControllerLog;
|
||||
import com.xuxd.kafka.console.aspect.annotation.Permission;
|
||||
import com.xuxd.kafka.console.beans.ResponseData;
|
||||
import com.xuxd.kafka.console.beans.dto.AlterClientQuotaDTO;
|
||||
@@ -28,6 +29,7 @@ public class ClientQuotaController {
|
||||
return clientQuotaService.getClientQuotaConfigs(request.getTypes(), request.getNames());
|
||||
}
|
||||
|
||||
@ControllerLog("增加限流配额")
|
||||
@Permission({"quota:user:add", "quota:client:add", "quota:user-client:add", "quota:edit"})
|
||||
@PostMapping
|
||||
public Object alterClientQuotaConfigs(@RequestBody AlterClientQuotaDTO request) {
|
||||
@@ -41,6 +43,7 @@ public class ClientQuotaController {
|
||||
return clientQuotaService.alterClientQuotaConfigs(request);
|
||||
}
|
||||
|
||||
@ControllerLog("删除限流配额")
|
||||
@Permission("quota:del")
|
||||
@DeleteMapping
|
||||
public Object deleteClientQuotaConfigs(@RequestBody AlterClientQuotaDTO request) {
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
package com.xuxd.kafka.console.controller;
|
||||
|
||||
import com.xuxd.kafka.console.aspect.annotation.ControllerLog;
|
||||
import com.xuxd.kafka.console.aspect.annotation.Permission;
|
||||
import com.xuxd.kafka.console.beans.dto.ClusterInfoDTO;
|
||||
import com.xuxd.kafka.console.service.ClusterService;
|
||||
@@ -30,23 +31,27 @@ public class ClusterController {
|
||||
return clusterService.getClusterInfo();
|
||||
}
|
||||
|
||||
@Permission("op:cluster-switch")
|
||||
@GetMapping("/info")
|
||||
public Object getClusterInfoList() {
|
||||
return clusterService.getClusterInfoList();
|
||||
}
|
||||
|
||||
@ControllerLog("增加集群信息")
|
||||
@Permission("op:cluster-switch:add")
|
||||
@PostMapping("/info")
|
||||
public Object addClusterInfo(@RequestBody ClusterInfoDTO dto) {
|
||||
return clusterService.addClusterInfo(dto.to());
|
||||
}
|
||||
|
||||
@ControllerLog("删除集群信息")
|
||||
@Permission("op:cluster-switch:del")
|
||||
@DeleteMapping("/info")
|
||||
public Object deleteClusterInfo(@RequestBody ClusterInfoDTO dto) {
|
||||
return clusterService.deleteClusterInfo(dto.getId());
|
||||
}
|
||||
|
||||
@ControllerLog("编辑集群信息")
|
||||
@Permission("op:cluster-switch:edit")
|
||||
@PutMapping("/info")
|
||||
public Object updateClusterInfo(@RequestBody ClusterInfoDTO dto) {
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
package com.xuxd.kafka.console.controller;
|
||||
|
||||
import com.xuxd.kafka.console.aspect.annotation.ControllerLog;
|
||||
import com.xuxd.kafka.console.aspect.annotation.Permission;
|
||||
import com.xuxd.kafka.console.beans.ResponseData;
|
||||
import com.xuxd.kafka.console.beans.dto.AlterConfigDTO;
|
||||
@@ -48,12 +49,14 @@ public class ConfigController {
|
||||
return configService.getTopicConfig(topic);
|
||||
}
|
||||
|
||||
@ControllerLog("编辑topic配置")
|
||||
@Permission("topic:property-config:edit")
|
||||
@PostMapping("/topic")
|
||||
public Object setTopicConfig(@RequestBody AlterConfigDTO dto) {
|
||||
return configService.alterTopicConfig(dto.getEntity(), dto.to(), AlterType.SET);
|
||||
}
|
||||
|
||||
@ControllerLog("删除topic配置")
|
||||
@Permission("topic:property-config:del")
|
||||
@DeleteMapping("/topic")
|
||||
public Object deleteTopicConfig(@RequestBody AlterConfigDTO dto) {
|
||||
@@ -66,12 +69,14 @@ public class ConfigController {
|
||||
return configService.getBrokerConfig(brokerId);
|
||||
}
|
||||
|
||||
@ControllerLog("设置broker配置")
|
||||
@Permission("cluster:edit")
|
||||
@PostMapping("/broker")
|
||||
public Object setBrokerConfig(@RequestBody AlterConfigDTO dto) {
|
||||
return configService.alterBrokerConfig(dto.getEntity(), dto.to(), AlterType.SET);
|
||||
}
|
||||
|
||||
@ControllerLog("编辑broker配置")
|
||||
@Permission("cluster:edit")
|
||||
@DeleteMapping("/broker")
|
||||
public Object deleteBrokerConfig(@RequestBody AlterConfigDTO dto) {
|
||||
@@ -84,12 +89,14 @@ public class ConfigController {
|
||||
return configService.getBrokerLoggerConfig(brokerId);
|
||||
}
|
||||
|
||||
@ControllerLog("编辑broker日志配置")
|
||||
@Permission("cluster:edit")
|
||||
@PostMapping("/broker/logger")
|
||||
public Object setBrokerLoggerConfig(@RequestBody AlterConfigDTO dto) {
|
||||
return configService.alterBrokerLoggerConfig(dto.getEntity(), dto.to(), AlterType.SET);
|
||||
}
|
||||
|
||||
@ControllerLog("删除broker日志配置")
|
||||
@Permission("cluster:edit")
|
||||
@DeleteMapping("/broker/logger")
|
||||
public Object deleteBrokerLoggerConfig(@RequestBody AlterConfigDTO dto) {
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
package com.xuxd.kafka.console.controller;
|
||||
|
||||
import com.xuxd.kafka.console.aspect.annotation.ControllerLog;
|
||||
import com.xuxd.kafka.console.aspect.annotation.Permission;
|
||||
import com.xuxd.kafka.console.beans.ResponseData;
|
||||
import com.xuxd.kafka.console.beans.dto.AddSubscriptionDTO;
|
||||
@@ -43,6 +44,7 @@ public class ConsumerController {
|
||||
return consumerService.getConsumerGroupList(groupIdList, stateSet);
|
||||
}
|
||||
|
||||
@ControllerLog("删除消费组")
|
||||
@Permission("group:del")
|
||||
@DeleteMapping("/group")
|
||||
public Object deleteConsumerGroup(@RequestParam("groupId") String groupId) {
|
||||
@@ -61,12 +63,14 @@ public class ConsumerController {
|
||||
return consumerService.getConsumerDetail(groupId);
|
||||
}
|
||||
|
||||
@ControllerLog("新增消费组")
|
||||
@Permission("group:add")
|
||||
@PostMapping("/subscription")
|
||||
public Object addSubscription(@RequestBody AddSubscriptionDTO subscriptionDTO) {
|
||||
return consumerService.addSubscription(subscriptionDTO.getGroupId(), subscriptionDTO.getTopic());
|
||||
}
|
||||
|
||||
@ControllerLog("重置消费位点")
|
||||
@Permission({"group:consumer-detail:min",
|
||||
"group:consumer-detail:last",
|
||||
"group:consumer-detail:timestamp",
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
package com.xuxd.kafka.console.controller;
|
||||
|
||||
import com.xuxd.kafka.console.aspect.annotation.ControllerLog;
|
||||
import com.xuxd.kafka.console.aspect.annotation.Permission;
|
||||
import com.xuxd.kafka.console.beans.QueryMessage;
|
||||
import com.xuxd.kafka.console.beans.ResponseData;
|
||||
@@ -48,18 +49,21 @@ public class MessageController {
|
||||
return messageService.deserializerList();
|
||||
}
|
||||
|
||||
@ControllerLog("在线发送消息")
|
||||
@Permission("message:send")
|
||||
@PostMapping("/send")
|
||||
public Object send(@RequestBody SendMessage message) {
|
||||
return messageService.send(message);
|
||||
}
|
||||
|
||||
@ControllerLog("重新发送消息")
|
||||
@Permission("message:resend")
|
||||
@PostMapping("/resend")
|
||||
public Object resend(@RequestBody SendMessage message) {
|
||||
return messageService.resend(message);
|
||||
}
|
||||
|
||||
@ControllerLog("在线删除消息")
|
||||
@Permission("message:del")
|
||||
@DeleteMapping
|
||||
public Object delete(@RequestBody List<QueryMessage> messages) {
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
package com.xuxd.kafka.console.controller;
|
||||
|
||||
import com.xuxd.kafka.console.aspect.annotation.ControllerLog;
|
||||
import com.xuxd.kafka.console.aspect.annotation.Permission;
|
||||
import com.xuxd.kafka.console.beans.TopicPartition;
|
||||
import com.xuxd.kafka.console.beans.dto.BrokerThrottleDTO;
|
||||
@@ -24,12 +25,14 @@ public class OperationController {
|
||||
@Autowired
|
||||
private OperationService operationService;
|
||||
|
||||
@ControllerLog("同步消费位点")
|
||||
@PostMapping("/sync/consumer/offset")
|
||||
public Object syncConsumerOffset(@RequestBody SyncDataDTO dto) {
|
||||
dto.getProperties().put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, dto.getAddress());
|
||||
return operationService.syncConsumerOffset(dto.getGroupId(), dto.getTopic(), dto.getProperties());
|
||||
}
|
||||
|
||||
@ControllerLog("重新位点对齐")
|
||||
@PostMapping("/sync/min/offset/alignment")
|
||||
public Object minOffsetAlignment(@RequestBody SyncDataDTO dto) {
|
||||
dto.getProperties().put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, dto.getAddress());
|
||||
@@ -41,23 +44,27 @@ public class OperationController {
|
||||
return operationService.getAlignmentList();
|
||||
}
|
||||
|
||||
@ControllerLog("deleteAlignment")
|
||||
@DeleteMapping("/sync/alignment")
|
||||
public Object deleteAlignment(@RequestParam("id") Long id) {
|
||||
return operationService.deleteAlignmentById(id);
|
||||
}
|
||||
|
||||
@ControllerLog("优先副本leader")
|
||||
@Permission({"topic:partition-detail:preferred", "op:replication-preferred"})
|
||||
@PostMapping("/replication/preferred")
|
||||
public Object electPreferredLeader(@RequestBody ReplicationDTO dto) {
|
||||
return operationService.electPreferredLeader(dto.getTopic(), dto.getPartition());
|
||||
}
|
||||
|
||||
@ControllerLog("配置同步限流")
|
||||
@Permission("op:config-throttle")
|
||||
@PostMapping("/broker/throttle")
|
||||
public Object configThrottle(@RequestBody BrokerThrottleDTO dto) {
|
||||
return operationService.configThrottle(dto.getBrokerList(), dto.getUnit().toKb(dto.getThrottle()));
|
||||
}
|
||||
|
||||
@ControllerLog("移除限流配置")
|
||||
@Permission("op:remove-throttle")
|
||||
@DeleteMapping("/broker/throttle")
|
||||
public Object removeThrottle(@RequestBody BrokerThrottleDTO dto) {
|
||||
@@ -70,6 +77,7 @@ public class OperationController {
|
||||
return operationService.currentReassignments();
|
||||
}
|
||||
|
||||
@ControllerLog("取消副本重分配")
|
||||
@Permission("op:replication-update-detail:cancel")
|
||||
@DeleteMapping("/replication/reassignments")
|
||||
public Object cancelReassignment(@RequestBody TopicPartition partition) {
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
package com.xuxd.kafka.console.controller;
|
||||
|
||||
import com.xuxd.kafka.console.aspect.annotation.ControllerLog;
|
||||
import com.xuxd.kafka.console.aspect.annotation.Permission;
|
||||
import com.xuxd.kafka.console.beans.ReplicaAssignment;
|
||||
import com.xuxd.kafka.console.beans.dto.AddPartitionDTO;
|
||||
@@ -39,6 +40,7 @@ public class TopicController {
|
||||
return topicService.getTopicList(topic, TopicType.valueOf(type.toUpperCase()));
|
||||
}
|
||||
|
||||
@ControllerLog("删除topic")
|
||||
@Permission({"topic:batch-del", "topic:del"})
|
||||
@DeleteMapping
|
||||
public Object deleteTopic(@RequestBody List<String> topics) {
|
||||
@@ -51,12 +53,14 @@ public class TopicController {
|
||||
return topicService.getTopicPartitionInfo(topic.trim());
|
||||
}
|
||||
|
||||
@ControllerLog("创建topic")
|
||||
@Permission("topic:add")
|
||||
@PostMapping("/new")
|
||||
public Object createNewTopic(@RequestBody NewTopicDTO topicDTO) {
|
||||
return topicService.createTopic(topicDTO.toNewTopic());
|
||||
}
|
||||
|
||||
@ControllerLog("增加topic分区")
|
||||
@Permission("topic:partition-add")
|
||||
@PostMapping("/partition/new")
|
||||
public Object addPartition(@RequestBody AddPartitionDTO partitionDTO) {
|
||||
@@ -80,12 +84,14 @@ public class TopicController {
|
||||
return topicService.getCurrentReplicaAssignment(topic);
|
||||
}
|
||||
|
||||
@ControllerLog("更新副本")
|
||||
@Permission({"topic:replication-modify", "op:replication-reassign"})
|
||||
@PostMapping("/replica/assignment")
|
||||
public Object updateReplicaAssignment(@RequestBody ReplicaAssignment assignment) {
|
||||
return topicService.updateReplicaAssignment(assignment);
|
||||
}
|
||||
|
||||
@ControllerLog("配置限流")
|
||||
@Permission("topic:replication-sync-throttle")
|
||||
@PostMapping("/replica/throttle")
|
||||
public Object configThrottle(@RequestBody TopicThrottleDTO dto) {
|
||||
|
||||
@@ -3,15 +3,14 @@ package com.xuxd.kafka.console.service.impl;
|
||||
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
|
||||
import com.xuxd.kafka.console.beans.BrokerNode;
|
||||
import com.xuxd.kafka.console.beans.ClusterInfo;
|
||||
import com.xuxd.kafka.console.beans.Credentials;
|
||||
import com.xuxd.kafka.console.beans.ResponseData;
|
||||
import com.xuxd.kafka.console.beans.dos.ClusterInfoDO;
|
||||
import com.xuxd.kafka.console.beans.vo.BrokerApiVersionVO;
|
||||
import com.xuxd.kafka.console.beans.vo.ClusterInfoVO;
|
||||
import com.xuxd.kafka.console.dao.ClusterInfoMapper;
|
||||
import com.xuxd.kafka.console.filter.CredentialsContext;
|
||||
import com.xuxd.kafka.console.service.ClusterService;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.stream.Collectors;
|
||||
import kafka.console.ClusterConsole;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.collections.CollectionUtils;
|
||||
@@ -21,6 +20,9 @@ import org.apache.kafka.common.Node;
|
||||
import org.springframework.beans.factory.ObjectProvider;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* kafka-console-ui.
|
||||
*
|
||||
@@ -36,12 +38,13 @@ public class ClusterServiceImpl implements ClusterService {
|
||||
private final ClusterInfoMapper clusterInfoMapper;
|
||||
|
||||
public ClusterServiceImpl(ObjectProvider<ClusterConsole> clusterConsole,
|
||||
ObjectProvider<ClusterInfoMapper> clusterInfoMapper) {
|
||||
ObjectProvider<ClusterInfoMapper> clusterInfoMapper) {
|
||||
this.clusterConsole = clusterConsole.getIfAvailable();
|
||||
this.clusterInfoMapper = clusterInfoMapper.getIfAvailable();
|
||||
}
|
||||
|
||||
@Override public ResponseData getClusterInfo() {
|
||||
@Override
|
||||
public ResponseData getClusterInfo() {
|
||||
ClusterInfo clusterInfo = clusterConsole.clusterInfo();
|
||||
Set<BrokerNode> nodes = clusterInfo.getNodes();
|
||||
if (nodes == null) {
|
||||
@@ -52,12 +55,22 @@ public class ClusterServiceImpl implements ClusterService {
|
||||
return ResponseData.create().data(clusterInfo).success();
|
||||
}
|
||||
|
||||
@Override public ResponseData getClusterInfoList() {
|
||||
@Override
|
||||
public ResponseData getClusterInfoList() {
|
||||
// 如果开启权限管理,当前用户没有集群切换->集群信息的编辑权限,隐藏集群的属性信息,避免ACL属性暴露出来
|
||||
Credentials credentials = CredentialsContext.get();
|
||||
return ResponseData.create().data(clusterInfoMapper.selectList(null)
|
||||
.stream().map(ClusterInfoVO::from).collect(Collectors.toList())).success();
|
||||
.stream().map(e -> {
|
||||
ClusterInfoVO vo = ClusterInfoVO.from(e);
|
||||
if (credentials != null && credentials.isHideClusterProperty()) {
|
||||
vo.setProperties(Collections.emptyList());
|
||||
}
|
||||
return vo;
|
||||
}).collect(Collectors.toList())).success();
|
||||
}
|
||||
|
||||
@Override public ResponseData addClusterInfo(ClusterInfoDO infoDO) {
|
||||
@Override
|
||||
public ResponseData addClusterInfo(ClusterInfoDO infoDO) {
|
||||
QueryWrapper<ClusterInfoDO> queryWrapper = new QueryWrapper<>();
|
||||
queryWrapper.eq("cluster_name", infoDO.getClusterName());
|
||||
if (clusterInfoMapper.selectCount(queryWrapper) > 0) {
|
||||
@@ -67,12 +80,14 @@ public class ClusterServiceImpl implements ClusterService {
|
||||
return ResponseData.create().success();
|
||||
}
|
||||
|
||||
@Override public ResponseData deleteClusterInfo(Long id) {
|
||||
@Override
|
||||
public ResponseData deleteClusterInfo(Long id) {
|
||||
clusterInfoMapper.deleteById(id);
|
||||
return ResponseData.create().success();
|
||||
}
|
||||
|
||||
@Override public ResponseData updateClusterInfo(ClusterInfoDO infoDO) {
|
||||
@Override
|
||||
public ResponseData updateClusterInfo(ClusterInfoDO infoDO) {
|
||||
if (infoDO.getProperties() == null) {
|
||||
// null 的话不更新,这个是bug,设置为空字符串解决
|
||||
infoDO.setProperties("");
|
||||
@@ -81,7 +96,8 @@ public class ClusterServiceImpl implements ClusterService {
|
||||
return ResponseData.create().success();
|
||||
}
|
||||
|
||||
@Override public ResponseData peekClusterInfo() {
|
||||
@Override
|
||||
public ResponseData peekClusterInfo() {
|
||||
List<ClusterInfoDO> dos = clusterInfoMapper.selectList(null);
|
||||
if (CollectionUtils.isEmpty(dos)) {
|
||||
return ResponseData.create().failed("No Cluster Info.");
|
||||
@@ -89,7 +105,8 @@ public class ClusterServiceImpl implements ClusterService {
|
||||
return ResponseData.create().data(dos.stream().findFirst().map(ClusterInfoVO::from)).success();
|
||||
}
|
||||
|
||||
@Override public ResponseData getBrokerApiVersionInfo() {
|
||||
@Override
|
||||
public ResponseData getBrokerApiVersionInfo() {
|
||||
HashMap<Node, NodeApiVersions> map = clusterConsole.listBrokerVersionInfo();
|
||||
List<BrokerApiVersionVO> list = new ArrayList<>(map.size());
|
||||
map.forEach(((node, versions) -> {
|
||||
|
||||
Reference in New Issue
Block a user