diff --git a/src/main/java/com/xuxd/kafka/console/beans/dto/ResetOffsetDTO.java b/src/main/java/com/xuxd/kafka/console/beans/dto/ResetOffsetDTO.java index 0d3d92f..c83d1ec 100644 --- a/src/main/java/com/xuxd/kafka/console/beans/dto/ResetOffsetDTO.java +++ b/src/main/java/com/xuxd/kafka/console/beans/dto/ResetOffsetDTO.java @@ -25,6 +25,8 @@ public class ResetOffsetDTO { private long offset; + private String dateStr; + public interface Level { int TOPIC = 1; int PARTITION = 2; diff --git a/src/main/java/com/xuxd/kafka/console/controller/ConsumerController.java b/src/main/java/com/xuxd/kafka/console/controller/ConsumerController.java index d5993b1..a9b9ab3 100644 --- a/src/main/java/com/xuxd/kafka/console/controller/ConsumerController.java +++ b/src/main/java/com/xuxd/kafka/console/controller/ConsumerController.java @@ -85,6 +85,7 @@ public class ConsumerController { res = consumerService.resetOffsetToEndpoint(offsetDTO.getGroupId(), offsetDTO.getTopic(), OffsetResetStrategy.LATEST); break; case ResetOffsetDTO.Type.TIMESTAMP: + res = consumerService.resetOffsetByDate(offsetDTO.getGroupId(), offsetDTO.getTopic(), offsetDTO.getDateStr()); break; default: return ResponseData.create().failed("unknown type"); diff --git a/src/main/java/com/xuxd/kafka/console/service/ConsumerService.java b/src/main/java/com/xuxd/kafka/console/service/ConsumerService.java index 7e04436..5a5f9bc 100644 --- a/src/main/java/com/xuxd/kafka/console/service/ConsumerService.java +++ b/src/main/java/com/xuxd/kafka/console/service/ConsumerService.java @@ -27,6 +27,8 @@ public interface ConsumerService { ResponseData resetOffsetToEndpoint(String groupId, String topic, OffsetResetStrategy strategy); + ResponseData resetOffsetByDate(String groupId, String topic, String dateStr); + ResponseData resetPartitionToTargetOffset(String groupId, TopicPartition partition, long offset); ResponseData getGroupIdList(); diff --git a/src/main/java/com/xuxd/kafka/console/service/impl/ConsumerServiceImpl.java 
b/src/main/java/com/xuxd/kafka/console/service/impl/ConsumerServiceImpl.java index 4e28e88..cdf9492 100644 --- a/src/main/java/com/xuxd/kafka/console/service/impl/ConsumerServiceImpl.java +++ b/src/main/java/com/xuxd/kafka/console/service/impl/ConsumerServiceImpl.java @@ -6,6 +6,7 @@ import com.xuxd.kafka.console.beans.vo.ConsumerDetailVO; import com.xuxd.kafka.console.beans.vo.ConsumerGroupVO; import com.xuxd.kafka.console.beans.vo.ConsumerMemberVO; import com.xuxd.kafka.console.service.ConsumerService; +import java.text.ParseException; import java.util.Collection; import java.util.Collections; import java.util.Comparator; @@ -22,6 +23,7 @@ import org.apache.kafka.clients.admin.MemberDescription; import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.common.ConsumerGroupState; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.Utils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import scala.Tuple2; @@ -131,6 +133,19 @@ public class ConsumerServiceImpl implements ConsumerService { return (boolean) tuple2._1() ? ResponseData.create().success() : ResponseData.create().failed(tuple2._2()); } + @Override public ResponseData resetOffsetByDate(String groupId, String topic, String dateStr) { + long timestamp = -1L; + try { + StringBuilder sb = new StringBuilder(dateStr.replace(" ", "T")).append(".000"); + timestamp = Utils.getDateTime(sb.toString()); + } catch (ParseException e) { + throw new IllegalArgumentException(e); + } + List<TopicPartition> partitions = consumerConsole.listSubscribeTopics(groupId).get(topic); + Tuple2 tuple2 = consumerConsole.resetOffsetByTimestamp(groupId, partitions, timestamp); + return (boolean) tuple2._1() ?
ResponseData.create().success() : ResponseData.create().failed(tuple2._2()); + } + @Override public ResponseData resetPartitionToTargetOffset(String groupId, TopicPartition partition, long offset) { Tuple2 tuple2 = consumerConsole.resetPartitionToTargetOffset(groupId, partition, offset); return (boolean) tuple2._1() ? ResponseData.create().success() : ResponseData.create().failed(tuple2._2()); diff --git a/src/main/scala/kafka/console/ConsumerConsole.scala b/src/main/scala/kafka/console/ConsumerConsole.scala index 80edba3..e9ca8fb 100644 --- a/src/main/scala/kafka/console/ConsumerConsole.scala +++ b/src/main/scala/kafka/console/ConsumerConsole.scala @@ -7,12 +7,13 @@ import java.util.{Collections, Properties, Set} import com.xuxd.kafka.console.config.KafkaConfig import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo -import org.apache.kafka.clients.admin.{ConsumerGroupDescription, DeleteConsumerGroupsOptions, ListConsumerGroupsOptions, OffsetSpec} +import org.apache.kafka.clients.admin._ import org.apache.kafka.clients.consumer.{ConsumerConfig, OffsetAndMetadata, OffsetResetStrategy} +import org.apache.kafka.common.requests.ListOffsetsResponse import org.apache.kafka.common.{ConsumerGroupState, TopicPartition} import scala.beans.BeanProperty -import scala.collection.{Map, mutable} +import scala.collection.{Map, Seq, mutable} import scala.jdk.CollectionConverters._ /** @@ -173,6 +174,19 @@ class ConsumerConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaCon }).asInstanceOf[(Boolean, String)] } + def resetOffsetByTimestamp(groupId: String, topicPartitions: util.List[TopicPartition], + timestamp: java.lang.Long): (Boolean, String) = { + withAdminClientAndCatchError(admin => { + val logOffsets = getLogTimestampOffsets(admin, groupId, topicPartitions.asScala, timestamp) + + admin.alterConsumerGroupOffsets(groupId, logOffsets.asJava).all().get(timeoutMs, TimeUnit.MILLISECONDS) + (true, "") + }, e => { + log.error("resetOffsetByTimestamp 
error.", e) + (false, e.getMessage) + }).asInstanceOf[(Boolean, String)] + } + /** * * @return k: topic, v: list[topic]. @@ -196,7 +210,7 @@ class ConsumerConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaCon def listSubscribeTopics(groups: util.Set[String]): util.Map[String, util.List[TopicPartition]] = { val map: util.Map[String, util.List[TopicPartition]] = new util.HashMap[String, util.List[TopicPartition]]() withAdminClientAndCatchError(admin => { - for(groupId <- groups.asScala) { + for (groupId <- groups.asScala) { val commitOffs = admin.listConsumerGroupOffsets( groupId ).partitionsToOffsetAndMetadata.get.asScala @@ -237,6 +251,49 @@ class ConsumerConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaCon }).asInstanceOf[Map[TopicPartition, OffsetAndMetadata]] } + private def getLogTimestampOffsets(admin: Admin, groupId: String, topicPartitions: Seq[TopicPartition], + timestamp: java.lang.Long): Map[TopicPartition, OffsetAndMetadata] = { + val timestampOffsets = topicPartitions.map { topicPartition => + topicPartition -> OffsetSpec.forTimestamp(timestamp) + }.toMap + val offsets = admin.listOffsets( + timestampOffsets.asJava, + new ListOffsetsOptions().timeoutMs(timeoutMs) + ).all.get + val (successfulOffsetsForTimes, unsuccessfulOffsetsForTimes) = + offsets.asScala.partition(_._2.offset != ListOffsetsResponse.UNKNOWN_OFFSET) + + val successfulLogTimestampOffsets = successfulOffsetsForTimes.map { + case (topicPartition, listOffsetsResultInfo) => topicPartition -> new OffsetAndMetadata(listOffsetsResultInfo.offset) + }.toMap + + unsuccessfulOffsetsForTimes.foreach { entry => + log.warn(s"\nWarn: Partition " + entry._1.partition() + " from topic " + entry._1.topic() + + " is empty. 
Falling back to latest known offset.") + } + + successfulLogTimestampOffsets ++ getLogEndOffsets(admin, unsuccessfulOffsetsForTimes.keySet.toSeq) + } + + private def getLogEndOffsets(admin: Admin, + topicPartitions: Seq[TopicPartition]): Predef.Map[TopicPartition, OffsetAndMetadata] = { + val endOffsets = topicPartitions.map { topicPartition => + topicPartition -> OffsetSpec.latest + }.toMap + val offsets = admin.listOffsets( + endOffsets.asJava, + new ListOffsetsOptions().timeoutMs(timeoutMs) + ).all.get + val res = topicPartitions.map { topicPartition => + Option(offsets.get(topicPartition)) match { + case Some(listOffsetsResultInfo) => topicPartition -> new OffsetAndMetadata(listOffsetsResultInfo.offset) + case _ => + throw new IllegalArgumentException(s"listOffsets returned no end offset for partition $topicPartition") + } + }.toMap + res + } + class TopicPartitionConsumeInfo { @BeanProperty diff --git a/ui/src/views/group/ConsumerDetail.vue index 6fce7e3..01a83fd 100644 --- a/ui/src/views/group/ConsumerDetail.vue +++ b/ui/src/views/group/ConsumerDetail.vue @@ -40,7 +40,11 @@ - 时间戳
@@ -103,6 +107,12 @@ + @@ -112,9 +122,11 @@ import request from "@/utils/request"; import { KafkaConsumerApi } from "@/utils/api"; import notification from "ant-design-vue/es/notification"; +import ResetOffsetByTime from "@/views/group/ResetOffsetByTime"; export default { name: "ConsumerDetail", + components: { ResetOffsetByTime }, props: { group: { type: String, @@ -139,6 +151,7 @@ export default { resetPartitionOffsetForm: this.$form.createForm(this, { name: "resetPartitionOffsetForm", }), + showResetOffsetByTimeDialog: false, }; }, watch: { @@ -221,6 +234,16 @@ export default { } }); }, + openResetOffsetByTimeDialog(topic) { + this.select.topic = topic; + this.showResetOffsetByTimeDialog = true; + }, + closeResetOffsetByTimeDialog(params) { + this.showResetOffsetByTimeDialog = false; + if (params.refresh) { + this.getConsumerDetail(); + } + }, }, }; diff --git a/ui/src/views/group/ResetOffsetByTime.vue b/ui/src/views/group/ResetOffsetByTime.vue new file mode 100644 index 0000000..68b80bc --- /dev/null +++ b/ui/src/views/group/ResetOffsetByTime.vue @@ -0,0 +1,116 @@ + + + + +