add h2 and save user info to db
This commit is contained in:
@@ -1,9 +1,13 @@
|
||||
package com.xuxd.kafka.console;
|
||||
|
||||
import org.mybatis.spring.annotation.MapperScan;
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
import org.springframework.scheduling.annotation.EnableScheduling;
|
||||
|
||||
@MapperScan("com.xuxd.kafka.console.dao")
|
||||
@SpringBootApplication
|
||||
@EnableScheduling
|
||||
public class KafkaConsoleUiApplication {
|
||||
|
||||
public static void main(String[] args) {
|
||||
|
||||
@@ -0,0 +1,26 @@
|
||||
package com.xuxd.kafka.console.beans.dos;
|
||||
|
||||
import com.baomidou.mybatisplus.annotation.IdType;
|
||||
import com.baomidou.mybatisplus.annotation.TableId;
|
||||
import com.baomidou.mybatisplus.annotation.TableName;
|
||||
import lombok.Data;
|
||||
|
||||
/**
|
||||
* kafka-console-ui.
|
||||
*
|
||||
* @author xuxd
|
||||
* @date 2021-09-06 15:09:23
|
||||
**/
|
||||
@Data
|
||||
@TableName("t_kafka_user")
|
||||
public class KafkaUserDO {
|
||||
|
||||
@TableId(type = IdType.AUTO)
|
||||
private Long id;
|
||||
|
||||
private String username;
|
||||
|
||||
private String password;
|
||||
|
||||
private String updateTime;
|
||||
}
|
||||
19
src/main/java/com/xuxd/kafka/console/config/CronConfig.java
Normal file
19
src/main/java/com/xuxd/kafka/console/config/CronConfig.java
Normal file
@@ -0,0 +1,19 @@
|
||||
package com.xuxd.kafka.console.config;
|
||||
|
||||
import lombok.Data;
|
||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
/**
|
||||
* kafka-console-ui.
|
||||
*
|
||||
* @author xuxd
|
||||
* @date 2021-09-06 16:23:30
|
||||
**/
|
||||
@Data
|
||||
@Configuration
|
||||
@ConfigurationProperties(prefix = "cron")
|
||||
public class CronConfig {
|
||||
|
||||
private String clearDirtyUser;
|
||||
}
|
||||
@@ -0,0 +1,15 @@
|
||||
package com.xuxd.kafka.console.dao;
|
||||
|
||||
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
|
||||
import com.xuxd.kafka.console.beans.dos.KafkaUserDO;
|
||||
|
||||
/**
|
||||
* kafka-console-ui.
|
||||
*
|
||||
* @author xuxd
|
||||
* @date 2021-09-06 15:12:19
|
||||
**/
|
||||
public interface KafkaUserMapper extends BaseMapper<KafkaUserDO> {
|
||||
|
||||
|
||||
}
|
||||
@@ -0,0 +1,45 @@
|
||||
package com.xuxd.kafka.console.schedule;
|
||||
|
||||
import com.xuxd.kafka.console.dao.KafkaUserMapper;
|
||||
import java.util.Set;
|
||||
import kafka.console.KafkaConfigConsole;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* kafka-console-ui.
|
||||
*
|
||||
* @author xuxd
|
||||
* @date 2021-09-06 16:22:33
|
||||
**/
|
||||
@Slf4j
|
||||
@Component
|
||||
public class KafkaAclSchedule {
|
||||
|
||||
private final KafkaUserMapper userMapper;
|
||||
|
||||
private final KafkaConfigConsole configConsole;
|
||||
|
||||
public KafkaAclSchedule(KafkaUserMapper userMapper, KafkaConfigConsole configConsole) {
|
||||
this.userMapper = userMapper;
|
||||
this.configConsole = configConsole;
|
||||
}
|
||||
|
||||
@Scheduled(cron = "${cron.clear-dirty-user}")
|
||||
public void clearDirtyKafkaUser() {
|
||||
log.info("Start clear dirty data for kafka user from database.");
|
||||
Set<String> userSet = configConsole.getUserList(null);
|
||||
userMapper.selectList(null).forEach(u -> {
|
||||
if (!userSet.contains(u.getUsername())) {
|
||||
log.info("clear user: {} from database.", u.getUsername());
|
||||
try {
|
||||
userMapper.deleteById(u.getId());
|
||||
} catch (Exception e) {
|
||||
log.error("userMapper.deleteById error, user: " + u, e);
|
||||
}
|
||||
}
|
||||
});
|
||||
log.info("Clear end.");
|
||||
}
|
||||
}
|
||||
@@ -4,7 +4,9 @@ import com.xuxd.kafka.console.beans.AclEntry;
|
||||
import com.xuxd.kafka.console.beans.CounterList;
|
||||
import com.xuxd.kafka.console.beans.CounterMap;
|
||||
import com.xuxd.kafka.console.beans.ResponseData;
|
||||
import com.xuxd.kafka.console.beans.dos.KafkaUserDO;
|
||||
import com.xuxd.kafka.console.config.KafkaConfig;
|
||||
import com.xuxd.kafka.console.dao.KafkaUserMapper;
|
||||
import com.xuxd.kafka.console.service.AclService;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
@@ -20,6 +22,7 @@ import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.kafka.clients.admin.UserScramCredentialsDescription;
|
||||
import org.apache.kafka.common.acl.AclBinding;
|
||||
import org.apache.kafka.common.acl.AclOperation;
|
||||
import org.springframework.beans.factory.ObjectProvider;
|
||||
import org.springframework.beans.factory.SmartInitializingSingleton;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
@@ -44,6 +47,12 @@ public class AclServiceImpl implements AclService, SmartInitializingSingleton {
|
||||
@Autowired
|
||||
private KafkaConfig kafkaConfig;
|
||||
|
||||
private final KafkaUserMapper kafkaUserMapper;
|
||||
|
||||
public AclServiceImpl(ObjectProvider<KafkaUserMapper> kafkaUserMapper) {
|
||||
this.kafkaUserMapper = kafkaUserMapper.getIfAvailable();
|
||||
}
|
||||
|
||||
@Override public ResponseData<Set<String>> getUserList() {
|
||||
try {
|
||||
return ResponseData.create(Set.class).data(configConsole.getUserList(null)).success();
|
||||
@@ -55,7 +64,24 @@ public class AclServiceImpl implements AclService, SmartInitializingSingleton {
|
||||
|
||||
@Override public ResponseData addOrUpdateUser(String name, String pass) {
|
||||
log.info("add or update user, username: {}, password: {}", name, pass);
|
||||
return configConsole.addOrUpdateUser(name, pass) ? ResponseData.create().success() : ResponseData.create().failed();
|
||||
if (!configConsole.addOrUpdateUser(name, pass)) {
|
||||
log.error("add user to kafka failed.");
|
||||
return ResponseData.create().failed("add user to kafka failed");
|
||||
}
|
||||
// save user info to database.
|
||||
KafkaUserDO userDO = new KafkaUserDO();
|
||||
userDO.setUsername(name);
|
||||
userDO.setPassword(pass);
|
||||
try {
|
||||
Map<String, Object> map = new HashMap<>();
|
||||
map.put("username", name);
|
||||
kafkaUserMapper.deleteByMap(map);
|
||||
kafkaUserMapper.insert(userDO);
|
||||
}catch (Exception e) {
|
||||
log.error("kafkaUserMapper.insert error.", e);
|
||||
return ResponseData.create().failed(e.getMessage());
|
||||
}
|
||||
return ResponseData.create().success();
|
||||
}
|
||||
|
||||
@Override public ResponseData deleteUser(String name) {
|
||||
|
||||
Reference in New Issue
Block a user