diff --git a/src/main/java/com/xuxd/kafka/console/beans/vo/OffsetAlignmentVO.java b/src/main/java/com/xuxd/kafka/console/beans/vo/OffsetAlignmentVO.java new file mode 100644 index 0000000..55b5ce5 --- /dev/null +++ b/src/main/java/com/xuxd/kafka/console/beans/vo/OffsetAlignmentVO.java @@ -0,0 +1,41 @@ +package com.xuxd.kafka.console.beans.vo; + +import com.google.gson.Gson; +import com.xuxd.kafka.console.beans.dos.MinOffsetAlignmentDO; +import java.util.Map; +import lombok.Data; + +/** + * kafka-console-ui. + * + * @author xuxd + * @date 2021-10-27 19:44:14 + **/ +@Data +public class OffsetAlignmentVO { + + private static final Gson GSON = new Gson(); + + private Long id; + + private String groupId; + + private String topic; + + private Map thatOffset; + + private Map thisOffset; + + private String updateTime; + + public static OffsetAlignmentVO from(MinOffsetAlignmentDO alignmentDO) { + OffsetAlignmentVO vo = new OffsetAlignmentVO(); + vo.id = alignmentDO.getId(); + vo.groupId = alignmentDO.getGroupId(); + vo.topic = alignmentDO.getTopic(); + vo.thatOffset = GSON.fromJson(alignmentDO.getThatOffset(), Map.class); + vo.thisOffset = GSON.fromJson(alignmentDO.getThisOffset(), Map.class); + vo.updateTime = alignmentDO.getUpdateTime(); + return vo; + } +} diff --git a/src/main/java/com/xuxd/kafka/console/controller/OperationController.java b/src/main/java/com/xuxd/kafka/console/controller/OperationController.java index 2f2503e..ead2d52 100644 --- a/src/main/java/com/xuxd/kafka/console/controller/OperationController.java +++ b/src/main/java/com/xuxd/kafka/console/controller/OperationController.java @@ -4,9 +4,12 @@ import com.xuxd.kafka.console.beans.dto.SyncDataDTO; import com.xuxd.kafka.console.service.OperationService; import org.apache.kafka.clients.admin.AdminClientConfig; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.DeleteMapping; +import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; /** @@ -33,4 +36,14 @@ public class OperationController { dto.getProperties().put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, dto.getAddress()); return operationService.minOffsetAlignment(dto.getGroupId(), dto.getTopic(), dto.getProperties()); } + + @GetMapping("/sync/alignment/list") + public Object getAlignmentList() { + return operationService.getAlignmentList(); + } + + @DeleteMapping("/sync/alignment") + public Object deleteAlignment(@RequestParam Long id) { + return operationService.deleteAlignmentById(id); + } } diff --git a/src/main/java/com/xuxd/kafka/console/service/OperationService.java b/src/main/java/com/xuxd/kafka/console/service/OperationService.java index 8361af8..67ea35e 100644 --- a/src/main/java/com/xuxd/kafka/console/service/OperationService.java +++ b/src/main/java/com/xuxd/kafka/console/service/OperationService.java @@ -14,4 +14,8 @@ public interface OperationService { ResponseData syncConsumerOffset(String groupId, String topic, Properties thatProps); ResponseData minOffsetAlignment(String groupId, String topic, Properties thatProps); + + ResponseData getAlignmentList(); + + ResponseData deleteAlignmentById(Long id); } diff --git a/src/main/java/com/xuxd/kafka/console/service/impl/OperationServiceImpl.java b/src/main/java/com/xuxd/kafka/console/service/impl/OperationServiceImpl.java index 53b5afe..863b2c2 100644 --- a/src/main/java/com/xuxd/kafka/console/service/impl/OperationServiceImpl.java +++ b/src/main/java/com/xuxd/kafka/console/service/impl/OperationServiceImpl.java @@ -5,9 +5,11 @@ import com.google.gson.Gson; import com.google.gson.JsonObject; import com.xuxd.kafka.console.beans.ResponseData; import com.xuxd.kafka.console.beans.dos.MinOffsetAlignmentDO; +import com.xuxd.kafka.console.beans.vo.OffsetAlignmentVO; import com.xuxd.kafka.console.dao.MinOffsetAlignmentMapper; import com.xuxd.kafka.console.service.OperationService; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Properties; import kafka.console.OperationConsole; @@ -50,10 +52,10 @@ public class OperationServiceImpl implements OperationService { Map thatOffset = gson.fromJson(alignmentDO.getThatOffset(), Map.class); Map thisMinOffset = new HashMap<>(), thatMinOffset = new HashMap<>(); - thisOffset.forEach((k, v)-> { + thisOffset.forEach((k, v) -> { thisMinOffset.put(new TopicPartition(topic, Integer.valueOf(k)), Long.valueOf(v.toString())); }); - thatOffset.forEach((k, v)-> { + thatOffset.forEach((k, v) -> { thatMinOffset.put(new TopicPartition(topic, Integer.valueOf(k)), Long.valueOf(v.toString())); }); @@ -92,4 +94,17 @@ public class OperationServiceImpl implements OperationService { minOffsetAlignmentMapper.insert(alignmentDO); return ResponseData.create().success(); } + + @Override public ResponseData getAlignmentList() { + QueryWrapper wrapper = new QueryWrapper(); + wrapper.orderByDesc("update_time"); + List alignmentDOS = minOffsetAlignmentMapper.selectList(wrapper); + + return ResponseData.create().data(alignmentDOS.stream().map(OffsetAlignmentVO::from)).success(); + } + + @Override public ResponseData deleteAlignmentById(Long id) { + minOffsetAlignmentMapper.deleteById(id); + return ResponseData.create().success(); + } } diff --git a/ui/src/utils/api.js b/ui/src/utils/api.js index 585156b..5a4a31f 100644 --- a/ui/src/utils/api.js +++ b/ui/src/utils/api.js @@ -134,4 +134,12 @@ export const KafkaOpApi = { url: "/op/sync/min/offset/alignment", method: "post", }, + getOffsetAlignmentList: { + url: "/op/sync/alignment/list", + method: "get", + }, + deleteAlignment: { + url: "/op/sync/alignment", + method: "delete", + }, }; diff --git a/ui/src/views/op/OffsetAlignmentTable.vue b/ui/src/views/op/OffsetAlignmentTable.vue new file mode 100644 index 0000000..471eeb0 --- /dev/null +++ b/ui/src/views/op/OffsetAlignmentTable.vue @@ -0,0 +1,160 @@ + + + + + diff --git a/ui/src/views/op/Operation.vue b/ui/src/views/op/Operation.vue index b4da1bb..c9aa5d5 100644 --- a/ui/src/views/op/Operation.vue +++ b/ui/src/views/op/Operation.vue @@ -26,7 +26,10 @@ 同步消费位点时需要获取两端集群中订阅分区的最小位移进行消费位点计算,如需后面同步消费位点,在进行数据同步前,先进行最小位移对齐同步消费位点时需要获取两端集群中订阅分区的最小位移进行消费位点计算,如需后面同步消费位点,在进行数据同步前,先进行最小位移对齐, + 点击右侧查看:对齐信息

@@ -50,20 +53,26 @@ @closeMinOffsetAlignmentDialog="closeMinOffsetAlignmentDialog" > +