diff --git a/src/main/java/com/xuxd/kafka/console/beans/dto/ResetOffsetDTO.java b/src/main/java/com/xuxd/kafka/console/beans/dto/ResetOffsetDTO.java new file mode 100644 index 0000000..996fe5b --- /dev/null +++ b/src/main/java/com/xuxd/kafka/console/beans/dto/ResetOffsetDTO.java @@ -0,0 +1,36 @@ +package com.xuxd.kafka.console.beans.dto; + +import lombok.Data; + +/** + * kafka-console-ui. + * + * @author xuxd + * @date 2021-10-22 16:21:28 + **/ +@Data +public class ResetOffsetDTO { + + // 重置粒度:1-> topic,2->partition + private int level; + + // 重置类型:1-> earliest, 2-> latest, 3-> timestamp + private int type; + + private String groupId; + + private String topic; + + private int partition; + + public interface Level { + int TOPIC = 1; + int PARTITION = 2; + } + + public interface Type { + int EARLIEST = 1; + int LATEST = 2; + int TIMESTAMP = 3; + } +} diff --git a/src/main/java/com/xuxd/kafka/console/controller/ConsumerController.java b/src/main/java/com/xuxd/kafka/console/controller/ConsumerController.java index 163fe81..dab52be 100644 --- a/src/main/java/com/xuxd/kafka/console/controller/ConsumerController.java +++ b/src/main/java/com/xuxd/kafka/console/controller/ConsumerController.java @@ -1,7 +1,9 @@ package com.xuxd.kafka.console.controller; +import com.xuxd.kafka.console.beans.ResponseData; import com.xuxd.kafka.console.beans.dto.AddSubscriptionDTO; import com.xuxd.kafka.console.beans.dto.QueryConsumerGroupDTO; +import com.xuxd.kafka.console.beans.dto.ResetOffsetDTO; import com.xuxd.kafka.console.service.ConsumerService; import java.util.Collections; import java.util.HashSet; @@ -10,6 +12,7 @@ import java.util.Objects; import java.util.Set; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.common.ConsumerGroupState; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.DeleteMapping; @@ -66,4 +69,32 @@ public class ConsumerController { public Object addSubscription(@RequestBody AddSubscriptionDTO subscriptionDTO) { return consumerService.addSubscription(subscriptionDTO.getGroupId(), subscriptionDTO.getTopic()); } + + @PostMapping("/reset/offset") + public Object restOffset(@RequestBody ResetOffsetDTO offsetDTO) { + ResponseData res = ResponseData.create().failed("unknown"); + switch (offsetDTO.getLevel()) { + case ResetOffsetDTO.Level.TOPIC: + switch (offsetDTO.getType()) { + case ResetOffsetDTO.Type + .EARLIEST: + res = consumerService.resetOffsetToEndpoint(offsetDTO.getGroupId(), offsetDTO.getTopic(), OffsetResetStrategy.EARLIEST); + break; + case ResetOffsetDTO.Type.LATEST: + res = consumerService.resetOffsetToEndpoint(offsetDTO.getGroupId(), offsetDTO.getTopic(), OffsetResetStrategy.LATEST); + break; + case ResetOffsetDTO.Type.TIMESTAMP: + break; + default: + return ResponseData.create().failed("unknown type"); + } + break; + case ResetOffsetDTO.Level.PARTITION: + break; + default: + return ResponseData.create().failed("unknown level"); + } + + return res; + } } diff --git a/src/main/java/com/xuxd/kafka/console/service/ConsumerService.java b/src/main/java/com/xuxd/kafka/console/service/ConsumerService.java index 53c2df2..6467339 100644 --- a/src/main/java/com/xuxd/kafka/console/service/ConsumerService.java +++ b/src/main/java/com/xuxd/kafka/console/service/ConsumerService.java @@ -3,6 +3,7 @@ package com.xuxd.kafka.console.service; import com.xuxd.kafka.console.beans.ResponseData; import java.util.List; import java.util.Set; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.common.ConsumerGroupState; /** @@ -22,4 +23,6 @@ public interface ConsumerService { ResponseData getConsumerDetail(String groupId); ResponseData addSubscription(String groupId, String topic); + + ResponseData resetOffsetToEndpoint(String groupId, String topic, OffsetResetStrategy strategy); } diff --git a/src/main/java/com/xuxd/kafka/console/service/impl/ConsumerServiceImpl.java b/src/main/java/com/xuxd/kafka/console/service/impl/ConsumerServiceImpl.java index 92c2798..e1701b9 100644 --- a/src/main/java/com/xuxd/kafka/console/service/impl/ConsumerServiceImpl.java +++ b/src/main/java/com/xuxd/kafka/console/service/impl/ConsumerServiceImpl.java @@ -19,6 +19,7 @@ import kafka.console.ConsumerConsole; import org.apache.commons.collections.CollectionUtils; import org.apache.kafka.clients.admin.ConsumerGroupDescription; import org.apache.kafka.clients.admin.MemberDescription; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.common.ConsumerGroupState; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @@ -37,7 +38,7 @@ public class ConsumerServiceImpl implements ConsumerService { private ConsumerConsole consumerConsole; @Override public ResponseData getConsumerGroupList(List groupIds, Set states) { - String simulateGroup = "inner_xxx_not_exit_group_###"; + String simulateGroup = "inner_xxx_not_exit_group_###" + System.currentTimeMillis(); Set groupList = new HashSet<>(); if (groupIds != null && !groupIds.isEmpty()) { if (states != null && !states.isEmpty()) { @@ -121,4 +122,9 @@ public class ConsumerServiceImpl implements ConsumerService { return (boolean) tuple2._1() ? ResponseData.create().success() : ResponseData.create().failed(tuple2._2()); } + + @Override public ResponseData resetOffsetToEndpoint(String groupId, String topic, OffsetResetStrategy strategy) { + Tuple2 tuple2 = consumerConsole.resetOffsetToEndpoint(groupId, topic, strategy); + return (boolean) tuple2._1() ? ResponseData.create().success() : ResponseData.create().failed(tuple2._2()); + } } diff --git a/src/main/scala/kafka/console/ConsumerConsole.scala b/src/main/scala/kafka/console/ConsumerConsole.scala index 3df49ae..c0de58a 100644 --- a/src/main/scala/kafka/console/ConsumerConsole.scala +++ b/src/main/scala/kafka/console/ConsumerConsole.scala @@ -8,7 +8,7 @@ import java.util.{Collections, Properties, Set} import com.xuxd.kafka.console.config.KafkaConfig import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo import org.apache.kafka.clients.admin.{ConsumerGroupDescription, DeleteConsumerGroupsOptions, ListConsumerGroupsOptions, OffsetSpec} -import org.apache.kafka.clients.consumer.{ConsumerConfig, OffsetAndMetadata} +import org.apache.kafka.clients.consumer.{ConsumerConfig, OffsetAndMetadata, OffsetResetStrategy} import org.apache.kafka.common.{ConsumerGroupState, TopicPartition} import scala.beans.BeanProperty @@ -138,20 +138,27 @@ class ConsumerConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaCon } def resetOffsetToEarliest(groupId: String, topic: String): (Boolean, String) = { + resetOffsetToEndpoint(groupId, topic, OffsetResetStrategy.EARLIEST) + } + + def resetOffsetToEndpoint(groupId: String, topic: String, strategy: OffsetResetStrategy): (Boolean, String) = { val props = new Properties() props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId) props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") - props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, strategy.name().toLowerCase); withConsumerAndCatchError(consumer => { consumer.subscribe(Collections.singleton(topic)) consumer.poll(0) val partitions = consumer.partitionsFor(topic).asScala.map(p => new TopicPartition(topic, p.partition())).toList - consumer.seekToBeginning(partitions.asJava) + strategy match { + case OffsetResetStrategy.EARLIEST => consumer.seekToBeginning(partitions.asJava) + case OffsetResetStrategy.LATEST => consumer.seekToEnd(partitions.asJava) + } partitions.foreach(consumer.position(_)) consumer.commitSync() (true, "") }, e => { - log.error("resetOffsetToEarliest error", e) + log.error("resetOffsetToEndpoint error", e) (false, e.getMessage) }, props).asInstanceOf[(Boolean, String)] } diff --git a/ui/src/utils/api.js b/ui/src/utils/api.js index 1658070..5d11dee 100644 --- a/ui/src/utils/api.js +++ b/ui/src/utils/api.js @@ -104,6 +104,10 @@ export const KafkaConsumerApi = { url: "/consumer/subscription", method: "post", }, + resetOffset: { + url: "/consumer/reset/offset", + method: "post", + }, }; export const KafkaClusterApi = { diff --git a/ui/src/views/group/ConsumerDetail.vue b/ui/src/views/group/ConsumerDetail.vue index 63a8852..2d3cab2 100644 --- a/ui/src/views/group/ConsumerDetail.vue +++ b/ui/src/views/group/ConsumerDetail.vue @@ -12,7 +12,37 @@
-

Topic: {{ k }} | 积压: {{ v.lag }}

+ Topic: {{ k }} | 积压: {{ v.lag }} + | 重置消费位点->: + + 最小位点 + + + + 最新位点 + + + + 时间戳 +
{{ text }}@{{ record.host }} +
+ 重置位点 + +
@@ -85,6 +124,25 @@ export default { this.data = []; this.$emit("closeConsumerDetailDialog", {}); }, + resetTopicOffsetToEndpoint(groupId, topic, type) { + this.loading = true; + request({ + url: KafkaConsumerApi.resetOffset.url, + method: KafkaConsumerApi.resetOffset.method, + data: { groupId: groupId, topic: topic, level: 1, type: type }, + }).then((res) => { + this.loading = false; + if (res.code != 0) { + notification.error({ + message: "error", + description: res.msg, + }); + } else { + this.$message.success(res.msg); + this.getConsumerDetail(); + } + }); + }, }, }; @@ -115,7 +173,17 @@ const columns = [ dataIndex: "lag", key: "lag", }, + { + title: "操作", + key: "operation", + scopedSlots: { customRender: "operation" }, + width: 500, + }, ]; - + diff --git a/ui/src/views/group/Group.vue b/ui/src/views/group/Group.vue index 01b486e..e883ce5 100644 --- a/ui/src/views/group/Group.vue +++ b/ui/src/views/group/Group.vue @@ -313,6 +313,7 @@ const columns = [ .operation-row-button { height: 4%; text-align: left; + margin-bottom: 8px; } .operation-btn { diff --git a/ui/src/views/topic/Topic.vue b/ui/src/views/topic/Topic.vue index a233aa4..d06ce86 100644 --- a/ui/src/views/topic/Topic.vue +++ b/ui/src/views/topic/Topic.vue @@ -273,6 +273,7 @@ const columns = [ .operation-row-button { height: 4%; text-align: left; + margin-bottom: 8px; } .operation-btn {