diff --git a/src/main/java/com/xuxd/kafka/console/beans/QueryMessage.java b/src/main/java/com/xuxd/kafka/console/beans/QueryMessage.java index be23dea..49aff5b 100644 --- a/src/main/java/com/xuxd/kafka/console/beans/QueryMessage.java +++ b/src/main/java/com/xuxd/kafka/console/beans/QueryMessage.java @@ -18,4 +18,6 @@ public class QueryMessage { private long startTime; private long endTime; + + private long offset; } diff --git a/src/main/java/com/xuxd/kafka/console/beans/dto/QueryMessageDTO.java b/src/main/java/com/xuxd/kafka/console/beans/dto/QueryMessageDTO.java index cbafd17..3c83df4 100644 --- a/src/main/java/com/xuxd/kafka/console/beans/dto/QueryMessageDTO.java +++ b/src/main/java/com/xuxd/kafka/console/beans/dto/QueryMessageDTO.java @@ -21,12 +21,22 @@ public class QueryMessageDTO { private Date endTime; + private Long offset; + public QueryMessage toQueryMessage() { QueryMessage queryMessage = new QueryMessage(); queryMessage.setTopic(topic); queryMessage.setPartition(partition); - queryMessage.setStartTime(startTime.getTime()); - queryMessage.setEndTime(endTime.getTime()); + if (startTime != null) { + queryMessage.setStartTime(startTime.getTime()); + } + if (endTime != null) { + queryMessage.setEndTime(endTime.getTime()); + } + + if (offset != null) { + queryMessage.setOffset(offset); + } return queryMessage; } diff --git a/src/main/java/com/xuxd/kafka/console/controller/MessageController.java b/src/main/java/com/xuxd/kafka/console/controller/MessageController.java index cfce28a..68ad12f 100644 --- a/src/main/java/com/xuxd/kafka/console/controller/MessageController.java +++ b/src/main/java/com/xuxd/kafka/console/controller/MessageController.java @@ -25,4 +25,9 @@ public class MessageController { public Object searchByTime(@RequestBody QueryMessageDTO dto) { return messageService.searchByTime(dto.toQueryMessage()); } + + @PostMapping("/search/offset") + public Object searchByOffset(@RequestBody QueryMessageDTO dto) { + return messageService.searchByOffset(dto.toQueryMessage()); + } } diff --git a/src/main/java/com/xuxd/kafka/console/service/MessageService.java b/src/main/java/com/xuxd/kafka/console/service/MessageService.java index cdaa10c..4c54de2 100644 --- a/src/main/java/com/xuxd/kafka/console/service/MessageService.java +++ b/src/main/java/com/xuxd/kafka/console/service/MessageService.java @@ -12,4 +12,6 @@ import com.xuxd.kafka.console.beans.ResponseData; public interface MessageService { ResponseData searchByTime(QueryMessage queryMessage); + + ResponseData searchByOffset(QueryMessage queryMessage); } diff --git a/src/main/java/com/xuxd/kafka/console/service/impl/MessageServiceImpl.java b/src/main/java/com/xuxd/kafka/console/service/impl/MessageServiceImpl.java index d140980..830fe6d 100644 --- a/src/main/java/com/xuxd/kafka/console/service/impl/MessageServiceImpl.java +++ b/src/main/java/com/xuxd/kafka/console/service/impl/MessageServiceImpl.java @@ -38,6 +38,29 @@ public class MessageServiceImpl implements MessageService { @Override public ResponseData searchByTime(QueryMessage queryMessage) { int maxNums = 10000; + Set partitions = getPartitions(queryMessage); + List> records = messageConsole.searchBy(partitions, queryMessage.getStartTime(), queryMessage.getEndTime(), maxNums); + List vos = records.stream().filter(record -> record.timestamp() <= queryMessage.getEndTime()) + .map(ConsumerRecordVO::fromConsumerRecord).collect(Collectors.toList()); + Map res = new HashMap<>(); + res.put("maxNum", maxNums); + res.put("realNum", vos.size()); + res.put("data", vos.subList(0, Math.min(maxNums, vos.size()))); + return ResponseData.create().data(res).success(); + } + + @Override public ResponseData searchByOffset(QueryMessage queryMessage) { + Set partitions = getPartitions(queryMessage); + Map offsetTable = new HashMap<>(); + partitions.forEach(tp -> { + offsetTable.put(tp, queryMessage.getOffset()); + }); + Map> recordMap = messageConsole.searchBy(offsetTable); + + return ResponseData.create().data(recordMap.values().stream().map(ConsumerRecordVO::fromConsumerRecord).collect(Collectors.toList())).success(); + } + + private Set getPartitions(QueryMessage queryMessage) { Set partitions = new HashSet<>(); if (queryMessage.getPartition() != -1) { partitions.add(new TopicPartition(queryMessage.getTopic(), queryMessage.getPartition())); @@ -50,13 +73,6 @@ public class MessageServiceImpl implements MessageService { .map(tp -> new TopicPartition(queryMessage.getTopic(), tp.partition())).collect(Collectors.toSet()); partitions.addAll(set); } - List> records = messageConsole.searchBy(partitions, queryMessage.getStartTime(), queryMessage.getEndTime(), maxNums); - List vos = records.stream().filter(record -> record.timestamp() <= queryMessage.getEndTime()) - .map(ConsumerRecordVO::fromConsumerRecord).collect(Collectors.toList()); - Map res = new HashMap<>(); - res.put("maxNum", maxNums); - res.put("realNum", vos.size()); - res.put("data", vos.subList(0, Math.min(maxNums, vos.size()))); - return ResponseData.create().data(res).success(); + return partitions; } } diff --git a/src/main/scala/kafka/console/MessageConsole.scala b/src/main/scala/kafka/console/MessageConsole.scala index 038e802..6f1c937 100644 --- a/src/main/scala/kafka/console/MessageConsole.scala +++ b/src/main/scala/kafka/console/MessageConsole.scala @@ -8,7 +8,7 @@ import java.time.Duration import java.util import java.util.Properties import scala.collection.immutable -import scala.jdk.CollectionConverters.CollectionHasAsScala +import scala.jdk.CollectionConverters.{CollectionHasAsScala, MapHasAsScala} /** * kafka-console-ui. @@ -86,4 +86,59 @@ class MessageConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConf res } + + def searchBy( + tp2o: util.Map[TopicPartition, Long]): util.Map[TopicPartition, ConsumerRecord[Array[Byte], Array[Byte]]] = { + val props = new Properties() + props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") + val res = new util.HashMap[TopicPartition, ConsumerRecord[Array[Byte], Array[Byte]]]() + withConsumerAndCatchError(consumer => { + var tpSet = tp2o.keySet() + val tpSetCopy = new util.HashSet[TopicPartition](tpSet) + val endOffsets = consumer.endOffsets(tpSet) + val beginOffsets = consumer.beginningOffsets(tpSet) + for ((tp, off) <- tp2o.asScala) { + val endOff = endOffsets.get(tp) + // if (endOff <= off) { + // consumer.seek(tp, endOff) + // tpSetCopy.remove(tp) + // } else { + // consumer.seek(tp, off) + // } + val beginOff = beginOffsets.get(tp) + if (off < beginOff || off >= endOff) { + tpSetCopy.remove(tp) + } + } + + tpSet = tpSetCopy + consumer.assign(tpSet) + tpSet.asScala.foreach(tp => { + consumer.seek(tp, tp2o.get(tp)) + }) + + var terminate = tpSet.isEmpty + while (!terminate) { + val records = consumer.poll(Duration.ofMillis(timeoutMs)) + val tps = new util.HashSet(tpSet).asScala + for (tp <- tps) { + if (!res.containsKey(tp)) { + val recordList = records.records(tp) + if (!recordList.isEmpty) { + val record = recordList.get(0) + res.put(tp, record) + tpSet.remove(tp) + } + } + if (tpSet.isEmpty) { + terminate = true + } + } + } + + }, e => { + log.error("searchBy offset error.", e) + }) + res + } } diff --git a/ui/src/utils/api.js b/ui/src/utils/api.js index fcf51bd..4d5d586 100644 --- a/ui/src/utils/api.js +++ b/ui/src/utils/api.js @@ -228,4 +228,8 @@ export const KafkaMessageApi = { url: "/message/search/time", method: "post", }, + searchByOffset: { + url: "/message/search/offset", + method: "post", + }, }; diff --git a/ui/src/views/message/Message.vue b/ui/src/views/message/Message.vue index cd0af11..66eec63 100644 --- a/ui/src/views/message/Message.vue +++ b/ui/src/views/message/Message.vue @@ -6,7 +6,7 @@ - 根据位移查询消息 + 消息发送1 @@ -16,12 +16,13 @@ + + diff --git a/ui/src/views/message/SearchByOffset.vue b/ui/src/views/message/SearchByOffset.vue new file mode 100644 index 0000000..b2f0485 --- /dev/null +++ b/ui/src/views/message/SearchByOffset.vue @@ -0,0 +1,192 @@ + + + + + diff --git a/ui/src/views/message/SearchByTime.vue b/ui/src/views/message/SearchByTime.vue index f46bfa8..1a7f12b 100644 --- a/ui/src/views/message/SearchByTime.vue +++ b/ui/src/views/message/SearchByTime.vue @@ -61,8 +61,6 @@ - -

检索条数:{{ data.realNum }},允许返回的最大条数:{{ @@ -70,18 +68,7 @@ }}

- -
- 消息详情 - -
-
+ @@ -90,11 +77,11 @@ import request from "@/utils/request"; import { KafkaMessageApi, KafkaTopicApi } from "@/utils/api"; import notification from "ant-design-vue/lib/notification"; -import moment from "moment"; +import MessageList from "@/views/message/MessageList"; export default { name: "SearchByTime", - components: {}, + components: { MessageList }, props: { topicList: { type: Array, @@ -110,7 +97,6 @@ export default { rules: [{ type: "array", required: true, message: "请选择时间!" }], }, data: defaultData, - columns: columns, }; }, methods: { @@ -166,41 +152,6 @@ export default { }, }, }; - -const columns = [ - { - title: "topic", - dataIndex: "topic", - key: "topic", - width: 300, - }, - { - title: "分区", - dataIndex: "partition", - key: "partition", - }, - { - title: "偏移", - dataIndex: "offset", - key: "offset", - }, - { - title: "时间", - dataIndex: "timestamp", - key: "timestamp", - slots: { title: "timestamp" }, - scopedSlots: { customRender: "timestamp" }, - customRender: (text) => { - return moment(text).format("YYYY-MM-DD HH:mm:ss:SSS"); - }, - }, - { - title: "操作", - key: "operation", - scopedSlots: { customRender: "operation" }, - width: 200, - }, -]; const defaultData = { realNum: 0, maxNum: 0 }; @@ -240,20 +191,6 @@ const defaultData = { realNum: 0, maxNum: 0 }; padding-top: 80px; } -.input-w { - width: 400px; -} - -.operation-row-button { - height: 4%; - text-align: left; - margin-bottom: 8px; -} - -.operation-btn { - margin-right: 3%; -} - .type-select { width: 200px !important; }