diff --git a/pom.xml b/pom.xml index d19b5e3..72053fe 100644 --- a/pom.xml +++ b/pom.xml @@ -101,6 +101,14 @@ 1.18.20 provided + + + + com.google.code.gson + gson + 2.8.8 + + diff --git a/src/main/java/com/xuxd/kafka/console/beans/dos/MinOffsetAlignmentDO.java b/src/main/java/com/xuxd/kafka/console/beans/dos/MinOffsetAlignmentDO.java new file mode 100644 index 0000000..6f43999 --- /dev/null +++ b/src/main/java/com/xuxd/kafka/console/beans/dos/MinOffsetAlignmentDO.java @@ -0,0 +1,30 @@ +package com.xuxd.kafka.console.beans.dos; + +import com.baomidou.mybatisplus.annotation.IdType; +import com.baomidou.mybatisplus.annotation.TableId; +import com.baomidou.mybatisplus.annotation.TableName; +import lombok.Data; + +/** + * kafka-console-ui. + * + * @author xuxd + * @date 2021-10-26 10:32:05 + **/ +@Data +@TableName("t_min_offset_alignment") +public class MinOffsetAlignmentDO { + + @TableId(type = IdType.AUTO) + private Long id; + + private String groupId; + + private String topic; + + private String thatOffset; + + private String thisOffset; + + private String updateTime; +} diff --git a/src/main/java/com/xuxd/kafka/console/controller/OperationController.java b/src/main/java/com/xuxd/kafka/console/controller/OperationController.java index 3dec9ee..2f2503e 100644 --- a/src/main/java/com/xuxd/kafka/console/controller/OperationController.java +++ b/src/main/java/com/xuxd/kafka/console/controller/OperationController.java @@ -27,4 +27,10 @@ public class OperationController { dto.getProperties().put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, dto.getAddress()); return operationService.syncConsumerOffset(dto.getGroupId(), dto.getTopic(), dto.getProperties()); } + + @PostMapping("/sync/min/offset/alignment") + public Object minOffsetAlignment(@RequestBody SyncDataDTO dto) { + dto.getProperties().put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, dto.getAddress()); + return operationService.minOffsetAlignment(dto.getGroupId(), dto.getTopic(), dto.getProperties()); + } } diff --git a/src/main/java/com/xuxd/kafka/console/dao/MinOffsetAlignmentMapper.java b/src/main/java/com/xuxd/kafka/console/dao/MinOffsetAlignmentMapper.java new file mode 100644 index 0000000..07173f8 --- /dev/null +++ b/src/main/java/com/xuxd/kafka/console/dao/MinOffsetAlignmentMapper.java @@ -0,0 +1,13 @@ +package com.xuxd.kafka.console.dao; + +import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import com.xuxd.kafka.console.beans.dos.MinOffsetAlignmentDO; + +/** + * kafka-console-ui. + * + * @author xuxd + * @date 2021-10-26 10:33:55 + **/ +public interface MinOffsetAlignmentMapper extends BaseMapper { +} diff --git a/src/main/java/com/xuxd/kafka/console/service/OperationService.java b/src/main/java/com/xuxd/kafka/console/service/OperationService.java index 1f5364b..8361af8 100644 --- a/src/main/java/com/xuxd/kafka/console/service/OperationService.java +++ b/src/main/java/com/xuxd/kafka/console/service/OperationService.java @@ -12,4 +12,6 @@ import java.util.Properties; public interface OperationService { ResponseData syncConsumerOffset(String groupId, String topic, Properties thatProps); + + ResponseData minOffsetAlignment(String groupId, String topic, Properties thatProps); } diff --git a/src/main/java/com/xuxd/kafka/console/service/impl/OperationServiceImpl.java b/src/main/java/com/xuxd/kafka/console/service/impl/OperationServiceImpl.java index 4955e96..b238880 100644 --- a/src/main/java/com/xuxd/kafka/console/service/impl/OperationServiceImpl.java +++ b/src/main/java/com/xuxd/kafka/console/service/impl/OperationServiceImpl.java @@ -1,9 +1,17 @@ package com.xuxd.kafka.console.service.impl; +import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; +import com.google.gson.Gson; +import com.google.gson.JsonObject; import com.xuxd.kafka.console.beans.ResponseData; +import com.xuxd.kafka.console.beans.dos.MinOffsetAlignmentDO; +import com.xuxd.kafka.console.dao.MinOffsetAlignmentMapper; import com.xuxd.kafka.console.service.OperationService; +import java.util.Map; import java.util.Properties; import kafka.console.OperationConsole; +import org.apache.kafka.common.TopicPartition; +import org.springframework.beans.factory.ObjectProvider; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import scala.Tuple2; @@ -17,13 +25,52 @@ import scala.Tuple2; @Service public class OperationServiceImpl implements OperationService { + private Gson gson = new Gson(); + @Autowired private OperationConsole operationConsole; + private MinOffsetAlignmentMapper minOffsetAlignmentMapper; + + public OperationServiceImpl(ObjectProvider minOffsetAlignmentMapper) { + this.minOffsetAlignmentMapper = minOffsetAlignmentMapper.getIfAvailable(); + } + @Override public ResponseData syncConsumerOffset(String groupId, String topic, Properties thatProps) { Tuple2 tuple2 = operationConsole.syncConsumerOffset(groupId, topic, thatProps); return (boolean) tuple2._1() ? ResponseData.create().success() : ResponseData.create().failed(tuple2._2()); } + + @Override public ResponseData minOffsetAlignment(String groupId, String topic, Properties thatProps) { + + Tuple2, Map> tuple2 = operationConsole.checkAndFetchMinOffset(groupId, topic, thatProps); + Map thisMinOffset = tuple2._1(); + Map thatMinOffset = tuple2._2(); + + JsonObject thisJson = new JsonObject(), thatJson = new JsonObject(); + thisMinOffset.forEach((k, v) -> { + thisJson.addProperty(String.valueOf(k.partition()), v.toString()); + }); + thatMinOffset.forEach((k, v) -> { + thatJson.addProperty(String.valueOf(k.partition()), v.toString()); + }); + + MinOffsetAlignmentDO alignmentDO = new MinOffsetAlignmentDO(); + alignmentDO.setGroupId(groupId); + alignmentDO.setTopic(topic); + + QueryWrapper wrapper = new QueryWrapper<>(); + wrapper.eq("group_id", groupId); + wrapper.eq("topic", topic); + if (minOffsetAlignmentMapper.selectCount(wrapper) > 0) { + minOffsetAlignmentMapper.delete(wrapper); + } + + alignmentDO.setThisOffset(thisJson.toString()); + alignmentDO.setThatOffset(thatJson.toString()); + minOffsetAlignmentMapper.insert(alignmentDO); + return ResponseData.create().success(); + } } diff --git a/src/main/resources/db/schema-h2.sql b/src/main/resources/db/schema-h2.sql index 2cf6f33..c4b3cb7 100644 --- a/src/main/resources/db/schema-h2.sql +++ b/src/main/resources/db/schema-h2.sql @@ -8,4 +8,16 @@ CREATE TABLE IF NOT EXISTS T_KAFKA_USER UPDATE_TIME TIMESTAMP NOT NULL DEFAULT NOW() COMMENT '更新时间', PRIMARY KEY (ID), UNIQUE (USERNAME) +); + +CREATE TABLE IF NOT EXISTS T_MIN_OFFSET_ALIGNMENT +( + ID IDENTITY NOT NULL COMMENT '主键ID', + GROUP_ID VARCHAR(128) NOT NULL DEFAULT '' COMMENT 'groupId', + TOPIC VARCHAR(128) NOT NULL DEFAULT '' COMMENT 'topic', + THAT_OFFSET VARCHAR(512) NOT NULL DEFAULT '' COMMENT 'min offset for that kafka cluster', + THIS_OFFSET VARCHAR(512) NOT NULL DEFAULT '' COMMENT 'min offset for this kafka cluster', + UPDATE_TIME TIMESTAMP NOT NULL DEFAULT NOW() COMMENT '更新时间', + PRIMARY KEY (ID), + UNIQUE (GROUP_ID, TOPIC) ); \ No newline at end of file diff --git a/src/main/scala/kafka/console/OperationConsole.scala b/src/main/scala/kafka/console/OperationConsole.scala index 38c1e77..58bd13b 100644 --- a/src/main/scala/kafka/console/OperationConsole.scala +++ b/src/main/scala/kafka/console/OperationConsole.scala @@ -92,4 +92,38 @@ class OperationConsole(config: KafkaConfig, topicConsole: TopicConsole, thatAdmin.close() } } + + import java.util + + /** + * check partition consistency and fetch the min offset for the topic. + * + * @param groupId group id. + * @param topic topic. + * @param props other kafka cluster config. + * @return _1: this min offset, _2: that min offset. + */ + def checkAndFetchMinOffset(groupId: String, topic: String, + props: Properties): (util.Map[TopicPartition, Long], util.Map[TopicPartition, Long]) = { + val thatAdmin = createAdminClient(props) + val thatConsumer = new KafkaConsumer(props, new ByteArrayDeserializer, new ByteArrayDeserializer) + + try { + val thisTopicPartitions = consumerConsole.listSubscribeTopics(groupId).get(topic).asScala.sortBy(_.partition()) + val thatTopicPartitionMap = thatAdmin.listConsumerGroupOffsets( + groupId + ).partitionsToOffsetAndMetadata.get(timeoutMs, TimeUnit.MILLISECONDS).asScala.filter(_._1.topic().equals(topic)) + val thatTopicPartitions = thatTopicPartitionMap.keySet.toList.sortBy(_.partition()) + if (thatTopicPartitions != thisTopicPartitions) { + throw new IllegalStateException("topic partition inconsistent.") + } + val thatMinTopicOffset = thatConsumer.beginningOffsets(thatTopicPartitions.asJava) + val thisMinTopicOffset = topicConsole.getTopicOffset(topic, thisTopicPartitions.asJava)._1 + + (thisMinTopicOffset, thatMinTopicOffset).asInstanceOf[(util.Map[TopicPartition, Long], util.Map[TopicPartition, Long])] + } finally { + thatAdmin.close() + thatConsumer.close() + } + } } diff --git a/ui/src/utils/api.js b/ui/src/utils/api.js index 7aa8886..585156b 100644 --- a/ui/src/utils/api.js +++ b/ui/src/utils/api.js @@ -130,4 +130,8 @@ export const KafkaOpApi = { url: "/op/sync/consumer/offset", method: "post", }, + minOffsetAlignment: { + url: "/op/sync/min/offset/alignment", + method: "post", + }, }; diff --git a/ui/src/views/op/MinOffsetAlignment.vue b/ui/src/views/op/MinOffsetAlignment.vue index 588116a..343a0fe 100644 --- a/ui/src/views/op/MinOffsetAlignment.vue +++ b/ui/src/views/op/MinOffsetAlignment.vue @@ -153,8 +153,8 @@ export default { } this.loading = true; request({ - url: KafkaOpApi.syncConsumerOffset.url, - method: KafkaOpApi.syncConsumerOffset.method, + url: KafkaOpApi.minOffsetAlignment.url, + method: KafkaOpApi.minOffsetAlignment.method, data: values, }).then((res) => { this.loading = false;