consumer group search and delete function

This commit is contained in:
许晓东
2021-09-11 16:19:13 +08:00
parent d36791e600
commit 968d053bcd
8 changed files with 101 additions and 43 deletions

View File

@@ -1,10 +0,0 @@
package com.xuxd.kafka.console;
/**
* kafka-console-ui.
*
* @author xuxd
* @date 2021-09-10 20:03:01
**/
public class CounterSet {
}

View File

@@ -14,5 +14,5 @@ public class QueryConsumerGroupDTO {
private String groupId; private String groupId;
private List<String> State; private List<String> states;
} }

View File

@@ -11,9 +11,11 @@ import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.common.ConsumerGroupState; import org.apache.kafka.common.ConsumerGroupState;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController; import org.springframework.web.bind.annotation.RestController;
/** /**
@@ -29,7 +31,7 @@ public class ConsumerController {
@Autowired @Autowired
private ConsumerService consumerService; private ConsumerService consumerService;
@GetMapping("/group/list") @PostMapping("/group/list")
public Object getGroupList(@RequestBody(required = false) QueryConsumerGroupDTO dto) { public Object getGroupList(@RequestBody(required = false) QueryConsumerGroupDTO dto) {
if (Objects.isNull(dto)) { if (Objects.isNull(dto)) {
return consumerService.getConsumerGroupList(null, null); return consumerService.getConsumerGroupList(null, null);
@@ -37,9 +39,14 @@ public class ConsumerController {
List<String> groupIdList = StringUtils.isNotBlank(dto.getGroupId()) ? Collections.singletonList(dto.getGroupId()) : Collections.emptyList(); List<String> groupIdList = StringUtils.isNotBlank(dto.getGroupId()) ? Collections.singletonList(dto.getGroupId()) : Collections.emptyList();
Set<ConsumerGroupState> stateSet = new HashSet<>(); Set<ConsumerGroupState> stateSet = new HashSet<>();
if (CollectionUtils.isNotEmpty(dto.getState())) { if (CollectionUtils.isNotEmpty(dto.getStates())) {
dto.getState().stream().forEach(s -> stateSet.add(ConsumerGroupState.valueOf(s))); dto.getStates().stream().forEach(s -> stateSet.add(ConsumerGroupState.valueOf(s.toUpperCase())));
} }
return consumerService.getConsumerGroupList(groupIdList, stateSet); return consumerService.getConsumerGroupList(groupIdList, stateSet);
} }
@DeleteMapping("/group")
public Object deleteConsumerGroup(@RequestParam String groupId) {
return consumerService.deleteConsumerGroup(groupId);
}
} }

View File

@@ -14,4 +14,6 @@ import org.apache.kafka.common.ConsumerGroupState;
public interface ConsumerService { public interface ConsumerService {
ResponseData getConsumerGroupList(List<String> groupIds, Set<ConsumerGroupState> states); ResponseData getConsumerGroupList(List<String> groupIds, Set<ConsumerGroupState> states);
ResponseData deleteConsumerGroup(String groupId);
} }

View File

@@ -5,6 +5,7 @@ import com.xuxd.kafka.console.beans.ResponseData;
import com.xuxd.kafka.console.beans.vo.ConsumerGroupVO; import com.xuxd.kafka.console.beans.vo.ConsumerGroupVO;
import com.xuxd.kafka.console.service.ConsumerService; import com.xuxd.kafka.console.service.ConsumerService;
import java.util.Collections; import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
@@ -13,6 +14,7 @@ import kafka.console.ConsumerConsole;
import org.apache.kafka.common.ConsumerGroupState; import org.apache.kafka.common.ConsumerGroupState;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import scala.Tuple2;
/** /**
* kafka-console-ui. * kafka-console-ui.
@@ -44,6 +46,12 @@ public class ConsumerServiceImpl implements ConsumerService {
groupList.addAll(consumerConsole.getConsumerGroupIdList(states)); groupList.addAll(consumerConsole.getConsumerGroupIdList(states));
} }
List<ConsumerGroupVO> consumerGroupVOS = consumerConsole.getConsumerGroupList(groupList).stream().map(c -> ConsumerGroupVO.from(c)).collect(Collectors.toList()); List<ConsumerGroupVO> consumerGroupVOS = consumerConsole.getConsumerGroupList(groupList).stream().map(c -> ConsumerGroupVO.from(c)).collect(Collectors.toList());
consumerGroupVOS.sort(Comparator.comparing(ConsumerGroupVO::getGroupId));
return ResponseData.create().data(new CounterList<>(consumerGroupVOS)).success(); return ResponseData.create().data(new CounterList<>(consumerGroupVOS)).success();
} }
@Override public ResponseData deleteConsumerGroup(String groupId) {
Tuple2<Object, String> tuple2 = consumerConsole.deleteConsumerGroups(Collections.singletonList(groupId));
return (Boolean) tuple2._1 ? ResponseData.create().success() : ResponseData.create().failed(tuple2._2);
}
} }

View File

@@ -4,7 +4,7 @@ import java.util
import java.util.{Collections, Set} import java.util.{Collections, Set}
import com.xuxd.kafka.console.config.KafkaConfig import com.xuxd.kafka.console.config.KafkaConfig
import org.apache.kafka.clients.admin.{ConsumerGroupDescription, ListConsumerGroupsOptions} import org.apache.kafka.clients.admin.{ConsumerGroupDescription, DeleteConsumerGroupsOptions, ListConsumerGroupsOptions}
import org.apache.kafka.common.ConsumerGroupState import org.apache.kafka.common.ConsumerGroupState
import scala.jdk.CollectionConverters.{CollectionHasAsScala, SetHasAsJava} import scala.jdk.CollectionConverters.{CollectionHasAsScala, SetHasAsJava}
@@ -35,4 +35,19 @@ class ConsumerConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaCon
Collections.emptySet() Collections.emptySet()
}).asInstanceOf[Set[ConsumerGroupDescription]] }).asInstanceOf[Set[ConsumerGroupDescription]]
} }
def deleteConsumerGroups(groupIds: util.Collection[String]): (Boolean, String) = {
if (groupIds == null || groupIds.isEmpty) {
(false, "group id is empty.")
} else {
withAdminClientAndCatchError(admin => {
admin.deleteConsumerGroups(groupIds, new DeleteConsumerGroupsOptions).all().get()
(true, "")
}
, e => {
log.error("deleteConsumerGroups error.", e)
(false, e.getMessage)
}).asInstanceOf[(Boolean, String)]
}
}
} }

View File

@@ -70,6 +70,10 @@ export const KafkaTopicApi = {
export const KafkaConsumerApi = { export const KafkaConsumerApi = {
getConsumerGroupList: { getConsumerGroupList: {
url: "/consumer/group/list", url: "/consumer/group/list",
method: "get", method: "post",
},
deleteConsumerGroup: {
url: "/consumer/group",
method: "delete",
}, },
}; };

View File

@@ -9,29 +9,43 @@
> >
<a-row :gutter="24"> <a-row :gutter="24">
<a-col :span="8"> <a-col :span="8">
<a-form-item :label="`topic`"> <a-form-item :label="`消费组`">
<a-input <a-input
placeholder="topic" placeholder="groupId"
class="input-w" class="input-w"
v-decorator="['topic']" v-decorator="['groupId']"
/> />
</a-form-item> </a-form-item>
</a-col> </a-col>
<a-col :span="12">
<a-form-item :label="`状态`">
<a-checkbox-group v-decorator="['states']" style="width: 100%">
<a-row>
<a-col :span="8"> <a-col :span="8">
<a-form-item :label="`类型`"> <a-checkbox value="Empty"> Empty</a-checkbox>
<a-select </a-col>
class="type-select" <a-col :span="8">
v-decorator="['type', { initialValue: 'all' }]" <a-checkbox value="PreparingRebalance">
placeholder="Please select a country" PreparingRebalance
> </a-checkbox>
<a-select-option value="all"> 所有 </a-select-option> </a-col>
<a-select-option value="normal"> 普通 </a-select-option> <a-col :span="8">
<a-select-option value="system"> 系统 </a-select-option> <a-checkbox value="CompletingRebalance">
</a-select> CompletingRebalance
</a-checkbox>
</a-col>
<a-col :span="8">
<a-checkbox value="Stable"> Stable</a-checkbox>
</a-col>
<a-col :span="8">
<a-checkbox value="Dead"> Dead</a-checkbox>
</a-col>
</a-row>
</a-checkbox-group>
</a-form-item> </a-form-item>
</a-col> </a-col>
<a-col :span="8" :style="{ textAlign: 'right' }"> <a-col :span="4" :style="{ textAlign: 'right' }">
<a-form-item> <a-form-item>
<a-button type="primary" html-type="submit"> 搜索</a-button> <a-button type="primary" html-type="submit"> 搜索</a-button>
<a-button :style="{ marginLeft: '8px' }" @click="handleReset"> <a-button :style="{ marginLeft: '8px' }" @click="handleReset">
@@ -65,11 +79,11 @@
:title="'删除消费组: ' + record.groupId + ''" :title="'删除消费组: ' + record.groupId + ''"
ok-text="确认" ok-text="确认"
cancel-text="取消" cancel-text="取消"
@confirm="deleteTopic(record.groupId)" @confirm="deleteGroup(record.groupId)"
> >
<a-button size="small" href="javascript:;" class="operation-btn" <a-button size="small" href="javascript:;" class="operation-btn"
>删除</a-button >删除
> </a-button>
</a-popconfirm> </a-popconfirm>
</div> </div>
</a-table> </a-table>
@@ -79,18 +93,21 @@
<script> <script>
import request from "@/utils/request"; import request from "@/utils/request";
import { KafkaTopicApi, KafkaConsumerApi } from "@/utils/api"; import { KafkaConsumerApi } from "@/utils/api";
import notification from "ant-design-vue/es/notification"; import notification from "ant-design-vue/es/notification";
export default { export default {
name: "ConsumerGroup", name: "ConsumerGroup",
components: {}, components: {},
data() { data() {
return { return {
queryParam: { type: "all" }, queryParam: {},
data: [], data: [],
columns, columns,
selectRow: {}, selectRow: {},
form: this.$form.createForm(this, { name: "topic_advanced_search" }), form: this.$form.createForm(this, {
name: "consumer_group_advanced_search",
}),
showUpdateUser: false, showUpdateUser: false,
deleteUserConfirm: false, deleteUserConfirm: false,
selectDetail: { selectDetail: {
@@ -111,19 +128,19 @@ export default {
}, },
getConsumerGroupList() { getConsumerGroupList() {
// Object.assign(this.queryParam, this.form.getFieldsValue()); Object.assign(this.queryParam, this.form.getFieldsValue());
request({ request({
url: KafkaConsumerApi.getConsumerGroupList.url, url: KafkaConsumerApi.getConsumerGroupList.url,
method: KafkaConsumerApi.getConsumerGroupList.method, method: KafkaConsumerApi.getConsumerGroupList.method,
params: this.queryParam, data: this.queryParam,
}).then((res) => { }).then((res) => {
this.data = res.data.list; this.data = res.data.list;
}); });
}, },
deleteTopic(topic) { deleteGroup(group) {
request({ request({
url: KafkaTopicApi.deleteTopic.url + "?topic=" + topic, url: KafkaConsumerApi.deleteConsumerGroup.url + "?groupId=" + group,
method: KafkaTopicApi.deleteTopic.method, method: KafkaConsumerApi.deleteConsumerGroup.method,
}).then((res) => { }).then((res) => {
if (res.code == 0) { if (res.code == 0) {
this.$message.success(res.msg); this.$message.success(res.msg);
@@ -163,6 +180,21 @@ const columns = [
slots: { title: "state" }, slots: { title: "state" },
scopedSlots: { customRender: "state" }, scopedSlots: { customRender: "state" },
}, },
{
title: "分区分配器",
dataIndex: "partitionAssignor",
key: "partitionAssignor",
},
{
title: "协调者节点",
dataIndex: "coordinator",
key: "coordinator",
},
// {
// title: "授权操作数量",
// dataIndex: "authorizedOperations",
// key: "authorizedOperations",
// },
{ {
title: "操作", title: "操作",
key: "operation", key: "operation",