diff --git a/src/main/java/com/xuxd/kafka/console/beans/dos/ClusterInfoDO.java b/src/main/java/com/xuxd/kafka/console/beans/dos/ClusterInfoDO.java new file mode 100644 index 0000000..8016915 --- /dev/null +++ b/src/main/java/com/xuxd/kafka/console/beans/dos/ClusterInfoDO.java @@ -0,0 +1,28 @@ +package com.xuxd.kafka.console.beans.dos; + +import com.baomidou.mybatisplus.annotation.IdType; +import com.baomidou.mybatisplus.annotation.TableId; +import com.baomidou.mybatisplus.annotation.TableName; +import lombok.Data; + +/** + * kafka-console-ui. + * + * @author xuxd + * @date 2021-12-31 09:54:24 + **/ +@Data +@TableName("t_cluster_info") +public class ClusterInfoDO { + + @TableId(type = IdType.AUTO) + private Long id; + + private String clusterName; + + private String address; + + private String properties; + + private String updateTime; +} diff --git a/src/main/java/com/xuxd/kafka/console/boot/Bootstrap.java b/src/main/java/com/xuxd/kafka/console/boot/Bootstrap.java new file mode 100644 index 0000000..d2e4084 --- /dev/null +++ b/src/main/java/com/xuxd/kafka/console/boot/Bootstrap.java @@ -0,0 +1,65 @@ +package com.xuxd.kafka.console.boot; + +import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; +import com.xuxd.kafka.console.beans.dos.ClusterInfoDO; +import com.xuxd.kafka.console.config.KafkaConfig; +import com.xuxd.kafka.console.dao.ClusterInfoMapper; +import com.xuxd.kafka.console.utils.ConvertUtil; +import java.util.List; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.StringUtils; +import org.springframework.beans.factory.ObjectProvider; +import org.springframework.beans.factory.SmartInitializingSingleton; +import org.springframework.stereotype.Component; + +/** + * kafka-console-ui. + * + * @author xuxd + * @date 2021-12-30 19:16:50 + **/ +@Slf4j +@Component +public class Bootstrap implements SmartInitializingSingleton { + + public static final String DEFAULT_CLUSTER_NAME = "default"; + + private final KafkaConfig config; + + private final ClusterInfoMapper clusterInfoMapper; + + public Bootstrap(KafkaConfig config, ObjectProvider clusterInfoMapper) { + this.config = config; + this.clusterInfoMapper = clusterInfoMapper.getIfAvailable(); + } + + private void initialize() { + loadDefaultClusterConfig(); + } + + private void loadDefaultClusterConfig() { + log.info("load default kafka config."); + if (StringUtils.isBlank(config.getBootstrapServer())) { + return; + } + + QueryWrapper clusterInfoDOQueryWrapper = new QueryWrapper<>(); + clusterInfoDOQueryWrapper.eq("cluster_name", DEFAULT_CLUSTER_NAME); + List objects = clusterInfoMapper.selectObjs(clusterInfoDOQueryWrapper); + if (CollectionUtils.isNotEmpty(objects)) { + log.warn("default kafka cluster config has existed[any of cluster name or address]."); + return; + } + + ClusterInfoDO infoDO = new ClusterInfoDO(); + infoDO.setClusterName(DEFAULT_CLUSTER_NAME); + infoDO.setAddress(config.getBootstrapServer().trim()); + infoDO.setProperties(ConvertUtil.toJsonString(config.getProperties())); + clusterInfoMapper.insert(infoDO); + } + + @Override public void afterSingletonsInstantiated() { + initialize(); + } +} diff --git a/src/main/java/com/xuxd/kafka/console/config/ContextConfig.java b/src/main/java/com/xuxd/kafka/console/config/ContextConfig.java new file mode 100644 index 0000000..9b9a030 --- /dev/null +++ b/src/main/java/com/xuxd/kafka/console/config/ContextConfig.java @@ -0,0 +1,52 @@ +package com.xuxd.kafka.console.config; + +import java.util.Properties; + +/** + * kafka-console-ui. + * + * @author xuxd + * @date 2021-12-30 15:46:55 + **/ +public class ContextConfig { + + public static final int DEFAULT_REQUEST_TIMEOUT_MS = 5000; + + private String bootstrapServer; + + private int requestTimeoutMs = DEFAULT_REQUEST_TIMEOUT_MS; + + private Properties properties = new Properties(); + + public String getBootstrapServer() { + return bootstrapServer; + } + + public void setBootstrapServer(String bootstrapServer) { + this.bootstrapServer = bootstrapServer; + } + + public int getRequestTimeoutMs() { + return requestTimeoutMs; + } + + public void setRequestTimeoutMs(int requestTimeoutMs) { + this.requestTimeoutMs = requestTimeoutMs; + } + + public Properties getProperties() { + return properties; + } + + public void setProperties(Properties properties) { + this.properties = properties; + } + + @Override public String toString() { + return "KafkaContextConfig{" + + "bootstrapServer='" + bootstrapServer + '\'' + + ", requestTimeoutMs=" + requestTimeoutMs + + ", properties=" + properties + + '}'; + } +} diff --git a/src/main/java/com/xuxd/kafka/console/config/ContextConfigHolder.java b/src/main/java/com/xuxd/kafka/console/config/ContextConfigHolder.java new file mode 100644 index 0000000..fe5f8c9 --- /dev/null +++ b/src/main/java/com/xuxd/kafka/console/config/ContextConfigHolder.java @@ -0,0 +1,12 @@ +package com.xuxd.kafka.console.config; + +/** + * kafka-console-ui. + * + * @author xuxd + * @date 2021-12-30 18:55:28 + **/ +public class ContextConfigHolder { + + public static final ThreadLocal CONTEXT_CONFIG = new ThreadLocal<>(); +} diff --git a/src/main/java/com/xuxd/kafka/console/config/KafkaConfig.java b/src/main/java/com/xuxd/kafka/console/config/KafkaConfig.java index 147048a..bded9ff 100644 --- a/src/main/java/com/xuxd/kafka/console/config/KafkaConfig.java +++ b/src/main/java/com/xuxd/kafka/console/config/KafkaConfig.java @@ -1,5 +1,6 @@ package com.xuxd.kafka.console.config; +import java.util.Properties; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Configuration; @@ -33,6 +34,8 @@ public class KafkaConfig { private boolean enableAcl; + private Properties properties; + public String getBootstrapServer() { return bootstrapServer; } @@ -112,4 +115,12 @@ public class KafkaConfig { public void setEnableAcl(boolean enableAcl) { this.enableAcl = enableAcl; } + + public Properties getProperties() { + return properties; + } + + public void setProperties(Properties properties) { + this.properties = properties; + } } diff --git a/src/main/java/com/xuxd/kafka/console/dao/ClusterInfoMapper.java b/src/main/java/com/xuxd/kafka/console/dao/ClusterInfoMapper.java new file mode 100644 index 0000000..fa1e57e --- /dev/null +++ b/src/main/java/com/xuxd/kafka/console/dao/ClusterInfoMapper.java @@ -0,0 +1,13 @@ +package com.xuxd.kafka.console.dao; + +import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import com.xuxd.kafka.console.beans.dos.ClusterInfoDO; + +/** + * kafka-console-ui. + * + * @author xuxd + * @date 2021-12-31 09:58:52 + **/ +public interface ClusterInfoMapper extends BaseMapper { +} diff --git a/src/main/java/com/xuxd/kafka/console/service/ClusterInfoService.java b/src/main/java/com/xuxd/kafka/console/service/ClusterInfoService.java new file mode 100644 index 0000000..5d78ad2 --- /dev/null +++ b/src/main/java/com/xuxd/kafka/console/service/ClusterInfoService.java @@ -0,0 +1,10 @@ +package com.xuxd.kafka.console.service; + +/** + * kafka-console-ui. + * + * @author xuxd + * @date 2021-12-31 11:42:43 + **/ +public interface ClusterInfoService { +} diff --git a/src/main/java/com/xuxd/kafka/console/service/impl/ClusterInfoServiceImpl.java b/src/main/java/com/xuxd/kafka/console/service/impl/ClusterInfoServiceImpl.java new file mode 100644 index 0000000..16d07a7 --- /dev/null +++ b/src/main/java/com/xuxd/kafka/console/service/impl/ClusterInfoServiceImpl.java @@ -0,0 +1,14 @@ +package com.xuxd.kafka.console.service.impl; + +import com.xuxd.kafka.console.service.ClusterInfoService; +import org.springframework.stereotype.Service; + +/** + * kafka-console-ui. + * + * @author xuxd + * @date 2021-12-31 11:42:59 + **/ +@Service +public class ClusterInfoServiceImpl implements ClusterInfoService { +} diff --git a/src/main/java/com/xuxd/kafka/console/utils/ConvertUtil.java b/src/main/java/com/xuxd/kafka/console/utils/ConvertUtil.java index 68ef690..3fc7769 100644 --- a/src/main/java/com/xuxd/kafka/console/utils/ConvertUtil.java +++ b/src/main/java/com/xuxd/kafka/console/utils/ConvertUtil.java @@ -3,7 +3,9 @@ package com.xuxd.kafka.console.utils; import com.google.common.base.Preconditions; import java.util.Arrays; import java.util.HashMap; +import java.util.Iterator; import java.util.Map; +import java.util.Properties; import lombok.extern.slf4j.Slf4j; import org.springframework.util.ClassUtils; @@ -35,6 +37,20 @@ public class ConvertUtil { } }); } + Iterator> iterator = res.entrySet().iterator(); + while (iterator.hasNext()) { + if (iterator.next().getValue() == null) { + iterator.remove(); + } + } return res; } + + public static String toJsonString(Object src) { + return GsonUtil.INSTANCE.get().toJson(src); + } + + public static Properties toProperties(String jsonStr) { + return GsonUtil.INSTANCE.get().fromJson(jsonStr, Properties.class); + } } diff --git a/src/main/resources/db/schema-h2.sql b/src/main/resources/db/schema-h2.sql index c4b3cb7..a85429d 100644 --- a/src/main/resources/db/schema-h2.sql +++ b/src/main/resources/db/schema-h2.sql @@ -1,23 +1,36 @@ -- DROP TABLE IF EXISTS T_KAKFA_USER; +-- kafka ACL启用SASL_SCRAM中的用户 CREATE TABLE IF NOT EXISTS T_KAFKA_USER ( - ID IDENTITY NOT NULL COMMENT '主键ID', - USERNAME VARCHAR(50) NOT NULL DEFAULT '' COMMENT '姓名', - PASSWORD VARCHAR(50) NOT NULL DEFAULT '' COMMENT '年龄', - UPDATE_TIME TIMESTAMP NOT NULL DEFAULT NOW() COMMENT '更新时间', + ID IDENTITY NOT NULL COMMENT '主键ID', + USERNAME VARCHAR(50) NOT NULL DEFAULT '' COMMENT '用户名', + PASSWORD VARCHAR(50) NOT NULL DEFAULT '' COMMENT '密码', + UPDATE_TIME TIMESTAMP NOT NULL DEFAULT NOW() COMMENT '更新时间', PRIMARY KEY (ID), UNIQUE (USERNAME) ); - +-- 消息同步解决方案中使用的位点对齐信息 CREATE TABLE IF NOT EXISTS T_MIN_OFFSET_ALIGNMENT ( - ID IDENTITY NOT NULL COMMENT '主键ID', - GROUP_ID VARCHAR(128) NOT NULL DEFAULT '' COMMENT 'groupId', - TOPIC VARCHAR(128) NOT NULL DEFAULT '' COMMENT 'topic', - THAT_OFFSET VARCHAR(512) NOT NULL DEFAULT '' COMMENT 'min offset for that kafka cluster', - THIS_OFFSET VARCHAR(512) NOT NULL DEFAULT '' COMMENT 'min offset for this kafka cluster', - UPDATE_TIME TIMESTAMP NOT NULL DEFAULT NOW() COMMENT '更新时间', + ID IDENTITY NOT NULL COMMENT '主键ID', + GROUP_ID VARCHAR(128) NOT NULL DEFAULT '' COMMENT 'groupId', + TOPIC VARCHAR(128) NOT NULL DEFAULT '' COMMENT 'topic', + THAT_OFFSET VARCHAR(512) NOT NULL DEFAULT '' COMMENT 'min offset for that kafka cluster', + THIS_OFFSET VARCHAR(512) NOT NULL DEFAULT '' COMMENT 'min offset for this kafka cluster', + UPDATE_TIME TIMESTAMP NOT NULL DEFAULT NOW() COMMENT '更新时间', PRIMARY KEY (ID), UNIQUE (GROUP_ID, TOPIC) +); + +-- 多集群管理,每个集群的配置信息 +CREATE TABLE IF NOT EXISTS T_CLUSTER_INFO +( + ID IDENTITY NOT NULL COMMENT '主键ID', + CLUSTER_NAME VARCHAR(128) NOT NULL DEFAULT '' COMMENT '集群名', + ADDRESS VARCHAR(256) NOT NULL DEFAULT '' COMMENT '集群地址', + PROPERTIES VARCHAR(512) NOT NULL DEFAULT '' COMMENT '集群的其它属性配置', + UPDATE_TIME TIMESTAMP NOT NULL DEFAULT NOW() COMMENT '更新时间', + PRIMARY KEY (ID), + UNIQUE (CLUSTER_NAME) ); \ No newline at end of file