消息查询过滤

This commit is contained in:
许晓东
2021-12-30 14:17:47 +08:00
parent ad39f4e82c
commit 02abe67fce
3 changed files with 18 additions and 10 deletions

View File

@@ -79,7 +79,7 @@ public class MessageServiceImpl implements MessageService, ApplicationContextAwa
public static String defaultDeserializer = "String";
@Override public ResponseData searchByTime(QueryMessage queryMessage) {
int maxNums = 10000;
int maxNums = 5000;
Object searchContent = null;
String headerKey = null;
@@ -139,14 +139,17 @@ public class MessageServiceImpl implements MessageService, ApplicationContextAwa
Set<TopicPartition> partitions = getPartitions(queryMessage);
long startTime = System.currentTimeMillis();
List<ConsumerRecord<byte[], byte[]>> records = messageConsole.searchBy(partitions, queryMessage.getStartTime(), queryMessage.getEndTime(), maxNums, filter);
Tuple2<List<ConsumerRecord<byte[], byte[]>>, Object> tuple2 = messageConsole.searchBy(partitions, queryMessage.getStartTime(), queryMessage.getEndTime(), maxNums, filter);
List<ConsumerRecord<byte[], byte[]>> records = tuple2._1();
log.info("search message by time, cost time: {}", (System.currentTimeMillis() - startTime));
List<ConsumerRecordVO> vos = records.stream().filter(record -> record.timestamp() <= queryMessage.getEndTime())
.map(ConsumerRecordVO::fromConsumerRecord).collect(Collectors.toList());
Map<String, Object> res = new HashMap<>();
vos = vos.subList(0, Math.min(maxNums, vos.size()));
res.put("maxNum", maxNums);
res.put("realNum", vos.size());
res.put("data", vos.subList(0, Math.min(maxNums, vos.size())));
res.put("searchNum", tuple2._2());
res.put("data", vos);
return ResponseData.create().data(res).success();
}

View File

@@ -23,7 +23,7 @@ import scala.jdk.CollectionConverters.{CollectionHasAsScala, MapHasAsScala, SeqH
class MessageConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConfig) with Logging {
def searchBy(partitions: util.Collection[TopicPartition], startTime: Long, endTime: Long,
maxNums: Int, filter: MessageFilter): util.List[ConsumerRecord[Array[Byte], Array[Byte]]] = {
maxNums: Int, filter: MessageFilter): (util.List[ConsumerRecord[Array[Byte], Array[Byte]]], Int) = {
var startOffTable: immutable.Map[TopicPartition, Long] = Map.empty
var endOffTable: immutable.Map[TopicPartition, Long] = Map.empty
withAdminClientAndCatchError(admin => {
@@ -154,7 +154,7 @@ class MessageConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConf
})
}
res
(res, searchNums)
}
def searchBy(