消息查询过滤
This commit is contained in:
@@ -0,0 +1,73 @@
|
||||
package com.xuxd.kafka.console.beans;
|
||||
|
||||
import com.xuxd.kafka.console.beans.enums.FilterType;
|
||||
import org.apache.kafka.common.serialization.Deserializer;
|
||||
|
||||
/**
|
||||
* kafka-console-ui.
|
||||
*
|
||||
* @author xuxd
|
||||
* @date 2021-12-29 15:30:08
|
||||
**/
|
||||
public class MessageFilter {
|
||||
|
||||
private FilterType filterType = FilterType.NONE;
|
||||
|
||||
private Object searchContent = null;
|
||||
|
||||
private String headerKey = null;
|
||||
|
||||
private String headerValue = null;
|
||||
|
||||
private Deserializer deserializer = null;
|
||||
|
||||
private boolean isContainsValue = false;
|
||||
|
||||
public FilterType getFilterType() {
|
||||
return filterType;
|
||||
}
|
||||
|
||||
public void setFilterType(FilterType filterType) {
|
||||
this.filterType = filterType;
|
||||
}
|
||||
|
||||
public Object getSearchContent() {
|
||||
return searchContent;
|
||||
}
|
||||
|
||||
public void setSearchContent(Object searchContent) {
|
||||
this.searchContent = searchContent;
|
||||
}
|
||||
|
||||
public String getHeaderKey() {
|
||||
return headerKey;
|
||||
}
|
||||
|
||||
public void setHeaderKey(String headerKey) {
|
||||
this.headerKey = headerKey;
|
||||
}
|
||||
|
||||
public String getHeaderValue() {
|
||||
return headerValue;
|
||||
}
|
||||
|
||||
public void setHeaderValue(String headerValue) {
|
||||
this.headerValue = headerValue;
|
||||
}
|
||||
|
||||
public Deserializer getDeserializer() {
|
||||
return deserializer;
|
||||
}
|
||||
|
||||
public void setDeserializer(Deserializer deserializer) {
|
||||
this.deserializer = deserializer;
|
||||
}
|
||||
|
||||
public boolean isContainsValue() {
|
||||
return isContainsValue;
|
||||
}
|
||||
|
||||
public void setContainsValue(boolean containsValue) {
|
||||
isContainsValue = containsValue;
|
||||
}
|
||||
}
|
||||
@@ -1,5 +1,6 @@
|
||||
package com.xuxd.kafka.console.beans;
|
||||
|
||||
import com.xuxd.kafka.console.beans.enums.FilterType;
|
||||
import lombok.Data;
|
||||
|
||||
/**
|
||||
@@ -24,4 +25,12 @@ public class QueryMessage {
|
||||
private String keyDeserializer;
|
||||
|
||||
private String valueDeserializer;
|
||||
|
||||
private FilterType filter;
|
||||
|
||||
private String value;
|
||||
|
||||
private String headerKey;
|
||||
|
||||
private String headerValue;
|
||||
}
|
||||
|
||||
@@ -1,8 +1,10 @@
|
||||
package com.xuxd.kafka.console.beans.dto;
|
||||
|
||||
import com.xuxd.kafka.console.beans.QueryMessage;
|
||||
import com.xuxd.kafka.console.beans.enums.FilterType;
|
||||
import java.util.Date;
|
||||
import lombok.Data;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
|
||||
/**
|
||||
* kafka-console-ui.
|
||||
@@ -27,6 +29,14 @@ public class QueryMessageDTO {
|
||||
|
||||
private String valueDeserializer;
|
||||
|
||||
private String filter;
|
||||
|
||||
private String value;
|
||||
|
||||
private String headerKey;
|
||||
|
||||
private String headerValue;
|
||||
|
||||
public QueryMessage toQueryMessage() {
|
||||
QueryMessage queryMessage = new QueryMessage();
|
||||
queryMessage.setTopic(topic);
|
||||
@@ -45,6 +55,21 @@ public class QueryMessageDTO {
|
||||
queryMessage.setKeyDeserializer(keyDeserializer);
|
||||
queryMessage.setValueDeserializer(valueDeserializer);
|
||||
|
||||
if (StringUtils.isNotBlank(filter)) {
|
||||
queryMessage.setFilter(FilterType.valueOf(filter.toUpperCase()));
|
||||
} else {
|
||||
queryMessage.setFilter(FilterType.NONE);
|
||||
}
|
||||
if (StringUtils.isNotBlank(value)) {
|
||||
queryMessage.setValue(value.trim());
|
||||
}
|
||||
if (StringUtils.isNotBlank(headerKey)) {
|
||||
queryMessage.setHeaderKey(headerKey.trim());
|
||||
}
|
||||
if (StringUtils.isNotBlank(headerValue)) {
|
||||
queryMessage.setHeaderValue(headerValue.trim());
|
||||
}
|
||||
|
||||
return queryMessage;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,11 @@
|
||||
package com.xuxd.kafka.console.beans.enums;
|
||||
|
||||
/**
|
||||
* kafka-console-ui.
|
||||
*
|
||||
* @author xuxd
|
||||
* @date 2021-12-29 14:36:01
|
||||
**/
|
||||
/**
 * kafka-console-ui.
 *
 * Supported message-search filter modes. Constant order is part of the
 * contract (ordinals / default NONE), so do not reorder.
 *
 * @author xuxd
 * @date 2021-12-29 14:36:01
 **/
