add consumer group console

This commit is contained in:
许晓东
2021-09-10 21:06:16 +08:00
parent cb55c20c0b
commit 098af95dc7
9 changed files with 196 additions and 1 deletions

View File

@@ -48,7 +48,7 @@ sh bin/shutdown.sh
除了webstorm是开发前端的ide可以根据自己需要代替jdk scala是必须有的。
# 本地开发配置
以我自己为例开发环境里的工具准备好然后代码clone到本地。
## 后端配置
## 后端配置w
1. 用idea打开项目
2. 打开idea的Project Structure(Settings) -> Modules -> 设置src/main/scala为Sources因为约定src/main/java是源码目录所以这里要再加一个
3. 打开idea的Project Structure(Settings) -> Libraries 添加scala sdk然后选择本地下载的scala 2.13的目录,确定添加进来

View File

@@ -0,0 +1,10 @@
package com.xuxd.kafka.console;
/**
* kafka-console-ui.
*
* @author xuxd
* @date 2021-09-10 20:03:01
**/
public class CounterSet {
}

View File

@@ -23,4 +23,11 @@ public class CounterList<T> {
public int getTotal() {
return list != null ? list.size() : 0;
}
@Override public String toString() {
return "CounterList{" +
"list=" + list +
", total=" + getTotal() +
'}';
}
}

View File

@@ -0,0 +1,41 @@
package com.xuxd.kafka.console.beans.vo;
import lombok.Data;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
/**
* kafka-console-ui.
*
* @author xuxd
* @date 2021-09-10 20:16:07
**/
@Data
public class ConsumerGroupVO {
private String groupId;
private boolean isSimpleConsumerGroup;
private int members;
private String partitionAssignor;
private String state;
private String coordinator;
private int authorizedOperations;
public static ConsumerGroupVO from(ConsumerGroupDescription description) {
ConsumerGroupVO vo = new ConsumerGroupVO();
vo.setGroupId(description.groupId());
vo.setSimpleConsumerGroup(description.isSimpleConsumerGroup());
vo.setMembers(description.members().size());
vo.setPartitionAssignor(description.partitionAssignor());
vo.setState(description.state().name());
vo.setCoordinator(description.coordinator() != null ? description.coordinator().toString() : "");
vo.setAuthorizedOperations(description.authorizedOperations() != null ? description.authorizedOperations().size() : 0);
return vo;
}
}

View File

@@ -1,5 +1,6 @@
package com.xuxd.kafka.console.config;
import kafka.console.ConsumerConsole;
import kafka.console.KafkaAclConsole;
import kafka.console.KafkaConfigConsole;
import kafka.console.TopicConsole;
@@ -29,4 +30,9 @@ public class KafkaConfiguration {
public TopicConsole topicConsole(KafkaConfig config) {
return new TopicConsole(config);
}
@Bean
public ConsumerConsole consumerConsole(KafkaConfig config) {
return new ConsumerConsole(config);
}
}

View File

@@ -0,0 +1,17 @@
package com.xuxd.kafka.console.service;
import com.xuxd.kafka.console.beans.ResponseData;
import java.util.List;
import java.util.Set;
import org.apache.kafka.common.ConsumerGroupState;
/**
* kafka-console-ui.
*
* @author xuxd
* @date 2021-09-10 19:39:26
**/
public interface ConsumerService {
ResponseData getConsumerGroupList(List<String> groupIds, Set<ConsumerGroupState> states);
}

View File

@@ -0,0 +1,49 @@
package com.xuxd.kafka.console.service.impl;
import com.xuxd.kafka.console.beans.CounterList;
import com.xuxd.kafka.console.beans.ResponseData;
import com.xuxd.kafka.console.beans.vo.ConsumerGroupVO;
import com.xuxd.kafka.console.service.ConsumerService;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import kafka.console.ConsumerConsole;
import org.apache.kafka.common.ConsumerGroupState;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* kafka-console-ui.
*
* @author xuxd
* @date 2021-09-10 19:40:10
**/
@Service
public class ConsumerServiceImpl implements ConsumerService {
@Autowired
private ConsumerConsole consumerConsole;
@Override public ResponseData getConsumerGroupList(List<String> groupIds, Set<ConsumerGroupState> states) {
Set<String> groupList = new HashSet<>();
if (groupIds != null && !groupIds.isEmpty()) {
if (states != null && !states.isEmpty()) {
Set<String> stateGroup = consumerConsole.getConsumerGroupIdList(states);
Set<String> filterGroupList = groupIds.stream().filter(x -> stateGroup.contains(x)).collect(Collectors.toSet());
if (filterGroupList.isEmpty()) {
return ResponseData.create().data(Collections.emptyList()).success();
} else {
groupList.addAll(filterGroupList);
}
} else {
groupList.addAll(groupIds);
}
} else {
groupList.addAll(consumerConsole.getConsumerGroupIdList(states));
}
List<ConsumerGroupVO> consumerGroupVOS = consumerConsole.getConsumerGroupList(groupList).stream().map(c -> ConsumerGroupVO.from(c)).collect(Collectors.toList());
return ResponseData.create().data(new CounterList<>(consumerGroupVOS)).success();
}
}

View File

@@ -0,0 +1,38 @@
package kafka.console
import java.util
import java.util.{Collections, Set}
import com.xuxd.kafka.console.config.KafkaConfig
import org.apache.kafka.clients.admin.{ConsumerGroupDescription, ListConsumerGroupsOptions}
import org.apache.kafka.common.ConsumerGroupState
import scala.jdk.CollectionConverters.{CollectionHasAsScala, SetHasAsJava}
/**
* kafka-console-ui. kafka consumer console.
*
* @author xuxd
* @date 2021-09-10 17:19:31
* */
class ConsumerConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConfig) with Logging {
def getConsumerGroupIdList(states: Set[ConsumerGroupState]): Set[String] = {
withAdminClientAndCatchError(admin => admin.listConsumerGroups(new ListConsumerGroupsOptions().inStates(states)).all().get()
.asScala.map(_.groupId()).toSet.asJava,
e => {
log.error("listConsumerGroups error.", e)
Collections.emptySet()
}).asInstanceOf[Set[String]]
}
def getConsumerGroupList(groupIds: util.Collection[String]): Set[ConsumerGroupDescription] = {
val searchGroupIds: Set[String] = if (groupIds == null || groupIds.isEmpty) getConsumerGroupIdList(null) else new util.HashSet[String](groupIds)
withAdminClientAndCatchError(admin => new util.HashSet[ConsumerGroupDescription](admin.describeConsumerGroups(searchGroupIds).all().get().values()),
e => {
log.error("listConsumerGroups error.", e)
Collections.emptySet()
}).asInstanceOf[Set[ConsumerGroupDescription]]
}
}

View File

@@ -0,0 +1,27 @@
package com.xuxd.kafka.console.service.impl;
import com.xuxd.kafka.console.service.ConsumerService;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import static org.junit.jupiter.api.Assertions.*;
/**
* kafka-console-ui.
*
* @author xuxd
* @date 2021-09-10 20:11:22
**/
@SpringBootTest
public class ConsumerServiceImplTest {
@Autowired
private ConsumerService consumerService;
@Test
public void getConsumerGroupList() {
Object data = consumerService.getConsumerGroupList(null, null).getData();
System.out.println(data);
}
}