diff --git a/src/main/java/com/xuxd/kafka/console/beans/SendMessage.java b/src/main/java/com/xuxd/kafka/console/beans/SendMessage.java index 7dbe6ce..b4e25b4 100644 --- a/src/main/java/com/xuxd/kafka/console/beans/SendMessage.java +++ b/src/main/java/com/xuxd/kafka/console/beans/SendMessage.java @@ -20,4 +20,6 @@ public class SendMessage { private String body; private int num; + + private long offset; } diff --git a/src/main/java/com/xuxd/kafka/console/controller/MessageController.java b/src/main/java/com/xuxd/kafka/console/controller/MessageController.java index 022c3bf..8e049fd 100644 --- a/src/main/java/com/xuxd/kafka/console/controller/MessageController.java +++ b/src/main/java/com/xuxd/kafka/console/controller/MessageController.java @@ -47,4 +47,9 @@ public class MessageController { public Object send(@RequestBody SendMessage message) { return messageService.send(message); } + + @PostMapping("/resend") + public Object resend(@RequestBody SendMessage message) { + return messageService.resend(message); + } } diff --git a/src/main/java/com/xuxd/kafka/console/service/MessageService.java b/src/main/java/com/xuxd/kafka/console/service/MessageService.java index 7c37176..bb4fa0d 100644 --- a/src/main/java/com/xuxd/kafka/console/service/MessageService.java +++ b/src/main/java/com/xuxd/kafka/console/service/MessageService.java @@ -21,4 +21,6 @@ public interface MessageService { ResponseData deserializerList(); ResponseData send(SendMessage message); + + ResponseData resend(SendMessage message); } diff --git a/src/main/java/com/xuxd/kafka/console/service/impl/MessageServiceImpl.java b/src/main/java/com/xuxd/kafka/console/service/impl/MessageServiceImpl.java index 02fe815..e8c4926 100644 --- a/src/main/java/com/xuxd/kafka/console/service/impl/MessageServiceImpl.java +++ b/src/main/java/com/xuxd/kafka/console/service/impl/MessageServiceImpl.java @@ -24,6 +24,7 @@ import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.apache.kafka.clients.admin.TopicDescription; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.serialization.BytesDeserializer; @@ -37,6 +38,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.stereotype.Service; +import scala.Tuple2; /** * kafka-console-ui. @@ -142,7 +144,7 @@ public class MessageServiceImpl implements MessageService, ApplicationContextAwa if (consumerInfo.topicPartition().equals(new TopicPartition(record.topic(), record.partition()))) { MessageDetailVO.ConsumerVO consumerVO = new MessageDetailVO.ConsumerVO(); consumerVO.setGroupId(consumerInfo.getGroupId()); - consumerVO.setStatus(consumerInfo.getConsumerOffset() < record.offset() ? "unconsume" : "consumed"); + consumerVO.setStatus(consumerInfo.getConsumerOffset() <= record.offset() ? "unconsume" : "consumed"); consumerVOS.add(consumerVO); } }); @@ -162,6 +164,21 @@ public class MessageServiceImpl implements MessageService, ApplicationContextAwa return ResponseData.create().success(); } + @Override public ResponseData resend(SendMessage message) { + TopicPartition partition = new TopicPartition(message.getTopic(), message.getPartition()); + Map offsetTable = new HashMap<>(1, 1.0f); + offsetTable.put(partition, message.getOffset()); + Map> recordMap = messageConsole.searchBy(offsetTable); + if (recordMap.isEmpty()) { + return ResponseData.create().failed("Get message failed."); + } + ConsumerRecord record = recordMap.get(partition); + ProducerRecord producerRecord = new ProducerRecord<>(record.topic(), record.partition(), record.key(), record.value(), record.headers()); + Tuple2 tuple2 = messageConsole.sendSync(producerRecord); + boolean success = (boolean) tuple2._1(); + return success ? ResponseData.create().success("success: " + tuple2._2()) : ResponseData.create().failed(tuple2._2()); + } + private Map> searchRecordByOffset(QueryMessage queryMessage) { Set partitions = getPartitions(queryMessage); diff --git a/src/main/scala/kafka/console/KafkaConsole.scala b/src/main/scala/kafka/console/KafkaConsole.scala index 584d21f..fe8ae40 100644 --- a/src/main/scala/kafka/console/KafkaConsole.scala +++ b/src/main/scala/kafka/console/KafkaConsole.scala @@ -9,7 +9,7 @@ import org.apache.kafka.clients.producer.KafkaProducer import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.config.SaslConfigs import org.apache.kafka.common.requests.ListOffsetsResponse -import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringSerializer} +import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, StringSerializer} import org.apache.kafka.common.utils.Time import org.slf4j.{Logger, LoggerFactory} @@ -77,6 +77,22 @@ class KafkaConsole(config: KafkaConfig) { } } + protected def withByteProducerAndCatchError(f: KafkaProducer[Array[Byte], Array[Byte]] => Any, eh: Exception => Any, + extra: Properties = new Properties()): Any = { + val props = getProps() + props.putAll(extra) + props.put(ConsumerConfig.CLIENT_ID_CONFIG, String.valueOf(System.currentTimeMillis())) + val producer = new KafkaProducer[Array[Byte], Array[Byte]](props, new ByteArraySerializer, new ByteArraySerializer) + try { + f(producer) + } catch { + case er: Exception => eh(er) + } + finally { + producer.close() + } + } + protected def withZKClient(f: AdminZkClient => Any): Any = { val zkClient = KafkaZkClient(config.getZookeeperAddr, false, 30000, 30000, Int.MaxValue, Time.SYSTEM) val adminZkClient = new AdminZkClient(zkClient) diff --git a/src/main/scala/kafka/console/MessageConsole.scala b/src/main/scala/kafka/console/MessageConsole.scala index a6b57e8..f59468c 100644 --- a/src/main/scala/kafka/console/MessageConsole.scala +++ b/src/main/scala/kafka/console/MessageConsole.scala @@ -176,10 +176,21 @@ class MessageConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConf withProducerAndCatchError(producer => { val nullKey = if (key != null && key.trim().length() == 0) null else key for (a <- 1 to num) { - val record = if (partition != -1) new ProducerRecord[String, String](topic, partition, nullKey, value) else new ProducerRecord[String, String](topic, nullKey, value) + val record = if (partition != -1) new ProducerRecord[String, String](topic, partition, nullKey, value) + else new ProducerRecord[String, String](topic, nullKey, value) producer.send(record) } }, e => log.error("send error.", e)) } + + def sendSync(record: ProducerRecord[Array[Byte], Array[Byte]]): (Boolean, String) = { + withByteProducerAndCatchError(producer => { + val metadata = producer.send(record).get() + (true, metadata.toString()) + }, e => { + log.error("send error.", e) + (false, e.getMessage) + }).asInstanceOf[(Boolean, String)] + } } diff --git a/ui/src/utils/api.js b/ui/src/utils/api.js index 2e40dda..36a2f84 100644 --- a/ui/src/utils/api.js +++ b/ui/src/utils/api.js @@ -244,4 +244,8 @@ export const KafkaMessageApi = { url: "/message/send", method: "post", }, + resend: { + url: "/message/resend", + method: "post", + }, }; diff --git a/ui/src/views/message/MessageDetail.vue b/ui/src/views/message/MessageDetail.vue index 74eca06..c2d6986 100644 --- a/ui/src/views/message/MessageDetail.vue +++ b/ui/src/views/message/MessageDetail.vue @@ -107,6 +107,18 @@ +
+

操作

+
+ + 重新发送 + +
@@ -199,6 +211,25 @@ export default { valueDeserializerChange() { this.getMessageDetail(); }, + resend() { + const params = Object.assign({}, this.data); + this.loading = true; + request({ + url: KafkaMessageApi.resend.url, + method: KafkaMessageApi.resend.method, + data: params, + }).then((res) => { + this.loading = false; + if (res.code != 0) { + notification.error({ + message: "error", + description: res.msg, + }); + } else { + this.$message.success(res.msg); + } + }); + }, }, }; const columns = [ diff --git a/ui/src/views/topic/PartitionInfo.vue b/ui/src/views/topic/PartitionInfo.vue index bc28d86..e4ba54f 100644 --- a/ui/src/views/topic/PartitionInfo.vue +++ b/ui/src/views/topic/PartitionInfo.vue @@ -215,4 +215,7 @@ const columns = [ .red-font { color: red; } +.green-font { + color: green; +}