public enum FilterType {

    /** No filtering: every record in the queried range is returned. */
    NONE,

    /** Filter on the deserialized message body. */
    BODY,

    /** Filter on a record header key (and optionally its value). */
    HEADER
}
|
||||
@@ -1,8 +1,10 @@
|
||||
package com.xuxd.kafka.console.service.impl;
|
||||
|
||||
import com.xuxd.kafka.console.beans.MessageFilter;
|
||||
import com.xuxd.kafka.console.beans.QueryMessage;
|
||||
import com.xuxd.kafka.console.beans.ResponseData;
|
||||
import com.xuxd.kafka.console.beans.SendMessage;
|
||||
import com.xuxd.kafka.console.beans.enums.FilterType;
|
||||
import com.xuxd.kafka.console.beans.vo.ConsumerRecordVO;
|
||||
import com.xuxd.kafka.console.beans.vo.MessageDetailVO;
|
||||
import com.xuxd.kafka.console.service.ConsumerService;
|
||||
@@ -32,6 +34,7 @@ import org.apache.kafka.common.serialization.Deserializer;
|
||||
import org.apache.kafka.common.serialization.DoubleDeserializer;
|
||||
import org.apache.kafka.common.serialization.FloatDeserializer;
|
||||
import org.apache.kafka.common.serialization.IntegerDeserializer;
|
||||
import org.apache.kafka.common.serialization.LongDeserializer;
|
||||
import org.apache.kafka.common.serialization.StringDeserializer;
|
||||
import org.springframework.beans.BeansException;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
@@ -70,6 +73,7 @@ public class MessageServiceImpl implements MessageService, ApplicationContextAwa
|
||||
deserializerDict.put("Float", new FloatDeserializer());
|
||||
deserializerDict.put("Double", new DoubleDeserializer());
|
||||
deserializerDict.put("Byte", new BytesDeserializer());
|
||||
deserializerDict.put("Long", new LongDeserializer());
|
||||
}
|
||||
|
||||
public static String defaultDeserializer = "String";
|
||||
@@ -77,9 +81,65 @@ public class MessageServiceImpl implements MessageService, ApplicationContextAwa
|
||||
@Override public ResponseData searchByTime(QueryMessage queryMessage) {
|
||||
int maxNums = 10000;
|
||||
|
||||
Object searchContent = null;
|
||||
String headerKey = null;
|
||||
String headerValue = null;
|
||||
MessageFilter filter = new MessageFilter();
|
||||
switch (queryMessage.getFilter()) {
|
||||
case BODY:
|
||||
if (StringUtils.isBlank(queryMessage.getValue())) {
|
||||
queryMessage.setFilter(FilterType.NONE);
|
||||
} else {
|
||||
if (StringUtils.isBlank(queryMessage.getValueDeserializer())) {
|
||||
queryMessage.setValueDeserializer(defaultDeserializer);
|
||||
}
|
||||
switch (queryMessage.getValueDeserializer()) {
|
||||
case "String":
|
||||
searchContent = String.valueOf(queryMessage.getValue());
|
||||
filter.setContainsValue(true);
|
||||
break;
|
||||
case "Integer":
|
||||
searchContent = Integer.valueOf(queryMessage.getValue());
|
||||
break;
|
||||
case "Float":
|
||||
searchContent = Float.valueOf(queryMessage.getValue());
|
||||
break;
|
||||
case "Double":
|
||||
searchContent = Double.valueOf(queryMessage.getValue());
|
||||
break;
|
||||
case "Long":
|
||||
searchContent = Long.valueOf(queryMessage.getValue());
|
||||
break;
|
||||
default:
|
||||
throw new IllegalArgumentException("Message body type not support.");
|
||||
}
|
||||
}
|
||||
break;
|
||||
case HEADER:
|
||||
headerKey = queryMessage.getHeaderKey();
|
||||
if (StringUtils.isBlank(headerKey)) {
|
||||
queryMessage.setFilter(FilterType.NONE);
|
||||
} else {
|
||||
if (StringUtils.isNotBlank(queryMessage.getHeaderValue())) {
|
||||
headerValue = String.valueOf(queryMessage.getHeaderValue());
|
||||
}
|
||||
}
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
||||
FilterType filterType = queryMessage.getFilter();
|
||||
Deserializer deserializer = deserializerDict.get(queryMessage.getValueDeserializer());
|
||||
filter.setFilterType(filterType);
|
||||
filter.setSearchContent(searchContent);
|
||||
filter.setDeserializer(deserializer);
|
||||
filter.setHeaderKey(headerKey);
|
||||
filter.setHeaderValue(headerValue);
|
||||
|
||||
Set<TopicPartition> partitions = getPartitions(queryMessage);
|
||||
long startTime = System.currentTimeMillis();
|
||||
List<ConsumerRecord<byte[], byte[]>> records = messageConsole.searchBy(partitions, queryMessage.getStartTime(), queryMessage.getEndTime(), maxNums);
|
||||
List<ConsumerRecord<byte[], byte[]>> records = messageConsole.searchBy(partitions, queryMessage.getStartTime(), queryMessage.getEndTime(), maxNums, filter);
|
||||
log.info("search message by time, cost time: {}", (System.currentTimeMillis() - startTime));
|
||||
List<ConsumerRecordVO> vos = records.stream().filter(record -> record.timestamp() <= queryMessage.getEndTime())
|
||||
.map(ConsumerRecordVO::fromConsumerRecord).collect(Collectors.toList());
|
||||
|
||||
@@ -1,6 +1,9 @@
|
||||
package kafka.console
|
||||
|
||||
import com.xuxd.kafka.console.beans.MessageFilter
|
||||
import com.xuxd.kafka.console.beans.enums.FilterType
|
||||
import com.xuxd.kafka.console.config.KafkaConfig
|
||||
import org.apache.commons.lang3.StringUtils
|
||||
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
|
||||
import org.apache.kafka.clients.producer.ProducerRecord
|
||||
import org.apache.kafka.common.TopicPartition
|
||||
@@ -20,7 +23,7 @@ import scala.jdk.CollectionConverters.{CollectionHasAsScala, MapHasAsScala, SeqH
|
||||
class MessageConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConfig) with Logging {
|
||||
|
||||
def searchBy(partitions: util.Collection[TopicPartition], startTime: Long, endTime: Long,
|
||||
maxNums: Int): util.List[ConsumerRecord[Array[Byte], Array[Byte]]] = {
|
||||
maxNums: Int, filter: MessageFilter): util.List[ConsumerRecord[Array[Byte], Array[Byte]]] = {
|
||||
var startOffTable: immutable.Map[TopicPartition, Long] = Map.empty
|
||||
var endOffTable: immutable.Map[TopicPartition, Long] = Map.empty
|
||||
withAdminClientAndCatchError(admin => {
|
||||
@@ -33,8 +36,44 @@ class MessageConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConf
|
||||
log.error("getLogTimestampOffsets error.", e)
|
||||
throw new RuntimeException("getLogTimestampOffsets error", e)
|
||||
})
|
||||
|
||||
val headerValueBytes = if (StringUtils.isNotEmpty(filter.getHeaderValue())) filter.getHeaderValue().getBytes() else None
|
||||
|
||||
def filterMessage(record: ConsumerRecord[Array[Byte], Array[Byte]]): Boolean = {
|
||||
filter.getFilterType() match {
|
||||
case FilterType.BODY => {
|
||||
val body = filter.getDeserializer().deserialize(record.topic(), record.value())
|
||||
var contains = false
|
||||
if (filter.isContainsValue) {
|
||||
contains = body.asInstanceOf[String].contains(filter.getSearchContent().asInstanceOf[String])
|
||||
} else {
|
||||
contains = body.equals(filter.getSearchContent)
|
||||
}
|
||||
contains
|
||||
}
|
||||
case FilterType.HEADER => {
|
||||
if (StringUtils.isNotEmpty(filter.getHeaderKey()) && StringUtils.isNotEmpty(filter.getHeaderValue())) {
|
||||
val iterator = record.headers().headers(filter.getHeaderKey()).iterator()
|
||||
var contains = false
|
||||
while (iterator.hasNext() && !contains) {
|
||||
val next = iterator.next().value()
|
||||
contains = (next.sameElements(headerValueBytes.asInstanceOf[Array[Byte]]))
|
||||
}
|
||||
contains
|
||||
} else if (StringUtils.isNotEmpty(filter.getHeaderKey()) && StringUtils.isEmpty(filter.getHeaderValue())) {
|
||||
record.headers().headers(filter.getHeaderKey()).iterator().hasNext()
|
||||
} else {
|
||||
true
|
||||
}
|
||||
}
|
||||
case FilterType.NONE => true
|
||||
}
|
||||
}
|
||||
|
||||
var terminate: Boolean = (startOffTable == endOffTable)
|
||||
val res = new util.LinkedList[ConsumerRecord[Array[Byte], Array[Byte]]]()
|
||||
// 检索的消息条数
|
||||
var searchNums = 0
|
||||
// 如果最小和最大偏移一致,就结束
|
||||
if (!terminate) {
|
||||
|
||||
@@ -51,7 +90,7 @@ class MessageConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConf
|
||||
// 1.所有查询分区达都到最大偏移的时候
|
||||
while (!terminate) {
|
||||
// 达到查询的最大条数
|
||||
if (res.size() >= maxNums) {
|
||||
if (searchNums >= maxNums) {
|
||||
terminate = true
|
||||
} else {
|
||||
val records = consumer.poll(Duration.ofMillis(timeoutMs))
|
||||
@@ -67,6 +106,7 @@ class MessageConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConf
|
||||
if (first.offset() >= endOff) {
|
||||
arrive.remove(tp)
|
||||
} else {
|
||||
searchNums += recordList.size()
|
||||
//
|
||||
// (String topic,
|
||||
// int partition,
|
||||
@@ -80,7 +120,7 @@ class MessageConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConf
|
||||
// V value,
|
||||
// Headers headers,
|
||||
// Optional<Integer> leaderEpoch)
|
||||
val nullVList = recordList.asScala.map(record => new ConsumerRecord[Array[Byte], Array[Byte]](record.topic(),
|
||||
val nullVList = recordList.asScala.filter(filterMessage(_)).map(record => new ConsumerRecord[Array[Byte], Array[Byte]](record.topic(),
|
||||
record.partition(),
|
||||
record.offset(),
|
||||
record.timestamp(),
|
||||
|
||||
@@ -84,7 +84,7 @@
|
||||
<a-form-item label="消息内容">
|
||||
<a-input
|
||||
class="msg-body"
|
||||
v-decorator="['body']"
|
||||
v-decorator="['value']"
|
||||
placeholder="请输入消息内容"
|
||||
/>
|
||||
</a-form-item>
|
||||
@@ -92,7 +92,7 @@
|
||||
<a-col :span="8">
|
||||
<a-form-item label="消息类型">
|
||||
<a-select
|
||||
v-decorator="['bodyType', { initialValue: 'String' }]"
|
||||
v-decorator="['valueDeserializer', { initialValue: 'String' }]"
|
||||
class="body-type"
|
||||
>
|
||||
<a-select-option
|
||||
|
||||
Reference in New Issue
Block a user