12 Commits

Author SHA1 Message Date
许晓东
20535027bf 取消正在进行的副本重分配,消费组->消费详情增加刷新按钮 2021-11-30 19:49:02 +08:00
许晓东
222ba34702 发送统计 2021-11-30 15:15:47 +08:00
许晓东
39e50a6589 根据时间戳重围消费位点,采用东8区时间 2021-11-29 19:40:46 +08:00
许晓东
e881c58a8f 移除topic 限流 2021-11-27 20:49:46 +08:00
许晓东
34c87997d1 topic 限流 2021-11-27 19:46:06 +08:00
许晓东
4639335a9d polish README.md 2021-11-25 19:07:51 +08:00
许晓东
73fed3face 解除限流速率配置 2021-11-25 10:55:37 +08:00
许晓东
1b028fcb4f 限流配置 2021-11-24 20:57:33 +08:00
许晓东
62569c4454 变更副本 2021-11-23 19:59:52 +08:00
许晓东
a219551802 变更副本信息 2021-11-20 22:35:38 +08:00
许晓东
7a98eb479f 变更副本信息查询 2021-11-19 21:01:11 +08:00
许晓东
405f272fb7 下载地址 2021-11-18 15:13:08 +08:00
37 changed files with 1793 additions and 39 deletions

View File

@@ -3,8 +3,8 @@
为了开发的省事,没有多语言支持,只支持中文展示。
用过rocketmq-console吧前端展示风格跟那个有点类似。
## 安装包下载
* 点击下载:[kafka-console-ui.tar.gz](http://43.128.31.53/kafka-console-ui.tar.gz) 或 [kafka-console-ui.zip](http://43.128.31.53/kafka-console-ui.zip)
* 参考下面的打包部署,下载源码重新打包
* 点击下载:[kafka-console-ui.tar.gz](https://github.com/xxd763795151/kafka-console-ui/releases/download/v1.0.0/kafka-console-ui.tar.gz) 或 [kafka-console-ui.zip](https://github.com/xxd763795151/kafka-console-ui/releases/download/v1.0.0/kafka-console-ui.zip)
* 参考下面的打包部署,下载源码重新打包(最新功能特性)
## 功能支持
* 集群信息
* Topic管理
@@ -76,4 +76,11 @@ sh bin/shutdown.sh
## 前端
前端代码在工程的ui目录下找个前端开发的ide打开进行开发即可。
## 注意
前后分离,直接启动后端如果未编译前端代码是没有前端页面的,可以先打包进行编译`sh package.sh`然后再用idea启动或者前端部分单独启动
前后分离,直接启动后端如果未编译前端代码是没有前端页面的,可以先打包进行编译`sh package.sh`然后再用idea启动或者前端部分单独启动
# 页面示例
如果未启用ACL配置不会显示ACL的菜单页面所以导航栏上没有Acl这一项
![集群](./document/集群.png)
![Topic](./document/Topic.png)
![消费组](./document/消费组.png)
![运维](./document/运维.png)

BIN
document/Topic.png Normal file
View File

Binary file not shown.

After

Width:  |  Height:  |  Size: 42 KiB

View File

Binary file not shown.

Before

Width:  |  Height:  |  Size: 354 KiB

After

Width:  |  Height:  |  Size: 204 KiB

BIN
document/消费组.png Normal file
View File

Binary file not shown.

After

Width:  |  Height:  |  Size: 53 KiB

BIN
document/运维.png Normal file
View File

Binary file not shown.

After

Width:  |  Height:  |  Size: 126 KiB

BIN
document/集群.png Normal file
View File

Binary file not shown.

After

Width:  |  Height:  |  Size: 30 KiB

View File

@@ -10,7 +10,7 @@
</parent>
<groupId>com.xuxd</groupId>
<artifactId>kafka-console-ui</artifactId>
<version>1.0.0</version>
<version>1.0.1</version>
<name>kafka-console-ui</name>
<description>Kafka console manage ui</description>
<properties>

View File

@@ -0,0 +1,29 @@
package com.xuxd.kafka.console.beans;
import java.util.List;
import lombok.Data;
/**
 * kafka-console-ui.
 *
 * Replica assignment for one or more topic partitions. Mirrors the JSON schema
 * used by Kafka's reassign-partitions tooling
 * ({@code {"version":1,"partitions":[{"topic":..,"partition":..,"replicas":[..]}]}}),
 * so instances can be serialized/deserialized directly with Gson (see
 * TopicServiceImpl#getCurrentReplicaAssignment / #updateReplicaAssignment).
 *
 * @author xuxd
 * @date 2021-11-19 17:07:50
 **/
@Data
public class ReplicaAssignment {
// Fixed schema version expected by the reassignment JSON format.
private long version = 1L;
// Target assignment, one entry per partition.
private List<Partition> partitions;
// Inter-broker throttle while moving replicas; -1 means "no throttle".
// NOTE(review): unit assumed to follow Kafka's reassignment throttle (bytes/sec) — confirm against caller.
private long interBrokerThrottle = -1;
@Data
static class Partition {
private String topic;
private int partition;
// Broker ids that should hold replicas of this partition.
private List<Integer> replicas;
}
}

View File

@@ -0,0 +1,22 @@
package com.xuxd.kafka.console.beans.dto;
import com.xuxd.kafka.console.beans.enums.ThrottleUnit;
import java.util.ArrayList;
import java.util.List;
import lombok.Data;
/**
 * kafka-console-ui.
 *
 * Request body for the broker-level replication throttle endpoints
 * (POST/DELETE /op/broker/throttle). The controller converts {@code throttle}
 * to KB via {@link ThrottleUnit#toKb(long)} before calling the service.
 *
 * @author xuxd
 * @date 2021-11-24 19:37:10
 **/
@Data
public class BrokerThrottleDTO {
// Broker ids to apply/remove the throttle on; defaults to empty rather than null.
private List<Integer> brokerList = new ArrayList<>();
// Throttle value expressed in {@code unit}.
private long throttle;
// Unit of {@code throttle} (KB or MB).
private ThrottleUnit unit;
}

View File

@@ -0,0 +1,21 @@
package com.xuxd.kafka.console.beans.dto;
import com.xuxd.kafka.console.beans.enums.TopicThrottleSwitch;
import java.util.List;
import lombok.Data;
/**
 * kafka-console-ui.
 *
 * Request body for the topic-level replication throttle endpoint
 * (POST /topic/replica/throttle).
 *
 * @author xuxd
 * @date 2021-11-26 15:33:37
 **/
@Data
public class TopicThrottleDTO {
private String topic;
// Partitions to throttle.
// NOTE(review): TopicConsole#configThrottle treats a first element of -1 as "all partitions" — confirm UI contract.
private List<Integer> partitions;
// ON applies the throttle config, OFF clears it.
private TopicThrottleSwitch operation;
}

View File

@@ -0,0 +1,18 @@
package com.xuxd.kafka.console.beans.enums;
/**
* kafka-console-ui.
*
* @author xuxd
* @date 2021-11-24 19:38:00
**/
public enum ThrottleUnit {
KB, MB;
public long toKb(long size) {
if (this == MB) {
return 1024 * size;
}
return size;
}
}

View File

@@ -0,0 +1,11 @@
package com.xuxd.kafka.console.beans.enums;
/**
 * kafka-console-ui.
 *
 * Switch flag for topic-level replication throttling: ON applies the throttle
 * configuration to a topic, OFF clears it (see TopicService#configThrottle).
 *
 * @author xuxd
 * @date 2021-11-26 15:33:07
 **/
public enum TopicThrottleSwitch {
ON,OFF;
}

View File

@@ -0,0 +1,25 @@
package com.xuxd.kafka.console.beans.vo;
import com.xuxd.kafka.console.beans.TopicPartition;
import java.util.List;
import lombok.Data;
/**
 * kafka-console-ui.
 *
 * View object for one in-progress partition reassignment, built from Kafka's
 * {@code PartitionReassignment} (see OperationServiceImpl#currentReassignments).
 * All fields are final; Lombok's {@code @Data} generates the all-args
 * constructor used by the service layer.
 *
 * @author xuxd
 * @date 2021-11-30 16:03:41
 **/
@Data
public class CurrentReassignmentVO {
private final String topic;
private final int partition;
// Full current replica set (including replicas being added).
private final List<Integer> replicas;
// Broker ids gaining a replica in this reassignment.
private final List<Integer> addingReplicas;
// Broker ids losing a replica in this reassignment.
private final List<Integer> removingReplicas;
}

View File

@@ -33,8 +33,8 @@ public class TopicPartitionVO {
TopicPartitionVO partitionVO = new TopicPartitionVO();
partitionVO.setPartition(partitionInfo.partition());
partitionVO.setLeader(partitionInfo.leader().toString());
partitionVO.setReplicas(partitionInfo.replicas().stream().map(Node::toString).collect(Collectors.toList()));
partitionVO.setIsr(partitionInfo.isr().stream().map(Node::toString).collect(Collectors.toList()));
partitionVO.setReplicas(partitionInfo.replicas().stream().map(node -> node.host() + ":" + node.port() + " (id: " + node.idString() + ")").collect(Collectors.toList()));
partitionVO.setIsr(partitionInfo.isr().stream().map(Node::idString).collect(Collectors.toList()));
return partitionVO;
}
}

View File

@@ -1,5 +1,7 @@
package com.xuxd.kafka.console.controller;
import com.xuxd.kafka.console.beans.TopicPartition;
import com.xuxd.kafka.console.beans.dto.BrokerThrottleDTO;
import com.xuxd.kafka.console.beans.dto.ReplicationDTO;
import com.xuxd.kafka.console.beans.dto.SyncDataDTO;
import com.xuxd.kafka.console.service.OperationService;
@@ -52,4 +54,24 @@ public class OperationController {
public Object electPreferredLeader(@RequestBody ReplicationDTO dto) {
return operationService.electPreferredLeader(dto.getTopic(), dto.getPartition());
}
@PostMapping("/broker/throttle")
public Object configThrottle(@RequestBody BrokerThrottleDTO dto) {
return operationService.configThrottle(dto.getBrokerList(), dto.getUnit().toKb(dto.getThrottle()));
}
@DeleteMapping("/broker/throttle")
public Object removeThrottle(@RequestBody BrokerThrottleDTO dto) {
return operationService.removeThrottle(dto.getBrokerList());
}
@GetMapping("/replication/reassignments")
public Object currentReassignments() {
return operationService.currentReassignments();
}
@DeleteMapping("/replication/reassignments")
public Object cancelReassignment(@RequestBody TopicPartition partition) {
return operationService.cancelReassignment(new org.apache.kafka.common.TopicPartition(partition.getTopic(), partition.getPartition()));
}
}

View File

@@ -1,7 +1,9 @@
package com.xuxd.kafka.console.controller;
import com.xuxd.kafka.console.beans.ReplicaAssignment;
import com.xuxd.kafka.console.beans.dto.AddPartitionDTO;
import com.xuxd.kafka.console.beans.dto.NewTopicDTO;
import com.xuxd.kafka.console.beans.dto.TopicThrottleDTO;
import com.xuxd.kafka.console.beans.enums.TopicType;
import com.xuxd.kafka.console.service.TopicService;
import java.util.ArrayList;
@@ -71,4 +73,24 @@ public class TopicController {
return topicService.addPartitions(topic, addNum, assignment);
}
@GetMapping("/replica/assignment")
public Object getCurrentReplicaAssignment(@RequestParam String topic) {
return topicService.getCurrentReplicaAssignment(topic);
}
@PostMapping("/replica/assignment")
public Object updateReplicaAssignment(@RequestBody ReplicaAssignment assignment) {
return topicService.updateReplicaAssignment(assignment);
}
@PostMapping("/replica/throttle")
public Object configThrottle(@RequestBody TopicThrottleDTO dto) {
return topicService.configThrottle(dto.getTopic(), dto.getPartitions(), dto.getOperation());
}
@GetMapping("/send/stats")
public Object sendStats(@RequestParam String topic) {
return topicService.sendStats(topic);
}
}

