取消正在进行的副本重分配,消费组->消费详情增加刷新按钮

This commit is contained in:
许晓东
2021-11-30 19:49:02 +08:00
parent 222ba34702
commit 20535027bf
9 changed files with 288 additions and 5 deletions

View File

@@ -0,0 +1,25 @@
package com.xuxd.kafka.console.beans.vo;
import com.xuxd.kafka.console.beans.TopicPartition;
import java.util.List;
import lombok.Data;
/**
* kafka-console-ui.
*
* @author xuxd
* @date 2021-11-30 16:03:41
**/
@Data
public class CurrentReassignmentVO {
private final String topic;
private final int partition;
private final List<Integer> replicas;
private final List<Integer> addingReplicas;
private final List<Integer> removingReplicas;
}

View File

@@ -1,5 +1,6 @@
package com.xuxd.kafka.console.controller;
import com.xuxd.kafka.console.beans.TopicPartition;
import com.xuxd.kafka.console.beans.dto.BrokerThrottleDTO;
import com.xuxd.kafka.console.beans.dto.ReplicationDTO;
import com.xuxd.kafka.console.beans.dto.SyncDataDTO;
@@ -63,4 +64,14 @@ public class OperationController {
public Object removeThrottle(@RequestBody BrokerThrottleDTO dto) {
return operationService.removeThrottle(dto.getBrokerList());
}
@GetMapping("/replication/reassignments")
public Object currentReassignments() {
return operationService.currentReassignments();
}
@DeleteMapping("/replication/reassignments")
public Object cancelReassignment(@RequestBody TopicPartition partition) {
return operationService.cancelReassignment(new org.apache.kafka.common.TopicPartition(partition.getTopic(), partition.getPartition()));
}
}

View File

@@ -3,6 +3,7 @@ package com.xuxd.kafka.console.service;
import com.xuxd.kafka.console.beans.ResponseData;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.common.TopicPartition;
/**
* kafka-console-ui.
@@ -25,4 +26,8 @@ public interface OperationService {
ResponseData configThrottle(List<Integer> brokerList, long size);
ResponseData removeThrottle(List<Integer> brokerList);
ResponseData currentReassignments();
ResponseData cancelReassignment(TopicPartition partition);
}

View File

@@ -5,17 +5,21 @@ import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.xuxd.kafka.console.beans.ResponseData;
import com.xuxd.kafka.console.beans.dos.MinOffsetAlignmentDO;
import com.xuxd.kafka.console.beans.vo.CurrentReassignmentVO;
import com.xuxd.kafka.console.beans.vo.OffsetAlignmentVO;
import com.xuxd.kafka.console.dao.MinOffsetAlignmentMapper;
import com.xuxd.kafka.console.service.OperationService;
import com.xuxd.kafka.console.utils.GsonUtil;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import kafka.console.OperationConsole;
import org.apache.kafka.clients.admin.PartitionReassignment;
import org.apache.kafka.common.TopicPartition;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.annotation.Autowired;
@@ -135,4 +139,27 @@ public class OperationServiceImpl implements OperationService {
return (boolean) tuple2._1() ? ResponseData.create().success() : ResponseData.create().failed(tuple2._2());
}
@Override public ResponseData currentReassignments() {
Map<TopicPartition, PartitionReassignment> reassignmentMap = operationConsole.currentReassignments();
List<CurrentReassignmentVO> vos = reassignmentMap.entrySet().stream().map(entry -> {
TopicPartition partition = entry.getKey();
PartitionReassignment reassignment = entry.getValue();
return new CurrentReassignmentVO(partition.topic(),
partition.partition(), reassignment.replicas(), reassignment.addingReplicas(), reassignment.removingReplicas());
}).collect(Collectors.toList());
return ResponseData.create().data(vos).success();
}
@Override public ResponseData cancelReassignment(TopicPartition partition) {
Map<TopicPartition, Throwable> res = operationConsole.cancelPartitionReassignments(Collections.singleton(partition));
if (!res.isEmpty()) {
StringBuilder sb = new StringBuilder("Failed: ");
res.forEach((p, t) -> {
sb.append(p.toString()).append(": ").append(t.getMessage()).append(System.lineSeparator());
});
return ResponseData.create().failed(sb.toString());
}
return ResponseData.create().success();
}
}

View File

@@ -2,14 +2,14 @@ package kafka.console
import com.xuxd.kafka.console.config.KafkaConfig
import kafka.admin.ReassignPartitionsCommand
import org.apache.kafka.clients.admin.ElectLeadersOptions
import org.apache.kafka.clients.admin.{ElectLeadersOptions, ListPartitionReassignmentsOptions, PartitionReassignment}
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import org.apache.kafka.common.{ElectionType, TopicPartition}
import java.util.concurrent.TimeUnit
import java.util.{Collections, Properties}
import scala.jdk.CollectionConverters.{CollectionHasAsScala, ListHasAsScala, MapHasAsScala, SeqHasAsJava, SetHasAsJava, SetHasAsScala}
import scala.jdk.CollectionConverters.{CollectionHasAsScala, ListHasAsScala, MapHasAsJava, MapHasAsScala, SeqHasAsJava, SetHasAsJava, SetHasAsScala}
/**
* kafka-console-ui.
@@ -231,4 +231,26 @@ class OperationConsole(config: KafkaConfig, topicConsole: TopicConsole,
(false, e.getMessage)
}).asInstanceOf[(Boolean, String)]
}
/**
* current reassigning is active.
*/
def currentReassignments(): util.Map[TopicPartition, PartitionReassignment] = {
withAdminClientAndCatchError(admin => {
admin.listPartitionReassignments(withTimeoutMs(new ListPartitionReassignmentsOptions)).reassignments().get()
}, e => {
Collections.emptyMap()
log.error("listPartitionReassignments error.", e)
}).asInstanceOf[util.Map[TopicPartition, PartitionReassignment]]
}
def cancelPartitionReassignments(reassignments: util.Set[TopicPartition]): util.Map[TopicPartition, Throwable] = {
withAdminClientAndCatchError(admin => {
val res = ReassignPartitionsCommand.cancelPartitionReassignments(admin, reassignments.asScala.toSet)
res.asJava
}, e => {
log.error("cancelPartitionReassignments error.", e)
throw e
}).asInstanceOf[util.Map[TopicPartition, Throwable]]
}
}

