Compare commits
12 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
20535027bf | ||
|
|
222ba34702 | ||
|
|
39e50a6589 | ||
|
|
e881c58a8f | ||
|
|
34c87997d1 | ||
|
|
4639335a9d | ||
|
|
73fed3face | ||
|
|
1b028fcb4f | ||
|
|
62569c4454 | ||
|
|
a219551802 | ||
|
|
7a98eb479f | ||
|
|
405f272fb7 |
13
README.md
13
README.md
@@ -3,8 +3,8 @@
|
||||
为了开发的省事,没有多语言支持,只支持中文展示。
|
||||
用过rocketmq-console吧,对,前端展示风格跟那个有点类似。
|
||||
## 安装包下载
|
||||
* 点击下载:[kafka-console-ui.tar.gz](http://43.128.31.53/kafka-console-ui.tar.gz) 或 [kafka-console-ui.zip](http://43.128.31.53/kafka-console-ui.zip)
|
||||
* 参考下面的打包部署,下载源码重新打包
|
||||
* 点击下载:[kafka-console-ui.tar.gz](https://github.com/xxd763795151/kafka-console-ui/releases/download/v1.0.0/kafka-console-ui.tar.gz) 或 [kafka-console-ui.zip](https://github.com/xxd763795151/kafka-console-ui/releases/download/v1.0.0/kafka-console-ui.zip)
|
||||
* 参考下面的打包部署,下载源码重新打包(最新功能特性)
|
||||
## 功能支持
|
||||
* 集群信息
|
||||
* Topic管理
|
||||
@@ -76,4 +76,11 @@ sh bin/shutdown.sh
|
||||
## 前端
|
||||
前端代码在工程的ui目录下,找个前端开发的ide打开进行开发即可。
|
||||
## 注意
|
||||
前后分离,直接启动后端如果未编译前端代码是没有前端页面的,可以先打包进行编译`sh package.sh`,然后再用idea启动,或者前端部分单独启动
|
||||
前后分离,直接启动后端如果未编译前端代码是没有前端页面的,可以先打包进行编译`sh package.sh`,然后再用idea启动,或者前端部分单独启动
|
||||
# 页面示例
|
||||
如果未启用ACL配置,不会显示ACL的菜单页面,所以导航栏上没有Acl这一项
|
||||
|
||||

|
||||

|
||||

|
||||

|
||||
BIN
document/Topic.png
Normal file
BIN
document/Topic.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 42 KiB |
Binary file not shown.
|
Before Width: | Height: | Size: 354 KiB After Width: | Height: | Size: 204 KiB |
BIN
document/消费组.png
Normal file
BIN
document/消费组.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 53 KiB |
BIN
document/运维.png
Normal file
BIN
document/运维.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 126 KiB |
BIN
document/集群.png
Normal file
BIN
document/集群.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 30 KiB |
2
pom.xml
2
pom.xml
@@ -10,7 +10,7 @@
|
||||
</parent>
|
||||
<groupId>com.xuxd</groupId>
|
||||
<artifactId>kafka-console-ui</artifactId>
|
||||
<version>1.0.0</version>
|
||||
<version>1.0.1</version>
|
||||
<name>kafka-console-ui</name>
|
||||
<description>Kafka console manage ui</description>
|
||||
<properties>
|
||||
|
||||
@@ -0,0 +1,29 @@
|
||||
package com.xuxd.kafka.console.beans;
|
||||
|
||||
import java.util.List;
|
||||
import lombok.Data;
|
||||
|
||||
/**
|
||||
* kafka-console-ui.
|
||||
*
|
||||
* @author xuxd
|
||||
* @date 2021-11-19 17:07:50
|
||||
**/
|
||||
@Data
|
||||
public class ReplicaAssignment {
|
||||
|
||||
private long version = 1L;
|
||||
|
||||
private List<Partition> partitions;
|
||||
|
||||
private long interBrokerThrottle = -1;
|
||||
|
||||
@Data
|
||||
static class Partition {
|
||||
private String topic;
|
||||
|
||||
private int partition;
|
||||
|
||||
private List<Integer> replicas;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,22 @@
|
||||
package com.xuxd.kafka.console.beans.dto;
|
||||
|
||||
import com.xuxd.kafka.console.beans.enums.ThrottleUnit;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import lombok.Data;
|
||||
|
||||
/**
|
||||
* kafka-console-ui.
|
||||
*
|
||||
* @author xuxd
|
||||
* @date 2021-11-24 19:37:10
|
||||
**/
|
||||
@Data
|
||||
public class BrokerThrottleDTO {
|
||||
|
||||
private List<Integer> brokerList = new ArrayList<>();
|
||||
|
||||
private long throttle;
|
||||
|
||||
private ThrottleUnit unit;
|
||||
}
|
||||
@@ -0,0 +1,21 @@
|
||||
package com.xuxd.kafka.console.beans.dto;
|
||||
|
||||
import com.xuxd.kafka.console.beans.enums.TopicThrottleSwitch;
|
||||
import java.util.List;
|
||||
import lombok.Data;
|
||||
|
||||
/**
|
||||
* kafka-console-ui.
|
||||
*
|
||||
* @author xuxd
|
||||
* @date 2021-11-26 15:33:37
|
||||
**/
|
||||
@Data
|
||||
public class TopicThrottleDTO {
|
||||
|
||||
private String topic;
|
||||
|
||||
private List<Integer> partitions;
|
||||
|
||||
private TopicThrottleSwitch operation;
|
||||
}
|
||||
@@ -0,0 +1,18 @@
|
||||
package com.xuxd.kafka.console.beans.enums;
|
||||
|
||||
/**
|
||||
* kafka-console-ui.
|
||||
*
|
||||
* @author xuxd
|
||||
* @date 2021-11-24 19:38:00
|
||||
**/
|
||||
public enum ThrottleUnit {
|
||||
KB, MB;
|
||||
|
||||
public long toKb(long size) {
|
||||
if (this == MB) {
|
||||
return 1024 * size;
|
||||
}
|
||||
return size;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,11 @@
|
||||
package com.xuxd.kafka.console.beans.enums;
|
||||
|
||||
/**
|
||||
* kafka-console-ui.
|
||||
*
|
||||
* @author xuxd
|
||||
* @date 2021-11-26 15:33:07
|
||||
**/
|
||||
public enum TopicThrottleSwitch {
|
||||
ON,OFF;
|
||||
}
|
||||
@@ -0,0 +1,25 @@
|
||||
package com.xuxd.kafka.console.beans.vo;
|
||||
|
||||
import com.xuxd.kafka.console.beans.TopicPartition;
|
||||
import java.util.List;
|
||||
import lombok.Data;
|
||||
|
||||
/**
|
||||
* kafka-console-ui.
|
||||
*
|
||||
* @author xuxd
|
||||
* @date 2021-11-30 16:03:41
|
||||
**/
|
||||
@Data
|
||||
public class CurrentReassignmentVO {
|
||||
|
||||
private final String topic;
|
||||
|
||||
private final int partition;
|
||||
|
||||
private final List<Integer> replicas;
|
||||
|
||||
private final List<Integer> addingReplicas;
|
||||
|
||||
private final List<Integer> removingReplicas;
|
||||
}
|
||||
@@ -33,8 +33,8 @@ public class TopicPartitionVO {
|
||||
TopicPartitionVO partitionVO = new TopicPartitionVO();
|
||||
partitionVO.setPartition(partitionInfo.partition());
|
||||
partitionVO.setLeader(partitionInfo.leader().toString());
|
||||
partitionVO.setReplicas(partitionInfo.replicas().stream().map(Node::toString).collect(Collectors.toList()));
|
||||
partitionVO.setIsr(partitionInfo.isr().stream().map(Node::toString).collect(Collectors.toList()));
|
||||
partitionVO.setReplicas(partitionInfo.replicas().stream().map(node -> node.host() + ":" + node.port() + " (id: " + node.idString() + ")").collect(Collectors.toList()));
|
||||
partitionVO.setIsr(partitionInfo.isr().stream().map(Node::idString).collect(Collectors.toList()));
|
||||
return partitionVO;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
package com.xuxd.kafka.console.controller;
|
||||
|
||||
import com.xuxd.kafka.console.beans.TopicPartition;
|
||||
import com.xuxd.kafka.console.beans.dto.BrokerThrottleDTO;
|
||||
import com.xuxd.kafka.console.beans.dto.ReplicationDTO;
|
||||
import com.xuxd.kafka.console.beans.dto.SyncDataDTO;
|
||||
import com.xuxd.kafka.console.service.OperationService;
|
||||
@@ -52,4 +54,24 @@ public class OperationController {
|
||||
public Object electPreferredLeader(@RequestBody ReplicationDTO dto) {
|
||||
return operationService.electPreferredLeader(dto.getTopic(), dto.getPartition());
|
||||
}
|
||||
|
||||
@PostMapping("/broker/throttle")
|
||||
public Object configThrottle(@RequestBody BrokerThrottleDTO dto) {
|
||||
return operationService.configThrottle(dto.getBrokerList(), dto.getUnit().toKb(dto.getThrottle()));
|
||||
}
|
||||
|
||||
@DeleteMapping("/broker/throttle")
|
||||
public Object removeThrottle(@RequestBody BrokerThrottleDTO dto) {
|
||||
return operationService.removeThrottle(dto.getBrokerList());
|
||||
}
|
||||
|
||||
@GetMapping("/replication/reassignments")
|
||||
public Object currentReassignments() {
|
||||
return operationService.currentReassignments();
|
||||
}
|
||||
|
||||
@DeleteMapping("/replication/reassignments")
|
||||
public Object cancelReassignment(@RequestBody TopicPartition partition) {
|
||||
return operationService.cancelReassignment(new org.apache.kafka.common.TopicPartition(partition.getTopic(), partition.getPartition()));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
package com.xuxd.kafka.console.controller;
|
||||
|
||||
import com.xuxd.kafka.console.beans.ReplicaAssignment;
|
||||
import com.xuxd.kafka.console.beans.dto.AddPartitionDTO;
|
||||
import com.xuxd.kafka.console.beans.dto.NewTopicDTO;
|
||||
import com.xuxd.kafka.console.beans.dto.TopicThrottleDTO;
|
||||
import com.xuxd.kafka.console.beans.enums.TopicType;
|
||||
import com.xuxd.kafka.console.service.TopicService;
|
||||
import java.util.ArrayList;
|
||||
@@ -71,4 +73,24 @@ public class TopicController {
|
||||
|
||||
return topicService.addPartitions(topic, addNum, assignment);
|
||||
}
|
||||
|
||||
@GetMapping("/replica/assignment")
|
||||
public Object getCurrentReplicaAssignment(@RequestParam String topic) {
|
||||
return topicService.getCurrentReplicaAssignment(topic);
|
||||
}
|
||||
|
||||
@PostMapping("/replica/assignment")
|
||||
public Object updateReplicaAssignment(@RequestBody ReplicaAssignment assignment) {
|
||||
return topicService.updateReplicaAssignment(assignment);
|
||||
}
|
||||
|
||||
@PostMapping("/replica/throttle")
|
||||
public Object configThrottle(@RequestBody TopicThrottleDTO dto) {
|
||||
return topicService.configThrottle(dto.getTopic(), dto.getPartitions(), dto.getOperation());
|
||||
}
|
||||
|
||||
@GetMapping("/send/stats")
|
||||
public Object sendStats(@RequestParam String topic) {
|
||||
return topicService.sendStats(topic);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
package com.xuxd.kafka.console.service;
|
||||
|
||||
import com.xuxd.kafka.console.beans.ResponseData;
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
|
||||
/**
|
||||
* kafka-console-ui.
|
||||
@@ -20,4 +22,12 @@ public interface OperationService {
|
||||
ResponseData deleteAlignmentById(Long id);
|
||||
|
||||
ResponseData electPreferredLeader(String topic, int partition);
|
||||
|
||||
ResponseData configThrottle(List<Integer> brokerList, long size);
|
||||
|
||||
ResponseData removeThrottle(List<Integer> brokerList);
|
||||
|
||||
ResponseData currentReassignments();
|
||||
|
||||
ResponseData cancelReassignment(TopicPartition partition);
|
||||
}
|
||||
|
||||
@@ -1,10 +1,10 @@
|
||||
package com.xuxd.kafka.console.service;
|
||||
|
||||
import com.xuxd.kafka.console.beans.ReplicaAssignment;
|
||||
import com.xuxd.kafka.console.beans.ResponseData;
|
||||
import com.xuxd.kafka.console.beans.enums.TopicThrottleSwitch;
|
||||
import com.xuxd.kafka.console.beans.enums.TopicType;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import org.apache.kafka.clients.admin.NewTopic;
|
||||
|
||||
/**
|
||||
@@ -26,4 +26,12 @@ public interface TopicService {
|
||||
ResponseData createTopic(NewTopic topic);
|
||||
|
||||
ResponseData addPartitions(String topic, int addNum, List<List<Integer>> newAssignmentst);
|
||||
|
||||
ResponseData getCurrentReplicaAssignment(String topic);
|
||||
|
||||
ResponseData updateReplicaAssignment(ReplicaAssignment assignment);
|
||||
|
||||
ResponseData configThrottle(String topic, List<Integer> partitions, TopicThrottleSwitch throttleSwitch);
|
||||
|
||||
ResponseData sendStats(String topic);
|
||||
}
|
||||
|
||||
@@ -142,7 +142,7 @@ public class ConsumerServiceImpl implements ConsumerService {
|
||||
@Override public ResponseData resetOffsetByDate(String groupId, String topic, String dateStr) {
|
||||
long timestamp = -1L;
|
||||
try {
|
||||
StringBuilder sb = new StringBuilder(dateStr.replace(" ", "T")).append(".000");
|
||||
StringBuilder sb = new StringBuilder(dateStr.replace(" ", "T")).append(".000+08:00");//固定为utc+08:00东8区来计算
|
||||
timestamp = Utils.getDateTime(sb.toString());
|
||||
} catch (ParseException e) {
|
||||
throw new IllegalArgumentException(e);
|
||||
|
||||
@@ -5,16 +5,21 @@ import com.google.gson.Gson;
|
||||
import com.google.gson.JsonObject;
|
||||
import com.xuxd.kafka.console.beans.ResponseData;
|
||||
import com.xuxd.kafka.console.beans.dos.MinOffsetAlignmentDO;
|
||||
import com.xuxd.kafka.console.beans.vo.CurrentReassignmentVO;
|
||||
import com.xuxd.kafka.console.beans.vo.OffsetAlignmentVO;
|
||||
import com.xuxd.kafka.console.dao.MinOffsetAlignmentMapper;
|
||||
import com.xuxd.kafka.console.service.OperationService;
|
||||
import com.xuxd.kafka.console.utils.GsonUtil;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
import kafka.console.OperationConsole;
|
||||
import org.apache.kafka.clients.admin.PartitionReassignment;
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
import org.springframework.beans.factory.ObjectProvider;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
@@ -30,7 +35,7 @@ import scala.Tuple2;
|
||||
@Service
|
||||
public class OperationServiceImpl implements OperationService {
|
||||
|
||||
private Gson gson = new Gson();
|
||||
private Gson gson = GsonUtil.INSTANCE.get();
|
||||
|
||||
@Autowired
|
||||
private OperationConsole operationConsole;
|
||||
@@ -122,4 +127,39 @@ public class OperationServiceImpl implements OperationService {
|
||||
|
||||
return (boolean) tuple2._1() ? ResponseData.create().success() : ResponseData.create().failed(tuple2._2());
|
||||
}
|
||||
|
||||
@Override public ResponseData configThrottle(List<Integer> brokerList, long size) {
|
||||
Tuple2<Object, String> tuple2 = operationConsole.modifyInterBrokerThrottle(new HashSet<>(brokerList), size);
|
||||
|
||||
return (boolean) tuple2._1() ? ResponseData.create().success() : ResponseData.create().failed(tuple2._2());
|
||||
}
|
||||
|
||||
@Override public ResponseData removeThrottle(List<Integer> brokerList) {
|
||||
Tuple2<Object, String> tuple2 = operationConsole.clearBrokerLevelThrottles(new HashSet<>(brokerList));
|
||||
|
||||
return (boolean) tuple2._1() ? ResponseData.create().success() : ResponseData.create().failed(tuple2._2());
|
||||
}
|
||||
|
||||
@Override public ResponseData currentReassignments() {
|
||||
Map<TopicPartition, PartitionReassignment> reassignmentMap = operationConsole.currentReassignments();
|
||||
List<CurrentReassignmentVO> vos = reassignmentMap.entrySet().stream().map(entry -> {
|
||||
TopicPartition partition = entry.getKey();
|
||||
PartitionReassignment reassignment = entry.getValue();
|
||||
return new CurrentReassignmentVO(partition.topic(),
|
||||
partition.partition(), reassignment.replicas(), reassignment.addingReplicas(), reassignment.removingReplicas());
|
||||
}).collect(Collectors.toList());
|
||||
return ResponseData.create().data(vos).success();
|
||||
}
|
||||
|
||||
@Override public ResponseData cancelReassignment(TopicPartition partition) {
|
||||
Map<TopicPartition, Throwable> res = operationConsole.cancelPartitionReassignments(Collections.singleton(partition));
|
||||
if (!res.isEmpty()) {
|
||||
StringBuilder sb = new StringBuilder("Failed: ");
|
||||
res.forEach((p, t) -> {
|
||||
sb.append(p.toString()).append(": ").append(t.getMessage()).append(System.lineSeparator());
|
||||
});
|
||||
return ResponseData.create().failed(sb.toString());
|
||||
}
|
||||
return ResponseData.create().success();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,10 +1,15 @@
|
||||
package com.xuxd.kafka.console.service.impl;
|
||||
|
||||
import com.google.gson.Gson;
|
||||
import com.xuxd.kafka.console.beans.ReplicaAssignment;
|
||||
import com.xuxd.kafka.console.beans.ResponseData;
|
||||
import com.xuxd.kafka.console.beans.enums.TopicThrottleSwitch;
|
||||
import com.xuxd.kafka.console.beans.enums.TopicType;
|
||||
import com.xuxd.kafka.console.beans.vo.TopicDescriptionVO;
|
||||
import com.xuxd.kafka.console.beans.vo.TopicPartitionVO;
|
||||
import com.xuxd.kafka.console.service.TopicService;
|
||||
import com.xuxd.kafka.console.utils.GsonUtil;
|
||||
import java.util.Calendar;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.HashMap;
|
||||
@@ -12,6 +17,7 @@ import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.stream.Collectors;
|
||||
import kafka.console.TopicConsole;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
@@ -38,6 +44,8 @@ public class TopicServiceImpl implements TopicService {
|
||||
@Autowired
|
||||
private TopicConsole topicConsole;
|
||||
|
||||
private Gson gson = GsonUtil.INSTANCE.get();
|
||||
|
||||
@Override public ResponseData getTopicNameList(boolean internal) {
|
||||
return ResponseData.create().data(topicConsole.getTopicNameList(internal)).success();
|
||||
}
|
||||
@@ -130,4 +138,92 @@ public class TopicServiceImpl implements TopicService {
|
||||
|
||||
return success ? ResponseData.create().success() : ResponseData.create().failed(tuple2._2());
|
||||
}
|
||||
|
||||
@Override public ResponseData getCurrentReplicaAssignment(String topic) {
|
||||
Tuple2<Object, String> tuple2 = topicConsole.getCurrentReplicaAssignmentJson(topic);
|
||||
boolean success = (boolean) tuple2._1();
|
||||
|
||||
return success ? ResponseData.create().data(gson.fromJson(tuple2._2(), ReplicaAssignment.class)).success() : ResponseData.create().failed(tuple2._2());
|
||||
}
|
||||
|
||||
@Override public ResponseData updateReplicaAssignment(ReplicaAssignment assignment) {
|
||||
Tuple2<Object, String> tuple2 = topicConsole.updateReplicas(gson.toJson(assignment), assignment.getInterBrokerThrottle());
|
||||
boolean success = (boolean) tuple2._1();
|
||||
return success ? ResponseData.create().success() : ResponseData.create().failed(tuple2._2());
|
||||
}
|
||||
|
||||
@Override
|
||||
public ResponseData configThrottle(String topic, List<Integer> partitions, TopicThrottleSwitch throttleSwitch) {
|
||||
Tuple2<Object, String> tuple2 = null;
|
||||
switch (throttleSwitch) {
|
||||
case ON:
|
||||
tuple2 = topicConsole.configThrottle(topic, partitions);
|
||||
break;
|
||||
case OFF:
|
||||
tuple2 = topicConsole.clearThrottle(topic);
|
||||
break;
|
||||
default:
|
||||
throw new IllegalArgumentException("switch is unknown.");
|
||||
}
|
||||
boolean success = (boolean) tuple2._1();
|
||||
return success ? ResponseData.create().success() : ResponseData.create().failed(tuple2._2());
|
||||
}
|
||||
|
||||
@Override public ResponseData sendStats(String topic) {
|
||||
Calendar calendar = Calendar.getInstance();
|
||||
long current = calendar.getTimeInMillis();
|
||||
|
||||
calendar.set(Calendar.HOUR_OF_DAY, 0);
|
||||
calendar.set(Calendar.MINUTE, 0);
|
||||
calendar.set(Calendar.SECOND, 0);
|
||||
calendar.set(Calendar.MILLISECOND, 0);
|
||||
long today = calendar.getTimeInMillis();
|
||||
|
||||
calendar.add(Calendar.DAY_OF_MONTH, -1);
|
||||
long yesterday = calendar.getTimeInMillis();
|
||||
|
||||
Map<TopicPartition, Long> currentOffset = topicConsole.getOffsetForTimestamp(topic, current);
|
||||
Map<TopicPartition, Long> todayOffset = topicConsole.getOffsetForTimestamp(topic, today);
|
||||
Map<TopicPartition, Long> yesterdayOffset = topicConsole.getOffsetForTimestamp(topic, yesterday);
|
||||
|
||||
Map<String, Object> res = new HashMap<>();
|
||||
|
||||
// 昨天的消息数是今天减去昨天的
|
||||
AtomicLong yesterdayTotal = new AtomicLong(0L), todayTotal = new AtomicLong(0L);
|
||||
Map<Integer, Long> yesterdayDetail = new HashMap<>(), todayDetail = new HashMap<>();
|
||||
todayOffset.forEach(((partition, aLong) -> {
|
||||
Long last = yesterdayOffset.get(partition);
|
||||
long diff = last == null ? aLong : aLong - last;
|
||||
yesterdayDetail.put(partition.partition(), diff);
|
||||
yesterdayTotal.addAndGet(diff);
|
||||
}));
|
||||
currentOffset.forEach(((partition, aLong) -> {
|
||||
Long last = todayOffset.get(partition);
|
||||
long diff = last == null ? aLong : aLong - last;
|
||||
todayDetail.put(partition.partition(), diff);
|
||||
todayTotal.addAndGet(diff);
|
||||
}));
|
||||
|
||||
Map<String, Object> yes = new HashMap<>(), to = new HashMap<>();
|
||||
yes.put("detail", convertList(yesterdayDetail));
|
||||
yes.put("total", yesterdayTotal.get());
|
||||
to.put("detail", convertList(todayDetail));
|
||||
to.put("total", todayTotal.get());
|
||||
|
||||
res.put("yesterday", yes);
|
||||
res.put("today", to);
|
||||
// 今天的消息数是现在减去今天0时的
|
||||
return ResponseData.create().data(res).success();
|
||||
}
|
||||
|
||||
private List<Map<String, Object>> convertList(Map<Integer, Long> source) {
|
||||
List<Map<String, Object>> collect = source.entrySet().stream().map(entry -> {
|
||||
Map<String, Object> map = new HashMap<>(3, 1.0f);
|
||||
map.put("partition", entry.getKey());
|
||||
map.put("num", entry.getValue());
|
||||
return map;
|
||||
}).collect(Collectors.toList());
|
||||
|
||||
return collect;
|
||||
}
|
||||
}
|
||||
|
||||
19
src/main/java/com/xuxd/kafka/console/utils/GsonUtil.java
Normal file
19
src/main/java/com/xuxd/kafka/console/utils/GsonUtil.java
Normal file
@@ -0,0 +1,19 @@
|
||||
package com.xuxd.kafka.console.utils;
|
||||
|
||||
import com.google.gson.Gson;
|
||||
|
||||
/**
|
||||
* kafka-console-ui.
|
||||
*
|
||||
* @author xuxd
|
||||
* @date 2021-11-19 17:01:01
|
||||
**/
|
||||
public enum GsonUtil {
|
||||
INSTANCE;
|
||||
|
||||
private Gson gson = new Gson();
|
||||
|
||||
public Gson get() {
|
||||
return gson;
|
||||
}
|
||||
}
|
||||
@@ -1,15 +1,20 @@
|
||||
package kafka.console
|
||||
|
||||
import java.util.Properties
|
||||
|
||||
import com.xuxd.kafka.console.config.KafkaConfig
|
||||
import kafka.zk.{AdminZkClient, KafkaZkClient}
|
||||
import org.apache.kafka.clients.CommonClientConfigs
|
||||
import org.apache.kafka.clients.admin.{AbstractOptions, Admin, AdminClientConfig}
|
||||
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
|
||||
import org.apache.kafka.clients.admin._
|
||||
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer, OffsetAndMetadata}
|
||||
import org.apache.kafka.common.TopicPartition
|
||||
import org.apache.kafka.common.config.SaslConfigs
|
||||
import org.apache.kafka.common.requests.ListOffsetsResponse
|
||||
import org.apache.kafka.common.serialization.ByteArrayDeserializer
|
||||
import org.apache.kafka.common.utils.Time
|
||||
import org.slf4j.{Logger, LoggerFactory}
|
||||
|
||||
import java.util.Properties
|
||||
import scala.collection.{Map, Seq}
|
||||
import scala.jdk.CollectionConverters.{MapHasAsJava, MapHasAsScala}
|
||||
|
||||
/**
|
||||
* kafka-console-ui.
|
||||
@@ -89,3 +94,57 @@ class KafkaConsole(config: KafkaConfig) {
|
||||
props
|
||||
}
|
||||
}
|
||||
|
||||
object KafkaConsole {
|
||||
val log: Logger = LoggerFactory.getLogger(this.getClass)
|
||||
|
||||
def getCommittedOffsets(admin: Admin, groupId: String,
|
||||
timeoutMs: Integer): Map[TopicPartition, OffsetAndMetadata] = {
|
||||
admin.listConsumerGroupOffsets(
|
||||
groupId, new ListConsumerGroupOffsetsOptions().timeoutMs(timeoutMs)
|
||||
).partitionsToOffsetAndMetadata.get.asScala
|
||||
}
|
||||
|
||||
def getLogTimestampOffsets(admin: Admin, topicPartitions: Seq[TopicPartition],
|
||||
timestamp: java.lang.Long, timeoutMs: Integer): Map[TopicPartition, OffsetAndMetadata] = {
|
||||
val timestampOffsets = topicPartitions.map { topicPartition =>
|
||||
topicPartition -> OffsetSpec.forTimestamp(timestamp)
|
||||
}.toMap
|
||||
val offsets = admin.listOffsets(
|
||||
timestampOffsets.asJava,
|
||||
new ListOffsetsOptions().timeoutMs(timeoutMs)
|
||||
).all.get
|
||||
val (successfulOffsetsForTimes, unsuccessfulOffsetsForTimes) =
|
||||
offsets.asScala.partition(_._2.offset != ListOffsetsResponse.UNKNOWN_OFFSET)
|
||||
|
||||
val successfulLogTimestampOffsets = successfulOffsetsForTimes.map {
|
||||
case (topicPartition, listOffsetsResultInfo) => topicPartition -> new OffsetAndMetadata(listOffsetsResultInfo.offset)
|
||||
}.toMap
|
||||
|
||||
unsuccessfulOffsetsForTimes.foreach { entry =>
|
||||
log.warn(s"\nWarn: Partition " + entry._1.partition() + " from topic " + entry._1.topic() +
|
||||
" is empty. Falling back to latest known offset.")
|
||||
}
|
||||
|
||||
successfulLogTimestampOffsets ++ getLogEndOffsets(admin, unsuccessfulOffsetsForTimes.keySet.toSeq, timeoutMs)
|
||||
}
|
||||
|
||||
def getLogEndOffsets(admin: Admin,
|
||||
topicPartitions: Seq[TopicPartition], timeoutMs: Integer): Predef.Map[TopicPartition, OffsetAndMetadata] = {
|
||||
val endOffsets = topicPartitions.map { topicPartition =>
|
||||
topicPartition -> OffsetSpec.latest
|
||||
}.toMap
|
||||
val offsets = admin.listOffsets(
|
||||
endOffsets.asJava,
|
||||
new ListOffsetsOptions().timeoutMs(timeoutMs)
|
||||
).all.get
|
||||
val res = topicPartitions.map { topicPartition =>
|
||||
Option(offsets.get(topicPartition)) match {
|
||||
case Some(listOffsetsResultInfo) => topicPartition -> new OffsetAndMetadata(listOffsetsResultInfo.offset)
|
||||
case _ =>
|
||||
throw new IllegalArgumentException
|
||||
}
|
||||
}.toMap
|
||||
res
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,15 +1,15 @@
|
||||
package kafka.console
|
||||
|
||||
import java.util.concurrent.TimeUnit
|
||||
import java.util.{Collections, Properties}
|
||||
|
||||
import com.xuxd.kafka.console.config.KafkaConfig
|
||||
import org.apache.kafka.clients.admin.ElectLeadersOptions
|
||||
import kafka.admin.ReassignPartitionsCommand
|
||||
import org.apache.kafka.clients.admin.{ElectLeadersOptions, ListPartitionReassignmentsOptions, PartitionReassignment}
|
||||
import org.apache.kafka.clients.consumer.KafkaConsumer
|
||||
import org.apache.kafka.common.serialization.ByteArrayDeserializer
|
||||
import org.apache.kafka.common.{ElectionType, TopicPartition}
|
||||
|
||||
import scala.jdk.CollectionConverters.{CollectionHasAsScala, ListHasAsScala, MapHasAsScala, SeqHasAsJava, SetHasAsJava, SetHasAsScala}
|
||||
import java.util.concurrent.TimeUnit
|
||||
import java.util.{Collections, Properties}
|
||||
import scala.jdk.CollectionConverters.{CollectionHasAsScala, ListHasAsScala, MapHasAsJava, MapHasAsScala, SeqHasAsJava, SetHasAsJava, SetHasAsScala}
|
||||
|
||||
/**
|
||||
* kafka-console-ui.
|
||||
@@ -210,4 +210,47 @@ class OperationConsole(config: KafkaConfig, topicConsole: TopicConsole,
|
||||
val topicList = topicConsole.getTopicList(Collections.singleton(topic))
|
||||
topicList.asScala.flatMap(_.partitions().asScala.map(t => new TopicPartition(topic, t.partition()))).toSet.asJava
|
||||
}
|
||||
}
|
||||
|
||||
def modifyInterBrokerThrottle(reassigningBrokers: util.Set[Int],
|
||||
interBrokerThrottle: Long): (Boolean, String) = {
|
||||
withAdminClientAndCatchError(admin => {
|
||||
ReassignPartitionsCommand.modifyInterBrokerThrottle(admin, reassigningBrokers.asScala.toSet, interBrokerThrottle)
|
||||
(true, "")
|
||||
}, e => {
|
||||
log.error("modifyInterBrokerThrottle error.", e)
|
||||
(false, e.getMessage)
|
||||
}).asInstanceOf[(Boolean, String)]
|
||||
}
|
||||
|
||||
def clearBrokerLevelThrottles(brokers: util.Set[Int]): (Boolean, String) = {
|
||||
withAdminClientAndCatchError(admin => {
|
||||
ReassignPartitionsCommand.clearBrokerLevelThrottles(admin, brokers.asScala.toSet)
|
||||
(true, "")
|
||||
}, e => {
|
||||
log.error("clearBrokerLevelThrottles error.", e)
|
||||
(false, e.getMessage)
|
||||
}).asInstanceOf[(Boolean, String)]
|
||||
}
|
||||
|
||||
/**
|
||||
* current reassigning is active.
|
||||
*/
|
||||
def currentReassignments(): util.Map[TopicPartition, PartitionReassignment] = {
|
||||
withAdminClientAndCatchError(admin => {
|
||||
admin.listPartitionReassignments(withTimeoutMs(new ListPartitionReassignmentsOptions)).reassignments().get()
|
||||
}, e => {
|
||||
Collections.emptyMap()
|
||||
log.error("listPartitionReassignments error.", e)
|
||||
}).asInstanceOf[util.Map[TopicPartition, PartitionReassignment]]
|
||||
}
|
||||
|
||||
def cancelPartitionReassignments(reassignments: util.Set[TopicPartition]): util.Map[TopicPartition, Throwable] = {
|
||||
withAdminClientAndCatchError(admin => {
|
||||
val res = ReassignPartitionsCommand.cancelPartitionReassignments(admin, reassignments.asScala.toSet)
|
||||
res.asJava
|
||||
}, e => {
|
||||
log.error("cancelPartitionReassignments error.", e)
|
||||
throw e
|
||||
}).asInstanceOf[util.Map[TopicPartition, Throwable]]
|
||||
}
|
||||
}
|
||||
@@ -1,14 +1,18 @@
|
||||
package kafka.console
|
||||
|
||||
import java.util
|
||||
import java.util.concurrent.TimeUnit
|
||||
import java.util.{Collections, List, Set}
|
||||
|
||||
import com.xuxd.kafka.console.config.KafkaConfig
|
||||
import kafka.admin.ReassignPartitionsCommand._
|
||||
import kafka.utils.Json
|
||||
import org.apache.kafka.clients.admin._
|
||||
import org.apache.kafka.common.TopicPartition
|
||||
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException
|
||||
import org.apache.kafka.common.utils.Time
|
||||
import org.apache.kafka.common.{TopicPartition, TopicPartitionReplica}
|
||||
|
||||
import scala.jdk.CollectionConverters.{CollectionHasAsScala, SetHasAsJava}
|
||||
import java.util
|
||||
import java.util.concurrent.{ExecutionException, TimeUnit}
|
||||
import java.util.{Collections, List, Set}
|
||||
import scala.collection.{Map, Seq}
|
||||
import scala.jdk.CollectionConverters.{CollectionHasAsScala, MapHasAsJava, MapHasAsScala, SeqHasAsJava, SetHasAsJava}
|
||||
|
||||
/**
|
||||
* kafka-console-ui.
|
||||
@@ -121,4 +125,180 @@ class TopicConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConfig
|
||||
(false, e.getMessage)
|
||||
}).asInstanceOf[(Boolean, String)]
|
||||
}
|
||||
|
||||
def getCurrentReplicaAssignmentJson(topic: String): (Boolean, String) = {
|
||||
withAdminClientAndCatchError(admin => {
|
||||
val json = formatAsReassignmentJson(getReplicaAssignmentForTopics(admin, Seq(topic)), Map.empty)
|
||||
(true, json)
|
||||
}, e => {
|
||||
log.error("getCurrentReplicaAssignmentJson error, ", e)
|
||||
(false, e.getMessage)
|
||||
}).asInstanceOf[(Boolean, String)]
|
||||
}
|
||||
|
||||
def updateReplicas(reassignmentJson: String, interBrokerThrottle: Long = -1L): (Boolean, String) = {
|
||||
withAdminClientAndCatchError(admin => {
|
||||
executeAssignment(admin, reassignmentJson, interBrokerThrottle)
|
||||
(true, "")
|
||||
}, e => {
|
||||
log.error("executeAssignment error, ", e)
|
||||
(false, e.getMessage)
|
||||
}).asInstanceOf[(Boolean, String)]
|
||||
}
|
||||
|
||||
/**
|
||||
* Copy and modify from @{link kafka.admin.ReassignPartitionsCommand#executeAssignment}.
|
||||
*/
|
||||
def executeAssignment(adminClient: Admin,
|
||||
reassignmentJson: String,
|
||||
interBrokerThrottle: Long = -1L,
|
||||
logDirThrottle: Long = -1L,
|
||||
timeoutMs: Long = 30000L,
|
||||
time: Time = Time.SYSTEM): Unit = {
|
||||
val (proposedParts, proposedReplicas) = parseExecuteAssignmentArgs(reassignmentJson)
|
||||
val currentReassignments = adminClient.
|
||||
listPartitionReassignments().reassignments().get().asScala
|
||||
// If there is an existing assignment
|
||||
// This helps avoid surprising users.
|
||||
if (currentReassignments.nonEmpty) {
|
||||
throw new TerseReassignmentFailureException("Cannot execute because there is an existing partition assignment.")
|
||||
}
|
||||
verifyBrokerIds(adminClient, proposedParts.values.flatten.toSet)
|
||||
val currentParts = getReplicaAssignmentForPartitions(adminClient, proposedParts.keySet.toSet)
|
||||
log.info("currentPartitionReplicaAssignment: " + currentPartitionReplicaAssignmentToString(proposedParts, currentParts))
|
||||
log.info(s"newPartitionReplicaAssignment: $reassignmentJson")
|
||||
|
||||
if (interBrokerThrottle >= 0 || logDirThrottle >= 0) {
|
||||
|
||||
if (interBrokerThrottle >= 0) {
|
||||
val moveMap = calculateProposedMoveMap(currentReassignments, proposedParts, currentParts)
|
||||
modifyReassignmentThrottle(adminClient, moveMap)
|
||||
}
|
||||
|
||||
if (logDirThrottle >= 0) {
|
||||
val movingBrokers = calculateMovingBrokers(proposedReplicas.keySet.toSet)
|
||||
modifyLogDirThrottle(adminClient, movingBrokers, logDirThrottle)
|
||||
}
|
||||
}
|
||||
|
||||
// Execute the partition reassignments.
|
||||
val errors = alterPartitionReassignments(adminClient, proposedParts)
|
||||
if (errors.nonEmpty) {
|
||||
throw new TerseReassignmentFailureException(
|
||||
"Error reassigning partition(s):%n%s".format(
|
||||
errors.keySet.toBuffer.sortWith(compareTopicPartitions).map { part =>
|
||||
s"$part: ${errors(part).getMessage}"
|
||||
}.mkString(System.lineSeparator())))
|
||||
}
|
||||
if (proposedReplicas.nonEmpty) {
|
||||
executeMoves(adminClient, proposedReplicas, timeoutMs, time)
|
||||
}
|
||||
}
|
||||
|
||||
def configThrottle(topic: String, partitions: util.List[Integer]): (Boolean, String) = {
|
||||
withAdminClientAndCatchError(admin => {
|
||||
val throttles = {
|
||||
if (partitions.get(0) == -1) {
|
||||
Map(topic -> "*")
|
||||
} else {
|
||||
val topicDescription = admin.describeTopics(Collections.singleton(topic), withTimeoutMs(new DescribeTopicsOptions))
|
||||
.all().get().values().asScala.toList
|
||||
|
||||
def convert(partition: Integer, replicas: scala.List[Int]): String = {
|
||||
replicas.map("%d:%d".format(partition, _)).toSet.mkString(",")
|
||||
}
|
||||
|
||||
val ptor = topicDescription.head.partitions().asScala.map(info => (info.partition(), info.replicas().asScala.map(_.id()))).toMap
|
||||
val conf = partitions.asScala.map(partition => convert(partition, ptor.get(partition) match {
|
||||
case Some(v) => v.toList
|
||||
case None => throw new IllegalArgumentException
|
||||
})).toList
|
||||
Map(topic -> conf.mkString(","))
|
||||
}
|
||||
}
|
||||
modifyTopicThrottles(admin, throttles, throttles)
|
||||
(true, "")
|
||||
}, e => {
|
||||
log.error("configThrottle error, ", e)
|
||||
(false, e.getMessage)
|
||||
}).asInstanceOf[(Boolean, String)]
|
||||
}
|
||||
|
||||
def clearThrottle(topic: String): (Boolean, String) = {
|
||||
withAdminClientAndCatchError(admin => {
|
||||
clearTopicLevelThrottles(admin, Collections.singleton(topic).asScala.toSet)
|
||||
(true, "")
|
||||
}, e => {
|
||||
log.error("clearThrottle error, ", e)
|
||||
(false, e.getMessage)
|
||||
}).asInstanceOf[(Boolean, String)]
|
||||
}
|
||||
|
||||
def getOffsetForTimestamp(topic: String, timestamp: java.lang.Long): util.Map[TopicPartition, java.lang.Long] = {
|
||||
withAdminClientAndCatchError(admin => {
|
||||
val partitions = describeTopics(admin, Collections.singleton(topic)).get(topic) match {
|
||||
case Some(topicDescription: TopicDescription) => topicDescription.partitions()
|
||||
.asScala.map(info => new TopicPartition(topic, info.partition())).toSeq
|
||||
case None => throw new IllegalArgumentException("topic is not exist.")
|
||||
}
|
||||
val offsetMap = KafkaConsole.getLogTimestampOffsets(admin, partitions, timestamp, timeoutMs)
|
||||
offsetMap.map(tuple2 => (tuple2._1, tuple2._2.offset())).toMap.asJava
|
||||
}, e => {
|
||||
log.error("clearThrottle error, ", e)
|
||||
Collections.emptyMap()
|
||||
}).asInstanceOf[util.Map[TopicPartition, java.lang.Long]]
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the current replica assignments for some topics.
|
||||
*
|
||||
* @param adminClient The AdminClient to use.
|
||||
* @param topics The topics to get information about.
|
||||
* @return A map from partitions to broker assignments.
|
||||
* If any topic can't be found, an exception will be thrown.
|
||||
*/
|
||||
private def getReplicaAssignmentForTopics(adminClient: Admin,
|
||||
topics: Seq[String])
|
||||
: Map[TopicPartition, Seq[Int]] = {
|
||||
describeTopics(adminClient, topics.toSet.asJava).flatMap {
|
||||
case (topicName, topicDescription) => topicDescription.partitions.asScala.map { info =>
|
||||
(new TopicPartition(topicName, info.partition), info.replicas.asScala.map(_.id).toSeq)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private def formatAsReassignmentJson(partitionsToBeReassigned: Map[TopicPartition, Seq[Int]],
|
||||
replicaLogDirAssignment: Map[TopicPartitionReplica, String]): String = {
|
||||
Json.encodeAsString(Map(
|
||||
"version" -> 1,
|
||||
"partitions" -> partitionsToBeReassigned.keySet.toBuffer.sortWith(compareTopicPartitions).map {
|
||||
tp =>
|
||||
val replicas = partitionsToBeReassigned(tp)
|
||||
Map(
|
||||
"topic" -> tp.topic,
|
||||
"partition" -> tp.partition,
|
||||
"replicas" -> replicas.asJava
|
||||
).asJava
|
||||
}.asJava
|
||||
).asJava)
|
||||
}
|
||||
|
||||
private def describeTopics(adminClient: Admin,
|
||||
topics: Set[String])
|
||||
: Map[String, TopicDescription] = {
|
||||
adminClient.describeTopics(topics).values.asScala.map { case (topicName, topicDescriptionFuture) =>
|
||||
try topicName -> topicDescriptionFuture.get
|
||||
catch {
|
||||
case t: ExecutionException if t.getCause.isInstanceOf[UnknownTopicOrPartitionException] =>
|
||||
throw new ExecutionException(
|
||||
new UnknownTopicOrPartitionException(s"Topic $topicName not found."))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private def modifyReassignmentThrottle(admin: Admin, moveMap: MoveMap): Unit = {
|
||||
val leaderThrottles = calculateLeaderThrottles(moveMap)
|
||||
val followerThrottles = calculateFollowerThrottles(moveMap)
|
||||
modifyTopicThrottles(admin, leaderThrottles, followerThrottles)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -117,6 +117,22 @@ export const KafkaTopicApi = {
|
||||
url: "/topic/partition/new",
|
||||
method: "post",
|
||||
},
|
||||
getCurrentReplicaAssignment: {
|
||||
url: "/topic/replica/assignment",
|
||||
method: "get",
|
||||
},
|
||||
updateReplicaAssignment: {
|
||||
url: "/topic/replica/assignment",
|
||||
method: "post",
|
||||
},
|
||||
configThrottle: {
|
||||
url: "/topic/replica/throttle",
|
||||
method: "post",
|
||||
},
|
||||
sendStats: {
|
||||
url: "/topic/send/stats",
|
||||
method: "get",
|
||||
},
|
||||
};
|
||||
|
||||
export const KafkaConsumerApi = {
|
||||
@@ -190,4 +206,20 @@ export const KafkaOpApi = {
|
||||
url: "/op/replication/preferred",
|
||||
method: "post",
|
||||
},
|
||||
configThrottle: {
|
||||
url: "/op/broker/throttle",
|
||||
method: "post",
|
||||
},
|
||||
removeThrottle: {
|
||||
url: "/op/broker/throttle",
|
||||
method: "delete",
|
||||
},
|
||||
currentReassignments: {
|
||||
url: "/op/replication/reassignments",
|
||||
method: "get",
|
||||
},
|
||||
cancelReassignment: {
|
||||
url: "/op/replication/reassignments",
|
||||
method: "delete",
|
||||
},
|
||||
};
|
||||
|
||||
@@ -47,6 +47,15 @@
|
||||
@click="openResetOffsetByTimeDialog(k)"
|
||||
>时间戳
|
||||
</a-button>
|
||||
<a-button
|
||||
type="primary"
|
||||
icon="reload"
|
||||
size="small"
|
||||
style="float: right"
|
||||
@click="getConsumerDetail"
|
||||
>
|
||||
刷新
|
||||
</a-button>
|
||||
<hr />
|
||||
<a-table
|
||||
:columns="columns"
|
||||
@@ -70,6 +79,11 @@
|
||||
</a-button>
|
||||
</div>
|
||||
</a-table>
|
||||
<p>
|
||||
<strong style="color: red"
|
||||
>注意:重置位点时,要求当前没有正在运行的消费端,否则重置的时候会报错,返回失败信息</strong
|
||||
>
|
||||
</p>
|
||||
</div>
|
||||
|
||||
<a-modal
|
||||
|
||||
@@ -32,6 +32,10 @@
|
||||
/>
|
||||
</a-form-item>
|
||||
</a-form>
|
||||
<hr />
|
||||
<p>
|
||||
*注意:该时间为北京时间。这里固定为东8区的计算时间,如果所在地区不是采用北京时间(中国大部分地区都是采用的北京时间),请自行对照为当地时间重置。
|
||||
</p>
|
||||
</a-spin>
|
||||
</div>
|
||||
</a-modal>
|
||||
|
||||
157
ui/src/views/op/ConfigThrottle.vue
Normal file
157
ui/src/views/op/ConfigThrottle.vue
Normal file
@@ -0,0 +1,157 @@
|
||||
<template>
|
||||
<a-modal
|
||||
title="限流配置"
|
||||
:visible="show"
|
||||
:width="1000"
|
||||
:mask="false"
|
||||
:maskClosable="false"
|
||||
okText="确认"
|
||||
cancelText="取消"
|
||||
:destroyOnClose="true"
|
||||
@cancel="handleCancel"
|
||||
@ok="ok"
|
||||
>
|
||||
<div>
|
||||
<a-spin :spinning="loading">
|
||||
<a-form
|
||||
:form="form"
|
||||
:label-col="{ span: 5 }"
|
||||
:wrapper-col="{ span: 12 }"
|
||||
>
|
||||
<a-form-item label="Broker">
|
||||
<a-select
|
||||
mode="multiple"
|
||||
option-filter-prop="children"
|
||||
v-decorator="[
|
||||
'brokerList',
|
||||
{
|
||||
initialValue: brokers,
|
||||
rules: [{ required: true, message: '请选择一个broker!' }],
|
||||
},
|
||||
]"
|
||||
placeholder="请选择一个broker"
|
||||
>
|
||||
<a-select-option v-for="v in brokers" :key="v" :value="v">
|
||||
<span v-if="v == -1">全部</span> <span v-else>{{ v }}</span>
|
||||
</a-select-option>
|
||||
</a-select>
|
||||
</a-form-item>
|
||||
<a-form-item label="带宽">
|
||||
<a-input-number
|
||||
:min="1"
|
||||
:max="1024"
|
||||
v-decorator="[
|
||||
'throttle',
|
||||
{
|
||||
initialValue: 1,
|
||||
rules: [{ required: true, message: '输入带宽!' }],
|
||||
},
|
||||
]"
|
||||
/>
|
||||
<a-select default-value="MB" v-model="unit" style="width: 100px">
|
||||
<a-select-option value="MB"> MB/s </a-select-option>
|
||||
<a-select-option value="KB"> KB/s </a-select-option>
|
||||
</a-select>
|
||||
</a-form-item>
|
||||
</a-form>
|
||||
<hr />
|
||||
<div><h4>注意:</h4></div>
|
||||
<ul>
|
||||
<li>该限速带宽,指的是broker之间副本进行同步时占用的带宽</li>
|
||||
<li>该配置是broker级别配置,是针对broker上topic的副本</li>
|
||||
<li>
|
||||
在当前页面对指定broker限流配置后,并不是说设置后该broker上的所有topic副本同步就被限制为当前流速了。这仅仅是速率设置,如果需要对某topic的副本同步进行限流,还需要去
|
||||
Topic->限流 处操作,只有进行限流操作的topic,该限速才会对其生效
|
||||
</li>
|
||||
<li>
|
||||
上面这句话的意思就是,这里只配置topic副本同步的速率,要使这个配置真正在某个topic上生效,还要开启这个topic的限流
|
||||
</li>
|
||||
</ul>
|
||||
<h4>如何检查限流配置是否成功:</h4>
|
||||
kafka的限流速率是通过下面这两项配置的:
|
||||
<ul>
|
||||
<li>leader.replication.throttled.rate</li>
|
||||
<li>follower.replication.throttled.rate</li>
|
||||
</ul>
|
||||
只需通过
|
||||
<strong>集群->属性配置</strong>
|
||||
查看是否存在这两项配置,如果存在便是配置的有限流,值的大小就是速率,单位:kb/s
|
||||
</a-spin>
|
||||
</div>
|
||||
</a-modal>
|
||||
</template>
|
||||
|
||||
<script>
|
||||
import request from "@/utils/request";
|
||||
import { KafkaClusterApi, KafkaOpApi } from "@/utils/api";
|
||||
import notification from "ant-design-vue/lib/notification";
|
||||
|
||||
export default {
|
||||
name: "ConfigThrottle",
|
||||
props: {
|
||||
visible: {
|
||||
type: Boolean,
|
||||
default: false,
|
||||
},
|
||||
},
|
||||
data() {
|
||||
return {
|
||||
show: this.visible,
|
||||
loading: false,
|
||||
form: this.$form.createForm(this, { name: "ConfigThrottleForm" }),
|
||||
brokers: [],
|
||||
unit: "MB",
|
||||
};
|
||||
},
|
||||
watch: {
|
||||
visible(v) {
|
||||
this.show = v;
|
||||
if (this.show) {
|
||||
this.getClusterInfo();
|
||||
}
|
||||
},
|
||||
},
|
||||
methods: {
|
||||
handleCancel() {
|
||||
this.$emit("closeConfigThrottleDialog", { refresh: false });
|
||||
},
|
||||
getClusterInfo() {
|
||||
this.loading = true;
|
||||
request({
|
||||
url: KafkaClusterApi.getClusterInfo.url,
|
||||
method: KafkaClusterApi.getClusterInfo.method,
|
||||
}).then((res) => {
|
||||
this.loading = false;
|
||||
this.brokers = [];
|
||||
res.data.nodes.forEach((node) => this.brokers.push(node.id));
|
||||
});
|
||||
},
|
||||
ok() {
|
||||
this.form.validateFields((err, values) => {
|
||||
if (!err) {
|
||||
const data = Object.assign({}, values, { unit: this.unit });
|
||||
this.loading = true;
|
||||
request({
|
||||
url: KafkaOpApi.configThrottle.url,
|
||||
method: KafkaOpApi.configThrottle.method,
|
||||
data: data,
|
||||
}).then((res) => {
|
||||
this.loading = false;
|
||||
if (res.code == 0) {
|
||||
this.$message.success(res.msg);
|
||||
this.$emit("closeConfigThrottleDialog", { refresh: false });
|
||||
} else {
|
||||
notification.error({
|
||||
message: "error",
|
||||
description: res.msg,
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
},
|
||||
},
|
||||
};
|
||||
</script>
|
||||
|
||||
<style scoped></style>
|
||||
166
ui/src/views/op/CurrentReassignments.vue
Normal file
166
ui/src/views/op/CurrentReassignments.vue
Normal file
@@ -0,0 +1,166 @@
|
||||
<template>
|
||||
<a-modal
|
||||
title="正在进行副本重分配的分区"
|
||||
:visible="show"
|
||||
:width="1200"
|
||||
:mask="false"
|
||||
:destroyOnClose="true"
|
||||
:footer="null"
|
||||
:maskClosable="false"
|
||||
@cancel="handleCancel"
|
||||
>
|
||||
<div>
|
||||
<a-spin :spinning="loading">
|
||||
<a-table
|
||||
:columns="columns"
|
||||
:data-source="data"
|
||||
bordered
|
||||
:rowKey="(record) => record.topic + record.partition"
|
||||
>
|
||||
<div slot="replicas" slot-scope="text">
|
||||
<span v-for="i in text" :key="i">
|
||||
{{ i }}
|
||||
</span>
|
||||
</div>
|
||||
<div slot="addingReplicas" slot-scope="text">
|
||||
<span v-for="i in text" :key="i">
|
||||
{{ i }}
|
||||
</span>
|
||||
</div>
|
||||
<div slot="removingReplicas" slot-scope="text">
|
||||
<span v-for="i in text" :key="i">
|
||||
{{ i }}
|
||||
</span>
|
||||
</div>
|
||||
<div slot="operation" slot-scope="record">
|
||||
<a-popconfirm
|
||||
title="取消正在进行的副本重分配任务?"
|
||||
ok-text="确认"
|
||||
cancel-text="取消"
|
||||
@confirm="cancelReassignment(record)"
|
||||
>
|
||||
<a-button size="small" href="javascript:;" class="operation-btn"
|
||||
>取消
|
||||
</a-button>
|
||||
</a-popconfirm>
|
||||
</div>
|
||||
</a-table>
|
||||
</a-spin>
|
||||
</div>
|
||||
</a-modal>
|
||||
</template>
|
||||
|
||||
<script>
|
||||
import request from "@/utils/request";
|
||||
import { KafkaOpApi } from "@/utils/api";
|
||||
import notification from "ant-design-vue/es/notification";
|
||||
|
||||
export default {
|
||||
name: "CurrentReassignments",
|
||||
props: {
|
||||
visible: {
|
||||
type: Boolean,
|
||||
default: false,
|
||||
},
|
||||
},
|
||||
data() {
|
||||
return {
|
||||
columns: columns,
|
||||
show: this.visible,
|
||||
data: [],
|
||||
loading: false,
|
||||
};
|
||||
},
|
||||
watch: {
|
||||
visible(v) {
|
||||
this.show = v;
|
||||
if (this.show) {
|
||||
this.currentReassignments();
|
||||
}
|
||||
},
|
||||
},
|
||||
methods: {
|
||||
currentReassignments() {
|
||||
this.loading = true;
|
||||
const api = KafkaOpApi.currentReassignments;
|
||||
request({
|
||||
url: api.url,
|
||||
method: api.method,
|
||||
}).then((res) => {
|
||||
this.loading = false;
|
||||
if (res.code != 0) {
|
||||
notification.error({
|
||||
message: "error",
|
||||
description: res.msg,
|
||||
});
|
||||
} else {
|
||||
this.data = res.data;
|
||||
this.yesterday = this.data.yesterday;
|
||||
this.today = this.data.today;
|
||||
}
|
||||
});
|
||||
},
|
||||
handleCancel() {
|
||||
this.$emit("closeCurrentReassignmentsDialog", {});
|
||||
},
|
||||
cancelReassignment(record) {
|
||||
const param = { topic: record.topic, partition: record.partition };
|
||||
this.loading = true;
|
||||
const api = KafkaOpApi.cancelReassignment;
|
||||
request({
|
||||
url: api.url,
|
||||
method: api.method,
|
||||
data: param,
|
||||
}).then((res) => {
|
||||
this.loading = false;
|
||||
if (res.code != 0) {
|
||||
notification.error({
|
||||
message: "error",
|
||||
description: res.msg,
|
||||
});
|
||||
} else {
|
||||
this.currentReassignments();
|
||||
}
|
||||
});
|
||||
},
|
||||
},
|
||||
};
|
||||
|
||||
const columns = [
|
||||
{
|
||||
title: "Topic",
|
||||
dataIndex: "topic",
|
||||
key: "topic",
|
||||
},
|
||||
{
|
||||
title: "分区",
|
||||
dataIndex: "partition",
|
||||
key: "partition",
|
||||
},
|
||||
{
|
||||
title: "副本",
|
||||
dataIndex: "replicas",
|
||||
key: "replicas",
|
||||
scopedSlots: { customRender: "replicas" },
|
||||
},
|
||||
{
|
||||
title: "正在增加的副本",
|
||||
dataIndex: "addingReplicas",
|
||||
key: "addingReplicas",
|
||||
scopedSlots: { customRender: "addingReplicas" },
|
||||
},
|
||||
{
|
||||
title: "正在移除的副本",
|
||||
dataIndex: "removingReplicas",
|
||||
key: "removingReplicas",
|
||||
scopedSlots: { customRender: "removingReplicas" },
|
||||
},
|
||||
{
|
||||
title: "操作",
|
||||
key: "operation",
|
||||
scopedSlots: { customRender: "operation" },
|
||||
},
|
||||
];
|
||||
</script>
|
||||
|
||||
<style scoped></style>
|
||||
@@ -1,5 +1,25 @@
|
||||
<template>
|
||||
<div class="content">
|
||||
<div class="content-module">
|
||||
<a-card title="Broker管理" style="width: 100%; text-align: left">
|
||||
<p>
|
||||
<a-button type="primary" @click="openConfigThrottleDialog">
|
||||
配置限流
|
||||
</a-button>
|
||||
<label>说明:</label>
|
||||
<span
|
||||
>设置指定broker上的topic的副本之间数据同步占用的带宽,这个设置是broker级别的,但是设置后还要去对应的topic上进行限流配置,指定对这个topic的相关副本进行限制</span
|
||||
>
|
||||
</p>
|
||||
<p>
|
||||
<a-button type="primary" @click="openRemoveThrottleDialog">
|
||||
解除限流
|
||||
</a-button>
|
||||
<label>说明:</label>
|
||||
<span>解除指定broker上的topic副本之间数据同步占用的带宽限制</span>
|
||||
</p>
|
||||
</a-card>
|
||||
</div>
|
||||
<div class="content-module">
|
||||
<a-card title="副本管理" style="width: 100%; text-align: left">
|
||||
<p>
|
||||
@@ -9,6 +29,13 @@
|
||||
<label>说明:</label>
|
||||
<span>将集群中所有分区leader副本设置为首选副本</span>
|
||||
</p>
|
||||
<p>
|
||||
<a-button type="primary" @click="openCurrentReassignmentsDialog">
|
||||
副本变更详情
|
||||
</a-button>
|
||||
<label>说明:</label>
|
||||
<span>查看正在进行副本变更/重分配的任务,或者将其取消</span>
|
||||
</p>
|
||||
</a-card>
|
||||
</div>
|
||||
<div class="content-module">
|
||||
@@ -66,6 +93,20 @@
|
||||
@closeDataSyncSchemeDialog="closeDataSyncSchemeDialog"
|
||||
>
|
||||
</DataSyncScheme>
|
||||
<ConfigThrottle
|
||||
:visible="brokerManager.showConfigThrottleDialog"
|
||||
@closeConfigThrottleDialog="closeConfigThrottleDialog"
|
||||
>
|
||||
</ConfigThrottle>
|
||||
<RemoveThrottle
|
||||
:visible="brokerManager.showRemoveThrottleDialog"
|
||||
@closeRemoveThrottleDialog="closeRemoveThrottleDialog"
|
||||
>
|
||||
</RemoveThrottle>
|
||||
<CurrentReassignments
|
||||
:visible="replicationManager.showCurrentReassignmentsDialog"
|
||||
@closeCurrentReassignmentsDialog="closeCurrentReassignmentsDialog"
|
||||
></CurrentReassignments>
|
||||
</div>
|
||||
</template>
|
||||
|
||||
@@ -75,6 +116,9 @@ import MinOffsetAlignment from "@/views/op/MinOffsetAlignment";
|
||||
import OffsetAlignmentTable from "@/views/op/OffsetAlignmentTable";
|
||||
import ElectPreferredLeader from "@/views/op/ElectPreferredLeader";
|
||||
import DataSyncScheme from "@/views/op/DataSyncScheme";
|
||||
import ConfigThrottle from "@/views/op/ConfigThrottle";
|
||||
import RemoveThrottle from "@/views/op/RemoveThrottle";
|
||||
import CurrentReassignments from "@/views/op/CurrentReassignments";
|
||||
export default {
|
||||
name: "Operation",
|
||||
components: {
|
||||
@@ -83,6 +127,9 @@ export default {
|
||||
OffsetAlignmentTable,
|
||||
ElectPreferredLeader,
|
||||
DataSyncScheme,
|
||||
ConfigThrottle,
|
||||
RemoveThrottle,
|
||||
CurrentReassignments,
|
||||
},
|
||||
data() {
|
||||
return {
|
||||
@@ -94,6 +141,11 @@ export default {
|
||||
},
|
||||
replicationManager: {
|
||||
showElectPreferredLeaderDialog: false,
|
||||
showCurrentReassignmentsDialog: false,
|
||||
},
|
||||
brokerManager: {
|
||||
showConfigThrottleDialog: false,
|
||||
showRemoveThrottleDialog: false,
|
||||
},
|
||||
};
|
||||
},
|
||||
@@ -128,6 +180,24 @@ export default {
|
||||
closeElectPreferredLeaderDialog() {
|
||||
this.replicationManager.showElectPreferredLeaderDialog = false;
|
||||
},
|
||||
openConfigThrottleDialog() {
|
||||
this.brokerManager.showConfigThrottleDialog = true;
|
||||
},
|
||||
closeConfigThrottleDialog() {
|
||||
this.brokerManager.showConfigThrottleDialog = false;
|
||||
},
|
||||
openRemoveThrottleDialog() {
|
||||
this.brokerManager.showRemoveThrottleDialog = true;
|
||||
},
|
||||
closeRemoveThrottleDialog() {
|
||||
this.brokerManager.showRemoveThrottleDialog = false;
|
||||
},
|
||||
openCurrentReassignmentsDialog() {
|
||||
this.replicationManager.showCurrentReassignmentsDialog = true;
|
||||
},
|
||||
closeCurrentReassignmentsDialog() {
|
||||
this.replicationManager.showCurrentReassignmentsDialog = false;
|
||||
},
|
||||
},
|
||||
};
|
||||
</script>
|
||||
|
||||
127
ui/src/views/op/RemoveThrottle.vue
Normal file
127
ui/src/views/op/RemoveThrottle.vue
Normal file
@@ -0,0 +1,127 @@
|
||||
<template>
|
||||
<a-modal
|
||||
title="解除限流"
|
||||
:visible="show"
|
||||
:width="1000"
|
||||
:mask="false"
|
||||
:maskClosable="false"
|
||||
okText="确认"
|
||||
cancelText="取消"
|
||||
:destroyOnClose="true"
|
||||
@cancel="handleCancel"
|
||||
@ok="ok"
|
||||
>
|
||||
<div>
|
||||
<a-spin :spinning="loading">
|
||||
<a-form
|
||||
:form="form"
|
||||
:label-col="{ span: 5 }"
|
||||
:wrapper-col="{ span: 12 }"
|
||||
>
|
||||
<a-form-item label="Broker">
|
||||
<a-select
|
||||
mode="multiple"
|
||||
option-filter-prop="children"
|
||||
v-decorator="[
|
||||
'brokerList',
|
||||
{
|
||||
initialValue: brokers,
|
||||
rules: [{ required: true, message: '请选择一个broker!' }],
|
||||
},
|
||||
]"
|
||||
placeholder="请选择一个broker"
|
||||
>
|
||||
<a-select-option v-for="v in brokers" :key="v" :value="v">
|
||||
<span v-if="v == -1">全部</span> <span v-else>{{ v }}</span>
|
||||
</a-select-option>
|
||||
</a-select>
|
||||
</a-form-item>
|
||||
</a-form>
|
||||
<hr />
|
||||
<h4>如何检查是否配置的有限流速率:</h4>
|
||||
kafka的限流速率是通过下面这两项配置的:
|
||||
<ul>
|
||||
<li>leader.replication.throttled.rate</li>
|
||||
<li>follower.replication.throttled.rate</li>
|
||||
</ul>
|
||||
只需通过
|
||||
<strong>集群->属性配置</strong>
|
||||
查看是否存在这两项配置,如果不存在,便是没有配置限流速率。如果未配置限流速率,即使指定某个topic的分区副本进行限流,没有速率也不限流。
|
||||
</a-spin>
|
||||
</div>
|
||||
</a-modal>
|
||||
</template>
|
||||
|
||||
<script>
|
||||
import request from "@/utils/request";
|
||||
import { KafkaClusterApi, KafkaOpApi } from "@/utils/api";
|
||||
import notification from "ant-design-vue/lib/notification";
|
||||
|
||||
export default {
|
||||
name: "RemoveThrottle",
|
||||
props: {
|
||||
visible: {
|
||||
type: Boolean,
|
||||
default: false,
|
||||
},
|
||||
},
|
||||
data() {
|
||||
return {
|
||||
show: this.visible,
|
||||
loading: false,
|
||||
form: this.$form.createForm(this, { name: "RemoveThrottleForm" }),
|
||||
brokers: [],
|
||||
};
|
||||
},
|
||||
watch: {
|
||||
visible(v) {
|
||||
this.show = v;
|
||||
if (this.show) {
|
||||
this.getClusterInfo();
|
||||
}
|
||||
},
|
||||
},
|
||||
methods: {
|
||||
handleCancel() {
|
||||
this.$emit("closeRemoveThrottleDialog", { refresh: false });
|
||||
},
|
||||
getClusterInfo() {
|
||||
this.loading = true;
|
||||
request({
|
||||
url: KafkaClusterApi.getClusterInfo.url,
|
||||
method: KafkaClusterApi.getClusterInfo.method,
|
||||
}).then((res) => {
|
||||
this.loading = false;
|
||||
this.brokers = [];
|
||||
res.data.nodes.forEach((node) => this.brokers.push(node.id));
|
||||
});
|
||||
},
|
||||
ok() {
|
||||
this.form.validateFields((err, values) => {
|
||||
if (!err) {
|
||||
const data = Object.assign({}, values);
|
||||
this.loading = true;
|
||||
request({
|
||||
url: KafkaOpApi.removeThrottle.url,
|
||||
method: KafkaOpApi.removeThrottle.method,
|
||||
data: data,
|
||||
}).then((res) => {
|
||||
this.loading = false;
|
||||
if (res.code == 0) {
|
||||
this.$message.success(res.msg);
|
||||
this.$emit("closeRemoveThrottleDialog", { refresh: false });
|
||||
} else {
|
||||
notification.error({
|
||||
message: "error",
|
||||
description: res.msg,
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
},
|
||||
},
|
||||
};
|
||||
</script>
|
||||
|
||||
<style scoped></style>
|
||||
163
ui/src/views/topic/ConfigTopicThrottle.vue
Normal file
163
ui/src/views/topic/ConfigTopicThrottle.vue
Normal file
@@ -0,0 +1,163 @@
|
||||
<template>
|
||||
<a-modal
|
||||
:title="topic + '限流'"
|
||||
:visible="show"
|
||||
:width="1000"
|
||||
:mask="false"
|
||||
:maskClosable="false"
|
||||
okText="确认"
|
||||
cancelText="取消"
|
||||
:destroyOnClose="true"
|
||||
@cancel="handleCancel"
|
||||
@ok="ok"
|
||||
>
|
||||
<div>
|
||||
<a-spin :spinning="loading">
|
||||
<a-form
|
||||
:form="form"
|
||||
:label-col="{ span: 5 }"
|
||||
:wrapper-col="{ span: 12 }"
|
||||
>
|
||||
<a-form-item label="操作">
|
||||
<a-radio-group
|
||||
@change="onChange"
|
||||
v-decorator="[
|
||||
'operation',
|
||||
{
|
||||
initialValue: 'ON',
|
||||
rules: [{ required: true, message: '请选择一个操作!' }],
|
||||
},
|
||||
]"
|
||||
>
|
||||
<a-radio value="ON"> 配置限流 </a-radio>
|
||||
<a-radio value="OFF"> 移除所有分区限流配置 </a-radio>
|
||||
</a-radio-group>
|
||||
</a-form-item>
|
||||
|
||||
<a-form-item label="选择分区" v-show="showPartition">
|
||||
<a-select
|
||||
mode="multiple"
|
||||
option-filter-prop="children"
|
||||
v-decorator="[
|
||||
'partitions',
|
||||
{
|
||||
initialValue: [-1],
|
||||
rules: [{ required: true, message: '请选择一个分区!' }],
|
||||
},
|
||||
]"
|
||||
placeholder="请选择一个分区"
|
||||
>
|
||||
<a-select-option v-for="v in partitions" :key="v" :value="v">
|
||||
<span v-if="v == -1">全部</span> <span v-else>{{ v }}</span>
|
||||
</a-select-option>
|
||||
</a-select>
|
||||
</a-form-item>
|
||||
</a-form>
|
||||
<hr />
|
||||
<h4>说明:</h4>
|
||||
该限流表示topic的副本的在不同broker之间数据同步占用带宽的限制,该配置是一个topic级别的配置项。如未配置速率,即使配置了这个限流也不会进行实际的限流操作。配置速率在
|
||||
<span style="color: red">运维->配置限流</span> 处进行操作.
|
||||
<h4>如何检查是否对哪些分区启用限流:</h4>
|
||||
topic的限流是通过下面这两项配置的:
|
||||
<ul>
|
||||
<li>leader.replication.throttled.replicas</li>
|
||||
<li>follower.replication.throttled.replicas</li>
|
||||
</ul>
|
||||
只需通过
|
||||
<strong>属性配置</strong>
|
||||
查看这两项配置的值,格式为:"0:0,1:0",左侧为分区,右侧为broker
|
||||
id。示例表示:[分区0的副本:在broker 0上,分区1的副本:在broker 0上]。
|
||||
</a-spin>
|
||||
</div>
|
||||
</a-modal>
|
||||
</template>
|
||||
|
||||
<script>
|
||||
import request from "@/utils/request";
|
||||
import { KafkaTopicApi } from "@/utils/api";
|
||||
import notification from "ant-design-vue/lib/notification";
|
||||
|
||||
export default {
|
||||
name: "ConfigTopicThrottle",
|
||||
props: {
|
||||
visible: {
|
||||
type: Boolean,
|
||||
default: false,
|
||||
},
|
||||
topic: {
|
||||
type: String,
|
||||
default: "",
|
||||
},
|
||||
},
|
||||
data() {
|
||||
return {
|
||||
show: this.visible,
|
||||
loading: false,
|
||||
form: this.$form.createForm(this, { name: "RemoveThrottleForm" }),
|
||||
partitions: [],
|
||||
showPartition: true,
|
||||
};
|
||||
},
|
||||
watch: {
|
||||
visible(v) {
|
||||
this.show = v;
|
||||
if (this.show) {
|
||||
this.getPartitionInfo();
|
||||
this.showPartition = true;
|
||||
}
|
||||
},
|
||||
},
|
||||
methods: {
|
||||
handleCancel() {
|
||||
this.$emit("closeThrottleDialog", { refresh: false });
|
||||
},
|
||||
getPartitionInfo() {
|
||||
this.loading = true;
|
||||
request({
|
||||
url: KafkaTopicApi.getPartitionInfo.url + "?topic=" + this.topic,
|
||||
method: KafkaTopicApi.getPartitionInfo.method,
|
||||
}).then((res) => {
|
||||
this.loading = false;
|
||||
if (res.code != 0) {
|
||||
notification.error({
|
||||
message: "error",
|
||||
description: res.msg,
|
||||
});
|
||||
} else {
|
||||
this.partitions = res.data.map((e) => e.partition);
|
||||
this.partitions.splice(0, 0, -1);
|
||||
}
|
||||
});
|
||||
},
|
||||
ok() {
|
||||
this.form.validateFields((err, values) => {
|
||||
if (!err) {
|
||||
const data = Object.assign({}, values, { topic: this.topic });
|
||||
this.loading = true;
|
||||
request({
|
||||
url: KafkaTopicApi.configThrottle.url,
|
||||
method: KafkaTopicApi.configThrottle.method,
|
||||
data: data,
|
||||
}).then((res) => {
|
||||
this.loading = false;
|
||||
if (res.code == 0) {
|
||||
this.$message.success(res.msg);
|
||||
this.$emit("closeThrottleDialog", { refresh: false });
|
||||
} else {
|
||||
notification.error({
|
||||
message: "error",
|
||||
description: res.msg,
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
},
|
||||
onChange(e) {
|
||||
this.showPartition = !(e.target.value == "OFF");
|
||||
},
|
||||
},
|
||||
};
|
||||
</script>
|
||||
|
||||
<style scoped></style>
|
||||
@@ -12,6 +12,7 @@
|
||||
<div>
|
||||
<a-spin :spinning="loading">
|
||||
<a-table
|
||||
bordered
|
||||
:columns="columns"
|
||||
:data-source="data"
|
||||
:rowKey="
|
||||
@@ -21,19 +22,15 @@
|
||||
"
|
||||
>
|
||||
<ul slot="replicas" slot-scope="text">
|
||||
<ol v-for="i in text" :key="i">
|
||||
{{
|
||||
i
|
||||
}}
|
||||
</ol>
|
||||
</ul>
|
||||
<ul slot="isr" slot-scope="text">
|
||||
<ol v-for="i in text" :key="i">
|
||||
{{
|
||||
i
|
||||
}}
|
||||
</ol>
|
||||
<li v-for="i in text" :key="i">
|
||||
{{ i }}
|
||||
</li>
|
||||
</ul>
|
||||
<div slot="isr" slot-scope="text">
|
||||
<span v-for="i in text" :key="i">
|
||||
{{ i }}
|
||||
</span>
|
||||
</div>
|
||||
<div slot="operation" slot-scope="record" v-show="!record.internal">
|
||||
<a-popconfirm
|
||||
:title="
|
||||
|
||||
115
ui/src/views/topic/SendStats.vue
Normal file
115
ui/src/views/topic/SendStats.vue
Normal file
@@ -0,0 +1,115 @@
|
||||
<template>
|
||||
<a-modal
|
||||
:title="topic + '发送统计'"
|
||||
:visible="show"
|
||||
:width="1000"
|
||||
:mask="false"
|
||||
:destroyOnClose="true"
|
||||
:footer="null"
|
||||
:maskClosable="false"
|
||||
@cancel="handleCancel"
|
||||
>
|
||||
<div>
|
||||
<a-spin :spinning="loading">
|
||||
<h4>今天发送消息数:{{ today.total }}</h4>
|
||||
<a-table
|
||||
:columns="columns"
|
||||
:data-source="today.detail"
|
||||
bordered
|
||||
:rowKey="(record) => record.partition"
|
||||
>
|
||||
</a-table>
|
||||
<hr />
|
||||
<h4>昨天发送消息数:{{ yesterday.total }}</h4>
|
||||
<a-table
|
||||
:columns="columns"
|
||||
:data-source="yesterday.detail"
|
||||
bordered
|
||||
:rowKey="(record) => record.partition"
|
||||
>
|
||||
</a-table>
|
||||
</a-spin>
|
||||
</div>
|
||||
</a-modal>
|
||||
</template>
|
||||
|
||||
<script>
|
||||
import request from "@/utils/request";
|
||||
import { KafkaTopicApi } from "@/utils/api";
|
||||
import notification from "ant-design-vue/es/notification";
|
||||
|
||||
export default {
|
||||
name: "SendStats",
|
||||
props: {
|
||||
topic: {
|
||||
type: String,
|
||||
default: "",
|
||||
},
|
||||
visible: {
|
||||
type: Boolean,
|
||||
default: false,
|
||||
},
|
||||
},
|
||||
data() {
|
||||
return {
|
||||
columns: columns,
|
||||
show: this.visible,
|
||||
data: [],
|
||||
loading: false,
|
||||
yesterday: {},
|
||||
today: {},
|
||||
};
|
||||
},
|
||||
watch: {
|
||||
visible(v) {
|
||||
this.show = v;
|
||||
if (this.show) {
|
||||
this.sendStatus();
|
||||
}
|
||||
},
|
||||
},
|
||||
methods: {
|
||||
sendStatus() {
|
||||
this.loading = true;
|
||||
const api = KafkaTopicApi.sendStats;
|
||||
request({
|
||||
url: api.url + "?topic=" + this.topic,
|
||||
method: api.method,
|
||||
}).then((res) => {
|
||||
this.loading = false;
|
||||
if (res.code != 0) {
|
||||
notification.error({
|
||||
message: "error",
|
||||
description: res.msg,
|
||||
});
|
||||
} else {
|
||||
this.data = res.data;
|
||||
this.yesterday = this.data.yesterday;
|
||||
this.today = this.data.today;
|
||||
}
|
||||
});
|
||||
},
|
||||
handleCancel() {
|
||||
this.data = [];
|
||||
this.yesterday = {};
|
||||
this.today = {};
|
||||
this.$emit("closeMessageStatsDialog", {});
|
||||
},
|
||||
},
|
||||
};
|
||||
|
||||
const columns = [
|
||||
{
|
||||
title: "分区",
|
||||
dataIndex: "partition",
|
||||
key: "partition",
|
||||
},
|
||||
{
|
||||
title: "数量",
|
||||
dataIndex: "num",
|
||||
key: "num",
|
||||
},
|
||||
];
|
||||
</script>
|
||||
|
||||
<style scoped></style>
|
||||
@@ -98,6 +98,27 @@
|
||||
@click="openTopicConfigDialog(record.name)"
|
||||
>属性配置
|
||||
</a-button>
|
||||
<a-button
|
||||
size="small"
|
||||
href="javascript:;"
|
||||
class="operation-btn"
|
||||
@click="openUpdateReplicaDialog(record.name)"
|
||||
>变更副本
|
||||
</a-button>
|
||||
<a-button
|
||||
size="small"
|
||||
href="javascript:;"
|
||||
class="operation-btn"
|
||||
@click="openMessageStatsDialog(record.name)"
|
||||
>发送统计
|
||||
</a-button>
|
||||
<a-button
|
||||
size="small"
|
||||
href="javascript:;"
|
||||
class="operation-btn"
|
||||
@click="openThrottleDialog(record.name)"
|
||||
>限流
|
||||
</a-button>
|
||||
</div>
|
||||
</a-table>
|
||||
<PartitionInfo
|
||||
@@ -126,6 +147,21 @@
|
||||
:topic="selectDetail.resourceName"
|
||||
@closeTopicConfigDialog="closeTopicConfigDialog"
|
||||
></TopicConfig>
|
||||
<UpdateReplica
|
||||
:visible="showUpdateReplicaDialog"
|
||||
:topic="selectDetail.resourceName"
|
||||
@closeUpdateReplicaDialog="closeUpdateReplicaDialog"
|
||||
></UpdateReplica>
|
||||
<ConfigTopicThrottle
|
||||
:visible="showThrottleDialog"
|
||||
:topic="selectDetail.resourceName"
|
||||
@closeThrottleDialog="closeThrottleDialog"
|
||||
></ConfigTopicThrottle>
|
||||
<SendStats
|
||||
:visible="showSendStatsDialog"
|
||||
:topic="selectDetail.resourceName"
|
||||
@closeMessageStatsDialog="closeMessageStatsDialog"
|
||||
></SendStats>
|
||||
</div>
|
||||
</a-spin>
|
||||
</div>
|
||||
@@ -140,6 +176,9 @@ import CreateTopic from "@/views/topic/CreateTopic";
|
||||
import AddPartition from "@/views/topic/AddPartition";
|
||||
import ConsumedDetail from "@/views/topic/ConsumedDetail";
|
||||
import TopicConfig from "@/views/topic/TopicConfig";
|
||||
import UpdateReplica from "@/views/topic/UpdateReplica";
|
||||
import ConfigTopicThrottle from "@/views/topic/ConfigTopicThrottle";
|
||||
import SendStats from "@/views/topic/SendStats";
|
||||
|
||||
export default {
|
||||
name: "Topic",
|
||||
@@ -149,6 +188,9 @@ export default {
|
||||
AddPartition,
|
||||
ConsumedDetail,
|
||||
TopicConfig,
|
||||
UpdateReplica,
|
||||
ConfigTopicThrottle,
|
||||
SendStats,
|
||||
},
|
||||
data() {
|
||||
return {
|
||||
@@ -170,6 +212,9 @@ export default {
|
||||
showAddPartition: false,
|
||||
showConsumedDetailDialog: false,
|
||||
showTopicConfigDialog: false,
|
||||
showUpdateReplicaDialog: false,
|
||||
showThrottleDialog: false,
|
||||
showSendStatsDialog: false,
|
||||
};
|
||||
},
|
||||
methods: {
|
||||
@@ -250,6 +295,27 @@ export default {
|
||||
closeTopicConfigDialog() {
|
||||
this.showTopicConfigDialog = false;
|
||||
},
|
||||
openUpdateReplicaDialog(topic) {
|
||||
this.showUpdateReplicaDialog = true;
|
||||
this.selectDetail.resourceName = topic;
|
||||
},
|
||||
closeUpdateReplicaDialog() {
|
||||
this.showUpdateReplicaDialog = false;
|
||||
},
|
||||
openMessageStatsDialog(topic) {
|
||||
this.showSendStatsDialog = true;
|
||||
this.selectDetail.resourceName = topic;
|
||||
},
|
||||
closeMessageStatsDialog() {
|
||||
this.showSendStatsDialog = false;
|
||||
},
|
||||
openThrottleDialog(topic) {
|
||||
this.showThrottleDialog = true;
|
||||
this.selectDetail.resourceName = topic;
|
||||
},
|
||||
closeThrottleDialog() {
|
||||
this.showThrottleDialog = false;
|
||||
},
|
||||
},
|
||||
created() {
|
||||
this.getTopicList();
|
||||
@@ -281,7 +347,7 @@ const columns = [
|
||||
title: "操作",
|
||||
key: "operation",
|
||||
scopedSlots: { customRender: "operation" },
|
||||
width: 500,
|
||||
width: 800,
|
||||
},
|
||||
];
|
||||
</script>
|
||||
|
||||
211
ui/src/views/topic/UpdateReplica.vue
Normal file
211
ui/src/views/topic/UpdateReplica.vue
Normal file
@@ -0,0 +1,211 @@
|
||||
<template>
|
||||
<a-modal
|
||||
title="变更副本"
|
||||
:visible="show"
|
||||
:width="1200"
|
||||
:mask="false"
|
||||
:destroyOnClose="true"
|
||||
:maskClosable="false"
|
||||
@cancel="handleCancel"
|
||||
okText="确认"
|
||||
cancelText="取消"
|
||||
@ok="handleOk"
|
||||
>
|
||||
<div>
|
||||
<a-spin :spinning="loading">
|
||||
<div class="replica-box">
|
||||
<label>设置副本数:</label
|
||||
><a-input-number
|
||||
id="inputNumber"
|
||||
v-model="replicaNums"
|
||||
:min="1"
|
||||
:max="brokerSize"
|
||||
@change="onChange"
|
||||
/>
|
||||
</div>
|
||||
<div class="replica-box">
|
||||
<label>是否要限流:</label
|
||||
><a-input-number
|
||||
id="inputNumber"
|
||||
v-model="data.interBrokerThrottle"
|
||||
:min="-1"
|
||||
:max="102400"
|
||||
/>
|
||||
<strong>
|
||||
|说明:broker之间副本同步带宽限制,默认值为-1表示不限制,不是-1表示限制,该值并不表示流速,至于流速配置,在
|
||||
<span style="color: red">运维->配置限流</span> 处进行操作.</strong
|
||||
>
|
||||
</div>
|
||||
<a-table
|
||||
:columns="columns"
|
||||
:data-source="data.partitions"
|
||||
bordered
|
||||
:rowKey="
|
||||
(record, index) => {
|
||||
return index;
|
||||
}
|
||||
"
|
||||
>
|
||||
<div slot="replicas" slot-scope="text">
|
||||
<span v-for="i in text" :key="i">
|
||||
{{ i }}
|
||||
</span>
|
||||
</div>
|
||||
</a-table>
|
||||
<p>
|
||||
*正在进行即尚未完成的副本变更的任务,可以在
|
||||
<span style="color: red">运维->副本变更详情</span>
|
||||
处查看,也可以在那里将正在进行的任务取消。
|
||||
</p>
|
||||
<p>
|
||||
*如果是减少副本,不用限流。如果是增加副本数,副本同步的时候如果有大量消息需要同步,可能占用大量带宽,担心会影响集群的稳定,考虑是否开启限流。同步完成可以再把该topic的限流关毕。关闭操作可以点击
|
||||
限流按钮 处理。
|
||||
</p>
|
||||
</a-spin>
|
||||
</div>
|
||||
</a-modal>
|
||||
</template>
|
||||
|
||||
<script>
|
||||
import request from "@/utils/request";
|
||||
import { KafkaClusterApi, KafkaTopicApi } from "@/utils/api";
|
||||
import notification from "ant-design-vue/lib/notification";
|
||||
|
||||
export default {
|
||||
name: "UpdateReplica",
|
||||
props: {
|
||||
topic: {
|
||||
type: String,
|
||||
default: "",
|
||||
},
|
||||
visible: {
|
||||
type: Boolean,
|
||||
default: false,
|
||||
},
|
||||
},
|
||||
data() {
|
||||
return {
|
||||
columns: columns,
|
||||
show: this.visible,
|
||||
data: {},
|
||||
loading: false,
|
||||
form: this.$form.createForm(this, { name: "coordinated" }),
|
||||
brokerSize: 0,
|
||||
replicaNums: 0,
|
||||
defaultReplicaNums: 0,
|
||||
};
|
||||
},
|
||||
watch: {
|
||||
visible(v) {
|
||||
this.show = v;
|
||||
if (this.show) {
|
||||
this.getClusterInfo();
|
||||
this.getCurrentReplicaAssignment();
|
||||
}
|
||||
},
|
||||
},
|
||||
methods: {
|
||||
getCurrentReplicaAssignment() {
|
||||
this.loading = true;
|
||||
request({
|
||||
url:
|
||||
KafkaTopicApi.getCurrentReplicaAssignment.url +
|
||||
"?topic=" +
|
||||
this.topic,
|
||||
method: KafkaTopicApi.getCurrentReplicaAssignment.method,
|
||||
}).then((res) => {
|
||||
this.loading = false;
|
||||
if (res.code == 0) {
|
||||
this.data = res.data;
|
||||
if (this.data.partitions.length > 0) {
|
||||
this.replicaNums = this.data.partitions[0].replicas.length;
|
||||
this.defaultReplicaNums = this.replicaNums;
|
||||
}
|
||||
} else {
|
||||
notification.error({
|
||||
message: "error",
|
||||
description: res.msg,
|
||||
});
|
||||
}
|
||||
});
|
||||
},
|
||||
getClusterInfo() {
|
||||
this.loading = true;
|
||||
request({
|
||||
url: KafkaClusterApi.getClusterInfo.url,
|
||||
method: KafkaClusterApi.getClusterInfo.method,
|
||||
}).then((res) => {
|
||||
this.brokerSize = res.data.nodes.length;
|
||||
});
|
||||
},
|
||||
handleCancel() {
|
||||
this.data = {};
|
||||
this.$emit("closeUpdateReplicaDialog", { refresh: false });
|
||||
},
|
||||
onChange(value) {
|
||||
if (value < 1 || value > this.brokerSize) {
|
||||
return false;
|
||||
}
|
||||
if (this.data.partitions.length > 0) {
|
||||
this.data.partitions.forEach((p) => {
|
||||
if (value > p.replicas.length) {
|
||||
let num = p.replicas[p.replicas.length - 1];
|
||||
for (let i = p.replicas.length; i < value; i++) {
|
||||
p.replicas.push(++num % this.brokerSize);
|
||||
}
|
||||
}
|
||||
if (value < p.replicas.length) {
|
||||
for (let i = p.replicas.length; i > value; i--) {
|
||||
p.replicas.pop();
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
},
|
||||
handleOk() {
|
||||
this.loading = true;
|
||||
request({
|
||||
url: KafkaTopicApi.updateReplicaAssignment.url,
|
||||
method: KafkaTopicApi.updateReplicaAssignment.method,
|
||||
data: this.data,
|
||||
}).then((res) => {
|
||||
this.loading = false;
|
||||
if (res.code == 0) {
|
||||
this.$message.success(res.msg);
|
||||
this.$emit("closeUpdateReplicaDialog", { refresh: false });
|
||||
} else {
|
||||
notification.error({
|
||||
message: "error",
|
||||
description: res.msg,
|
||||
});
|
||||
}
|
||||
});
|
||||
},
|
||||
},
|
||||
};
|
||||
|
||||
const columns = [
|
||||
{
|
||||
title: "Topic",
|
||||
dataIndex: "topic",
|
||||
key: "topic",
|
||||
},
|
||||
{
|
||||
title: "分区",
|
||||
dataIndex: "partition",
|
||||
key: "partition",
|
||||
},
|
||||
{
|
||||
title: "副本",
|
||||
dataIndex: "replicas",
|
||||
key: "replicas",
|
||||
scopedSlots: { customRender: "replicas" },
|
||||
},
|
||||
];
|
||||
</script>
|
||||
|
||||
<style scoped>
|
||||
.replica-box {
|
||||
margin-bottom: 1%;
|
||||
}
|
||||
</style>
|
||||
Reference in New Issue
Block a user