重置消费位点

This commit is contained in:
许晓东
2021-10-22 19:58:27 +08:00
parent 36bb140c79
commit e0e8e027d8
6 changed files with 114 additions and 2 deletions

View File

@@ -23,6 +23,8 @@ public class ResetOffsetDTO {
private int partition;
private long offset;
public interface Level {
int TOPIC = 1;
int PARTITION = 2;
@@ -32,5 +34,6 @@ public class ResetOffsetDTO {
int EARLIEST = 1;
int LATEST = 2;
int TIMESTAMP = 3;
int SPECIAL = 4;
}
}

View File

@@ -14,6 +14,7 @@ import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.ConsumerGroupState;
import org.apache.kafka.common.TopicPartition;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
@@ -90,6 +91,14 @@ public class ConsumerController {
}
break;
case ResetOffsetDTO.Level.PARTITION:
switch (offsetDTO.getType()) {
case ResetOffsetDTO.Type
.SPECIAL:
res = consumerService.resetPartitionToTargetOffset(offsetDTO.getGroupId(), new TopicPartition(offsetDTO.getTopic(), offsetDTO.getPartition()), offsetDTO.getOffset());
break;
default:
return ResponseData.create().failed("unknown type");
}
break;
default:
return ResponseData.create().failed("unknown level");

View File

@@ -5,6 +5,7 @@ import java.util.List;
import java.util.Set;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.ConsumerGroupState;
import org.apache.kafka.common.TopicPartition;
/**
* kafka-console-ui.
@@ -25,4 +26,6 @@ public interface ConsumerService {
ResponseData addSubscription(String groupId, String topic);
ResponseData resetOffsetToEndpoint(String groupId, String topic, OffsetResetStrategy strategy);
ResponseData resetPartitionToTargetOffset(String groupId, TopicPartition partition, long offset);
}

View File

@@ -21,6 +21,7 @@ import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.admin.MemberDescription;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.ConsumerGroupState;
import org.apache.kafka.common.TopicPartition;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import scala.Tuple2;
@@ -127,4 +128,9 @@ public class ConsumerServiceImpl implements ConsumerService {
Tuple2<Object, String> tuple2 = consumerConsole.resetOffsetToEndpoint(groupId, topic, strategy);
return (boolean) tuple2._1() ? ResponseData.create().success() : ResponseData.create().failed(tuple2._2());
}
@Override public ResponseData resetPartitionToTargetOffset(String groupId, TopicPartition partition, long offset) {
Tuple2<Object, String> tuple2 = consumerConsole.resetPartitionToTargetOffset(groupId, partition, offset);
return (boolean) tuple2._1() ? ResponseData.create().success() : ResponseData.create().failed(tuple2._2());
}
}

View File

@@ -163,6 +163,16 @@ class ConsumerConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaCon
}, props).asInstanceOf[(Boolean, String)]
}
def resetPartitionToTargetOffset(groupId: String, partition: TopicPartition, offset: Long): (Boolean, String) = {
withAdminClientAndCatchError(admin => {
admin.alterConsumerGroupOffsets(groupId, Map(partition -> new OffsetAndMetadata(offset)).asJava).all().get(timeoutMs, TimeUnit.MILLISECONDS)
(true, "")
}, e => {
log.error("resetPartitionToTargetOffset error.", e)
(false, e.getMessage)
}).asInstanceOf[(Boolean, String)]
}
private def describeConsumerGroups(groupIds: util.Set[String]): mutable.Map[String, ConsumerGroupDescription] = {
withAdminClientAndCatchError(admin => {
admin.describeConsumerGroups(groupIds).describedGroups().asScala.map {

View File

@@ -53,17 +53,56 @@
<span slot="clientId" slot-scope="text, record">
<span v-if="text"> {{ text }}@{{ record.host }} </span>
</span>
<div slot="operation" slot-scope="{}">
<div slot="operation" slot-scope="record">
<a-button
type="primary"
size="small"
href="javascript:;"
class="operation-btn"
@click="
openResetPartitionOffsetDialog(record.topic, record.partition)
"
>重置位点
</a-button>
</div>
</a-table>
</div>
<a-modal
id="resetPartitionOffsetModal"
:visible="showResetPartitionOffsetDialog"
:title="'重置' + select.topic + '[' + select.partition + ']消费位点'"
:destroyOnClose="true"
@cancel="closeResetPartitionOffsetDialog"
>
<template slot="footer">
<a-button key="back" @click="closeResetPartitionOffsetDialog">
取消
</a-button>
<a-button key="submit" type="primary" @click="resetPartitionOffset">
确认
</a-button>
</template>
<a-form
:form="resetPartitionOffsetForm"
:label-col="{ span: 8 }"
:wrapper-col="{ span: 12 }"
>
<a-form-item label="重置消费位点到">
<a-input-number
:min="0"
v-decorator="[
'offset',
{
initialValue: 0,
rules: [{ required: true, message: '输入消费位点!' }],
},
]"
/>
</a-form-item>
</a-form>
</a-modal>
</a-spin>
</div>
</a-modal>
@@ -92,6 +131,14 @@ export default {
show: this.visible,
data: [],
loading: false,
showResetPartitionOffsetDialog: false,
select: {
topic: "",
partition: 0,
},
resetPartitionOffsetForm: this.$form.createForm(this, {
name: "resetPartitionOffsetForm",
}),
};
},
watch: {
@@ -125,11 +172,19 @@ export default {
this.$emit("closeConsumerDetailDialog", {});
},
resetTopicOffsetToEndpoint(groupId, topic, type) {
this.requestResetOffset({
groupId: groupId,
topic: topic,
level: 1,
type: type,
});
},
requestResetOffset(data, callbackOnSuccess) {
this.loading = true;
request({
url: KafkaConsumerApi.resetOffset.url,
method: KafkaConsumerApi.resetOffset.method,
data: { groupId: groupId, topic: topic, level: 1, type: type },
data: data,
}).then((res) => {
this.loading = false;
if (res.code != 0) {
@@ -140,6 +195,29 @@ export default {
} else {
this.$message.success(res.msg);
this.getConsumerDetail();
if (callbackOnSuccess) {
callbackOnSuccess();
}
}
});
},
openResetPartitionOffsetDialog(topic, partition) {
this.showResetPartitionOffsetDialog = true;
this.select.topic = topic;
this.select.partition = partition;
},
closeResetPartitionOffsetDialog() {
this.showResetPartitionOffsetDialog = false;
},
resetPartitionOffset() {
this.resetPartitionOffsetForm.validateFields((err, values) => {
if (!err) {
const data = Object.assign({}, values);
Object.assign(data, this.select);
data.groupId = this.group;
data.level = 2;
data.type = 4;
this.requestResetOffset(data, this.closeResetPartitionOffsetDialog());
}
});
},
@@ -186,4 +264,7 @@ const columns = [
.color-font {
color: dodgerblue;
}
#resetPartitionOffsetModal .ant-input-number {
width: 100% !important;
}
</style>