查看消费端成员

This commit is contained in:
许晓东
2021-09-26 21:06:50 +08:00
parent 99438887e5
commit 9deb37e9dc
9 changed files with 249 additions and 6 deletions

View File

@@ -0,0 +1,29 @@
package com.xuxd.kafka.console.beans;
import lombok.Data;
/**
* kafka-console-ui.
*
* @author xuxd
* @date 2021-09-26 20:32:15
**/
@Data
public class TopicPartition implements Comparable {
private final String topic;
private final int partition;
@Override public int compareTo(Object o) {
if (o == null) {
return -1;
}
TopicPartition other = (TopicPartition) o;
if (!this.topic.equals(other.getTopic())) {
return this.compareTo(other);
}
return this.partition - other.partition;
}
}

View File

@@ -0,0 +1,38 @@
package com.xuxd.kafka.console.beans.vo;
import com.xuxd.kafka.console.beans.TopicPartition;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import lombok.Data;
import org.apache.kafka.clients.admin.MemberDescription;
/**
* kafka-console-ui.
*
* @author xuxd
* @date 2021-09-26 20:02:59
**/
@Data
public class ConsumerMemberVO {
private String memberId;
private String groupInstanceId;
private String clientId;
private String host;
private List<TopicPartition> partitions;
public static ConsumerMemberVO from(MemberDescription description) {
ConsumerMemberVO vo = new ConsumerMemberVO();
vo.setMemberId(description.consumerId());
vo.setGroupInstanceId(description.groupInstanceId().orElse(""));
vo.setClientId(description.clientId());
vo.setHost(description.host());
List<TopicPartition> collect = description.assignment().topicPartitions().stream().map(t -> new TopicPartition(t.topic(), t.partition())).collect(Collectors.toList());
collect.sort(Comparator.naturalOrder());
vo.setPartitions(collect);
return vo;
}
}