diff --git a/src/main/java/com/xuxd/kafka/console/beans/SendMessage.java b/src/main/java/com/xuxd/kafka/console/beans/SendMessage.java index b4e25b4..5471bcb 100644 --- a/src/main/java/com/xuxd/kafka/console/beans/SendMessage.java +++ b/src/main/java/com/xuxd/kafka/console/beans/SendMessage.java @@ -1,5 +1,6 @@ package com.xuxd.kafka.console.beans; +import java.util.List; import lombok.Data; /** @@ -22,4 +23,13 @@ public class SendMessage { private int num; private long offset; + + private List
headers; + + @Data + public static class Header{ + private String headerKey; + + private String headerValue; + } } diff --git a/src/main/java/com/xuxd/kafka/console/controller/MessageController.java b/src/main/java/com/xuxd/kafka/console/controller/MessageController.java index 3c927b4..98dd5de 100644 --- a/src/main/java/com/xuxd/kafka/console/controller/MessageController.java +++ b/src/main/java/com/xuxd/kafka/console/controller/MessageController.java @@ -56,6 +56,13 @@ public class MessageController { return messageService.send(message); } + @PostMapping("/sendWithHeader") + @ControllerLog("在线发送消息") + @Permission("message:send") + public Object sendWithHeader(@RequestBody SendMessage message) { + return messageService.sendWithHeader(message); + } + @ControllerLog("重新发送消息") @Permission("message:resend") @PostMapping("/resend") diff --git a/src/main/java/com/xuxd/kafka/console/service/MessageService.java b/src/main/java/com/xuxd/kafka/console/service/MessageService.java index f7ae0bc..192e944 100644 --- a/src/main/java/com/xuxd/kafka/console/service/MessageService.java +++ b/src/main/java/com/xuxd/kafka/console/service/MessageService.java @@ -24,6 +24,8 @@ public interface MessageService { ResponseData send(SendMessage message); + ResponseData sendWithHeader(SendMessage message); + ResponseData resend(SendMessage message); ResponseData delete(List messages); diff --git a/src/main/java/com/xuxd/kafka/console/service/impl/MessageServiceImpl.java b/src/main/java/com/xuxd/kafka/console/service/impl/MessageServiceImpl.java index c8be627..b10f98b 100644 --- a/src/main/java/com/xuxd/kafka/console/service/impl/MessageServiceImpl.java +++ b/src/main/java/com/xuxd/kafka/console/service/impl/MessageServiceImpl.java @@ -228,6 +228,14 @@ public class MessageServiceImpl implements MessageService, ApplicationContextAwa return ResponseData.create().success(); } + @Override public ResponseData sendWithHeader(SendMessage message) { + String[] headerKeys= message.getHeaders().stream().map(SendMessage.Header::getHeaderKey).toArray(String[]::new); + String[] headerValues= message.getHeaders().stream().map(SendMessage.Header::getHeaderValue).toArray(String[]::new); + log.info("send with header:keys{},values{}",headerKeys, headerValues); + messageConsole.send(message.getTopic(), message.getPartition(), message.getKey(), message.getBody(), message.getNum(), headerKeys, headerValues); + return ResponseData.create().success(); + } + @Override public ResponseData resend(SendMessage message) { TopicPartition partition = new TopicPartition(message.getTopic(), message.getPartition()); Map offsetTable = new HashMap<>(1, 1.0f); diff --git a/src/main/scala/kafka/console/MessageConsole.scala b/src/main/scala/kafka/console/MessageConsole.scala index 34f1e5f..bee1eec 100644 --- a/src/main/scala/kafka/console/MessageConsole.scala +++ b/src/main/scala/kafka/console/MessageConsole.scala @@ -8,10 +8,12 @@ import org.apache.kafka.clients.admin.{DeleteRecordsOptions, RecordsToDelete} import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord} import org.apache.kafka.clients.producer.ProducerRecord import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.header.Header +import org.apache.kafka.common.header.internals.RecordHeader import java.time.Duration import java.util -import java.util.{Properties} +import java.util.Properties import scala.collection.immutable import scala.jdk.CollectionConverters.{CollectionHasAsScala, MapHasAsScala, SeqHasAsJava} @@ -218,7 +220,7 @@ class MessageConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConf def send(topic: String, partition: Int, key: String, value: String, num: Int): Unit = { withProducerAndCatchError(producer => { - val nullKey = if (key != null && key.trim().length() == 0) null else key + val nullKey = if (key != null && key.trim().isEmpty) null else key for (a <- 1 to num) { val record = if (partition != -1) new ProducerRecord[String, String](topic, partition, nullKey, value) else new ProducerRecord[String, String](topic, nullKey, value) @@ -228,6 +230,24 @@ class MessageConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConf } + def send(topic: String, partition: Int, key: String, value: String, num: Int, headerKeys: Array[String], headerValues: Array[String]): Unit = { + withProducerAndCatchError(producer => { + val nullKey = if (key != null && key.trim().isEmpty) null else key + for (a <- 1 to num) { + val record = if (partition != -1) new ProducerRecord[String, String](topic, partition, nullKey, value) + else new ProducerRecord[String, String](topic, nullKey, value) + if (!headerKeys.isEmpty && headerKeys.length == headerValues.length) { + val headers: Array[Header] = headerKeys.zip(headerValues).map { case (key, value) => + new RecordHeader(key, value.getBytes()) + } + headers.foreach(record.headers().add) + } + producer.send(record) + } + }, e => log.error("send error.", e)) + + } + def sendSync(record: ProducerRecord[Array[Byte], Array[Byte]]): (Boolean, String) = { withByteProducerAndCatchError(producer => { val metadata = producer.send(record).get() diff --git a/ui/src/utils/api.js b/ui/src/utils/api.js index 90649d5..1c56399 100644 --- a/ui/src/utils/api.js +++ b/ui/src/utils/api.js @@ -288,6 +288,10 @@ export const KafkaMessageApi = { url: "/message/send", method: "post", }, + sendWithHeader: { + url: "/message/sendWithHeader", + method: "post", + }, resend: { url: "/message/resend", method: "post", diff --git a/ui/src/views/message/SendMessage.vue b/ui/src/views/message/SendMessage.vue index ef46eff..fa54080 100644 --- a/ui/src/views/message/SendMessage.vue +++ b/ui/src/views/message/SendMessage.vue @@ -15,61 +15,72 @@ }, ]" placeholder="请选择一个topic" - > + > {{ v }} - + 默认 {{ v }} + + + + + + + + + + + + + + + + 添加 +
header keyheader value
删除
+
+ v-decorator="[ + 'body', + { + rules: [ + { + required: true, + message: '输入消息体!', + }, + ], + }, + ]" + placeholder="输入消息体!" /> - + 提交 @@ -97,6 +108,7 @@ export default { loading: false, partitions: [], selectPartition: undefined, + rows: [{ headerKey: '', headerValue: '' }], }; }, methods: { @@ -137,17 +149,26 @@ export default { this.selectPartition = -1; this.getPartitionInfo(topic); }, + addRow() { + if(this.rows.length<32){ + this.rows.push({ HeaderKey: '', HeaderValue: '' }); + } + }, + deleteRow(index) { + this.rows.splice(index, 1); + }, handleSubmit(e) { - e.preventDefault(); + e.preventDefault(); this.form.validateFields((err, values) => { if (!err) { const param = Object.assign({}, values, { partition: this.selectPartition, + headers: this.rows, }); this.loading = true; request({ - url: KafkaMessageApi.send.url, - method: KafkaMessageApi.send.method, + url: KafkaMessageApi.sendWithHeader.url, + method: KafkaMessageApi.sendWithHeader.method, data: param, }).then((res) => { this.loading = false;