diff --git a/src/main/java/com/xuxd/kafka/console/beans/dto/AddSubscriptionDTO.java b/src/main/java/com/xuxd/kafka/console/beans/dto/AddSubscriptionDTO.java new file mode 100644 index 0000000..2c149ac --- /dev/null +++ b/src/main/java/com/xuxd/kafka/console/beans/dto/AddSubscriptionDTO.java @@ -0,0 +1,17 @@ +package com.xuxd.kafka.console.beans.dto; + +import lombok.Data; + +/** + * kafka-console-ui. + * + * @author xuxd + * @date 2021-10-21 15:43:56 + **/ +@Data +public class AddSubscriptionDTO { + + private String groupId; + + private String topic; +} diff --git a/src/main/java/com/xuxd/kafka/console/controller/ConsumerController.java b/src/main/java/com/xuxd/kafka/console/controller/ConsumerController.java index 5316260..163fe81 100644 --- a/src/main/java/com/xuxd/kafka/console/controller/ConsumerController.java +++ b/src/main/java/com/xuxd/kafka/console/controller/ConsumerController.java @@ -1,5 +1,6 @@ package com.xuxd.kafka.console.controller; +import com.xuxd.kafka.console.beans.dto.AddSubscriptionDTO; import com.xuxd.kafka.console.beans.dto.QueryConsumerGroupDTO; import com.xuxd.kafka.console.service.ConsumerService; import java.util.Collections; @@ -60,4 +61,9 @@ public class ConsumerController { public Object getConsumerDetail(@RequestParam String groupId) { return consumerService.getConsumerDetail(groupId); } + + @PostMapping("/subscription") + public Object addSubscription(@RequestBody AddSubscriptionDTO subscriptionDTO) { + return consumerService.addSubscription(subscriptionDTO.getGroupId(), subscriptionDTO.getTopic()); + } } diff --git a/src/main/java/com/xuxd/kafka/console/controller/TopicController.java b/src/main/java/com/xuxd/kafka/console/controller/TopicController.java index 60cde4a..986bbed 100644 --- a/src/main/java/com/xuxd/kafka/console/controller/TopicController.java +++ b/src/main/java/com/xuxd/kafka/console/controller/TopicController.java @@ -30,6 +30,11 @@ public class TopicController { @Autowired private TopicService topicService; + @GetMapping + public Object getTopicNameList() { + return topicService.getTopicNameList(false); + } + @GetMapping("/list") public Object getTopicList(@RequestParam(required = false) String topic, @RequestParam String type) { return topicService.getTopicList(topic, TopicType.valueOf(type.toUpperCase())); diff --git a/src/main/java/com/xuxd/kafka/console/service/ConsumerService.java b/src/main/java/com/xuxd/kafka/console/service/ConsumerService.java index e821ef3..53c2df2 100644 --- a/src/main/java/com/xuxd/kafka/console/service/ConsumerService.java +++ b/src/main/java/com/xuxd/kafka/console/service/ConsumerService.java @@ -20,4 +20,6 @@ public interface ConsumerService { ResponseData getConsumerMembers(String groupId); ResponseData getConsumerDetail(String groupId); + + ResponseData addSubscription(String groupId, String topic); } diff --git a/src/main/java/com/xuxd/kafka/console/service/TopicService.java b/src/main/java/com/xuxd/kafka/console/service/TopicService.java index 6901386..31ac5f5 100644 --- a/src/main/java/com/xuxd/kafka/console/service/TopicService.java +++ b/src/main/java/com/xuxd/kafka/console/service/TopicService.java @@ -15,7 +15,7 @@ import org.apache.kafka.clients.admin.NewTopic; **/ public interface TopicService { - ResponseData getTopicNameList(); + ResponseData getTopicNameList(boolean internal); ResponseData getTopicList(String topic, TopicType type); diff --git a/src/main/java/com/xuxd/kafka/console/service/impl/ConsumerServiceImpl.java b/src/main/java/com/xuxd/kafka/console/service/impl/ConsumerServiceImpl.java index 012e514..116c841 100644 --- a/src/main/java/com/xuxd/kafka/console/service/impl/ConsumerServiceImpl.java +++ b/src/main/java/com/xuxd/kafka/console/service/impl/ConsumerServiceImpl.java @@ -98,4 +98,22 @@ public class ConsumerServiceImpl implements ConsumerService { }); return ResponseData.create().data(res).success(); } + + @Override public ResponseData addSubscription(String groupId, String topic) { + // check whether exist subscription relationship. + Collection consumerDetail = consumerConsole.getConsumerDetail(Collections.singleton(groupId)); + if (CollectionUtils.isNotEmpty(consumerDetail)) { + List collect = consumerDetail.stream() + .filter(t -> t.getTopicPartition().topic().equals(topic)).collect(Collectors.toList()); + if (CollectionUtils.isNotEmpty(collect)) { + return ResponseData.create().failed("The subscription exist."); + } + } + + // consumer message and commit offset. + + + // reset consume offset to 0. + return ResponseData.create().success(); + } } diff --git a/src/main/java/com/xuxd/kafka/console/service/impl/TopicServiceImpl.java b/src/main/java/com/xuxd/kafka/console/service/impl/TopicServiceImpl.java index a64cbda..55636cf 100644 --- a/src/main/java/com/xuxd/kafka/console/service/impl/TopicServiceImpl.java +++ b/src/main/java/com/xuxd/kafka/console/service/impl/TopicServiceImpl.java @@ -38,8 +38,8 @@ public class TopicServiceImpl implements TopicService { @Autowired private TopicConsole topicConsole; - @Override public ResponseData getTopicNameList() { - return ResponseData.create().data(topicConsole.getTopicNameList(true)).success(); + @Override public ResponseData getTopicNameList(boolean internal) { + return ResponseData.create().data(topicConsole.getTopicNameList(internal)).success(); } @Override public ResponseData getTopicList(String topic, TopicType type) { diff --git a/src/test/java/com/xuxd/kafka/console/service/impl/TopicServiceImplTest.java b/src/test/java/com/xuxd/kafka/console/service/impl/TopicServiceImplTest.java index e607d3e..c241e0a 100644 --- a/src/test/java/com/xuxd/kafka/console/service/impl/TopicServiceImplTest.java +++ b/src/test/java/com/xuxd/kafka/console/service/impl/TopicServiceImplTest.java @@ -22,7 +22,7 @@ public class TopicServiceImplTest { @Test public void getTopicNameList() { - log.info(topicService.getTopicNameList().getData().toString()); + log.info(topicService.getTopicNameList(true).getData().toString()); } @Test diff --git a/ui/src/utils/api.js b/ui/src/utils/api.js index 5a1ea79..1658070 100644 --- a/ui/src/utils/api.js +++ b/ui/src/utils/api.js @@ -57,6 +57,10 @@ export const KafkaConfigApi = { }; export const KafkaTopicApi = { + getTopicNameList: { + url: "/topic", + method: "get", + }, getTopicList: { url: "/topic/list", method: "get", @@ -96,6 +100,10 @@ export const KafkaConsumerApi = { url: "/consumer/detail", method: "get", }, + addSubscription: { + url: "/consumer/subscription", + method: "post", + }, }; export const KafkaClusterApi = { diff --git a/ui/src/views/group/AddSupscription.vue b/ui/src/views/group/AddSupscription.vue new file mode 100644 index 0000000..03ac791 --- /dev/null +++ b/ui/src/views/group/AddSupscription.vue @@ -0,0 +1,135 @@ + + + + + diff --git a/ui/src/views/group/Group.vue b/ui/src/views/group/Group.vue index 6e2df55..ad5bef9 100644 --- a/ui/src/views/group/Group.vue +++ b/ui/src/views/group/Group.vue @@ -61,7 +61,9 @@
- + 新增订阅
+ + @@ -122,10 +129,11 @@ import { KafkaConsumerApi } from "@/utils/api"; import notification from "ant-design-vue/es/notification"; import Member from "@/views/group/Member"; import ConsumerDetail from "@/views/group/ConsumerDetail"; +import AddSupscription from "@/views/group/AddSupscription"; export default { name: "ConsumerGroup", - components: { Member, ConsumerDetail }, + components: { Member, ConsumerDetail, AddSupscription }, data() { return { queryParam: {}, @@ -145,6 +153,7 @@ export default { loading: false, showConsumerGroupDialog: false, showConsumerDetailDialog: false, + showAddSubscriptionDialog: false, }; }, methods: { @@ -201,6 +210,12 @@ export default { closeConsumerDetailDialog() { this.showConsumerDetailDialog = false; }, + openAddSubscriptionDialog() { + this.showAddSubscriptionDialog = true; + }, + closeAddSubscriptionDialog() { + this.showAddSubscriptionDialog = false; + }, }, created() { this.getConsumerGroupList(); diff --git a/ui/src/views/topic/CreateTopic.vue b/ui/src/views/topic/CreateTopic.vue index f7d154a..d331199 100644 --- a/ui/src/views/topic/CreateTopic.vue +++ b/ui/src/views/topic/CreateTopic.vue @@ -100,15 +100,9 @@ export default { watch: { visible(v) { this.show = v; - if (this.show) { - this.getPartitionInfo(); - } }, }, methods: { - getPartitionInfo() { - this.loading = false; - }, handleSubmit(e) { e.preventDefault(); this.form.validateFields((err, values) => {