集群切换

This commit is contained in:
许晓东
2022-01-05 21:19:46 +08:00
parent 6f9676e259
commit f5fb2c4f88
20 changed files with 373 additions and 37 deletions

View File

@@ -3,11 +3,13 @@ package com.xuxd.kafka.console;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.web.servlet.ServletComponentScan;
import org.springframework.scheduling.annotation.EnableScheduling;
@MapperScan("com.xuxd.kafka.console.dao")
@SpringBootApplication
@EnableScheduling
@ServletComponentScan
public class KafkaConsoleUiApplication {
public static void main(String[] args) {

View File

@@ -3,8 +3,10 @@ package com.xuxd.kafka.console.controller;
import com.xuxd.kafka.console.beans.dto.ClusterInfoDTO;
import com.xuxd.kafka.console.service.ClusterService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@@ -27,13 +29,28 @@ public class ClusterController {
return clusterService.getClusterInfo();
}
@GetMapping("/list")
@GetMapping("/info")
public Object getClusterInfoList() {
return clusterService.getClusterInfoList();
}
@PostMapping
@PostMapping("/info")
public Object addClusterInfo(@RequestBody ClusterInfoDTO dto) {
return clusterService.addClusterInfo(dto.to());
}
@DeleteMapping("/info")
public Object deleteClusterInfo(@RequestBody ClusterInfoDTO dto) {
return clusterService.deleteClusterInfo(dto.getId());
}
@PutMapping("/info")
public Object updateClusterInfo(@RequestBody ClusterInfoDTO dto) {
return clusterService.updateClusterInfo(dto.to());
}
@GetMapping("/info/peek")
public Object peekClusterInfo() {
return clusterService.peekClusterInfo();
}
}

View File

@@ -0,0 +1,75 @@
package com.xuxd.kafka.console.interceptor;
import com.xuxd.kafka.console.beans.ResponseData;
import com.xuxd.kafka.console.beans.dos.ClusterInfoDO;
import com.xuxd.kafka.console.config.ContextConfig;
import com.xuxd.kafka.console.config.ContextConfigHolder;
import com.xuxd.kafka.console.dao.ClusterInfoMapper;
import com.xuxd.kafka.console.utils.ConvertUtil;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import javax.servlet.Filter;
import javax.servlet.FilterChain;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.annotation.WebFilter;
import javax.servlet.http.HttpServletRequest;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
/**
* kafka-console-ui.
*
* @author xuxd
* @date 2022-01-05 19:56:25
**/
@WebFilter(filterName = "context-set-filter", urlPatterns = {"/*"})
@Slf4j
public class ContextSetFilter implements Filter {
private Set<String> excludes = new HashSet<>();
{
excludes.add("/cluster/info/peek");
excludes.add("/cluster/info");
}
@Autowired
private ClusterInfoMapper clusterInfoMapper;
@Override public void doFilter(ServletRequest req, ServletResponse response,
FilterChain chain) throws IOException, ServletException {
try {
HttpServletRequest request = (HttpServletRequest) req;
String uri = request.getRequestURI();
if (!excludes.contains(uri)) {
String headerId = request.getHeader(Header.ID);
if (StringUtils.isBlank(headerId)) {
ResponseData failed = ResponseData.create().failed("Cluster id is null.");
response.setContentType(MediaType.APPLICATION_JSON_VALUE);
response.getOutputStream().println(ConvertUtil.toJsonString(failed));
return;
} else {
ClusterInfoDO infoDO = clusterInfoMapper.selectById(Long.valueOf(headerId));
ContextConfig config = new ContextConfig();
config.setBootstrapServer(infoDO.getAddress());
config.setProperties(ConvertUtil.toProperties(infoDO.getProperties()));
ContextConfigHolder.CONTEXT_CONFIG.set(config);
}
}
chain.doFilter(req, response);
} finally {
ContextConfigHolder.CONTEXT_CONFIG.remove();
}
}
interface Header {
String ID = "X-Cluster-Info-Id";
String NAME = "X-Cluster-Info-Name";
}
}

View File

@@ -15,4 +15,10 @@ public interface ClusterService {
ResponseData getClusterInfoList();
ResponseData addClusterInfo(ClusterInfoDO infoDO);
ResponseData deleteClusterInfo(Long id);
ResponseData updateClusterInfo(ClusterInfoDO infoDO);
ResponseData peekClusterInfo();
}

View File

@@ -1,13 +1,16 @@
package com.xuxd.kafka.console.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.xuxd.kafka.console.beans.ResponseData;
import com.xuxd.kafka.console.beans.dos.ClusterInfoDO;
import com.xuxd.kafka.console.beans.vo.ClusterInfoVO;
import com.xuxd.kafka.console.dao.ClusterInfoMapper;
import com.xuxd.kafka.console.service.ClusterService;
import java.util.List;
import java.util.stream.Collectors;
import kafka.console.ClusterConsole;
import org.springframework.beans.factory.annotation.Autowired;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.stereotype.Service;
/**
@@ -19,11 +22,15 @@ import org.springframework.stereotype.Service;
@Service
public class ClusterServiceImpl implements ClusterService {
@Autowired
private ClusterConsole clusterConsole;
private final ClusterConsole clusterConsole;
@Autowired
private ClusterInfoMapper clusterInfoMapper;
private final ClusterInfoMapper clusterInfoMapper;
public ClusterServiceImpl(ObjectProvider<ClusterConsole> clusterConsole,
ObjectProvider<ClusterInfoMapper> clusterInfoMapper) {
this.clusterConsole = clusterConsole.getIfAvailable();
this.clusterInfoMapper = clusterInfoMapper.getIfAvailable();
}
@Override public ResponseData getClusterInfo() {
return ResponseData.create().data(clusterConsole.clusterInfo()).success();
@@ -35,8 +42,31 @@ public class ClusterServiceImpl implements ClusterService {
}
@Override public ResponseData addClusterInfo(ClusterInfoDO infoDO) {
QueryWrapper<ClusterInfoDO> queryWrapper = new QueryWrapper<>();
queryWrapper.eq("cluster_name", infoDO.getClusterName());
if (clusterInfoMapper.selectCount(queryWrapper) > 0) {
return ResponseData.create().failed("cluster name exist.");
}
clusterInfoMapper.insert(infoDO);
return ResponseData.create().success();
}
@Override public ResponseData deleteClusterInfo(Long id) {
clusterInfoMapper.deleteById(id);
return ResponseData.create().success();
}
@Override public ResponseData updateClusterInfo(ClusterInfoDO infoDO) {
clusterInfoMapper.updateById(infoDO);
return ResponseData.create().success();
}
@Override public ResponseData peekClusterInfo() {
List<ClusterInfoDO> dos = clusterInfoMapper.selectList(null);
if (CollectionUtils.isEmpty(dos)) {
return ResponseData.create().failed("No Cluster Info.");
}
return ResponseData.create().data(dos.stream().findFirst().map(ClusterInfoVO::from)).success();
}
}

View File

@@ -1,6 +1,6 @@
package kafka.console
import com.xuxd.kafka.console.config.KafkaConfig
import com.xuxd.kafka.console.config.{ContextConfigHolder, KafkaConfig}
import kafka.zk.{AdminZkClient, KafkaZkClient}
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.admin._
@@ -117,8 +117,9 @@ class KafkaConsole(config: KafkaConfig) {
private def getProps(): Properties = {
val props: Properties = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServer)
props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, config.getRequestTimeoutMs())
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, ContextConfigHolder.CONTEXT_CONFIG.get().getBootstrapServer())
props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, ContextConfigHolder.CONTEXT_CONFIG.get().getRequestTimeoutMs())
props.putAll(ContextConfigHolder.CONTEXT_CONFIG.get().getProperties())
if (config.isEnableAcl) {
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, config.getSecurityProtocol())
props.put(SaslConfigs.SASL_MECHANISM, config.getSaslMechanism())

View File

@@ -17,13 +17,18 @@
>
<span>|</span
><router-link to="/op-page" class="pad-l-r">运维</router-link>
<span class="right">集群{{ clusterName }}</span>
</div>
<router-view class="content" />
</div>
</template>
<script>
import { KafkaConfigApi } from "@/utils/api";
import { KafkaConfigApi, KafkaClusterApi } from "@/utils/api";
import request from "@/utils/request";
import { mapMutations, mapState } from "vuex";
import { getClusterInfo } from "@/utils/local-cache";
import notification from "ant-design-vue/lib/notification";
import { CLUSTER } from "@/store/mutation-types";
export default {
data() {
@@ -38,6 +43,35 @@ export default {
}).then((res) => {
this.config = res.data;
});
const clusterInfo = getClusterInfo();
if (!clusterInfo) {
request({
url: KafkaClusterApi.peekClusterInfo.url,
method: KafkaClusterApi.peekClusterInfo.method,
}).then((res) => {
if (res.code == 0) {
this.switchCluster(res.data);
} else {
notification.error({
message: "error",
description: res.msg,
});
}
});
} else {
this.switchCluster(clusterInfo);
}
},
computed: {
...mapState({
clusterName: (state) => state.clusterInfo.clusterName,
}),
},
methods: {
...mapMutations({
switchCluster: CLUSTER.SWITCH,
}),
},
};
</script>
@@ -90,4 +124,10 @@ export default {
top: 1%;
position: absolute;
}
.right {
float: right;
right: 1%;
top: 2%;
position: absolute;
}
</style>

View File

@@ -1,11 +1,24 @@
import Vue from "vue";
import Vuex from "vuex";
import { CLUSTER } from "@/store/mutation-types";
import { setClusterInfo } from "@/utils/local-cache";
Vue.use(Vuex);
export default new Vuex.Store({
state: {},
mutations: {},
state: {
clusterInfo: {
id: undefined,
clusterName: undefined,
},
},
mutations: {
[CLUSTER.SWITCH](state, clusterInfo) {
state.clusterInfo.id = clusterInfo.id;
state.clusterInfo.clusterName = clusterInfo.clusterName;
setClusterInfo(clusterInfo);
},
},
actions: {},
modules: {},
});

View File

@@ -0,0 +1,3 @@
export const CLUSTER = {
SWITCH: "switchCluster",
};

View File

@@ -184,13 +184,25 @@ export const KafkaClusterApi = {
method: "get",
},
getClusterInfoList: {
url: "/cluster/list",
url: "/cluster/info",
method: "get",
},
addClusterInfo: {
url: "/cluster",
url: "/cluster/info",
method: "post",
},
deleteClusterInfo: {
url: "/cluster/info",
method: "delete",
},
updateClusterInfo: {
url: "/cluster/info",
method: "put",
},
peekClusterInfo: {
url: "/cluster/info/peek",
method: "get",
},
};
export const KafkaOpApi = {

View File

@@ -1,3 +1,7 @@
export const ConstantEvent = {
updateUserDialogData: "updateUserDialogData",
};
export const Cache = {
clusterInfo: "clusterInfo",
};

View File

@@ -0,0 +1,10 @@
import { Cache } from "@/utils/constants";
export function setClusterInfo(clusterInfo) {
localStorage.setItem(Cache.clusterInfo, JSON.stringify(clusterInfo));
}
export function getClusterInfo() {
const str = localStorage.getItem(Cache.clusterInfo);
return str ? JSON.parse(str) : undefined;
}

View File

@@ -1,6 +1,7 @@
import axios from "axios";
import notification from "ant-design-vue/es/notification";
import { VueAxios } from "./axios";
import { getClusterInfo } from "@/utils/local-cache";
// 创建 axios 实例
const request = axios.create({
@@ -22,10 +23,14 @@ const errorHandler = (error) => {
};
// request interceptor
// request.interceptors.request.use(config => {
//
// return config
// }, errorHandler)
request.interceptors.request.use((config) => {
const clusterInfo = getClusterInfo();
if (clusterInfo) {
config.headers["X-Cluster-Info-Id"] = clusterInfo.id;
config.headers["X-Cluster-Info-Name"] = clusterInfo.clusterName;
}
return config;
}, errorHandler);
// response interceptor
request.interceptors.response.use((response) => {

View File

@@ -11,6 +11,7 @@
// @ is an alias to /src
import request from "@/utils/request";
import { KafkaConfigApi } from "@/utils/api";
import notification from "ant-design-vue/lib/notification";
export default {
name: "Home",
components: {},
@@ -25,7 +26,14 @@ export default {
url: KafkaConfigApi.getConfig.url,
method: KafkaConfigApi.getConfig.method,
}).then((res) => {
this.config = res.data;
if (res.code == 0) {
this.config = res.data;
} else {
notification.error({
message: "error",
description: res.msg,
});
}
});
},
};

View File

@@ -45,6 +45,7 @@
import request from "@/utils/request";
import { KafkaClusterApi } from "@/utils/api";
import BrokerConfig from "@/views/cluster/BrokerConfig";
import notification from "ant-design-vue/lib/notification";
export default {
name: "Topic",
@@ -68,8 +69,15 @@ export default {
method: KafkaClusterApi.getClusterInfo.method,
}).then((res) => {
this.loading = false;
this.data = res.data.nodes;
this.clusterId = res.data.clusterId;
if (res.code == 0) {
this.data = res.data.nodes;
this.clusterId = res.data.clusterId;
} else {
notification.error({
message: "error",
description: res.msg,
});
}
});
},
openBrokerConfigDialog(record, isLoggerConfig) {

View File

@@ -196,7 +196,14 @@ export default {
data: this.queryParam,
}).then((res) => {
this.loading = false;
this.data = res.data.list;
if (res.code == 0) {
this.data = res.data.list;
} else {
notification.error({
message: "error",
description: res.msg,
});
}
});
},
deleteGroup(group) {

View File

@@ -21,7 +21,10 @@
<a-input
v-decorator="[
'clusterName',
{ rules: [{ required: true, message: '输入集群名称!' }] },
{
rules: [{ required: true, message: '输入集群名称!' }],
initialValue: clusterInfo.clusterName,
},
]"
placeholder="输入集群名称"
/>
@@ -30,7 +33,10 @@
<a-input
v-decorator="[
'address',
{ rules: [{ required: true, message: '输入集群地址!' }] },
{
rules: [{ required: true, message: '输入集群地址!' }],
initialValue: clusterInfo.address,
},
]"
placeholder="输入集群地址"
/>
@@ -44,11 +50,14 @@ security-protocol=SASL_PLAINTEXT
sasl-mechanism=SCRAM-SHA-256
sasl-jaas-config=org.apache.kafka.common.security.scram.ScramLoginModule required username="name" password="password";
'
v-decorator="['properties']"
v-decorator="[
'properties',
{ initialValue: clusterInfo.properties },
]"
/>
</a-form-item>
<a-form-item :wrapper-col="{ span: 12, offset: 5 }">
<a-button type="primary" html-type="submit"> 提交 </a-button>
<a-button type="primary" html-type="submit"> 提交</a-button>
</a-form-item>
</a-form>
</a-spin>
@@ -60,6 +69,7 @@ sasl-jaas-config=org.apache.kafka.common.security.scram.ScramLoginModule require
import request from "@/utils/request";
import { KafkaClusterApi } from "@/utils/api";
import notification from "ant-design-vue/es/notification";
export default {
name: "AddClusterInfo",
props: {
@@ -67,6 +77,18 @@ export default {
type: Boolean,
default: false,
},
isModify: {
type: Boolean,
default: false,
},
clusterInfo: {
type: Object,
default: () => defaultInfo,
},
closeDialogEvent: {
type: String,
default: "closeAddClusterInfoDialog",
},
},
data() {
return {
@@ -87,15 +109,21 @@ export default {
this.form.validateFields((err, values) => {
if (!err) {
this.loading = true;
const api = this.isModify
? KafkaClusterApi.updateClusterInfo
: KafkaClusterApi.addClusterInfo;
const data = this.isModify
? Object.assign({}, this.clusterInfo, values)
: Object.assign({}, values);
request({
url: KafkaClusterApi.addClusterInfo.url,
method: KafkaClusterApi.addClusterInfo.method,
data: values,
url: api.url,
method: api.method,
data: data,
}).then((res) => {
this.loading = false;
if (res.code == 0) {
this.$message.success(res.msg);
this.$emit("closeAddClusterInfoDialog", { refresh: true });
this.$emit(this.closeDialogEvent, { refresh: true });
} else {
notification.error({
message: "error",
@@ -108,10 +136,11 @@ export default {
},
handleCancel() {
this.data = [];
this.$emit("closeAddClusterInfoDialog", { refresh: false });
this.$emit(this.closeDialogEvent, { refresh: false });
},
},
};
const defaultInfo = { clusterName: "", address: "", properties: "" };
</script>
<style scoped></style>

View File

@@ -37,9 +37,14 @@
size="small"
href="javascript:;"
class="operation-btn"
@click="switchCluster(record)"
>切换
</a-button>
<a-button size="small" href="javascript:;" class="operation-btn"
<a-button
size="small"
href="javascript:;"
class="operation-btn"
@click="openUpdateClusterInfoDialog(record)"
>编辑
</a-button>
<a-popconfirm
@@ -63,6 +68,15 @@
@closeAddClusterInfoDialog="closeAddClusterInfoDialog"
>
</AddClusterInfo>
<AddClusterInfo
:visible="showUpdateClusterInfoDialog"
closeDialogEvent="closeUpdateClusterInfoDialog"
@closeUpdateClusterInfoDialog="closeUpdateClusterInfoDialog"
:cluster-info="select"
:is-modify="true"
>
</AddClusterInfo>
</a-spin>
</div>
</a-modal>
@@ -72,6 +86,9 @@
import request from "@/utils/request";
import { KafkaClusterApi } from "@/utils/api";
import AddClusterInfo from "@/views/op/AddClusterInfo";
import notification from "ant-design-vue/lib/notification";
import { mapMutations } from "vuex";
import { CLUSTER } from "@/store/mutation-types";
export default {
name: "Cluster",
@@ -89,6 +106,8 @@ export default {
data: [],
loading: false,
showAddClusterInfoDialog: false,
showUpdateClusterInfoDialog: false,
select: {},
};
},
watch: {
@@ -110,7 +129,24 @@ export default {
this.data = res.data;
});
},
deleteClusterInfo() {},
deleteClusterInfo(record) {
request({
url: KafkaClusterApi.deleteClusterInfo.url,
method: KafkaClusterApi.deleteClusterInfo.method,
data: Object.assign({}, { id: record.id }),
}).then((res) => {
this.loading = false;
if (res.code == 0) {
this.$message.success(res.msg);
this.getClusterInfoList();
} else {
notification.error({
message: "error",
description: res.msg,
});
}
});
},
handleCancel() {
this.data = [];
this.$emit("closeClusterInfoDialog", {});
@@ -124,6 +160,27 @@ export default {
this.getClusterInfoList();
}
},
openUpdateClusterInfoDialog(record) {
this.showUpdateClusterInfoDialog = true;
const r = Object.assign({}, record);
if (r.properties) {
let str = "";
r.properties.forEach((e) => {
str = str + e + "\r\n";
});
r.properties = str;
}
this.select = r;
},
closeUpdateClusterInfoDialog(res) {
this.showUpdateClusterInfoDialog = false;
if (res.refresh) {
this.getClusterInfoList();
}
},
...mapMutations({
switchCluster: CLUSTER.SWITCH,
}),
},
};

View File

@@ -7,7 +7,9 @@
集群切换
</a-button>
<label>说明</label>
<span>多集群管理增加删除集群配置切换集群</span>
<span
>多集群管理增加删除集群配置切换选中集群为当前操作集群</span
>
</p>
</a-card>
</div>

View File

@@ -245,8 +245,15 @@ export default {
params: this.queryParam,
}).then((res) => {
this.loading = false;
this.data = res.data;
this.filter();
if (res.code == 0) {
this.data = res.data;
this.filter();
} else {
notification.error({
message: "error",
description: res.msg,
});
}
});
},
deleteTopic(topic) {