View File

@@ -1,7 +1,9 @@
package com.xuxd.kafka.console.service;
import com.xuxd.kafka.console.beans.ResponseData;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.common.TopicPartition;
/**
* kafka-console-ui.
@@ -20,4 +22,12 @@ public interface OperationService {
ResponseData deleteAlignmentById(Long id);
ResponseData electPreferredLeader(String topic, int partition);
ResponseData configThrottle(List<Integer> brokerList, long size);
ResponseData removeThrottle(List<Integer> brokerList);
ResponseData currentReassignments();
ResponseData cancelReassignment(TopicPartition partition);
}

View File

@@ -1,10 +1,10 @@
package com.xuxd.kafka.console.service;
import com.xuxd.kafka.console.beans.ReplicaAssignment;
import com.xuxd.kafka.console.beans.ResponseData;
import com.xuxd.kafka.console.beans.enums.TopicThrottleSwitch;
import com.xuxd.kafka.console.beans.enums.TopicType;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.NewTopic;
/**
@@ -26,4 +26,12 @@ public interface TopicService {
ResponseData createTopic(NewTopic topic);
ResponseData addPartitions(String topic, int addNum, List<List<Integer>> newAssignmentst);
ResponseData getCurrentReplicaAssignment(String topic);
ResponseData updateReplicaAssignment(ReplicaAssignment assignment);
ResponseData configThrottle(String topic, List<Integer> partitions, TopicThrottleSwitch throttleSwitch);
ResponseData sendStats(String topic);
}

View File

@@ -142,7 +142,7 @@ public class ConsumerServiceImpl implements ConsumerService {
@Override public ResponseData resetOffsetByDate(String groupId, String topic, String dateStr) {
long timestamp = -1L;
try {
StringBuilder sb = new StringBuilder(dateStr.replace(" ", "T")).append(".000");
StringBuilder sb = new StringBuilder(dateStr.replace(" ", "T")).append(".000+08:00");//固定为utc+08:00东8区来计算
timestamp = Utils.getDateTime(sb.toString());
} catch (ParseException e) {
throw new IllegalArgumentException(e);

View File

@@ -5,16 +5,21 @@ import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.xuxd.kafka.console.beans.ResponseData;
import com.xuxd.kafka.console.beans.dos.MinOffsetAlignmentDO;
import com.xuxd.kafka.console.beans.vo.CurrentReassignmentVO;
import com.xuxd.kafka.console.beans.vo.OffsetAlignmentVO;
import com.xuxd.kafka.console.dao.MinOffsetAlignmentMapper;
import com.xuxd.kafka.console.service.OperationService;
import com.xuxd.kafka.console.utils.GsonUtil;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import kafka.console.OperationConsole;
import org.apache.kafka.clients.admin.PartitionReassignment;
import org.apache.kafka.common.TopicPartition;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.annotation.Autowired;
@@ -30,7 +35,7 @@ import scala.Tuple2;
@Service
public class OperationServiceImpl implements OperationService {
private Gson gson = new Gson();
private Gson gson = GsonUtil.INSTANCE.get();
@Autowired
private OperationConsole operationConsole;
@@ -122,4 +127,39 @@ public class OperationServiceImpl implements OperationService {
return (boolean) tuple2._1() ? ResponseData.create().success() : ResponseData.create().failed(tuple2._2());
}
/**
 * Apply an inter-broker replication throttle to the given brokers.
 *
 * @param brokerList broker ids to throttle
 * @param size throttle rate (converted from the UI unit upstream; exact unit
 *             semantics depend on OperationConsole — verify there)
 * @return success, or failure carrying the console's error message
 */
@Override public ResponseData configThrottle(List<Integer> brokerList, long size) {
Tuple2<Object, String> tuple2 = operationConsole.modifyInterBrokerThrottle(new HashSet<>(brokerList), size);
return (boolean) tuple2._1() ? ResponseData.create().success() : ResponseData.create().failed(tuple2._2());
}
/**
 * Clear broker-level replication throttles from the given brokers.
 *
 * @param brokerList broker ids to clear
 * @return success, or failure carrying the console's error message
 */
@Override public ResponseData removeThrottle(List<Integer> brokerList) {
Tuple2<Object, String> tuple2 = operationConsole.clearBrokerLevelThrottles(new HashSet<>(brokerList));
return (boolean) tuple2._1() ? ResponseData.create().success() : ResponseData.create().failed(tuple2._2());
}
/**
 * List partition reassignments currently in progress, mapped to view objects
 * for the front-end table.
 *
 * @return success response whose payload is a list of CurrentReassignmentVO
 */
@Override public ResponseData currentReassignments() {
Map<TopicPartition, PartitionReassignment> reassignmentMap = operationConsole.currentReassignments();
List<CurrentReassignmentVO> vos = reassignmentMap.entrySet().stream().map(entry -> {
TopicPartition partition = entry.getKey();
PartitionReassignment reassignment = entry.getValue();
return new CurrentReassignmentVO(partition.topic(),
partition.partition(), reassignment.replicas(), reassignment.addingReplicas(), reassignment.removingReplicas());
}).collect(Collectors.toList());
return ResponseData.create().data(vos).success();
}
/**
 * Cancel the in-progress reassignment of a single partition.
 *
 * @param partition the topic-partition whose reassignment should be cancelled
 * @return success if the console reports no per-partition errors; otherwise a
 *         failure message listing each failed partition and its cause
 */
@Override public ResponseData cancelReassignment(TopicPartition partition) {
Map<TopicPartition, Throwable> res = operationConsole.cancelPartitionReassignments(Collections.singleton(partition));
if (!res.isEmpty()) {
// Aggregate every partition-level failure into one human-readable message.
StringBuilder sb = new StringBuilder("Failed: ");
res.forEach((p, t) -> {
sb.append(p.toString()).append(": ").append(t.getMessage()).append(System.lineSeparator());
});
return ResponseData.create().failed(sb.toString());
}
return ResponseData.create().success();
}
}

View File

