diff --git a/src/main/java/com/xuxd/kafka/console/beans/QueryMessage.java b/src/main/java/com/xuxd/kafka/console/beans/QueryMessage.java
new file mode 100644
index 0000000..be23dea
--- /dev/null
+++ b/src/main/java/com/xuxd/kafka/console/beans/QueryMessage.java
@@ -0,0 +1,21 @@
+package com.xuxd.kafka.console.beans;
+
+import lombok.Data;
+
+/**
+ * kafka-console-ui.
+ *
+ * @author xuxd
+ * @date 2021-12-11 09:45:49
+ **/
+@Data
+public class QueryMessage {
+
+    private String topic;
+
+    private int partition; // -1 means "search all partitions of the topic"
+
+    private long startTime; // epoch millis, lower bound of the search window
+
+    private long endTime; // epoch millis, upper bound of the search window
+}
diff --git a/src/main/java/com/xuxd/kafka/console/beans/dto/QueryMessageDTO.java b/src/main/java/com/xuxd/kafka/console/beans/dto/QueryMessageDTO.java
new file mode 100644
index 0000000..cbafd17
--- /dev/null
+++ b/src/main/java/com/xuxd/kafka/console/beans/dto/QueryMessageDTO.java
@@ -0,0 +1,37 @@
+package com.xuxd.kafka.console.beans.dto;
+
+import com.xuxd.kafka.console.beans.QueryMessage;
+import java.util.Date;
+import lombok.Data;
+
+/**
+ * kafka-console-ui.
+ *
+ * @author xuxd
+ * @date 2021-12-11 09:17:59
+ **/
+@Data
+public class QueryMessageDTO {
+
+    private String topic;
+
+    private int partition;
+
+    private Date startTime;
+
+    private Date endTime;
+
+    /**
+     * Converts this request DTO ({@link Date} range) into the internal query bean
+     * (epoch-millis range). Null dates degrade to an open-ended bound instead of an NPE.
+     */
+    public QueryMessage toQueryMessage() {
+        QueryMessage queryMessage = new QueryMessage();
+        queryMessage.setTopic(topic);
+        queryMessage.setPartition(partition);
+        queryMessage.setStartTime(startTime == null ? 0L : startTime.getTime());
+        queryMessage.setEndTime(endTime == null ? Long.MAX_VALUE : endTime.getTime());
+
+        return queryMessage;
+    }
+}
diff --git a/src/main/java/com/xuxd/kafka/console/beans/vo/ConsumerRecordVO.java b/src/main/java/com/xuxd/kafka/console/beans/vo/ConsumerRecordVO.java
new file mode 100644
index 0000000..5473828
--- /dev/null
+++ b/src/main/java/com/xuxd/kafka/console/beans/vo/ConsumerRecordVO.java
@@ -0,0 +1,32 @@
+package com.xuxd.kafka.console.beans.vo;
+
+import lombok.Data;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+/**
+ * kafka-console-ui.
+ *
+ * @author xuxd
+ * @date 2021-12-11 14:19:35
+ **/
+@Data
+public class ConsumerRecordVO {
+
+    private String topic;
+
+    private int partition;
+
+    private long offset;
+
+    private long timestamp;
+
+    // Wildcard generics instead of the raw type: any ConsumerRecord<K, V> is accepted,
+    // and only type-independent metadata (topic/partition/offset/timestamp) is read.
+    public static ConsumerRecordVO fromConsumerRecord(ConsumerRecord<?, ?> record) {
+        ConsumerRecordVO vo = new ConsumerRecordVO();
+        vo.setTopic(record.topic());
+        vo.setPartition(record.partition());
+        vo.setOffset(record.offset());
+        vo.setTimestamp(record.timestamp());
+
+        return vo;
+    }
+}
diff --git a/src/main/java/com/xuxd/kafka/console/config/KafkaConfiguration.java b/src/main/java/com/xuxd/kafka/console/config/KafkaConfiguration.java
index 0900b59..c2277fb 100644
--- a/src/main/java/com/xuxd/kafka/console/config/KafkaConfiguration.java
+++ b/src/main/java/com/xuxd/kafka/console/config/KafkaConfiguration.java
@@ -5,6 +5,7 @@ import kafka.console.ConfigConsole;
 import kafka.console.ConsumerConsole;
 import kafka.console.KafkaAclConsole;
 import kafka.console.KafkaConfigConsole;
+import kafka.console.MessageConsole;
 import kafka.console.OperationConsole;
 import kafka.console.TopicConsole;
 import org.springframework.context.annotation.Bean;
@@ -54,4 +55,9 @@ public class KafkaConfiguration {
         ConsumerConsole consumerConsole) {
         return new OperationConsole(config, topicConsole, consumerConsole);
     }
