diff --git a/src/main/java/com/xuxd/kafka/console/beans/MessageFilter.java b/src/main/java/com/xuxd/kafka/console/beans/MessageFilter.java new file mode 100644 index 0000000..784972b --- /dev/null +++ b/src/main/java/com/xuxd/kafka/console/beans/MessageFilter.java @@ -0,0 +1,73 @@ +package com.xuxd.kafka.console.beans; + +import com.xuxd.kafka.console.beans.enums.FilterType; +import org.apache.kafka.common.serialization.Deserializer; + +/** + * kafka-console-ui. + * + * @author xuxd + * @date 2021-12-29 15:30:08 + **/ +public class MessageFilter { + + private FilterType filterType = FilterType.NONE; + + private Object searchContent = null; + + private String headerKey = null; + + private String headerValue = null; + + private Deserializer deserializer = null; + + private boolean isContainsValue = false; + + public FilterType getFilterType() { + return filterType; + } + + public void setFilterType(FilterType filterType) { + this.filterType = filterType; + } + + public Object getSearchContent() { + return searchContent; + } + + public void setSearchContent(Object searchContent) { + this.searchContent = searchContent; + } + + public String getHeaderKey() { + return headerKey; + } + + public void setHeaderKey(String headerKey) { + this.headerKey = headerKey; + } + + public String getHeaderValue() { + return headerValue; + } + + public void setHeaderValue(String headerValue) { + this.headerValue = headerValue; + } + + public Deserializer getDeserializer() { + return deserializer; + } + + public void setDeserializer(Deserializer deserializer) { + this.deserializer = deserializer; + } + + public boolean isContainsValue() { + return isContainsValue; + } + + public void setContainsValue(boolean containsValue) { + isContainsValue = containsValue; + } +} diff --git a/src/main/java/com/xuxd/kafka/console/beans/QueryMessage.java b/src/main/java/com/xuxd/kafka/console/beans/QueryMessage.java index f620e82..ab914ff 100644 --- a/src/main/java/com/xuxd/kafka/console/beans/QueryMessage.java +++ b/src/main/java/com/xuxd/kafka/console/beans/QueryMessage.java @@ -1,5 +1,6 @@ package com.xuxd.kafka.console.beans; +import com.xuxd.kafka.console.beans.enums.FilterType; import lombok.Data; /** @@ -24,4 +25,12 @@ public class QueryMessage { private String keyDeserializer; private String valueDeserializer; + + private FilterType filter; + + private String value; + + private String headerKey; + + private String headerValue; } diff --git a/src/main/java/com/xuxd/kafka/console/beans/dto/QueryMessageDTO.java b/src/main/java/com/xuxd/kafka/console/beans/dto/QueryMessageDTO.java index 965f8a5..f2411dc 100644 --- a/src/main/java/com/xuxd/kafka/console/beans/dto/QueryMessageDTO.java +++ b/src/main/java/com/xuxd/kafka/console/beans/dto/QueryMessageDTO.java @@ -1,8 +1,10 @@ package com.xuxd.kafka.console.beans.dto; import com.xuxd.kafka.console.beans.QueryMessage; +import com.xuxd.kafka.console.beans.enums.FilterType; import java.util.Date; import lombok.Data; +import org.apache.commons.lang3.StringUtils; /** * kafka-console-ui. @@ -27,6 +29,14 @@ public class QueryMessageDTO { private String valueDeserializer; + private String filter; + + private String value; + + private String headerKey; + + private String headerValue; + public QueryMessage toQueryMessage() { QueryMessage queryMessage = new QueryMessage(); queryMessage.setTopic(topic); @@ -45,6 +55,21 @@ public class QueryMessageDTO { queryMessage.setKeyDeserializer(keyDeserializer); queryMessage.setValueDeserializer(valueDeserializer); + if (StringUtils.isNotBlank(filter)) { + queryMessage.setFilter(FilterType.valueOf(filter.toUpperCase())); + } else { + queryMessage.setFilter(FilterType.NONE); + } + if (StringUtils.isNotBlank(value)) { + queryMessage.setValue(value.trim()); + } + if (StringUtils.isNotBlank(headerKey)) { + queryMessage.setHeaderKey(headerKey.trim()); + } + if (StringUtils.isNotBlank(headerValue)) { + queryMessage.setHeaderValue(headerValue.trim()); + } + return queryMessage; } } diff --git a/src/main/java/com/xuxd/kafka/console/beans/enums/FilterType.java b/src/main/java/com/xuxd/kafka/console/beans/enums/FilterType.java new file mode 100644 index 0000000..97e793d --- /dev/null +++ b/src/main/java/com/xuxd/kafka/console/beans/enums/FilterType.java @@ -0,0 +1,11 @@ +package com.xuxd.kafka.console.beans.enums; + +/** + * kafka-console-ui. + * + * @author xuxd + * @date 2021-12-29 14:36:01 + **/ +public enum FilterType { + NONE, BODY, HEADER +} diff --git a/src/main/java/com/xuxd/kafka/console/service/impl/MessageServiceImpl.java b/src/main/java/com/xuxd/kafka/console/service/impl/MessageServiceImpl.java index e8c4926..f0ce85a 100644 --- a/src/main/java/com/xuxd/kafka/console/service/impl/MessageServiceImpl.java +++ b/src/main/java/com/xuxd/kafka/console/service/impl/MessageServiceImpl.java @@ -1,8 +1,10 @@ package com.xuxd.kafka.console.service.impl; +import com.xuxd.kafka.console.beans.MessageFilter; import com.xuxd.kafka.console.beans.QueryMessage; import com.xuxd.kafka.console.beans.ResponseData; import com.xuxd.kafka.console.beans.SendMessage; +import com.xuxd.kafka.console.beans.enums.FilterType; import com.xuxd.kafka.console.beans.vo.ConsumerRecordVO; import com.xuxd.kafka.console.beans.vo.MessageDetailVO; import com.xuxd.kafka.console.service.ConsumerService; @@ -32,6 +34,7 @@ import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.DoubleDeserializer; import org.apache.kafka.common.serialization.FloatDeserializer; import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.common.serialization.LongDeserializer; import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.beans.BeansException; import org.springframework.beans.factory.annotation.Autowired; @@ -70,6 +73,7 @@ public class MessageServiceImpl implements MessageService, ApplicationContextAwa deserializerDict.put("Float", new FloatDeserializer()); deserializerDict.put("Double", new DoubleDeserializer()); deserializerDict.put("Byte", new BytesDeserializer()); + deserializerDict.put("Long", new LongDeserializer()); } public static String defaultDeserializer = "String"; @@ -77,9 +81,65 @@ public class MessageServiceImpl implements MessageService, ApplicationContextAwa @Override public ResponseData searchByTime(QueryMessage queryMessage) { int maxNums = 10000; + Object searchContent = null; + String headerKey = null; + String headerValue = null; + MessageFilter filter = new MessageFilter(); + switch (queryMessage.getFilter()) { + case BODY: + if (StringUtils.isBlank(queryMessage.getValue())) { + queryMessage.setFilter(FilterType.NONE); + } else { + if (StringUtils.isBlank(queryMessage.getValueDeserializer())) { + queryMessage.setValueDeserializer(defaultDeserializer); + } + switch (queryMessage.getValueDeserializer()) { + case "String": + searchContent = String.valueOf(queryMessage.getValue()); + filter.setContainsValue(true); + break; + case "Integer": + searchContent = Integer.valueOf(queryMessage.getValue()); + break; + case "Float": + searchContent = Float.valueOf(queryMessage.getValue()); + break; + case "Double": + searchContent = Double.valueOf(queryMessage.getValue()); + break; + case "Long": + searchContent = Long.valueOf(queryMessage.getValue()); + break; + default: + throw new IllegalArgumentException("Message body type not support."); + } + } + break; + case HEADER: + headerKey = queryMessage.getHeaderKey(); + if (StringUtils.isBlank(headerKey)) { + queryMessage.setFilter(FilterType.NONE); + } else { + if (StringUtils.isNotBlank(queryMessage.getHeaderValue())) { + headerValue = String.valueOf(queryMessage.getHeaderValue()); + } + } + break; + default: + break; + } + + FilterType filterType = queryMessage.getFilter(); + Deserializer deserializer = deserializerDict.get(queryMessage.getValueDeserializer()); + filter.setFilterType(filterType); + filter.setSearchContent(searchContent); + filter.setDeserializer(deserializer); + filter.setHeaderKey(headerKey); + filter.setHeaderValue(headerValue); + Set partitions = getPartitions(queryMessage); long startTime = System.currentTimeMillis(); - List> records = messageConsole.searchBy(partitions, queryMessage.getStartTime(), queryMessage.getEndTime(), maxNums); + List> records = messageConsole.searchBy(partitions, queryMessage.getStartTime(), queryMessage.getEndTime(), maxNums, filter); log.info("search message by time, cost time: {}", (System.currentTimeMillis() - startTime)); List vos = records.stream().filter(record -> record.timestamp() <= queryMessage.getEndTime()) .map(ConsumerRecordVO::fromConsumerRecord).collect(Collectors.toList()); diff --git a/src/main/scala/kafka/console/MessageConsole.scala b/src/main/scala/kafka/console/MessageConsole.scala index f59468c..19fb7cd 100644 --- a/src/main/scala/kafka/console/MessageConsole.scala +++ b/src/main/scala/kafka/console/MessageConsole.scala @@ -1,6 +1,9 @@ package kafka.console +import com.xuxd.kafka.console.beans.MessageFilter +import com.xuxd.kafka.console.beans.enums.FilterType import com.xuxd.kafka.console.config.KafkaConfig +import org.apache.commons.lang3.StringUtils import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord} import org.apache.kafka.clients.producer.ProducerRecord import org.apache.kafka.common.TopicPartition @@ -20,7 +23,7 @@ import scala.jdk.CollectionConverters.{CollectionHasAsScala, MapHasAsScala, SeqH class MessageConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConfig) with Logging { def searchBy(partitions: util.Collection[TopicPartition], startTime: Long, endTime: Long, - maxNums: Int): util.List[ConsumerRecord[Array[Byte], Array[Byte]]] = { + maxNums: Int, filter: MessageFilter): util.List[ConsumerRecord[Array[Byte], Array[Byte]]] = { var startOffTable: immutable.Map[TopicPartition, Long] = Map.empty var endOffTable: immutable.Map[TopicPartition, Long] = Map.empty withAdminClientAndCatchError(admin => { @@ -33,8 +36,44 @@ class MessageConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConf log.error("getLogTimestampOffsets error.", e) throw new RuntimeException("getLogTimestampOffsets error", e) }) + + val headerValueBytes = if (StringUtils.isNotEmpty(filter.getHeaderValue())) filter.getHeaderValue().getBytes() else None + + def filterMessage(record: ConsumerRecord[Array[Byte], Array[Byte]]): Boolean = { + filter.getFilterType() match { + case FilterType.BODY => { + val body = filter.getDeserializer().deserialize(record.topic(), record.value()) + var contains = false + if (filter.isContainsValue) { + contains = body.asInstanceOf[String].contains(filter.getSearchContent().asInstanceOf[String]) + } else { + contains = body.equals(filter.getSearchContent) + } + contains + } + case FilterType.HEADER => { + if (StringUtils.isNotEmpty(filter.getHeaderKey()) && StringUtils.isNotEmpty(filter.getHeaderValue())) { + val iterator = record.headers().headers(filter.getHeaderKey()).iterator() + var contains = false + while (iterator.hasNext() && !contains) { + val next = iterator.next().value() + contains = (next.sameElements(headerValueBytes.asInstanceOf[Array[Byte]])) + } + contains + } else if (StringUtils.isNotEmpty(filter.getHeaderKey()) && StringUtils.isEmpty(filter.getHeaderValue())) { + record.headers().headers(filter.getHeaderKey()).iterator().hasNext() + } else { + true + } + } + case FilterType.NONE => true + } + } + var terminate: Boolean = (startOffTable == endOffTable) val res = new util.LinkedList[ConsumerRecord[Array[Byte], Array[Byte]]]() + // 检索的消息条数 + var searchNums = 0 // 如果最小和最大偏移一致,就结束 if (!terminate) { @@ -51,7 +90,7 @@ class MessageConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConf // 1.所有查询分区达都到最大偏移的时候 while (!terminate) { // 达到查询的最大条数 - if (res.size() >= maxNums) { + if (searchNums >= maxNums) { terminate = true } else { val records = consumer.poll(Duration.ofMillis(timeoutMs)) @@ -67,6 +106,7 @@ class MessageConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConf if (first.offset() >= endOff) { arrive.remove(tp) } else { + searchNums += recordList.size() // // (String topic, // int partition, @@ -80,7 +120,7 @@ class MessageConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConf // V value, // Headers headers, // Optional leaderEpoch) - val nullVList = recordList.asScala.map(record => new ConsumerRecord[Array[Byte], Array[Byte]](record.topic(), + val nullVList = recordList.asScala.filter(filterMessage(_)).map(record => new ConsumerRecord[Array[Byte], Array[Byte]](record.topic(), record.partition(), record.offset(), record.timestamp(), diff --git a/ui/src/views/message/SearchByTime.vue b/ui/src/views/message/SearchByTime.vue index 92fa3fb..f3d1360 100644 --- a/ui/src/views/message/SearchByTime.vue +++ b/ui/src/views/message/SearchByTime.vue @@ -84,7 +84,7 @@ @@ -92,7 +92,7 @@