@@ -1,10 +1,15 @@
package com.xuxd.kafka.console.service.impl;
import com.google.gson.Gson;
import com.xuxd.kafka.console.beans.ReplicaAssignment;
import com.xuxd.kafka.console.beans.ResponseData;
import com.xuxd.kafka.console.beans.enums.TopicThrottleSwitch;
import com.xuxd.kafka.console.beans.enums.TopicType;
import com.xuxd.kafka.console.beans.vo.TopicDescriptionVO;
import com.xuxd.kafka.console.beans.vo.TopicPartitionVO;
import com.xuxd.kafka.console.service.TopicService;
import com.xuxd.kafka.console.utils.GsonUtil;
import java.util.Calendar;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
@@ -12,6 +17,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import kafka.console.TopicConsole;
import lombok.extern.slf4j.Slf4j;
@@ -38,6 +44,8 @@ public class TopicServiceImpl implements TopicService {
@Autowired
private TopicConsole topicConsole;
private Gson gson = GsonUtil.INSTANCE.get();
@Override public ResponseData getTopicNameList(boolean internal) {
return ResponseData.create().data(topicConsole.getTopicNameList(internal)).success();
}
@@ -130,4 +138,92 @@ public class TopicServiceImpl implements TopicService {
return success ? ResponseData.create().success() : ResponseData.create().failed(tuple2._2());
}
/**
 * Fetch the topic's current replica assignment as reassignment-format JSON
 * from the console and parse it into a ReplicaAssignment bean.
 *
 * @param topic topic name
 * @return success with the parsed assignment, or failure with the error text
 */
@Override public ResponseData getCurrentReplicaAssignment(String topic) {
Tuple2<Object, String> tuple2 = topicConsole.getCurrentReplicaAssignmentJson(topic);
boolean success = (boolean) tuple2._1();
// On success tuple2._2() holds the assignment JSON; on failure, the error message.
return success ? ResponseData.create().data(gson.fromJson(tuple2._2(), ReplicaAssignment.class)).success() : ResponseData.create().failed(tuple2._2());
}
/**
 * Execute a replica reassignment: the bean is serialized back to the
 * reassignment JSON format and handed to the console, optionally with an
 * inter-broker throttle (-1 = unthrottled).
 *
 * @param assignment target assignment plus optional throttle
 * @return success, or failure carrying the console's error message
 */
@Override public ResponseData updateReplicaAssignment(ReplicaAssignment assignment) {
Tuple2<Object, String> tuple2 = topicConsole.updateReplicas(gson.toJson(assignment), assignment.getInterBrokerThrottle());
boolean success = (boolean) tuple2._1();
return success ? ResponseData.create().success() : ResponseData.create().failed(tuple2._2());
}
/**
 * Turn topic-level replication throttling on or off.
 *
 * @param topic topic name
 * @param partitions partitions to throttle (only used when switching ON)
 * @param throttleSwitch ON applies the throttle, OFF clears it
 * @return success, or failure carrying the console's error message
 * @throws IllegalArgumentException if the switch value is unrecognized
 */
@Override
public ResponseData configThrottle(String topic, List<Integer> partitions, TopicThrottleSwitch throttleSwitch) {
Tuple2<Object, String> tuple2 = null;
switch (throttleSwitch) {
case ON:
tuple2 = topicConsole.configThrottle(topic, partitions);
break;
case OFF:
tuple2 = topicConsole.clearThrottle(topic);
break;
default:
throw new IllegalArgumentException("switch is unknown.");
}
boolean success = (boolean) tuple2._1();
return success ? ResponseData.create().success() : ResponseData.create().failed(tuple2._2());
}
/**
 * Rough "messages produced" statistics for a topic, derived purely from log
 * offsets sampled at three instants: now, today 00:00 and yesterday 00:00
 * (server-local time via Calendar).
 *
 * @param topic topic name
 * @return success response whose payload is
 *         {"yesterday": {"detail": [...], "total": n}, "today": {...}}
 */
@Override public ResponseData sendStats(String topic) {
Calendar calendar = Calendar.getInstance();
long current = calendar.getTimeInMillis();
// Truncate to midnight of today, then step back one day for yesterday 00:00.
calendar.set(Calendar.HOUR_OF_DAY, 0);
calendar.set(Calendar.MINUTE, 0);
calendar.set(Calendar.SECOND, 0);
calendar.set(Calendar.MILLISECOND, 0);
long today = calendar.getTimeInMillis();
calendar.add(Calendar.DAY_OF_MONTH, -1);
long yesterday = calendar.getTimeInMillis();
Map<TopicPartition, Long> currentOffset = topicConsole.getOffsetForTimestamp(topic, current);
Map<TopicPartition, Long> todayOffset = topicConsole.getOffsetForTimestamp(topic, today);
Map<TopicPartition, Long> yesterdayOffset = topicConsole.getOffsetForTimestamp(topic, yesterday);
Map<String, Object> res = new HashMap<>();
// Yesterday's count = offset at today 00:00 minus offset at yesterday 00:00.
AtomicLong yesterdayTotal = new AtomicLong(0L), todayTotal = new AtomicLong(0L);
Map<Integer, Long> yesterdayDetail = new HashMap<>(), todayDetail = new HashMap<>();
todayOffset.forEach(((partition, aLong) -> {
Long last = yesterdayOffset.get(partition);
// No earlier sample for this partition: count everything up to the later offset.
long diff = last == null ? aLong : aLong - last;
yesterdayDetail.put(partition.partition(), diff);
yesterdayTotal.addAndGet(diff);
}));
// Today's count = offset now minus offset at today 00:00.
currentOffset.forEach(((partition, aLong) -> {
Long last = todayOffset.get(partition);
long diff = last == null ? aLong : aLong - last;
todayDetail.put(partition.partition(), diff);
todayTotal.addAndGet(diff);
}));
Map<String, Object> yes = new HashMap<>(), to = new HashMap<>();
yes.put("detail", convertList(yesterdayDetail));
yes.put("total", yesterdayTotal.get());
to.put("detail", convertList(todayDetail));
to.put("total", todayTotal.get());
res.put("yesterday", yes);
res.put("today", to);
return ResponseData.create().data(res).success();
}
/**
 * Flatten a partition->count map into a list of {"partition": p, "num": n}
 * row maps for the front-end table.
 *
 * @param source per-partition message counts
 * @return one row map per partition
 */
private List<Map<String, Object>> convertList(Map<Integer, Long> source) {
    return source.entrySet().stream()
        .map(e -> {
            // Presized for exactly the two entries each row carries.
            Map<String, Object> row = new HashMap<String, Object>(3, 1.0f);
            row.put("partition", e.getKey());
            row.put("num", e.getValue());
            return row;
        })
        .collect(Collectors.toList());
}
}

View File

@@ -0,0 +1,19 @@
package com.xuxd.kafka.console.utils;
import com.google.gson.Gson;
/**
 * kafka-console-ui.
 *
 * Enum-based singleton sharing one {@link Gson} instance across the
 * application; callers obtain it via {@code GsonUtil.INSTANCE.get()}.
 *
 * @author xuxd
 * @date 2021-11-19 17:01:01
 **/
public enum GsonUtil {
    INSTANCE;

    // final: the shared instance must never be reassigned after construction
    // (Gson instances are safe to share, so one is enough).
    private final Gson gson = new Gson();

    /**
     * @return the shared {@link Gson} instance
     */
    public Gson get() {
        return gson;
    }
}

View File

@@ -1,15 +1,20 @@
package kafka.console
import java.util.Properties
import com.xuxd.kafka.console.config.KafkaConfig
import kafka.zk.{AdminZkClient, KafkaZkClient}
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.admin.{AbstractOptions, Admin, AdminClientConfig}
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.clients.admin._
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer, OffsetAndMetadata}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.config.SaslConfigs
import org.apache.kafka.common.requests.ListOffsetsResponse
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import org.apache.kafka.common.utils.Time
import org.slf4j.{Logger, LoggerFactory}
import java.util.Properties
import scala.collection.{Map, Seq}
import scala.jdk.CollectionConverters.{MapHasAsJava, MapHasAsScala}
/**
* kafka-console-ui.
@@ -89,3 +94,57 @@ class KafkaConsole(config: KafkaConfig) {
props
}
}
/**
 * Offset-lookup helpers shared by the console classes. Largely adapted from
 * Kafka's ConsumerGroupCommand offset-resolution logic.
 */
object KafkaConsole {
val log: Logger = LoggerFactory.getLogger(this.getClass)
/**
 * Committed offsets of a consumer group, keyed by topic-partition.
 */
def getCommittedOffsets(admin: Admin, groupId: String,
timeoutMs: Integer): Map[TopicPartition, OffsetAndMetadata] = {
admin.listConsumerGroupOffsets(
groupId, new ListConsumerGroupOffsetsOptions().timeoutMs(timeoutMs)
).partitionsToOffsetAndMetadata.get.asScala
}
/**
 * Earliest offset at or after `timestamp` for each given partition.
 * Partitions that have no message at/after the timestamp (lookup returns
 * UNKNOWN_OFFSET) fall back to their log-end offset.
 */
def getLogTimestampOffsets(admin: Admin, topicPartitions: Seq[TopicPartition],
timestamp: java.lang.Long, timeoutMs: Integer): Map[TopicPartition, OffsetAndMetadata] = {
val timestampOffsets = topicPartitions.map { topicPartition =>
topicPartition -> OffsetSpec.forTimestamp(timestamp)
}.toMap
val offsets = admin.listOffsets(
timestampOffsets.asJava,
new ListOffsetsOptions().timeoutMs(timeoutMs)
).all.get
// Split resolved lookups from those where no offset matched the timestamp.
val (successfulOffsetsForTimes, unsuccessfulOffsetsForTimes) =
offsets.asScala.partition(_._2.offset != ListOffsetsResponse.UNKNOWN_OFFSET)
val successfulLogTimestampOffsets = successfulOffsetsForTimes.map {
case (topicPartition, listOffsetsResultInfo) => topicPartition -> new OffsetAndMetadata(listOffsetsResultInfo.offset)
}.toMap
unsuccessfulOffsetsForTimes.foreach { entry =>
log.warn(s"\nWarn: Partition " + entry._1.partition() + " from topic " + entry._1.topic() +
" is empty. Falling back to latest known offset.")
}
successfulLogTimestampOffsets ++ getLogEndOffsets(admin, unsuccessfulOffsetsForTimes.keySet.toSeq, timeoutMs)
}
/**
 * Log-end offset for each given partition.
 *
 * Throws IllegalArgumentException if the broker returned no result for a
 * requested partition.
 */
def getLogEndOffsets(admin: Admin,
topicPartitions: Seq[TopicPartition], timeoutMs: Integer): Predef.Map[TopicPartition, OffsetAndMetadata] = {
val endOffsets = topicPartitions.map { topicPartition =>
topicPartition -> OffsetSpec.latest
}.toMap
val offsets = admin.listOffsets(
endOffsets.asJava,
new ListOffsetsOptions().timeoutMs(timeoutMs)
).all.get
val res = topicPartitions.map { topicPartition =>
Option(offsets.get(topicPartition)) match {
case Some(listOffsetsResultInfo) => topicPartition -> new OffsetAndMetadata(listOffsetsResultInfo.offset)
case _ =>
throw new IllegalArgumentException
}
}.toMap
res
}
}

