限流配置
This commit is contained in:
@@ -0,0 +1,22 @@
|
||||
package com.xuxd.kafka.console.beans.dto;
|
||||
|
||||
import com.xuxd.kafka.console.beans.enums.ThrottleUnit;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import lombok.Data;
|
||||
|
||||
/**
|
||||
* kafka-console-ui.
|
||||
*
|
||||
* @author xuxd
|
||||
* @date 2021-11-24 19:37:10
|
||||
**/
|
||||
@Data
|
||||
public class BrokerThrottleDTO {
|
||||
|
||||
private List<Integer> brokerList = new ArrayList<>();
|
||||
|
||||
private long throttle;
|
||||
|
||||
private ThrottleUnit unit;
|
||||
}
|
||||
@@ -0,0 +1,18 @@
|
||||
package com.xuxd.kafka.console.beans.enums;
|
||||
|
||||
/**
|
||||
* kafka-console-ui.
|
||||
*
|
||||
* @author xuxd
|
||||
* @date 2021-11-24 19:38:00
|
||||
**/
|
||||
public enum ThrottleUnit {
|
||||
KB, MB;
|
||||
|
||||
public long toKb(long size) {
|
||||
if (this == MB) {
|
||||
return 1024 * size;
|
||||
}
|
||||
return size;
|
||||
}
|
||||
}
|
||||
@@ -1,5 +1,6 @@
|
||||
package com.xuxd.kafka.console.controller;
|
||||
|
||||
import com.xuxd.kafka.console.beans.dto.BrokerThrottleDTO;
|
||||
import com.xuxd.kafka.console.beans.dto.ReplicationDTO;
|
||||
import com.xuxd.kafka.console.beans.dto.SyncDataDTO;
|
||||
import com.xuxd.kafka.console.service.OperationService;
|
||||
@@ -52,4 +53,9 @@ public class OperationController {
|
||||
public Object electPreferredLeader(@RequestBody ReplicationDTO dto) {
|
||||
return operationService.electPreferredLeader(dto.getTopic(), dto.getPartition());
|
||||
}
|
||||
|
||||
@PostMapping("/broker/throttle")
|
||||
public Object configThrottle(@RequestBody BrokerThrottleDTO dto) {
|
||||
return operationService.configThrottle(dto.getBrokerList(), dto.getUnit().toKb(dto.getThrottle()));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package com.xuxd.kafka.console.service;
|
||||
|
||||
import com.xuxd.kafka.console.beans.ResponseData;
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
|
||||
/**
|
||||
@@ -20,4 +21,6 @@ public interface OperationService {
|
||||
ResponseData deleteAlignmentById(Long id);
|
||||
|
||||
ResponseData electPreferredLeader(String topic, int partition);
|
||||
|
||||
ResponseData configThrottle(List<Integer> brokerList, long size);
|
||||
}
|
||||
|
||||
@@ -123,4 +123,10 @@ public class OperationServiceImpl implements OperationService {
|
||||
|
||||
return (boolean) tuple2._1() ? ResponseData.create().success() : ResponseData.create().failed(tuple2._2());
|
||||
}
|
||||
|
||||
@Override public ResponseData configThrottle(List<Integer> brokerList, long size) {
|
||||
Tuple2<Object, String> tuple2 = operationConsole.modifyInterBrokerThrottle(new HashSet<>(brokerList), size);
|
||||
|
||||
return (boolean) tuple2._1() ? ResponseData.create().success() : ResponseData.create().failed(tuple2._2());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,14 +1,14 @@
|
||||
package kafka.console
|
||||
|
||||
import java.util.concurrent.TimeUnit
|
||||
import java.util.{Collections, Properties}
|
||||
|
||||
import com.xuxd.kafka.console.config.KafkaConfig
|
||||
import kafka.admin.ReassignPartitionsCommand
|
||||
import org.apache.kafka.clients.admin.ElectLeadersOptions
|
||||
import org.apache.kafka.clients.consumer.KafkaConsumer
|
||||
import org.apache.kafka.common.serialization.ByteArrayDeserializer
|
||||
import org.apache.kafka.common.{ElectionType, TopicPartition}
|
||||
|
||||
import java.util.concurrent.TimeUnit
|
||||
import java.util.{Collections, Properties}
|
||||
import scala.jdk.CollectionConverters.{CollectionHasAsScala, ListHasAsScala, MapHasAsScala, SeqHasAsJava, SetHasAsJava, SetHasAsScala}
|
||||
|
||||
/**
|
||||
@@ -210,4 +210,15 @@ class OperationConsole(config: KafkaConfig, topicConsole: TopicConsole,
|
||||
val topicList = topicConsole.getTopicList(Collections.singleton(topic))
|
||||
topicList.asScala.flatMap(_.partitions().asScala.map(t => new TopicPartition(topic, t.partition()))).toSet.asJava
|
||||
}
|
||||
}
|
||||
|
||||
def modifyInterBrokerThrottle(reassigningBrokers: util.Set[Int],
|
||||
interBrokerThrottle: Long): (Boolean, String) = {
|
||||
withAdminClientAndCatchError(admin => {
|
||||
ReassignPartitionsCommand.modifyInterBrokerThrottle(admin, reassigningBrokers.asScala.toSet, interBrokerThrottle)
|
||||
(true, "")
|
||||
}, e => {
|
||||
log.error("modifyInterBrokerThrottle error.", e)
|
||||
(false, e.getMessage)
|
||||
}).asInstanceOf[(Boolean, String)]
|
||||
}
|
||||
}
|
||||
@@ -198,4 +198,8 @@ export const KafkaOpApi = {
|
||||
url: "/op/replication/preferred",
|
||||
method: "post",
|
||||
},
|
||||
configThrottle: {
|
||||
url: "/op/broker/throttle",
|
||||
method: "post",
|
||||
},
|
||||
};
|
||||
|
||||
157
ui/src/views/op/ConfigThrottle.vue
Normal file
157
ui/src/views/op/ConfigThrottle.vue
Normal file
@@ -0,0 +1,157 @@
|
||||
<template>
|
||||
<a-modal
|
||||
title="限流配置"
|
||||
:visible="show"
|
||||
:width="1000"
|
||||
:mask="false"
|
||||
:maskClosable="false"
|
||||
okText="确认"
|
||||
cancelText="取消"
|
||||
:destroyOnClose="true"
|
||||
@cancel="handleCancel"
|
||||
@ok="ok"
|
||||
>
|
||||
<div>
|
||||
<a-spin :spinning="loading">
|
||||
<a-form
|
||||
:form="form"
|
||||
:label-col="{ span: 5 }"
|
||||
:wrapper-col="{ span: 12 }"
|
||||
>
|
||||
<a-form-item label="Broker">
|
||||
<a-select
|
||||
mode="multiple"
|
||||
option-filter-prop="children"
|
||||
v-decorator="[
|
||||
'brokerList',
|
||||
{
|
||||
initialValue: brokers,
|
||||
rules: [{ required: true, message: '请选择一个broker!' }],
|
||||
},
|
||||
]"
|
||||
placeholder="请选择一个broker"
|
||||
>
|
||||
<a-select-option v-for="v in brokers" :key="v" :value="v">
|
||||
<span v-if="v == -1">全部</span> <span v-else>{{ v }}</span>
|
||||
</a-select-option>
|
||||
</a-select>
|
||||
</a-form-item>
|
||||
<a-form-item label="带宽">
|
||||
<a-input-number
|
||||
:min="1"
|
||||
:max="1024"
|
||||
v-decorator="[
|
||||
'throttle',
|
||||
{
|
||||
initialValue: 1,
|
||||
rules: [{ required: true, message: '输入带宽!' }],
|
||||
},
|
||||
]"
|
||||
/>
|
||||
<a-select default-value="MB" v-model="unit" style="width: 100px">
|
||||
<a-select-option value="MB"> MB/s </a-select-option>
|
||||
<a-select-option value="KB"> KB/s </a-select-option>
|
||||
</a-select>
|
||||
</a-form-item>
|
||||
</a-form>
|
||||
<hr />
|
||||
<div><h4>注意:</h4></div>
|
||||
<ul>
|
||||
<li>该限速带宽,指的是broker之间副本进行同步时占用的带宽</li>
|
||||
<li>该配置是broker级别配置,是针对broker上topic的副本</li>
|
||||
<li>
|
||||
在当前页面对指定broker限流配置后,并不是说设置后该broker上的所有topic副本同步就被限制为当前流速了。这仅仅是速率设置,如果需要对某topic的副本同步进行限流,还需要去
|
||||
Topic->限流 处操作,只有进行限流操作的topic,该限速才会对其生效
|
||||
</li>
|
||||
<li>
|
||||
上面这句话的意思就是,这里只配置topic副本同步的速率,要使这个配置真正在某个topic上生效,还要开启这个topic的限流
|
||||
</li>
|
||||
</ul>
|
||||
<h4>如何检查限流配置是否成功:</h4>
|
||||
kafka的限流速率是通过下面这两项配置的:
|
||||
<ul>
|
||||
<li>leader.replication.throttled.rate</li>
|
||||
<li>follower.replication.throttled.rate</li>
|
||||
</ul>
|
||||
只需通过
|
||||
<strong>集群->属性配置</strong>
|
||||
查看是否存在这两项配置,如果存在便是配置的有限流,值的大小就是速率,单位:kb/s
|
||||
</a-spin>
|
||||
</div>
|
||||
</a-modal>
|
||||
</template>
|
||||
|
||||
<script>
|
||||
import request from "@/utils/request";
|
||||
import { KafkaClusterApi, KafkaOpApi } from "@/utils/api";
|
||||
import notification from "ant-design-vue/lib/notification";
|
||||
|
||||
export default {
|
||||
name: "ConfigThrottle",
|
||||
props: {
|
||||
visible: {
|
||||
type: Boolean,
|
||||
default: false,
|
||||
},
|
||||
},
|
||||
data() {
|
||||
return {
|
||||
show: this.visible,
|
||||
loading: false,
|
||||
form: this.$form.createForm(this, { name: "ConfigThrottleForm" }),
|
||||
brokers: [],
|
||||
unit: "MB",
|
||||
};
|
||||
},
|
||||
watch: {
|
||||
visible(v) {
|
||||
this.show = v;
|
||||
if (this.show) {
|
||||
this.getClusterInfo();
|
||||
}
|
||||
},
|
||||
},
|
||||
methods: {
|
||||
handleCancel() {
|
||||
this.$emit("closeConfigThrottleDialog", { refresh: false });
|
||||
},
|
||||
getClusterInfo() {
|
||||
this.loading = true;
|
||||
request({
|
||||
url: KafkaClusterApi.getClusterInfo.url,
|
||||
method: KafkaClusterApi.getClusterInfo.method,
|
||||
}).then((res) => {
|
||||
this.loading = false;
|
||||
this.brokers = [];
|
||||
res.data.nodes.forEach((node) => this.brokers.push(node.id));
|
||||
});
|
||||
},
|
||||
ok() {
|
||||
this.form.validateFields((err, values) => {
|
||||
if (!err) {
|
||||
const data = Object.assign({}, values, { unit: this.unit });
|
||||
this.loading = true;
|
||||
request({
|
||||
url: KafkaOpApi.configThrottle.url,
|
||||
method: KafkaOpApi.configThrottle.method,
|
||||
data: data,
|
||||
}).then((res) => {
|
||||
this.loading = false;
|
||||
if (res.code == 0) {
|
||||
this.$message.success(res.msg);
|
||||
this.$emit("closeConfigThrottleDialog", { refresh: false });
|
||||
} else {
|
||||
notification.error({
|
||||
message: "error",
|
||||
description: res.msg,
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
},
|
||||
},
|
||||
};
|
||||
</script>
|
||||
|
||||
<style scoped></style>
|
||||
@@ -3,7 +3,9 @@
|
||||
<div class="content-module">
|
||||
<a-card title="Broker管理" style="width: 100%; text-align: left">
|
||||
<p>
|
||||
<a-button type="primary"> 配置限流 </a-button>
|
||||
<a-button type="primary" @click="openConfigThrottleDialog">
|
||||
配置限流
|
||||
</a-button>
|
||||
<label>说明:</label>
|
||||
<span
|
||||
>设置指定broker上的topic的副本之间数据同步占用的带宽,这个设置是broker级别的,但是设置后还要去对应的topic上进行限流配置,指定对这个topic的相关副本进行限制</span
|
||||
@@ -87,6 +89,11 @@
|
||||
@closeDataSyncSchemeDialog="closeDataSyncSchemeDialog"
|
||||
>
|
||||
</DataSyncScheme>
|
||||
<ConfigThrottle
|
||||
:visible="brokerManager.showConfigThrottleDialog"
|
||||
@closeConfigThrottleDialog="closeConfigThrottleDialog"
|
||||
>
|
||||
</ConfigThrottle>
|
||||
</div>
|
||||
</template>
|
||||
|
||||
@@ -96,6 +103,7 @@ import MinOffsetAlignment from "@/views/op/MinOffsetAlignment";
|
||||
import OffsetAlignmentTable from "@/views/op/OffsetAlignmentTable";
|
||||
import ElectPreferredLeader from "@/views/op/ElectPreferredLeader";
|
||||
import DataSyncScheme from "@/views/op/DataSyncScheme";
|
||||
import ConfigThrottle from "@/views/op/ConfigThrottle";
|
||||
export default {
|
||||
name: "Operation",
|
||||
components: {
|
||||
@@ -104,6 +112,7 @@ export default {
|
||||
OffsetAlignmentTable,
|
||||
ElectPreferredLeader,
|
||||
DataSyncScheme,
|
||||
ConfigThrottle,
|
||||
},
|
||||
data() {
|
||||
return {
|
||||
@@ -116,6 +125,9 @@ export default {
|
||||
replicationManager: {
|
||||
showElectPreferredLeaderDialog: false,
|
||||
},
|
||||
brokerManager: {
|
||||
showConfigThrottleDialog: false,
|
||||
},
|
||||
};
|
||||
},
|
||||
methods: {
|
||||
@@ -149,6 +161,12 @@ export default {
|
||||
closeElectPreferredLeaderDialog() {
|
||||
this.replicationManager.showElectPreferredLeaderDialog = false;
|
||||
},
|
||||
openConfigThrottleDialog() {
|
||||
this.brokerManager.showConfigThrottleDialog = true;
|
||||
},
|
||||
closeConfigThrottleDialog() {
|
||||
this.brokerManager.showConfigThrottleDialog = false;
|
||||
},
|
||||
},
|
||||
};
|
||||
</script>
|
||||
|
||||
Reference in New Issue
Block a user