准备开发多集群支持
This commit is contained in:
@@ -0,0 +1,28 @@
|
|||||||
|
package com.xuxd.kafka.console.beans.dos;
|
||||||
|
|
||||||
|
import com.baomidou.mybatisplus.annotation.IdType;
|
||||||
|
import com.baomidou.mybatisplus.annotation.TableId;
|
||||||
|
import com.baomidou.mybatisplus.annotation.TableName;
|
||||||
|
import lombok.Data;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* kafka-console-ui.
|
||||||
|
*
|
||||||
|
* @author xuxd
|
||||||
|
* @date 2021-12-31 09:54:24
|
||||||
|
**/
|
||||||
|
@Data
|
||||||
|
@TableName("t_cluster_info")
|
||||||
|
public class ClusterInfoDO {
|
||||||
|
|
||||||
|
@TableId(type = IdType.AUTO)
|
||||||
|
private Long id;
|
||||||
|
|
||||||
|
private String clusterName;
|
||||||
|
|
||||||
|
private String address;
|
||||||
|
|
||||||
|
private String properties;
|
||||||
|
|
||||||
|
private String updateTime;
|
||||||
|
}
|
||||||
65
src/main/java/com/xuxd/kafka/console/boot/Bootstrap.java
Normal file
65
src/main/java/com/xuxd/kafka/console/boot/Bootstrap.java
Normal file
@@ -0,0 +1,65 @@
|
|||||||
|
package com.xuxd.kafka.console.boot;
|
||||||
|
|
||||||
|
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
|
||||||
|
import com.xuxd.kafka.console.beans.dos.ClusterInfoDO;
|
||||||
|
import com.xuxd.kafka.console.config.KafkaConfig;
|
||||||
|
import com.xuxd.kafka.console.dao.ClusterInfoMapper;
|
||||||
|
import com.xuxd.kafka.console.utils.ConvertUtil;
|
||||||
|
import java.util.List;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.apache.commons.collections.CollectionUtils;
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
import org.springframework.beans.factory.ObjectProvider;
|
||||||
|
import org.springframework.beans.factory.SmartInitializingSingleton;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* kafka-console-ui.
|
||||||
|
*
|
||||||
|
* @author xuxd
|
||||||
|
* @date 2021-12-30 19:16:50
|
||||||
|
**/
|
||||||
|
@Slf4j
|
||||||
|
@Component
|
||||||
|
public class Bootstrap implements SmartInitializingSingleton {
|
||||||
|
|
||||||
|
public static final String DEFAULT_CLUSTER_NAME = "default";
|
||||||
|
|
||||||
|
private final KafkaConfig config;
|
||||||
|
|
||||||
|
private final ClusterInfoMapper clusterInfoMapper;
|
||||||
|
|
||||||
|
public Bootstrap(KafkaConfig config, ObjectProvider<ClusterInfoMapper> clusterInfoMapper) {
|
||||||
|
this.config = config;
|
||||||
|
this.clusterInfoMapper = clusterInfoMapper.getIfAvailable();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void initialize() {
|
||||||
|
loadDefaultClusterConfig();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void loadDefaultClusterConfig() {
|
||||||
|
log.info("load default kafka config.");
|
||||||
|
if (StringUtils.isBlank(config.getBootstrapServer())) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
QueryWrapper<ClusterInfoDO> clusterInfoDOQueryWrapper = new QueryWrapper<>();
|
||||||
|
clusterInfoDOQueryWrapper.eq("cluster_name", DEFAULT_CLUSTER_NAME);
|
||||||
|
List<Object> objects = clusterInfoMapper.selectObjs(clusterInfoDOQueryWrapper);
|
||||||
|
if (CollectionUtils.isNotEmpty(objects)) {
|
||||||
|
log.warn("default kafka cluster config has existed[any of cluster name or address].");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
ClusterInfoDO infoDO = new ClusterInfoDO();
|
||||||
|
infoDO.setClusterName(DEFAULT_CLUSTER_NAME);
|
||||||
|
infoDO.setAddress(config.getBootstrapServer().trim());
|
||||||
|
infoDO.setProperties(ConvertUtil.toJsonString(config.getProperties()));
|
||||||
|
clusterInfoMapper.insert(infoDO);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public void afterSingletonsInstantiated() {
|
||||||
|
initialize();
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,52 @@
|
|||||||
|
package com.xuxd.kafka.console.config;
|
||||||
|
|
||||||
|
import java.util.Properties;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* kafka-console-ui.
|
||||||
|
*
|
||||||
|
* @author xuxd
|
||||||
|
* @date 2021-12-30 15:46:55
|
||||||
|
**/
|
||||||
|
public class ContextConfig {
|
||||||
|
|
||||||
|
public static final int DEFAULT_REQUEST_TIMEOUT_MS = 5000;
|
||||||
|
|
||||||
|
private String bootstrapServer;
|
||||||
|
|
||||||
|
private int requestTimeoutMs = DEFAULT_REQUEST_TIMEOUT_MS;
|
||||||
|
|
||||||
|
private Properties properties = new Properties();
|
||||||
|
|
||||||
|
public String getBootstrapServer() {
|
||||||
|
return bootstrapServer;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setBootstrapServer(String bootstrapServer) {
|
||||||
|
this.bootstrapServer = bootstrapServer;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getRequestTimeoutMs() {
|
||||||
|
return requestTimeoutMs;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setRequestTimeoutMs(int requestTimeoutMs) {
|
||||||
|
this.requestTimeoutMs = requestTimeoutMs;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Properties getProperties() {
|
||||||
|
return properties;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setProperties(Properties properties) {
|
||||||
|
this.properties = properties;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public String toString() {
|
||||||
|
return "KafkaContextConfig{" +
|
||||||
|
"bootstrapServer='" + bootstrapServer + '\'' +
|
||||||
|
", requestTimeoutMs=" + requestTimeoutMs +
|
||||||
|
", properties=" + properties +
|
||||||
|
'}';
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,12 @@
|
|||||||
|
package com.xuxd.kafka.console.config;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* kafka-console-ui.
|
||||||
|
*
|
||||||
|
* @author xuxd
|
||||||
|
* @date 2021-12-30 18:55:28
|
||||||
|
**/
|
||||||
|
public class ContextConfigHolder {
|
||||||
|
|
||||||
|
public static final ThreadLocal<ContextConfig> CONTEXT_CONFIG = new ThreadLocal<>();
|
||||||
|
}
|
||||||
@@ -1,5 +1,6 @@
|
|||||||
package com.xuxd.kafka.console.config;
|
package com.xuxd.kafka.console.config;
|
||||||
|
|
||||||
|
import java.util.Properties;
|
||||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||||
import org.springframework.context.annotation.Configuration;
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
|
||||||
@@ -33,6 +34,8 @@ public class KafkaConfig {
|
|||||||
|
|
||||||
private boolean enableAcl;
|
private boolean enableAcl;
|
||||||
|
|
||||||
|
private Properties properties;
|
||||||
|
|
||||||
public String getBootstrapServer() {
|
public String getBootstrapServer() {
|
||||||
return bootstrapServer;
|
return bootstrapServer;
|
||||||
}
|
}
|
||||||
@@ -112,4 +115,12 @@ public class KafkaConfig {
|
|||||||
public void setEnableAcl(boolean enableAcl) {
|
public void setEnableAcl(boolean enableAcl) {
|
||||||
this.enableAcl = enableAcl;
|
this.enableAcl = enableAcl;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Properties getProperties() {
|
||||||
|
return properties;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setProperties(Properties properties) {
|
||||||
|
this.properties = properties;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,13 @@
|
|||||||
|
package com.xuxd.kafka.console.dao;
|
||||||
|
|
||||||
|
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
|
||||||
|
import com.xuxd.kafka.console.beans.dos.ClusterInfoDO;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* kafka-console-ui.
|
||||||
|
*
|
||||||
|
* @author xuxd
|
||||||
|
* @date 2021-12-31 09:58:52
|
||||||
|
**/
|
||||||
|
public interface ClusterInfoMapper extends BaseMapper<ClusterInfoDO> {
|
||||||
|
}
|
||||||
@@ -0,0 +1,10 @@
|
|||||||
|
package com.xuxd.kafka.console.service;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* kafka-console-ui.
|
||||||
|
*
|
||||||
|
* @author xuxd
|
||||||
|
* @date 2021-12-31 11:42:43
|
||||||
|
**/
|
||||||
|
public interface ClusterInfoService {
|
||||||
|
}
|
||||||
@@ -0,0 +1,14 @@
|
|||||||
|
package com.xuxd.kafka.console.service.impl;
|
||||||
|
|
||||||
|
import com.xuxd.kafka.console.service.ClusterInfoService;
|
||||||
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* kafka-console-ui.
|
||||||
|
*
|
||||||
|
* @author xuxd
|
||||||
|
* @date 2021-12-31 11:42:59
|
||||||
|
**/
|
||||||
|
@Service
|
||||||
|
public class ClusterInfoServiceImpl implements ClusterInfoService {
|
||||||
|
}
|
||||||
@@ -3,7 +3,9 @@ package com.xuxd.kafka.console.utils;
|
|||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
import java.util.Iterator;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Properties;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.util.ClassUtils;
|
import org.springframework.util.ClassUtils;
|
||||||
|
|
||||||
@@ -35,6 +37,20 @@ public class ConvertUtil {
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
Iterator<Map.Entry<String, Object>> iterator = res.entrySet().iterator();
|
||||||
|
while (iterator.hasNext()) {
|
||||||
|
if (iterator.next().getValue() == null) {
|
||||||
|
iterator.remove();
|
||||||
|
}
|
||||||
|
}
|
||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static String toJsonString(Object src) {
|
||||||
|
return GsonUtil.INSTANCE.get().toJson(src);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static Properties toProperties(String jsonStr) {
|
||||||
|
return GsonUtil.INSTANCE.get().fromJson(jsonStr, Properties.class);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,15 +1,16 @@
|
|||||||
-- DROP TABLE IF EXISTS T_KAKFA_USER;
|
-- DROP TABLE IF EXISTS T_KAKFA_USER;
|
||||||
|
|
||||||
|
-- kafka ACL启用SASL_SCRAM中的用户
|
||||||
CREATE TABLE IF NOT EXISTS T_KAFKA_USER
|
CREATE TABLE IF NOT EXISTS T_KAFKA_USER
|
||||||
(
|
(
|
||||||
ID IDENTITY NOT NULL COMMENT '主键ID',
|
ID IDENTITY NOT NULL COMMENT '主键ID',
|
||||||
USERNAME VARCHAR(50) NOT NULL DEFAULT '' COMMENT '姓名',
|
USERNAME VARCHAR(50) NOT NULL DEFAULT '' COMMENT '用户名',
|
||||||
PASSWORD VARCHAR(50) NOT NULL DEFAULT '' COMMENT '年龄',
|
PASSWORD VARCHAR(50) NOT NULL DEFAULT '' COMMENT '密码',
|
||||||
UPDATE_TIME TIMESTAMP NOT NULL DEFAULT NOW() COMMENT '更新时间',
|
UPDATE_TIME TIMESTAMP NOT NULL DEFAULT NOW() COMMENT '更新时间',
|
||||||
PRIMARY KEY (ID),
|
PRIMARY KEY (ID),
|
||||||
UNIQUE (USERNAME)
|
UNIQUE (USERNAME)
|
||||||
);
|
);
|
||||||
|
-- 消息同步解决方案中使用的位点对齐信息
|
||||||
CREATE TABLE IF NOT EXISTS T_MIN_OFFSET_ALIGNMENT
|
CREATE TABLE IF NOT EXISTS T_MIN_OFFSET_ALIGNMENT
|
||||||
(
|
(
|
||||||
ID IDENTITY NOT NULL COMMENT '主键ID',
|
ID IDENTITY NOT NULL COMMENT '主键ID',
|
||||||
@@ -21,3 +22,15 @@ CREATE TABLE IF NOT EXISTS T_MIN_OFFSET_ALIGNMENT
|
|||||||
PRIMARY KEY (ID),
|
PRIMARY KEY (ID),
|
||||||
UNIQUE (GROUP_ID, TOPIC)
|
UNIQUE (GROUP_ID, TOPIC)
|
||||||
);
|
);
|
||||||
|
|
||||||
|
-- 多集群管理,每个集群的配置信息
|
||||||
|
CREATE TABLE IF NOT EXISTS T_CLUSTER_INFO
|
||||||
|
(
|
||||||
|
ID IDENTITY NOT NULL COMMENT '主键ID',
|
||||||
|
CLUSTER_NAME VARCHAR(128) NOT NULL DEFAULT '' COMMENT '集群名',
|
||||||
|
ADDRESS VARCHAR(256) NOT NULL DEFAULT '' COMMENT '集群地址',
|
||||||
|
PROPERTIES VARCHAR(512) NOT NULL DEFAULT '' COMMENT '集群的其它属性配置',
|
||||||
|
UPDATE_TIME TIMESTAMP NOT NULL DEFAULT NOW() COMMENT '更新时间',
|
||||||
|
PRIMARY KEY (ID),
|
||||||
|
UNIQUE (CLUSTER_NAME)
|
||||||
|
);
|
||||||
Reference in New Issue
Block a user