增加消息根据时间范围发送统计的查询.

This commit is contained in:
许晓东
2023-12-03 19:24:16 +08:00
parent dc84144443
commit e368ba9527
9 changed files with 450 additions and 35 deletions

View File

@@ -0,0 +1,23 @@
package com.xuxd.kafka.console.beans.dto;
import lombok.Data;
import java.util.Date;
import java.util.Set;
/**
* 发送统计查询请求,指定时间段内,发送了多少消息.
* @author: xuxd
* @since: 2023/12/1 22:01
**/
@Data
public class QuerySendStatisticsDTO {
private String topic;
private Set<Integer> partition;
private Date startTime;
private Date endTime;
}

View File

@@ -0,0 +1,36 @@
package com.xuxd.kafka.console.beans.vo;
import lombok.Data;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;
/**
* @author: xuxd
* @since: 2023/12/1 17:49
**/
@Data
public class QuerySendStatisticsVO {
private static final String FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
private static final DateFormat DATE_FORMAT = new SimpleDateFormat(FORMAT);
private String topic;
private Long total;
private Map<Integer, Long> detail;
private String startTime;
private String endTime;
private String searchTime = format(new Date());
public static String format(Date date) {
return DATE_FORMAT.format(date);
}
}

View File

@@ -6,8 +6,10 @@ import com.xuxd.kafka.console.beans.QueryMessage;
import com.xuxd.kafka.console.beans.ResponseData;
import com.xuxd.kafka.console.beans.SendMessage;
import com.xuxd.kafka.console.beans.dto.QueryMessageDTO;
import com.xuxd.kafka.console.beans.dto.QuerySendStatisticsDTO;
import com.xuxd.kafka.console.service.MessageService;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
@@ -72,4 +74,12 @@ public class MessageController {
}
return messageService.delete(messages);
}
@PostMapping("/send/statistics")
public Object sendStatistics(@RequestBody QuerySendStatisticsDTO dto) {
if (StringUtils.isEmpty(dto.getTopic())) {
return ResponseData.create().failed("Topic is null");
}
return messageService.sendStatisticsByTime(dto);
}
}

View File

@@ -1,6 +1,7 @@
package com.xuxd.kafka.console.service;
import com.xuxd.kafka.console.beans.QueryMessage;
import com.xuxd.kafka.console.beans.dto.QuerySendStatisticsDTO;
import com.xuxd.kafka.console.beans.ResponseData;
import com.xuxd.kafka.console.beans.SendMessage;
@@ -29,4 +30,6 @@ public interface MessageService {
ResponseData resend(SendMessage message);
ResponseData delete(List<QueryMessage> messages);
ResponseData sendStatisticsByTime(QuerySendStatisticsDTO request);
}

View File

