send message with header
This commit is contained in:
@@ -1,5 +1,6 @@
|
||||
package com.xuxd.kafka.console.beans;
|
||||
|
||||
import java.util.List;
|
||||
import lombok.Data;
|
||||
|
||||
/**
|
||||
@@ -22,4 +23,13 @@ public class SendMessage {
|
||||
private int num;
|
||||
|
||||
private long offset;
|
||||
|
||||
private List<Header> headers;
|
||||
|
||||
@Data
|
||||
public static class Header{
|
||||
private String headerKey;
|
||||
|
||||
private String headerValue;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -56,6 +56,13 @@ public class MessageController {
|
||||
return messageService.send(message);
|
||||
}
|
||||
|
||||
@PostMapping("/sendWithHeader")
|
||||
@ControllerLog("在线发送消息")
|
||||
@Permission("message:send")
|
||||
public Object sendWithHeader(@RequestBody SendMessage message) {
|
||||
return messageService.sendWithHeader(message);
|
||||
}
|
||||
|
||||
@ControllerLog("重新发送消息")
|
||||
@Permission("message:resend")
|
||||
@PostMapping("/resend")
|
||||
|
||||
@@ -24,6 +24,8 @@ public interface MessageService {
|
||||
|
||||
ResponseData send(SendMessage message);
|
||||
|
||||
ResponseData sendWithHeader(SendMessage message);
|
||||
|
||||
ResponseData resend(SendMessage message);
|
||||
|
||||
ResponseData delete(List<QueryMessage> messages);
|
||||
|
||||
@@ -228,6 +228,14 @@ public class MessageServiceImpl implements MessageService, ApplicationContextAwa
|
||||
return ResponseData.create().success();
|
||||
}
|
||||
|
||||
@Override public ResponseData sendWithHeader(SendMessage message) {
|
||||
String[] headerKeys= message.getHeaders().stream().map(SendMessage.Header::getHeaderKey).toArray(String[]::new);
|
||||
String[] headerValues= message.getHeaders().stream().map(SendMessage.Header::getHeaderValue).toArray(String[]::new);
|
||||
log.info("send with header:keys{},values{}",headerKeys, headerValues);
|
||||
messageConsole.send(message.getTopic(), message.getPartition(), message.getKey(), message.getBody(), message.getNum(), headerKeys, headerValues);
|
||||
return ResponseData.create().success();
|
||||
}
|
||||
|
||||
@Override public ResponseData resend(SendMessage message) {
|
||||
TopicPartition partition = new TopicPartition(message.getTopic(), message.getPartition());
|
||||
Map<TopicPartition, Object> offsetTable = new HashMap<>(1, 1.0f);
|
||||
|
||||
@@ -8,10 +8,12 @@ import org.apache.kafka.clients.admin.{DeleteRecordsOptions, RecordsToDelete}
|
||||
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
|
||||
import org.apache.kafka.clients.producer.ProducerRecord
|
||||
import org.apache.kafka.common.TopicPartition
|
||||
import org.apache.kafka.common.header.Header
|
||||
import org.apache.kafka.common.header.internals.RecordHeader
|
||||
|
||||
import java.time.Duration
|
||||
import java.util
|
||||
import java.util.{Properties}
|
||||
import java.util.Properties
|
||||
import scala.collection.immutable
|
||||
import scala.jdk.CollectionConverters.{CollectionHasAsScala, MapHasAsScala, SeqHasAsJava}
|
||||
|
||||
@@ -218,7 +220,7 @@ class MessageConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConf
|
||||
|
||||
def send(topic: String, partition: Int, key: String, value: String, num: Int): Unit = {
|
||||
withProducerAndCatchError(producer => {
|
||||
val nullKey = if (key != null && key.trim().length() == 0) null else key
|
||||
val nullKey = if (key != null && key.trim().isEmpty) null else key
|
||||
for (a <- 1 to num) {
|
||||
val record = if (partition != -1) new ProducerRecord[String, String](topic, partition, nullKey, value)
|
||||
else new ProducerRecord[String, String](topic, nullKey, value)
|
||||
@@ -228,6 +230,24 @@ class MessageConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConf
|
||||
|
||||
}
|
||||
|
||||
def send(topic: String, partition: Int, key: String, value: String, num: Int, headerKeys: Array[String], headerValues: Array[String]): Unit = {
|
||||
withProducerAndCatchError(producer => {
|
||||
val nullKey = if (key != null && key.trim().isEmpty) null else key
|
||||
for (a <- 1 to num) {
|
||||
val record = if (partition != -1) new ProducerRecord[String, String](topic, partition, nullKey, value)
|
||||
else new ProducerRecord[String, String](topic, nullKey, value)
|
||||
if (!headerKeys.isEmpty && headerKeys.length == headerValues.length) {
|
||||
val headers: Array[Header] = headerKeys.zip(headerValues).map { case (key, value) =>
|
||||
new RecordHeader(key, value.getBytes())
|
||||
}
|
||||
headers.foreach(record.headers().add)
|
||||
}
|
||||
producer.send(record)
|
||||
}
|
||||
}, e => log.error("send error.", e))
|
||||
|
||||
}
|
||||
|
||||
def sendSync(record: ProducerRecord[Array[Byte], Array[Byte]]): (Boolean, String) = {
|
||||
withByteProducerAndCatchError(producer => {
|
||||
val metadata = producer.send(record).get()
|
||||
|
||||
Reference in New Issue
Block a user