消息详情支持重新发送

This commit is contained in:
许晓东
2021-12-21 14:08:36 +08:00
parent 98f33bb2cc
commit 5930e44fdf
9 changed files with 94 additions and 3 deletions

View File

@@ -20,4 +20,6 @@ public class SendMessage {
private String body;
private int num;
private long offset;
}

View File

@@ -47,4 +47,9 @@ public class MessageController {
public Object send(@RequestBody SendMessage message) {
return messageService.send(message);
}
@PostMapping("/resend")
public Object resend(@RequestBody SendMessage message) {
return messageService.resend(message);
}
}

View File

@@ -21,4 +21,6 @@ public interface MessageService {
ResponseData deserializerList();
ResponseData send(SendMessage message);
ResponseData resend(SendMessage message);
}

View File

@@ -24,6 +24,7 @@ import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.BytesDeserializer;
@@ -37,6 +38,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Service;
import scala.Tuple2;
/**
* kafka-console-ui.
@@ -142,7 +144,7 @@ public class MessageServiceImpl implements MessageService, ApplicationContextAwa
if (consumerInfo.topicPartition().equals(new TopicPartition(record.topic(), record.partition()))) {
MessageDetailVO.ConsumerVO consumerVO = new MessageDetailVO.ConsumerVO();
consumerVO.setGroupId(consumerInfo.getGroupId());
consumerVO.setStatus(consumerInfo.getConsumerOffset() < record.offset() ? "unconsume" : "consumed");
consumerVO.setStatus(consumerInfo.getConsumerOffset() <= record.offset() ? "unconsume" : "consumed");
consumerVOS.add(consumerVO);
}
});
@@ -162,6 +164,21 @@ public class MessageServiceImpl implements MessageService, ApplicationContextAwa
return ResponseData.create().success();
}
@Override public ResponseData resend(SendMessage message) {
TopicPartition partition = new TopicPartition(message.getTopic(), message.getPartition());
Map<TopicPartition, Object> offsetTable = new HashMap<>(1, 1.0f);
offsetTable.put(partition, message.getOffset());
Map<TopicPartition, ConsumerRecord<byte[], byte[]>> recordMap = messageConsole.searchBy(offsetTable);
if (recordMap.isEmpty()) {
return ResponseData.create().failed("Get message failed.");
}
ConsumerRecord<byte[], byte[]> record = recordMap.get(partition);
ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>(record.topic(), record.partition(), record.key(), record.value(), record.headers());
Tuple2<Object, String> tuple2 = messageConsole.sendSync(producerRecord);
boolean success = (boolean) tuple2._1();
return success ? ResponseData.create().success("success: " + tuple2._2()) : ResponseData.create().failed(tuple2._2());
}
private Map<TopicPartition, ConsumerRecord<byte[], byte[]>> searchRecordByOffset(QueryMessage queryMessage) {
Set<TopicPartition> partitions = getPartitions(queryMessage);

View File

@@ -9,7 +9,7 @@ import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.config.SaslConfigs
import org.apache.kafka.common.requests.ListOffsetsResponse
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringSerializer}
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, StringSerializer}
import org.apache.kafka.common.utils.Time
import org.slf4j.{Logger, LoggerFactory}
@@ -77,6 +77,22 @@ class KafkaConsole(config: KafkaConfig) {
}
}
protected def withByteProducerAndCatchError(f: KafkaProducer[Array[Byte], Array[Byte]] => Any, eh: Exception => Any,
extra: Properties = new Properties()): Any = {
val props = getProps()
props.putAll(extra)
props.put(ConsumerConfig.CLIENT_ID_CONFIG, String.valueOf(System.currentTimeMillis()))
val producer = new KafkaProducer[Array[Byte], Array[Byte]](props, new ByteArraySerializer, new ByteArraySerializer)
try {
f(producer)
} catch {
case er: Exception => eh(er)
}
finally {
producer.close()
}
}
protected def withZKClient(f: AdminZkClient => Any): Any = {
val zkClient = KafkaZkClient(config.getZookeeperAddr, false, 30000, 30000, Int.MaxValue, Time.SYSTEM)
val adminZkClient = new AdminZkClient(zkClient)

View File

@@ -176,10 +176,21 @@ class MessageConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConf
withProducerAndCatchError(producer => {
val nullKey = if (key != null && key.trim().length() == 0) null else key
for (a <- 1 to num) {
val record = if (partition != -1) new ProducerRecord[String, String](topic, partition, nullKey, value) else new ProducerRecord[String, String](topic, nullKey, value)
val record = if (partition != -1) new ProducerRecord[String, String](topic, partition, nullKey, value)
else new ProducerRecord[String, String](topic, nullKey, value)
producer.send(record)
}
}, e => log.error("send error.", e))
}
def sendSync(record: ProducerRecord[Array[Byte], Array[Byte]]): (Boolean, String) = {
withByteProducerAndCatchError(producer => {
val metadata = producer.send(record).get()
(true, metadata.toString())
}, e => {
log.error("send error.", e)
(false, e.getMessage)
}).asInstanceOf[(Boolean, String)]
}
}

View File

@@ -244,4 +244,8 @@ export const KafkaMessageApi = {
url: "/message/send",
method: "post",
},
resend: {
url: "/message/resend",
method: "post",
},
};

View File

@@ -107,6 +107,18 @@
</div>
</a-table>
</div>
<div>
<h4>操作</h4>
<hr />
<a-popconfirm
title="确定将当前这条消息重新发回broker"
ok-text="确认"
cancel-text="取消"
@confirm="resend"
>
<a-button type="primary" icon="reload"> 重新发送 </a-button>
</a-popconfirm>
</div>
</a-spin>
</div>
</a-modal>
@@ -199,6 +211,25 @@ export default {
valueDeserializerChange() {
this.getMessageDetail();
},
resend() {
const params = Object.assign({}, this.data);
this.loading = true;
request({
url: KafkaMessageApi.resend.url,
method: KafkaMessageApi.resend.method,
data: params,
}).then((res) => {
this.loading = false;
if (res.code != 0) {
notification.error({
message: "error",
description: res.msg,
});
} else {
this.$message.success(res.msg);
}
});
},
},
};
const columns = [

View File

@@ -215,4 +215,7 @@ const columns = [
.red-font {
color: red;
}
.green-font {
color: green;
}
</style>