diff --git a/README.md b/README.md index e7c8cca..1ba4361 100644 --- a/README.md +++ b/README.md @@ -48,7 +48,7 @@ sh bin/shutdown.sh 除了webstorm是开发前端的ide可以根据自己需要代替,jdk scala是必须有的。 # 本地开发配置 以我自己为例,开发环境里的工具准备好,然后代码clone到本地。 -## 后端配置 +## 后端配置w 1. 用idea打开项目 2. 打开idea的Project Structure(Settings) -> Modules -> 设置src/main/scala为Sources,因为约定src/main/java是源码目录,所以这里要再加一个 3. 打开idea的Project Structure(Settings) -> Libraries 添加scala sdk,然后选择本地下载的scala 2.13的目录,确定添加进来 diff --git a/src/main/java/com/xuxd/kafka/console/CounterSet.java b/src/main/java/com/xuxd/kafka/console/CounterSet.java new file mode 100644 index 0000000..c2a1775 --- /dev/null +++ b/src/main/java/com/xuxd/kafka/console/CounterSet.java @@ -0,0 +1,10 @@ +package com.xuxd.kafka.console; + +/** + * kafka-console-ui. + * + * @author xuxd + * @date 2021-09-10 20:03:01 + **/ +public class CounterSet { +} diff --git a/src/main/java/com/xuxd/kafka/console/beans/CounterList.java b/src/main/java/com/xuxd/kafka/console/beans/CounterList.java index 68374da..3a33a56 100644 --- a/src/main/java/com/xuxd/kafka/console/beans/CounterList.java +++ b/src/main/java/com/xuxd/kafka/console/beans/CounterList.java @@ -23,4 +23,11 @@ public class CounterList { public int getTotal() { return list != null ? list.size() : 0; } + + @Override public String toString() { + return "CounterList{" + + "list=" + list + + ", total=" + getTotal() + + '}'; + } } diff --git a/src/main/java/com/xuxd/kafka/console/beans/vo/ConsumerGroupVO.java b/src/main/java/com/xuxd/kafka/console/beans/vo/ConsumerGroupVO.java new file mode 100644 index 0000000..1a18d4e --- /dev/null +++ b/src/main/java/com/xuxd/kafka/console/beans/vo/ConsumerGroupVO.java @@ -0,0 +1,41 @@ +package com.xuxd.kafka.console.beans.vo; + +import lombok.Data; +import org.apache.kafka.clients.admin.ConsumerGroupDescription; + +/** + * kafka-console-ui. + * + * @author xuxd + * @date 2021-09-10 20:16:07 + **/ +@Data +public class ConsumerGroupVO { + + private String groupId; + + private boolean isSimpleConsumerGroup; + + private int members; + + private String partitionAssignor; + + private String state; + + private String coordinator; + + private int authorizedOperations; + + public static ConsumerGroupVO from(ConsumerGroupDescription description) { + ConsumerGroupVO vo = new ConsumerGroupVO(); + vo.setGroupId(description.groupId()); + vo.setSimpleConsumerGroup(description.isSimpleConsumerGroup()); + vo.setMembers(description.members().size()); + vo.setPartitionAssignor(description.partitionAssignor()); + vo.setState(description.state().name()); + vo.setCoordinator(description.coordinator() != null ? description.coordinator().toString() : ""); + vo.setAuthorizedOperations(description.authorizedOperations() != null ? description.authorizedOperations().size() : 0); + + return vo; + } +} diff --git a/src/main/java/com/xuxd/kafka/console/config/KafkaConfiguration.java b/src/main/java/com/xuxd/kafka/console/config/KafkaConfiguration.java index 53877d8..4035072 100644 --- a/src/main/java/com/xuxd/kafka/console/config/KafkaConfiguration.java +++ b/src/main/java/com/xuxd/kafka/console/config/KafkaConfiguration.java @@ -1,5 +1,6 @@ package com.xuxd.kafka.console.config; +import kafka.console.ConsumerConsole; import kafka.console.KafkaAclConsole; import kafka.console.KafkaConfigConsole; import kafka.console.TopicConsole; @@ -29,4 +30,9 @@ public class KafkaConfiguration { public TopicConsole topicConsole(KafkaConfig config) { return new TopicConsole(config); } + + @Bean + public ConsumerConsole consumerConsole(KafkaConfig config) { + return new ConsumerConsole(config); + } } diff --git a/src/main/java/com/xuxd/kafka/console/service/ConsumerService.java b/src/main/java/com/xuxd/kafka/console/service/ConsumerService.java new file mode 100644 index 0000000..f83a179 --- /dev/null +++ b/src/main/java/com/xuxd/kafka/console/service/ConsumerService.java @@ -0,0 +1,17 @@ +package com.xuxd.kafka.console.service; + +import com.xuxd.kafka.console.beans.ResponseData; +import java.util.List; +import java.util.Set; +import org.apache.kafka.common.ConsumerGroupState; + +/** + * kafka-console-ui. + * + * @author xuxd + * @date 2021-09-10 19:39:26 + **/ +public interface ConsumerService { + + ResponseData getConsumerGroupList(List groupIds, Set states); +} diff --git a/src/main/java/com/xuxd/kafka/console/service/impl/ConsumerServiceImpl.java b/src/main/java/com/xuxd/kafka/console/service/impl/ConsumerServiceImpl.java new file mode 100644 index 0000000..2f393b3 --- /dev/null +++ b/src/main/java/com/xuxd/kafka/console/service/impl/ConsumerServiceImpl.java @@ -0,0 +1,49 @@ +package com.xuxd.kafka.console.service.impl; + +import com.xuxd.kafka.console.beans.CounterList; +import com.xuxd.kafka.console.beans.ResponseData; +import com.xuxd.kafka.console.beans.vo.ConsumerGroupVO; +import com.xuxd.kafka.console.service.ConsumerService; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import kafka.console.ConsumerConsole; +import org.apache.kafka.common.ConsumerGroupState; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +/** + * kafka-console-ui. + * + * @author xuxd + * @date 2021-09-10 19:40:10 + **/ +@Service +public class ConsumerServiceImpl implements ConsumerService { + + @Autowired + private ConsumerConsole consumerConsole; + + @Override public ResponseData getConsumerGroupList(List groupIds, Set states) { + Set groupList = new HashSet<>(); + if (groupIds != null && !groupIds.isEmpty()) { + if (states != null && !states.isEmpty()) { + Set stateGroup = consumerConsole.getConsumerGroupIdList(states); + Set filterGroupList = groupIds.stream().filter(x -> stateGroup.contains(x)).collect(Collectors.toSet()); + if (filterGroupList.isEmpty()) { + return ResponseData.create().data(Collections.emptyList()).success(); + } else { + groupList.addAll(filterGroupList); + } + } else { + groupList.addAll(groupIds); + } + } else { + groupList.addAll(consumerConsole.getConsumerGroupIdList(states)); + } + List consumerGroupVOS = consumerConsole.getConsumerGroupList(groupList).stream().map(c -> ConsumerGroupVO.from(c)).collect(Collectors.toList()); + return ResponseData.create().data(new CounterList<>(consumerGroupVOS)).success(); + } +} diff --git a/src/main/scala/kafka/console/ConsumerConsole.scala b/src/main/scala/kafka/console/ConsumerConsole.scala new file mode 100644 index 0000000..1865df3 --- /dev/null +++ b/src/main/scala/kafka/console/ConsumerConsole.scala @@ -0,0 +1,38 @@ +package kafka.console + +import java.util +import java.util.{Collections, Set} + +import com.xuxd.kafka.console.config.KafkaConfig +import org.apache.kafka.clients.admin.{ConsumerGroupDescription, ListConsumerGroupsOptions} +import org.apache.kafka.common.ConsumerGroupState + +import scala.jdk.CollectionConverters.{CollectionHasAsScala, SetHasAsJava} + +/** + * kafka-console-ui. kafka consumer console. + * + * @author xuxd + * @date 2021-09-10 17:19:31 + * */ +class ConsumerConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConfig) with Logging { + + def getConsumerGroupIdList(states: Set[ConsumerGroupState]): Set[String] = { + + withAdminClientAndCatchError(admin => admin.listConsumerGroups(new ListConsumerGroupsOptions().inStates(states)).all().get() + .asScala.map(_.groupId()).toSet.asJava, + e => { + log.error("listConsumerGroups error.", e) + Collections.emptySet() + }).asInstanceOf[Set[String]] + } + + def getConsumerGroupList(groupIds: util.Collection[String]): Set[ConsumerGroupDescription] = { + val searchGroupIds: Set[String] = if (groupIds == null || groupIds.isEmpty) getConsumerGroupIdList(null) else new util.HashSet[String](groupIds) + withAdminClientAndCatchError(admin => new util.HashSet[ConsumerGroupDescription](admin.describeConsumerGroups(searchGroupIds).all().get().values()), + e => { + log.error("listConsumerGroups error.", e) + Collections.emptySet() + }).asInstanceOf[Set[ConsumerGroupDescription]] + } +} diff --git a/src/test/java/com/xuxd/kafka/console/service/impl/ConsumerServiceImplTest.java b/src/test/java/com/xuxd/kafka/console/service/impl/ConsumerServiceImplTest.java new file mode 100644 index 0000000..4eed96b --- /dev/null +++ b/src/test/java/com/xuxd/kafka/console/service/impl/ConsumerServiceImplTest.java @@ -0,0 +1,27 @@ +package com.xuxd.kafka.console.service.impl; + +import com.xuxd.kafka.console.service.ConsumerService; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; + +import static org.junit.jupiter.api.Assertions.*; + +/** + * kafka-console-ui. + * + * @author xuxd + * @date 2021-09-10 20:11:22 + **/ +@SpringBootTest +public class ConsumerServiceImplTest { + + @Autowired + private ConsumerService consumerService; + + @Test + public void getConsumerGroupList() { + Object data = consumerService.getConsumerGroupList(null, null).getData(); + System.out.println(data); + } +} \ No newline at end of file