From 1e86aa456969647ff07e25b36c57bc843d1a1f24 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=AE=B8=E6=99=93=E4=B8=9C?= <763795151@qq.com> Date: Mon, 16 Jun 2025 20:28:26 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E6=B6=88=E6=81=AF=E8=BD=AC?= =?UTF-8?q?=E5=8F=91=E5=8A=9F=E8=83=BD.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../kafka/console/beans/ForwardMessage.java | 5 + .../console/filter/ContextSetFilter.java | 5 + .../service/impl/MessageServiceImpl.java | 53 ++-- src/main/resources/application.yml | 2 +- ui/src/utils/api.js | 4 + ui/src/views/message/ForwardMessage.vue | 248 ++++++++++++++++++ ui/src/views/message/MessageDetail.vue | 26 ++ 7 files changed, 315 insertions(+), 28 deletions(-) create mode 100644 ui/src/views/message/ForwardMessage.vue diff --git a/src/main/java/com/xuxd/kafka/console/beans/ForwardMessage.java b/src/main/java/com/xuxd/kafka/console/beans/ForwardMessage.java index 84c4a9c..13cc26d 100644 --- a/src/main/java/com/xuxd/kafka/console/beans/ForwardMessage.java +++ b/src/main/java/com/xuxd/kafka/console/beans/ForwardMessage.java @@ -22,4 +22,9 @@ public class ForwardMessage { * 目标集群id. */ private long targetClusterId; + + /** + * 目标topic. + */ + private String targetTopic; } diff --git a/src/main/java/com/xuxd/kafka/console/filter/ContextSetFilter.java b/src/main/java/com/xuxd/kafka/console/filter/ContextSetFilter.java index a43fc1d..dc7a947 100644 --- a/src/main/java/com/xuxd/kafka/console/filter/ContextSetFilter.java +++ b/src/main/java/com/xuxd/kafka/console/filter/ContextSetFilter.java @@ -49,6 +49,10 @@ public class ContextSetFilter implements Filter { String uri = request.getRequestURI(); if (!excludes.contains(uri)) { String headerId = request.getHeader(Header.ID); + String specificId = request.getHeader(Header.SPECIFIC_ID); + if (StringUtils.isNotBlank(specificId)) { + headerId = specificId; + } if (StringUtils.isBlank(headerId)) { // ResponseData failed = ResponseData.create().failed("Cluster info is null."); ResponseData failed = ResponseData.create().failed("没有集群信息,请先切换集群"); @@ -84,5 +88,6 @@ public class ContextSetFilter implements Filter { interface Header { String ID = "X-Cluster-Info-Id"; String NAME = "X-Cluster-Info-Name"; + String SPECIFIC_ID = "X-Specific-Cluster-Info-Id"; } } diff --git a/src/main/java/com/xuxd/kafka/console/service/impl/MessageServiceImpl.java b/src/main/java/com/xuxd/kafka/console/service/impl/MessageServiceImpl.java index 101efeb..0963051 100644 --- a/src/main/java/com/xuxd/kafka/console/service/impl/MessageServiceImpl.java +++ b/src/main/java/com/xuxd/kafka/console/service/impl/MessageServiceImpl.java @@ -7,6 +7,8 @@ import com.xuxd.kafka.console.beans.enums.FilterType; import com.xuxd.kafka.console.beans.vo.ConsumerRecordVO; import com.xuxd.kafka.console.beans.vo.MessageDetailVO; import com.xuxd.kafka.console.beans.vo.QuerySendStatisticsVO; +import com.xuxd.kafka.console.config.ContextConfig; +import com.xuxd.kafka.console.config.ContextConfigHolder; import com.xuxd.kafka.console.dao.ClusterInfoMapper; import com.xuxd.kafka.console.service.ConsumerService; import com.xuxd.kafka.console.service.MessageService; @@ -21,8 +23,6 @@ import org.apache.kafka.clients.admin.TopicDescription; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.header.Header; -import org.apache.kafka.common.header.internals.RecordHeader; import org.apache.kafka.common.serialization.*; import org.springframework.beans.BeansException; import org.springframework.beans.factory.annotation.Autowired; @@ -31,7 +31,6 @@ import org.springframework.context.ApplicationContextAware; import org.springframework.stereotype.Service; import scala.Tuple2; -import java.nio.charset.StandardCharsets; import java.util.*; import java.util.stream.Collectors; @@ -316,32 +315,32 @@ public class MessageServiceImpl implements MessageService, ApplicationContextAwa return ResponseData.create().failed("Target cluster not found."); } SendMessage sendMessage = message.getMessage(); - List headers = sendMessage.getHeaders(); - ProducerRecord record = null; - List
recordHeaders = null; - if (headers != null) { - recordHeaders = new ArrayList<>(); - for (SendMessage.Header header : headers) { - recordHeaders.add(new RecordHeader(header.getHeaderKey(), header.getHeaderValue().getBytes(StandardCharsets.UTF_8))); - } + // first, search message detail + Map offsetTable = new HashMap<>(); + TopicPartition topicPartition = new TopicPartition(sendMessage.getTopic(), sendMessage.getPartition()); + offsetTable.put(topicPartition, sendMessage.getOffset()); + Map> recordMap = messageConsole.searchBy(offsetTable); + ConsumerRecord consumerRecord = recordMap.get(topicPartition); + if (consumerRecord == null) { + return ResponseData.create().failed("Source message not found."); } - if (message.isSamePartition()) { - // same partition - record = new ProducerRecord<>(sendMessage.getTopic(), - sendMessage.getPartition(), - null, - sendMessage.getKey() != null ? sendMessage.getKey().getBytes(StandardCharsets.UTF_8) : null, - sendMessage.getBody() != null ? sendMessage.getBody().getBytes(StandardCharsets.UTF_8) : null, - recordHeaders); - } else { - // no partition is specified - record = new ProducerRecord<>(sendMessage.getTopic(), - null, - null, - sendMessage.getKey() != null ? sendMessage.getKey().getBytes(StandardCharsets.UTF_8) : null, - sendMessage.getBody() != null ? sendMessage.getBody().getBytes(StandardCharsets.UTF_8) : null, - recordHeaders); + + String topic = message.getTargetTopic(); + if (StringUtils.isEmpty(topic)) { + topic = sendMessage.getTopic(); } + // copy from consumer record. + ProducerRecord record = new ProducerRecord<>(topic, + message.isSamePartition() ? consumerRecord.partition() : null, + consumerRecord.key(), + consumerRecord.value(), + consumerRecord.headers()); + + ContextConfig config = ContextConfigHolder.CONTEXT_CONFIG.get(); + config.setClusterInfoId(clusterInfoDO.getId()); + config.setClusterName(clusterInfoDO.getClusterName()); + config.setBootstrapServer(clusterInfoDO.getAddress()); + // send. Tuple2 tuple2 = messageConsole.sendSync(record); boolean success = (boolean) tuple2._1; if (!success) { diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index cc3a245..c56ceef 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -15,7 +15,7 @@ kafka: # 缓存连接,不缓存的情况下,每次请求建立连接. 即使每次请求建立连接,其实也很快,某些情况下开启ACL,查询可能很慢,可以设置连接缓存为true, # 或者想提高查询速度,也可以设置下面连接缓存为true # 缓存 admin client的连接 - cache-admin-connection: false + cache-admin-connection: true # 缓存 producer的连接 cache-producer-connection: false # 缓存 consumer的连接 diff --git a/ui/src/utils/api.js b/ui/src/utils/api.js index dc164c8..749ebcf 100644 --- a/ui/src/utils/api.js +++ b/ui/src/utils/api.js @@ -300,6 +300,10 @@ export const KafkaMessageApi = { url: "/message/send/statistics", method: "post", }, + forward: { + url: "/message/forward", + method: "post", + }, }; export const KafkaClientQuotaApi = { diff --git a/ui/src/views/message/ForwardMessage.vue b/ui/src/views/message/ForwardMessage.vue new file mode 100644 index 0000000..d1a3889 --- /dev/null +++ b/ui/src/views/message/ForwardMessage.vue @@ -0,0 +1,248 @@ + + + + + diff --git a/ui/src/views/message/MessageDetail.vue b/ui/src/views/message/MessageDetail.vue index 9d98bf7..a67cdd9 100644 --- a/ui/src/views/message/MessageDetail.vue +++ b/ui/src/views/message/MessageDetail.vue @@ -120,8 +120,22 @@ 重新发送 + + 转发消息 + + @@ -131,9 +145,11 @@ import request from "@/utils/request"; import { KafkaMessageApi } from "@/utils/api"; import notification from "ant-design-vue/lib/notification"; import moment from "moment"; +import ForwardMessage from "@/views/message/ForwardMessage.vue"; export default { name: "MessageDetail", + components: { ForwardMessage }, props: { record: {}, visible: { @@ -151,6 +167,7 @@ export default { valueDeserializer: "String", consumerDetail: [], columns, + showForwardDialog: false, }; }, watch: { @@ -232,6 +249,12 @@ export default { } }); }, + openForwardDialog() { + this.showForwardDialog = true; + }, + closeForwardDialog() { + this.showForwardDialog = false; + }, }, }; const columns = [ @@ -264,4 +287,7 @@ const columns = [ max-width: 80% !important; vertical-align: top !important; } +.mar-left { + margin-left: 1%; +}