新建订阅

This commit is contained in:
许晓东
2021-10-21 17:00:47 +08:00
parent 06351558b5
commit 00eb51eb2e
12 changed files with 212 additions and 12 deletions

View File

@@ -20,4 +20,6 @@ public interface ConsumerService {
ResponseData getConsumerMembers(String groupId);
ResponseData getConsumerDetail(String groupId);
ResponseData addSubscription(String groupId, String topic);
}

View File

@@ -15,7 +15,7 @@ import org.apache.kafka.clients.admin.NewTopic;
**/
public interface TopicService {
ResponseData getTopicNameList();
ResponseData getTopicNameList(boolean internal);
ResponseData getTopicList(String topic, TopicType type);

View File

@@ -98,4 +98,22 @@ public class ConsumerServiceImpl implements ConsumerService {
});
return ResponseData.create().data(res).success();
}
@Override public ResponseData addSubscription(String groupId, String topic) {
// check whether exist subscription relationship.
Collection<ConsumerConsole.TopicPartitionConsumeInfo> consumerDetail = consumerConsole.getConsumerDetail(Collections.singleton(groupId));
if (CollectionUtils.isNotEmpty(consumerDetail)) {
List<ConsumerConsole.TopicPartitionConsumeInfo> collect = consumerDetail.stream()
.filter(t -> t.getTopicPartition().topic().equals(topic)).collect(Collectors.toList());
if (CollectionUtils.isNotEmpty(collect)) {
return ResponseData.create().failed("The subscription exist.");
}
}
// consumer message and commit offset.
// reset consume offset to 0.
return ResponseData.create().success();
}
}

View File

@@ -38,8 +38,8 @@ public class TopicServiceImpl implements TopicService {
@Autowired
private TopicConsole topicConsole;
@Override public ResponseData getTopicNameList() {
return ResponseData.create().data(topicConsole.getTopicNameList(true)).success();
@Override public ResponseData getTopicNameList(boolean internal) {
return ResponseData.create().data(topicConsole.getTopicNameList(internal)).success();
}
@Override public ResponseData getTopicList(String topic, TopicType type) {