增加消息转发接口.
This commit is contained in:
@@ -1,5 +1,6 @@
|
||||
package com.xuxd.kafka.console.service;
|
||||
|
||||
import com.xuxd.kafka.console.beans.ForwardMessage;
|
||||
import com.xuxd.kafka.console.beans.QueryMessage;
|
||||
import com.xuxd.kafka.console.beans.dto.QuerySendStatisticsDTO;
|
||||
import com.xuxd.kafka.console.beans.ResponseData;
|
||||
@@ -32,4 +33,6 @@ public interface MessageService {
|
||||
ResponseData delete(List<QueryMessage> messages);
|
||||
|
||||
ResponseData sendStatisticsByTime(QuerySendStatisticsDTO request);
|
||||
|
||||
ResponseData forward(ForwardMessage message);
|
||||
}
|
||||
|
||||
@@ -1,14 +1,13 @@
|
||||
package com.xuxd.kafka.console.service.impl;
|
||||
|
||||
import com.xuxd.kafka.console.beans.MessageFilter;
|
||||
import com.xuxd.kafka.console.beans.QueryMessage;
|
||||
import com.xuxd.kafka.console.beans.ResponseData;
|
||||
import com.xuxd.kafka.console.beans.SendMessage;
|
||||
import com.xuxd.kafka.console.beans.*;
|
||||
import com.xuxd.kafka.console.beans.dos.ClusterInfoDO;
|
||||
import com.xuxd.kafka.console.beans.dto.QuerySendStatisticsDTO;
|
||||
import com.xuxd.kafka.console.beans.enums.FilterType;
|
||||
import com.xuxd.kafka.console.beans.vo.ConsumerRecordVO;
|
||||
import com.xuxd.kafka.console.beans.vo.MessageDetailVO;
|
||||
import com.xuxd.kafka.console.beans.vo.QuerySendStatisticsVO;
|
||||
import com.xuxd.kafka.console.dao.ClusterInfoMapper;
|
||||
import com.xuxd.kafka.console.service.ConsumerService;
|
||||
import com.xuxd.kafka.console.service.MessageService;
|
||||
import kafka.console.ConsumerConsole;
|
||||
@@ -22,6 +21,8 @@ import org.apache.kafka.clients.admin.TopicDescription;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||
import org.apache.kafka.clients.producer.ProducerRecord;
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
import org.apache.kafka.common.header.Header;
|
||||
import org.apache.kafka.common.header.internals.RecordHeader;
|
||||
import org.apache.kafka.common.serialization.*;
|
||||
import org.springframework.beans.BeansException;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
@@ -30,6 +31,7 @@ import org.springframework.context.ApplicationContextAware;
|
||||
import org.springframework.stereotype.Service;
|
||||
import scala.Tuple2;
|
||||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.*;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@@ -52,6 +54,9 @@ public class MessageServiceImpl implements MessageService, ApplicationContextAwa
|
||||
@Autowired
|
||||
private ConsumerConsole consumerConsole;
|
||||
|
||||
@Autowired
|
||||
private ClusterInfoMapper clusterInfoMapper;
|
||||
|
||||
private ApplicationContext applicationContext;
|
||||
|
||||
private Map<String, Deserializer> deserializerDict = new HashMap<>();
|
||||
@@ -304,6 +309,47 @@ public class MessageServiceImpl implements MessageService, ApplicationContextAwa
|
||||
return ResponseData.create().data(vo).success();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ResponseData forward(ForwardMessage message) {
|
||||
ClusterInfoDO clusterInfoDO = clusterInfoMapper.selectById(message.getTargetClusterId());
|
||||
if (clusterInfoDO == null) {
|
||||
return ResponseData.create().failed("Target cluster not found.");
|
||||
}
|
||||
SendMessage sendMessage = message.getMessage();
|
||||
List<SendMessage.Header> headers = sendMessage.getHeaders();
|
||||
ProducerRecord<byte[], byte[]> record = null;
|
||||
List<Header> recordHeaders = null;
|
||||
if (headers != null) {
|
||||
recordHeaders = new ArrayList<>();
|
||||
for (SendMessage.Header header : headers) {
|
||||
recordHeaders.add(new RecordHeader(header.getHeaderKey(), header.getHeaderValue().getBytes(StandardCharsets.UTF_8)));
|
||||
}
|
||||
}
|
||||
if (message.isSamePartition()) {
|
||||
// same partition
|
||||
record = new ProducerRecord<>(sendMessage.getTopic(),
|
||||
sendMessage.getPartition(),
|
||||
null,
|
||||
sendMessage.getKey() != null ? sendMessage.getKey().getBytes(StandardCharsets.UTF_8) : null,
|
||||
sendMessage.getBody() != null ? sendMessage.getBody().getBytes(StandardCharsets.UTF_8) : null,
|
||||
recordHeaders);
|
||||
} else {
|
||||
// no partition is specified
|
||||
record = new ProducerRecord<>(sendMessage.getTopic(),
|
||||
null,
|
||||
null,
|
||||
sendMessage.getKey() != null ? sendMessage.getKey().getBytes(StandardCharsets.UTF_8) : null,
|
||||
sendMessage.getBody() != null ? sendMessage.getBody().getBytes(StandardCharsets.UTF_8) : null,
|
||||
recordHeaders);
|
||||
}
|
||||
Tuple2<Object, String> tuple2 = messageConsole.sendSync(record);
|
||||
boolean success = (boolean) tuple2._1;
|
||||
if (!success) {
|
||||
return ResponseData.create().failed(tuple2._2);
|
||||
}
|
||||
return ResponseData.create().success();
|
||||
}
|
||||
|
||||
private Map<TopicPartition, ConsumerRecord<byte[], byte[]>> searchRecordByOffset(QueryMessage queryMessage) {
|
||||
Set<TopicPartition> partitions = getPartitions(queryMessage);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user