在线发送消息支持选择同步、异步发送.
This commit is contained in:
@@ -26,6 +26,11 @@ public class SendMessage {
|
||||
|
||||
private List<Header> headers;
|
||||
|
||||
/**
|
||||
* true: sync send.
|
||||
*/
|
||||
private boolean sync;
|
||||
|
||||
@Data
|
||||
public static class Header{
|
||||
private String headerKey;
|
||||
|
||||
@@ -231,9 +231,16 @@ public class MessageServiceImpl implements MessageService, ApplicationContextAwa
|
||||
@Override public ResponseData sendWithHeader(SendMessage message) {
|
||||
String[] headerKeys= message.getHeaders().stream().map(SendMessage.Header::getHeaderKey).toArray(String[]::new);
|
||||
String[] headerValues= message.getHeaders().stream().map(SendMessage.Header::getHeaderValue).toArray(String[]::new);
|
||||
log.info("send with header:keys{},values{}",headerKeys, headerValues);
|
||||
messageConsole.send(message.getTopic(), message.getPartition(), message.getKey(), message.getBody(), message.getNum(), headerKeys, headerValues);
|
||||
return ResponseData.create().success();
|
||||
// log.info("send with header:keys{},values{}",headerKeys, headerValues);
|
||||
Tuple2<Object, String> tuple2 = messageConsole.send(message.getTopic(),
|
||||
message.getPartition(),
|
||||
message.getKey(),
|
||||
message.getBody(),
|
||||
message.getNum(),
|
||||
headerKeys,
|
||||
headerValues,
|
||||
message.isSync());
|
||||
return (boolean)tuple2._1 ? ResponseData.create().success() : ResponseData.create().failed(tuple2._2);
|
||||
}
|
||||
|
||||
@Override public ResponseData resend(SendMessage message) {
|
||||
|
||||
@@ -64,4 +64,6 @@
|
||||
<appender-ref ref="CONSOLE"/>
|
||||
<appender-ref ref="AsyncFileAppender"/>
|
||||
</logger>
|
||||
|
||||
<logger name="org.apache.kafka" level="warn"/>
|
||||
</configuration>
|
||||
@@ -6,7 +6,7 @@ import com.xuxd.kafka.console.config.{ContextConfigHolder, KafkaConfig}
|
||||
import org.apache.commons.lang3.StringUtils
|
||||
import org.apache.kafka.clients.admin.{DeleteRecordsOptions, RecordsToDelete}
|
||||
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
|
||||
import org.apache.kafka.clients.producer.ProducerRecord
|
||||
import org.apache.kafka.clients.producer.{ProducerRecord, RecordMetadata}
|
||||
import org.apache.kafka.common.TopicPartition
|
||||
import org.apache.kafka.common.header.Header
|
||||
import org.apache.kafka.common.header.internals.RecordHeader
|
||||
@@ -14,7 +14,9 @@ import org.apache.kafka.common.header.internals.RecordHeader
|
||||
import java.time.Duration
|
||||
import java.util
|
||||
import java.util.Properties
|
||||
import java.util.concurrent.Future
|
||||
import scala.collection.immutable
|
||||
import scala.collection.mutable.ArrayBuffer
|
||||
import scala.jdk.CollectionConverters.{CollectionHasAsScala, MapHasAsScala, SeqHasAsJava}
|
||||
|
||||
/**
|
||||
@@ -230,9 +232,17 @@ class MessageConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConf
|
||||
|
||||
}
|
||||
|
||||
def send(topic: String, partition: Int, key: String, value: String, num: Int, headerKeys: Array[String], headerValues: Array[String]): Unit = {
|
||||
def send(topic: String,
|
||||
partition: Int,
|
||||
key: String,
|
||||
value: String,
|
||||
num: Int,
|
||||
headerKeys: Array[String],
|
||||
headerValues: Array[String],
|
||||
sync: Boolean): (Boolean, String) = {
|
||||
withProducerAndCatchError(producer => {
|
||||
val nullKey = if (key != null && key.trim().isEmpty) null else key
|
||||
val results = ArrayBuffer.empty[Future[RecordMetadata]]
|
||||
for (a <- 1 to num) {
|
||||
val record = if (partition != -1) new ProducerRecord[String, String](topic, partition, nullKey, value)
|
||||
else new ProducerRecord[String, String](topic, nullKey, value)
|
||||
@@ -242,11 +252,18 @@ class MessageConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConf
|
||||
}
|
||||
headers.foreach(record.headers().add)
|
||||
}
|
||||
producer.send(record)
|
||||
results += producer.send(record)
|
||||
}
|
||||
}, e => log.error("send error.", e))
|
||||
if (sync) {
|
||||
results.foreach(_.get())
|
||||
}
|
||||
(true, "")
|
||||
}, e => {
|
||||
log.error("send error.", e)
|
||||
(false, e.getMessage)
|
||||
})
|
||||
|
||||
}
|
||||
}.asInstanceOf[(Boolean, String)]
|
||||
|
||||
def sendSync(record: ProducerRecord[Array[Byte], Array[Byte]]): (Boolean, String) = {
|
||||
withByteProducerAndCatchError(producer => {
|
||||
|
||||
@@ -1,7 +1,12 @@
|
||||
<template>
|
||||
<div class="content">
|
||||
<a-spin :spinning="loading">
|
||||
<a-form :form="form" @submit="handleSubmit">
|
||||
<a-form
|
||||
:form="form"
|
||||
:label-col="{ span: 5 }"
|
||||
:wrapper-col="{ span: 12 }"
|
||||
@submit="handleSubmit"
|
||||
>
|
||||
<a-form-item label="Topic">
|
||||
<a-select
|
||||
class="topic-select"
|
||||
@@ -69,6 +74,7 @@
|
||||
</a-form-item>
|
||||
<a-form-item label="消息体" has-feedback>
|
||||
<a-textarea
|
||||
:autosize="{ minRows: 5 }"
|
||||
v-decorator="[
|
||||
'body',
|
||||
{
|
||||
@@ -101,6 +107,22 @@
|
||||
:max="32"
|
||||
/>
|
||||
</a-form-item>
|
||||
<a-form-item label="发送类型">
|
||||
<a-radio-group
|
||||
v-decorator="[
|
||||
'sync',
|
||||
{
|
||||
initialValue: 'false',
|
||||
rules: [{ required: true, message: '请选择一个发送类型!' }],
|
||||
},
|
||||
]"
|
||||
>
|
||||
<a-radio value="false"> 异步发送 </a-radio>
|
||||
<a-radio value="true">
|
||||
同步发送(发送失败,会返回错误信息)
|
||||
</a-radio>
|
||||
</a-radio-group>
|
||||
</a-form-item>
|
||||
<a-form-item :wrapper-col="{ span: 12, offset: 5 }">
|
||||
<a-button type="primary" html-type="submit"> 提交 </a-button>
|
||||
</a-form-item>
|
||||
|
||||
Reference in New Issue
Block a user