View File

@@ -214,4 +214,12 @@ export const KafkaOpApi = {
url: "/op/broker/throttle",
method: "delete",
},
currentReassignments: {
url: "/op/replication/reassignments",
method: "get",
},
cancelReassignment: {
url: "/op/replication/reassignments",
method: "delete",
},
};

View File

@@ -47,6 +47,15 @@
@click="openResetOffsetByTimeDialog(k)"
>时间戳
</a-button>
<a-button
type="primary"
icon="reload"
size="small"
style="float: right"
@click="getConsumerDetail"
>
刷新
</a-button>
<hr />
<a-table
:columns="columns"

View File

@@ -0,0 +1,166 @@
<template>
<a-modal
title="正在进行副本重分配的分区"
:visible="show"
:width="1200"
:mask="false"
:destroyOnClose="true"
:footer="null"
:maskClosable="false"
@cancel="handleCancel"
>
<div>
<a-spin :spinning="loading">
<a-table
:columns="columns"
:data-source="data"
bordered
:rowKey="(record) => record.topic + record.partition"
>
<div slot="replicas" slot-scope="text">
<span v-for="i in text" :key="i">
{{ i }}
</span>
</div>
<div slot="addingReplicas" slot-scope="text">
<span v-for="i in text" :key="i">
{{ i }}
</span>
</div>
<div slot="removingReplicas" slot-scope="text">
<span v-for="i in text" :key="i">
{{ i }}
</span>
</div>
<div slot="operation" slot-scope="record">
<a-popconfirm
title="取消正在进行的副本重分配任务?"
ok-text="确认"
cancel-text="取消"
@confirm="cancelReassignment(record)"
>
<a-button size="small" href="javascript:;" class="operation-btn"
>取消
</a-button>
</a-popconfirm>
</div>
</a-table>
</a-spin>
</div>
</a-modal>
</template>
<script>
import request from "@/utils/request";
import { KafkaOpApi } from "@/utils/api";
import notification from "ant-design-vue/es/notification";
export default {
name: "CurrentReassignments",
props: {
visible: {
type: Boolean,
default: false,
},
},
data() {
return {
columns: columns,
show: this.visible,
data: [],
loading: false,
};
},
watch: {
visible(v) {
this.show = v;
if (this.show) {
this.currentReassignments();
}
},
},
methods: {
currentReassignments() {
this.loading = true;
const api = KafkaOpApi.currentReassignments;
request({
url: api.url,
method: api.method,
}).then((res) => {
this.loading = false;
if (res.code != 0) {
notification.error({
message: "error",
description: res.msg,
});
} else {
this.data = res.data;
this.yesterday = this.data.yesterday;
this.today = this.data.today;
}
});
},
handleCancel() {
this.$emit("closeCurrentReassignmentsDialog", {});
},
cancelReassignment(record) {
const param = { topic: record.topic, partition: record.partition };
this.loading = true;
const api = KafkaOpApi.cancelReassignment;
request({
url: api.url,
method: api.method,
data: param,
}).then((res) => {
this.loading = false;
if (res.code != 0) {
notification.error({
message: "error",
description: res.msg,
});
} else {
this.currentReassignments();
}
});
},
},
};
const columns = [
{
title: "Topic",
dataIndex: "topic",
key: "topic",
},
{
title: "分区",
dataIndex: "partition",
key: "partition",
},
{
title: "副本",
dataIndex: "replicas",
key: "replicas",
scopedSlots: { customRender: "replicas" },
},
{
title: "正在增加的副本",
dataIndex: "addingReplicas",
key: "addingReplicas",
scopedSlots: { customRender: "addingReplicas" },
},
{
title: "正在移除的副本",
dataIndex: "removingReplicas",
key: "removingReplicas",
scopedSlots: { customRender: "removingReplicas" },
},
{
title: "操作",
key: "operation",
scopedSlots: { customRender: "operation" },
},
];
</script>
<style scoped></style>

