解除限流速率配置

This commit is contained in:
许晓东
2021-11-25 10:55:37 +08:00
parent 1b028fcb4f
commit 73fed3face
9 changed files with 182 additions and 15 deletions

View File

@@ -33,8 +33,8 @@ public class TopicPartitionVO {
TopicPartitionVO partitionVO = new TopicPartitionVO();
partitionVO.setPartition(partitionInfo.partition());
partitionVO.setLeader(partitionInfo.leader().toString());
partitionVO.setReplicas(partitionInfo.replicas().stream().map(Node::toString).collect(Collectors.toList()));
partitionVO.setIsr(partitionInfo.isr().stream().map(Node::toString).collect(Collectors.toList()));
partitionVO.setReplicas(partitionInfo.replicas().stream().map(node -> node.host() + ":" + node.port() + " (id: " + node.idString() + ")").collect(Collectors.toList()));
partitionVO.setIsr(partitionInfo.isr().stream().map(Node::idString).collect(Collectors.toList()));
return partitionVO;
}
}

View File

@@ -58,4 +58,9 @@ public class OperationController {
public Object configThrottle(@RequestBody BrokerThrottleDTO dto) {
return operationService.configThrottle(dto.getBrokerList(), dto.getUnit().toKb(dto.getThrottle()));
}
@DeleteMapping("/broker/throttle")
public Object removeThrottle(@RequestBody BrokerThrottleDTO dto) {
return operationService.removeThrottle(dto.getBrokerList());
}
}

View File

@@ -23,4 +23,6 @@ public interface OperationService {
ResponseData electPreferredLeader(String topic, int partition);
ResponseData configThrottle(List<Integer> brokerList, long size);
ResponseData removeThrottle(List<Integer> brokerList);
}

View File

@@ -129,4 +129,10 @@ public class OperationServiceImpl implements OperationService {
return (boolean) tuple2._1() ? ResponseData.create().success() : ResponseData.create().failed(tuple2._2());
}
@Override public ResponseData removeThrottle(List<Integer> brokerList) {
Tuple2<Object, String> tuple2 = operationConsole.clearBrokerLevelThrottles(new HashSet<>(brokerList));
return (boolean) tuple2._1() ? ResponseData.create().success() : ResponseData.create().failed(tuple2._2());
}
}

View File

@@ -221,4 +221,14 @@ class OperationConsole(config: KafkaConfig, topicConsole: TopicConsole,
(false, e.getMessage)
}).asInstanceOf[(Boolean, String)]
}
def clearBrokerLevelThrottles(brokers: util.Set[Int]): (Boolean, String) = {
withAdminClientAndCatchError(admin => {
ReassignPartitionsCommand.clearBrokerLevelThrottles(admin, brokers.asScala.toSet)
(true, "")
}, e => {
log.error("clearBrokerLevelThrottles error.", e)
(false, e.getMessage)
}).asInstanceOf[(Boolean, String)]
}
}

View File

@@ -202,4 +202,8 @@ export const KafkaOpApi = {
url: "/op/broker/throttle",
method: "post",
},
removeThrottle: {
url: "/op/broker/throttle",
method: "delete",
},
};

View File

