增加消息转发功能.

This commit is contained in:
许晓东
2025-06-16 20:28:26 +08:00
parent 780d05bc06
commit 1e86aa4569
7 changed files with 315 additions and 28 deletions

View File

@@ -22,4 +22,9 @@ public class ForwardMessage {
* 目标集群id.
*/
private long targetClusterId;
/**
* 目标topic.
*/
private String targetTopic;
}

View File

@@ -49,6 +49,10 @@ public class ContextSetFilter implements Filter {
String uri = request.getRequestURI();
if (!excludes.contains(uri)) {
String headerId = request.getHeader(Header.ID);
String specificId = request.getHeader(Header.SPECIFIC_ID);
if (StringUtils.isNotBlank(specificId)) {
headerId = specificId;
}
if (StringUtils.isBlank(headerId)) {
// ResponseData failed = ResponseData.create().failed("Cluster info is null.");
ResponseData failed = ResponseData.create().failed("没有集群信息,请先切换集群");
@@ -84,5 +88,6 @@ public class ContextSetFilter implements Filter {
interface Header {
String ID = "X-Cluster-Info-Id";
String NAME = "X-Cluster-Info-Name";
String SPECIFIC_ID = "X-Specific-Cluster-Info-Id";
}
}

View File

@@ -7,6 +7,8 @@ import com.xuxd.kafka.console.beans.enums.FilterType;
import com.xuxd.kafka.console.beans.vo.ConsumerRecordVO;
import com.xuxd.kafka.console.beans.vo.MessageDetailVO;
import com.xuxd.kafka.console.beans.vo.QuerySendStatisticsVO;
import com.xuxd.kafka.console.config.ContextConfig;
import com.xuxd.kafka.console.config.ContextConfigHolder;
import com.xuxd.kafka.console.dao.ClusterInfoMapper;
import com.xuxd.kafka.console.service.ConsumerService;
import com.xuxd.kafka.console.service.MessageService;
@@ -21,8 +23,6 @@ import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.serialization.*;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
@@ -31,7 +31,6 @@ import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Service;
import scala.Tuple2;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.stream.Collectors;
@@ -316,32 +315,32 @@ public class MessageServiceImpl implements MessageService, ApplicationContextAwa
return ResponseData.create().failed("Target cluster not found.");
}
SendMessage sendMessage = message.getMessage();
List<SendMessage.Header> headers = sendMessage.getHeaders();
ProducerRecord<byte[], byte[]> record = null;
List<Header> recordHeaders = null;
if (headers != null) {
recordHeaders = new ArrayList<>();
for (SendMessage.Header header : headers) {
recordHeaders.add(new RecordHeader(header.getHeaderKey(), header.getHeaderValue().getBytes(StandardCharsets.UTF_8)));
}
// first, search message detail
Map<TopicPartition, Object> offsetTable = new HashMap<>();
TopicPartition topicPartition = new TopicPartition(sendMessage.getTopic(), sendMessage.getPartition());
offsetTable.put(topicPartition, sendMessage.getOffset());
Map<TopicPartition, ConsumerRecord<byte[], byte[]>> recordMap = messageConsole.searchBy(offsetTable);
ConsumerRecord<byte[], byte[]> consumerRecord = recordMap.get(topicPartition);
if (consumerRecord == null) {
return ResponseData.create().failed("Source message not found.");
}
if (message.isSamePartition()) {
// same partition
record = new ProducerRecord<>(sendMessage.getTopic(),
sendMessage.getPartition(),
null,
sendMessage.getKey() != null ? sendMessage.getKey().getBytes(StandardCharsets.UTF_8) : null,
sendMessage.getBody() != null ? sendMessage.getBody().getBytes(StandardCharsets.UTF_8) : null,
recordHeaders);
} else {
// no partition is specified
record = new ProducerRecord<>(sendMessage.getTopic(),
null,
null,
sendMessage.getKey() != null ? sendMessage.getKey().getBytes(StandardCharsets.UTF_8) : null,
sendMessage.getBody() != null ? sendMessage.getBody().getBytes(StandardCharsets.UTF_8) : null,
recordHeaders);
String topic = message.getTargetTopic();
if (StringUtils.isEmpty(topic)) {
topic = sendMessage.getTopic();
}
// copy from consumer record.
ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic,
message.isSamePartition() ? consumerRecord.partition() : null,
consumerRecord.key(),
consumerRecord.value(),
consumerRecord.headers());
ContextConfig config = ContextConfigHolder.CONTEXT_CONFIG.get();
config.setClusterInfoId(clusterInfoDO.getId());
config.setClusterName(clusterInfoDO.getClusterName());
config.setBootstrapServer(clusterInfoDO.getAddress());
// send.
Tuple2<Object, String> tuple2 = messageConsole.sendSync(record);
boolean success = (boolean) tuple2._1;
if (!success) {