View File

@@ -1,15 +1,15 @@
package kafka.console
import java.util.concurrent.TimeUnit
import java.util.{Collections, Properties}
import com.xuxd.kafka.console.config.KafkaConfig
import org.apache.kafka.clients.admin.ElectLeadersOptions
import kafka.admin.ReassignPartitionsCommand
import org.apache.kafka.clients.admin.{ElectLeadersOptions, ListPartitionReassignmentsOptions, PartitionReassignment}
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import org.apache.kafka.common.{ElectionType, TopicPartition}
import scala.jdk.CollectionConverters.{CollectionHasAsScala, ListHasAsScala, MapHasAsScala, SeqHasAsJava, SetHasAsJava, SetHasAsScala}
import java.util.concurrent.TimeUnit
import java.util.{Collections, Properties}
import scala.jdk.CollectionConverters.{CollectionHasAsScala, ListHasAsScala, MapHasAsJava, MapHasAsScala, SeqHasAsJava, SetHasAsJava, SetHasAsScala}
/**
* kafka-console-ui.
@@ -210,4 +210,47 @@ class OperationConsole(config: KafkaConfig, topicConsole: TopicConsole,
val topicList = topicConsole.getTopicList(Collections.singleton(topic))
topicList.asScala.flatMap(_.partitions().asScala.map(t => new TopicPartition(topic, t.partition()))).toSet.asJava
}
}
def modifyInterBrokerThrottle(reassigningBrokers: util.Set[Int],
interBrokerThrottle: Long): (Boolean, String) = {
withAdminClientAndCatchError(admin => {
ReassignPartitionsCommand.modifyInterBrokerThrottle(admin, reassigningBrokers.asScala.toSet, interBrokerThrottle)
(true, "")
}, e => {
log.error("modifyInterBrokerThrottle error.", e)
(false, e.getMessage)
}).asInstanceOf[(Boolean, String)]
}
def clearBrokerLevelThrottles(brokers: util.Set[Int]): (Boolean, String) = {
withAdminClientAndCatchError(admin => {
ReassignPartitionsCommand.clearBrokerLevelThrottles(admin, brokers.asScala.toSet)
(true, "")
}, e => {
log.error("clearBrokerLevelThrottles error.", e)
(false, e.getMessage)
}).asInstanceOf[(Boolean, String)]
}
/**
 * List the partition reassignments currently in progress.
 *
 * @return map of topic-partition to its active reassignment; empty map on error.
 */
def currentReassignments(): util.Map[TopicPartition, PartitionReassignment] = {
withAdminClientAndCatchError(admin => {
admin.listPartitionReassignments(withTimeoutMs(new ListPartitionReassignmentsOptions)).reassignments().get()
}, e => {
log.error("listPartitionReassignments error.", e)
// Must be the lambda's LAST expression so it is actually returned.
// (Previously it was evaluated before log.error and discarded, so the error
// path returned Unit and the asInstanceOf cast below failed at runtime.)
Collections.emptyMap()
}).asInstanceOf[util.Map[TopicPartition, PartitionReassignment]]
}
def cancelPartitionReassignments(reassignments: util.Set[TopicPartition]): util.Map[TopicPartition, Throwable] = {
withAdminClientAndCatchError(admin => {
val res = ReassignPartitionsCommand.cancelPartitionReassignments(admin, reassignments.asScala.toSet)
res.asJava
}, e => {
log.error("cancelPartitionReassignments error.", e)
throw e
}).asInstanceOf[util.Map[TopicPartition, Throwable]]
}
}

View File

@@ -1,14 +1,18 @@
package kafka.console
import java.util
import java.util.concurrent.TimeUnit
import java.util.{Collections, List, Set}
import com.xuxd.kafka.console.config.KafkaConfig
import kafka.admin.ReassignPartitionsCommand._
import kafka.utils.Json
import org.apache.kafka.clients.admin._
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException
import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.{TopicPartition, TopicPartitionReplica}
import scala.jdk.CollectionConverters.{CollectionHasAsScala, SetHasAsJava}
import java.util
import java.util.concurrent.{ExecutionException, TimeUnit}
import java.util.{Collections, List, Set}
import scala.collection.{Map, Seq}
import scala.jdk.CollectionConverters.{CollectionHasAsScala, MapHasAsJava, MapHasAsScala, SeqHasAsJava, SetHasAsJava}
/**
* kafka-console-ui.
@@ -121,4 +125,180 @@ class TopicConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConfig
(false, e.getMessage)
}).asInstanceOf[(Boolean, String)]
}
def getCurrentReplicaAssignmentJson(topic: String): (Boolean, String) = {
withAdminClientAndCatchError(admin => {
val json = formatAsReassignmentJson(getReplicaAssignmentForTopics(admin, Seq(topic)), Map.empty)
(true, json)
}, e => {
log.error("getCurrentReplicaAssignmentJson error, ", e)
(false, e.getMessage)
}).asInstanceOf[(Boolean, String)]
}
def updateReplicas(reassignmentJson: String, interBrokerThrottle: Long = -1L): (Boolean, String) = {
withAdminClientAndCatchError(admin => {
executeAssignment(admin, reassignmentJson, interBrokerThrottle)
(true, "")
}, e => {
log.error("executeAssignment error, ", e)
(false, e.getMessage)
}).asInstanceOf[(Boolean, String)]
}
/**
* Copy and modify from @{link kafka.admin.ReassignPartitionsCommand#executeAssignment}.
*/
def executeAssignment(adminClient: Admin,
reassignmentJson: String,
interBrokerThrottle: Long = -1L,
logDirThrottle: Long = -1L,
timeoutMs: Long = 30000L,
time: Time = Time.SYSTEM): Unit = {
val (proposedParts, proposedReplicas) = parseExecuteAssignmentArgs(reassignmentJson)
val currentReassignments = adminClient.
listPartitionReassignments().reassignments().get().asScala
// If there is an existing assignment
// This helps avoid surprising users.
if (currentReassignments.nonEmpty) {
throw new TerseReassignmentFailureException("Cannot execute because there is an existing partition assignment.")
}
verifyBrokerIds(adminClient, proposedParts.values.flatten.toSet)
val currentParts = getReplicaAssignmentForPartitions(adminClient, proposedParts.keySet.toSet)
log.info("currentPartitionReplicaAssignment: " + currentPartitionReplicaAssignmentToString(proposedParts, currentParts))
log.info(s"newPartitionReplicaAssignment: $reassignmentJson")
if (interBrokerThrottle >= 0 || logDirThrottle >= 0) {
if (interBrokerThrottle >= 0) {
val moveMap = calculateProposedMoveMap(currentReassignments, proposedParts, currentParts)
modifyReassignmentThrottle(adminClient, moveMap)
}
if (logDirThrottle >= 0) {
val movingBrokers = calculateMovingBrokers(proposedReplicas.keySet.toSet)
modifyLogDirThrottle(adminClient, movingBrokers, logDirThrottle)
}
}
// Execute the partition reassignments.
val errors = alterPartitionReassignments(adminClient, proposedParts)
if (errors.nonEmpty) {
throw new TerseReassignmentFailureException(
"Error reassigning partition(s):%n%s".format(
errors.keySet.toBuffer.sortWith(compareTopicPartitions).map { part =>
s"$part: ${errors(part).getMessage}"
}.mkString(System.lineSeparator())))
}
if (proposedReplicas.nonEmpty) {
executeMoves(adminClient, proposedReplicas, timeoutMs, time)
}
}
def configThrottle(topic: String, partitions: util.List[Integer]): (Boolean, String) = {
withAdminClientAndCatchError(admin => {
val throttles = {
if (partitions.get(0) == -1) {
Map(topic -> "*")
} else {
val topicDescription = admin.describeTopics(Collections.singleton(topic), withTimeoutMs(new DescribeTopicsOptions))
.all().get().values().asScala.toList
def convert(partition: Integer, replicas: scala.List[Int]): String = {
replicas.map("%d:%d".format(partition, _)).toSet.mkString(",")
}
val ptor = topicDescription.head.partitions().asScala.map(info => (info.partition(), info.replicas().asScala.map(_.id()))).toMap
val conf = partitions.asScala.map(partition => convert(partition, ptor.get(partition) match {
case Some(v) => v.toList
case None => throw new IllegalArgumentException
})).toList
Map(topic -> conf.mkString(","))
}
}
modifyTopicThrottles(admin, throttles, throttles)
(true, "")
}, e => {
log.error("configThrottle error, ", e)
(false, e.getMessage)
}).asInstanceOf[(Boolean, String)]
}
def clearThrottle(topic: String): (Boolean, String) = {
withAdminClientAndCatchError(admin => {
clearTopicLevelThrottles(admin, Collections.singleton(topic).asScala.toSet)
(true, "")
}, e => {
log.error("clearThrottle error, ", e)
(false, e.getMessage)
}).asInstanceOf[(Boolean, String)]
}
/**
 * For every partition of `topic`, the earliest offset whose timestamp is at or
 * after `timestamp` (falling back to the log-end offset — see
 * KafkaConsole.getLogTimestampOffsets).
 *
 * @return partition -> offset map; empty map on error.
 */
