集群同步-》同步消费位点

This commit is contained in:
许晓东
2021-10-25 00:10:08 +08:00
parent 66e7ea0676
commit 5ccf9013e5
13 changed files with 233 additions and 6 deletions

View File

@@ -173,6 +173,18 @@ class ConsumerConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaCon
}).asInstanceOf[(Boolean, String)]
}
def listSubscribeTopics(groupId: String): util.Map[String, util.List[TopicPartition]] = {
val commitOffs = getCommittedOffsets(groupId)
val map: util.Map[String, util.List[TopicPartition]] = new util.HashMap[String, util.List[TopicPartition]]()
for (t <- commitOffs.keySet) {
if (!map.containsKey(t.topic())) {
map.put(t.topic(), new util.ArrayList[TopicPartition]())
}
map.get(t.topic()).add(t)
}
map
}
private def describeConsumerGroups(groupIds: util.Set[String]): mutable.Map[String, ConsumerGroupDescription] = {
withAdminClientAndCatchError(admin => {
admin.describeConsumerGroups(groupIds).describedGroups().asScala.map {