新增topic

This commit is contained in:
许晓东
2021-10-13 17:15:14 +08:00
parent fc73182740
commit b642647b2e
8 changed files with 244 additions and 3 deletions

View File

@@ -0,0 +1,30 @@
package com.xuxd.kafka.console.beans.dto;
import java.util.HashMap;
import java.util.Map;
import lombok.Data;
import org.apache.commons.collections.MapUtils;
import org.apache.kafka.clients.admin.NewTopic;
/**
* kafka-console-ui.
*
* @author xuxd
* @date 2021-10-13 14:58:11
**/
@Data
public class NewTopicDTO {
private String name;
private Integer numPartitions;
private Short replicationFactor;
private Map<String, String> configs = new HashMap<>();
public NewTopic toNewTopic() {
NewTopic topic = new NewTopic(name, numPartitions, replicationFactor);
if (MapUtils.isNotEmpty(configs)) {
topic.configs(configs);
}
return topic;
}
}

View File

@@ -1,10 +1,13 @@
package com.xuxd.kafka.console.controller;
import com.xuxd.kafka.console.beans.dto.NewTopicDTO;
import com.xuxd.kafka.console.beans.enums.TopicType;
import com.xuxd.kafka.console.service.TopicService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@@ -36,4 +39,9 @@ public class TopicController {
public Object getTopicPartitionInfo(@RequestParam String topic) {
return topicService.getTopicPartitionInfo(topic);
}
@PostMapping("/new")
public Object createNewTopic(@RequestBody NewTopicDTO topicDTO) {
return topicService.createTopic(topicDTO.toNewTopic());
}
}

View File

@@ -2,6 +2,7 @@ package com.xuxd.kafka.console.service;
import com.xuxd.kafka.console.beans.ResponseData;
import com.xuxd.kafka.console.beans.enums.TopicType;
import org.apache.kafka.clients.admin.NewTopic;
/**
* kafka-console-ui.
@@ -18,4 +19,6 @@ public interface TopicService {
ResponseData deleteTopic(String topic);
ResponseData getTopicPartitionInfo(String topic);
ResponseData createTopic(NewTopic topic);
}

View File

@@ -16,6 +16,7 @@ import java.util.stream.Collectors;
import kafka.console.TopicConsole;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
@@ -106,4 +107,9 @@ public class TopicServiceImpl implements TopicService {
}
return ResponseData.create().data(voList).success();
}
@Override public ResponseData createTopic(NewTopic topic) {
Tuple2<Object, String> createResult = topicConsole.createTopic(topic);
return (boolean) createResult._1 ? ResponseData.create().success() : ResponseData.create().failed(String.valueOf(createResult._2));
}
}

View File

@@ -5,7 +5,7 @@ import java.util.concurrent.TimeUnit
import java.util.{Collections, List, Set}
import com.xuxd.kafka.console.config.KafkaConfig
import org.apache.kafka.clients.admin.{DeleteTopicsOptions, ListTopicsOptions, TopicDescription}
import org.apache.kafka.clients.admin.{CreateTopicsOptions, DeleteTopicsOptions, ListTopicsOptions, NewTopic, TopicDescription}
import org.apache.kafka.common.TopicPartition
import scala.jdk.CollectionConverters.{CollectionHasAsScala, SetHasAsJava}
@@ -93,4 +93,18 @@ class TopicConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConfig
(Collections.emptyMap(), Collections.emptyMap())
}).asInstanceOf[(util.Map[TopicPartition, Long], util.Map[TopicPartition, Long])]
}
/**
* create topic.
*/
def createTopic(topic: NewTopic): (Boolean, String) = {
withAdminClientAndCatchError(admin => {
val createResult = admin.createTopics(Collections.singleton(topic), new CreateTopicsOptions().retryOnQuotaViolation(false))
createResult.all().get(timeoutMs, TimeUnit.MILLISECONDS)
(true, "")
}, e => {
log.error("create topic error, topic: " + topic.name(), e)
(false, e.getMessage)
}).asInstanceOf[(Boolean, String)]
}
}

View File

