25 Commits

Author SHA1 Message Date
许晓东
5930e44fdf Message detail supports resending 2021-12-21 14:08:36 +08:00
许晓东
98f33bb2cc Show the valid time range of the current partition in the partition info 2021-12-20 20:29:34 +08:00
许晓东
0ec3bac6c2 Send messages online 2021-12-20 00:09:20 +08:00
许晓东
bd814d550d Release memory promptly when querying messages by time 2021-12-17 20:06:23 +08:00
许晓东
b9548d1640 Fix a bug in querying messages by time; lengthen the page request timeout 2021-12-13 19:05:57 +08:00
许晓东
57a41e087f Show consumption status when viewing message details 2021-12-12 23:35:17 +08:00
许晓东
54cd402810 Query message detail info 2021-12-12 18:53:29 +08:00
许晓东
c17b0aa4b9 Query messages by offset 2021-12-11 23:56:18 +08:00
Xiaodong Xu
8169ddb019 Merge pull request #5 from xxd763795151/master
Query messages by time
2021-12-11 14:55:07 +08:00
许晓东
5f24c62855 Query messages by time 2021-12-11 14:53:54 +08:00
许晓东
3b21fc4cd8 Add a message page 2021-12-05 23:18:51 +08:00
许晓东
d15ec4a2db Add a logo to the top-left corner of the page 2021-12-04 14:47:32 +08:00
许晓东
12431db525 Update the download URL for the latest release package 2021-11-30 20:20:03 +08:00
许晓东
20535027bf Cancel in-progress replica reassignments; add a refresh button to Consumer Group -> Consumption Detail 2021-11-30 19:49:02 +08:00
许晓东
222ba34702 Send statistics 2021-11-30 15:15:47 +08:00
许晓东
39e50a6589 Reset consumer offsets by timestamp, using UTC+8 time 2021-11-29 19:40:46 +08:00
许晓东
e881c58a8f Remove topic throttling 2021-11-27 20:49:46 +08:00
许晓东
34c87997d1 Topic throttling 2021-11-27 19:46:06 +08:00
许晓东
4639335a9d polish README.md 2021-11-25 19:07:51 +08:00
许晓东
73fed3face Remove throttle rate configuration 2021-11-25 10:55:37 +08:00
许晓东
1b028fcb4f Throttle configuration 2021-11-24 20:57:33 +08:00
许晓东
62569c4454 Change replicas 2021-11-23 19:59:52 +08:00
许晓东
a219551802 Change replica info 2021-11-20 22:35:38 +08:00
许晓东
7a98eb479f Query replica change info 2021-11-19 21:01:11 +08:00
许晓东
405f272fb7 Download URL 2021-11-18 15:13:08 +08:00
58 changed files with 3648 additions and 63 deletions

View File

@@ -1,10 +1,11 @@
# Kafka visual management platform
A lightweight Kafka visual management platform: quick to install and configure, simple and easy to use.
To keep development simple, there is no multi-language support; the UI is Chinese only.
To keep development simple, there is no internationalization support; the UI is Chinese only.
If you have used rocketmq-console, the frontend style is quite similar to it.
## Package download
* Click to download: [kafka-console-ui.tar.gz](http://43.128.31.53/kafka-console-ui.tar.gz) or [kafka-console-ui.zip](http://43.128.31.53/kafka-console-ui.zip)
* Refer to the packaging and deployment section below: download the source and repackage it
Choose one of the following two options: download the package directly, or download the source and package it manually
* Click to download (v1.0.1): [kafka-console-ui.tar.gz](https://github.com/xxd763795151/kafka-console-ui/releases/download/v1.0.1/kafka-console-ui.tar.gz) or [kafka-console-ui.zip](https://github.com/xxd763795151/kafka-console-ui/releases/download/v1.0.1/kafka-console-ui.zip)
* Refer to the packaging and deployment section below: download the source and repackage it (includes the latest committed features)
## Features
* Cluster info
* Topic management
@@ -76,4 +77,13 @@ sh bin/shutdown.sh
## Frontend
The frontend code is under the ui directory of the project; open it in a frontend IDE to develop.
## Note
The frontend and backend are separate. If you start the backend directly without compiling the frontend code, there is no frontend page. You can build first with `sh package.sh` and then start it from IDEA, or start the frontend part separately.
# Page examples
If ACL configuration is not enabled, the ACL menu page is not shown, so the navigation bar has no Acl item.
![Cluster](./document/集群.png)
![Topic](./document/Topic.png)
![Consumer Group](./document/消费组.png)
![Operations](./document/运维.png)
Added a message search page:
![Message](./document/消息.png)

BIN document/Topic.png (new file) Size: 42 KiB

BIN (file name not shown) Before: 354 KiB, After: 492 KiB

BIN document/消息.png (new file) Size: 400 KiB

BIN document/消费组.png (new file) Size: 53 KiB

BIN document/运维.png (new file) Size: 126 KiB

BIN document/集群.png (new file) Size: 30 KiB

View File

@@ -10,7 +10,7 @@
</parent>
<groupId>com.xuxd</groupId>
<artifactId>kafka-console-ui</artifactId>
<version>1.0.0</version>
<version>1.0.2</version>
<name>kafka-console-ui</name>
<description>Kafka console manage ui</description>
<properties>

View File

@@ -0,0 +1,27 @@
package com.xuxd.kafka.console.beans;
import lombok.Data;
/**
* kafka-console-ui.
*
* @author xuxd
* @date 2021-12-11 09:45:49
**/
@Data
public class QueryMessage {
private String topic;
private int partition;
private long startTime;
private long endTime;
private long offset;
private String keyDeserializer;
private String valueDeserializer;
}

View File

@@ -0,0 +1,29 @@
package com.xuxd.kafka.console.beans;
import java.util.List;
import lombok.Data;
/**
* kafka-console-ui.
*
* @author xuxd
* @date 2021-11-19 17:07:50
**/
@Data
public class ReplicaAssignment {
private long version = 1L;
private List<Partition> partitions;
private long interBrokerThrottle = -1;
@Data
static class Partition {
private String topic;
private int partition;
private List<Integer> replicas;
}
}

View File

@@ -0,0 +1,25 @@
package com.xuxd.kafka.console.beans;
import lombok.Data;
/**
* kafka-console-ui.
*
* @author xuxd
* @date 2021-12-19 23:28:31
**/
@Data
public class SendMessage {
private String topic;
private int partition;
private String key;
private String body;
private int num;
private long offset;
}

View File

@@ -0,0 +1,22 @@
package com.xuxd.kafka.console.beans.dto;
import com.xuxd.kafka.console.beans.enums.ThrottleUnit;
import java.util.ArrayList;
import java.util.List;
import lombok.Data;
/**
* kafka-console-ui.
*
* @author xuxd
* @date 2021-11-24 19:37:10
**/
@Data
public class BrokerThrottleDTO {
private List<Integer> brokerList = new ArrayList<>();
private long throttle;
private ThrottleUnit unit;
}

View File

@@ -0,0 +1,50 @@
package com.xuxd.kafka.console.beans.dto;
import com.xuxd.kafka.console.beans.QueryMessage;
import java.util.Date;
import lombok.Data;
/**
* kafka-console-ui.
*
* @author xuxd
* @date 2021-12-11 09:17:59
**/
@Data
public class QueryMessageDTO {
private String topic;
private int partition;
private Date startTime;
private Date endTime;
private Long offset;
private String keyDeserializer;
private String valueDeserializer;
public QueryMessage toQueryMessage() {
QueryMessage queryMessage = new QueryMessage();
queryMessage.setTopic(topic);
queryMessage.setPartition(partition);
if (startTime != null) {
queryMessage.setStartTime(startTime.getTime());
}
if (endTime != null) {
queryMessage.setEndTime(endTime.getTime());
}
if (offset != null) {
queryMessage.setOffset(offset);
}
queryMessage.setKeyDeserializer(keyDeserializer);
queryMessage.setValueDeserializer(valueDeserializer);
return queryMessage;
}
}

View File

@@ -0,0 +1,21 @@
package com.xuxd.kafka.console.beans.dto;
import com.xuxd.kafka.console.beans.enums.TopicThrottleSwitch;
import java.util.List;
import lombok.Data;
/**
* kafka-console-ui.
*
* @author xuxd
* @date 2021-11-26 15:33:37
**/
@Data
public class TopicThrottleDTO {
private String topic;
private List<Integer> partitions;
private TopicThrottleSwitch operation;
}

View File

@@ -0,0 +1,18 @@
package com.xuxd.kafka.console.beans.enums;
/**
* kafka-console-ui.
*
* @author xuxd
* @date 2021-11-24 19:38:00
**/
public enum ThrottleUnit {
KB, MB;
public long toKb(long size) {
if (this == MB) {
return 1024 * size;
}
return size;
}
}
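Since BrokerThrottleDTO carries a raw number plus a ThrottleUnit, the controller normalizes the configured throttle to KB via toKb before calling the console layer (see the broker throttle endpoint below). A minimal sketch of the conversion, with made-up values:

// Made-up values, illustrating ThrottleUnit.toKb as defined above.
long fromMb = ThrottleUnit.MB.toKb(5); // 5 MB -> 5 * 1024 = 5120 KB
long fromKb = ThrottleUnit.KB.toKb(5); // already KB -> 5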

View File

@@ -0,0 +1,11 @@
package com.xuxd.kafka.console.beans.enums;
/**
* kafka-console-ui.
*
* @author xuxd
* @date 2021-11-26 15:33:07
**/
public enum TopicThrottleSwitch {
ON, OFF;
}

View File

@@ -0,0 +1,32 @@
package com.xuxd.kafka.console.beans.vo;
import lombok.Data;
import org.apache.kafka.clients.consumer.ConsumerRecord;
/**
* kafka-console-ui.
*
* @author xuxd
* @date 2021-12-11 14:19:35
**/
@Data
public class ConsumerRecordVO {
private String topic;
private int partition;
private long offset;
private long timestamp;
public static ConsumerRecordVO fromConsumerRecord(ConsumerRecord record) {
ConsumerRecordVO vo = new ConsumerRecordVO();
vo.setTopic(record.topic());
vo.setPartition(record.partition());
vo.setOffset(record.offset());
vo.setTimestamp(record.timestamp());
return vo;
}
}

View File

@@ -0,0 +1,25 @@
package com.xuxd.kafka.console.beans.vo;
import com.xuxd.kafka.console.beans.TopicPartition;
import java.util.List;
import lombok.Data;
/**
* kafka-console-ui.
*
* @author xuxd
* @date 2021-11-30 16:03:41
**/
@Data
public class CurrentReassignmentVO {
private final String topic;
private final int partition;
private final List<Integer> replicas;
private final List<Integer> addingReplicas;
private final List<Integer> removingReplicas;
}

View File

@@ -0,0 +1,47 @@
package com.xuxd.kafka.console.beans.vo;
import java.util.ArrayList;
import java.util.List;
import lombok.Data;
/**
* kafka-console-ui.
*
* @author xuxd
* @date 2021-12-12 12:45:23
**/
@Data
public class MessageDetailVO {
private String topic;
private int partition;
private long offset;
private long timestamp;
private String timestampType;
private List<HeaderVO> headers = new ArrayList<>();
private Object key;
private Object value;
private List<ConsumerVO> consumers;
@Data
public static class HeaderVO {
String key;
String value;
}
@Data
public static class ConsumerVO {
String groupId;
String status;
}
}

View File

@@ -29,12 +29,16 @@ public class TopicPartitionVO {
private long diff;
private long beginTime;
private long endTime;
public static TopicPartitionVO from(TopicPartitionInfo partitionInfo) {
TopicPartitionVO partitionVO = new TopicPartitionVO();
partitionVO.setPartition(partitionInfo.partition());
partitionVO.setLeader(partitionInfo.leader().toString());
partitionVO.setReplicas(partitionInfo.replicas().stream().map(Node::toString).collect(Collectors.toList()));
partitionVO.setIsr(partitionInfo.isr().stream().map(Node::toString).collect(Collectors.toList()));
partitionVO.setReplicas(partitionInfo.replicas().stream().map(node -> node.host() + ":" + node.port() + " (id: " + node.idString() + ")").collect(Collectors.toList()));
partitionVO.setIsr(partitionInfo.isr().stream().map(Node::idString).collect(Collectors.toList()));
return partitionVO;
}
}

View File

@@ -5,6 +5,7 @@ import kafka.console.ConfigConsole;
import kafka.console.ConsumerConsole;
import kafka.console.KafkaAclConsole;
import kafka.console.KafkaConfigConsole;
import kafka.console.MessageConsole;
import kafka.console.OperationConsole;
import kafka.console.TopicConsole;
import org.springframework.context.annotation.Bean;
@@ -54,4 +55,9 @@ public class KafkaConfiguration {
ConsumerConsole consumerConsole) {
return new OperationConsole(config, topicConsole, consumerConsole);
}
@Bean
public MessageConsole messageConsole(KafkaConfig config) {
return new MessageConsole(config);
}
}

View File

@@ -0,0 +1,55 @@
package com.xuxd.kafka.console.controller;
import com.xuxd.kafka.console.beans.SendMessage;
import com.xuxd.kafka.console.beans.dto.QueryMessageDTO;
import com.xuxd.kafka.console.service.MessageService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* kafka-console-ui.
*
* @author xuxd
* @date 2021-12-11 09:22:19
**/
@RestController
@RequestMapping("/message")
public class MessageController {
@Autowired
private MessageService messageService;
@PostMapping("/search/time")
public Object searchByTime(@RequestBody QueryMessageDTO dto) {
return messageService.searchByTime(dto.toQueryMessage());
}
@PostMapping("/search/offset")
public Object searchByOffset(@RequestBody QueryMessageDTO dto) {
return messageService.searchByOffset(dto.toQueryMessage());
}
@PostMapping("/search/detail")
public Object searchDetail(@RequestBody QueryMessageDTO dto) {
return messageService.searchDetail(dto.toQueryMessage());
}
@GetMapping("/deserializer/list")
public Object deserializerList() {
return messageService.deserializerList();
}
@PostMapping("/send")
public Object send(@RequestBody SendMessage message) {
return messageService.send(message);
}
@PostMapping("/resend")
public Object resend(@RequestBody SendMessage message) {
return messageService.resend(message);
}
}
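For reference, a minimal client-side sketch of calling the time-based search endpoint above with java.net.http (Java 11+). The base URL, topic name, and time bounds are assumptions for illustration; the request fields come from QueryMessageDTO, where partition -1 means all partitions and the epoch-millis bounds bind to java.util.Date.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class MessageSearchExample {
    public static void main(String[] args) throws Exception {
        // Epoch-millis time bounds; QueryMessageDTO binds them to java.util.Date.
        String json = "{\"topic\":\"test-topic\",\"partition\":-1,"
            + "\"startTime\":1639929600000,\"endTime\":1640016000000}";
        HttpRequest request = HttpRequest.newBuilder()
            .uri(URI.create("http://localhost:8080/message/search/time")) // assumed base URL
            .header("Content-Type", "application/json")
            .POST(HttpRequest.BodyPublishers.ofString(json))
            .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
            .send(request, HttpResponse.BodyHandlers.ofString());
        // ResponseData wrapping { maxNum, realNum, data: [...] }
        System.out.println(response.body());
    }
}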

View File

@@ -1,5 +1,7 @@
package com.xuxd.kafka.console.controller;
import com.xuxd.kafka.console.beans.TopicPartition;
import com.xuxd.kafka.console.beans.dto.BrokerThrottleDTO;
import com.xuxd.kafka.console.beans.dto.ReplicationDTO;
import com.xuxd.kafka.console.beans.dto.SyncDataDTO;
import com.xuxd.kafka.console.service.OperationService;
@@ -52,4 +54,24 @@ public class OperationController {
public Object electPreferredLeader(@RequestBody ReplicationDTO dto) {
return operationService.electPreferredLeader(dto.getTopic(), dto.getPartition());
}
@PostMapping("/broker/throttle")
public Object configThrottle(@RequestBody BrokerThrottleDTO dto) {
return operationService.configThrottle(dto.getBrokerList(), dto.getUnit().toKb(dto.getThrottle()));
}
@DeleteMapping("/broker/throttle")
public Object removeThrottle(@RequestBody BrokerThrottleDTO dto) {
return operationService.removeThrottle(dto.getBrokerList());
}
@GetMapping("/replication/reassignments")
public Object currentReassignments() {
return operationService.currentReassignments();
}
@DeleteMapping("/replication/reassignments")
public Object cancelReassignment(@RequestBody TopicPartition partition) {
return operationService.cancelReassignment(new org.apache.kafka.common.TopicPartition(partition.getTopic(), partition.getPartition()));
}
}

View File

@@ -1,7 +1,9 @@
package com.xuxd.kafka.console.controller;
import com.xuxd.kafka.console.beans.ReplicaAssignment;
import com.xuxd.kafka.console.beans.dto.AddPartitionDTO;
import com.xuxd.kafka.console.beans.dto.NewTopicDTO;
import com.xuxd.kafka.console.beans.dto.TopicThrottleDTO;
import com.xuxd.kafka.console.beans.enums.TopicType;
import com.xuxd.kafka.console.service.TopicService;
import java.util.ArrayList;
@@ -71,4 +73,24 @@ public class TopicController {
return topicService.addPartitions(topic, addNum, assignment);
}
@GetMapping("/replica/assignment")
public Object getCurrentReplicaAssignment(@RequestParam String topic) {
return topicService.getCurrentReplicaAssignment(topic);
}
@PostMapping("/replica/assignment")
public Object updateReplicaAssignment(@RequestBody ReplicaAssignment assignment) {
return topicService.updateReplicaAssignment(assignment);
}
@PostMapping("/replica/throttle")
public Object configThrottle(@RequestBody TopicThrottleDTO dto) {
return topicService.configThrottle(dto.getTopic(), dto.getPartitions(), dto.getOperation());
}
@GetMapping("/send/stats")
public Object sendStats(@RequestParam String topic) {
return topicService.sendStats(topic);
}
}

View File

@@ -38,4 +38,6 @@ public interface ConsumerService {
ResponseData getTopicSubscribedByGroups(String topic);
ResponseData getOffsetPartition(String groupId);
ResponseData<Set<String>> getSubscribedGroups(String topic);
}

View File

@@ -0,0 +1,26 @@
package com.xuxd.kafka.console.service;
import com.xuxd.kafka.console.beans.QueryMessage;
import com.xuxd.kafka.console.beans.ResponseData;
import com.xuxd.kafka.console.beans.SendMessage;
/**
* kafka-console-ui.
*
* @author xuxd
* @date 2021-12-11 09:43:26
**/
public interface MessageService {
ResponseData searchByTime(QueryMessage queryMessage);
ResponseData searchByOffset(QueryMessage queryMessage);
ResponseData searchDetail(QueryMessage queryMessage);
ResponseData deserializerList();
ResponseData send(SendMessage message);
ResponseData resend(SendMessage message);
}

View File

@@ -1,7 +1,9 @@
package com.xuxd.kafka.console.service;
import com.xuxd.kafka.console.beans.ResponseData;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.common.TopicPartition;
/**
* kafka-console-ui.
@@ -20,4 +22,12 @@ public interface OperationService {
ResponseData deleteAlignmentById(Long id);
ResponseData electPreferredLeader(String topic, int partition);
ResponseData configThrottle(List<Integer> brokerList, long size);
ResponseData removeThrottle(List<Integer> brokerList);
ResponseData currentReassignments();
ResponseData cancelReassignment(TopicPartition partition);
}

View File

@@ -1,10 +1,10 @@
package com.xuxd.kafka.console.service;
import com.xuxd.kafka.console.beans.ReplicaAssignment;
import com.xuxd.kafka.console.beans.ResponseData;
import com.xuxd.kafka.console.beans.enums.TopicThrottleSwitch;
import com.xuxd.kafka.console.beans.enums.TopicType;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.NewTopic;
/**
@@ -26,4 +26,12 @@ public interface TopicService {
ResponseData createTopic(NewTopic topic);
ResponseData addPartitions(String topic, int addNum, List<List<Integer>> newAssignmentst);
ResponseData getCurrentReplicaAssignment(String topic);
ResponseData updateReplicaAssignment(ReplicaAssignment assignment);
ResponseData configThrottle(String topic, List<Integer> partitions, TopicThrottleSwitch throttleSwitch);
ResponseData sendStats(String topic);
}

View File

@@ -15,6 +15,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import kafka.console.ConsumerConsole;
import kafka.console.TopicConsole;
@@ -48,6 +49,8 @@ public class ConsumerServiceImpl implements ConsumerService {
@Autowired
private TopicConsole topicConsole;
private ReentrantLock lock = new ReentrantLock();
@Override public ResponseData getConsumerGroupList(List<String> groupIds, Set<ConsumerGroupState> states) {
String simulateGroup = "inner_xxx_not_exit_group_###" + System.currentTimeMillis();
Set<String> groupList = new HashSet<>();
@@ -142,7 +145,7 @@ public class ConsumerServiceImpl implements ConsumerService {
@Override public ResponseData resetOffsetByDate(String groupId, String topic, String dateStr) {
long timestamp = -1L;
try {
StringBuilder sb = new StringBuilder(dateStr.replace(" ", "T")).append(".000");
StringBuilder sb = new StringBuilder(dateStr.replace(" ", "T")).append(".000+08:00"); // fixed to UTC+08:00 (East 8 time zone) for the calculation
timestamp = Utils.getDateTime(sb.toString());
} catch (ParseException e) {
throw new IllegalArgumentException(e);
@@ -167,25 +170,7 @@ public class ConsumerServiceImpl implements ConsumerService {
}
@Override public ResponseData getTopicSubscribedByGroups(String topic) {
if (topicSubscribedInfo.isNeedRefresh(topic)) {
Set<String> groupIdList = consumerConsole.getConsumerGroupIdList(Collections.emptySet());
Map<String, Set<String>> cache = new HashMap<>();
Map<String, List<TopicPartition>> subscribeTopics = consumerConsole.listSubscribeTopics(groupIdList);
subscribeTopics.forEach((groupId, tl) -> {
tl.forEach(topicPartition -> {
String t = topicPartition.topic();
if (!cache.containsKey(t)) {
cache.put(t, new HashSet<>());
}
cache.get(t).add(groupId);
});
});
topicSubscribedInfo.refresh(cache);
}
Set<String> groups = topicSubscribedInfo.getSubscribedGroups(topic);
Set<String> groups = this.getSubscribedGroups(topic).getData();
Map<String, Object> res = new HashMap<>();
Collection<ConsumerConsole.TopicPartitionConsumeInfo> consumerDetail = consumerConsole.getConsumerDetail(groups);
@@ -212,6 +197,34 @@ public class ConsumerServiceImpl implements ConsumerService {
return ResponseData.create().data(Utils.abs(groupId.hashCode()) % size);
}
@Override public ResponseData<Set<String>> getSubscribedGroups(String topic) {
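// if another thread is already refreshing the cache, skip the refresh and serve the current (possibly stale) cache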
if (topicSubscribedInfo.isNeedRefresh(topic) && !lock.isLocked()) {
try {
lock.lock();
Set<String> groupIdList = consumerConsole.getConsumerGroupIdList(Collections.emptySet());
Map<String, Set<String>> cache = new HashMap<>();
Map<String, List<TopicPartition>> subscribeTopics = consumerConsole.listSubscribeTopics(groupIdList);
subscribeTopics.forEach((groupId, tl) -> {
tl.forEach(topicPartition -> {
String t = topicPartition.topic();
if (!cache.containsKey(t)) {
cache.put(t, new HashSet<>());
}
cache.get(t).add(groupId);
});
});
topicSubscribedInfo.refresh(cache);
} finally {
lock.unlock();
}
}
Set<String> groups = topicSubscribedInfo.getSubscribedGroups(topic);
return ResponseData.create(Set.class).data(groups).success();
}
class TopicSubscribedInfo {
long lastTime = System.currentTimeMillis();

View File

@@ -0,0 +1,212 @@
package com.xuxd.kafka.console.service.impl;
import com.xuxd.kafka.console.beans.QueryMessage;
import com.xuxd.kafka.console.beans.ResponseData;
import com.xuxd.kafka.console.beans.SendMessage;
import com.xuxd.kafka.console.beans.vo.ConsumerRecordVO;
import com.xuxd.kafka.console.beans.vo.MessageDetailVO;
import com.xuxd.kafka.console.service.ConsumerService;
import com.xuxd.kafka.console.service.MessageService;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import kafka.console.ConsumerConsole;
import kafka.console.MessageConsole;
import kafka.console.TopicConsole;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.BytesDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.DoubleDeserializer;
import org.apache.kafka.common.serialization.FloatDeserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Service;
import scala.Tuple2;
/**
* kafka-console-ui.
*
* @author xuxd
* @date 2021-12-11 09:43:44
**/
@Slf4j
@Service
public class MessageServiceImpl implements MessageService, ApplicationContextAware {
@Autowired
private MessageConsole messageConsole;
@Autowired
private TopicConsole topicConsole;
@Autowired
private ConsumerConsole consumerConsole;
private ApplicationContext applicationContext;
private Map<String, Deserializer> deserializerDict = new HashMap<>();
{
deserializerDict.put("ByteArray", new ByteArrayDeserializer());
deserializerDict.put("Integer", new IntegerDeserializer());
deserializerDict.put("String", new StringDeserializer());
deserializerDict.put("Float", new FloatDeserializer());
deserializerDict.put("Double", new DoubleDeserializer());
deserializerDict.put("Byte", new BytesDeserializer());
}
public static String defaultDeserializer = "String";
@Override public ResponseData searchByTime(QueryMessage queryMessage) {
int maxNums = 10000;
Set<TopicPartition> partitions = getPartitions(queryMessage);
long startTime = System.currentTimeMillis();
List<ConsumerRecord<byte[], byte[]>> records = messageConsole.searchBy(partitions, queryMessage.getStartTime(), queryMessage.getEndTime(), maxNums);
log.info("search message by time, cost time: {}", (System.currentTimeMillis() - startTime));
List<ConsumerRecordVO> vos = records.stream().filter(record -> record.timestamp() <= queryMessage.getEndTime())
.map(ConsumerRecordVO::fromConsumerRecord).collect(Collectors.toList());
Map<String, Object> res = new HashMap<>();
res.put("maxNum", maxNums);
res.put("realNum", vos.size());
res.put("data", vos.subList(0, Math.min(maxNums, vos.size())));
return ResponseData.create().data(res).success();
}
@Override public ResponseData searchByOffset(QueryMessage queryMessage) {
Map<TopicPartition, ConsumerRecord<byte[], byte[]>> recordMap = searchRecordByOffset(queryMessage);
return ResponseData.create().data(recordMap.values().stream().map(ConsumerRecordVO::fromConsumerRecord).collect(Collectors.toList())).success();
}
@Override public ResponseData searchDetail(QueryMessage queryMessage) {
if (queryMessage.getPartition() == -1) {
throw new IllegalArgumentException();
}
if (StringUtils.isBlank(queryMessage.getKeyDeserializer())) {
queryMessage.setKeyDeserializer(defaultDeserializer);
}
if (StringUtils.isBlank(queryMessage.getValueDeserializer())) {
queryMessage.setValueDeserializer(defaultDeserializer);
}
Map<TopicPartition, ConsumerRecord<byte[], byte[]>> recordMap = searchRecordByOffset(queryMessage);
ConsumerRecord<byte[], byte[]> record = recordMap.get(new TopicPartition(queryMessage.getTopic(), queryMessage.getPartition()));
if (record != null) {
MessageDetailVO vo = new MessageDetailVO();
vo.setTopic(record.topic());
vo.setPartition(record.partition());
vo.setOffset(record.offset());
vo.setTimestamp(record.timestamp());
vo.setTimestampType(record.timestampType().name());
try {
vo.setKey(deserializerDict.get(queryMessage.getKeyDeserializer()).deserialize(queryMessage.getTopic(), record.key()));
} catch (Exception e) {
vo.setKey("KeyDeserializer Error: " + e.getMessage());
}
try {
vo.setValue(deserializerDict.get(queryMessage.getValueDeserializer()).deserialize(queryMessage.getTopic(), record.value()));
} catch (Exception e) {
vo.setValue("ValueDeserializer Error: " + e.getMessage());
}
record.headers().forEach(header -> {
MessageDetailVO.HeaderVO headerVO = new MessageDetailVO.HeaderVO();
headerVO.setKey(header.key());
headerVO.setValue(new String(header.value()));
vo.getHeaders().add(headerVO);
});
// To keep the code tidy, avoid injecting another service-layer implementation directly
Set<String> groupIds = applicationContext.getBean(ConsumerService.class).getSubscribedGroups(record.topic()).getData();
Collection<ConsumerConsole.TopicPartitionConsumeInfo> consumerDetail = consumerConsole.getConsumerDetail(groupIds);
List<MessageDetailVO.ConsumerVO> consumerVOS = new LinkedList<>();
consumerDetail.forEach(consumerInfo -> {
if (consumerInfo.topicPartition().equals(new TopicPartition(record.topic(), record.partition()))) {
MessageDetailVO.ConsumerVO consumerVO = new MessageDetailVO.ConsumerVO();
consumerVO.setGroupId(consumerInfo.getGroupId());
consumerVO.setStatus(consumerInfo.getConsumerOffset() <= record.offset() ? "unconsume" : "consumed");
consumerVOS.add(consumerVO);
}
});
vo.setConsumers(consumerVOS);
return ResponseData.create().data(vo).success();
}
return ResponseData.create().failed("Not found message detail.");
}
@Override public ResponseData deserializerList() {
return ResponseData.create().data(deserializerDict.keySet()).success();
}
@Override public ResponseData send(SendMessage message) {
messageConsole.send(message.getTopic(), message.getPartition(), message.getKey(), message.getBody(), message.getNum());
return ResponseData.create().success();
}
@Override public ResponseData resend(SendMessage message) {
TopicPartition partition = new TopicPartition(message.getTopic(), message.getPartition());
Map<TopicPartition, Object> offsetTable = new HashMap<>(1, 1.0f);
offsetTable.put(partition, message.getOffset());
Map<TopicPartition, ConsumerRecord<byte[], byte[]>> recordMap = messageConsole.searchBy(offsetTable);
if (recordMap.isEmpty()) {
return ResponseData.create().failed("Get message failed.");
}
ConsumerRecord<byte[], byte[]> record = recordMap.get(partition);
ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>(record.topic(), record.partition(), record.key(), record.value(), record.headers());
Tuple2<Object, String> tuple2 = messageConsole.sendSync(producerRecord);
boolean success = (boolean) tuple2._1();
return success ? ResponseData.create().success("success: " + tuple2._2()) : ResponseData.create().failed(tuple2._2());
}
private Map<TopicPartition, ConsumerRecord<byte[], byte[]>> searchRecordByOffset(QueryMessage queryMessage) {
Set<TopicPartition> partitions = getPartitions(queryMessage);
Map<TopicPartition, Object> offsetTable = new HashMap<>();
partitions.forEach(tp -> {
offsetTable.put(tp, queryMessage.getOffset());
});
Map<TopicPartition, ConsumerRecord<byte[], byte[]>> recordMap = messageConsole.searchBy(offsetTable);
return recordMap;
}
private Set<TopicPartition> getPartitions(QueryMessage queryMessage) {
Set<TopicPartition> partitions = new HashSet<>();
if (queryMessage.getPartition() != -1) {
partitions.add(new TopicPartition(queryMessage.getTopic(), queryMessage.getPartition()));
} else {
List<TopicDescription> list = topicConsole.getTopicList(Collections.singleton(queryMessage.getTopic()));
if (CollectionUtils.isEmpty(list)) {
throw new IllegalArgumentException("Can not find topic info.");
}
Set<TopicPartition> set = list.get(0).partitions().stream()
.map(tp -> new TopicPartition(queryMessage.getTopic(), tp.partition())).collect(Collectors.toSet());
partitions.addAll(set);
}
return partitions;
}
@Override public void setApplicationContext(ApplicationContext context) throws BeansException {
this.applicationContext = context;
}
}

View File

@@ -5,16 +5,21 @@ import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.xuxd.kafka.console.beans.ResponseData;
import com.xuxd.kafka.console.beans.dos.MinOffsetAlignmentDO;
import com.xuxd.kafka.console.beans.vo.CurrentReassignmentVO;
import com.xuxd.kafka.console.beans.vo.OffsetAlignmentVO;
import com.xuxd.kafka.console.dao.MinOffsetAlignmentMapper;
import com.xuxd.kafka.console.service.OperationService;
import com.xuxd.kafka.console.utils.GsonUtil;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import kafka.console.OperationConsole;
import org.apache.kafka.clients.admin.PartitionReassignment;
import org.apache.kafka.common.TopicPartition;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.annotation.Autowired;
@@ -30,7 +35,7 @@ import scala.Tuple2;
@Service
public class OperationServiceImpl implements OperationService {
private Gson gson = new Gson();
private Gson gson = GsonUtil.INSTANCE.get();
@Autowired
private OperationConsole operationConsole;
@@ -122,4 +127,39 @@ public class OperationServiceImpl implements OperationService {
return (boolean) tuple2._1() ? ResponseData.create().success() : ResponseData.create().failed(tuple2._2());
}
@Override public ResponseData configThrottle(List<Integer> brokerList, long size) {
Tuple2<Object, String> tuple2 = operationConsole.modifyInterBrokerThrottle(new HashSet<>(brokerList), size);
return (boolean) tuple2._1() ? ResponseData.create().success() : ResponseData.create().failed(tuple2._2());
}
@Override public ResponseData removeThrottle(List<Integer> brokerList) {
Tuple2<Object, String> tuple2 = operationConsole.clearBrokerLevelThrottles(new HashSet<>(brokerList));
return (boolean) tuple2._1() ? ResponseData.create().success() : ResponseData.create().failed(tuple2._2());
}
@Override public ResponseData currentReassignments() {
Map<TopicPartition, PartitionReassignment> reassignmentMap = operationConsole.currentReassignments();
List<CurrentReassignmentVO> vos = reassignmentMap.entrySet().stream().map(entry -> {
TopicPartition partition = entry.getKey();
PartitionReassignment reassignment = entry.getValue();
return new CurrentReassignmentVO(partition.topic(),
partition.partition(), reassignment.replicas(), reassignment.addingReplicas(), reassignment.removingReplicas());
}).collect(Collectors.toList());
return ResponseData.create().data(vos).success();
}
@Override public ResponseData cancelReassignment(TopicPartition partition) {
Map<TopicPartition, Throwable> res = operationConsole.cancelPartitionReassignments(Collections.singleton(partition));
if (!res.isEmpty()) {
StringBuilder sb = new StringBuilder("Failed: ");
res.forEach((p, t) -> {
sb.append(p.toString()).append(": ").append(t.getMessage()).append(System.lineSeparator());
});
return ResponseData.create().failed(sb.toString());
}
return ResponseData.create().success();
}
}

View File

@@ -1,10 +1,15 @@
package com.xuxd.kafka.console.service.impl;
import com.google.gson.Gson;
import com.xuxd.kafka.console.beans.ReplicaAssignment;
import com.xuxd.kafka.console.beans.ResponseData;
import com.xuxd.kafka.console.beans.enums.TopicThrottleSwitch;
import com.xuxd.kafka.console.beans.enums.TopicType;
import com.xuxd.kafka.console.beans.vo.TopicDescriptionVO;
import com.xuxd.kafka.console.beans.vo.TopicPartitionVO;
import com.xuxd.kafka.console.service.TopicService;
import com.xuxd.kafka.console.utils.GsonUtil;
import java.util.Calendar;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
@@ -12,13 +17,16 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import kafka.console.MessageConsole;
import kafka.console.TopicConsole;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.springframework.beans.factory.annotation.Autowired;
@@ -38,6 +46,11 @@ public class TopicServiceImpl implements TopicService {
@Autowired
private TopicConsole topicConsole;
@Autowired
private MessageConsole messageConsole;
private Gson gson = GsonUtil.INSTANCE.get();
@Override public ResponseData getTopicNameList(boolean internal) {
return ResponseData.create().data(topicConsole.getTopicNameList(internal)).success();
}
@@ -98,6 +111,10 @@ public class TopicServiceImpl implements TopicService {
mapTuple2._2().forEach((k, v) -> {
endTable.put(k.partition(), (Long) v);
});
// compute the valid time range.
Map<TopicPartition, Object> beginOffsetTable = new HashMap<>();
Map<TopicPartition, Object> endOffsetTable = new HashMap<>();
Map<Integer, TopicPartition> partitionCache = new HashMap<>();
for (TopicPartitionVO partitionVO : voList) {
long begin = beginTable.get(partitionVO.getPartition());
@@ -105,7 +122,29 @@ public class TopicServiceImpl implements TopicService {
partitionVO.setBeginOffset(begin);
partitionVO.setEndOffset(end);
partitionVO.setDiff(end - begin);
if (begin != end) {
TopicPartition partition = new TopicPartition(topic, partitionVO.getPartition());
partitionCache.put(partitionVO.getPartition(), partition);
beginOffsetTable.put(partition, begin);
endOffsetTable.put(partition, end - 1); // the queried offset must be less than the end offset
} else {
partitionVO.setBeginTime(-1L);
partitionVO.setEndTime(-1L);
}
}
Map<TopicPartition, ConsumerRecord<byte[], byte[]>> beginRecordMap = messageConsole.searchBy(beginOffsetTable);
Map<TopicPartition, ConsumerRecord<byte[], byte[]>> endRecordMap = messageConsole.searchBy(endOffsetTable);
for (TopicPartitionVO partitionVO : voList) {
if (partitionVO.getBeginTime() != -1L) {
TopicPartition partition = partitionCache.get(partitionVO.getPartition());
partitionVO.setBeginTime(beginRecordMap.containsKey(partition) ? beginRecordMap.get(partition).timestamp() : -1L);
partitionVO.setEndTime(endRecordMap.containsKey(partition) ? endRecordMap.get(partition).timestamp() : -1L);
}
}
return ResponseData.create().data(voList).success();
}
@@ -130,4 +169,92 @@ public class TopicServiceImpl implements TopicService {
return success ? ResponseData.create().success() : ResponseData.create().failed(tuple2._2());
}
@Override public ResponseData getCurrentReplicaAssignment(String topic) {
Tuple2<Object, String> tuple2 = topicConsole.getCurrentReplicaAssignmentJson(topic);
boolean success = (boolean) tuple2._1();
return success ? ResponseData.create().data(gson.fromJson(tuple2._2(), ReplicaAssignment.class)).success() : ResponseData.create().failed(tuple2._2());
}
@Override public ResponseData updateReplicaAssignment(ReplicaAssignment assignment) {
Tuple2<Object, String> tuple2 = topicConsole.updateReplicas(gson.toJson(assignment), assignment.getInterBrokerThrottle());
boolean success = (boolean) tuple2._1();
return success ? ResponseData.create().success() : ResponseData.create().failed(tuple2._2());
}
@Override
public ResponseData configThrottle(String topic, List<Integer> partitions, TopicThrottleSwitch throttleSwitch) {
Tuple2<Object, String> tuple2 = null;
switch (throttleSwitch) {
case ON:
tuple2 = topicConsole.configThrottle(topic, partitions);
break;
case OFF:
tuple2 = topicConsole.clearThrottle(topic);
break;
default:
throw new IllegalArgumentException("switch is unknown.");
}
boolean success = (boolean) tuple2._1();
return success ? ResponseData.create().success() : ResponseData.create().failed(tuple2._2());
}
@Override public ResponseData sendStats(String topic) {
Calendar calendar = Calendar.getInstance();
long current = calendar.getTimeInMillis();
calendar.set(Calendar.HOUR_OF_DAY, 0);
calendar.set(Calendar.MINUTE, 0);
calendar.set(Calendar.SECOND, 0);
calendar.set(Calendar.MILLISECOND, 0);
long today = calendar.getTimeInMillis();
calendar.add(Calendar.DAY_OF_MONTH, -1);
long yesterday = calendar.getTimeInMillis();
Map<TopicPartition, Long> currentOffset = topicConsole.getOffsetForTimestamp(topic, current);
Map<TopicPartition, Long> todayOffset = topicConsole.getOffsetForTimestamp(topic, today);
Map<TopicPartition, Long> yesterdayOffset = topicConsole.getOffsetForTimestamp(topic, yesterday);
Map<String, Object> res = new HashMap<>();
// yesterday's message count = today's 00:00 offset minus yesterday's 00:00 offset
AtomicLong yesterdayTotal = new AtomicLong(0L), todayTotal = new AtomicLong(0L);
Map<Integer, Long> yesterdayDetail = new HashMap<>(), todayDetail = new HashMap<>();
todayOffset.forEach(((partition, aLong) -> {
Long last = yesterdayOffset.get(partition);
long diff = last == null ? aLong : aLong - last;
yesterdayDetail.put(partition.partition(), diff);
yesterdayTotal.addAndGet(diff);
}));
currentOffset.forEach(((partition, aLong) -> {
Long last = todayOffset.get(partition);
long diff = last == null ? aLong : aLong - last;
todayDetail.put(partition.partition(), diff);
todayTotal.addAndGet(diff);
}));
Map<String, Object> yes = new HashMap<>(), to = new HashMap<>();
yes.put("detail", convertList(yesterdayDetail));
yes.put("total", yesterdayTotal.get());
to.put("detail", convertList(todayDetail));
to.put("total", todayTotal.get());
res.put("yesterday", yes);
res.put("today", to);
// today's message count = the current offset minus today's 00:00 offset
return ResponseData.create().data(res).success();
}
private List<Map<String, Object>> convertList(Map<Integer, Long> source) {
List<Map<String, Object>> collect = source.entrySet().stream().map(entry -> {
Map<String, Object> map = new HashMap<>(3, 1.0f);
map.put("partition", entry.getKey());
map.put("num", entry.getValue());
return map;
}).collect(Collectors.toList());
return collect;
}
}
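The day-boundary arithmetic in sendStats above is easiest to see with concrete numbers; a small sketch with made-up log-end offsets for a single partition:

// Made-up offsets for one partition, illustrating the sendStats arithmetic.
long yesterdayMidnight = 1_000L; // offset at yesterday 00:00
long todayMidnight     = 1_400L; // offset at today 00:00
long now               = 1_650L; // offset at the current moment

long producedYesterday = todayMidnight - yesterdayMidnight; // 400 messages
long producedToday     = now - todayMidnight;               // 250 messages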

View File

@@ -0,0 +1,19 @@
package com.xuxd.kafka.console.utils;
import com.google.gson.Gson;
/**
* kafka-console-ui.
*
* @author xuxd
* @date 2021-11-19 17:01:01
**/
public enum GsonUtil {
INSTANCE;
private Gson gson = new Gson();
public Gson get() {
return gson;
}
}
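GsonUtil uses the enum-singleton idiom, so every service shares one thread-safe Gson instance; usage is a one-liner, exactly as OperationServiceImpl does above:

Gson gson = GsonUtil.INSTANCE.get(); // shared Gson instance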

View File

@@ -1,15 +1,21 @@
package kafka.console
import java.util.Properties
import com.xuxd.kafka.console.config.KafkaConfig
import kafka.zk.{AdminZkClient, KafkaZkClient}
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.admin.{AbstractOptions, Admin, AdminClientConfig}
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.clients.admin._
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer, OffsetAndMetadata}
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.config.SaslConfigs
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import org.apache.kafka.common.requests.ListOffsetsResponse
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, StringSerializer}
import org.apache.kafka.common.utils.Time
import org.slf4j.{Logger, LoggerFactory}
import java.util.Properties
import scala.collection.{Map, Seq}
import scala.jdk.CollectionConverters.{MapHasAsJava, MapHasAsScala}
/**
* kafka-console-ui.
@@ -55,6 +61,38 @@ class KafkaConsole(config: KafkaConfig) {
}
}
protected def withProducerAndCatchError(f: KafkaProducer[String, String] => Any, eh: Exception => Any,
extra: Properties = new Properties()): Any = {
val props = getProps()
props.putAll(extra)
props.put(ConsumerConfig.CLIENT_ID_CONFIG, String.valueOf(System.currentTimeMillis()))
val producer = new KafkaProducer[String, String](props, new StringSerializer, new StringSerializer)
try {
f(producer)
} catch {
case er: Exception => eh(er)
}
finally {
producer.close()
}
}
protected def withByteProducerAndCatchError(f: KafkaProducer[Array[Byte], Array[Byte]] => Any, eh: Exception => Any,
extra: Properties = new Properties()): Any = {
val props = getProps()
props.putAll(extra)
props.put(ConsumerConfig.CLIENT_ID_CONFIG, String.valueOf(System.currentTimeMillis()))
val producer = new KafkaProducer[Array[Byte], Array[Byte]](props, new ByteArraySerializer, new ByteArraySerializer)
try {
f(producer)
} catch {
case er: Exception => eh(er)
}
finally {
producer.close()
}
}
protected def withZKClient(f: AdminZkClient => Any): Any = {
val zkClient = KafkaZkClient(config.getZookeeperAddr, false, 30000, 30000, Int.MaxValue, Time.SYSTEM)
val adminZkClient = new AdminZkClient(zkClient)
@@ -89,3 +127,57 @@ class KafkaConsole(config: KafkaConfig) {
props
}
}
object KafkaConsole {
val log: Logger = LoggerFactory.getLogger(this.getClass)
def getCommittedOffsets(admin: Admin, groupId: String,
timeoutMs: Integer): Map[TopicPartition, OffsetAndMetadata] = {
admin.listConsumerGroupOffsets(
groupId, new ListConsumerGroupOffsetsOptions().timeoutMs(timeoutMs)
).partitionsToOffsetAndMetadata.get.asScala
}
def getLogTimestampOffsets(admin: Admin, topicPartitions: Seq[TopicPartition],
timestamp: java.lang.Long, timeoutMs: Integer): Map[TopicPartition, OffsetAndMetadata] = {
val timestampOffsets = topicPartitions.map { topicPartition =>
topicPartition -> OffsetSpec.forTimestamp(timestamp)
}.toMap
val offsets = admin.listOffsets(
timestampOffsets.asJava,
new ListOffsetsOptions().timeoutMs(timeoutMs)
).all.get
val (successfulOffsetsForTimes, unsuccessfulOffsetsForTimes) =
offsets.asScala.partition(_._2.offset != ListOffsetsResponse.UNKNOWN_OFFSET)
val successfulLogTimestampOffsets = successfulOffsetsForTimes.map {
case (topicPartition, listOffsetsResultInfo) => topicPartition -> new OffsetAndMetadata(listOffsetsResultInfo.offset)
}.toMap
unsuccessfulOffsetsForTimes.foreach { entry =>
log.warn(s"\nWarn: Partition " + entry._1.partition() + " from topic " + entry._1.topic() +
" is empty. Falling back to latest known offset.")
}
successfulLogTimestampOffsets ++ getLogEndOffsets(admin, unsuccessfulOffsetsForTimes.keySet.toSeq, timeoutMs)
}
def getLogEndOffsets(admin: Admin,
topicPartitions: Seq[TopicPartition], timeoutMs: Integer): Predef.Map[TopicPartition, OffsetAndMetadata] = {
val endOffsets = topicPartitions.map { topicPartition =>
topicPartition -> OffsetSpec.latest
}.toMap
val offsets = admin.listOffsets(
endOffsets.asJava,
new ListOffsetsOptions().timeoutMs(timeoutMs)
).all.get
val res = topicPartitions.map { topicPartition =>
Option(offsets.get(topicPartition)) match {
case Some(listOffsetsResultInfo) => topicPartition -> new OffsetAndMetadata(listOffsetsResultInfo.offset)
case _ =>
throw new IllegalArgumentException
}
}.toMap
res
}
}

View File

@@ -0,0 +1,196 @@
package kafka.console
import com.xuxd.kafka.console.config.KafkaConfig
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.TopicPartition
import java.time.Duration
import java.util
import java.util.Properties
import scala.collection.immutable
import scala.jdk.CollectionConverters.{CollectionHasAsScala, MapHasAsScala, SeqHasAsJava}
/**
* kafka-console-ui.
*
* @author xuxd
* @date 2021-12-11 09:39:40
* */
class MessageConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConfig) with Logging {
def searchBy(partitions: util.Collection[TopicPartition], startTime: Long, endTime: Long,
maxNums: Int): util.List[ConsumerRecord[Array[Byte], Array[Byte]]] = {
var startOffTable: immutable.Map[TopicPartition, Long] = Map.empty
var endOffTable: immutable.Map[TopicPartition, Long] = Map.empty
withAdminClientAndCatchError(admin => {
val startTable = KafkaConsole.getLogTimestampOffsets(admin, partitions.asScala.toSeq, startTime, timeoutMs)
startOffTable = startTable.map(t2 => (t2._1, t2._2.offset())).toMap
endOffTable = KafkaConsole.getLogTimestampOffsets(admin, partitions.asScala.toSeq, endTime, timeoutMs)
.map(t2 => (t2._1, t2._2.offset())).toMap
}, e => {
log.error("getLogTimestampOffsets error.", e)
throw new RuntimeException("getLogTimestampOffsets error", e)
})
var terminate: Boolean = (startOffTable == endOffTable)
val res = new util.LinkedList[ConsumerRecord[Array[Byte], Array[Byte]]]()
// if the start and end offsets are identical, stop immediately
if (!terminate) {
val arrive = new util.HashSet[TopicPartition](partitions)
val props = new Properties()
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
withConsumerAndCatchError(consumer => {
consumer.assign(partitions)
for ((tp, off) <- startOffTable) {
consumer.seek(tp, off)
}
// Termination conditions:
// 1. all queried partitions have reached their end offset
while (!terminate) {
// 2. the maximum number of records for this query has been reached
if (res.size() >= maxNums) {
terminate = true
} else {
val records = consumer.poll(Duration.ofMillis(timeoutMs))
if (records.isEmpty) {
terminate = true
} else {
for ((tp, endOff) <- endOffTable) {
if (!terminate) {
var recordList = records.records(tp)
if (!recordList.isEmpty) {
val first = recordList.get(0)
if (first.offset() >= endOff) {
arrive.remove(tp)
} else {
//
// (String topic,
// int partition,
// long offset,
// long timestamp,
// TimestampType timestampType,
// Long checksum,
// int serializedKeySize,
// int serializedValueSize,
// K key,
// V value,
// Headers headers,
// Optional<Integer> leaderEpoch)
val nullVList = recordList.asScala.map(record => new ConsumerRecord[Array[Byte], Array[Byte]](record.topic(),
record.partition(),
record.offset(),
record.timestamp(),
record.timestampType(),
record.checksum(),
record.serializedKeySize(),
record.serializedValueSize(),
record.key(),
null,
record.headers(),
record.leaderEpoch())).toSeq.asJava
res.addAll(nullVList)
if (recordList.get(recordList.size() - 1).offset() >= endOff) {
arrive.remove(tp)
}
if (recordList != null) {
recordList = null
}
}
}
if (arrive.isEmpty) {
terminate = true
}
}
}
}
}
}
}, e => {
log.error("searchBy time error.", e)
})
}
res
}
def searchBy(
tp2o: util.Map[TopicPartition, Long]): util.Map[TopicPartition, ConsumerRecord[Array[Byte], Array[Byte]]] = {
val props = new Properties()
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
val res = new util.HashMap[TopicPartition, ConsumerRecord[Array[Byte], Array[Byte]]]()
withConsumerAndCatchError(consumer => {
var tpSet = tp2o.keySet()
val tpSetCopy = new util.HashSet[TopicPartition](tpSet)
val endOffsets = consumer.endOffsets(tpSet)
val beginOffsets = consumer.beginningOffsets(tpSet)
for ((tp, off) <- tp2o.asScala) {
val endOff = endOffsets.get(tp)
// if (endOff <= off) {
// consumer.seek(tp, endOff)
// tpSetCopy.remove(tp)
// } else {
// consumer.seek(tp, off)
// }
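// skip partitions whose requested offset falls outside the valid [beginOffset, endOffset) range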
val beginOff = beginOffsets.get(tp)
if (off < beginOff || off >= endOff) {
tpSetCopy.remove(tp)
}
}
tpSet = tpSetCopy
consumer.assign(tpSet)
tpSet.asScala.foreach(tp => {
consumer.seek(tp, tp2o.get(tp))
})
var terminate = tpSet.isEmpty
while (!terminate) {
val records = consumer.poll(Duration.ofMillis(timeoutMs))
val tps = new util.HashSet(tpSet).asScala
for (tp <- tps) {
if (!res.containsKey(tp)) {
val recordList = records.records(tp)
if (!recordList.isEmpty) {
val record = recordList.get(0)
res.put(tp, record)
tpSet.remove(tp)
}
}
if (tpSet.isEmpty) {
terminate = true
}
}
}
}, e => {
log.error("searchBy offset error.", e)
})
res
}
def send(topic: String, partition: Int, key: String, value: String, num: Int): Unit = {
withProducerAndCatchError(producer => {
val nullKey = if (key != null && key.trim().length() == 0) null else key
for (a <- 1 to num) {
val record = if (partition != -1) new ProducerRecord[String, String](topic, partition, nullKey, value)
else new ProducerRecord[String, String](topic, nullKey, value)
producer.send(record)
}
}, e => log.error("send error.", e))
}
def sendSync(record: ProducerRecord[Array[Byte], Array[Byte]]): (Boolean, String) = {
withByteProducerAndCatchError(producer => {
val metadata = producer.send(record).get()
(true, metadata.toString())
}, e => {
log.error("send error.", e)
(false, e.getMessage)
}).asInstanceOf[(Boolean, String)]
}
}

View File

@@ -1,15 +1,15 @@
package kafka.console
import java.util.concurrent.TimeUnit
import java.util.{Collections, Properties}
import com.xuxd.kafka.console.config.KafkaConfig
import org.apache.kafka.clients.admin.ElectLeadersOptions
import kafka.admin.ReassignPartitionsCommand
import org.apache.kafka.clients.admin.{ElectLeadersOptions, ListPartitionReassignmentsOptions, PartitionReassignment}
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import org.apache.kafka.common.{ElectionType, TopicPartition}
import scala.jdk.CollectionConverters.{CollectionHasAsScala, ListHasAsScala, MapHasAsScala, SeqHasAsJava, SetHasAsJava, SetHasAsScala}
import java.util.concurrent.TimeUnit
import java.util.{Collections, Properties}
import scala.jdk.CollectionConverters.{CollectionHasAsScala, ListHasAsScala, MapHasAsJava, MapHasAsScala, SeqHasAsJava, SetHasAsJava, SetHasAsScala}
/**
* kafka-console-ui.
@@ -210,4 +210,47 @@ class OperationConsole(config: KafkaConfig, topicConsole: TopicConsole,
val topicList = topicConsole.getTopicList(Collections.singleton(topic))
topicList.asScala.flatMap(_.partitions().asScala.map(t => new TopicPartition(topic, t.partition()))).toSet.asJava
}
}
def modifyInterBrokerThrottle(reassigningBrokers: util.Set[Int],
interBrokerThrottle: Long): (Boolean, String) = {
withAdminClientAndCatchError(admin => {
ReassignPartitionsCommand.modifyInterBrokerThrottle(admin, reassigningBrokers.asScala.toSet, interBrokerThrottle)
(true, "")
}, e => {
log.error("modifyInterBrokerThrottle error.", e)
(false, e.getMessage)
}).asInstanceOf[(Boolean, String)]
}
def clearBrokerLevelThrottles(brokers: util.Set[Int]): (Boolean, String) = {
withAdminClientAndCatchError(admin => {
ReassignPartitionsCommand.clearBrokerLevelThrottles(admin, brokers.asScala.toSet)
(true, "")
}, e => {
log.error("clearBrokerLevelThrottles error.", e)
(false, e.getMessage)
}).asInstanceOf[(Boolean, String)]
}
/**
* current reassigning is active.
*/
def currentReassignments(): util.Map[TopicPartition, PartitionReassignment] = {
withAdminClientAndCatchError(admin => {
admin.listPartitionReassignments(withTimeoutMs(new ListPartitionReassignmentsOptions)).reassignments().get()
}, e => {
log.error("listPartitionReassignments error.", e)
Collections.emptyMap()
}).asInstanceOf[util.Map[TopicPartition, PartitionReassignment]]
}
def cancelPartitionReassignments(reassignments: util.Set[TopicPartition]): util.Map[TopicPartition, Throwable] = {
withAdminClientAndCatchError(admin => {
val res = ReassignPartitionsCommand.cancelPartitionReassignments(admin, reassignments.asScala.toSet)
res.asJava
}, e => {
log.error("cancelPartitionReassignments error.", e)
throw e
}).asInstanceOf[util.Map[TopicPartition, Throwable]]
}
}

View File

@@ -1,14 +1,18 @@
package kafka.console
import java.util
import java.util.concurrent.TimeUnit
import java.util.{Collections, List, Set}
import com.xuxd.kafka.console.config.KafkaConfig
import kafka.admin.ReassignPartitionsCommand._
import kafka.utils.Json
import org.apache.kafka.clients.admin._
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException
import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.{TopicPartition, TopicPartitionReplica}
import scala.jdk.CollectionConverters.{CollectionHasAsScala, SetHasAsJava}
import java.util
import java.util.concurrent.{ExecutionException, TimeUnit}
import java.util.{Collections, List, Set}
import scala.collection.{Map, Seq}
import scala.jdk.CollectionConverters.{CollectionHasAsScala, MapHasAsJava, MapHasAsScala, SeqHasAsJava, SetHasAsJava}
/**
* kafka-console-ui.
@@ -121,4 +125,180 @@ class TopicConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConfig
(false, e.getMessage)
}).asInstanceOf[(Boolean, String)]
}
def getCurrentReplicaAssignmentJson(topic: String): (Boolean, String) = {
withAdminClientAndCatchError(admin => {
val json = formatAsReassignmentJson(getReplicaAssignmentForTopics(admin, Seq(topic)), Map.empty)
(true, json)
}, e => {
log.error("getCurrentReplicaAssignmentJson error, ", e)
(false, e.getMessage)
}).asInstanceOf[(Boolean, String)]
}
def updateReplicas(reassignmentJson: String, interBrokerThrottle: Long = -1L): (Boolean, String) = {
withAdminClientAndCatchError(admin => {
executeAssignment(admin, reassignmentJson, interBrokerThrottle)
(true, "")
}, e => {
log.error("executeAssignment error, ", e)
(false, e.getMessage)
}).asInstanceOf[(Boolean, String)]
}
/**
* Copy and modify from @{link kafka.admin.ReassignPartitionsCommand#executeAssignment}.
*/
def executeAssignment(adminClient: Admin,
reassignmentJson: String,
interBrokerThrottle: Long = -1L,
logDirThrottle: Long = -1L,
timeoutMs: Long = 30000L,
time: Time = Time.SYSTEM): Unit = {
val (proposedParts, proposedReplicas) = parseExecuteAssignmentArgs(reassignmentJson)
val currentReassignments = adminClient.
listPartitionReassignments().reassignments().get().asScala
// If there is an existing assignment, refuse to execute another one.
// This helps avoid surprising users.
if (currentReassignments.nonEmpty) {
throw new TerseReassignmentFailureException("Cannot execute because there is an existing partition assignment.")
}
verifyBrokerIds(adminClient, proposedParts.values.flatten.toSet)
val currentParts = getReplicaAssignmentForPartitions(adminClient, proposedParts.keySet.toSet)
log.info("currentPartitionReplicaAssignment: " + currentPartitionReplicaAssignmentToString(proposedParts, currentParts))
log.info(s"newPartitionReplicaAssignment: $reassignmentJson")
if (interBrokerThrottle >= 0 || logDirThrottle >= 0) {
if (interBrokerThrottle >= 0) {
val moveMap = calculateProposedMoveMap(currentReassignments, proposedParts, currentParts)
modifyReassignmentThrottle(adminClient, moveMap)
}
if (logDirThrottle >= 0) {
val movingBrokers = calculateMovingBrokers(proposedReplicas.keySet.toSet)
modifyLogDirThrottle(adminClient, movingBrokers, logDirThrottle)
}
}
// Execute the partition reassignments.
val errors = alterPartitionReassignments(adminClient, proposedParts)
if (errors.nonEmpty) {
throw new TerseReassignmentFailureException(
"Error reassigning partition(s):%n%s".format(
errors.keySet.toBuffer.sortWith(compareTopicPartitions).map { part =>
s"$part: ${errors(part).getMessage}"
}.mkString(System.lineSeparator())))
}
if (proposedReplicas.nonEmpty) {
executeMoves(adminClient, proposedReplicas, timeoutMs, time)
}
}
def configThrottle(topic: String, partitions: util.List[Integer]): (Boolean, String) = {
withAdminClientAndCatchError(admin => {
val throttles = {
if (partitions.get(0) == -1) {
Map(topic -> "*")
} else {
val topicDescription = admin.describeTopics(Collections.singleton(topic), withTimeoutMs(new DescribeTopicsOptions))
.all().get().values().asScala.toList
def convert(partition: Integer, replicas: scala.List[Int]): String = {
replicas.map("%d:%d".format(partition, _)).toSet.mkString(",")
}
val ptor = topicDescription.head.partitions().asScala.map(info => (info.partition(), info.replicas().asScala.map(_.id()))).toMap
val conf = partitions.asScala.map(partition => convert(partition, ptor.get(partition) match {
case Some(v) => v.toList
case None => throw new IllegalArgumentException
})).toList
Map(topic -> conf.mkString(","))
}
}
modifyTopicThrottles(admin, throttles, throttles)
(true, "")
}, e => {
log.error("configThrottle error, ", e)
(false, e.getMessage)
}).asInstanceOf[(Boolean, String)]
}
def clearThrottle(topic: String): (Boolean, String) = {
withAdminClientAndCatchError(admin => {
clearTopicLevelThrottles(admin, Collections.singleton(topic).asScala.toSet)
(true, "")
}, e => {
log.error("clearThrottle error, ", e)
(false, e.getMessage)
}).asInstanceOf[(Boolean, String)]
}
def getOffsetForTimestamp(topic: String, timestamp: java.lang.Long): util.Map[TopicPartition, java.lang.Long] = {
withAdminClientAndCatchError(admin => {
val partitions = describeTopics(admin, Collections.singleton(topic)).get(topic) match {
case Some(topicDescription: TopicDescription) => topicDescription.partitions()
.asScala.map(info => new TopicPartition(topic, info.partition())).toSeq
case None => throw new IllegalArgumentException("topic does not exist.")
}
val offsetMap = KafkaConsole.getLogTimestampOffsets(admin, partitions, timestamp, timeoutMs)
offsetMap.map(tuple2 => (tuple2._1, tuple2._2.offset())).toMap.asJava
}, e => {
log.error("getOffsetForTimestamp error, ", e)
Collections.emptyMap()
}).asInstanceOf[util.Map[TopicPartition, java.lang.Long]]
}
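// Hedged sketch (not part of the original file): KafkaConsole.getLogTimestampOffsets
// used above is assumed to resolve, per partition, the earliest offset whose record
// timestamp is >= the given timestamp, via the Admin listOffsets API (OffsetSpec
// import assumed):
private def logTimestampOffsetsSketch(admin: Admin,
partitions: Seq[TopicPartition],
timestamp: Long): Map[TopicPartition, Long] = {
val spec = partitions.map(tp => tp -> OffsetSpec.forTimestamp(timestamp)).toMap.asJava
admin.listOffsets(spec).all().get().asScala
.map { case (tp, info) => tp -> info.offset() }.toMap
}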
/**
* Get the current replica assignments for some topics.
*
* @param adminClient The AdminClient to use.
* @param topics The topics to get information about.
* @return A map from partitions to broker assignments.
* If any topic can't be found, an exception will be thrown.
*/
private def getReplicaAssignmentForTopics(adminClient: Admin,
topics: Seq[String])
: Map[TopicPartition, Seq[Int]] = {
describeTopics(adminClient, topics.toSet.asJava).flatMap {
case (topicName, topicDescription) => topicDescription.partitions.asScala.map { info =>
(new TopicPartition(topicName, info.partition), info.replicas.asScala.map(_.id).toSeq)
}
}
}
private def formatAsReassignmentJson(partitionsToBeReassigned: Map[TopicPartition, Seq[Int]],
replicaLogDirAssignment: Map[TopicPartitionReplica, String]): String = {
Json.encodeAsString(Map(
"version" -> 1,
"partitions" -> partitionsToBeReassigned.keySet.toBuffer.sortWith(compareTopicPartitions).map {
tp =>
val replicas = partitionsToBeReassigned(tp)
Map(
"topic" -> tp.topic,
"partition" -> tp.partition,
"replicas" -> replicas.asJava
).asJava
}.asJava
).asJava)
}
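// For example, a two-partition topic "t" being moved onto brokers (1,2) and (2,3)
// would render as (illustrative values):
//   {"version":1,"partitions":[
//     {"topic":"t","partition":0,"replicas":[1,2]},
//     {"topic":"t","partition":1,"replicas":[2,3]}]}
// which is the same format parseExecuteAssignmentArgs consumes in executeAssignment.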
private def describeTopics(adminClient: Admin,
topics: util.Set[String])
: Map[String, TopicDescription] = {
adminClient.describeTopics(topics).values.asScala.map { case (topicName, topicDescriptionFuture) =>
try topicName -> topicDescriptionFuture.get
catch {
case t: ExecutionException if t.getCause.isInstanceOf[UnknownTopicOrPartitionException] =>
throw new ExecutionException(
new UnknownTopicOrPartitionException(s"Topic $topicName not found."))
}
}
}
private def modifyReassignmentThrottle(admin: Admin, moveMap: MoveMap): Unit = {
val leaderThrottles = calculateLeaderThrottles(moveMap)
val followerThrottles = calculateFollowerThrottles(moveMap)
modifyTopicThrottles(admin, leaderThrottles, followerThrottles)
}
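// Hedged sketch (not part of the original file): modifyTopicThrottles is assumed to
// persist the computed replica lists as incremental topic-config updates. The config
// keys are the real Kafka properties; the helper shape and the AlterConfigOp /
// ConfigEntry / ConfigResource imports are assumptions for illustration.
private def modifyTopicThrottlesSketch(admin: Admin,
leaderThrottles: Map[String, String],
followerThrottles: Map[String, String]): Unit = {
val configs = (leaderThrottles.keySet ++ followerThrottles.keySet).map { topic =>
val ops = new util.ArrayList[AlterConfigOp]()
leaderThrottles.get(topic).foreach(v => ops.add(new AlterConfigOp(
new ConfigEntry("leader.replication.throttled.replicas", v), AlterConfigOp.OpType.SET)))
followerThrottles.get(topic).foreach(v => ops.add(new AlterConfigOp(
new ConfigEntry("follower.replication.throttled.replicas", v), AlterConfigOp.OpType.SET)))
new ConfigResource(ConfigResource.Type.TOPIC, topic) -> (ops: util.Collection[AlterConfigOp])
}.toMap.asJava
admin.incrementalAlterConfigs(configs).all().get()
}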
}

View File

@@ -1,6 +1,7 @@
<template>
<div id="app">
<div id="nav">
<h2 class="logo">Kafka 控制台</h2>
<router-link to="/" class="pad-l-r">主页</router-link>
<span>|</span
><router-link to="/cluster-page" class="pad-l-r">集群</router-link>
@@ -8,6 +9,8 @@
><router-link to="/topic-page" class="pad-l-r">Topic</router-link>
<span>|</span
><router-link to="/group-page" class="pad-l-r">消费组</router-link>
<span>|</span
><router-link to="/message-page" class="pad-l-r">消息</router-link>
<span v-show="config.enableAcl">|</span
><router-link to="/acl-page" class="pad-l-r" v-show="config.enableAcl"
>Acl</router-link
@@ -44,7 +47,6 @@ export default {
font-family: Avenir, Helvetica, Arial, sans-serif;
-webkit-font-smoothing: antialiased;
-moz-osx-font-smoothing: grayscale;
text-align: center;
color: #2c3e50;
}
@@ -59,6 +61,7 @@ export default {
padding-top: 1%;
padding-bottom: 1%;
margin-bottom: 1%;
text-align: center;
}
#nav a {
@@ -81,4 +84,10 @@ export default {
height: 90%;
width: 100%;
}
.logo {
float: left;
left: 1%;
top: 1%;
position: absolute;
}
</style>

View File

@@ -43,6 +43,12 @@ const routes = [
component: () =>
import(/* webpackChunkName: "cluster" */ "../views/cluster/Cluster.vue"),
},
{
path: "/message-page",
name: "Message",
component: () =>
import(/* webpackChunkName: "cluster" */ "../views/message/Message.vue"),
},
];
const router = new VueRouter({

View File

@@ -117,6 +117,22 @@ export const KafkaTopicApi = {
url: "/topic/partition/new",
method: "post",
},
getCurrentReplicaAssignment: {
url: "/topic/replica/assignment",
method: "get",
},
updateReplicaAssignment: {
url: "/topic/replica/assignment",
method: "post",
},
configThrottle: {
url: "/topic/replica/throttle",
method: "post",
},
sendStats: {
url: "/topic/send/stats",
method: "get",
},
};
export const KafkaConsumerApi = {
@@ -190,4 +206,46 @@ export const KafkaOpApi = {
url: "/op/replication/preferred",
method: "post",
},
configThrottle: {
url: "/op/broker/throttle",
method: "post",
},
removeThrottle: {
url: "/op/broker/throttle",
method: "delete",
},
currentReassignments: {
url: "/op/replication/reassignments",
method: "get",
},
cancelReassignment: {
url: "/op/replication/reassignments",
method: "delete",
},
};
export const KafkaMessageApi = {
searchByTime: {
url: "/message/search/time",
method: "post",
},
searchByOffset: {
url: "/message/search/offset",
method: "post",
},
searchDetail: {
url: "/message/search/detail",
method: "post",
},
deserializerList: {
url: "/message/deserializer/list",
method: "get",
},
send: {
url: "/message/send",
method: "post",
},
resend: {
url: "/message/resend",
method: "post",
},
};

View File

@@ -6,7 +6,7 @@ import { VueAxios } from "./axios";
const request = axios.create({
// API 请求的默认前缀
baseURL: process.env.VUE_APP_API_BASE_URL,
timeout: 30000, // 请求超时时间
timeout: 120000, // 请求超时时间
});
// 异常拦截处理器

View File

@@ -47,6 +47,15 @@
@click="openResetOffsetByTimeDialog(k)"
>时间戳
</a-button>
<a-button
type="primary"
icon="reload"
size="small"
style="float: right"
@click="getConsumerDetail"
>
刷新
</a-button>
<hr />
<a-table
:columns="columns"
@@ -70,6 +79,11 @@
</a-button>
</div>
</a-table>
<p>
<strong style="color: red"
>注意:重置位点时,要求当前没有正在运行的消费端,否则重置的时候会报错,返回失败信息</strong
>
</p>
</div>
<a-modal

View File

@@ -32,6 +32,10 @@
/>
</a-form-item>
</a-form>
<hr />
<p>
*注意:该时间为北京时间,这里固定为东8区的计算时间。如果所在地区不是采用北京时间(中国大部分地区都是采用的北京时间),请自行对照为当地时间重置
</p>
</a-spin>
</div>
</a-modal>

View File

@@ -0,0 +1,58 @@
<template>
<div class="content">
<a-spin :spinning="loading">
<a-tabs default-active-key="1" size="large" tabPosition="top">
<a-tab-pane key="1" tab="根据时间查询消息">
<SearchByTime :topic-list="topicList"></SearchByTime>
</a-tab-pane>
<a-tab-pane key="2" tab="根据偏移查询消息">
<SearchByOffset :topic-list="topicList"></SearchByOffset>
</a-tab-pane>
<a-tab-pane key="3" tab="在线发送">
<SendMessage :topic-list="topicList"></SendMessage>
</a-tab-pane>
</a-tabs>
</a-spin>
</div>
</template>
<script>
import SearchByTime from "@/views/message/SearchByTime";
import SearchByOffset from "@/views/message/SearchByOffset";
import request from "@/utils/request";
import { KafkaTopicApi } from "@/utils/api";
import notification from "ant-design-vue/lib/notification";
import SendMessage from "@/views/message/SendMessage";
export default {
name: "Message",
components: { SearchByTime, SearchByOffset, SendMessage },
data() {
return {
loading: false,
topicList: [],
};
},
methods: {
getTopicNameList() {
request({
url: KafkaTopicApi.getTopicNameList.url,
method: KafkaTopicApi.getTopicNameList.method,
}).then((res) => {
if (res.code == 0) {
this.topicList = res.data;
} else {
notification.error({
message: "error",
description: res.msg,
});
}
});
},
},
created() {
this.getTopicNameList();
},
};
</script>
<style scoped></style>

View File

@@ -0,0 +1,265 @@
<template>
<a-modal
title="消息详情"
:visible="show"
:width="800"
:mask="false"
:destroyOnClose="true"
:footer="null"
:maskClosable="false"
@cancel="handleCancel"
>
<div>
<a-spin :spinning="loading">
<div>
<h4>消息信息</h4>
<hr />
<div class="message-detail" id="message-detail">
<p>
<label class="title">Topic: </label>
<span class="m-info">{{ data.topic }}</span>
</p>
<p>
<label class="title">分区: </label>
<span class="m-info">{{ data.partition }}</span>
</p>
<p>
<label class="title">偏移: </label>
<span class="m-info">{{ data.offset }}</span>
</p>
<p>
<label class="title">消息头: </label>
<span class="m-info">{{ data.headers }}</span>
</p>
<p>
<label class="title">时间类型: </label>
<span class="m-info"
>{{
data.timestampType
}}(表示下面的时间是哪种类型:消息创建、写入日志,亦或其它)</span
>
</p>
<p>
<label class="title">时间: </label>
<span class="m-info">{{ formatTime(data.timestamp) }}</span>
</p>
<p>
<label class="title">Key反序列化: </label>
<a-select
style="width: 120px"
v-model="keyDeserializer"
@change="keyDeserializerChange"
>
<a-select-option
v-for="v in deserializerList"
:key="v"
:value="v"
>
{{ v }}
</a-select-option>
</a-select>
<span>选一个合适的反序列化器,要不可能乱码了</span>
</p>
<p>
<label class="title">Key: </label>
<span class="m-info">{{ data.key }}</span>
</p>
<p>
<label class="title">消息体反序列化: </label>
<a-select
v-model="valueDeserializer"
style="width: 120px"
@change="valueDeserializerChange"
>
<a-select-option
v-for="v in deserializerList"
:key="v"
:value="v"
>
{{ v }}
</a-select-option>
</a-select>
<span>选一个合适的反序列化器,要不可能乱码了</span>
</p>
<p>
<label class="title">消息体: </label>
<a-textarea
type="textarea"
:value="data.value"
:rows="5"
:read-only="true"
></a-textarea>
</p>
</div>
</div>
<div>
<h4>消费信息</h4>
<hr />
<a-table
:columns="columns"
:data-source="data.consumers"
bordered
row-key="groupId"
>
<div slot="status" slot-scope="text">
<span v-if="text == 'consumed'">已消费</span
><span v-else style="color: red">未消费</span>
</div>
</a-table>
</div>
<div>
<h4>操作</h4>
<hr />
<a-popconfirm
title="确定将当前这条消息重新发回broker"
ok-text="确认"
cancel-text="取消"
@confirm="resend"
>
<a-button type="primary" icon="reload"> 重新发送 </a-button>
</a-popconfirm>
</div>
</a-spin>
</div>
</a-modal>
</template>
<script>
import request from "@/utils/request";
import { KafkaMessageApi } from "@/utils/api";
import notification from "ant-design-vue/lib/notification";
import moment from "moment";
export default {
name: "MessageDetail",
props: {
record: {},
visible: {
type: Boolean,
default: false,
},
},
data() {
return {
show: this.visible,
data: {},
loading: false,
deserializerList: [],
keyDeserializer: "String",
valueDeserializer: "String",
consumerDetail: [],
columns,
};
},
watch: {
visible(v) {
this.show = v;
if (this.show) {
this.getMessageDetail();
this.getDeserializerList();
}
},
},
methods: {
getMessageDetail() {
this.loading = true;
const params = Object.assign({}, this.record, {
keyDeserializer: this.keyDeserializer,
valueDeserializer: this.valueDeserializer,
});
request({
url: KafkaMessageApi.searchDetail.url,
method: KafkaMessageApi.searchDetail.method,
data: params,
}).then((res) => {
this.loading = false;
if (res.code != 0) {
notification.error({
message: "error",
description: res.msg,
});
} else {
this.data = res.data;
}
});
},
getDeserializerList() {
request({
url: KafkaMessageApi.deserializerList.url,
method: KafkaMessageApi.deserializerList.method,
}).then((res) => {
if (res.code != 0) {
notification.error({
message: "error",
description: res.msg,
});
} else {
this.deserializerList = res.data;
}
});
},
handleCancel() {
this.data = {};
this.$emit("closeDetailDialog", { refresh: false });
},
formatTime(time) {
return moment(time).format("YYYY-MM-DD HH:mm:ss:SSS");
},
keyDeserializerChange() {
this.getMessageDetail();
},
valueDeserializerChange() {
this.getMessageDetail();
},
resend() {
const params = Object.assign({}, this.data);
this.loading = true;
request({
url: KafkaMessageApi.resend.url,
method: KafkaMessageApi.resend.method,
data: params,
}).then((res) => {
this.loading = false;
if (res.code != 0) {
notification.error({
message: "error",
description: res.msg,
});
} else {
this.$message.success(res.msg);
}
});
},
},
};
const columns = [
{
title: "消费组",
dataIndex: "groupId",
key: "groupId",
},
{
title: "消费情况",
dataIndex: "status",
key: "status",
scopedSlots: { customRender: "status" },
},
];
</script>
<style scoped>
.m-info {
/*text-decoration: underline;*/
}
.title {
width: 15%;
display: inline-block;
text-align: right;
margin-right: 2%;
font-weight: bold;
}
.ant-spin-container #message-detail textarea {
max-width: 80% !important;
vertical-align: top !important;
}
</style>

View File

@@ -0,0 +1,95 @@
<template>
<div>
<a-table
:columns="columns"
:data-source="data"
bordered
:row-key="
(record, index) => {
return index;
}
"
>
<div slot="operation" slot-scope="record">
<a-button
size="small"
href="javascript:;"
class="operation-btn"
@click="openDetailDialog(record)"
>消息详情
</a-button>
</div>
</a-table>
<MessageDetail
:visible="showDetailDialog"
:record="record"
@closeDetailDialog="closeDetailDialog"
></MessageDetail>
</div>
</template>
<script>
import moment from "moment";
import MessageDetail from "@/views/message/MessageDetail";
export default {
name: "MessageList",
components: { MessageDetail },
props: {
data: {
type: Array,
},
},
data() {
return {
columns: columns,
showDetailDialog: false,
record: {},
};
},
methods: {
openDetailDialog(record) {
this.record = record;
this.showDetailDialog = true;
},
closeDetailDialog() {
this.showDetailDialog = false;
},
},
};
const columns = [
{
title: "topic",
dataIndex: "topic",
key: "topic",
width: 300,
},
{
title: "分区",
dataIndex: "partition",
key: "partition",
},
{
title: "偏移",
dataIndex: "offset",
key: "offset",
},
{
title: "时间",
dataIndex: "timestamp",
key: "timestamp",
slots: { title: "timestamp" },
scopedSlots: { customRender: "timestamp" },
customRender: (text) => {
return moment(text).format("YYYY-MM-DD HH:mm:ss:SSS");
},
},
{
title: "操作",
key: "operation",
scopedSlots: { customRender: "operation" },
width: 200,
},
];
</script>
<style scoped></style>

View File

@@ -0,0 +1,195 @@
<template>
<div class="tab-content">
<a-spin :spinning="loading">
<div id="search-offset-form-advanced-search">
<a-form
class="ant-advanced-search-form"
:form="form"
@submit="handleSearch"
>
<a-row :gutter="24">
<a-col :span="9">
<a-form-item label="topic">
<a-select
class="topic-select"
@change="handleTopicChange"
show-search
option-filter-prop="children"
v-decorator="[
'topic',
{
rules: [{ required: true, message: '请选择一个topic!' }],
},
]"
placeholder="请选择一个topic"
>
<a-select-option v-for="v in topicList" :key="v" :value="v">
{{ v }}
</a-select-option>
</a-select>
</a-form-item>
</a-col>
<a-col :span="6">
<a-form-item label="分区">
<a-select
class="type-select"
show-search
option-filter-prop="children"
v-model="selectPartition"
placeholder="请选择一个分区"
>
<a-select-option v-for="v in partitions" :key="v" :value="v">
<span v-if="v == -1">全部</span> <span v-else>{{ v }}</span>
</a-select-option>
</a-select>
</a-form-item>
</a-col>
<a-col :span="7">
<a-form-item label="偏移">
<a-input
v-decorator="[
'offset',
{
rules: [{ required: true, message: '请输入消息偏移!' }],
},
]"
placeholder="消息偏移"
/>
</a-form-item>
</a-col>
<a-col :span="2" :style="{ textAlign: 'right' }">
<a-form-item>
<a-button type="primary" html-type="submit"> 搜索</a-button>
</a-form-item>
</a-col>
</a-row>
</a-form>
</div>
<MessageList :data="data"></MessageList>
</a-spin>
</div>
</template>
<script>
import request from "@/utils/request";
import { KafkaMessageApi, KafkaTopicApi } from "@/utils/api";
import notification from "ant-design-vue/lib/notification";
import MessageList from "@/views/message/MessageList";
export default {
name: "SearchByOffset",
components: { MessageList },
props: {
topicList: {
type: Array,
},
},
data() {
return {
loading: false,
form: this.$form.createForm(this, { name: "message_search_offset" }),
partitions: [],
selectPartition: undefined,
rangeConfig: {
rules: [{ type: "array", required: true, message: "请选择时间!" }],
},
data: defaultData,
};
},
methods: {
handleSearch(e) {
e.preventDefault();
this.form.validateFields((err, values) => {
if (!err) {
const data = Object.assign({}, values, {
partition: this.selectPartition,
});
this.loading = true;
request({
url: KafkaMessageApi.searchByOffset.url,
method: KafkaMessageApi.searchByOffset.method,
data: data,
}).then((res) => {
this.loading = false;
if (res.code == 0) {
this.$message.success(res.msg);
this.data = res.data;
} else {
notification.error({
message: "error",
description: res.msg,
});
}
});
}
});
},
getPartitionInfo(topic) {
this.loading = true;
request({
url: KafkaTopicApi.getPartitionInfo.url + "?topic=" + topic,
method: KafkaTopicApi.getPartitionInfo.method,
}).then((res) => {
this.loading = false;
if (res.code != 0) {
notification.error({
message: "error",
description: res.msg,
});
} else {
this.partitions = res.data.map((v) => v.partition);
this.partitions.splice(0, 0, -1);
}
});
},
handleTopicChange(topic) {
this.selectPartition = -1;
this.getPartitionInfo(topic);
},
},
};
const defaultData = [];
</script>
<style scoped>
.tab-content {
width: 100%;
height: 100%;
}
.ant-advanced-search-form {
padding: 24px;
background: #fbfbfb;
border: 1px solid #d9d9d9;
border-radius: 6px;
}
.ant-advanced-search-form .ant-form-item {
display: flex;
}
.ant-advanced-search-form .ant-form-item-control-wrapper {
flex: 1;
}
#components-form-topic-advanced-search .ant-form {
max-width: none;
margin-bottom: 1%;
}
#search-offset-form-advanced-search .search-result-list {
margin-top: 16px;
border: 1px dashed #e9e9e9;
border-radius: 6px;
background-color: #fafafa;
min-height: 200px;
text-align: center;
padding-top: 80px;
}
.topic-select {
width: 400px !important;
}
.type-select {
width: 200px !important;
}
</style>

View File

@@ -0,0 +1,201 @@
<template>
<div class="tab-content">
<a-spin :spinning="loading">
<div id="search-time-form-advanced-search">
<a-form
class="ant-advanced-search-form"
:form="form"
@submit="handleSearch"
>
<a-row :gutter="24">
<a-col :span="9">
<a-form-item label="topic">
<a-select
class="topic-select"
@change="handleTopicChange"
show-search
option-filter-prop="children"
v-decorator="[
'topic',
{
rules: [{ required: true, message: '请选择一个topic!' }],
},
]"
placeholder="请选择一个topic"
>
<a-select-option v-for="v in topicList" :key="v" :value="v">
{{ v }}
</a-select-option>
</a-select>
</a-form-item>
</a-col>
<a-col :span="5">
<a-form-item label="分区">
<a-select
class="type-select"
show-search
option-filter-prop="children"
v-model="selectPartition"
placeholder="请选择一个分区"
>
<a-select-option v-for="v in partitions" :key="v" :value="v">
<span v-if="v == -1">全部</span> <span v-else>{{ v }}</span>
</a-select-option>
</a-select>
</a-form-item>
</a-col>
<a-col :span="8">
<a-form-item label="时间">
<a-range-picker
v-decorator="['time', rangeConfig]"
show-time
format="YYYY-MM-DD HH:mm:ss"
/>
</a-form-item>
</a-col>
<a-col :span="2" :style="{ textAlign: 'right' }">
<a-form-item>
<a-button type="primary" html-type="submit"> 搜索</a-button>
</a-form-item>
</a-col>
</a-row>
</a-form>
</div>
<p style="margin-top: 1%">
<strong
>检索条数:{{ data.realNum }},允许返回的最大条数:{{
data.maxNum
}}</strong
>
</p>
<MessageList :data="data.data"></MessageList>
</a-spin>
</div>
</template>
<script>
import request from "@/utils/request";
import { KafkaMessageApi, KafkaTopicApi } from "@/utils/api";
import notification from "ant-design-vue/lib/notification";
import MessageList from "@/views/message/MessageList";
export default {
name: "SearchByTime",
components: { MessageList },
props: {
topicList: {
type: Array,
},
},
data() {
return {
loading: false,
form: this.$form.createForm(this, { name: "message_search_time" }),
partitions: [],
selectPartition: undefined,
rangeConfig: {
rules: [{ type: "array", required: true, message: "请选择时间!" }],
},
data: defaultData,
};
},
methods: {
handleSearch(e) {
e.preventDefault();
this.form.validateFields((err, values) => {
if (!err) {
const data = Object.assign({}, values, {
partition: this.selectPartition,
});
data.startTime = values.time[0].valueOf();
data.endTime = values.time[1].valueOf();
this.loading = true;
request({
url: KafkaMessageApi.searchByTime.url,
method: KafkaMessageApi.searchByTime.method,
data: data,
}).then((res) => {
this.loading = false;
if (res.code == 0) {
this.$message.success(res.msg);
this.data = res.data;
} else {
notification.error({
message: "error",
description: res.msg,
});
}
});
}
});
},
getPartitionInfo(topic) {
this.loading = true;
request({
url: KafkaTopicApi.getPartitionInfo.url + "?topic=" + topic,
method: KafkaTopicApi.getPartitionInfo.method,
}).then((res) => {
this.loading = false;
if (res.code != 0) {
notification.error({
message: "error",
description: res.msg,
});
} else {
this.partitions = res.data.map((v) => v.partition);
this.partitions.splice(0, 0, -1);
}
});
},
handleTopicChange(topic) {
this.selectPartition = -1;
this.getPartitionInfo(topic);
},
},
};
const defaultData = { realNum: 0, maxNum: 0 };
</script>
<style scoped>
.tab-content {
width: 100%;
height: 100%;
}
.ant-advanced-search-form {
padding: 24px;
background: #fbfbfb;
border: 1px solid #d9d9d9;
border-radius: 6px;
}
.ant-advanced-search-form .ant-form-item {
display: flex;
}
.ant-advanced-search-form .ant-form-item-control-wrapper {
flex: 1;
}
#components-form-topic-advanced-search .ant-form {
max-width: none;
margin-bottom: 1%;
}
#search-time-form-advanced-search .search-result-list {
margin-top: 16px;
border: 1px dashed #e9e9e9;
border-radius: 6px;
background-color: #fafafa;
min-height: 200px;
text-align: center;
padding-top: 80px;
}
.topic-select {
width: 400px !important;
}
.type-select {
width: 150px !important;
}
</style>

View File

@@ -0,0 +1,173 @@
<template>
<div class="content">
<a-spin :spinning="loading">
<a-form :form="form" @submit="handleSubmit">
<a-form-item label="Topic">
<a-select
class="topic-select"
@change="handleTopicChange"
show-search
option-filter-prop="children"
v-decorator="[
'topic',
{
rules: [{ required: true, message: '请选择一个topic!' }],
},
]"
placeholder="请选择一个topic"
>
<a-select-option v-for="v in topicList" :key="v" :value="v">
{{ v }}
</a-select-option>
</a-select>
</a-form-item>
<a-form-item label="分区">
<a-select
class="type-select"
show-search
option-filter-prop="children"
v-model="selectPartition"
placeholder="请选择一个分区"
>
<a-select-option v-for="v in partitions" :key="v" :value="v">
<span v-if="v == -1">默认</span> <span v-else>{{ v }}</span>
</a-select-option>
</a-select>
</a-form-item>
<a-form-item label="消息Key">
<a-input v-decorator="['key', { initialValue: 'key' }]" />
</a-form-item>
<a-form-item label="消息体" has-feedback>
<a-textarea
v-decorator="[
'body',
{
rules: [
{
required: true,
message: '输入消息体!',
},
],
},
]"
placeholder="输入消息体!"
/>
</a-form-item>
<a-form-item label="发送的消息数">
<a-input-number
v-decorator="[
'num',
{
initialValue: 1,
rules: [
{
required: true,
message: '输入消息数!',
},
],
},
]"
:min="1"
:max="32"
/>
</a-form-item>
<a-form-item :wrapper-col="{ span: 12, offset: 5 }">
<a-button type="primary" html-type="submit"> 提交 </a-button>
</a-form-item>
</a-form>
</a-spin>
</div>
</template>
<script>
import request from "@/utils/request";
import { KafkaTopicApi, KafkaMessageApi } from "@/utils/api";
import notification from "ant-design-vue/lib/notification";
export default {
name: "SendMessage",
components: {},
props: {
topicList: {
type: Array,
},
},
data() {
return {
form: this.$form.createForm(this, { name: "message_send" }),
loading: false,
partitions: [],
selectPartition: undefined,
};
},
methods: {
getPartitionInfo(topic) {
this.loading = true;
request({
url: KafkaTopicApi.getPartitionInfo.url + "?topic=" + topic,
method: KafkaTopicApi.getPartitionInfo.method,
}).then((res) => {
this.loading = false;
if (res.code != 0) {
notification.error({
message: "error",
description: res.msg,
});
} else {
this.partitions = res.data.map((v) => v.partition);
this.partitions.splice(0, 0, -1);
}
});
},
handleTopicChange(topic) {
this.selectPartition = -1;
this.getPartitionInfo(topic);
},
handleSubmit(e) {
e.preventDefault();
this.form.validateFields((err, values) => {
if (!err) {
const param = Object.assign({}, values, {
partition: this.selectPartition,
});
this.loading = true;
request({
url: KafkaMessageApi.send.url,
method: KafkaMessageApi.send.method,
data: param,
}).then((res) => {
this.loading = false;
if (res.code == 0) {
this.$message.success(res.msg);
} else {
notification.error({
message: "error",
description: res.msg,
});
}
});
}
});
},
},
};
</script>
<style scoped></style>

View File

@@ -0,0 +1,157 @@
<template>
<a-modal
title="限流配置"
:visible="show"
:width="1000"
:mask="false"
:maskClosable="false"
okText="确认"
cancelText="取消"
:destroyOnClose="true"
@cancel="handleCancel"
@ok="ok"
>
<div>
<a-spin :spinning="loading">
<a-form
:form="form"
:label-col="{ span: 5 }"
:wrapper-col="{ span: 12 }"
>
<a-form-item label="Broker">
<a-select
mode="multiple"
option-filter-prop="children"
v-decorator="[
'brokerList',
{
initialValue: brokers,
rules: [{ required: true, message: '请选择一个broker!' }],
},
]"
placeholder="请选择一个broker"
>
<a-select-option v-for="v in brokers" :key="v" :value="v">
<span v-if="v == -1">全部</span> <span v-else>{{ v }}</span>
</a-select-option>
</a-select>
</a-form-item>
<a-form-item label="带宽">
<a-input-number
:min="1"
:max="1024"
v-decorator="[
'throttle',
{
initialValue: 1,
rules: [{ required: true, message: '输入带宽!' }],
},
]"
/>
<a-select default-value="MB" v-model="unit" style="width: 100px">
<a-select-option value="MB"> MB/s </a-select-option>
<a-select-option value="KB"> KB/s </a-select-option>
</a-select>
</a-form-item>
</a-form>
<hr />
<div><h4>注意:</h4></div>
<ul>
<li>该限速带宽指的是broker之间副本进行同步时占用的带宽</li>
<li>该配置是broker级别的配置,是针对broker上topic的副本</li>
<li>
在当前页面对指定broker限流配置后,并不是说设置后该broker上的所有topic副本同步就被限制为当前流速了。这仅仅是速率设置,如果需要对某topic的副本同步进行限流,还需要去
Topic->限流 处操作,只有进行限流操作的topic,该限速才会对其生效
</li>
<li>
上面这句话的意思就是:这里只配置topic副本同步的速率,要使这个配置真正在某个topic上生效,还要开启这个topic的限流
</li>
</ul>
<h4>如何检查限流配置是否成功:</h4>
kafka的限流速率是通过下面这两项配置的
<ul>
<li>leader.replication.throttled.rate</li>
<li>follower.replication.throttled.rate</li>
</ul>
只需通过
<strong>集群->属性配置</strong>
查看是否存在这两项配置,如果存在便是配置的有限流,值的大小就是速率,单位kb/s
</a-spin>
</div>
</a-modal>
</template>
<script>
import request from "@/utils/request";
import { KafkaClusterApi, KafkaOpApi } from "@/utils/api";
import notification from "ant-design-vue/lib/notification";
export default {
name: "ConfigThrottle",
props: {
visible: {
type: Boolean,
default: false,
},
},
data() {
return {
show: this.visible,
loading: false,
form: this.$form.createForm(this, { name: "ConfigThrottleForm" }),
brokers: [],
unit: "MB",
};
},
watch: {
visible(v) {
this.show = v;
if (this.show) {
this.getClusterInfo();
}
},
},
methods: {
handleCancel() {
this.$emit("closeConfigThrottleDialog", { refresh: false });
},
getClusterInfo() {
this.loading = true;
request({
url: KafkaClusterApi.getClusterInfo.url,
method: KafkaClusterApi.getClusterInfo.method,
}).then((res) => {
this.loading = false;
this.brokers = [];
res.data.nodes.forEach((node) => this.brokers.push(node.id));
});
},
ok() {
this.form.validateFields((err, values) => {
if (!err) {
const data = Object.assign({}, values, { unit: this.unit });
this.loading = true;
request({
url: KafkaOpApi.configThrottle.url,
method: KafkaOpApi.configThrottle.method,
data: data,
}).then((res) => {
this.loading = false;
if (res.code == 0) {
this.$message.success(res.msg);
this.$emit("closeConfigThrottleDialog", { refresh: false });
} else {
notification.error({
message: "error",
description: res.msg,
});
}
});
}
});
},
},
};
</script>
<style scoped></style>

View File

@@ -0,0 +1,166 @@
<template>
<a-modal
title="正在进行副本重分配的分区"
:visible="show"
:width="1200"
:mask="false"
:destroyOnClose="true"
:footer="null"
:maskClosable="false"
@cancel="handleCancel"
>
<div>
<a-spin :spinning="loading">
<a-table
:columns="columns"
:data-source="data"
bordered
:rowKey="(record) => record.topic + record.partition"
>
<div slot="replicas" slot-scope="text">
<span v-for="i in text" :key="i">
{{ i }}
</span>
</div>
<div slot="addingReplicas" slot-scope="text">
<span v-for="i in text" :key="i">
{{ i }}
</span>
</div>
<div slot="removingReplicas" slot-scope="text">
<span v-for="i in text" :key="i">
{{ i }}
</span>
</div>
<div slot="operation" slot-scope="record">
<a-popconfirm
title="取消正在进行的副本重分配任务?"
ok-text="确认"
cancel-text="取消"
@confirm="cancelReassignment(record)"
>
<a-button size="small" href="javascript:;" class="operation-btn"
>取消
</a-button>
</a-popconfirm>
</div>
</a-table>
</a-spin>
</div>
</a-modal>
</template>
<script>
import request from "@/utils/request";
import { KafkaOpApi } from "@/utils/api";
import notification from "ant-design-vue/es/notification";
export default {
name: "CurrentReassignments",
props: {
visible: {
type: Boolean,
default: false,
},
},
data() {
return {
columns: columns,
show: this.visible,
data: [],
loading: false,
};
},
watch: {
visible(v) {
this.show = v;
if (this.show) {
this.currentReassignments();
}
},
},
methods: {
currentReassignments() {
this.loading = true;
const api = KafkaOpApi.currentReassignments;
request({
url: api.url,
method: api.method,
}).then((res) => {
this.loading = false;
if (res.code != 0) {
notification.error({
message: "error",
description: res.msg,
});
} else {
this.data = res.data;
}
});
},
handleCancel() {
this.$emit("closeCurrentReassignmentsDialog", {});
},
cancelReassignment(record) {
const param = { topic: record.topic, partition: record.partition };
this.loading = true;
const api = KafkaOpApi.cancelReassignment;
request({
url: api.url,
method: api.method,
data: param,
}).then((res) => {
this.loading = false;
if (res.code != 0) {
notification.error({
message: "error",
description: res.msg,
});
} else {
this.currentReassignments();
}
});
},
},
};
const columns = [
{
title: "Topic",
dataIndex: "topic",
key: "topic",
},
{
title: "分区",
dataIndex: "partition",
key: "partition",
},
{
title: "副本",
dataIndex: "replicas",
key: "replicas",
scopedSlots: { customRender: "replicas" },
},
{
title: "正在增加的副本",
dataIndex: "addingReplicas",
key: "addingReplicas",
scopedSlots: { customRender: "addingReplicas" },
},
{
title: "正在移除的副本",
dataIndex: "removingReplicas",
key: "removingReplicas",
scopedSlots: { customRender: "removingReplicas" },
},
{
title: "操作",
key: "operation",
scopedSlots: { customRender: "operation" },
},
];
</script>
<style scoped></style>

View File

@@ -1,5 +1,25 @@
<template>
<div class="content">
<div class="content-module">
<a-card title="Broker管理" style="width: 100%; text-align: left">
<p>
<a-button type="primary" @click="openConfigThrottleDialog">
配置限流
</a-button>
<label>说明</label>
<span
>设置指定broker上的topic的副本之间数据同步占用的带宽。这个设置是broker级别的,但是设置后还要去对应的topic上进行限流配置,指定对这个topic的相关副本进行限制</span
>
</p>
<p>
<a-button type="primary" @click="openRemoveThrottleDialog">
解除限流
</a-button>
<label>说明</label>
<span>解除指定broker上的topic副本之间数据同步占用的带宽限制</span>
</p>
</a-card>
</div>
<div class="content-module">
<a-card title="副本管理" style="width: 100%; text-align: left">
<p>
@@ -9,6 +29,13 @@
<label>说明</label>
<span>将集群中所有分区leader副本设置为首选副本</span>
</p>
<p>
<a-button type="primary" @click="openCurrentReassignmentsDialog">
副本变更详情
</a-button>
<label>说明</label>
<span>查看正在进行副本变更/重分配的任务,或者将其取消</span>
</p>
</a-card>
</div>
<div class="content-module">
@@ -66,6 +93,20 @@
@closeDataSyncSchemeDialog="closeDataSyncSchemeDialog"
>
</DataSyncScheme>
<ConfigThrottle
:visible="brokerManager.showConfigThrottleDialog"
@closeConfigThrottleDialog="closeConfigThrottleDialog"
>
</ConfigThrottle>
<RemoveThrottle
:visible="brokerManager.showRemoveThrottleDialog"
@closeRemoveThrottleDialog="closeRemoveThrottleDialog"
>
</RemoveThrottle>
<CurrentReassignments
:visible="replicationManager.showCurrentReassignmentsDialog"
@closeCurrentReassignmentsDialog="closeCurrentReassignmentsDialog"
></CurrentReassignments>
</div>
</template>
@@ -75,6 +116,9 @@ import MinOffsetAlignment from "@/views/op/MinOffsetAlignment";
import OffsetAlignmentTable from "@/views/op/OffsetAlignmentTable";
import ElectPreferredLeader from "@/views/op/ElectPreferredLeader";
import DataSyncScheme from "@/views/op/DataSyncScheme";
import ConfigThrottle from "@/views/op/ConfigThrottle";
import RemoveThrottle from "@/views/op/RemoveThrottle";
import CurrentReassignments from "@/views/op/CurrentReassignments";
export default {
name: "Operation",
components: {
@@ -83,6 +127,9 @@ export default {
OffsetAlignmentTable,
ElectPreferredLeader,
DataSyncScheme,
ConfigThrottle,
RemoveThrottle,
CurrentReassignments,
},
data() {
return {
@@ -94,6 +141,11 @@ export default {
},
replicationManager: {
showElectPreferredLeaderDialog: false,
showCurrentReassignmentsDialog: false,
},
brokerManager: {
showConfigThrottleDialog: false,
showRemoveThrottleDialog: false,
},
};
},
@@ -128,6 +180,24 @@ export default {
closeElectPreferredLeaderDialog() {
this.replicationManager.showElectPreferredLeaderDialog = false;
},
openConfigThrottleDialog() {
this.brokerManager.showConfigThrottleDialog = true;
},
closeConfigThrottleDialog() {
this.brokerManager.showConfigThrottleDialog = false;
},
openRemoveThrottleDialog() {
this.brokerManager.showRemoveThrottleDialog = true;
},
closeRemoveThrottleDialog() {
this.brokerManager.showRemoveThrottleDialog = false;
},
openCurrentReassignmentsDialog() {
this.replicationManager.showCurrentReassignmentsDialog = true;
},
closeCurrentReassignmentsDialog() {
this.replicationManager.showCurrentReassignmentsDialog = false;
},
},
};
</script>

View File

@@ -0,0 +1,127 @@
<template>
<a-modal
title="解除限流"
:visible="show"
:width="1000"
:mask="false"
:maskClosable="false"
okText="确认"
cancelText="取消"
:destroyOnClose="true"
@cancel="handleCancel"
@ok="ok"
>
<div>
<a-spin :spinning="loading">
<a-form
:form="form"
:label-col="{ span: 5 }"
:wrapper-col="{ span: 12 }"
>
<a-form-item label="Broker">
<a-select
mode="multiple"
option-filter-prop="children"
v-decorator="[
'brokerList',
{
initialValue: brokers,
rules: [{ required: true, message: '请选择一个broker!' }],
},
]"
placeholder="请选择一个broker"
>
<a-select-option v-for="v in brokers" :key="v" :value="v">
<span v-if="v == -1">全部</span> <span v-else>{{ v }}</span>
</a-select-option>
</a-select>
</a-form-item>
</a-form>
<hr />
<h4>如何检查是否配置的有限流速率:</h4>
kafka的限流速率是通过下面这两项配置的
<ul>
<li>leader.replication.throttled.rate</li>
<li>follower.replication.throttled.rate</li>
</ul>
只需通过
<strong>集群->属性配置</strong>
查看是否存在这两项配置,如果不存在便是没有配置限流速率。如果未配置限流速率,即使指定某个topic的分区副本进行限流,没有速率也不限流。
</a-spin>
</div>
</a-modal>
</template>
<script>
import request from "@/utils/request";
import { KafkaClusterApi, KafkaOpApi } from "@/utils/api";
import notification from "ant-design-vue/lib/notification";
export default {
name: "RemoveThrottle",
props: {
visible: {
type: Boolean,
default: false,
},
},
data() {
return {
show: this.visible,
loading: false,
form: this.$form.createForm(this, { name: "RemoveThrottleForm" }),
brokers: [],
};
},
watch: {
visible(v) {
this.show = v;
if (this.show) {
this.getClusterInfo();
}
},
},
methods: {
handleCancel() {
this.$emit("closeRemoveThrottleDialog", { refresh: false });
},
getClusterInfo() {
this.loading = true;
request({
url: KafkaClusterApi.getClusterInfo.url,
method: KafkaClusterApi.getClusterInfo.method,
}).then((res) => {
this.loading = false;
this.brokers = [];
res.data.nodes.forEach((node) => this.brokers.push(node.id));
});
},
ok() {
this.form.validateFields((err, values) => {
if (!err) {
const data = Object.assign({}, values);
this.loading = true;
request({
url: KafkaOpApi.removeThrottle.url,
method: KafkaOpApi.removeThrottle.method,
data: data,
}).then((res) => {
this.loading = false;
if (res.code == 0) {
this.$message.success(res.msg);
this.$emit("closeRemoveThrottleDialog", { refresh: false });
} else {
notification.error({
message: "error",
description: res.msg,
});
}
});
}
});
},
},
};
</script>
<style scoped></style>

View File

@@ -0,0 +1,163 @@
<template>
<a-modal
:title="topic + '限流'"
:visible="show"
:width="1000"
:mask="false"
:maskClosable="false"
okText="确认"
cancelText="取消"
:destroyOnClose="true"
@cancel="handleCancel"
@ok="ok"
>
<div>
<a-spin :spinning="loading">
<a-form
:form="form"
:label-col="{ span: 5 }"
:wrapper-col="{ span: 12 }"
>
<a-form-item label="操作">
<a-radio-group
@change="onChange"
v-decorator="[
'operation',
{
initialValue: 'ON',
rules: [{ required: true, message: '请选择一个操作!' }],
},
]"
>
<a-radio value="ON"> 配置限流 </a-radio>
<a-radio value="OFF"> 移除所有分区限流配置 </a-radio>
</a-radio-group>
</a-form-item>
<a-form-item label="选择分区" v-show="showPartition">
<a-select
mode="multiple"
option-filter-prop="children"
v-decorator="[
'partitions',
{
initialValue: [-1],
rules: [{ required: true, message: '请选择一个分区!' }],
},
]"
placeholder="请选择一个分区"
>
<a-select-option v-for="v in partitions" :key="v" :value="v">
<span v-if="v == -1">全部</span> <span v-else>{{ v }}</span>
</a-select-option>
</a-select>
</a-form-item>
</a-form>
<hr />
<h4>说明:</h4>
该限流表示topic的副本在不同broker之间数据同步占用带宽的限制,该配置是一个topic级别的配置项。如未配置速率,即使配置了这个限流,也不会进行实际的限流操作。配置速率在
<span style="color: red">运维->配置限流</span> 处进行操作.
<h4>如何检查是否对哪些分区启用限流:</h4>
topic的限流是通过下面这两项配置的
<ul>
<li>leader.replication.throttled.replicas</li>
<li>follower.replication.throttled.replicas</li>
</ul>
只需通过
<strong>属性配置</strong>
查看这两项配置的值,格式为:"0:0,1:0",左侧为分区,右侧为broker
id。示例表示[分区0的副本在broker 0上,分区1的副本在broker 0上]。
</a-spin>
</div>
</a-modal>
</template>
<script>
import request from "@/utils/request";
import { KafkaTopicApi } from "@/utils/api";
import notification from "ant-design-vue/lib/notification";
export default {
name: "ConfigTopicThrottle",
props: {
visible: {
type: Boolean,
default: false,
},
topic: {
type: String,
default: "",
},
},
data() {
return {
show: this.visible,
loading: false,
form: this.$form.createForm(this, { name: "RemoveThrottleForm" }),
partitions: [],
showPartition: true,
};
},
watch: {
visible(v) {
this.show = v;
if (this.show) {
this.getPartitionInfo();
this.showPartition = true;
}
},
},
methods: {
handleCancel() {
this.$emit("closeThrottleDialog", { refresh: false });
},
getPartitionInfo() {
this.loading = true;
request({
url: KafkaTopicApi.getPartitionInfo.url + "?topic=" + this.topic,
method: KafkaTopicApi.getPartitionInfo.method,
}).then((res) => {
this.loading = false;
if (res.code != 0) {
notification.error({
message: "error",
description: res.msg,
});
} else {
this.partitions = res.data.map((e) => e.partition);
this.partitions.splice(0, 0, -1);
}
});
},
ok() {
this.form.validateFields((err, values) => {
if (!err) {
const data = Object.assign({}, values, { topic: this.topic });
this.loading = true;
request({
url: KafkaTopicApi.configThrottle.url,
method: KafkaTopicApi.configThrottle.method,
data: data,
}).then((res) => {
this.loading = false;
if (res.code == 0) {
this.$message.success(res.msg);
this.$emit("closeThrottleDialog", { refresh: false });
} else {
notification.error({
message: "error",
description: res.msg,
});
}
});
}
});
},
onChange(e) {
this.showPartition = !(e.target.value == "OFF");
},
},
};
</script>
<style scoped></style>

View File

@@ -12,6 +12,7 @@
<div>
<a-spin :spinning="loading">
<a-table
bordered
:columns="columns"
:data-source="data"
:rowKey="
@@ -21,19 +22,15 @@
"
>
<ul slot="replicas" slot-scope="text">
<ol v-for="i in text" :key="i">
{{
i
}}
</ol>
</ul>
<ul slot="isr" slot-scope="text">
<ol v-for="i in text" :key="i">
{{
i
}}
</ol>
<li v-for="i in text" :key="i">
{{ i }}
</li>
</ul>
<div slot="isr" slot-scope="text">
<span v-for="i in text" :key="i">
{{ i }}
</span>
</div>
<div slot="operation" slot-scope="record" v-show="!record.internal">
<a-popconfirm
:title="
@@ -52,7 +49,15 @@
</a-button>
</a-popconfirm>
</div>
<p slot="expandedRowRender" slot-scope="record" style="margin: 0">
有效消息的时间范围<span class="red-font">{{
formatTime(record.beginTime)
}}</span>
~
<span class="red-font">{{ formatTime(record.endTime) }}</span>
</p>
</a-table>
<p>友情提示:点击+号展开,可以查看当前分区的有效消息的时间范围</p>
</a-spin>
</div>
</a-modal>
@@ -62,6 +67,7 @@
import request from "@/utils/request";
import { KafkaOpApi, KafkaTopicApi } from "@/utils/api";
import notification from "ant-design-vue/es/notification";
import moment from "moment";
export default {
name: "PartitionInfo",
props: {
@@ -131,6 +137,11 @@ export default {
}
});
},
formatTime(timestamp) {
return timestamp != -1
? moment(timestamp).format("YYYY-MM-DD HH:mm:ss:SSS")
: timestamp;
},
},
};
@@ -172,6 +183,26 @@ const columns = [
dataIndex: "diff",
key: "diff",
},
// {
// title: "有效消息起始时间",
// dataIndex: "beginTime",
// key: "beginTime",
// slots: { title: "beginTime" },
// scopedSlots: { customRender: "internal" },
// customRender: (text) => {
// return text != -1 ? moment(text).format("YYYY-MM-DD HH:mm:ss:SSS") : text;
// },
// },
// {
// title: "有效消息结束时间",
// dataIndex: "endTime",
// key: "endTime",
// slots: { title: "endTime" },
// scopedSlots: { customRender: "internal" },
// customRender: (text) => {
// return text != -1 ? moment(text).format("YYYY-MM-DD HH:mm:ss:SSS") : text;
// },
// },
{
title: "操作",
key: "operation",
@@ -180,4 +211,11 @@ const columns = [
];
</script>
<style scoped></style>
<style scoped>
.red-font {
color: red;
}
.green-font {
color: green;
}
</style>

View File

@@ -0,0 +1,115 @@
<template>
<a-modal
:title="topic + '发送统计'"
:visible="show"
:width="1000"
:mask="false"
:destroyOnClose="true"
:footer="null"
:maskClosable="false"
@cancel="handleCancel"
>
<div>
<a-spin :spinning="loading">
<h4>今天发送消息数:{{ today.total }}</h4>
<a-table
:columns="columns"
:data-source="today.detail"
bordered
:rowKey="(record) => record.partition"
>
</a-table>
<hr />
<h4>昨天发送消息数:{{ yesterday.total }}</h4>
<a-table
:columns="columns"
:data-source="yesterday.detail"
bordered
:rowKey="(record) => record.partition"
>
</a-table>
</a-spin>
</div>
</a-modal>
</template>
<script>
import request from "@/utils/request";
import { KafkaTopicApi } from "@/utils/api";
import notification from "ant-design-vue/es/notification";
export default {
name: "SendStats",
props: {
topic: {
type: String,
default: "",
},
visible: {
type: Boolean,
default: false,
},
},
data() {
return {
columns: columns,
show: this.visible,
data: [],
loading: false,
yesterday: {},
today: {},
};
},
watch: {
visible(v) {
this.show = v;
if (this.show) {
this.sendStatus();
}
},
},
methods: {
sendStatus() {
this.loading = true;
const api = KafkaTopicApi.sendStats;
request({
url: api.url + "?topic=" + this.topic,
method: api.method,
}).then((res) => {
this.loading = false;
if (res.code != 0) {
notification.error({
message: "error",
description: res.msg,
});
} else {
this.data = res.data;
this.yesterday = this.data.yesterday;
this.today = this.data.today;
}
});
},
handleCancel() {
this.data = [];
this.yesterday = {};
this.today = {};
this.$emit("closeMessageStatsDialog", {});
},
},
};
const columns = [
{
title: "分区",
dataIndex: "partition",
key: "partition",
},
{
title: "数量",
dataIndex: "num",
key: "num",
},
];
</script>
<style scoped></style>

View File

@@ -98,6 +98,27 @@
@click="openTopicConfigDialog(record.name)"
>属性配置
</a-button>
<a-button
size="small"
href="javascript:;"
class="operation-btn"
@click="openUpdateReplicaDialog(record.name)"
>变更副本
</a-button>
<a-button
size="small"
href="javascript:;"
class="operation-btn"
@click="openMessageStatsDialog(record.name)"
>发送统计
</a-button>
<a-button
size="small"
href="javascript:;"
class="operation-btn"
@click="openThrottleDialog(record.name)"
>限流
</a-button>
</div>
</a-table>
<PartitionInfo
@@ -126,6 +147,21 @@
:topic="selectDetail.resourceName"
@closeTopicConfigDialog="closeTopicConfigDialog"
></TopicConfig>
<UpdateReplica
:visible="showUpdateReplicaDialog"
:topic="selectDetail.resourceName"
@closeUpdateReplicaDialog="closeUpdateReplicaDialog"
></UpdateReplica>
<ConfigTopicThrottle
:visible="showThrottleDialog"
:topic="selectDetail.resourceName"
@closeThrottleDialog="closeThrottleDialog"
></ConfigTopicThrottle>
<SendStats
:visible="showSendStatsDialog"
:topic="selectDetail.resourceName"
@closeMessageStatsDialog="closeMessageStatsDialog"
></SendStats>
</div>
</a-spin>
</div>
@@ -140,6 +176,9 @@ import CreateTopic from "@/views/topic/CreateTopic";
import AddPartition from "@/views/topic/AddPartition";
import ConsumedDetail from "@/views/topic/ConsumedDetail";
import TopicConfig from "@/views/topic/TopicConfig";
import UpdateReplica from "@/views/topic/UpdateReplica";
import ConfigTopicThrottle from "@/views/topic/ConfigTopicThrottle";
import SendStats from "@/views/topic/SendStats";
export default {
name: "Topic",
@@ -149,6 +188,9 @@ export default {
AddPartition,
ConsumedDetail,
TopicConfig,
UpdateReplica,
ConfigTopicThrottle,
SendStats,
},
data() {
return {
@@ -170,6 +212,9 @@ export default {
showAddPartition: false,
showConsumedDetailDialog: false,
showTopicConfigDialog: false,
showUpdateReplicaDialog: false,
showThrottleDialog: false,
showSendStatsDialog: false,
};
},
methods: {
@@ -250,6 +295,27 @@ export default {
closeTopicConfigDialog() {
this.showTopicConfigDialog = false;
},
openUpdateReplicaDialog(topic) {
this.showUpdateReplicaDialog = true;
this.selectDetail.resourceName = topic;
},
closeUpdateReplicaDialog() {
this.showUpdateReplicaDialog = false;
},
openMessageStatsDialog(topic) {
this.showSendStatsDialog = true;
this.selectDetail.resourceName = topic;
},
closeMessageStatsDialog() {
this.showSendStatsDialog = false;
},
openThrottleDialog(topic) {
this.showThrottleDialog = true;
this.selectDetail.resourceName = topic;
},
closeThrottleDialog() {
this.showThrottleDialog = false;
},
},
created() {
this.getTopicList();
@@ -281,7 +347,7 @@ const columns = [
title: "操作",
key: "operation",
scopedSlots: { customRender: "operation" },
width: 500,
width: 800,
},
];
</script>

View File

@@ -0,0 +1,211 @@
<template>
<a-modal
title="变更副本"
:visible="show"
:width="1200"
:mask="false"
:destroyOnClose="true"
:maskClosable="false"
@cancel="handleCancel"
okText="确认"
cancelText="取消"
@ok="handleOk"
>
<div>
<a-spin :spinning="loading">
<div class="replica-box">
<label>设置副本数</label
><a-input-number
id="inputNumber"
v-model="replicaNums"
:min="1"
:max="brokerSize"
@change="onChange"
/>
</div>
<div class="replica-box">
<label>是否要限流</label
><a-input-number
id="inputNumber"
v-model="data.interBrokerThrottle"
:min="-1"
:max="102400"
/>
<strong>
|说明:broker之间副本同步带宽限制,默认值为-1,表示不限制;不是-1表示限制。该值并不表示流速,至于流速,配置在
<span style="color: red">运维->配置限流</span> 处进行操作.</strong
>
</div>
<a-table
:columns="columns"
:data-source="data.partitions"
bordered
:rowKey="
(record, index) => {
return index;
}
"
>
<div slot="replicas" slot-scope="text">
<span v-for="i in text" :key="i">
{{ i }}
</span>
</div>
</a-table>
<p>
*正在进行(即尚未完成)的副本变更任务,可以在
<span style="color: red">运维->副本变更详情</span>
处查看,也可以在那里将正在进行的任务取消
</p>
<p>
*如果是减少副本,不用限流;如果是增加副本数,副本同步的时候如果有大量消息需要同步,可能占用大量带宽,担心会影响集群的稳定,可考虑是否开启限流。同步完成后可以再把该topic的限流关闭,关闭操作可以点击
限流按钮 处理
</p>
</a-spin>
</div>
</a-modal>
</template>
<script>
import request from "@/utils/request";
import { KafkaClusterApi, KafkaTopicApi } from "@/utils/api";
import notification from "ant-design-vue/lib/notification";
export default {
name: "UpdateReplica",
props: {
topic: {
type: String,
default: "",
},
visible: {
type: Boolean,
default: false,
},
},
data() {
return {
columns: columns,
show: this.visible,
data: {},
loading: false,
form: this.$form.createForm(this, { name: "coordinated" }),
brokerSize: 0,
replicaNums: 0,
defaultReplicaNums: 0,
};
},
watch: {
visible(v) {
this.show = v;
if (this.show) {
this.getClusterInfo();
this.getCurrentReplicaAssignment();
}
},
},
methods: {
getCurrentReplicaAssignment() {
this.loading = true;
request({
url:
KafkaTopicApi.getCurrentReplicaAssignment.url +
"?topic=" +
this.topic,
method: KafkaTopicApi.getCurrentReplicaAssignment.method,
}).then((res) => {
this.loading = false;
if (res.code == 0) {
this.data = res.data;
if (this.data.partitions.length > 0) {
this.replicaNums = this.data.partitions[0].replicas.length;
this.defaultReplicaNums = this.replicaNums;
}
} else {
notification.error({
message: "error",
description: res.msg,
});
}
});
},
getClusterInfo() {
this.loading = true;
request({
url: KafkaClusterApi.getClusterInfo.url,
method: KafkaClusterApi.getClusterInfo.method,
}).then((res) => {
this.brokerSize = res.data.nodes.length;
});
},
handleCancel() {
this.data = {};
this.$emit("closeUpdateReplicaDialog", { refresh: false });
},
onChange(value) {
if (value < 1 || value > this.brokerSize) {
return false;
}
if (this.data.partitions.length > 0) {
this.data.partitions.forEach((p) => {
if (value > p.replicas.length) {
// 从最后一个副本所在的broker开始轮询,跳过已持有副本的broker,避免分配到重复的broker id
let num = p.replicas[p.replicas.length - 1];
while (p.replicas.length < value) {
num = (num + 1) % this.brokerSize;
if (!p.replicas.includes(num)) {
p.replicas.push(num);
}
}
}
if (value < p.replicas.length) {
for (let i = p.replicas.length; i > value; i--) {
p.replicas.pop();
}
}
});
}
},
handleOk() {
this.loading = true;
request({
url: KafkaTopicApi.updateReplicaAssignment.url,
method: KafkaTopicApi.updateReplicaAssignment.method,
data: this.data,
}).then((res) => {
this.loading = false;
if (res.code == 0) {
this.$message.success(res.msg);
this.$emit("closeUpdateReplicaDialog", { refresh: false });
} else {
notification.error({
message: "error",
description: res.msg,
});
}
});
},
},
};
const columns = [
{
title: "Topic",
dataIndex: "topic",
key: "topic",
},
{
title: "分区",
dataIndex: "partition",
key: "partition",
},
{
title: "副本",
dataIndex: "replicas",
key: "replicas",
scopedSlots: { customRender: "replicas" },
},
];
</script>
<style scoped>
.replica-box {
margin-bottom: 1%;
}
</style>