View File

@@ -30,7 +30,7 @@
<span>将集群中所有分区leader副本设置为首选副本</span>
</p>
<p>
<a-button type="primary" @click="openReplicaReassignmentDetailDialog">
<a-button type="primary" @click="openCurrentReassignmentsDialog">
副本变更详情
</a-button>
<label>说明</label>
@@ -103,6 +103,10 @@
@closeRemoveThrottleDialog="closeRemoveThrottleDialog"
>
</RemoveThrottle>
<CurrentReassignments
:visible="replicationManager.showCurrentReassignmentsDialog"
@closeCurrentReassignmentsDialog="closeCurrentReassignmentsDialog"
></CurrentReassignments>
</div>
</template>
@@ -114,6 +118,7 @@ import ElectPreferredLeader from "@/views/op/ElectPreferredLeader";
import DataSyncScheme from "@/views/op/DataSyncScheme";
import ConfigThrottle from "@/views/op/ConfigThrottle";
import RemoveThrottle from "@/views/op/RemoveThrottle";
import CurrentReassignments from "@/views/op/CurrentReassignments";
export default {
name: "Operation",
components: {
@@ -124,6 +129,7 @@ export default {
DataSyncScheme,
ConfigThrottle,
RemoveThrottle,
CurrentReassignments,
},
data() {
return {
@@ -135,6 +141,7 @@ export default {
},
replicationManager: {
showElectPreferredLeaderDialog: false,
showCurrentReassignmentsDialog: false,
},
brokerManager: {
showConfigThrottleDialog: false,
@@ -185,8 +192,11 @@ export default {
closeRemoveThrottleDialog() {
this.brokerManager.showRemoveThrottleDialog = false;
},
openReplicaReassignmentDetailDialog() {
this.$message.info("此功能尚不支持,下个版本支持");
openCurrentReassignmentsDialog() {
this.replicationManager.showCurrentReassignmentsDialog = true;
},
closeCurrentReassignmentsDialog() {
this.replicationManager.showCurrentReassignmentsDialog = false;
},
},
};