@@ -4,20 +4,13 @@ import com.xuxd.kafka.console.beans.MessageFilter;
import com.xuxd.kafka.console.beans.QueryMessage;
import com.xuxd.kafka.console.beans.ResponseData;
import com.xuxd.kafka.console.beans.SendMessage;
import com.xuxd.kafka.console.beans.dto.QuerySendStatisticsDTO;
import com.xuxd.kafka.console.beans.enums.FilterType;
import com.xuxd.kafka.console.beans.vo.ConsumerRecordVO;
import com.xuxd.kafka.console.beans.vo.MessageDetailVO;
import com.xuxd.kafka.console.beans.vo.QuerySendStatisticsVO;
import com.xuxd.kafka.console.service.ConsumerService;
import com.xuxd.kafka.console.service.MessageService;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import kafka.console.ConsumerConsole;
import kafka.console.MessageConsole;
import kafka.console.TopicConsole;
@@ -29,14 +22,7 @@ import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.BytesDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.DoubleDeserializer;
import org.apache.kafka.common.serialization.FloatDeserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.*;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
@@ -44,6 +30,9 @@ import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Service;
import scala.Tuple2;
import java.util.*;
import java.util.stream.Collectors;
/**
* kafka-console-ui.
*
@@ -79,7 +68,8 @@ public class MessageServiceImpl implements MessageService, ApplicationContextAwa
public static String defaultDeserializer = "String";
@Override public ResponseData searchByTime(QueryMessage queryMessage) {
@Override
public ResponseData searchByTime(QueryMessage queryMessage) {
int maxNums = 5000;
Object searchContent = null;
@@ -144,7 +134,7 @@ public class MessageServiceImpl implements MessageService, ApplicationContextAwa
List<ConsumerRecord<byte[], byte[]>> records = tuple2._1();
log.info("search message by time, cost time: {}", (System.currentTimeMillis() - startTime));
List<ConsumerRecordVO> vos = records.stream().filter(record -> record.timestamp() <= queryMessage.getEndTime())
.map(ConsumerRecordVO::fromConsumerRecord).collect(Collectors.toList());
.map(ConsumerRecordVO::fromConsumerRecord).collect(Collectors.toList());
Map<String, Object> res = new HashMap<>();
vos = vos.subList(0, Math.min(maxNums, vos.size()));
res.put("maxNum", maxNums);
@@ -154,13 +144,15 @@ public class MessageServiceImpl implements MessageService, ApplicationContextAwa
return ResponseData.create().data(res).success();
}
@Override public ResponseData searchByOffset(QueryMessage queryMessage) {
@Override
public ResponseData searchByOffset(QueryMessage queryMessage) {
Map<TopicPartition, ConsumerRecord<byte[], byte[]>> recordMap = searchRecordByOffset(queryMessage);
return ResponseData.create().data(recordMap.values().stream().map(ConsumerRecordVO::fromConsumerRecord).collect(Collectors.toList())).success();
}
@Override public ResponseData searchDetail(QueryMessage queryMessage) {
@Override
public ResponseData searchDetail(QueryMessage queryMessage) {
if (queryMessage.getPartition() == -1) {
throw new IllegalArgumentException();
}
@@ -219,18 +211,21 @@ public class MessageServiceImpl implements MessageService, ApplicationContextAwa
return ResponseData.create().failed("Not found message detail.");
}
@Override public ResponseData deserializerList() {
@Override
public ResponseData deserializerList() {
return ResponseData.create().data(deserializerDict.keySet()).success();
}
@Override public ResponseData send(SendMessage message) {
@Override
public ResponseData send(SendMessage message) {
messageConsole.send(message.getTopic(), message.getPartition(), message.getKey(), message.getBody(), message.getNum());
return ResponseData.create().success();
}
@Override public ResponseData sendWithHeader(SendMessage message) {
String[] headerKeys= message.getHeaders().stream().map(SendMessage.Header::getHeaderKey).toArray(String[]::new);
String[] headerValues= message.getHeaders().stream().map(SendMessage.Header::getHeaderValue).toArray(String[]::new);
@Override
public ResponseData sendWithHeader(SendMessage message) {
String[] headerKeys = message.getHeaders().stream().map(SendMessage.Header::getHeaderKey).toArray(String[]::new);
String[] headerValues = message.getHeaders().stream().map(SendMessage.Header::getHeaderValue).toArray(String[]::new);
// log.info("send with header:keys{},values{}",headerKeys, headerValues);
Tuple2<Object, String> tuple2 = messageConsole.send(message.getTopic(),
message.getPartition(),
@@ -240,10 +235,11 @@ public class MessageServiceImpl implements MessageService, ApplicationContextAwa
headerKeys,
headerValues,
message.isSync());
return (boolean)tuple2._1 ? ResponseData.create().success() : ResponseData.create().failed(tuple2._2);
return (boolean) tuple2._1 ? ResponseData.create().success() : ResponseData.create().failed(tuple2._2);
}
@Override public ResponseData resend(SendMessage message) {
@Override
public ResponseData resend(SendMessage message) {
TopicPartition partition = new TopicPartition(message.getTopic(), message.getPartition());
Map<TopicPartition, Object> offsetTable = new HashMap<>(1, 1.0f);
offsetTable.put(partition, message.getOffset());
@@ -270,6 +266,44 @@ public class MessageServiceImpl implements MessageService, ApplicationContextAwa
return success ? ResponseData.create().success() : ResponseData.create().failed(tuple2._2());
}
@Override
public ResponseData sendStatisticsByTime(QuerySendStatisticsDTO request) {
if (request.getPartition() != null && request.getPartition().contains(-1)) {
request.setPartition(Collections.emptySet());
}
Map<Integer, Long> startOffsetMap = topicConsole.getOffsetForTimestamp(request.getTopic(), request.getStartTime().getTime()).
entrySet().stream().collect(Collectors.toMap(e -> e.getKey().partition(), Map.Entry::getValue, (e1, e2) -> e2));
Map<Integer, Long> endOffsetMap = topicConsole.getOffsetForTimestamp(request.getTopic(), request.getEndTime().getTime()).
entrySet().stream().collect(Collectors.toMap(e -> e.getKey().partition(), Map.Entry::getValue, (e1, e2) -> e2));
Map<Integer, Long> diffOffsetMap = endOffsetMap.entrySet().stream().
collect(Collectors.toMap(e -> e.getKey(),
e -> Arrays.asList(e.getValue(), startOffsetMap.getOrDefault(e.getKey(), 0L)).
stream().reduce((a, b) -> a - b).get()));
if (CollectionUtils.isNotEmpty(request.getPartition())) {
Iterator<Map.Entry<Integer, Long>> iterator = diffOffsetMap.entrySet().iterator();
while (iterator.hasNext()) {
Integer partition = iterator.next().getKey();
if (!request.getPartition().contains(partition)) {
iterator.remove();
}
}
}
Long total = diffOffsetMap.values().stream().reduce(0L, (a, b) -> a + b);
QuerySendStatisticsVO vo = new QuerySendStatisticsVO();
vo.setTopic(request.getTopic());
vo.setTotal(total);
vo.setDetail(diffOffsetMap);
vo.setStartTime(QuerySendStatisticsVO.format(request.getStartTime()));
vo.setEndTime(QuerySendStatisticsVO.format(request.getEndTime()));
return ResponseData.create().data(vo).success();
}
private Map<TopicPartition, ConsumerRecord<byte[], byte[]>> searchRecordByOffset(QueryMessage queryMessage) {
Set<TopicPartition> partitions = getPartitions(queryMessage);
@@ -291,13 +325,14 @@ public class MessageServiceImpl implements MessageService, ApplicationContextAwa
throw new IllegalArgumentException("Can not find topic info.");
}
Set<TopicPartition> set = list.get(0).partitions().stream()
.map(tp -> new TopicPartition(queryMessage.getTopic(), tp.partition())).collect(Collectors.toSet());
.map(tp -> new TopicPartition(queryMessage.getTopic(), tp.partition())).collect(Collectors.toSet());
partitions.addAll(set);
}
return partitions;
}
@Override public void setApplicationContext(ApplicationContext context) throws BeansException {
@Override
public void setApplicationContext(ApplicationContext context) throws BeansException {
this.applicationContext = context;
}
}