消费详情接口

This commit is contained in:
许晓东
2021-10-11 21:11:27 +08:00
parent 2baf8d093e
commit 338101396f
5 changed files with 116 additions and 4 deletions

View File

@@ -49,7 +49,7 @@ sh bin/shutdown.sh
除了webstorm是开发前端的ide可以根据自己需要代替jdk scala是必须有的。
# 本地开发配置
以我自己为例开发环境里的工具准备好然后代码clone到本地。
## 后端配置w
## 后端配置
1. 用idea打开项目
2. 打开idea的Project Structure(Settings) -> Modules -> 设置src/main/scala为Sources因为约定src/main/java是源码目录所以这里要再加一个
3. 打开idea的Project Structure(Settings) -> Libraries 添加scala sdk然后选择本地下载的scala 2.13的目录,确定添加进来

View File

@@ -55,4 +55,9 @@ public class ConsumerController {
/**
 * List the members of a consumer group.
 *
 * @param groupId the consumer group id to look up
 * @return the service-layer response describing the group's members
 */
public Object getConsumerMembers(@RequestParam String groupId) {
    // Pure delegation: the controller adds no logic of its own.
    final Object members = consumerService.getConsumerMembers(groupId);
    return members;
}
/**
 * Fetch per-partition consume detail for a consumer group.
 *
 * @param groupId the consumer group id to look up
 * @return the service-layer response with the group's consume detail
 */
@GetMapping("/detail")
public Object getConsumerDetail(@RequestParam String groupId) {
    // Pure delegation: the controller adds no logic of its own.
    final Object detail = consumerService.getConsumerDetail(groupId);
    return detail;
}
}

View File

@@ -18,4 +18,6 @@ public interface ConsumerService {
ResponseData deleteConsumerGroup(String groupId);
ResponseData getConsumerMembers(String groupId);
ResponseData getConsumerDetail(String groupId);
}

View File

@@ -78,4 +78,8 @@ public class ConsumerServiceImpl implements ConsumerService {
vos.sort(Comparator.comparing(ConsumerMemberVO::getClientId));
return ResponseData.create().data(vos).success();
}
/**
 * Look up consume detail for a single consumer group.
 *
 * @param groupId the consumer group id to query
 * @return a successful {@code ResponseData} wrapping the console's detail payload
 */
@Override
public ResponseData getConsumerDetail(String groupId) {
    // The console API takes a set of group ids; wrap the single id.
    Object detail = consumerConsole.getConsumerDetail(Collections.singleton(groupId));
    return ResponseData.create().data(detail).success();
}
}

View File

@@ -1,13 +1,18 @@
package kafka.console
import java.util
import java.util.concurrent.TimeUnit
import java.util.{Collections, Set}
import com.xuxd.kafka.console.config.KafkaConfig
import org.apache.kafka.clients.admin.{ConsumerGroupDescription, DeleteConsumerGroupsOptions, ListConsumerGroupsOptions}
import org.apache.kafka.common.ConsumerGroupState
import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo
import org.apache.kafka.clients.admin.{ConsumerGroupDescription, DeleteConsumerGroupsOptions, ListConsumerGroupsOptions, OffsetSpec}
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.{ConsumerGroupState, TopicPartition}
import scala.jdk.CollectionConverters.{CollectionHasAsScala, SetHasAsJava}
import scala.beans.BeanProperty
import scala.collection.{Map, Seq, mutable}
import scala.jdk.CollectionConverters._
/**
* kafka-console-ui. kafka consumer console.
@@ -50,4 +55,100 @@ class ConsumerConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaCon
}).asInstanceOf[(Boolean, String)]
}
}
/**
 * Collect per-partition consume info (committed offset, log end offset, lag,
 * and — when available — the owning member's client/consumer id and host) for
 * every partition each of the given groups has committed offsets for.
 *
 * @param groupIds consumer group ids to inspect
 * @return a flat collection of [[TopicPartitionConsumeInfo]], one per (group, partition)
 */
