diff --git a/README.md b/README.md index 23f6227..0d68727 100644 --- a/README.md +++ b/README.md @@ -49,7 +49,7 @@ sh bin/shutdown.sh 除了webstorm是开发前端的ide可以根据自己需要代替,jdk scala是必须有的。 # 本地开发配置 以我自己为例,开发环境里的工具准备好,然后代码clone到本地。 -## 后端配置w +## 后端配置 1. 用idea打开项目 2. 打开idea的Project Structure(Settings) -> Modules -> 设置src/main/scala为Sources,因为约定src/main/java是源码目录,所以这里要再加一个 3. 打开idea的Project Structure(Settings) -> Libraries 添加scala sdk,然后选择本地下载的scala 2.13的目录,确定添加进来 diff --git a/src/main/java/com/xuxd/kafka/console/controller/ConsumerController.java b/src/main/java/com/xuxd/kafka/console/controller/ConsumerController.java index bb0a7c4..5316260 100644 --- a/src/main/java/com/xuxd/kafka/console/controller/ConsumerController.java +++ b/src/main/java/com/xuxd/kafka/console/controller/ConsumerController.java @@ -55,4 +55,9 @@ public class ConsumerController { public Object getConsumerMembers(@RequestParam String groupId) { return consumerService.getConsumerMembers(groupId); } + + @GetMapping("/detail") + public Object getConsumerDetail(@RequestParam String groupId) { + return consumerService.getConsumerDetail(groupId); + } } diff --git a/src/main/java/com/xuxd/kafka/console/service/ConsumerService.java b/src/main/java/com/xuxd/kafka/console/service/ConsumerService.java index 5ad1a9f..e821ef3 100644 --- a/src/main/java/com/xuxd/kafka/console/service/ConsumerService.java +++ b/src/main/java/com/xuxd/kafka/console/service/ConsumerService.java @@ -18,4 +18,6 @@ public interface ConsumerService { ResponseData deleteConsumerGroup(String groupId); ResponseData getConsumerMembers(String groupId); + + ResponseData getConsumerDetail(String groupId); } diff --git a/src/main/java/com/xuxd/kafka/console/service/impl/ConsumerServiceImpl.java b/src/main/java/com/xuxd/kafka/console/service/impl/ConsumerServiceImpl.java index 246237a..fc8bae5 100644 --- a/src/main/java/com/xuxd/kafka/console/service/impl/ConsumerServiceImpl.java +++ 
b/src/main/java/com/xuxd/kafka/console/service/impl/ConsumerServiceImpl.java @@ -78,4 +78,8 @@ public class ConsumerServiceImpl implements ConsumerService { vos.sort(Comparator.comparing(ConsumerMemberVO::getClientId)); return ResponseData.create().data(vos).success(); } + + @Override public ResponseData getConsumerDetail(String groupId) { + return ResponseData.create().data(consumerConsole.getConsumerDetail(Collections.singleton(groupId))).success(); + } } diff --git a/src/main/scala/kafka/console/ConsumerConsole.scala b/src/main/scala/kafka/console/ConsumerConsole.scala index 7c425f5..18fbc66 100644 --- a/src/main/scala/kafka/console/ConsumerConsole.scala +++ b/src/main/scala/kafka/console/ConsumerConsole.scala @@ -1,13 +1,18 @@ package kafka.console import java.util +import java.util.concurrent.TimeUnit import java.util.{Collections, Set} import com.xuxd.kafka.console.config.KafkaConfig -import org.apache.kafka.clients.admin.{ConsumerGroupDescription, DeleteConsumerGroupsOptions, ListConsumerGroupsOptions} -import org.apache.kafka.common.ConsumerGroupState +import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo +import org.apache.kafka.clients.admin.{ConsumerGroupDescription, DeleteConsumerGroupsOptions, ListConsumerGroupsOptions, OffsetSpec} +import org.apache.kafka.clients.consumer.OffsetAndMetadata +import org.apache.kafka.common.{ConsumerGroupState, TopicPartition} -import scala.jdk.CollectionConverters.{CollectionHasAsScala, SetHasAsJava} +import scala.beans.BeanProperty +import scala.collection.{Map, Seq, mutable} +import scala.jdk.CollectionConverters._ /** * kafka-console-ui. kafka consumer console. 
@@ -50,4 +55,100 @@ class ConsumerConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaCon }).asInstanceOf[(Boolean, String)] } } + + def getConsumerDetail(groupIds: util.Set[String]): util.Collection[TopicPartitionConsumeInfo] = { + // (groupId -> consumerGroup) + val consumerGroups = describeConsumerGroups(groupIds) + + val groupOffsets = for ((groupId, consumerGroup) <- consumerGroups) yield { + // consumer group commit offset + val commitOffsets = getCommittedOffsets(groupId) + + // get topic offset + def getPartitionOffset( + tp: TopicPartition): Option[Long] = commitOffsets.get(tp).filter(_ != null).map(_.offset) + + // val topicOffsets = Map[TopicPartition, Option[Long]]() ++ (for ((t, o) <- commitOffsets) yield t -> o.offset()) + + val endOffsets = withAdminClientAndCatchError(admin => { + val endOffsets = commitOffsets.keySet.map { topicPartition => + topicPartition -> OffsetSpec.latest + }.toMap + admin.listOffsets(endOffsets.asJava).all().get(timeoutMs, TimeUnit.MILLISECONDS) + }, e => { + log.error("listOffsets error.", e) + Collections.emptyMap() + }).asInstanceOf[util.Map[TopicPartition, ListOffsetsResultInfo]].asScala + + val topicPartitionConsumeInfoMap = commitOffsets.keySet.map(topicPartition => { + val t = new TopicPartitionConsumeInfo + t.topicPartition = topicPartition + t.logEndOffset = endOffsets.get(t.topicPartition).get.offset() + t.consumerOffset = getPartitionOffset(t.topicPartition).get + t.lag = t.logEndOffset - t.consumerOffset + (topicPartition, t) + }).toMap + + consumerGroup.members().asScala.filter(!_.assignment().topicPartitions().isEmpty).foreach(m => { + m.assignment().topicPartitions().asScala.foreach(topicPartition => { + val t = topicPartitionConsumeInfoMap.get(topicPartition).get + t.clientId = m.clientId() + t.consumerId = m.consumerId() + }) + }) + + topicPartitionConsumeInfoMap + } + groupOffsets.asJava.asInstanceOf[ util.Collection[TopicPartitionConsumeInfo]] + } + + private def describeConsumerGroups(groupIds: 
util.Set[String]): mutable.Map[String, ConsumerGroupDescription] = { + withAdminClientAndCatchError(admin => { + admin.describeConsumerGroups(groupIds).describedGroups().asScala.map { + case (groupId, groupDescriptionFuture) => (groupId, groupDescriptionFuture.get()) + } + }, e => { + log.error("describeConsumerGroups error.", e) + mutable.Map.empty + }).asInstanceOf[mutable.Map[String, ConsumerGroupDescription]] + } + + private def getCommittedOffsets(groupId: String): Map[TopicPartition, OffsetAndMetadata] = { + withAdminClientAndCatchError(admin => { + admin.listConsumerGroupOffsets( + groupId + ).partitionsToOffsetAndMetadata.get.asScala + }, e => { + log.error("listConsumerGroupOffsets error.", e) + mutable.Map.empty + }).asInstanceOf[Map[TopicPartition, OffsetAndMetadata]] + } + + class TopicPartitionConsumeInfo { + + @BeanProperty + var topicPartition: TopicPartition = null + + @BeanProperty + var groupId = "" + + @BeanProperty + var consumerOffset: Long = 0L + + @BeanProperty + var logEndOffset: Long = 0L + + @BeanProperty + var lag = 0L + + @BeanProperty + var consumerId = "" + + @BeanProperty + var clientId = "" + + @BeanProperty + var host = "" + } + }