diff --git a/src/main/java/com/xuxd/kafka/console/beans/ForwardMessage.java b/src/main/java/com/xuxd/kafka/console/beans/ForwardMessage.java new file mode 100644 index 0000000..84c4a9c --- /dev/null +++ b/src/main/java/com/xuxd/kafka/console/beans/ForwardMessage.java @@ -0,0 +1,25 @@ +package com.xuxd.kafka.console.beans; + +import lombok.Data; + +/** + * 消息转发请求参数. + * + * @author: xuxd + * @since: 2025/6/5 16:52 + **/ +@Data +public class ForwardMessage { + + private SendMessage message; + + /** + * 是否还发到同一个分区. + */ + private boolean samePartition; + + /** + * 目标集群id. + */ + private long targetClusterId; +} diff --git a/src/main/java/com/xuxd/kafka/console/controller/MessageController.java b/src/main/java/com/xuxd/kafka/console/controller/MessageController.java index e644c17..c1eec93 100644 --- a/src/main/java/com/xuxd/kafka/console/controller/MessageController.java +++ b/src/main/java/com/xuxd/kafka/console/controller/MessageController.java @@ -2,6 +2,7 @@ package com.xuxd.kafka.console.controller; import com.xuxd.kafka.console.aspect.annotation.ControllerLog; import com.xuxd.kafka.console.aspect.annotation.Permission; +import com.xuxd.kafka.console.beans.ForwardMessage; import com.xuxd.kafka.console.beans.QueryMessage; import com.xuxd.kafka.console.beans.ResponseData; import com.xuxd.kafka.console.beans.SendMessage; @@ -83,4 +84,11 @@ public class MessageController { } return messageService.sendStatisticsByTime(dto); } + + @Permission("message:forward") + @ControllerLog("消息转发") + @PostMapping("/forward") + public Object forward(@RequestBody ForwardMessage message) { + return messageService.forward(message); + } } diff --git a/src/main/java/com/xuxd/kafka/console/service/MessageService.java b/src/main/java/com/xuxd/kafka/console/service/MessageService.java index f32890d..63f1bc2 100644 --- a/src/main/java/com/xuxd/kafka/console/service/MessageService.java +++ b/src/main/java/com/xuxd/kafka/console/service/MessageService.java @@ -1,5 +1,6 @@ package com.xuxd.kafka.console.service; +import com.xuxd.kafka.console.beans.ForwardMessage; import com.xuxd.kafka.console.beans.QueryMessage; import com.xuxd.kafka.console.beans.dto.QuerySendStatisticsDTO; import com.xuxd.kafka.console.beans.ResponseData; @@ -32,4 +33,6 @@ public interface MessageService { ResponseData delete(List messages); ResponseData sendStatisticsByTime(QuerySendStatisticsDTO request); + + ResponseData forward(ForwardMessage message); } diff --git a/src/main/java/com/xuxd/kafka/console/service/impl/MessageServiceImpl.java b/src/main/java/com/xuxd/kafka/console/service/impl/MessageServiceImpl.java index 3921ec0..101efeb 100644 --- a/src/main/java/com/xuxd/kafka/console/service/impl/MessageServiceImpl.java +++ b/src/main/java/com/xuxd/kafka/console/service/impl/MessageServiceImpl.java @@ -1,14 +1,13 @@ package com.xuxd.kafka.console.service.impl; -import com.xuxd.kafka.console.beans.MessageFilter; -import com.xuxd.kafka.console.beans.QueryMessage; -import com.xuxd.kafka.console.beans.ResponseData; -import com.xuxd.kafka.console.beans.SendMessage; +import com.xuxd.kafka.console.beans.*; +import com.xuxd.kafka.console.beans.dos.ClusterInfoDO; import com.xuxd.kafka.console.beans.dto.QuerySendStatisticsDTO; import com.xuxd.kafka.console.beans.enums.FilterType; import com.xuxd.kafka.console.beans.vo.ConsumerRecordVO; import com.xuxd.kafka.console.beans.vo.MessageDetailVO; import com.xuxd.kafka.console.beans.vo.QuerySendStatisticsVO; +import com.xuxd.kafka.console.dao.ClusterInfoMapper; import com.xuxd.kafka.console.service.ConsumerService; import com.xuxd.kafka.console.service.MessageService; import kafka.console.ConsumerConsole; @@ -22,6 +21,8 @@ import org.apache.kafka.clients.admin.TopicDescription; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.internals.RecordHeader; import org.apache.kafka.common.serialization.*; import org.springframework.beans.BeansException; import org.springframework.beans.factory.annotation.Autowired; @@ -30,6 +31,7 @@ import org.springframework.context.ApplicationContextAware; import org.springframework.stereotype.Service; import scala.Tuple2; +import java.nio.charset.StandardCharsets; import java.util.*; import java.util.stream.Collectors; @@ -52,6 +54,9 @@ public class MessageServiceImpl implements MessageService, ApplicationContextAwa @Autowired private ConsumerConsole consumerConsole; + @Autowired + private ClusterInfoMapper clusterInfoMapper; + private ApplicationContext applicationContext; private Map deserializerDict = new HashMap<>(); @@ -304,6 +309,47 @@ public class MessageServiceImpl implements MessageService, ApplicationContextAwa return ResponseData.create().data(vo).success(); } + @Override + public ResponseData forward(ForwardMessage message) { + ClusterInfoDO clusterInfoDO = clusterInfoMapper.selectById(message.getTargetClusterId()); + if (clusterInfoDO == null) { + return ResponseData.create().failed("Target cluster not found."); + } + SendMessage sendMessage = message.getMessage(); + List headers = sendMessage.getHeaders(); + ProducerRecord record = null; + List
recordHeaders = null; + if (headers != null) { + recordHeaders = new ArrayList<>(); + for (SendMessage.Header header : headers) { + recordHeaders.add(new RecordHeader(header.getHeaderKey(), header.getHeaderValue().getBytes(StandardCharsets.UTF_8))); + } + } + if (message.isSamePartition()) { + // same partition + record = new ProducerRecord<>(sendMessage.getTopic(), + sendMessage.getPartition(), + null, + sendMessage.getKey() != null ? sendMessage.getKey().getBytes(StandardCharsets.UTF_8) : null, + sendMessage.getBody() != null ? sendMessage.getBody().getBytes(StandardCharsets.UTF_8) : null, + recordHeaders); + } else { + // no partition is specified + record = new ProducerRecord<>(sendMessage.getTopic(), + null, + null, + sendMessage.getKey() != null ? sendMessage.getKey().getBytes(StandardCharsets.UTF_8) : null, + sendMessage.getBody() != null ? sendMessage.getBody().getBytes(StandardCharsets.UTF_8) : null, + recordHeaders); + } + Tuple2 tuple2 = messageConsole.sendSync(record); + boolean success = (boolean) tuple2._1; + if (!success) { + return ResponseData.create().failed(tuple2._2); + } + return ResponseData.create().success(); + } + private Map> searchRecordByOffset(QueryMessage queryMessage) { Set partitions = getPartitions(queryMessage); diff --git a/src/main/resources/db/data-h2.sql b/src/main/resources/db/data-h2.sql index 5df7a90..63575e3 100644 --- a/src/main/resources/db/data-h2.sql +++ b/src/main/resources/db/data-h2.sql @@ -43,6 +43,7 @@ insert into t_sys_permission(id, name,type,parent_id,permission) values(65,'在 insert into t_sys_permission(id, name,type,parent_id,permission) values(66,'消息详情',1,61,'message:detail'); insert into t_sys_permission(id, name,type,parent_id,permission) values(67,'重新发送',1,61,'message:resend'); insert into t_sys_permission(id, name,type,parent_id,permission) values(68,'发送统计',1,61,'message:send-statistics'); +insert into t_sys_permission(id, name,type,parent_id,permission) values(69,'消息转发',1,61,'message:forward'); insert into t_sys_permission(id, name,type,parent_id,permission) values(80,'限流',0,null,'quota'); insert into t_sys_permission(id, name,type,parent_id,permission) values(81,'用户',1,80,'quota:user'); @@ -102,8 +103,8 @@ insert into t_sys_permission(id, name,type,parent_id,permission) values(171,'取 -- t_sys_permission end-- -- t_sys_role start-- -insert into t_sys_role(id, role_name, description, permission_ids) VALUES (1,'超级管理员','超级管理员','12,13,14,22,23,24,25,26,27,28,29,30,34,35,31,32,33,42,43,44,45,46,47,48,49,50,62,63,64,65,66,67,68,81,82,83,84,85,86,87,88,101,102,103,104,105,106,107,108,109,110,111,112,113,114,115,141,142,143,144,145,146,147,148,149,150,151,152,153,161,162,163,164,165,166,167,168,169,171,170'); -insert into t_sys_role(id, role_name, description, permission_ids) VALUES (2,'普通管理员','普通管理员,不能更改用户信息','12,13,14,22,23,24,25,26,27,28,29,30,34,35,31,32,33,42,43,44,45,46,47,48,49,50,62,63,64,65,66,67,68,81,82,83,84,85,86,87,88,101,102,103,104,105,106,107,108,109,110,111,112,113,114,115,141,146,149,150,161,162,163,164,165,166,167,168,169,171,170'); +insert into t_sys_role(id, role_name, description, permission_ids) VALUES (1,'超级管理员','超级管理员','12,13,14,22,23,24,25,26,27,28,29,30,34,35,31,32,33,42,43,44,45,46,47,48,49,50,62,63,64,65,66,67,68,69,81,82,83,84,85,86,87,88,101,102,103,104,105,106,107,108,109,110,111,112,113,114,115,141,142,143,144,145,146,147,148,149,150,151,152,153,161,162,163,164,165,166,167,168,169,171,170'); +insert into t_sys_role(id, role_name, description, permission_ids) VALUES (2,'普通管理员','普通管理员,不能更改用户信息','12,13,14,22,23,24,25,26,27,28,29,30,34,35,31,32,33,42,43,44,45,46,47,48,49,50,62,63,64,65,66,67,68,69,81,82,83,84,85,86,87,88,101,102,103,104,105,106,107,108,109,110,111,112,113,114,115,141,146,149,150,161,162,163,164,165,166,167,168,169,171,170'); -- insert into t_sys_role(id, role_name, description, permission_ids) VALUES (2,'访客','访客','12,13,22,26,29,32,44,45,50,62,63,81,83,85,141,146,149,150,161,163'); -- t_sys_role end--