diff --git a/src/main/java/com/xuxd/kafka/console/controller/ConsumerController.java b/src/main/java/com/xuxd/kafka/console/controller/ConsumerController.java index a9b9ab3..8418abd 100644 --- a/src/main/java/com/xuxd/kafka/console/controller/ConsumerController.java +++ b/src/main/java/com/xuxd/kafka/console/controller/ConsumerController.java @@ -122,4 +122,9 @@ public class ConsumerController { public Object getTopicSubscribedByGroups(@RequestParam String topic) { return consumerService.getTopicSubscribedByGroups(topic); } + + @GetMapping("/offset/partition") + public Object getOffsetPartition(@RequestParam String groupId) { + return consumerService.getOffsetPartition(groupId); + } } diff --git a/src/main/java/com/xuxd/kafka/console/service/ConsumerService.java b/src/main/java/com/xuxd/kafka/console/service/ConsumerService.java index 5a5f9bc..cd4726c 100644 --- a/src/main/java/com/xuxd/kafka/console/service/ConsumerService.java +++ b/src/main/java/com/xuxd/kafka/console/service/ConsumerService.java @@ -36,4 +36,6 @@ public interface ConsumerService { ResponseData getSubscribeTopicList(String groupId); ResponseData getTopicSubscribedByGroups(String topic); + + ResponseData getOffsetPartition(String groupId); } diff --git a/src/main/java/com/xuxd/kafka/console/service/impl/ConsumerServiceImpl.java b/src/main/java/com/xuxd/kafka/console/service/impl/ConsumerServiceImpl.java index cdf9492..a40c97d 100644 --- a/src/main/java/com/xuxd/kafka/console/service/impl/ConsumerServiceImpl.java +++ b/src/main/java/com/xuxd/kafka/console/service/impl/ConsumerServiceImpl.java @@ -17,12 +17,15 @@ import java.util.Map; import java.util.Set; import java.util.stream.Collectors; import kafka.console.ConsumerConsole; +import kafka.console.TopicConsole; import org.apache.commons.collections.CollectionUtils; import org.apache.kafka.clients.admin.ConsumerGroupDescription; import org.apache.kafka.clients.admin.MemberDescription; +import org.apache.kafka.clients.admin.TopicDescription; import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.common.ConsumerGroupState; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.internals.Topic; import org.apache.kafka.common.utils.Utils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @@ -42,6 +45,9 @@ public class ConsumerServiceImpl implements ConsumerService { private TopicSubscribedInfo topicSubscribedInfo = new TopicSubscribedInfo(); + @Autowired + private TopicConsole topicConsole; + @Override public ResponseData getConsumerGroupList(List groupIds, Set states) { String simulateGroup = "inner_xxx_not_exit_group_###" + System.currentTimeMillis(); Set groupList = new HashSet<>(); @@ -197,6 +203,15 @@ public class ConsumerServiceImpl implements ConsumerService { return ResponseData.create().data(res).success(); } + @Override public ResponseData getOffsetPartition(String groupId) { + List topicList = topicConsole.getTopicList(Collections.singleton(Topic.GROUP_METADATA_TOPIC_NAME)); + if (topicList.isEmpty()) { + return ResponseData.create().failed(Topic.GROUP_METADATA_TOPIC_NAME + " is null."); + } + int size = topicList.get(0).partitions().size(); + return ResponseData.create().data(Utils.abs(groupId.hashCode()) % size); + } + class TopicSubscribedInfo { long lastTime = System.currentTimeMillis(); diff --git a/ui/src/utils/api.js b/ui/src/utils/api.js index f5e689d..0b96958 100644 --- a/ui/src/utils/api.js +++ b/ui/src/utils/api.js @@ -156,6 +156,10 @@ export const KafkaConsumerApi = { url: "/consumer/topic/subscribed", method: "get", }, + getOffsetPartition: { + url: "/consumer/offset/partition", + method: "get", + }, }; export const KafkaClusterApi = { diff --git a/ui/src/views/group/Group.vue b/ui/src/views/group/Group.vue index 1c645c0..df25b91 100644 --- a/ui/src/views/group/Group.vue +++ b/ui/src/views/group/Group.vue @@ -107,6 +107,13 @@ @click="openConsumerDetailDialog(record.groupId)" >消费详情 + 位移分区 + + @@ -137,10 +149,11 @@ import notification from "ant-design-vue/es/notification"; import Member from "@/views/group/Member"; import ConsumerDetail from "@/views/group/ConsumerDetail"; import AddSupscription from "@/views/group/AddSupscription"; +import OffsetTopicPartition from "@/views/group/OffsetTopicPartition"; export default { name: "ConsumerGroup", - components: { Member, ConsumerDetail, AddSupscription }, + components: { Member, ConsumerDetail, AddSupscription, OffsetTopicPartition }, data() { return { queryParam: {}, @@ -161,6 +174,7 @@ export default { showConsumerGroupDialog: false, showConsumerDetailDialog: false, showAddSubscriptionDialog: false, + showOffsetPartitionDialog: false, }; }, methods: { @@ -226,6 +240,13 @@ export default { this.getConsumerGroupList(); } }, + openOffsetPartitionDialog(groupId) { + this.showOffsetPartitionDialog = true; + this.selectDetail.resourceName = groupId; + }, + closeOffsetPartitionDialog() { + this.showOffsetPartitionDialog = false; + }, }, created() { this.getConsumerGroupList(); diff --git a/ui/src/views/group/OffsetTopicPartition.vue b/ui/src/views/group/OffsetTopicPartition.vue new file mode 100644 index 0000000..d8d8f96 --- /dev/null +++ b/ui/src/views/group/OffsetTopicPartition.vue @@ -0,0 +1,78 @@ + + + + +