@@ -69,6 +69,10 @@ export const KafkaTopicApi = {
url: "/topic/partition",
method: "get",
},
creatTopic: {
url: "/topic/new",
method: "post",
},
};
export const KafkaConsumerApi = {

View File

@@ -0,0 +1,158 @@
<template>
<a-modal
title="新增Topic"
:visible="show"
:width="800"
:mask="false"
:destroyOnClose="true"
:footer="null"
:maskClosable="false"
@cancel="handleCancel"
>
<div>
<a-spin :spinning="loading">
<a-form
:form="form"
:label-col="{ span: 5 }"
:wrapper-col="{ span: 12 }"
@submit="handleSubmit"
>
<a-form-item label="Topic名称">
<a-input
v-decorator="[
'name',
{ rules: [{ required: true, message: '输入topic名称!' }] },
]"
placeholder="topic"
/>
</a-form-item>
<a-form-item label="分区">
<a-input-number
:min="1"
:max="128"
v-decorator="[
'numPartitions',
{
initialValue: 1,
rules: [{ required: true, message: '输入分区数!' }],
},
]"
/>
<span class="ant-form-text"> 个分区 </span>
</a-form-item>
<a-form-item label="副本">
<a-input-number
:min="1"
:max="32"
v-decorator="[
'replicationFactor',
{
initialValue: 1,
rules: [{ required: true, message: '输入副本数!' }],
},
]"
/>
<span class="ant-form-text"> 个副本 </span>
</a-form-item>
<a-form-item label="属性">
<a-textarea
rows="5"
placeholder="格式示例如下:
max.message.bytes=1024
retention.bytes=1024
retention.ms=3600000"
v-decorator="['configs']"
/>
</a-form-item>
<a-form-item :wrapper-col="{ span: 12, offset: 5 }">
<a-button type="primary" html-type="submit"> 提交 </a-button>
</a-form-item>
</a-form>
</a-spin>
</div>
</a-modal>
</template>
<script>
import request from "@/utils/request";
import { KafkaTopicApi } from "@/utils/api";
import notification from "ant-design-vue/es/notification";
export default {
name: "CreateTopic",
props: {
topic: {
type: String,
default: "",
},
visible: {
type: Boolean,
default: false,
},
},
data() {
return {
show: this.visible,
data: [],
loading: false,
form: this.$form.createForm(this, { name: "coordinated" }),
};
},
watch: {
visible(v) {
this.show = v;
if (this.show) {
this.getPartitionInfo();
}
},
},
methods: {
getPartitionInfo() {
this.loading = false;
},
handleSubmit(e) {
e.preventDefault();
this.form.validateFields((err, values) => {
if (!err) {
if (values.configs) {
const config = {};
values.configs.split("\n").forEach((e) => {
const c = e.split("=");
if (c.length > 1) {
let k = c[0].trim(),
v = c[1].trim();
if (k && v) {
config[k] = v;
}
}
});
values.configs = config;
}
this.loading = true;
request({
url: KafkaTopicApi.creatTopic.url,
method: KafkaTopicApi.creatTopic.method,
data: values,
}).then((res) => {
this.loading = false;
if (res.code == 0) {
this.$message.success(res.msg);
this.$emit("closeCreateTopicDialog", { refresh: true });
} else {
notification.error({
message: "error",
description: res.msg,
});
}
});
}
});
},
handleCancel() {
this.data = [];
this.$emit("closeCreateTopicDialog", { refresh: false });
},
},
};
</script>
<style scoped></style>

View File

@@ -44,7 +44,9 @@
</a-form>
</div>
<div class="operation-row-button">
<a-button type="primary" @click="handleReset">新增/更新</a-button>
<a-button type="primary" @click="openCreateTopicDialog"
>新增</a-button
>
</div>
<a-table :columns="columns" :data-source="data" bordered row-key="name">
<div slot="partitions" slot-scope="text, record">
@@ -75,6 +77,11 @@
:visible="showPartitionInfo"
@closePartitionInfoDialog="closePartitionInfoDialog"
></PartitionInfo>
<CreateTopic
:visible="showCreateTopic"
@closeCreateTopicDialog="closeCreateTopicDialog"
>
</CreateTopic>
</div>
</a-spin>
</div>
@@ -85,10 +92,11 @@ import request from "@/utils/request";
import { KafkaTopicApi } from "@/utils/api";
import notification from "ant-design-vue/es/notification";
import PartitionInfo from "@/views/topic/PartitionInfo";
import CreateTopic from "@/views/topic/CreateTopic";
export default {
name: "Topic",
components: { PartitionInfo },
components: { PartitionInfo, CreateTopic },
data() {
return {
queryParam: { type: "normal" },
@@ -105,6 +113,7 @@ export default {
},
showPartitionInfo: false,
loading: false,
showCreateTopic: false,
};
},
methods: {
@@ -152,6 +161,15 @@ export default {
closePartitionInfoDialog() {
this.showPartitionInfo = false;
},
openCreateTopicDialog() {
this.showCreateTopic = true;
},
closeCreateTopicDialog(res) {
this.showCreateTopic = false;
if (res.refresh) {
this.getTopicList();
}
},
},
created() {
this.getTopicList();