def getOffsetForTimestamp(topic: String, timestamp: java.lang.Long): util.Map[TopicPartition, java.lang.Long] = {
withAdminClientAndCatchError(admin => {
val partitions = describeTopics(admin, Collections.singleton(topic)).get(topic) match {
case Some(topicDescription: TopicDescription) => topicDescription.partitions()
.asScala.map(info => new TopicPartition(topic, info.partition())).toSeq
case None => throw new IllegalArgumentException("topic is not exist.")
}
val offsetMap = KafkaConsole.getLogTimestampOffsets(admin, partitions, timestamp, timeoutMs)
offsetMap.map(tuple2 => (tuple2._1, tuple2._2.offset())).toMap.asJava
}, e => {
// Fixed copy-pasted log message (previously said "clearThrottle error").
log.error("getOffsetForTimestamp error, ", e)
Collections.emptyMap()
}).asInstanceOf[util.Map[TopicPartition, java.lang.Long]]
}
/**
* Get the current replica assignments for some topics.
*
* @param adminClient The AdminClient to use.
* @param topics The topics to get information about.
* @return A map from partitions to broker assignments.
* If any topic can't be found, an exception will be thrown.
*/
private def getReplicaAssignmentForTopics(adminClient: Admin,
topics: Seq[String])
: Map[TopicPartition, Seq[Int]] = {
describeTopics(adminClient, topics.toSet.asJava).flatMap {
case (topicName, topicDescription) => topicDescription.partitions.asScala.map { info =>
(new TopicPartition(topicName, info.partition), info.replicas.asScala.map(_.id).toSeq)
}
}
}
private def formatAsReassignmentJson(partitionsToBeReassigned: Map[TopicPartition, Seq[Int]],
replicaLogDirAssignment: Map[TopicPartitionReplica, String]): String = {
Json.encodeAsString(Map(
"version" -> 1,
"partitions" -> partitionsToBeReassigned.keySet.toBuffer.sortWith(compareTopicPartitions).map {
tp =>
val replicas = partitionsToBeReassigned(tp)
Map(
"topic" -> tp.topic,
"partition" -> tp.partition,
"replicas" -> replicas.asJava
).asJava
}.asJava
).asJava)
}
private def describeTopics(adminClient: Admin,
topics: Set[String])
: Map[String, TopicDescription] = {
adminClient.describeTopics(topics).values.asScala.map { case (topicName, topicDescriptionFuture) =>
try topicName -> topicDescriptionFuture.get
catch {
case t: ExecutionException if t.getCause.isInstanceOf[UnknownTopicOrPartitionException] =>
throw new ExecutionException(
new UnknownTopicOrPartitionException(s"Topic $topicName not found."))
}
}
}
private def modifyReassignmentThrottle(admin: Admin, moveMap: MoveMap): Unit = {
val leaderThrottles = calculateLeaderThrottles(moveMap)
val followerThrottles = calculateFollowerThrottles(moveMap)
modifyTopicThrottles(admin, leaderThrottles, followerThrottles)
}
}

View File

@@ -117,6 +117,22 @@ export const KafkaTopicApi = {
url: "/topic/partition/new",
method: "post",
},
getCurrentReplicaAssignment: {
url: "/topic/replica/assignment",
method: "get",
},
updateReplicaAssignment: {
url: "/topic/replica/assignment",
method: "post",
},
configThrottle: {
url: "/topic/replica/throttle",
method: "post",
},
sendStats: {
url: "/topic/send/stats",
method: "get",
},
};
export const KafkaConsumerApi = {
@@ -190,4 +206,20 @@ export const KafkaOpApi = {
url: "/op/replication/preferred",
method: "post",
},
configThrottle: {
url: "/op/broker/throttle",
method: "post",
},
removeThrottle: {
url: "/op/broker/throttle",
method: "delete",
},
currentReassignments: {
url: "/op/replication/reassignments",
method: "get",
},
cancelReassignment: {
url: "/op/replication/reassignments",
method: "delete",
},
};

View File

@@ -47,6 +47,15 @@
@click="openResetOffsetByTimeDialog(k)"
>时间戳
</a-button>
<a-button
type="primary"
icon="reload"
size="small"
style="float: right"
@click="getConsumerDetail"
>
刷新
</a-button>
<hr />
<a-table
:columns="columns"
@@ -70,6 +79,11 @@
</a-button>
</div>
</a-table>
<p>
<strong style="color: red"
>注意重置位点时要求当前没有正在运行的消费端否则重置的时候会报错返回失败信息</strong
>
</p>
</div>
<a-modal

View File

@@ -32,6 +32,10 @@
/>
</a-form-item>
</a-form>
<hr />
<p>
*注意该时间为北京时间这里固定为东8区的计算时间如果所在地区不是采用北京时间中国大部分地区都是采用的北京时间请自行对照为当地时间重置
</p>
</a-spin>
</div>
</a-modal>

View File

@@ -0,0 +1,157 @@
<template>
<a-modal
title="限流配置"
:visible="show"
:width="1000"
:mask="false"
:maskClosable="false"
okText="确认"
cancelText="取消"
:destroyOnClose="true"
@cancel="handleCancel"
@ok="ok"
>
<div>
<a-spin :spinning="loading">
<a-form
:form="form"
:label-col="{ span: 5 }"
:wrapper-col="{ span: 12 }"
>
<a-form-item label="Broker">
<a-select
mode="multiple"
option-filter-prop="children"
v-decorator="[
'brokerList',
{
initialValue: brokers,
rules: [{ required: true, message: '请选择一个broker!' }],
},
]"
placeholder="请选择一个broker"
>
<a-select-option v-for="v in brokers" :key="v" :value="v">
<span v-if="v == -1">全部</span> <span v-else>{{ v }}</span>
</a-select-option>
</a-select>
</a-form-item>
<a-form-item label="带宽">
<a-input-number
:min="1"
:max="1024"
v-decorator="[
'throttle',
{
initialValue: 1,
rules: [{ required: true, message: '输入带宽!' }],
},
]"
/>
<a-select default-value="MB" v-model="unit" style="width: 100px">
<a-select-option value="MB"> MB/s </a-select-option>
<a-select-option value="KB"> KB/s </a-select-option>
</a-select>
</a-form-item>
</a-form>
<hr />
<div><h4>注意:</h4></div>
<ul>
<li>该限速带宽指的是broker之间副本进行同步时占用的带宽</li>
<li>该配置是broker级别配置是针对broker上topic的副本</li>
<li>
在当前页面对指定broker限流配置后并不是说设置后该broker上的所有topic副本同步就被限制为当前流速了。这仅仅是速率设置如果需要对某topic的副本同步进行限流还需要去
Topic->限流 处操作只有进行限流操作的topic该限速才会对其生效
</li>
<li>
上面这句话的意思就是这里只配置topic副本同步的速率要使这个配置真正在某个topic上生效还要开启这个topic的限流
</li>
</ul>
<h4>如何检查限流配置是否成功:</h4>
kafka的限流速率是通过下面这两项配置的
<ul>
<li>leader.replication.throttled.rate</li>
<li>follower.replication.throttled.rate</li>
</ul>
只需通过
<strong>集群->属性配置</strong>
查看是否存在这两项配置如果存在便是配置的有限流值的大小就是速率单位kb/s
</a-spin>
</div>
</a-modal>
</template>
<script>
// ConfigThrottle — modal for setting the broker-level replica-sync bandwidth
// (leader/follower replication.throttled.rate) on one or more brokers.
import request from "@/utils/request";
import { KafkaClusterApi, KafkaOpApi } from "@/utils/api";
import notification from "ant-design-vue/lib/notification";

export default {
  name: "ConfigThrottle",
  props: {
    // Visibility is owned by the parent and mirrored into `show`.
    visible: {
      type: Boolean,
      default: false,
    },
  },
  data() {
    return {
      show: this.visible,
      loading: false,
      form: this.$form.createForm(this, { name: "ConfigThrottleForm" }),
      brokers: [], // broker ids fetched from the cluster-info endpoint
      unit: "MB", // bandwidth unit chosen next to the numeric input
    };
  },
  watch: {
    // Reload the broker list each time the dialog is opened.
    visible(v) {
      this.show = v;
      if (v) {
        this.getClusterInfo();
      }
    },
  },
  methods: {
    // Close without submitting anything.
    handleCancel() {
      this.$emit("closeConfigThrottleDialog", { refresh: false });
    },
    // Fetch cluster nodes and expose their ids as selectable brokers.
    getClusterInfo() {
      const api = KafkaClusterApi.getClusterInfo;
      this.loading = true;
      request({ url: api.url, method: api.method }).then((res) => {
        this.loading = false;
        this.brokers = res.data.nodes.map((node) => node.id);
      });
    },
    // Validate the form, then POST the throttle rate to the backend.
    ok() {
      this.form.validateFields((err, values) => {
        if (err) {
          return;
        }
        const payload = Object.assign({}, values, { unit: this.unit });
        this.loading = true;
        request({
          url: KafkaOpApi.configThrottle.url,
          method: KafkaOpApi.configThrottle.method,
          data: payload,
        }).then((res) => {
          this.loading = false;
          if (res.code == 0) {
            this.$message.success(res.msg);
            this.$emit("closeConfigThrottleDialog", { refresh: false });
          } else {
            notification.error({ message: "error", description: res.msg });
          }
        });
      });
    },
  },
};
</script>
<style scoped></style>

View File

