diff --git a/src/main/java/com/xuxd/kafka/console/beans/SendMessage.java b/src/main/java/com/xuxd/kafka/console/beans/SendMessage.java index 5471bcb..a56eaf6 100644 --- a/src/main/java/com/xuxd/kafka/console/beans/SendMessage.java +++ b/src/main/java/com/xuxd/kafka/console/beans/SendMessage.java @@ -26,6 +26,11 @@ public class SendMessage { private List
headers; + /** + * true: sync send. + */ + private boolean sync; + @Data public static class Header{ private String headerKey; diff --git a/src/main/java/com/xuxd/kafka/console/service/impl/MessageServiceImpl.java b/src/main/java/com/xuxd/kafka/console/service/impl/MessageServiceImpl.java index b10f98b..2d01282 100644 --- a/src/main/java/com/xuxd/kafka/console/service/impl/MessageServiceImpl.java +++ b/src/main/java/com/xuxd/kafka/console/service/impl/MessageServiceImpl.java @@ -231,9 +231,16 @@ public class MessageServiceImpl implements MessageService, ApplicationContextAwa @Override public ResponseData sendWithHeader(SendMessage message) { String[] headerKeys= message.getHeaders().stream().map(SendMessage.Header::getHeaderKey).toArray(String[]::new); String[] headerValues= message.getHeaders().stream().map(SendMessage.Header::getHeaderValue).toArray(String[]::new); - log.info("send with header:keys{},values{}",headerKeys, headerValues); - messageConsole.send(message.getTopic(), message.getPartition(), message.getKey(), message.getBody(), message.getNum(), headerKeys, headerValues); - return ResponseData.create().success(); +// log.info("send with header:keys{},values{}",headerKeys, headerValues); + Tuple2 tuple2 = messageConsole.send(message.getTopic(), + message.getPartition(), + message.getKey(), + message.getBody(), + message.getNum(), + headerKeys, + headerValues, + message.isSync()); + return (boolean)tuple2._1 ? ResponseData.create().success() : ResponseData.create().failed(tuple2._2); } @Override public ResponseData resend(SendMessage message) { diff --git a/src/main/resources/logback-spring.xml b/src/main/resources/logback-spring.xml index 73490f5..46a675a 100644 --- a/src/main/resources/logback-spring.xml +++ b/src/main/resources/logback-spring.xml @@ -64,4 +64,6 @@ + + \ No newline at end of file diff --git a/src/main/scala/kafka/console/MessageConsole.scala b/src/main/scala/kafka/console/MessageConsole.scala index bee1eec..bbc36bb 100644 --- a/src/main/scala/kafka/console/MessageConsole.scala +++ b/src/main/scala/kafka/console/MessageConsole.scala @@ -6,7 +6,7 @@ import com.xuxd.kafka.console.config.{ContextConfigHolder, KafkaConfig} import org.apache.commons.lang3.StringUtils import org.apache.kafka.clients.admin.{DeleteRecordsOptions, RecordsToDelete} import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord} -import org.apache.kafka.clients.producer.ProducerRecord +import org.apache.kafka.clients.producer.{ProducerRecord, RecordMetadata} import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.header.Header import org.apache.kafka.common.header.internals.RecordHeader @@ -14,7 +14,9 @@ import org.apache.kafka.common.header.internals.RecordHeader import java.time.Duration import java.util import java.util.Properties +import java.util.concurrent.Future import scala.collection.immutable +import scala.collection.mutable.ArrayBuffer import scala.jdk.CollectionConverters.{CollectionHasAsScala, MapHasAsScala, SeqHasAsJava} /** @@ -230,9 +232,17 @@ class MessageConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConf } - def send(topic: String, partition: Int, key: String, value: String, num: Int, headerKeys: Array[String], headerValues: Array[String]): Unit = { + def send(topic: String, + partition: Int, + key: String, + value: String, + num: Int, + headerKeys: Array[String], + headerValues: Array[String], + sync: Boolean): (Boolean, String) = { withProducerAndCatchError(producer => { val nullKey = if (key != null && key.trim().isEmpty) null else key + val results = ArrayBuffer.empty[Future[RecordMetadata]] for (a <- 1 to num) { val record = if (partition != -1) new ProducerRecord[String, String](topic, partition, nullKey, value) else new ProducerRecord[String, String](topic, nullKey, value) @@ -242,11 +252,18 @@ class MessageConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConf } headers.foreach(record.headers().add) } - producer.send(record) + results += producer.send(record) } - }, e => log.error("send error.", e)) + if (sync) { + results.foreach(_.get()) + } + (true, "") + }, e => { + log.error("send error.", e) + (false, e.getMessage) + }) - } + }.asInstanceOf[(Boolean, String)] def sendSync(record: ProducerRecord[Array[Byte], Array[Byte]]): (Boolean, String) = { withByteProducerAndCatchError(producer => { diff --git a/ui/src/views/message/SendMessage.vue b/ui/src/views/message/SendMessage.vue index 219bb6a..d59723f 100644 --- a/ui/src/views/message/SendMessage.vue +++ b/ui/src/views/message/SendMessage.vue @@ -1,7 +1,12 @@