This commit is contained in:
许晓东
2023-09-21 16:52:46 +08:00
7 changed files with 114 additions and 42 deletions

View File

@@ -1,5 +1,6 @@
package com.xuxd.kafka.console.beans; package com.xuxd.kafka.console.beans;
import java.util.List;
import lombok.Data; import lombok.Data;
/** /**
@@ -22,4 +23,13 @@ public class SendMessage {
private int num; private int num;
private long offset; private long offset;
private List<Header> headers;
@Data
public static class Header{
private String headerKey;
private String headerValue;
}
} }

View File

@@ -56,6 +56,13 @@ public class MessageController {
return messageService.send(message); return messageService.send(message);
} }
@PostMapping("/sendWithHeader")
@ControllerLog("在线发送消息")
@Permission("message:send")
public Object sendWithHeader(@RequestBody SendMessage message) {
return messageService.sendWithHeader(message);
}
@ControllerLog("重新发送消息") @ControllerLog("重新发送消息")
@Permission("message:resend") @Permission("message:resend")
@PostMapping("/resend") @PostMapping("/resend")

View File

@@ -24,6 +24,8 @@ public interface MessageService {
ResponseData send(SendMessage message); ResponseData send(SendMessage message);
ResponseData sendWithHeader(SendMessage message);
ResponseData resend(SendMessage message); ResponseData resend(SendMessage message);
ResponseData delete(List<QueryMessage> messages); ResponseData delete(List<QueryMessage> messages);

View File

@@ -228,6 +228,14 @@ public class MessageServiceImpl implements MessageService, ApplicationContextAwa
return ResponseData.create().success(); return ResponseData.create().success();
} }
@Override public ResponseData sendWithHeader(SendMessage message) {
String[] headerKeys= message.getHeaders().stream().map(SendMessage.Header::getHeaderKey).toArray(String[]::new);
String[] headerValues= message.getHeaders().stream().map(SendMessage.Header::getHeaderValue).toArray(String[]::new);
log.info("send with header:keys{},values{}",headerKeys, headerValues);
messageConsole.send(message.getTopic(), message.getPartition(), message.getKey(), message.getBody(), message.getNum(), headerKeys, headerValues);
return ResponseData.create().success();
}
@Override public ResponseData resend(SendMessage message) { @Override public ResponseData resend(SendMessage message) {
TopicPartition partition = new TopicPartition(message.getTopic(), message.getPartition()); TopicPartition partition = new TopicPartition(message.getTopic(), message.getPartition());
Map<TopicPartition, Object> offsetTable = new HashMap<>(1, 1.0f); Map<TopicPartition, Object> offsetTable = new HashMap<>(1, 1.0f);

View File

@@ -8,10 +8,12 @@ import org.apache.kafka.clients.admin.{DeleteRecordsOptions, RecordsToDelete}
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord} import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.clients.producer.ProducerRecord import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.header.Header
import org.apache.kafka.common.header.internals.RecordHeader
import java.time.Duration import java.time.Duration
import java.util import java.util
import java.util.{Properties} import java.util.Properties
import scala.collection.immutable import scala.collection.immutable
import scala.jdk.CollectionConverters.{CollectionHasAsScala, MapHasAsScala, SeqHasAsJava} import scala.jdk.CollectionConverters.{CollectionHasAsScala, MapHasAsScala, SeqHasAsJava}
@@ -218,7 +220,7 @@ class MessageConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConf
def send(topic: String, partition: Int, key: String, value: String, num: Int): Unit = { def send(topic: String, partition: Int, key: String, value: String, num: Int): Unit = {
withProducerAndCatchError(producer => { withProducerAndCatchError(producer => {
val nullKey = if (key != null && key.trim().length() == 0) null else key val nullKey = if (key != null && key.trim().isEmpty) null else key
for (a <- 1 to num) { for (a <- 1 to num) {
val record = if (partition != -1) new ProducerRecord[String, String](topic, partition, nullKey, value) val record = if (partition != -1) new ProducerRecord[String, String](topic, partition, nullKey, value)
else new ProducerRecord[String, String](topic, nullKey, value) else new ProducerRecord[String, String](topic, nullKey, value)
@@ -228,6 +230,24 @@ class MessageConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConf
} }
def send(topic: String, partition: Int, key: String, value: String, num: Int, headerKeys: Array[String], headerValues: Array[String]): Unit = {
withProducerAndCatchError(producer => {
val nullKey = if (key != null && key.trim().isEmpty) null else key
for (a <- 1 to num) {
val record = if (partition != -1) new ProducerRecord[String, String](topic, partition, nullKey, value)
else new ProducerRecord[String, String](topic, nullKey, value)
if (!headerKeys.isEmpty && headerKeys.length == headerValues.length) {
val headers: Array[Header] = headerKeys.zip(headerValues).map { case (key, value) =>
new RecordHeader(key, value.getBytes())
}
headers.foreach(record.headers().add)
}
producer.send(record)
}
}, e => log.error("send error.", e))
}
def sendSync(record: ProducerRecord[Array[Byte], Array[Byte]]): (Boolean, String) = { def sendSync(record: ProducerRecord[Array[Byte], Array[Byte]]): (Boolean, String) = {
withByteProducerAndCatchError(producer => { withByteProducerAndCatchError(producer => {
val metadata = producer.send(record).get() val metadata = producer.send(record).get()

View File

@@ -288,6 +288,10 @@ export const KafkaMessageApi = {
url: "/message/send", url: "/message/send",
method: "post", method: "post",
}, },
sendWithHeader: {
url: "/message/sendWithHeader",
method: "post",
},
resend: { resend: {
url: "/message/resend", url: "/message/resend",
method: "post", method: "post",

View File

@@ -15,61 +15,72 @@
}, },
]" ]"
placeholder="请选择一个topic" placeholder="请选择一个topic"
> >
<a-select-option v-for="v in topicList" :key="v" :value="v"> <a-select-option v-for="v in topicList" :key="v" :value="v">
{{ v }} {{ v }}
</a-select-option> </a-select-option>
</a-select> </a-select>
</a-form-item> </a-form-item>
<a-form-item label="分区"> <a-form-item label="分区">
<a-select <a-select class="type-select" show-search option-filter-prop="children" v-model="selectPartition"
class="type-select" placeholder="请选择一个分区">
show-search
option-filter-prop="children"
v-model="selectPartition"
placeholder="请选择一个分区"
>
<a-select-option v-for="v in partitions" :key="v" :value="v"> <a-select-option v-for="v in partitions" :key="v" :value="v">
<span v-if="v == -1">默认</span> <span v-else>{{ v }}</span> <span v-if="v == -1">默认</span> <span v-else>{{ v }}</span>
</a-select-option> </a-select-option>
</a-select> </a-select>
</a-form-item> </a-form-item>
<a-form-item label="消息头">
<table>
<thead>
<tr>
<th>header key</th>
<th>header value</th>
</tr>
</thead>
<tbody>
<tr v-for="(row, index) in rows" :key="index">
<td> <a-input v-model="row.headerKey" /></td>
<td> <a-input v-model="row.headerValue" /></td>
<td><a-button type="primary" @click="deleteRow(index)">删除</a-button></td>
</tr>
</tbody>
<a-button type="primary" @click="addRow">添加</a-button>
</table>
</a-form-item>
<a-form-item label="消息Key"> <a-form-item label="消息Key">
<a-input v-decorator="['key', { initialValue: 'key' }]" /> <a-input v-decorator="['key', { initialValue: 'key' }]" />
</a-form-item> </a-form-item>
<a-form-item label="消息体" has-feedback> <a-form-item label="消息体" has-feedback>
<a-textarea <a-textarea
v-decorator="[ v-decorator="[
'body', 'body',
{ {
rules: [ rules: [
{ {
required: true, required: true,
message: '输入消息体!', message: '输入消息体!',
}, },
], ],
}, },
]" ]"
placeholder="输入消息体!" placeholder="输入消息体!" />
/>
</a-form-item> </a-form-item>
<a-form-item label="发送的消息数"> <a-form-item label="发送的消息数">
<a-input-number <a-input-number v-decorator="[
v-decorator="[ 'num',
'num', {
{ initialValue: 1,
initialValue: 1, rules: [
rules: [ {
{ required: true,
required: true, message: '输入消息数!',
message: '输入消息数!', },
}, ],
], },
}, ]"
]" :min="1"
:min="1" :max="32"
:max="32" />
/>
</a-form-item> </a-form-item>
<a-form-item :wrapper-col="{ span: 12, offset: 5 }"> <a-form-item :wrapper-col="{ span: 12, offset: 5 }">
<a-button type="primary" html-type="submit"> 提交 </a-button> <a-button type="primary" html-type="submit"> 提交 </a-button>
@@ -97,6 +108,7 @@ export default {
loading: false, loading: false,
partitions: [], partitions: [],
selectPartition: undefined, selectPartition: undefined,
rows: [{ headerKey: '', headerValue: '' }],
}; };
}, },
methods: { methods: {
@@ -137,17 +149,26 @@ export default {
this.selectPartition = -1; this.selectPartition = -1;
this.getPartitionInfo(topic); this.getPartitionInfo(topic);
}, },
addRow() {
if(this.rows.length<32){
this.rows.push({ HeaderKey: '', HeaderValue: '' });
}
},
deleteRow(index) {
this.rows.splice(index, 1);
},
handleSubmit(e) { handleSubmit(e) {
e.preventDefault(); e.preventDefault();
this.form.validateFields((err, values) => { this.form.validateFields((err, values) => {
if (!err) { if (!err) {
const param = Object.assign({}, values, { const param = Object.assign({}, values, {
partition: this.selectPartition, partition: this.selectPartition,
headers: this.rows,
}); });
this.loading = true; this.loading = true;
request({ request({
url: KafkaMessageApi.send.url, url: KafkaMessageApi.sendWithHeader.url,
method: KafkaMessageApi.send.method, method: KafkaMessageApi.sendWithHeader.method,
data: param, data: param,
}).then((res) => { }).then((res) => {
this.loading = false; this.loading = false;