@@ -0,0 +1,166 @@
<template>
<a-modal
title="正在进行副本重分配的分区"
:visible="show"
:width="1200"
:mask="false"
:destroyOnClose="true"
:footer="null"
:maskClosable="false"
@cancel="handleCancel"
>
<div>
<a-spin :spinning="loading">
<a-table
:columns="columns"
:data-source="data"
bordered
:rowKey="(record) => record.topic + record.partition"
>
<div slot="replicas" slot-scope="text">
<span v-for="i in text" :key="i">
{{ i }}
</span>
</div>
<div slot="addingReplicas" slot-scope="text">
<span v-for="i in text" :key="i">
{{ i }}
</span>
</div>
<div slot="removingReplicas" slot-scope="text">
<span v-for="i in text" :key="i">
{{ i }}
</span>
</div>
<div slot="operation" slot-scope="record">
<a-popconfirm
title="取消正在进行的副本重分配任务?"
ok-text="确认"
cancel-text="取消"
@confirm="cancelReassignment(record)"
>
<a-button size="small" href="javascript:;" class="operation-btn"
>取消
</a-button>
</a-popconfirm>
</div>
</a-table>
</a-spin>
</div>
</a-modal>
</template>
<script>
// CurrentReassignments — modal listing the partition replica reassignments
// currently in progress, with the option to cancel any of them.
import request from "@/utils/request";
import { KafkaOpApi } from "@/utils/api";
import notification from "ant-design-vue/es/notification";
export default {
  name: "CurrentReassignments",
  props: {
    // Visibility is owned by the parent and mirrored into `show`.
    visible: {
      type: Boolean,
      default: false,
    },
  },
  data() {
    return {
      columns: columns,
      show: this.visible,
      data: [], // rows: { topic, partition, replicas, addingReplicas, removingReplicas }
      loading: false,
    };
  },
  watch: {
    // Refresh the reassignment list each time the dialog is opened.
    visible(v) {
      this.show = v;
      if (this.show) {
        this.currentReassignments();
      }
    },
  },
  methods: {
    // Fetch all in-flight reassignments from the backend.
    currentReassignments() {
      this.loading = true;
      const api = KafkaOpApi.currentReassignments;
      request({
        url: api.url,
        method: api.method,
      }).then((res) => {
        this.loading = false;
        if (res.code != 0) {
          notification.error({
            message: "error",
            description: res.msg,
          });
        } else {
          // Fix: previously this also assigned `this.yesterday`/`this.today`
          // from res.data — copy-paste residue from SendStats.vue; neither
          // property is declared in data() nor used by this component.
          this.data = res.data;
        }
      });
    },
    handleCancel() {
      this.$emit("closeCurrentReassignmentsDialog", {});
    },
    // Cancel the reassignment of a single partition, then reload the list.
    cancelReassignment(record) {
      const param = { topic: record.topic, partition: record.partition };
      this.loading = true;
      const api = KafkaOpApi.cancelReassignment;
      request({
        url: api.url,
        method: api.method,
        data: param,
      }).then((res) => {
        this.loading = false;
        if (res.code != 0) {
          notification.error({
            message: "error",
            description: res.msg,
          });
        } else {
          this.currentReassignments();
        }
      });
    },
  },
};
// Table layout: topic/partition identity, the three replica lists rendered
// via scoped slots, and the cancel-action column.
const columns = [
  {
    title: "Topic",
    dataIndex: "topic",
    key: "topic",
  },
  {
    title: "分区",
    dataIndex: "partition",
    key: "partition",
  },
  {
    title: "副本",
    dataIndex: "replicas",
    key: "replicas",
    scopedSlots: { customRender: "replicas" },
  },
  {
    title: "正在增加的副本",
    dataIndex: "addingReplicas",
    key: "addingReplicas",
    scopedSlots: { customRender: "addingReplicas" },
  },
  {
    title: "正在移除的副本",
    dataIndex: "removingReplicas",
    key: "removingReplicas",
    scopedSlots: { customRender: "removingReplicas" },
  },
  {
    title: "操作",
    key: "operation",
    scopedSlots: { customRender: "operation" },
  },
];
</script>
<style scoped></style>

View File

@@ -1,5 +1,25 @@
<template>
<div class="content">
<div class="content-module">
<a-card title="Broker管理" style="width: 100%; text-align: left">
<p>
<a-button type="primary" @click="openConfigThrottleDialog">
配置限流
</a-button>
<label>说明</label>
<span
>设置指定broker上的topic的副本之间数据同步占用的带宽这个设置是broker级别的但是设置后还要去对应的topic上进行限流配置指定对这个topic的相关副本进行限制</span
>
</p>
<p>
<a-button type="primary" @click="openRemoveThrottleDialog">
解除限流
</a-button>
<label>说明</label>
<span>解除指定broker上的topic副本之间数据同步占用的带宽限制</span>
</p>
</a-card>
</div>
<div class="content-module">
<a-card title="副本管理" style="width: 100%; text-align: left">
<p>
@@ -9,6 +29,13 @@
<label>说明</label>
<span>将集群中所有分区leader副本设置为首选副本</span>
</p>
<p>
<a-button type="primary" @click="openCurrentReassignmentsDialog">
副本变更详情
</a-button>
<label>说明</label>
<span>查看正在进行副本变更/重分配的任务或者将其取消</span>
</p>
</a-card>
</div>
<div class="content-module">
@@ -66,6 +93,20 @@
@closeDataSyncSchemeDialog="closeDataSyncSchemeDialog"
>
</DataSyncScheme>
<ConfigThrottle
:visible="brokerManager.showConfigThrottleDialog"
@closeConfigThrottleDialog="closeConfigThrottleDialog"
>
</ConfigThrottle>
<RemoveThrottle
:visible="brokerManager.showRemoveThrottleDialog"
@closeRemoveThrottleDialog="closeRemoveThrottleDialog"
>
</RemoveThrottle>
<CurrentReassignments
:visible="replicationManager.showCurrentReassignmentsDialog"
@closeCurrentReassignmentsDialog="closeCurrentReassignmentsDialog"
></CurrentReassignments>
</div>
</template>
@@ -75,6 +116,9 @@ import MinOffsetAlignment from "@/views/op/MinOffsetAlignment";
import OffsetAlignmentTable from "@/views/op/OffsetAlignmentTable";
import ElectPreferredLeader from "@/views/op/ElectPreferredLeader";
import DataSyncScheme from "@/views/op/DataSyncScheme";
import ConfigThrottle from "@/views/op/ConfigThrottle";
import RemoveThrottle from "@/views/op/RemoveThrottle";
import CurrentReassignments from "@/views/op/CurrentReassignments";
export default {
name: "Operation",
components: {
@@ -83,6 +127,9 @@ export default {
OffsetAlignmentTable,
ElectPreferredLeader,
DataSyncScheme,
ConfigThrottle,
RemoveThrottle,
CurrentReassignments,
},
data() {
return {
@@ -94,6 +141,11 @@ export default {
},
replicationManager: {
showElectPreferredLeaderDialog: false,
showCurrentReassignmentsDialog: false,
},
brokerManager: {
showConfigThrottleDialog: false,
showRemoveThrottleDialog: false,
},
};
},
@@ -128,6 +180,24 @@ export default {
closeElectPreferredLeaderDialog() {
this.replicationManager.showElectPreferredLeaderDialog = false;
},
openConfigThrottleDialog() {
this.brokerManager.showConfigThrottleDialog = true;
},
closeConfigThrottleDialog() {
this.brokerManager.showConfigThrottleDialog = false;
},
openRemoveThrottleDialog() {
this.brokerManager.showRemoveThrottleDialog = true;
},
closeRemoveThrottleDialog() {
this.brokerManager.showRemoveThrottleDialog = false;
},
openCurrentReassignmentsDialog() {
this.replicationManager.showCurrentReassignmentsDialog = true;
},
closeCurrentReassignmentsDialog() {
this.replicationManager.showCurrentReassignmentsDialog = false;
},
},
};
</script>

View File

@@ -0,0 +1,127 @@
<template>
<a-modal
title="解除限流"
:visible="show"
:width="1000"
:mask="false"
:maskClosable="false"
okText="确认"
cancelText="取消"
:destroyOnClose="true"
@cancel="handleCancel"
@ok="ok"
>
<div>
<a-spin :spinning="loading">
<a-form
:form="form"
:label-col="{ span: 5 }"
:wrapper-col="{ span: 12 }"
>
<a-form-item label="Broker">
<a-select
mode="multiple"
option-filter-prop="children"
v-decorator="[
'brokerList',
{
initialValue: brokers,
rules: [{ required: true, message: '请选择一个broker!' }],
},
]"
placeholder="请选择一个broker"
>
<a-select-option v-for="v in brokers" :key="v" :value="v">
<span v-if="v == -1">全部</span> <span v-else>{{ v }}</span>
</a-select-option>
</a-select>
</a-form-item>
</a-form>
<hr />
<h4>如何检查是否配置的有限流速率:</h4>
kafka的限流速率是通过下面这两项配置的
<ul>
<li>leader.replication.throttled.rate</li>
<li>follower.replication.throttled.rate</li>
</ul>
只需通过
<strong>集群->属性配置</strong>
查看是否存在这两项配置如果不存在便是没有配置限流速率。如果未配置限流速率即使指定某个topic的分区副本进行限流没有速率也不限流。
</a-spin>
</div>
</a-modal>
</template>
<script>
// RemoveThrottle — modal that clears the broker-level replica-sync bandwidth
// limits (leader/follower replication.throttled.rate) on selected brokers.
import request from "@/utils/request";
import { KafkaClusterApi, KafkaOpApi } from "@/utils/api";
import notification from "ant-design-vue/lib/notification";

