diff --git a/src/main/java/com/xuxd/kafka/console/beans/dto/ResetOffsetDTO.java b/src/main/java/com/xuxd/kafka/console/beans/dto/ResetOffsetDTO.java index 996fe5b..0d3d92f 100644 --- a/src/main/java/com/xuxd/kafka/console/beans/dto/ResetOffsetDTO.java +++ b/src/main/java/com/xuxd/kafka/console/beans/dto/ResetOffsetDTO.java @@ -23,6 +23,8 @@ public class ResetOffsetDTO { private int partition; + private long offset; + public interface Level { int TOPIC = 1; int PARTITION = 2; @@ -32,5 +34,6 @@ public class ResetOffsetDTO { int EARLIEST = 1; int LATEST = 2; int TIMESTAMP = 3; + int SPECIAL = 4; } } diff --git a/src/main/java/com/xuxd/kafka/console/controller/ConsumerController.java b/src/main/java/com/xuxd/kafka/console/controller/ConsumerController.java index dab52be..c45e499 100644 --- a/src/main/java/com/xuxd/kafka/console/controller/ConsumerController.java +++ b/src/main/java/com/xuxd/kafka/console/controller/ConsumerController.java @@ -14,6 +14,7 @@ import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.common.ConsumerGroupState; +import org.apache.kafka.common.TopicPartition; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.DeleteMapping; import org.springframework.web.bind.annotation.GetMapping; @@ -90,6 +91,14 @@ public class ConsumerController { } break; case ResetOffsetDTO.Level.PARTITION: + switch (offsetDTO.getType()) { + case ResetOffsetDTO.Type + .SPECIAL: + res = consumerService.resetPartitionToTargetOffset(offsetDTO.getGroupId(), new TopicPartition(offsetDTO.getTopic(), offsetDTO.getPartition()), offsetDTO.getOffset()); + break; + default: + return ResponseData.create().failed("unknown type"); + } break; default: return ResponseData.create().failed("unknown level"); diff --git a/src/main/java/com/xuxd/kafka/console/service/ConsumerService.java b/src/main/java/com/xuxd/kafka/console/service/ConsumerService.java index 6467339..52b3b6c 100644 --- a/src/main/java/com/xuxd/kafka/console/service/ConsumerService.java +++ b/src/main/java/com/xuxd/kafka/console/service/ConsumerService.java @@ -5,6 +5,7 @@ import java.util.List; import java.util.Set; import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.common.ConsumerGroupState; +import org.apache.kafka.common.TopicPartition; /** * kafka-console-ui. @@ -25,4 +26,6 @@ public interface ConsumerService { ResponseData addSubscription(String groupId, String topic); ResponseData resetOffsetToEndpoint(String groupId, String topic, OffsetResetStrategy strategy); + + ResponseData resetPartitionToTargetOffset(String groupId, TopicPartition partition, long offset); } diff --git a/src/main/java/com/xuxd/kafka/console/service/impl/ConsumerServiceImpl.java b/src/main/java/com/xuxd/kafka/console/service/impl/ConsumerServiceImpl.java index e1701b9..decc0dd 100644 --- a/src/main/java/com/xuxd/kafka/console/service/impl/ConsumerServiceImpl.java +++ b/src/main/java/com/xuxd/kafka/console/service/impl/ConsumerServiceImpl.java @@ -21,6 +21,7 @@ import org.apache.kafka.clients.admin.ConsumerGroupDescription; import org.apache.kafka.clients.admin.MemberDescription; import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.common.ConsumerGroupState; +import org.apache.kafka.common.TopicPartition; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import scala.Tuple2; @@ -127,4 +128,9 @@ public class ConsumerServiceImpl implements ConsumerService { Tuple2 tuple2 = consumerConsole.resetOffsetToEndpoint(groupId, topic, strategy); return (boolean) tuple2._1() ? ResponseData.create().success() : ResponseData.create().failed(tuple2._2()); } + + @Override public ResponseData resetPartitionToTargetOffset(String groupId, TopicPartition partition, long offset) { + Tuple2 tuple2 = consumerConsole.resetPartitionToTargetOffset(groupId, partition, offset); + return (boolean) tuple2._1() ? ResponseData.create().success() : ResponseData.create().failed(tuple2._2()); + } } diff --git a/src/main/scala/kafka/console/ConsumerConsole.scala b/src/main/scala/kafka/console/ConsumerConsole.scala index c0de58a..0843d34 100644 --- a/src/main/scala/kafka/console/ConsumerConsole.scala +++ b/src/main/scala/kafka/console/ConsumerConsole.scala @@ -163,6 +163,16 @@ class ConsumerConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaCon }, props).asInstanceOf[(Boolean, String)] } + def resetPartitionToTargetOffset(groupId: String, partition: TopicPartition, offset: Long): (Boolean, String) = { + withAdminClientAndCatchError(admin => { + admin.alterConsumerGroupOffsets(groupId, Map(partition -> new OffsetAndMetadata(offset)).asJava).all().get(timeoutMs, TimeUnit.MILLISECONDS) + (true, "") + }, e => { + log.error("resetPartitionToTargetOffset error.", e) + (false, e.getMessage) + }).asInstanceOf[(Boolean, String)] + } + private def describeConsumerGroups(groupIds: util.Set[String]): mutable.Map[String, ConsumerGroupDescription] = { withAdminClientAndCatchError(admin => { admin.describeConsumerGroups(groupIds).describedGroups().asScala.map { diff --git a/ui/src/views/group/ConsumerDetail.vue b/ui/src/views/group/ConsumerDetail.vue index 2d3cab2..7f67478 100644 --- a/ui/src/views/group/ConsumerDetail.vue +++ b/ui/src/views/group/ConsumerDetail.vue @@ -53,17 +53,56 @@ {{ text }}@{{ record.host }} -
+
重置位点
+ + + + + + + + + + @@ -92,6 +131,14 @@ export default { show: this.visible, data: [], loading: false, + showResetPartitionOffsetDialog: false, + select: { + topic: "", + partition: 0, + }, + resetPartitionOffsetForm: this.$form.createForm(this, { + name: "resetPartitionOffsetForm", + }), }; }, watch: { @@ -125,11 +172,19 @@ export default { this.$emit("closeConsumerDetailDialog", {}); }, resetTopicOffsetToEndpoint(groupId, topic, type) { + this.requestResetOffset({ + groupId: groupId, + topic: topic, + level: 1, + type: type, + }); + }, + requestResetOffset(data, callbackOnSuccess) { this.loading = true; request({ url: KafkaConsumerApi.resetOffset.url, method: KafkaConsumerApi.resetOffset.method, - data: { groupId: groupId, topic: topic, level: 1, type: type }, + data: data, }).then((res) => { this.loading = false; if (res.code != 0) { @@ -140,6 +195,29 @@ export default { } else { this.$message.success(res.msg); this.getConsumerDetail(); + if (callbackOnSuccess) { + callbackOnSuccess(); + } + } + }); + }, + openResetPartitionOffsetDialog(topic, partition) { + this.showResetPartitionOffsetDialog = true; + this.select.topic = topic; + this.select.partition = partition; + }, + closeResetPartitionOffsetDialog() { + this.showResetPartitionOffsetDialog = false; + }, + resetPartitionOffset() { + this.resetPartitionOffsetForm.validateFields((err, values) => { + if (!err) { + const data = Object.assign({}, values); + Object.assign(data, this.select); + data.groupId = this.group; + data.level = 2; + data.type = 4; + this.requestResetOffset(data, this.closeResetPartitionOffsetDialog()); } }); }, @@ -186,4 +264,7 @@ const columns = [ .color-font { color: dodgerblue; } +#resetPartitionOffsetModal .ant-input-number { + width: 100% !important; +}