diff --git a/src/main/java/com/xuxd/kafka/console/controller/TopicController.java b/src/main/java/com/xuxd/kafka/console/controller/TopicController.java index 1051840..e775318 100644 --- a/src/main/java/com/xuxd/kafka/console/controller/TopicController.java +++ b/src/main/java/com/xuxd/kafka/console/controller/TopicController.java @@ -88,4 +88,9 @@ public class TopicController { public Object configThrottle(@RequestBody TopicThrottleDTO dto) { return topicService.configThrottle(dto.getTopic(), dto.getPartitions(), dto.getOperation()); } + + @GetMapping("/send/stats") + public Object sendStats(@RequestParam String topic) { + return topicService.sendStats(topic); + } } diff --git a/src/main/java/com/xuxd/kafka/console/service/TopicService.java b/src/main/java/com/xuxd/kafka/console/service/TopicService.java index dd0f584..2255e7a 100644 --- a/src/main/java/com/xuxd/kafka/console/service/TopicService.java +++ b/src/main/java/com/xuxd/kafka/console/service/TopicService.java @@ -5,8 +5,6 @@ import com.xuxd.kafka.console.beans.ResponseData; import com.xuxd.kafka.console.beans.enums.TopicThrottleSwitch; import com.xuxd.kafka.console.beans.enums.TopicType; import java.util.List; -import java.util.Map; -import java.util.Properties; import org.apache.kafka.clients.admin.NewTopic; /** @@ -33,5 +31,7 @@ public interface TopicService { ResponseData updateReplicaAssignment(ReplicaAssignment assignment); - ResponseData configThrottle(String topic, List partitions, TopicThrottleSwitch throttleSwitch); + ResponseData configThrottle(String topic, List partitions, TopicThrottleSwitch throttleSwitch); + + ResponseData sendStats(String topic); } diff --git a/src/main/java/com/xuxd/kafka/console/service/impl/TopicServiceImpl.java b/src/main/java/com/xuxd/kafka/console/service/impl/TopicServiceImpl.java index 0c3f0e4..f74b97d 100644 --- a/src/main/java/com/xuxd/kafka/console/service/impl/TopicServiceImpl.java +++ b/src/main/java/com/xuxd/kafka/console/service/impl/TopicServiceImpl.java 
@@ -9,6 +9,7 @@ import com.xuxd.kafka.console.beans.vo.TopicDescriptionVO; import com.xuxd.kafka.console.beans.vo.TopicPartitionVO; import com.xuxd.kafka.console.service.TopicService; import com.xuxd.kafka.console.utils.GsonUtil; +import java.util.Calendar; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; @@ -16,6 +17,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; import kafka.console.TopicConsole; import lombok.extern.slf4j.Slf4j; @@ -150,7 +152,8 @@ public class TopicServiceImpl implements TopicService { return success ? ResponseData.create().success() : ResponseData.create().failed(tuple2._2()); } - @Override public ResponseData configThrottle(String topic, List partitions, TopicThrottleSwitch throttleSwitch) { + @Override + public ResponseData configThrottle(String topic, List partitions, TopicThrottleSwitch throttleSwitch) { Tuple2 tuple2 = null; switch (throttleSwitch) { case ON: @@ -165,4 +168,62 @@ public class TopicServiceImpl implements TopicService { boolean success = (boolean) tuple2._1(); return success ? 
ResponseData.create().success() : ResponseData.create().failed(tuple2._2()); } + + @Override public ResponseData sendStats(String topic) { + Calendar calendar = Calendar.getInstance(); + long current = calendar.getTimeInMillis(); + + calendar.set(Calendar.HOUR_OF_DAY, 0); + calendar.set(Calendar.MINUTE, 0); + calendar.set(Calendar.SECOND, 0); + calendar.set(Calendar.MILLISECOND, 0); + long today = calendar.getTimeInMillis(); + + calendar.add(Calendar.DAY_OF_MONTH, -1); + long yesterday = calendar.getTimeInMillis(); + + Map currentOffset = topicConsole.getOffsetForTimestamp(topic, current); + Map todayOffset = topicConsole.getOffsetForTimestamp(topic, today); + Map yesterdayOffset = topicConsole.getOffsetForTimestamp(topic, yesterday); + + Map res = new HashMap<>(); + + // 昨天的消息数是今天减去昨天的 + AtomicLong yesterdayTotal = new AtomicLong(0L), todayTotal = new AtomicLong(0L); + Map yesterdayDetail = new HashMap<>(), todayDetail = new HashMap<>(); + todayOffset.forEach(((partition, aLong) -> { + Long last = yesterdayOffset.get(partition); + long diff = last == null ? aLong : aLong - last; + yesterdayDetail.put(partition.partition(), diff); + yesterdayTotal.addAndGet(diff); + })); + currentOffset.forEach(((partition, aLong) -> { + Long last = todayOffset.get(partition); + long diff = last == null ? 
aLong : aLong - last; + todayDetail.put(partition.partition(), diff); + todayTotal.addAndGet(diff); + })); + + Map yes = new HashMap<>(), to = new HashMap<>(); + yes.put("detail", convertList(yesterdayDetail)); + yes.put("total", yesterdayTotal.get()); + to.put("detail", convertList(todayDetail)); + to.put("total", todayTotal.get()); + + res.put("yesterday", yes); + res.put("today", to); + // 今天的消息数是现在减去今天0时的 + return ResponseData.create().data(res).success(); + } + + private List> convertList(Map source) { + List> collect = source.entrySet().stream().map(entry -> { + Map map = new HashMap<>(3, 1.0f); + map.put("partition", entry.getKey()); + map.put("num", entry.getValue()); + return map; + }).collect(Collectors.toList()); + + return collect; + } } diff --git a/src/main/scala/kafka/console/KafkaConsole.scala b/src/main/scala/kafka/console/KafkaConsole.scala index 8e496ca..25e04f0 100644 --- a/src/main/scala/kafka/console/KafkaConsole.scala +++ b/src/main/scala/kafka/console/KafkaConsole.scala @@ -1,15 +1,20 @@ package kafka.console -import java.util.Properties - import com.xuxd.kafka.console.config.KafkaConfig import kafka.zk.{AdminZkClient, KafkaZkClient} import org.apache.kafka.clients.CommonClientConfigs -import org.apache.kafka.clients.admin.{AbstractOptions, Admin, AdminClientConfig} -import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer} +import org.apache.kafka.clients.admin._ +import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer, OffsetAndMetadata} +import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.config.SaslConfigs +import org.apache.kafka.common.requests.ListOffsetsResponse import org.apache.kafka.common.serialization.ByteArrayDeserializer import org.apache.kafka.common.utils.Time +import org.slf4j.{Logger, LoggerFactory} + +import java.util.Properties +import scala.collection.{Map, Seq} +import scala.jdk.CollectionConverters.{MapHasAsJava, MapHasAsScala} /** * kafka-console-ui. 
@@ -89,3 +94,57 @@ class KafkaConsole(config: KafkaConfig) { props } } + +object KafkaConsole { + val log: Logger = LoggerFactory.getLogger(this.getClass) + + def getCommittedOffsets(admin: Admin, groupId: String, + timeoutMs: Integer): Map[TopicPartition, OffsetAndMetadata] = { + admin.listConsumerGroupOffsets( + groupId, new ListConsumerGroupOffsetsOptions().timeoutMs(timeoutMs) + ).partitionsToOffsetAndMetadata.get.asScala + } + + def getLogTimestampOffsets(admin: Admin, topicPartitions: Seq[TopicPartition], + timestamp: java.lang.Long, timeoutMs: Integer): Map[TopicPartition, OffsetAndMetadata] = { + val timestampOffsets = topicPartitions.map { topicPartition => + topicPartition -> OffsetSpec.forTimestamp(timestamp) + }.toMap + val offsets = admin.listOffsets( + timestampOffsets.asJava, + new ListOffsetsOptions().timeoutMs(timeoutMs) + ).all.get + val (successfulOffsetsForTimes, unsuccessfulOffsetsForTimes) = + offsets.asScala.partition(_._2.offset != ListOffsetsResponse.UNKNOWN_OFFSET) + + val successfulLogTimestampOffsets = successfulOffsetsForTimes.map { + case (topicPartition, listOffsetsResultInfo) => topicPartition -> new OffsetAndMetadata(listOffsetsResultInfo.offset) + }.toMap + + unsuccessfulOffsetsForTimes.foreach { entry => + log.warn(s"\nWarn: Partition " + entry._1.partition() + " from topic " + entry._1.topic() + + " is empty. 
Falling back to latest known offset.") + } + + successfulLogTimestampOffsets ++ getLogEndOffsets(admin, unsuccessfulOffsetsForTimes.keySet.toSeq, timeoutMs) + } + + def getLogEndOffsets(admin: Admin, + topicPartitions: Seq[TopicPartition], timeoutMs: Integer): Predef.Map[TopicPartition, OffsetAndMetadata] = { + val endOffsets = topicPartitions.map { topicPartition => + topicPartition -> OffsetSpec.latest + }.toMap + val offsets = admin.listOffsets( + endOffsets.asJava, + new ListOffsetsOptions().timeoutMs(timeoutMs) + ).all.get + val res = topicPartitions.map { topicPartition => + Option(offsets.get(topicPartition)) match { + case Some(listOffsetsResultInfo) => topicPartition -> new OffsetAndMetadata(listOffsetsResultInfo.offset) + case _ => + throw new IllegalArgumentException + } + }.toMap + res + } +} diff --git a/src/main/scala/kafka/console/TopicConsole.scala b/src/main/scala/kafka/console/TopicConsole.scala index 61905e9..f557a1c 100644 --- a/src/main/scala/kafka/console/TopicConsole.scala +++ b/src/main/scala/kafka/console/TopicConsole.scala @@ -234,6 +234,21 @@ class TopicConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConfig }).asInstanceOf[(Boolean, String)] } + def getOffsetForTimestamp(topic: String, timestamp: java.lang.Long): util.Map[TopicPartition, java.lang.Long] = { + withAdminClientAndCatchError(admin => { + val partitions = describeTopics(admin, Collections.singleton(topic)).get(topic) match { + case Some(topicDescription: TopicDescription) => topicDescription.partitions() + .asScala.map(info => new TopicPartition(topic, info.partition())).toSeq + case None => throw new IllegalArgumentException("topic does not exist.") + } + val offsetMap = KafkaConsole.getLogTimestampOffsets(admin, partitions, timestamp, timeoutMs) + offsetMap.map(tuple2 => (tuple2._1, tuple2._2.offset())).toMap.asJava + }, e => { + log.error("getOffsetForTimestamp error, ", e) + Collections.emptyMap() + }).asInstanceOf[util.Map[TopicPartition, java.lang.Long]] + } + /** * 
Get the current replica assignments for some topics. * diff --git a/ui/src/utils/api.js b/ui/src/utils/api.js index 47e4ab4..dff50a3 100644 --- a/ui/src/utils/api.js +++ b/ui/src/utils/api.js @@ -129,6 +129,10 @@ export const KafkaTopicApi = { url: "/topic/replica/throttle", method: "post", }, + sendStats: { + url: "/topic/send/stats", + method: "get", + }, }; export const KafkaConsumerApi = { diff --git a/ui/src/views/topic/SendStats.vue b/ui/src/views/topic/SendStats.vue new file mode 100644 index 0000000..a83c487 --- /dev/null +++ b/ui/src/views/topic/SendStats.vue @@ -0,0 +1,115 @@ + + + + + diff --git a/ui/src/views/topic/Topic.vue b/ui/src/views/topic/Topic.vue index 0981285..66b9eb4 100644 --- a/ui/src/views/topic/Topic.vue +++ b/ui/src/views/topic/Topic.vue @@ -109,7 +109,7 @@ size="small" href="javascript:;" class="operation-btn" - @click="openMessageStatsDialog" + @click="openMessageStatsDialog(record.name)" >发送统计 + @@ -173,6 +178,7 @@ import ConsumedDetail from "@/views/topic/ConsumedDetail"; import TopicConfig from "@/views/topic/TopicConfig"; import UpdateReplica from "@/views/topic/UpdateReplica"; import ConfigTopicThrottle from "@/views/topic/ConfigTopicThrottle"; +import SendStats from "@/views/topic/SendStats"; export default { name: "Topic", @@ -184,6 +190,7 @@ export default { TopicConfig, UpdateReplica, ConfigTopicThrottle, + SendStats, }, data() { return { @@ -207,6 +214,7 @@ export default { showTopicConfigDialog: false, showUpdateReplicaDialog: false, showThrottleDialog: false, + showSendStatsDialog: false, }; }, methods: { @@ -294,8 +302,12 @@ export default { closeUpdateReplicaDialog() { this.showUpdateReplicaDialog = false; }, - openMessageStatsDialog() { - this.$message.info("此功能尚不支持,下个版本支持"); + openMessageStatsDialog(topic) { + this.showSendStatsDialog = true; + this.selectDetail.resourceName = topic; + }, + closeMessageStatsDialog() { + this.showSendStatsDialog = false; }, openThrottleDialog(topic) { this.showThrottleDialog = true;