def getConsumerDetail(groupIds: util.Set[String]): util.Collection[TopicPartitionConsumeInfo] = {
  // (groupId -> consumerGroup description)
  val consumerGroups = describeConsumerGroups(groupIds)
  val groupOffsets = for ((groupId, consumerGroup) <- consumerGroups) yield {
    // Committed offsets for this group: (TopicPartition -> OffsetAndMetadata).
    val commitOffsets = getCommittedOffsets(groupId)

    // Log end offsets for every partition the group has committed to.
    val endOffsets = withAdminClientAndCatchError(admin => {
      val specs = commitOffsets.keySet.map(tp => tp -> OffsetSpec.latest).toMap
      admin.listOffsets(specs.asJava).all().get(timeoutMs, TimeUnit.MILLISECONDS)
    }, e => {
      log.error("listOffsets error.", e)
      Collections.emptyMap()
    }).asInstanceOf[util.Map[TopicPartition, ListOffsetsResultInfo]].asScala

    val infoByPartition = commitOffsets.keySet.map(tp => {
      val t = new TopicPartitionConsumeInfo
      t.topicPartition = tp
      // BUG FIX: groupId was never populated on the VO.
      t.groupId = groupId
      // Safe lookups instead of Option.get: if listOffsets failed (empty map above)
      // or an entry is missing, fall back to 0 rather than throwing NoSuchElementException.
      t.logEndOffset = endOffsets.get(tp).map(_.offset()).getOrElse(0L)
      t.consumerOffset = commitOffsets.get(tp).filter(_ != null).map(_.offset()).getOrElse(0L)
      t.lag = t.logEndOffset - t.consumerOffset
      (tp, t)
    }).toMap

    // Attach member identity for partitions that have a live assignment.
    consumerGroup.members().asScala.filter(!_.assignment().topicPartitions().isEmpty).foreach(m => {
      m.assignment().topicPartitions().asScala.foreach(tp => {
        // A member may own a partition with no committed offset yet; skip those
        // instead of calling Option.get and crashing.
        infoByPartition.get(tp).foreach { t =>
          t.clientId = m.clientId()
          t.consumerId = m.consumerId()
          // host was declared on the VO but never filled in.
          t.host = m.host()
        }
      })
    })
    infoByPartition.values
  }
  // BUG FIX: the original returned the per-group Maps themselves, force-cast to
  // util.Collection[TopicPartitionConsumeInfo] — erasure hid the broken cast and
  // callers iterating the collection would see Map instances. Flatten instead.
  groupOffsets.flatten.toSeq.asJava
}
/**
 * Describe the given consumer groups via the admin client.
 *
 * Blocks on each per-group future; on failure an empty map is returned
 * and the error is logged.
 *
 * @param groupIds consumer group ids to describe
 * @return (groupId -> ConsumerGroupDescription), empty on error
 */
private def describeConsumerGroups(groupIds: util.Set[String]): mutable.Map[String, ConsumerGroupDescription] = {
  withAdminClientAndCatchError(admin => {
    val described = admin.describeConsumerGroups(groupIds).describedGroups().asScala
    // Resolve every future eagerly so errors surface inside the catch wrapper.
    for ((groupId, future) <- described) yield groupId -> future.get()
  }, e => {
    log.error("describeConsumerGroups error.", e)
    mutable.Map.empty
  }).asInstanceOf[mutable.Map[String, ConsumerGroupDescription]]
}
/**
 * Fetch the committed offsets of one consumer group.
 *
 * @param groupId the consumer group id
 * @return (TopicPartition -> OffsetAndMetadata), empty on error
 */
private def getCommittedOffsets(groupId: String): Map[TopicPartition, OffsetAndMetadata] = {
  withAdminClientAndCatchError(admin => {
    admin.listConsumerGroupOffsets(
      groupId
    ).partitionsToOffsetAndMetadata.get.asScala
  }, e => {
    // BUG FIX: the original logged "describeConsumerGroups error." here (copy-paste),
    // which misattributes the failure during diagnosis.
    log.error("listConsumerGroupOffsets error.", e)
    mutable.Map.empty
  }).asInstanceOf[Map[TopicPartition, OffsetAndMetadata]]
}
/**
 * Per topic-partition consume statistics for one consumer group.
 *
 * Mutable JavaBean-style VO (getters/setters generated by [[BeanProperty]])
 * so the web layer can serialize it directly.
 */
class TopicPartitionConsumeInfo {

  // Topic and partition this record describes.
  @BeanProperty
  var topicPartition: TopicPartition = _

  // Consumer group the offsets below belong to.
  @BeanProperty
  var groupId: String = ""

  // Offset the group has committed for this partition.
  @BeanProperty
  var consumerOffset: Long = 0L

  // Latest offset in the partition's log.
  @BeanProperty
  var logEndOffset: Long = 0L

  // logEndOffset - consumerOffset.
  @BeanProperty
  var lag: Long = 0L

  // Id of the member currently assigned this partition, if any.
  @BeanProperty
  var consumerId: String = ""

  // Client id of the assigned member, if any.
  @BeanProperty
  var clientId: String = ""

  // Host of the assigned member, if any.
  @BeanProperty
  var host: String = ""
}
}