From a06b6dbb5fdc009812c11c9d9c362a2e3194a830 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E8=AE=B8=E6=99=93=E4=B8=9C?= <763795151@qq.com>
Date: Tue, 26 Oct 2021 20:20:46 +0800
Subject: [PATCH] =?UTF-8?q?=E9=9B=86=E7=BE=A4=E5=90=8C=E6=AD=A5-=E3=80=8B?=
=?UTF-8?q?=E6=9C=80=E5=B0=8F=E4=BD=8D=E7=A7=BB=E5=AF=B9=E9=BD=90?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
pom.xml | 8 ++++
.../beans/dos/MinOffsetAlignmentDO.java | 30 ++++++++++++
.../controller/OperationController.java | 6 +++
.../console/dao/MinOffsetAlignmentMapper.java | 13 +++++
.../console/service/OperationService.java | 2 +
.../service/impl/OperationServiceImpl.java | 47 +++++++++++++++++++
src/main/resources/db/schema-h2.sql | 12 +++++
.../kafka/console/OperationConsole.scala | 34 ++++++++++++++
ui/src/utils/api.js | 4 ++
ui/src/views/op/MinOffsetAlignment.vue | 4 +-
10 files changed, 158 insertions(+), 2 deletions(-)
create mode 100644 src/main/java/com/xuxd/kafka/console/beans/dos/MinOffsetAlignmentDO.java
create mode 100644 src/main/java/com/xuxd/kafka/console/dao/MinOffsetAlignmentMapper.java
diff --git a/pom.xml b/pom.xml
index d19b5e3..72053fe 100644
--- a/pom.xml
+++ b/pom.xml
@@ -101,6 +101,14 @@
1.18.20
provided
+
+
+
+ com.google.code.gson
+ gson
+ 2.8.8
+
+
diff --git a/src/main/java/com/xuxd/kafka/console/beans/dos/MinOffsetAlignmentDO.java b/src/main/java/com/xuxd/kafka/console/beans/dos/MinOffsetAlignmentDO.java
new file mode 100644
index 0000000..6f43999
--- /dev/null
+++ b/src/main/java/com/xuxd/kafka/console/beans/dos/MinOffsetAlignmentDO.java
@@ -0,0 +1,30 @@
+package com.xuxd.kafka.console.beans.dos;
+
+import com.baomidou.mybatisplus.annotation.IdType;
+import com.baomidou.mybatisplus.annotation.TableId;
+import com.baomidou.mybatisplus.annotation.TableName;
+import lombok.Data;
+
+/**
+ * kafka-console-ui.
+ *
+ * @author xuxd
+ * @date 2021-10-26 10:32:05
+ **/
+@Data
+@TableName("t_min_offset_alignment")
+public class MinOffsetAlignmentDO {
+
+ @TableId(type = IdType.AUTO)
+ private Long id;
+
+ private String groupId;
+
+ private String topic;
+
+ private String thatOffset;
+
+ private String thisOffset;
+
+ private String updateTime;
+}
diff --git a/src/main/java/com/xuxd/kafka/console/controller/OperationController.java b/src/main/java/com/xuxd/kafka/console/controller/OperationController.java
index 3dec9ee..2f2503e 100644
--- a/src/main/java/com/xuxd/kafka/console/controller/OperationController.java
+++ b/src/main/java/com/xuxd/kafka/console/controller/OperationController.java
@@ -27,4 +27,10 @@ public class OperationController {
dto.getProperties().put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, dto.getAddress());
return operationService.syncConsumerOffset(dto.getGroupId(), dto.getTopic(), dto.getProperties());
}
+
+ @PostMapping("/sync/min/offset/alignment")
+ public Object minOffsetAlignment(@RequestBody SyncDataDTO dto) {
+ dto.getProperties().put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, dto.getAddress());
+ return operationService.minOffsetAlignment(dto.getGroupId(), dto.getTopic(), dto.getProperties());
+ }
}
diff --git a/src/main/java/com/xuxd/kafka/console/dao/MinOffsetAlignmentMapper.java b/src/main/java/com/xuxd/kafka/console/dao/MinOffsetAlignmentMapper.java
new file mode 100644
index 0000000..07173f8
--- /dev/null
+++ b/src/main/java/com/xuxd/kafka/console/dao/MinOffsetAlignmentMapper.java
@@ -0,0 +1,13 @@
+package com.xuxd.kafka.console.dao;
+
+import com.baomidou.mybatisplus.core.mapper.BaseMapper;
+import com.xuxd.kafka.console.beans.dos.MinOffsetAlignmentDO;
+
+/**
+ * kafka-console-ui.
+ *
+ * @author xuxd
+ * @date 2021-10-26 10:33:55
+ **/
+public interface MinOffsetAlignmentMapper extends BaseMapper {
+}
diff --git a/src/main/java/com/xuxd/kafka/console/service/OperationService.java b/src/main/java/com/xuxd/kafka/console/service/OperationService.java
index 1f5364b..8361af8 100644
--- a/src/main/java/com/xuxd/kafka/console/service/OperationService.java
+++ b/src/main/java/com/xuxd/kafka/console/service/OperationService.java
@@ -12,4 +12,6 @@ import java.util.Properties;
public interface OperationService {
ResponseData syncConsumerOffset(String groupId, String topic, Properties thatProps);
+
+ ResponseData minOffsetAlignment(String groupId, String topic, Properties thatProps);
}
diff --git a/src/main/java/com/xuxd/kafka/console/service/impl/OperationServiceImpl.java b/src/main/java/com/xuxd/kafka/console/service/impl/OperationServiceImpl.java
index 4955e96..b238880 100644
--- a/src/main/java/com/xuxd/kafka/console/service/impl/OperationServiceImpl.java
+++ b/src/main/java/com/xuxd/kafka/console/service/impl/OperationServiceImpl.java
@@ -1,9 +1,17 @@
package com.xuxd.kafka.console.service.impl;
+import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
import com.xuxd.kafka.console.beans.ResponseData;
+import com.xuxd.kafka.console.beans.dos.MinOffsetAlignmentDO;
+import com.xuxd.kafka.console.dao.MinOffsetAlignmentMapper;
import com.xuxd.kafka.console.service.OperationService;
+import java.util.Map;
import java.util.Properties;
import kafka.console.OperationConsole;
+import org.apache.kafka.common.TopicPartition;
+import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import scala.Tuple2;
@@ -17,13 +25,52 @@ import scala.Tuple2;
@Service
public class OperationServiceImpl implements OperationService {
+ private Gson gson = new Gson();
+
@Autowired
private OperationConsole operationConsole;
+ private MinOffsetAlignmentMapper minOffsetAlignmentMapper;
+
+ public OperationServiceImpl(ObjectProvider minOffsetAlignmentMapper) {
+ this.minOffsetAlignmentMapper = minOffsetAlignmentMapper.getIfAvailable();
+ }
+
@Override public ResponseData syncConsumerOffset(String groupId, String topic, Properties thatProps) {
Tuple2