@@ -12,7 +12,9 @@
>
</p>
<p>
<a-button type="primary"> 解除限流 </a-button>
<a-button type="primary" @click="openRemoveThrottleDialog">
解除限流
</a-button>
<label>说明</label>
<span>解除指定broker上的topic副本之间数据同步占用的带宽限制</span>
</p>
@@ -94,6 +96,11 @@
@closeConfigThrottleDialog="closeConfigThrottleDialog"
>
</ConfigThrottle>
<RemoveThrottle
:visible="brokerManager.showRemoveThrottleDialog"
@closeRemoveThrottleDialog="closeRemoveThrottleDialog"
>
</RemoveThrottle>
</div>
</template>
@@ -104,6 +111,7 @@ import OffsetAlignmentTable from "@/views/op/OffsetAlignmentTable";
import ElectPreferredLeader from "@/views/op/ElectPreferredLeader";
import DataSyncScheme from "@/views/op/DataSyncScheme";
import ConfigThrottle from "@/views/op/ConfigThrottle";
import RemoveThrottle from "@/views/op/RemoveThrottle";
export default {
name: "Operation",
components: {
@@ -113,6 +121,7 @@ export default {
ElectPreferredLeader,
DataSyncScheme,
ConfigThrottle,
RemoveThrottle,
},
data() {
return {
@@ -127,6 +136,7 @@ export default {
},
brokerManager: {
showConfigThrottleDialog: false,
showRemoveThrottleDialog: false,
},
};
},
@@ -167,6 +177,12 @@ export default {
closeConfigThrottleDialog() {
this.brokerManager.showConfigThrottleDialog = false;
},
openRemoveThrottleDialog() {
this.brokerManager.showRemoveThrottleDialog = true;
},
closeRemoveThrottleDialog() {
this.brokerManager.showRemoveThrottleDialog = false;
},
},
};
</script>

View File

@@ -0,0 +1,127 @@
<template>
<a-modal
title="解除限流"
:visible="show"
:width="1000"
:mask="false"
:maskClosable="false"
okText="确认"
cancelText="取消"
:destroyOnClose="true"
@cancel="handleCancel"
@ok="ok"
>
<div>
<a-spin :spinning="loading">
<a-form
:form="form"
:label-col="{ span: 5 }"
:wrapper-col="{ span: 12 }"
>
<a-form-item label="Broker">
<a-select
mode="multiple"
option-filter-prop="children"
v-decorator="[
'brokerList',
{
initialValue: brokers,
rules: [{ required: true, message: '请选择一个broker!' }],
},
]"
placeholder="请选择一个broker"
>
<a-select-option v-for="v in brokers" :key="v" :value="v">
<span v-if="v == -1">全部</span> <span v-else>{{ v }}</span>
</a-select-option>
</a-select>
</a-form-item>
</a-form>
<hr />
<h4>如何检查是否配置的有限流速率:</h4>
kafka的限流速率是通过下面这两项配置的
<ul>
<li>leader.replication.throttled.rate</li>
<li>follower.replication.throttled.rate</li>
</ul>
只需通过
<strong>集群->属性配置</strong>
查看是否存在这两项配置如果不存在便是没有配置限流速率。如果未配置限流速率即使指定某个topic的分区副本进行限流没有速率也不限流。
</a-spin>
</div>
</a-modal>
</template>
<script>
import request from "@/utils/request";
import { KafkaClusterApi, KafkaOpApi } from "@/utils/api";
import notification from "ant-design-vue/lib/notification";
export default {
name: "RemoveThrottle",
props: {
visible: {
type: Boolean,
default: false,
},
},
data() {
return {
show: this.visible,
loading: false,
form: this.$form.createForm(this, { name: "RemoveThrottleForm" }),
brokers: [],
};
},
watch: {
visible(v) {
this.show = v;
if (this.show) {
this.getClusterInfo();
}
},
},
methods: {
handleCancel() {
this.$emit("closeRemoveThrottleDialog", { refresh: false });
},
getClusterInfo() {
this.loading = true;
request({
url: KafkaClusterApi.getClusterInfo.url,
method: KafkaClusterApi.getClusterInfo.method,
}).then((res) => {
this.loading = false;
this.brokers = [];
res.data.nodes.forEach((node) => this.brokers.push(node.id));
});
},
ok() {
this.form.validateFields((err, values) => {
if (!err) {
const data = Object.assign({}, values);
this.loading = true;
request({
url: KafkaOpApi.removeThrottle.url,
method: KafkaOpApi.removeThrottle.method,
data: data,
}).then((res) => {
this.loading = false;
if (res.code == 0) {
this.$message.success(res.msg);
this.$emit("closeRemoveThrottleDialog", { refresh: false });
} else {
notification.error({
message: "error",
description: res.msg,
});
}
});
}
});
},
},
};
</script>
<style scoped></style>

View File

@@ -12,6 +12,7 @@
<div>
<a-spin :spinning="loading">
<a-table
bordered
:columns="columns"
:data-source="data"
:rowKey="
@@ -21,19 +22,15 @@
"
>
<ul slot="replicas" slot-scope="text">
<ol v-for="i in text" :key="i">
{{
i
}}
</ol>
</ul>
<ul slot="isr" slot-scope="text">
<ol v-for="i in text" :key="i">
{{
i
}}
</ol>
<li v-for="i in text" :key="i">
{{ i }}
</li>
</ul>
<div slot="isr" slot-scope="text">
<span v-for="i in text" :key="i">
{{ i }}
</span>
</div>
<div slot="operation" slot-scope="record" v-show="!record.internal">
<a-popconfirm
:title="