+
+    @Bean
+    public MessageConsole messageConsole(KafkaConfig config) {
+        return new MessageConsole(config);
+    }
 }
diff --git a/src/main/java/com/xuxd/kafka/console/controller/MessageController.java b/src/main/java/com/xuxd/kafka/console/controller/MessageController.java
new file mode 100644
index 0000000..cfce28a
--- /dev/null
+++ b/src/main/java/com/xuxd/kafka/console/controller/MessageController.java
@@ -0,0 +1,28 @@
+package com.xuxd.kafka.console.controller;
+
+import com.xuxd.kafka.console.beans.dto.QueryMessageDTO;
+import com.xuxd.kafka.console.service.MessageService;
+import org.springframework.beans.factory.annotation.Autowired;
+import
org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+/**
+ * kafka-console-ui.
+ *
+ * @author xuxd
+ * @date 2021-12-11 09:22:19
+ **/
+@RestController
+@RequestMapping("/message")
+public class MessageController {
+
+    @Autowired
+    private MessageService messageService;
+
+    @PostMapping("/search/time") // POST /message/search/time
+    public Object searchByTime(@RequestBody QueryMessageDTO dto) { // DTO dates are converted to epoch millis
+        return messageService.searchByTime(dto.toQueryMessage());
+    }
+}
diff --git a/src/main/java/com/xuxd/kafka/console/service/MessageService.java b/src/main/java/com/xuxd/kafka/console/service/MessageService.java
new file mode 100644
index 0000000..cdaa10c
--- /dev/null
+++ b/src/main/java/com/xuxd/kafka/console/service/MessageService.java
@@ -0,0 +1,15 @@
+package com.xuxd.kafka.console.service;
+
+import com.xuxd.kafka.console.beans.QueryMessage;
+import com.xuxd.kafka.console.beans.ResponseData;
+
+/**
+ * kafka-console-ui.
+ *
+ * @author xuxd
+ * @date 2021-12-11 09:43:26
+ **/
+public interface MessageService {
+
+    ResponseData searchByTime(QueryMessage queryMessage); // search records whose timestamps fall inside [startTime, endTime]
+}
diff --git a/src/main/java/com/xuxd/kafka/console/service/impl/MessageServiceImpl.java b/src/main/java/com/xuxd/kafka/console/service/impl/MessageServiceImpl.java
new file mode 100644
index 0000000..d140980
--- /dev/null
+++ b/src/main/java/com/xuxd/kafka/console/service/impl/MessageServiceImpl.java
@@ -0,0 +1,62 @@
+package com.xuxd.kafka.console.service.impl;
+
+import com.xuxd.kafka.console.beans.QueryMessage;
+import com.xuxd.kafka.console.beans.ResponseData;
+import com.xuxd.kafka.console.beans.vo.ConsumerRecordVO;
+import com.xuxd.kafka.console.service.MessageService;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import kafka.console.MessageConsole;
+import kafka.console.TopicConsole;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+/**
+ * kafka-console-ui.
+ *
+ * @author xuxd
+ * @date 2021-12-11 09:43:44
+ **/
+@Service
+public class MessageServiceImpl implements MessageService {
+
+    @Autowired
+    private MessageConsole messageConsole;
+
+    @Autowired
+    private TopicConsole topicConsole;
+
+    @Override public ResponseData searchByTime(QueryMessage queryMessage) {
+        int maxNums = 10000; // hard cap on the number of records returned per query
+
+        Set<TopicPartition> partitions = new HashSet<>();
+        if (queryMessage.getPartition() != -1) {
+            partitions.add(new TopicPartition(queryMessage.getTopic(), queryMessage.getPartition()));
+        } else { // partition == -1: search every partition of the topic
+            List<TopicDescription> list = topicConsole.getTopicList(Collections.singleton(queryMessage.getTopic()));
+            if (CollectionUtils.isEmpty(list)) {
+                throw new IllegalArgumentException("Can not find topic info.");
+            }
+            Set<TopicPartition> set = list.get(0).partitions().stream()
+                .map(tp -> new TopicPartition(queryMessage.getTopic(), tp.partition())).collect(Collectors.toSet());
+            partitions.addAll(set);
+        }
+        List<ConsumerRecord<byte[], byte[]>> records = messageConsole.searchBy(partitions, queryMessage.getStartTime(), queryMessage.getEndTime(), maxNums);
+        List<ConsumerRecordVO> vos = records.stream().filter(record -> record.timestamp() <= queryMessage.getEndTime())
+            .map(ConsumerRecordVO::fromConsumerRecord).collect(Collectors.toList());
+        Map<String, Object> res = new HashMap<>();
+        res.put("maxNum", maxNums);
+        res.put("realNum", vos.size());
+        res.put("data", vos.subList(0, Math.min(maxNums, vos.size())));
+        return ResponseData.create().data(res).success();
+    }
+}
diff --git a/src/main/scala/kafka/console/MessageConsole.scala b/src/main/scala/kafka/console/MessageConsole.scala
new file mode 100644
index 0000000..038e802
--- /dev/null
+++ b/src/main/scala/kafka/console/MessageConsole.scala
@@ -0,0 +1,89 @@
+package kafka.console
+
+import com.xuxd.kafka.console.config.KafkaConfig
+import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
+import org.apache.kafka.common.TopicPartition
+
+import java.time.Duration
+import java.util
+import java.util.Properties
+import scala.collection.immutable
+import
scala.jdk.CollectionConverters.CollectionHasAsScala
+
+/**
+ * kafka-console-ui.
+ *
+ * @author xuxd
+ * @date 2021-12-11 09:39:40
+ * */
+class MessageConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConfig) with Logging {
+
+    def searchBy(partitions: util.Collection[TopicPartition], startTime: Long, endTime: Long,
+        maxNums: Int): util.List[ConsumerRecord[Array[Byte], Array[Byte]]] = {
+        var startOffTable: immutable.Map[TopicPartition, Long] = Map.empty
+        var endOffTable: immutable.Map[TopicPartition, Long] = Map.empty
+        withAdminClientAndCatchError(admin => {
+            val startTable = KafkaConsole.getLogTimestampOffsets(admin, partitions.asScala.toSeq, startTime, timeoutMs)
+            startOffTable = startTable.map(t2 => (t2._1, t2._2.offset())).toMap
+
+            endOffTable = KafkaConsole.getLogTimestampOffsets(admin, partitions.asScala.toSeq, endTime, timeoutMs)
+                .map(t2 => (t2._1, t2._2.offset())).toMap
+        }, e => {
+            log.error("getLogTimestampOffsets error.", e)
+            throw new RuntimeException("getLogTimestampOffsets error", e)
+        })
+        var terminate: Boolean = (startOffTable == endOffTable)
+        val res = new util.LinkedList[ConsumerRecord[Array[Byte], Array[Byte]]]()
+        // Nothing to fetch when the start and end offsets already coincide.
+        if (!terminate) {
+
+            val arrive = new util.HashSet[TopicPartition](partitions)
+            val props = new Properties()
+            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") // NOTE(review): props is built but never passed to the consumer - confirm auto-commit is disabled elsewhere
+            withConsumerAndCatchError(consumer => {
+                consumer.assign(partitions)
+                for ((tp, off) <- startOffTable) {
+                    consumer.seek(tp, off)
+                }
+
+                // Termination conditions:
+                // 1. every queried partition has reached its end offset, or
+                while (!terminate) {
+                    // 2. the result set has reached the requested maximum size.
+                    if (res.size() >= maxNums) {
+                        terminate = true
+                    } else {
+                        val records = consumer.poll(Duration.ofMillis(timeoutMs))
+
+                        for ((tp, endOff) <- endOffTable) {
+                            if (!terminate) {
+                                val recordList = records.records(tp)
+                                if (recordList.isEmpty) {
+                                    arrive.remove(tp)
+                                } else {
+                                    val first = recordList.get(0)
+                                    if (first.offset() >= endOff) {
+                                        arrive.remove(tp)
+                                    } else {
+                                        res.addAll(recordList)
+                                        if (recordList.get(recordList.size() - 1).offset() >= endOff) {
+                                            arrive.remove(tp)
+                                        }
+                                    }
+                                }
+
+                                if (arrive.isEmpty) {
+                                    terminate = true
+                                }
+                            }
+                        }
+                    }
+                }
+            }, e => {
+                log.error("searchBy time error.", e)
+            })
+        }
+
+        res
+    }
+}
diff --git a/ui/src/utils/api.js b/ui/src/utils/api.js
index e1bc2d2..fcf51bd 100644
--- a/ui/src/utils/api.js
+++ b/ui/src/utils/api.js
@@ -223,3 +223,9 @@ export const KafkaOpApi = {
     method: "delete",
   },
 };
+export const KafkaMessageApi = {
+  searchByTime: {
+    url: "/message/search/time",
+    method: "post",
+  },
+};
diff --git a/ui/src/views/message/SearchByTime.vue b/ui/src/views/message/SearchByTime.vue
index e8e14a5..f46bfa8 100644
--- a/ui/src/views/message/SearchByTime.vue
+++ b/ui/src/views/message/SearchByTime.vue
@@ -61,14 +61,36 @@
+
+
+

+ 检索条数:{{ data.realNum }},允许返回的最大条数:{{ + data.maxNum + }} +

+ +
+ 消息详情 + +
+