副本重分配-》生成分配计划

This commit is contained in:
许晓东
2022-02-15 20:13:07 +08:00
parent 01c7121ee4
commit dda08a2152
9 changed files with 367 additions and 16 deletions

View File

@@ -0,0 +1,18 @@
package com.xuxd.kafka.console.beans.dto;
import java.util.List;
import lombok.Data;
/**
* kafka-console-ui.
*
* @author xuxd
* @date 2022-02-15 19:08:13
**/
@Data
public class ProposedAssignmentDTO {
private String topic;
private List<Integer> brokers;
}

View File

@@ -2,6 +2,7 @@ package com.xuxd.kafka.console.controller;
import com.xuxd.kafka.console.beans.TopicPartition;
import com.xuxd.kafka.console.beans.dto.BrokerThrottleDTO;
import com.xuxd.kafka.console.beans.dto.ProposedAssignmentDTO;
import com.xuxd.kafka.console.beans.dto.ReplicationDTO;
import com.xuxd.kafka.console.beans.dto.SyncDataDTO;
import com.xuxd.kafka.console.service.OperationService;
@@ -74,4 +75,9 @@ public class OperationController {
public Object cancelReassignment(@RequestBody TopicPartition partition) {
return operationService.cancelReassignment(new org.apache.kafka.common.TopicPartition(partition.getTopic(), partition.getPartition()));
}
@PostMapping("/replication/reassignments/proposed")
public Object proposedAssignments(@RequestBody ProposedAssignmentDTO dto) {
return operationService.proposedAssignments(dto.getTopic(), dto.getBrokers());
}
}

View File

@@ -30,4 +30,6 @@ public interface OperationService {
ResponseData currentReassignments();
ResponseData cancelReassignment(TopicPartition partition);
ResponseData proposedAssignments(String topic, List<Integer> brokerList);
}

View File

@@ -1,6 +1,7 @@
package com.xuxd.kafka.console.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.google.common.collect.Lists;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.xuxd.kafka.console.beans.ResponseData;
@@ -10,6 +11,7 @@ import com.xuxd.kafka.console.beans.vo.OffsetAlignmentVO;
import com.xuxd.kafka.console.dao.MinOffsetAlignmentMapper;
import com.xuxd.kafka.console.service.OperationService;
import com.xuxd.kafka.console.utils.GsonUtil;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -19,6 +21,7 @@ import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import kafka.console.OperationConsole;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.admin.PartitionReassignment;
import org.apache.kafka.common.TopicPartition;
import org.springframework.beans.factory.ObjectProvider;
@@ -162,4 +165,21 @@ public class OperationServiceImpl implements OperationService {
}
return ResponseData.create().success();
}
@Override public ResponseData proposedAssignments(String topic, List<Integer> brokerList) {
Map<String, Object> params = new HashMap<>();
params.put("version", 1);
Map<String, String> topicMap = new HashMap<>(1, 1.0f);
topicMap.put("topic", topic);
params.put("topics", Lists.newArrayList(topicMap));
List<String> list = brokerList.stream().map(String::valueOf).collect(Collectors.toList());
Map<TopicPartition, List<Object>> assignments = operationConsole.proposedAssignments(gson.toJson(params), StringUtils.join(list, ","));
List<CurrentReassignmentVO> res = new ArrayList<>(assignments.size());
assignments.forEach((tp, replicas) -> {
CurrentReassignmentVO vo = new CurrentReassignmentVO(tp.topic(), tp.partition(),
replicas.stream().map(x -> (Integer) x).collect(Collectors.toList()), null, null);
res.add(vo);
});
return ResponseData.create().data(res).success();
}
}

View File

@@ -256,4 +256,20 @@ class OperationConsole(config: KafkaConfig, topicConsole: TopicConsole,
throw e
}).asInstanceOf[util.Map[TopicPartition, Throwable]]
}
def proposedAssignments(reassignmentJson: String,
brokerListString: String): util.Map[TopicPartition, util.List[Int]] = {
withAdminClientAndCatchError(admin => {
val map = ReassignPartitionsCommand.generateAssignment(admin, reassignmentJson, brokerListString, true)._1
val res = new util.HashMap[TopicPartition, util.List[Int]]()
for (tp <- map.keys) {
res.put(tp, map(tp).asJava)
// res.put(tp, map.getOrElse(tp, Seq.empty).asJava)
}
res
}, e => {
log.error("proposedAssignments error.", e)
throw e
})
}.asInstanceOf[util.Map[TopicPartition, util.List[Int]]]
}

View File

