消费提交位移分区

This commit is contained in:
许晓东
2021-11-14 22:10:04 +08:00
parent 1f3daec445
commit a6ddb913f6
6 changed files with 126 additions and 1 deletions

View File

@@ -122,4 +122,9 @@ public class ConsumerController {
public Object getTopicSubscribedByGroups(@RequestParam String topic) {
return consumerService.getTopicSubscribedByGroups(topic);
}
@GetMapping("/offset/partition")
public Object getOffsetPartition(@RequestParam String groupId) {
return consumerService.getOffsetPartition(groupId);
}
}

View File

@@ -36,4 +36,6 @@ public interface ConsumerService {
ResponseData getSubscribeTopicList(String groupId);
ResponseData getTopicSubscribedByGroups(String topic);
ResponseData getOffsetPartition(String groupId);
}

View File

@@ -17,12 +17,15 @@ import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import kafka.console.ConsumerConsole;
import kafka.console.TopicConsole;
import org.apache.commons.collections.CollectionUtils;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.admin.MemberDescription;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.ConsumerGroupState;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.utils.Utils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@@ -42,6 +45,9 @@ public class ConsumerServiceImpl implements ConsumerService {
private TopicSubscribedInfo topicSubscribedInfo = new TopicSubscribedInfo();
@Autowired
private TopicConsole topicConsole;
@Override public ResponseData getConsumerGroupList(List<String> groupIds, Set<ConsumerGroupState> states) {
String simulateGroup = "inner_xxx_not_exit_group_###" + System.currentTimeMillis();
Set<String> groupList = new HashSet<>();
@@ -197,6 +203,15 @@ public class ConsumerServiceImpl implements ConsumerService {
return ResponseData.create().data(res).success();
}
@Override public ResponseData getOffsetPartition(String groupId) {
List<TopicDescription> topicList = topicConsole.getTopicList(Collections.singleton(Topic.GROUP_METADATA_TOPIC_NAME));
if (topicList.isEmpty()) {
return ResponseData.create().failed(Topic.GROUP_METADATA_TOPIC_NAME + " is null.");
}
int size = topicList.get(0).partitions().size();
return ResponseData.create().data(Utils.abs(groupId.hashCode()) % size);
}
class TopicSubscribedInfo {
long lastTime = System.currentTimeMillis();

View File

@@ -156,6 +156,10 @@ export const KafkaConsumerApi = {
url: "/consumer/topic/subscribed",
method: "get",
},
getOffsetPartition: {
url: "/consumer/offset/partition",
method: "get",
},
};
export const KafkaClusterApi = {

View File

@@ -107,6 +107,13 @@
@click="openConsumerDetailDialog(record.groupId)"
>消费详情
</a-button>
<a-button
size="small"
href="javascript:;"
class="operation-btn"
@click="openOffsetPartitionDialog(record.groupId)"
>位移分区
</a-button>
</div>
</a-table>
<Member
@@ -125,6 +132,11 @@
@closeAddSubscriptionDialog="closeAddSubscriptionDialog"
>
</AddSupscription>
<OffsetTopicPartition
:visible="showOffsetPartitionDialog"
:group="selectDetail.resourceName"
@closeOffsetPartitionDialog="closeOffsetPartitionDialog"
></OffsetTopicPartition>
</div>
</a-spin>
</div>
@@ -137,10 +149,11 @@ import notification from "ant-design-vue/es/notification";
import Member from "@/views/group/Member";
import ConsumerDetail from "@/views/group/ConsumerDetail";
import AddSupscription from "@/views/group/AddSupscription";
import OffsetTopicPartition from "@/views/group/OffsetTopicPartition";
export default {
name: "ConsumerGroup",
components: { Member, ConsumerDetail, AddSupscription },
components: { Member, ConsumerDetail, AddSupscription, OffsetTopicPartition },
data() {
return {
queryParam: {},
@@ -161,6 +174,7 @@ export default {
showConsumerGroupDialog: false,
showConsumerDetailDialog: false,
showAddSubscriptionDialog: false,
showOffsetPartitionDialog: false,
};
},
methods: {
@@ -226,6 +240,13 @@ export default {
this.getConsumerGroupList();
}
},
openOffsetPartitionDialog(groupId) {
this.showOffsetPartitionDialog = true;
this.selectDetail.resourceName = groupId;
},
closeOffsetPartitionDialog() {
this.showOffsetPartitionDialog = false;
},
},
created() {
this.getConsumerGroupList();

View File

@@ -0,0 +1,78 @@
<template>
<a-modal
title="位移主题分区"
:visible="show"
:width="800"
:mask="false"
:destroyOnClose="true"
:footer="null"
:maskClosable="false"
@cancel="handleCancel"
>
<div>
<a-spin :spinning="loading">
{{ group }}提交位移到位移主题的[{{ data }}]分区
</a-spin>
</div>
</a-modal>
</template>
<script>
import request from "@/utils/request";
import { KafkaConsumerApi } from "@/utils/api";
import notification from "ant-design-vue/es/notification";
export default {
name: "OffsetTopicPartition",
props: {
group: {
type: String,
default: "",
},
visible: {
type: Boolean,
default: false,
},
},
data() {
return {
show: this.visible,
data: [],
loading: false,
};
},
watch: {
visible(v) {
this.show = v;
if (this.show) {
this.getOffsetPartition();
}
},
},
methods: {
getOffsetPartition() {
this.loading = true;
request({
url: KafkaConsumerApi.getOffsetPartition.url + "?groupId=" + this.group,
method: KafkaConsumerApi.getOffsetPartition.method,
}).then((res) => {
this.loading = false;
if (res.code != 0) {
notification.error({
message: "error",
description: res.msg,
});
} else {
this.data = res.data;
}
});
},
handleCancel() {
this.data = [];
this.$emit("closeOffsetPartitionDialog", {});
},
},
};
</script>
<style scoped></style>