export default {
  name: "RemoveThrottle",
  props: {
    // Visibility is owned by the parent and mirrored into `show`.
    visible: {
      type: Boolean,
      default: false,
    },
  },
  data() {
    return {
      show: this.visible,
      loading: false,
      form: this.$form.createForm(this, { name: "RemoveThrottleForm" }),
      brokers: [], // broker ids fetched from the cluster-info endpoint
    };
  },
  watch: {
    // Reload the broker list each time the dialog is opened.
    visible(v) {
      this.show = v;
      if (v) {
        this.getClusterInfo();
      }
    },
  },
  methods: {
    // Close without submitting anything.
    handleCancel() {
      this.$emit("closeRemoveThrottleDialog", { refresh: false });
    },
    // Fetch cluster nodes and expose their ids as selectable brokers.
    getClusterInfo() {
      const api = KafkaClusterApi.getClusterInfo;
      this.loading = true;
      request({ url: api.url, method: api.method }).then((res) => {
        this.loading = false;
        this.brokers = res.data.nodes.map((node) => node.id);
      });
    },
    // Validate the form, then DELETE the throttle config on the backend.
    ok() {
      this.form.validateFields((err, values) => {
        if (err) {
          return;
        }
        const payload = Object.assign({}, values);
        this.loading = true;
        request({
          url: KafkaOpApi.removeThrottle.url,
          method: KafkaOpApi.removeThrottle.method,
          data: payload,
        }).then((res) => {
          this.loading = false;
          if (res.code == 0) {
            this.$message.success(res.msg);
            this.$emit("closeRemoveThrottleDialog", { refresh: false });
          } else {
            notification.error({ message: "error", description: res.msg });
          }
        });
      });
    },
  },
};
</script>
<style scoped></style>

View File

@@ -0,0 +1,163 @@
<template>
<a-modal
:title="topic + '限流'"
:visible="show"
:width="1000"
:mask="false"
:maskClosable="false"
okText="确认"
cancelText="取消"
:destroyOnClose="true"
@cancel="handleCancel"
@ok="ok"
>
<div>
<a-spin :spinning="loading">
<a-form
:form="form"
:label-col="{ span: 5 }"
:wrapper-col="{ span: 12 }"
>
<a-form-item label="操作">
<a-radio-group
@change="onChange"
v-decorator="[
'operation',
{
initialValue: 'ON',
rules: [{ required: true, message: '请选择一个操作!' }],
},
]"
>
<a-radio value="ON"> 配置限流 </a-radio>
<a-radio value="OFF"> 移除所有分区限流配置 </a-radio>
</a-radio-group>
</a-form-item>
<a-form-item label="选择分区" v-show="showPartition">
<a-select
mode="multiple"
option-filter-prop="children"
v-decorator="[
'partitions',
{
initialValue: [-1],
rules: [{ required: true, message: '请选择一个分区!' }],
},
]"
placeholder="请选择一个分区"
>
<a-select-option v-for="v in partitions" :key="v" :value="v">
<span v-if="v == -1">全部</span> <span v-else>{{ v }}</span>
</a-select-option>
</a-select>
</a-form-item>
</a-form>
<hr />
<h4>说明:</h4>
该限流表示topic的副本的在不同broker之间数据同步占用带宽的限制该配置是一个topic级别的配置项。如未配置速率即使配置了这个限流也不会进行实际的限流操作。配置速率在
<span style="color: red">运维->配置限流</span> 处进行操作.
<h4>如何检查是否对哪些分区启用限流:</h4>
topic的限流是通过下面这两项配置的
<ul>
<li>leader.replication.throttled.replicas</li>
<li>follower.replication.throttled.replicas</li>
</ul>
只需通过
<strong>属性配置</strong>
查看这两项配置的值,格式为:"0:0,1:0"左侧为分区右侧为broker
id。示例表示[分区0的副本在broker 0上分区1的副本在broker 0上]。
</a-spin>
</div>
</a-modal>
</template>
<script>
// ConfigTopicThrottle — modal that enables/disables replication throttling
// for a topic's partitions (leader/follower replication.throttled.replicas).
import request from "@/utils/request";
import { KafkaTopicApi } from "@/utils/api";
import notification from "ant-design-vue/lib/notification";
export default {
  name: "ConfigTopicThrottle",
  props: {
    // Visibility is owned by the parent and mirrored into `show`.
    visible: {
      type: Boolean,
      default: false,
    },
    // Topic whose partitions are being throttled.
    topic: {
      type: String,
      default: "",
    },
  },
  data() {
    return {
      show: this.visible,
      loading: false,
      // Fix: the form was named "RemoveThrottleForm" (copy-paste from
      // RemoveThrottle.vue), sharing that dialog's field-id namespace.
      form: this.$form.createForm(this, { name: "ConfigTopicThrottleForm" }),
      partitions: [], // partition ids of `topic`; -1 is the "all" sentinel
      showPartition: true, // partition picker hidden when operation is OFF
    };
  },
  watch: {
    // Reload partitions and reset the picker each time the dialog opens.
    visible(v) {
      this.show = v;
      if (this.show) {
        this.getPartitionInfo();
        this.showPartition = true;
      }
    },
  },
  methods: {
    handleCancel() {
      this.$emit("closeThrottleDialog", { refresh: false });
    },
    // Fetch the topic's partitions and prepend the -1 ("all") option.
    getPartitionInfo() {
      this.loading = true;
      request({
        url: KafkaTopicApi.getPartitionInfo.url + "?topic=" + this.topic,
        method: KafkaTopicApi.getPartitionInfo.method,
      }).then((res) => {
        this.loading = false;
        if (res.code != 0) {
          notification.error({
            message: "error",
            description: res.msg,
          });
        } else {
          this.partitions = res.data.map((e) => e.partition);
          this.partitions.splice(0, 0, -1);
        }
      });
    },
    // Validate the form, then POST the throttle on/off request.
    ok() {
      this.form.validateFields((err, values) => {
        if (!err) {
          const data = Object.assign({}, values, { topic: this.topic });
          this.loading = true;
          request({
            url: KafkaTopicApi.configThrottle.url,
            method: KafkaTopicApi.configThrottle.method,
            data: data,
          }).then((res) => {
            this.loading = false;
            if (res.code == 0) {
              this.$message.success(res.msg);
              this.$emit("closeThrottleDialog", { refresh: false });
            } else {
              notification.error({
                message: "error",
                description: res.msg,
              });
            }
          });
        }
      });
    },
    // Only show the partition picker when enabling throttling ("ON").
    onChange(e) {
      this.showPartition = !(e.target.value == "OFF");
    },
  },
};
</script>
<style scoped></style>

View File

@@ -12,6 +12,7 @@
<div>
<a-spin :spinning="loading">
<a-table
bordered
:columns="columns"
:data-source="data"
:rowKey="
@@ -21,19 +22,15 @@
"
>
<ul slot="replicas" slot-scope="text">
<ol v-for="i in text" :key="i">
{{
i
}}
</ol>
</ul>
<ul slot="isr" slot-scope="text">
<ol v-for="i in text" :key="i">
{{
i
}}
</ol>
<li v-for="i in text" :key="i">
{{ i }}
</li>
</ul>
<div slot="isr" slot-scope="text">
<span v-for="i in text" :key="i">
{{ i }}
</span>
</div>
<div slot="operation" slot-scope="record" v-show="!record.internal">
<a-popconfirm
:title="

View File

@@ -0,0 +1,115 @@
<template>
<a-modal
:title="topic + '发送统计'"
:visible="show"
:width="1000"
:mask="false"
:destroyOnClose="true"
:footer="null"
:maskClosable="false"
@cancel="handleCancel"
>
<div>
<a-spin :spinning="loading">
<h4>今天发送消息数{{ today.total }}</h4>
<a-table
:columns="columns"
:data-source="today.detail"
bordered
:rowKey="(record) => record.partition"
>
</a-table>
<hr />
<h4>昨天发送消息数{{ yesterday.total }}</h4>
<a-table
:columns="columns"
:data-source="yesterday.detail"
bordered
:rowKey="(record) => record.partition"
>
</a-table>
</a-spin>
</div>
</a-modal>
</template>
<script>
// SendStats — modal showing per-partition message counts produced to a topic
// today and yesterday.
import request from "@/utils/request";
import { KafkaTopicApi } from "@/utils/api";
import notification from "ant-design-vue/es/notification";

export default {
  name: "SendStats",
  props: {
    // Topic whose send statistics are displayed.
    topic: {
      type: String,
      default: "",
    },
    // Visibility is owned by the parent and mirrored into `show`.
    visible: {
      type: Boolean,
      default: false,
    },
  },
  data() {
    return {
      columns: columns,
      show: this.visible,
      data: [], // raw response payload: { today: {...}, yesterday: {...} }
      loading: false,
      yesterday: {}, // { total, detail: [{ partition, num }] }
      today: {}, // same shape as `yesterday`
    };
  },
  watch: {
    // Fetch fresh statistics each time the dialog is opened.
    visible(v) {
      this.show = v;
      if (v) {
        this.sendStatus();
      }
    },
  },
  methods: {
    // Load the send statistics for `topic` and split them into the two
    // tables rendered by the template.
    sendStatus() {
      const api = KafkaTopicApi.sendStats;
      this.loading = true;
      request({
        url: api.url + "?topic=" + this.topic,
        method: api.method,
      }).then((res) => {
        this.loading = false;
        if (res.code != 0) {
          notification.error({ message: "error", description: res.msg });
          return;
        }
        this.data = res.data;
        this.yesterday = this.data.yesterday;
        this.today = this.data.today;
      });
    },
    // Clear the cached stats so stale numbers don't flash on reopen.
    handleCancel() {
      this.data = [];
      this.yesterday = {};
      this.today = {};
      this.$emit("closeMessageStatsDialog", {});
    },
  },
};

// Two-column layout shared by the "today" and "yesterday" tables.
const columns = [
  {
    title: "分区",
    dataIndex: "partition",
    key: "partition",
  },
  {
    title: "数量",
    dataIndex: "num",
    key: "num",
  },
];
</script>
<style scoped></style>