@@ -246,6 +246,10 @@ export const KafkaOpApi = {
url: "/op/replication/reassignments",
method: "delete",
},
proposedAssignment: {
url: "/op/replication/reassignments/proposed",
method: "post",
},
};
export const KafkaMessageApi = {
searchByTime: {

View File

@@ -6,22 +6,24 @@
<p></p>
<hr />
<h3>kafka API 版本兼容性</h3>
<a-table
:columns="columns"
:data-source="brokerApiVersionInfo"
bordered
row-key="brokerId"
>
<div slot="operation" slot-scope="record">
<a-button
size="small"
href="javascript:;"
class="operation-btn"
@click="openApiVersionInfoDialog(record)"
>详情
</a-button>
</div>
</a-table>
<a-spin :spinning="apiVersionInfoLoading">
<a-table
:columns="columns"
:data-source="brokerApiVersionInfo"
bordered
row-key="brokerId"
>
<div slot="operation" slot-scope="record">
<a-button
size="small"
href="javascript:;"
class="operation-btn"
@click="openApiVersionInfoDialog(record)"
>详情
</a-button>
</div>
</a-table>
</a-spin>
<VersionInfo
:version-info="apiVersionInfo"
:visible="showApiVersionInfoDialog"
@@ -47,6 +49,7 @@ export default {
brokerApiVersionInfo: [],
showApiVersionInfoDialog: false,
apiVersionInfo: [],
apiVersionInfoLoading: false,
};
},
methods: {
@@ -73,10 +76,12 @@ export default {
});
}
});
this.apiVersionInfoLoading = true;
request({
url: KafkaClusterApi.getBrokerApiVersionInfo.url,
method: KafkaClusterApi.getBrokerApiVersionInfo.method,
}).then((res) => {
this.apiVersionInfoLoading = false;
if (res.code == 0) {
this.brokerApiVersionInfo = res.data;
} else {

View File

@@ -49,6 +49,15 @@
<label>说明</label>
<span>查看正在进行副本变更/重分配的任务或者将其取消</span>
</p>
<p>
<a-button type="primary" @click="openReplicaReassignDialog">
副本重分配
</a-button>
<label>说明</label>
<span
>副本所在节点重新分配打个比方集群有6个节点分区1的3个副本在节点123现在将它们重新分配到345</span
>
</p>
</a-card>
</div>
<!-- 隐藏数据同步相关-->
@@ -125,6 +134,11 @@
:visible="clusterManager.showClusterInfoDialog"
@closeClusterInfoDialog="closeClusterInfoDialog"
></ClusterInfo>
<ReplicaReassign
:visible="replicationManager.showReplicaReassignDialog"
@closeReplicaReassignDialog="closeReplicaReassignDialog"
>
</ReplicaReassign>
</div>
</template>
@@ -138,6 +152,7 @@ import ConfigThrottle from "@/views/op/ConfigThrottle";
import RemoveThrottle from "@/views/op/RemoveThrottle";
import CurrentReassignments from "@/views/op/CurrentReassignments";
import ClusterInfo from "@/views/op/ClusterInfo";
import ReplicaReassign from "@/views/op/ReplicaReassign";
export default {
name: "Operation",
components: {
@@ -150,6 +165,7 @@ export default {
RemoveThrottle,
CurrentReassignments,
ClusterInfo,
ReplicaReassign,
},
data() {
return {
@@ -162,6 +178,7 @@ export default {
replicationManager: {
showElectPreferredLeaderDialog: false,
showCurrentReassignmentsDialog: false,
showReplicaReassignDialog: false,
},
brokerManager: {
showConfigThrottleDialog: false,
@@ -227,6 +244,12 @@ export default {
closeClusterInfoDialog() {
this.clusterManager.showClusterInfoDialog = false;
},
openReplicaReassignDialog() {
this.replicationManager.showReplicaReassignDialog = true;
},
closeReplicaReassignDialog() {
this.replicationManager.showReplicaReassignDialog = false;
},
},
};
</script>

View File

@@ -0,0 +1,257 @@
<template>
<a-modal
title="副本重分配"
:visible="show"
:width="800"
:mask="false"
:destroyOnClose="true"
:footer="null"
:maskClosable="false"
@cancel="handleCancel"
>
<div>
<a-spin :spinning="loading">
<a-form
:form="form"
:label-col="{ span: 5 }"
:wrapper-col="{ span: 12 }"
@submit="handleSubmit"
>
<a-form-item label="Topic">
<a-select
@change="handleTopicChange"
show-search
option-filter-prop="children"
v-decorator="[
'topic',
{ rules: [{ required: true, message: '请选择一个topic!' }] },
]"
placeholder="请选择一个topic"
>
<a-select-option v-for="v in topicList" :key="v" :value="v">
{{ v }}
</a-select-option>
</a-select>
</a-form-item>
<a-form-item label="分配到Broker">
<a-select
mode="multiple"
option-filter-prop="children"
v-decorator="[
'brokers',
{
initialValue: brokers,
rules: [{ required: true, message: '请选择一个broker!' }],
},
]"
placeholder="请选择一个broker"
>
<a-select-option v-for="v in brokers" :key="v" :value="v">
<span v-if="v == -1">全部</span> <span v-else>{{ v }}</span>
</a-select-option>
</a-select>
</a-form-item>
<a-table
bordered
:columns="columns"
:data-source="currentAssignment"
:rowKey="
(record, index) => {
return index;
}
"
>
</a-table>
<a-form-item :wrapper-col="{ span: 12, offset: 5 }">
<a-button type="primary" html-type="submit">
重新生成分配计划
</a-button>
</a-form-item>
</a-form>
<hr />
<h2>新的分配计划</h2>
<a-table
bordered
:columns="columns"
:data-source="proposedAssignmentShow"
:rowKey="
(record, index) => {
return index;
}
"
>
</a-table>
<a-button type="danger"> 更新分配 </a-button>
</a-spin>
</div>
</a-modal>
</template>
<script>
import request from "@/utils/request";
import { KafkaTopicApi, KafkaOpApi, KafkaClusterApi } from "@/utils/api";
import notification from "ant-design-vue/es/notification";
export default {
name: "ReplicaReassign",
props: {
visible: {
type: Boolean,
default: false,
},
},
data() {
return {
show: this.visible,
data: [],
loading: false,
form: this.$form.createForm(this, { name: "ReplicaReassignForm" }),
topicList: [],
partitions: [],
brokers: [],
currentAssignment: [],
proposedAssignment: [],
proposedAssignmentShow: [],
columns,
};
},
watch: {
visible(v) {
this.show = v;
if (this.show) {
this.clearData();
this.getTopicNameList();
this.getClusterInfo();
}
},
},
methods: {
handleSubmit(e) {
e.preventDefault();
this.form.validateFields((err, values) => {
if (!err) {
this.getProposedAssignment(values);
}
});
},
getTopicReplicaInfo(topic) {
this.loading = true;
request({
url: KafkaTopicApi.getCurrentReplicaAssignment.url + "?topic=" + topic,
method: KafkaTopicApi.getCurrentReplicaAssignment.method,
}).then((res) => {
this.loading = false;
if (res.code == 0) {
this.currentAssignment = res.data.partitions;
this.currentAssignment.forEach(
(e) => (e.replicas = e.replicas.join(","))
);
} else {
notification.error({
message: "error",
description: res.msg,
});
}
});
},
getTopicNameList() {
request({
url: KafkaTopicApi.getTopicNameList.url,
method: KafkaTopicApi.getTopicNameList.method,
}).then((res) => {
if (res.code == 0) {
this.topicList = res.data;
} else {
notification.error({
message: "error",
description: res.msg,
});
}
});
},
getPartitionInfo(topic) {
this.loading = true;
request({
url: KafkaTopicApi.getPartitionInfo.url + "?topic=" + topic,
method: KafkaTopicApi.getPartitionInfo.method,
}).then((res) => {
this.loading = false;
if (res.code != 0) {
notification.error({
message: "error",
description: res.msg,
});
} else {
this.partitions = res.data.map((v) => v.partition);
this.partitions.splice(0, 0, -1);
}
});
},
handleTopicChange(topic) {
// this.getPartitionInfo(topic);
this.clearData();
this.getTopicReplicaInfo(topic);
},
getClusterInfo() {
this.loading = true;
request({
url: KafkaClusterApi.getClusterInfo.url,
method: KafkaClusterApi.getClusterInfo.method,
}).then((res) => {
this.loading = false;
this.brokers = [];
res.data.nodes.forEach((node) => this.brokers.push(node.id));
});
},
getProposedAssignment(params) {
this.loading = true;
request({
url: KafkaOpApi.proposedAssignment.url,
method: KafkaOpApi.proposedAssignment.method,
data: params,
}).then((res) => {
this.loading = false;
if (res.code != 0) {
notification.error({
message: "error",
description: res.msg,
});
} else {
this.proposedAssignment = res.data;
this.proposedAssignmentShow = Object.assign(
[],
this.proposedAssignment
);
this.proposedAssignmentShow.forEach(
(e) => (e.replicas = e.replicas.join(","))
);
}
});
},
clearData() {
this.currentAssignment = [];
this.proposedAssignment = [];
this.proposedAssignmentShow = [];
},
handleCancel() {
this.data = [];
this.$emit("closeReplicaReassignDialog", { refresh: false });
},
},
};
const columns = [
{
title: "分区",
dataIndex: "partition",
key: "partition",
},
{
title: "副本所在broker",
dataIndex: "replicas",
key: "replicas",
scopedSlots: { customRender: "replicas" },
},
];
</script>
<style scoped></style>