发送统计
This commit is contained in:
@@ -88,4 +88,9 @@ public class TopicController {
|
||||
public Object configThrottle(@RequestBody TopicThrottleDTO dto) {
|
||||
return topicService.configThrottle(dto.getTopic(), dto.getPartitions(), dto.getOperation());
|
||||
}
|
||||
|
||||
@GetMapping("/send/stats")
|
||||
public Object sendStats(@RequestParam String topic) {
|
||||
return topicService.sendStats(topic);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5,8 +5,6 @@ import com.xuxd.kafka.console.beans.ResponseData;
|
||||
import com.xuxd.kafka.console.beans.enums.TopicThrottleSwitch;
|
||||
import com.xuxd.kafka.console.beans.enums.TopicType;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import org.apache.kafka.clients.admin.NewTopic;
|
||||
|
||||
/**
|
||||
@@ -33,5 +31,7 @@ public interface TopicService {
|
||||
|
||||
ResponseData updateReplicaAssignment(ReplicaAssignment assignment);
|
||||
|
||||
ResponseData configThrottle(String topic, List<Integer> partitions, TopicThrottleSwitch throttleSwitch);
|
||||
ResponseData configThrottle(String topic, List<Integer> partitions, TopicThrottleSwitch throttleSwitch);
|
||||
|
||||
ResponseData sendStats(String topic);
|
||||
}
|
||||
|
||||
@@ -9,6 +9,7 @@ import com.xuxd.kafka.console.beans.vo.TopicDescriptionVO;
|
||||
import com.xuxd.kafka.console.beans.vo.TopicPartitionVO;
|
||||
import com.xuxd.kafka.console.service.TopicService;
|
||||
import com.xuxd.kafka.console.utils.GsonUtil;
|
||||
import java.util.Calendar;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.HashMap;
|
||||
@@ -16,6 +17,7 @@ import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.stream.Collectors;
|
||||
import kafka.console.TopicConsole;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
@@ -150,7 +152,8 @@ public class TopicServiceImpl implements TopicService {
|
||||
return success ? ResponseData.create().success() : ResponseData.create().failed(tuple2._2());
|
||||
}
|
||||
|
||||
@Override public ResponseData configThrottle(String topic, List<Integer> partitions, TopicThrottleSwitch throttleSwitch) {
|
||||
@Override
|
||||
public ResponseData configThrottle(String topic, List<Integer> partitions, TopicThrottleSwitch throttleSwitch) {
|
||||
Tuple2<Object, String> tuple2 = null;
|
||||
switch (throttleSwitch) {
|
||||
case ON:
|
||||
@@ -165,4 +168,62 @@ public class TopicServiceImpl implements TopicService {
|
||||
boolean success = (boolean) tuple2._1();
|
||||
return success ? ResponseData.create().success() : ResponseData.create().failed(tuple2._2());
|
||||
}
|
||||
|
||||
@Override public ResponseData sendStats(String topic) {
|
||||
Calendar calendar = Calendar.getInstance();
|
||||
long current = calendar.getTimeInMillis();
|
||||
|
||||
calendar.set(Calendar.HOUR_OF_DAY, 0);
|
||||
calendar.set(Calendar.MINUTE, 0);
|
||||
calendar.set(Calendar.SECOND, 0);
|
||||
calendar.set(Calendar.MILLISECOND, 0);
|
||||
long today = calendar.getTimeInMillis();
|
||||
|
||||
calendar.add(Calendar.DAY_OF_MONTH, -1);
|
||||
long yesterday = calendar.getTimeInMillis();
|
||||
|
||||
Map<TopicPartition, Long> currentOffset = topicConsole.getOffsetForTimestamp(topic, current);
|
||||
Map<TopicPartition, Long> todayOffset = topicConsole.getOffsetForTimestamp(topic, today);
|
||||
Map<TopicPartition, Long> yesterdayOffset = topicConsole.getOffsetForTimestamp(topic, yesterday);
|
||||
|
||||
Map<String, Object> res = new HashMap<>();
|
||||
|
||||
// 昨天的消息数是今天减去昨天的
|
||||
AtomicLong yesterdayTotal = new AtomicLong(0L), todayTotal = new AtomicLong(0L);
|
||||
Map<Integer, Long> yesterdayDetail = new HashMap<>(), todayDetail = new HashMap<>();
|
||||
todayOffset.forEach(((partition, aLong) -> {
|
||||
Long last = yesterdayOffset.get(partition);
|
||||
long diff = last == null ? aLong : aLong - last;
|
||||
yesterdayDetail.put(partition.partition(), diff);
|
||||
yesterdayTotal.addAndGet(diff);
|
||||
}));
|
||||
currentOffset.forEach(((partition, aLong) -> {
|
||||
Long last = todayOffset.get(partition);
|
||||
long diff = last == null ? aLong : aLong - last;
|
||||
todayDetail.put(partition.partition(), diff);
|
||||
todayTotal.addAndGet(diff);
|
||||
}));
|
||||
|
||||
Map<String, Object> yes = new HashMap<>(), to = new HashMap<>();
|
||||
yes.put("detail", convertList(yesterdayDetail));
|
||||
yes.put("total", yesterdayTotal.get());
|
||||
to.put("detail", convertList(todayDetail));
|
||||
to.put("total", todayTotal.get());
|
||||
|
||||
res.put("yesterday", yes);
|
||||
res.put("today", to);
|
||||
// 今天的消息数是现在减去今天0时的
|
||||
return ResponseData.create().data(res).success();
|
||||
}
|
||||
|
||||
private List<Map<String, Object>> convertList(Map<Integer, Long> source) {
|
||||
List<Map<String, Object>> collect = source.entrySet().stream().map(entry -> {
|
||||
Map<String, Object> map = new HashMap<>(3, 1.0f);
|
||||
map.put("partition", entry.getKey());
|
||||
map.put("num", entry.getValue());
|
||||
return map;
|
||||
}).collect(Collectors.toList());
|
||||
|
||||
return collect;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,15 +1,20 @@
|
||||
package kafka.console
|
||||
|
||||
import java.util.Properties
|
||||
|
||||
import com.xuxd.kafka.console.config.KafkaConfig
|
||||
import kafka.zk.{AdminZkClient, KafkaZkClient}
|
||||
import org.apache.kafka.clients.CommonClientConfigs
|
||||
import org.apache.kafka.clients.admin.{AbstractOptions, Admin, AdminClientConfig}
|
||||
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
|
||||
import org.apache.kafka.clients.admin._
|
||||
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer, OffsetAndMetadata}
|
||||
import org.apache.kafka.common.TopicPartition
|
||||
import org.apache.kafka.common.config.SaslConfigs
|
||||
import org.apache.kafka.common.requests.ListOffsetsResponse
|
||||
import org.apache.kafka.common.serialization.ByteArrayDeserializer
|
||||
import org.apache.kafka.common.utils.Time
|
||||
import org.slf4j.{Logger, LoggerFactory}
|
||||
|
||||
import java.util.Properties
|
||||
import scala.collection.{Map, Seq}
|
||||
import scala.jdk.CollectionConverters.{MapHasAsJava, MapHasAsScala}
|
||||
|
||||
/**
|
||||
* kafka-console-ui.
|
||||
@@ -89,3 +94,57 @@ class KafkaConsole(config: KafkaConfig) {
|
||||
props
|
||||
}
|
||||
}
|
||||
|
||||
object KafkaConsole {
|
||||
val log: Logger = LoggerFactory.getLogger(this.getClass)
|
||||
|
||||
def getCommittedOffsets(admin: Admin, groupId: String,
|
||||
timeoutMs: Integer): Map[TopicPartition, OffsetAndMetadata] = {
|
||||
admin.listConsumerGroupOffsets(
|
||||
groupId, new ListConsumerGroupOffsetsOptions().timeoutMs(timeoutMs)
|
||||
).partitionsToOffsetAndMetadata.get.asScala
|
||||
}
|
||||
|
||||
def getLogTimestampOffsets(admin: Admin, topicPartitions: Seq[TopicPartition],
|
||||
timestamp: java.lang.Long, timeoutMs: Integer): Map[TopicPartition, OffsetAndMetadata] = {
|
||||
val timestampOffsets = topicPartitions.map { topicPartition =>
|
||||
topicPartition -> OffsetSpec.forTimestamp(timestamp)
|
||||
}.toMap
|
||||
val offsets = admin.listOffsets(
|
||||
timestampOffsets.asJava,
|
||||
new ListOffsetsOptions().timeoutMs(timeoutMs)
|
||||
).all.get
|
||||
val (successfulOffsetsForTimes, unsuccessfulOffsetsForTimes) =
|
||||
offsets.asScala.partition(_._2.offset != ListOffsetsResponse.UNKNOWN_OFFSET)
|
||||
|
||||
val successfulLogTimestampOffsets = successfulOffsetsForTimes.map {
|
||||
case (topicPartition, listOffsetsResultInfo) => topicPartition -> new OffsetAndMetadata(listOffsetsResultInfo.offset)
|
||||
}.toMap
|
||||
|
||||
unsuccessfulOffsetsForTimes.foreach { entry =>
|
||||
log.warn(s"\nWarn: Partition " + entry._1.partition() + " from topic " + entry._1.topic() +
|
||||
" is empty. Falling back to latest known offset.")
|
||||
}
|
||||
|
||||
successfulLogTimestampOffsets ++ getLogEndOffsets(admin, unsuccessfulOffsetsForTimes.keySet.toSeq, timeoutMs)
|
||||
}
|
||||
|
||||
def getLogEndOffsets(admin: Admin,
|
||||
topicPartitions: Seq[TopicPartition], timeoutMs: Integer): Predef.Map[TopicPartition, OffsetAndMetadata] = {
|
||||
val endOffsets = topicPartitions.map { topicPartition =>
|
||||
topicPartition -> OffsetSpec.latest
|
||||
}.toMap
|
||||
val offsets = admin.listOffsets(
|
||||
endOffsets.asJava,
|
||||
new ListOffsetsOptions().timeoutMs(timeoutMs)
|
||||
).all.get
|
||||
val res = topicPartitions.map { topicPartition =>
|
||||
Option(offsets.get(topicPartition)) match {
|
||||
case Some(listOffsetsResultInfo) => topicPartition -> new OffsetAndMetadata(listOffsetsResultInfo.offset)
|
||||
case _ =>
|
||||
throw new IllegalArgumentException
|
||||
}
|
||||
}.toMap
|
||||
res
|
||||
}
|
||||
}
|
||||
|
||||
@@ -234,6 +234,21 @@ class TopicConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConfig
|
||||
}).asInstanceOf[(Boolean, String)]
|
||||
}
|
||||
|
||||
def getOffsetForTimestamp(topic: String, timestamp: java.lang.Long): util.Map[TopicPartition, java.lang.Long] = {
|
||||
withAdminClientAndCatchError(admin => {
|
||||
val partitions = describeTopics(admin, Collections.singleton(topic)).get(topic) match {
|
||||
case Some(topicDescription: TopicDescription) => topicDescription.partitions()
|
||||
.asScala.map(info => new TopicPartition(topic, info.partition())).toSeq
|
||||
case None => throw new IllegalArgumentException("topic is not exist.")
|
||||
}
|
||||
val offsetMap = KafkaConsole.getLogTimestampOffsets(admin, partitions, timestamp, timeoutMs)
|
||||
offsetMap.map(tuple2 => (tuple2._1, tuple2._2.offset())).toMap.asJava
|
||||
}, e => {
|
||||
log.error("clearThrottle error, ", e)
|
||||
Collections.emptyMap()
|
||||
}).asInstanceOf[util.Map[TopicPartition, java.lang.Long]]
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the current replica assignments for some topics.
|
||||
*
|
||||
|
||||
Reference in New Issue
Block a user