集群同步-》位移对齐记录展示
This commit is contained in:
@@ -0,0 +1,41 @@
|
||||
package com.xuxd.kafka.console.beans.vo;
|
||||
|
||||
import com.google.gson.Gson;
|
||||
import com.xuxd.kafka.console.beans.dos.MinOffsetAlignmentDO;
|
||||
import java.util.Map;
|
||||
import lombok.Data;
|
||||
|
||||
/**
|
||||
* kafka-console-ui.
|
||||
*
|
||||
* @author xuxd
|
||||
* @date 2021-10-27 19:44:14
|
||||
**/
|
||||
@Data
|
||||
public class OffsetAlignmentVO {
|
||||
|
||||
private static final Gson GSON = new Gson();
|
||||
|
||||
private Long id;
|
||||
|
||||
private String groupId;
|
||||
|
||||
private String topic;
|
||||
|
||||
private Map<String, Object> thatOffset;
|
||||
|
||||
private Map<String, Object> thisOffset;
|
||||
|
||||
private String updateTime;
|
||||
|
||||
public static OffsetAlignmentVO from(MinOffsetAlignmentDO alignmentDO) {
|
||||
OffsetAlignmentVO vo = new OffsetAlignmentVO();
|
||||
vo.id = alignmentDO.getId();
|
||||
vo.groupId = alignmentDO.getGroupId();
|
||||
vo.topic = alignmentDO.getTopic();
|
||||
vo.thatOffset = GSON.fromJson(alignmentDO.getThatOffset(), Map.class);
|
||||
vo.thisOffset = GSON.fromJson(alignmentDO.getThisOffset(), Map.class);
|
||||
vo.updateTime = alignmentDO.getUpdateTime();
|
||||
return vo;
|
||||
}
|
||||
}
|
||||
@@ -4,9 +4,12 @@ import com.xuxd.kafka.console.beans.dto.SyncDataDTO;
|
||||
import com.xuxd.kafka.console.service.OperationService;
|
||||
import org.apache.kafka.clients.admin.AdminClientConfig;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.web.bind.annotation.DeleteMapping;
|
||||
import org.springframework.web.bind.annotation.GetMapping;
|
||||
import org.springframework.web.bind.annotation.PostMapping;
|
||||
import org.springframework.web.bind.annotation.RequestBody;
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
import org.springframework.web.bind.annotation.RequestParam;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
|
||||
/**
|
||||
@@ -33,4 +36,14 @@ public class OperationController {
|
||||
dto.getProperties().put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, dto.getAddress());
|
||||
return operationService.minOffsetAlignment(dto.getGroupId(), dto.getTopic(), dto.getProperties());
|
||||
}
|
||||
|
||||
@GetMapping("/sync/alignment/list")
|
||||
public Object getAlignmentList() {
|
||||
return operationService.getAlignmentList();
|
||||
}
|
||||
|
||||
@DeleteMapping("/sync/alignment")
|
||||
public Object deleteAlignment(@RequestParam Long id) {
|
||||
return operationService.deleteAlignmentById(id);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -14,4 +14,8 @@ public interface OperationService {
|
||||
ResponseData syncConsumerOffset(String groupId, String topic, Properties thatProps);
|
||||
|
||||
ResponseData minOffsetAlignment(String groupId, String topic, Properties thatProps);
|
||||
|
||||
ResponseData getAlignmentList();
|
||||
|
||||
ResponseData deleteAlignmentById(Long id);
|
||||
}
|
||||
|
||||
@@ -5,9 +5,11 @@ import com.google.gson.Gson;
|
||||
import com.google.gson.JsonObject;
|
||||
import com.xuxd.kafka.console.beans.ResponseData;
|
||||
import com.xuxd.kafka.console.beans.dos.MinOffsetAlignmentDO;
|
||||
import com.xuxd.kafka.console.beans.vo.OffsetAlignmentVO;
|
||||
import com.xuxd.kafka.console.dao.MinOffsetAlignmentMapper;
|
||||
import com.xuxd.kafka.console.service.OperationService;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import kafka.console.OperationConsole;
|
||||
@@ -50,10 +52,10 @@ public class OperationServiceImpl implements OperationService {
|
||||
Map<String, Object> thatOffset = gson.fromJson(alignmentDO.getThatOffset(), Map.class);
|
||||
|
||||
Map<TopicPartition, Object> thisMinOffset = new HashMap<>(), thatMinOffset = new HashMap<>();
|
||||
thisOffset.forEach((k, v)-> {
|
||||
thisOffset.forEach((k, v) -> {
|
||||
thisMinOffset.put(new TopicPartition(topic, Integer.valueOf(k)), Long.valueOf(v.toString()));
|
||||
});
|
||||
thatOffset.forEach((k, v)-> {
|
||||
thatOffset.forEach((k, v) -> {
|
||||
thatMinOffset.put(new TopicPartition(topic, Integer.valueOf(k)), Long.valueOf(v.toString()));
|
||||
});
|
||||
|
||||
@@ -92,4 +94,17 @@ public class OperationServiceImpl implements OperationService {
|
||||
minOffsetAlignmentMapper.insert(alignmentDO);
|
||||
return ResponseData.create().success();
|
||||
}
|
||||
|
||||
@Override public ResponseData getAlignmentList() {
|
||||
QueryWrapper wrapper = new QueryWrapper();
|
||||
wrapper.orderByDesc("update_time");
|
||||
List<MinOffsetAlignmentDO> alignmentDOS = minOffsetAlignmentMapper.selectList(wrapper);
|
||||
|
||||
return ResponseData.create().data(alignmentDOS.stream().map(OffsetAlignmentVO::from)).success();
|
||||
}
|
||||
|
||||
@Override public ResponseData deleteAlignmentById(Long id) {
|
||||
minOffsetAlignmentMapper.deleteById(id);
|
||||
return ResponseData.create().success();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -134,4 +134,12 @@ export const KafkaOpApi = {
|
||||
url: "/op/sync/min/offset/alignment",
|
||||
method: "post",
|
||||
},
|
||||
getOffsetAlignmentList: {
|
||||
url: "/op/sync/alignment/list",
|
||||
method: "get",
|
||||
},
|
||||
deleteAlignment: {
|
||||
url: "/op/sync/alignment",
|
||||
method: "delete",
|
||||
},
|
||||
};
|
||||
|
||||
160
ui/src/views/op/OffsetAlignmentTable.vue
Normal file
160
ui/src/views/op/OffsetAlignmentTable.vue
Normal file
@@ -0,0 +1,160 @@
|
||||
<template>
|
||||
<a-modal
|
||||
title="位移对齐记录"
|
||||
:visible="show"
|
||||
:width="1200"
|
||||
:mask="false"
|
||||
:destroyOnClose="true"
|
||||
:footer="null"
|
||||
:maskClosable="false"
|
||||
@cancel="handleCancel"
|
||||
>
|
||||
<div>
|
||||
<a-spin :spinning="loading">
|
||||
<a-table :columns="columns" bordered :data-source="data" :rowKey="id">
|
||||
<ul slot="thisOffset" slot-scope="text">
|
||||
<ol v-for="(v, k) in text" :key="k">
|
||||
{{
|
||||
k
|
||||
}}:
|
||||
{{
|
||||
v
|
||||
}}
|
||||
</ol>
|
||||
</ul>
|
||||
<ul slot="thatOffset" slot-scope="text">
|
||||
<ol v-for="(v, k) in text" :key="k">
|
||||
{{
|
||||
k
|
||||
}}:
|
||||
{{
|
||||
v
|
||||
}}
|
||||
</ol>
|
||||
</ul>
|
||||
<div slot="operation" slot-scope="record">
|
||||
<a-popconfirm
|
||||
title="删除当前记录?"
|
||||
ok-text="确认"
|
||||
cancel-text="取消"
|
||||
@confirm="onDeleteOffsetAlignment(record)"
|
||||
>
|
||||
<a-button size="small" href="javascript:;" class="operation-btn"
|
||||
>删除</a-button
|
||||
>
|
||||
</a-popconfirm>
|
||||
</div>
|
||||
</a-table>
|
||||
</a-spin>
|
||||
</div>
|
||||
</a-modal>
|
||||
</template>
|
||||
|
||||
<script>
|
||||
import request from "@/utils/request";
|
||||
import { KafkaOpApi } from "@/utils/api";
|
||||
import notification from "ant-design-vue/es/notification";
|
||||
export default {
|
||||
name: "OffsetAlignmentTable",
|
||||
props: {
|
||||
visible: {
|
||||
type: Boolean,
|
||||
default: false,
|
||||
},
|
||||
},
|
||||
data() {
|
||||
return {
|
||||
columns: columns,
|
||||
show: this.visible,
|
||||
data: [],
|
||||
loading: false,
|
||||
};
|
||||
},
|
||||
watch: {
|
||||
visible(v) {
|
||||
this.show = v;
|
||||
if (this.show) {
|
||||
this.getAlignmentList();
|
||||
}
|
||||
},
|
||||
},
|
||||
methods: {
|
||||
getAlignmentList() {
|
||||
this.loading = true;
|
||||
request({
|
||||
url: KafkaOpApi.getOffsetAlignmentList.url,
|
||||
method: KafkaOpApi.getOffsetAlignmentList.method,
|
||||
}).then((res) => {
|
||||
this.loading = false;
|
||||
if (res.code != 0) {
|
||||
notification.error({
|
||||
message: "error",
|
||||
description: res.msg,
|
||||
});
|
||||
} else {
|
||||
this.data = res.data;
|
||||
}
|
||||
});
|
||||
},
|
||||
handleCancel() {
|
||||
this.data = [];
|
||||
this.$emit("closeOffsetAlignmentInfoDialog", {});
|
||||
},
|
||||
onDeleteOffsetAlignment(record) {
|
||||
this.loading = true;
|
||||
request({
|
||||
url: KafkaOpApi.deleteAlignment.url + "?id=" + record.id,
|
||||
method: KafkaOpApi.deleteAlignment.method,
|
||||
}).then((res) => {
|
||||
this.loading = false;
|
||||
if (res.code != 0) {
|
||||
notification.error({
|
||||
message: "error",
|
||||
description: res.msg,
|
||||
});
|
||||
} else {
|
||||
this.$message.success(res.msg);
|
||||
this.getAlignmentList();
|
||||
}
|
||||
});
|
||||
},
|
||||
},
|
||||
};
|
||||
|
||||
const columns = [
|
||||
{
|
||||
title: "消费组",
|
||||
dataIndex: "groupId",
|
||||
key: "groupId",
|
||||
},
|
||||
{
|
||||
title: "Topic",
|
||||
dataIndex: "topic",
|
||||
key: "topic",
|
||||
},
|
||||
{
|
||||
title: "当前集群标记位点",
|
||||
dataIndex: "thisOffset",
|
||||
key: "thisOffset",
|
||||
scopedSlots: { customRender: "thisOffset" },
|
||||
},
|
||||
{
|
||||
title: "外部集群标记位点",
|
||||
dataIndex: "thatOffset",
|
||||
key: "thatOffset",
|
||||
scopedSlots: { customRender: "thatOffset" },
|
||||
},
|
||||
{
|
||||
title: "更新时间",
|
||||
dataIndex: "updateTime",
|
||||
key: "updateTime",
|
||||
},
|
||||
{
|
||||
title: "操作",
|
||||
key: "operation",
|
||||
scopedSlots: { customRender: "operation" },
|
||||
},
|
||||
];
|
||||
</script>
|
||||
|
||||
<style scoped></style>
|
||||
@@ -26,7 +26,10 @@
|
||||
</a-button>
|
||||
<label>说明:</label>
|
||||
<span
|
||||
>同步消费位点时需要获取两端集群中订阅分区的最小位移进行消费位点计算,如需后面同步消费位点,在进行数据同步前,先进行最小位移对齐</span
|
||||
>同步消费位点时需要获取两端集群中订阅分区的最小位移进行消费位点计算,如需后面同步消费位点,在进行数据同步前,先进行最小位移对齐,
|
||||
点击右侧查看:</span
|
||||
><a href="javascript:;" @click="openOffsetAlignmentInfoDialog"
|
||||
>对齐信息</a
|
||||
>
|
||||
</p>
|
||||
<p>
|
||||
@@ -50,20 +53,26 @@
|
||||
@closeMinOffsetAlignmentDialog="closeMinOffsetAlignmentDialog"
|
||||
>
|
||||
</MinOffsetAlignment>
|
||||
<OffsetAlignmentTable
|
||||
:visible="syncData.showOffsetAlignmentInfoDialog"
|
||||
@closeOffsetAlignmentInfoDialog="closeOffsetAlignmentInfoDialog"
|
||||
></OffsetAlignmentTable>
|
||||
</div>
|
||||
</template>
|
||||
|
||||
<script>
|
||||
import SyncConsumerOffset from "@/views/op/SyncConsumerOffset";
|
||||
import MinOffsetAlignment from "@/views/op/MinOffsetAlignment";
|
||||
import OffsetAlignmentTable from "@/views/op/OffsetAlignmentTable";
|
||||
export default {
|
||||
name: "Operation",
|
||||
components: { SyncConsumerOffset, MinOffsetAlignment },
|
||||
components: { SyncConsumerOffset, MinOffsetAlignment, OffsetAlignmentTable },
|
||||
data() {
|
||||
return {
|
||||
syncData: {
|
||||
showSyncConsumerOffsetDialog: false,
|
||||
showMinOffsetAlignmentDialog: false,
|
||||
showOffsetAlignmentInfoDialog: false,
|
||||
},
|
||||
};
|
||||
},
|
||||
@@ -80,6 +89,12 @@ export default {
|
||||
closeMinOffsetAlignmentDialog() {
|
||||
this.syncData.showMinOffsetAlignmentDialog = false;
|
||||
},
|
||||
openOffsetAlignmentInfoDialog() {
|
||||
this.syncData.showOffsetAlignmentInfoDialog = true;
|
||||
},
|
||||
closeOffsetAlignmentInfoDialog() {
|
||||
this.syncData.showOffsetAlignmentInfoDialog = false;
|
||||
},
|
||||
},
|
||||
};
|
||||
</script>
|
||||
|
||||
Reference in New Issue
Block a user