diff --git a/src/main/java/com/xuxd/kafka/console/beans/SendMessage.java b/src/main/java/com/xuxd/kafka/console/beans/SendMessage.java new file mode 100644 index 0000000..7dbe6ce --- /dev/null +++ b/src/main/java/com/xuxd/kafka/console/beans/SendMessage.java @@ -0,0 +1,23 @@ +package com.xuxd.kafka.console.beans; + +import lombok.Data; + +/** + * kafka-console-ui. + * + * @author xuxd + * @date 2021-12-19 23:28:31 + **/ +@Data +public class SendMessage { + + private String topic; + + private int partition; + + private String key; + + private String body; + + private int num; +} diff --git a/src/main/java/com/xuxd/kafka/console/controller/MessageController.java b/src/main/java/com/xuxd/kafka/console/controller/MessageController.java index 9fae9c6..022c3bf 100644 --- a/src/main/java/com/xuxd/kafka/console/controller/MessageController.java +++ b/src/main/java/com/xuxd/kafka/console/controller/MessageController.java @@ -1,5 +1,6 @@ package com.xuxd.kafka.console.controller; +import com.xuxd.kafka.console.beans.SendMessage; import com.xuxd.kafka.console.beans.dto.QueryMessageDTO; import com.xuxd.kafka.console.service.MessageService; import org.springframework.beans.factory.annotation.Autowired; @@ -41,4 +42,9 @@ public class MessageController { public Object deserializerList() { return messageService.deserializerList(); } + + @PostMapping("/send") + public Object send(@RequestBody SendMessage message) { + return messageService.send(message); + } } diff --git a/src/main/java/com/xuxd/kafka/console/service/MessageService.java b/src/main/java/com/xuxd/kafka/console/service/MessageService.java index e5eebfc..7c37176 100644 --- a/src/main/java/com/xuxd/kafka/console/service/MessageService.java +++ b/src/main/java/com/xuxd/kafka/console/service/MessageService.java @@ -2,6 +2,7 @@ package com.xuxd.kafka.console.service; import com.xuxd.kafka.console.beans.QueryMessage; import com.xuxd.kafka.console.beans.ResponseData; +import com.xuxd.kafka.console.beans.SendMessage; /** * kafka-console-ui. @@ -18,4 +19,6 @@ public interface MessageService { ResponseData searchDetail(QueryMessage queryMessage); ResponseData deserializerList(); + + ResponseData send(SendMessage message); } diff --git a/src/main/java/com/xuxd/kafka/console/service/impl/MessageServiceImpl.java b/src/main/java/com/xuxd/kafka/console/service/impl/MessageServiceImpl.java index 1240e29..02fe815 100644 --- a/src/main/java/com/xuxd/kafka/console/service/impl/MessageServiceImpl.java +++ b/src/main/java/com/xuxd/kafka/console/service/impl/MessageServiceImpl.java @@ -2,6 +2,7 @@ package com.xuxd.kafka.console.service.impl; import com.xuxd.kafka.console.beans.QueryMessage; import com.xuxd.kafka.console.beans.ResponseData; +import com.xuxd.kafka.console.beans.SendMessage; import com.xuxd.kafka.console.beans.vo.ConsumerRecordVO; import com.xuxd.kafka.console.beans.vo.MessageDetailVO; import com.xuxd.kafka.console.service.ConsumerService; @@ -24,6 +25,8 @@ import org.apache.commons.lang3.StringUtils; import org.apache.kafka.clients.admin.TopicDescription; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.serialization.BytesDeserializer; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.DoubleDeserializer; import org.apache.kafka.common.serialization.FloatDeserializer; @@ -59,10 +62,12 @@ public class MessageServiceImpl implements MessageService, ApplicationContextAwa private Map deserializerDict = new HashMap<>(); { + deserializerDict.put("ByteArray", new ByteArrayDeserializer()); deserializerDict.put("Integer", new IntegerDeserializer()); deserializerDict.put("String", new StringDeserializer()); deserializerDict.put("Float", new FloatDeserializer()); deserializerDict.put("Double", new DoubleDeserializer()); + deserializerDict.put("Byte", new BytesDeserializer()); } public static String defaultDeserializer = "String"; @@ -152,6 +157,11 @@ public class MessageServiceImpl implements MessageService, ApplicationContextAwa return ResponseData.create().data(deserializerDict.keySet()).success(); } + @Override public ResponseData send(SendMessage message) { + messageConsole.send(message.getTopic(), message.getPartition(), message.getKey(), message.getBody(), message.getNum()); + return ResponseData.create().success(); + } + private Map> searchRecordByOffset(QueryMessage queryMessage) { Set partitions = getPartitions(queryMessage); diff --git a/src/main/scala/kafka/console/KafkaConsole.scala b/src/main/scala/kafka/console/KafkaConsole.scala index 25e04f0..584d21f 100644 --- a/src/main/scala/kafka/console/KafkaConsole.scala +++ b/src/main/scala/kafka/console/KafkaConsole.scala @@ -5,10 +5,11 @@ import kafka.zk.{AdminZkClient, KafkaZkClient} import org.apache.kafka.clients.CommonClientConfigs import org.apache.kafka.clients.admin._ import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer, OffsetAndMetadata} +import org.apache.kafka.clients.producer.KafkaProducer import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.config.SaslConfigs import org.apache.kafka.common.requests.ListOffsetsResponse -import org.apache.kafka.common.serialization.ByteArrayDeserializer +import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringSerializer} import org.apache.kafka.common.utils.Time import org.slf4j.{Logger, LoggerFactory} @@ -60,6 +61,22 @@ class KafkaConsole(config: KafkaConfig) { } } + protected def withProducerAndCatchError(f: KafkaProducer[String, String] => Any, eh: Exception => Any, + extra: Properties = new Properties()): Any = { + val props = getProps() + props.putAll(extra) + props.put(ConsumerConfig.CLIENT_ID_CONFIG, String.valueOf(System.currentTimeMillis())) + val producer = new KafkaProducer[String, String](props, new StringSerializer, new StringSerializer) + try { + f(producer) + } catch { + case er: Exception => eh(er) + } + finally { + producer.close() + } + } + protected def withZKClient(f: AdminZkClient => Any): Any = { val zkClient = KafkaZkClient(config.getZookeeperAddr, false, 30000, 30000, Int.MaxValue, Time.SYSTEM) val adminZkClient = new AdminZkClient(zkClient) diff --git a/src/main/scala/kafka/console/MessageConsole.scala b/src/main/scala/kafka/console/MessageConsole.scala index 0679a8a..a6b57e8 100644 --- a/src/main/scala/kafka/console/MessageConsole.scala +++ b/src/main/scala/kafka/console/MessageConsole.scala @@ -2,6 +2,7 @@ package kafka.console import com.xuxd.kafka.console.config.KafkaConfig import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord} +import org.apache.kafka.clients.producer.ProducerRecord import org.apache.kafka.common.TopicPartition import java.time.Duration @@ -170,4 +171,15 @@ class MessageConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConf }) res } + + def send(topic: String, partition: Int, key: String, value: String, num: Int): Unit = { + withProducerAndCatchError(producer => { + val nullKey = if (key != null && key.trim().length() == 0) null else key + for (a <- 1 to num) { + val record = if (partition != -1) new ProducerRecord[String, String](topic, partition, nullKey, value) else new ProducerRecord[String, String](topic, nullKey, value) + producer.send(record) + } + }, e => log.error("send error.", e)) + + } } diff --git a/ui/src/utils/api.js b/ui/src/utils/api.js index c9fae3e..2e40dda 100644 --- a/ui/src/utils/api.js +++ b/ui/src/utils/api.js @@ -240,4 +240,8 @@ export const KafkaMessageApi = { url: "/message/deserializer/list", method: "get", }, + send: { + url: "/message/send", + method: "post", + }, }; diff --git a/ui/src/views/message/SendMessage.vue b/ui/src/views/message/SendMessage.vue index e4314b4..ef46eff 100644 --- a/ui/src/views/message/SendMessage.vue +++ b/ui/src/views/message/SendMessage.vue @@ -1,7 +1,7 @@