View File

@@ -98,6 +98,27 @@
@click="openTopicConfigDialog(record.name)"
>属性配置
</a-button>
<a-button
size="small"
href="javascript:;"
class="operation-btn"
@click="openUpdateReplicaDialog(record.name)"
>变更副本
</a-button>
<a-button
size="small"
href="javascript:;"
class="operation-btn"
@click="openMessageStatsDialog(record.name)"
>发送统计
</a-button>
<a-button
size="small"
href="javascript:;"
class="operation-btn"
@click="openThrottleDialog(record.name)"
>限流
</a-button>
</div>
</a-table>
<PartitionInfo
@@ -126,6 +147,21 @@
:topic="selectDetail.resourceName"
@closeTopicConfigDialog="closeTopicConfigDialog"
></TopicConfig>
<UpdateReplica
:visible="showUpdateReplicaDialog"
:topic="selectDetail.resourceName"
@closeUpdateReplicaDialog="closeUpdateReplicaDialog"
></UpdateReplica>
<ConfigTopicThrottle
:visible="showThrottleDialog"
:topic="selectDetail.resourceName"
@closeThrottleDialog="closeThrottleDialog"
></ConfigTopicThrottle>
<SendStats
:visible="showSendStatsDialog"
:topic="selectDetail.resourceName"
@closeMessageStatsDialog="closeMessageStatsDialog"
></SendStats>
</div>
</a-spin>
</div>
@@ -140,6 +176,9 @@ import CreateTopic from "@/views/topic/CreateTopic";
import AddPartition from "@/views/topic/AddPartition";
import ConsumedDetail from "@/views/topic/ConsumedDetail";
import TopicConfig from "@/views/topic/TopicConfig";
import UpdateReplica from "@/views/topic/UpdateReplica";
import ConfigTopicThrottle from "@/views/topic/ConfigTopicThrottle";
import SendStats from "@/views/topic/SendStats";
export default {
name: "Topic",
@@ -149,6 +188,9 @@ export default {
AddPartition,
ConsumedDetail,
TopicConfig,
UpdateReplica,
ConfigTopicThrottle,
SendStats,
},
data() {
return {
@@ -170,6 +212,9 @@ export default {
showAddPartition: false,
showConsumedDetailDialog: false,
showTopicConfigDialog: false,
showUpdateReplicaDialog: false,
showThrottleDialog: false,
showSendStatsDialog: false,
};
},
methods: {
@@ -250,6 +295,27 @@ export default {
closeTopicConfigDialog() {
this.showTopicConfigDialog = false;
},
openUpdateReplicaDialog(topic) {
this.showUpdateReplicaDialog = true;
this.selectDetail.resourceName = topic;
},
closeUpdateReplicaDialog() {
this.showUpdateReplicaDialog = false;
},
openMessageStatsDialog(topic) {
this.showSendStatsDialog = true;
this.selectDetail.resourceName = topic;
},
closeMessageStatsDialog() {
this.showSendStatsDialog = false;
},
openThrottleDialog(topic) {
this.showThrottleDialog = true;
this.selectDetail.resourceName = topic;
},
closeThrottleDialog() {
this.showThrottleDialog = false;
},
},
created() {
this.getTopicList();
@@ -281,7 +347,7 @@ const columns = [
title: "操作",
key: "operation",
scopedSlots: { customRender: "operation" },
width: 500,
width: 800,
},
];
</script>

View File

@@ -0,0 +1,211 @@
<template>
<a-modal
title="变更副本"
:visible="show"
:width="1200"
:mask="false"
:destroyOnClose="true"
:maskClosable="false"
@cancel="handleCancel"
okText="确认"
cancelText="取消"
@ok="handleOk"
>
<div>
<a-spin :spinning="loading">
<div class="replica-box">
<label>设置副本数</label
><a-input-number
id="inputNumber"
v-model="replicaNums"
:min="1"
:max="brokerSize"
@change="onChange"
/>
</div>
<div class="replica-box">
<label>是否要限流</label
><a-input-number
id="inputNumber"
v-model="data.interBrokerThrottle"
:min="-1"
:max="102400"
/>
<strong>
|说明broker之间副本同步带宽限制默认值为-1表示不限制不是-1表示限制该值并不表示流速至于流速配置
<span style="color: red">运维->配置限流</span> 处进行操作.</strong
>
</div>
<a-table
:columns="columns"
:data-source="data.partitions"
bordered
:rowKey="
(record, index) => {
return index;
}
"
>
<div slot="replicas" slot-scope="text">
<span v-for="i in text" :key="i">
{{ i }}
</span>
</div>
</a-table>
<p>
*正在进行即尚未完成的副本变更的任务可以在
<span style="color: red">运维->副本变更详情</span>
处查看也可以在那里将正在进行的任务取消
</p>
<p>
*如果是减少副本不用限流如果是增加副本数副本同步的时候如果有大量消息需要同步可能占用大量带宽担心会影响集群的稳定考虑是否开启限流同步完成可以再把该topic的限流关毕关闭操作可以点击
限流按钮 处理
</p>
</a-spin>
</div>
</a-modal>
</template>
<script>
// UpdateReplica — modal for changing a topic's replication factor by editing
// the partition -> replica-list assignment and submitting the new plan.
import request from "@/utils/request";
import { KafkaClusterApi, KafkaTopicApi } from "@/utils/api";
import notification from "ant-design-vue/lib/notification";
export default {
  name: "UpdateReplica",
  props: {
    // Topic whose replica assignment is being edited.
    topic: {
      type: String,
      default: "",
    },
    // Visibility is owned by the parent and mirrored into `show`.
    visible: {
      type: Boolean,
      default: false,
    },
  },
  data() {
    return {
      columns: columns,
      show: this.visible,
      data: {}, // { partitions: [{ topic, partition, replicas }], interBrokerThrottle }
      loading: false,
      form: this.$form.createForm(this, { name: "coordinated" }),
      brokerSize: 0, // number of brokers — upper bound for the replica count
      replicaNums: 0, // replica count currently chosen in the number input
      defaultReplicaNums: 0, // replica count as loaded from the server
    };
  },
  watch: {
    // On open, load broker count and the current assignment in parallel.
    visible(v) {
      this.show = v;
      if (this.show) {
        this.getClusterInfo();
        this.getCurrentReplicaAssignment();
      }
    },
  },
  methods: {
    // Fetch the topic's current partition -> replicas mapping and seed the
    // replica-count input from the first partition.
    getCurrentReplicaAssignment() {
      this.loading = true;
      request({
        url:
          KafkaTopicApi.getCurrentReplicaAssignment.url +
          "?topic=" +
          this.topic,
        method: KafkaTopicApi.getCurrentReplicaAssignment.method,
      }).then((res) => {
        this.loading = false;
        if (res.code == 0) {
          this.data = res.data;
          if (this.data.partitions.length > 0) {
            this.replicaNums = this.data.partitions[0].replicas.length;
            this.defaultReplicaNums = this.replicaNums;
          }
        } else {
          notification.error({
            message: "error",
            description: res.msg,
          });
        }
      });
    },
    // Fetch the cluster size. The spinner is cleared by the assignment
    // request above, which runs concurrently.
    getClusterInfo() {
      this.loading = true;
      request({
        url: KafkaClusterApi.getClusterInfo.url,
        method: KafkaClusterApi.getClusterInfo.method,
      }).then((res) => {
        this.brokerSize = res.data.nodes.length;
      });
    },
    handleCancel() {
      this.data = {};
      this.$emit("closeUpdateReplicaDialog", { refresh: false });
    },
    // Grow or shrink every partition's replica list to `value` replicas.
    // Fix: the previous implementation pushed `++num % brokerSize`, which
    // could re-add a broker the partition already hosts (e.g. replicas
    // [0, 2] with 3 brokers grew to [0, 2, 0]). New brokers are now chosen
    // round-robin after the last replica, skipping ids already assigned.
    onChange(value) {
      if (value < 1 || value > this.brokerSize) {
        return false;
      }
      const partitions = this.data.partitions || [];
      partitions.forEach((p) => {
        while (p.replicas.length > value) {
          p.replicas.pop();
        }
        if (p.replicas.length < value) {
          const owned = new Set(p.replicas);
          let candidate = p.replicas[p.replicas.length - 1];
          // Scan each broker id at most once, so this always terminates.
          for (
            let i = 0;
            i < this.brokerSize && p.replicas.length < value;
            i++
          ) {
            candidate = (candidate + 1) % this.brokerSize;
            if (!owned.has(candidate)) {
              owned.add(candidate);
              p.replicas.push(candidate);
            }
          }
        }
      });
    },
    // Submit the edited assignment (plus the optional inter-broker
    // throttle) to start the reassignment.
    handleOk() {
      this.loading = true;
      request({
        url: KafkaTopicApi.updateReplicaAssignment.url,
        method: KafkaTopicApi.updateReplicaAssignment.method,
        data: this.data,
      }).then((res) => {
        this.loading = false;
        if (res.code == 0) {
          this.$message.success(res.msg);
          this.$emit("closeUpdateReplicaDialog", { refresh: false });
        } else {
          notification.error({
            message: "error",
            description: res.msg,
          });
        }
      });
    },
  },
};
// Table layout: partition identity plus its replica list (scoped slot).
const columns = [
  {
    title: "Topic",
    dataIndex: "topic",
    key: "topic",
  },
  {
    title: "分区",
    dataIndex: "partition",
    key: "partition",
  },
  {
    title: "副本",
    dataIndex: "replicas",
    key: "replicas",
    scopedSlots: { customRender: "replicas" },
  },
];
</script>
<style scoped>
.replica-box {
margin-bottom: 1%;
}
</style>