diff --git a/src/main/java/com/xuxd/kafka/console/beans/dto/QuerySendStatisticsDTO.java b/src/main/java/com/xuxd/kafka/console/beans/dto/QuerySendStatisticsDTO.java new file mode 100644 index 0000000..7bb2df9 --- /dev/null +++ b/src/main/java/com/xuxd/kafka/console/beans/dto/QuerySendStatisticsDTO.java @@ -0,0 +1,23 @@ +package com.xuxd.kafka.console.beans.dto; + +import lombok.Data; + +import java.util.Date; +import java.util.Set; + +/** + * 发送统计查询请求,指定时间段内,发送了多少消息. + * @author: xuxd + * @since: 2023/12/1 22:01 + **/ +@Data +public class QuerySendStatisticsDTO { + + private String topic; + + private Set<Integer> partition; + + private Date startTime; + + private Date endTime; +} diff --git a/src/main/java/com/xuxd/kafka/console/beans/vo/QuerySendStatisticsVO.java b/src/main/java/com/xuxd/kafka/console/beans/vo/QuerySendStatisticsVO.java new file mode 100644 index 0000000..7841fe6 --- /dev/null +++ b/src/main/java/com/xuxd/kafka/console/beans/vo/QuerySendStatisticsVO.java @@ -0,0 +1,36 @@ +package com.xuxd.kafka.console.beans.vo; + +import lombok.Data; + +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.Map; + +/** + * @author: xuxd + * @since: 2023/12/1 17:49 + **/ +@Data +public class QuerySendStatisticsVO { + + private static final String FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"; + + private static final DateFormat DATE_FORMAT = new SimpleDateFormat(FORMAT); + + private String topic; + + private Long total; + + private Map<Integer, Long> detail; + + private String startTime; + + private String endTime; + + private String searchTime = format(new Date()); + + public static String format(Date date) { + synchronized (DATE_FORMAT) { return DATE_FORMAT.format(date); } + } +} diff --git a/src/main/java/com/xuxd/kafka/console/controller/MessageController.java b/src/main/java/com/xuxd/kafka/console/controller/MessageController.java index ef9be13..b220e2a 100644 --- a/src/main/java/com/xuxd/kafka/console/controller/MessageController.java +++
b/src/main/java/com/xuxd/kafka/console/controller/MessageController.java @@ -6,8 +6,10 @@ import com.xuxd.kafka.console.beans.QueryMessage; import com.xuxd.kafka.console.beans.ResponseData; import com.xuxd.kafka.console.beans.SendMessage; import com.xuxd.kafka.console.beans.dto.QueryMessageDTO; +import com.xuxd.kafka.console.beans.dto.QuerySendStatisticsDTO; import com.xuxd.kafka.console.service.MessageService; import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.*; @@ -72,4 +74,12 @@ public class MessageController { } return messageService.delete(messages); } + + @PostMapping("/send/statistics") + public Object sendStatistics(@RequestBody QuerySendStatisticsDTO dto) { + if (StringUtils.isEmpty(dto.getTopic())) { + return ResponseData.create().failed("Topic is null"); + } + return messageService.sendStatisticsByTime(dto); + } } diff --git a/src/main/java/com/xuxd/kafka/console/service/MessageService.java b/src/main/java/com/xuxd/kafka/console/service/MessageService.java index 192e944..f32890d 100644 --- a/src/main/java/com/xuxd/kafka/console/service/MessageService.java +++ b/src/main/java/com/xuxd/kafka/console/service/MessageService.java @@ -1,6 +1,7 @@ package com.xuxd.kafka.console.service; import com.xuxd.kafka.console.beans.QueryMessage; +import com.xuxd.kafka.console.beans.dto.QuerySendStatisticsDTO; import com.xuxd.kafka.console.beans.ResponseData; import com.xuxd.kafka.console.beans.SendMessage; @@ -29,4 +30,6 @@ public interface MessageService { ResponseData resend(SendMessage message); ResponseData delete(List messages); + + ResponseData sendStatisticsByTime(QuerySendStatisticsDTO request); } diff --git a/src/main/java/com/xuxd/kafka/console/service/impl/MessageServiceImpl.java b/src/main/java/com/xuxd/kafka/console/service/impl/MessageServiceImpl.java index 2d01282..7c8fd76 100644 --- 
a/src/main/java/com/xuxd/kafka/console/service/impl/MessageServiceImpl.java +++ b/src/main/java/com/xuxd/kafka/console/service/impl/MessageServiceImpl.java @@ -4,20 +4,13 @@ import com.xuxd.kafka.console.beans.MessageFilter; import com.xuxd.kafka.console.beans.QueryMessage; import com.xuxd.kafka.console.beans.ResponseData; import com.xuxd.kafka.console.beans.SendMessage; +import com.xuxd.kafka.console.beans.dto.QuerySendStatisticsDTO; import com.xuxd.kafka.console.beans.enums.FilterType; import com.xuxd.kafka.console.beans.vo.ConsumerRecordVO; import com.xuxd.kafka.console.beans.vo.MessageDetailVO; +import com.xuxd.kafka.console.beans.vo.QuerySendStatisticsVO; import com.xuxd.kafka.console.service.ConsumerService; import com.xuxd.kafka.console.service.MessageService; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.stream.Collectors; import kafka.console.ConsumerConsole; import kafka.console.MessageConsole; import kafka.console.TopicConsole; @@ -29,14 +22,7 @@ import org.apache.kafka.clients.admin.TopicDescription; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.serialization.ByteArrayDeserializer; -import org.apache.kafka.common.serialization.BytesDeserializer; -import org.apache.kafka.common.serialization.Deserializer; -import org.apache.kafka.common.serialization.DoubleDeserializer; -import org.apache.kafka.common.serialization.FloatDeserializer; -import org.apache.kafka.common.serialization.IntegerDeserializer; -import org.apache.kafka.common.serialization.LongDeserializer; -import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.*; import org.springframework.beans.BeansException; 
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationContext; @@ -44,6 +30,9 @@ import org.springframework.context.ApplicationContextAware; import org.springframework.stereotype.Service; import scala.Tuple2; +import java.util.*; +import java.util.stream.Collectors; + /** * kafka-console-ui. * @@ -79,7 +68,8 @@ public class MessageServiceImpl implements MessageService, ApplicationContextAwa public static String defaultDeserializer = "String"; - @Override public ResponseData searchByTime(QueryMessage queryMessage) { + @Override + public ResponseData searchByTime(QueryMessage queryMessage) { int maxNums = 5000; Object searchContent = null; @@ -144,7 +134,7 @@ public class MessageServiceImpl implements MessageService, ApplicationContextAwa List> records = tuple2._1(); log.info("search message by time, cost time: {}", (System.currentTimeMillis() - startTime)); List vos = records.stream().filter(record -> record.timestamp() <= queryMessage.getEndTime()) - .map(ConsumerRecordVO::fromConsumerRecord).collect(Collectors.toList()); + .map(ConsumerRecordVO::fromConsumerRecord).collect(Collectors.toList()); Map res = new HashMap<>(); vos = vos.subList(0, Math.min(maxNums, vos.size())); res.put("maxNum", maxNums); @@ -154,13 +144,15 @@ public class MessageServiceImpl implements MessageService, ApplicationContextAwa return ResponseData.create().data(res).success(); } - @Override public ResponseData searchByOffset(QueryMessage queryMessage) { + @Override + public ResponseData searchByOffset(QueryMessage queryMessage) { Map> recordMap = searchRecordByOffset(queryMessage); return ResponseData.create().data(recordMap.values().stream().map(ConsumerRecordVO::fromConsumerRecord).collect(Collectors.toList())).success(); } - @Override public ResponseData searchDetail(QueryMessage queryMessage) { + @Override + public ResponseData searchDetail(QueryMessage queryMessage) { if (queryMessage.getPartition() == -1) { throw new 
IllegalArgumentException(); } @@ -219,18 +211,21 @@ public class MessageServiceImpl implements MessageService, ApplicationContextAwa return ResponseData.create().failed("Not found message detail."); } - @Override public ResponseData deserializerList() { + @Override + public ResponseData deserializerList() { return ResponseData.create().data(deserializerDict.keySet()).success(); } - @Override public ResponseData send(SendMessage message) { + @Override + public ResponseData send(SendMessage message) { messageConsole.send(message.getTopic(), message.getPartition(), message.getKey(), message.getBody(), message.getNum()); return ResponseData.create().success(); } - @Override public ResponseData sendWithHeader(SendMessage message) { - String[] headerKeys= message.getHeaders().stream().map(SendMessage.Header::getHeaderKey).toArray(String[]::new); - String[] headerValues= message.getHeaders().stream().map(SendMessage.Header::getHeaderValue).toArray(String[]::new); + @Override + public ResponseData sendWithHeader(SendMessage message) { + String[] headerKeys = message.getHeaders().stream().map(SendMessage.Header::getHeaderKey).toArray(String[]::new); + String[] headerValues = message.getHeaders().stream().map(SendMessage.Header::getHeaderValue).toArray(String[]::new); // log.info("send with header:keys{},values{}",headerKeys, headerValues); Tuple2 tuple2 = messageConsole.send(message.getTopic(), message.getPartition(), @@ -240,10 +235,11 @@ public class MessageServiceImpl implements MessageService, ApplicationContextAwa headerKeys, headerValues, message.isSync()); - return (boolean)tuple2._1 ? ResponseData.create().success() : ResponseData.create().failed(tuple2._2); + return (boolean) tuple2._1 ? 
ResponseData.create().success() : ResponseData.create().failed(tuple2._2); } - @Override public ResponseData resend(SendMessage message) { + @Override + public ResponseData resend(SendMessage message) { TopicPartition partition = new TopicPartition(message.getTopic(), message.getPartition()); Map offsetTable = new HashMap<>(1, 1.0f); offsetTable.put(partition, message.getOffset()); @@ -270,6 +266,44 @@ public class MessageServiceImpl implements MessageService, ApplicationContextAwa return success ? ResponseData.create().success() : ResponseData.create().failed(tuple2._2()); } + @Override + public ResponseData sendStatisticsByTime(QuerySendStatisticsDTO request) { + + if (request.getPartition() != null && request.getPartition().contains(-1)) { + request.setPartition(Collections.emptySet()); + } + + Map startOffsetMap = topicConsole.getOffsetForTimestamp(request.getTopic(), request.getStartTime().getTime()). + entrySet().stream().collect(Collectors.toMap(e -> e.getKey().partition(), Map.Entry::getValue, (e1, e2) -> e2)); + Map endOffsetMap = topicConsole.getOffsetForTimestamp(request.getTopic(), request.getEndTime().getTime()). + entrySet().stream().collect(Collectors.toMap(e -> e.getKey().partition(), Map.Entry::getValue, (e1, e2) -> e2)); + + Map diffOffsetMap = endOffsetMap.entrySet().stream(). + collect(Collectors.toMap(e -> e.getKey(), + e -> Arrays.asList(e.getValue(), startOffsetMap.getOrDefault(e.getKey(), 0L)). 
+ stream().reduce((a, b) -> a - b).get())); + + if (CollectionUtils.isNotEmpty(request.getPartition())) { + Iterator> iterator = diffOffsetMap.entrySet().iterator(); + while (iterator.hasNext()) { + Integer partition = iterator.next().getKey(); + if (!request.getPartition().contains(partition)) { + iterator.remove(); + } + } + } + + Long total = diffOffsetMap.values().stream().reduce(0L, (a, b) -> a + b); + QuerySendStatisticsVO vo = new QuerySendStatisticsVO(); + vo.setTopic(request.getTopic()); + vo.setTotal(total); + vo.setDetail(diffOffsetMap); + vo.setStartTime(QuerySendStatisticsVO.format(request.getStartTime())); + vo.setEndTime(QuerySendStatisticsVO.format(request.getEndTime())); + + return ResponseData.create().data(vo).success(); + } + private Map> searchRecordByOffset(QueryMessage queryMessage) { Set partitions = getPartitions(queryMessage); @@ -291,13 +325,14 @@ public class MessageServiceImpl implements MessageService, ApplicationContextAwa throw new IllegalArgumentException("Can not find topic info."); } Set set = list.get(0).partitions().stream() - .map(tp -> new TopicPartition(queryMessage.getTopic(), tp.partition())).collect(Collectors.toSet()); + .map(tp -> new TopicPartition(queryMessage.getTopic(), tp.partition())).collect(Collectors.toSet()); partitions.addAll(set); } return partitions; } - @Override public void setApplicationContext(ApplicationContext context) throws BeansException { + @Override + public void setApplicationContext(ApplicationContext context) throws BeansException { this.applicationContext = context; } } diff --git a/src/main/scala/kafka/console/KafkaConsole.scala b/src/main/scala/kafka/console/KafkaConsole.scala index 74b6949..c6d11a8 100644 --- a/src/main/scala/kafka/console/KafkaConsole.scala +++ b/src/main/scala/kafka/console/KafkaConsole.scala @@ -189,10 +189,10 @@ object KafkaConsole { case (topicPartition, listOffsetsResultInfo) => topicPartition -> new OffsetAndMetadata(listOffsetsResultInfo.offset) }.toMap - 
unsuccessfulOffsetsForTimes.foreach { entry => - log.warn(s"\nWarn: Partition " + entry._1.partition() + " from topic " + entry._1.topic() + - " is empty. Falling back to latest known offset.") - } +// unsuccessfulOffsetsForTimes.foreach { entry => +// log.warn(s"\nWarn: Partition " + entry._1.partition() + " from topic " + entry._1.topic() + +// " is empty. Falling back to latest known offset.") +// } successfulLogTimestampOffsets ++ getLogEndOffsets(admin, unsuccessfulOffsetsForTimes.keySet.toSeq, timeoutMs) } diff --git a/ui/src/utils/api.js b/ui/src/utils/api.js index 90649d5..5e73ef4 100644 --- a/ui/src/utils/api.js +++ b/ui/src/utils/api.js @@ -296,6 +296,10 @@ export const KafkaMessageApi = { url: "/message", method: "delete", }, + sendStatistics: { + url: "/message/send/statistics", + method: "post", + }, }; export const KafkaClientQuotaApi = { diff --git a/ui/src/views/message/Message.vue b/ui/src/views/message/Message.vue index 99df8be..54ac4b5 100644 --- a/ui/src/views/message/Message.vue +++ b/ui/src/views/message/Message.vue @@ -23,6 +23,10 @@ + + + + @@ -31,6 +35,7 @@ + +