根据时间戳重围消费位点

This commit is contained in:
许晓东
2021-11-07 22:47:51 +08:00
parent c5810a1fc6
commit 353921694f
7 changed files with 220 additions and 4 deletions

View File

@@ -25,6 +25,8 @@ public class ResetOffsetDTO {
private long offset;
private String dateStr;
public interface Level {
int TOPIC = 1;
int PARTITION = 2;

View File

@@ -85,6 +85,7 @@ public class ConsumerController {
res = consumerService.resetOffsetToEndpoint(offsetDTO.getGroupId(), offsetDTO.getTopic(), OffsetResetStrategy.LATEST);
break;
case ResetOffsetDTO.Type.TIMESTAMP:
res = consumerService.resetOffsetByDate(offsetDTO.getGroupId(), offsetDTO.getTopic(), offsetDTO.getDateStr());
break;
default:
return ResponseData.create().failed("unknown type");

View File

@@ -27,6 +27,8 @@ public interface ConsumerService {
ResponseData resetOffsetToEndpoint(String groupId, String topic, OffsetResetStrategy strategy);
ResponseData resetOffsetByDate(String groupId, String topic, String dateStr);
ResponseData resetPartitionToTargetOffset(String groupId, TopicPartition partition, long offset);
ResponseData getGroupIdList();

View File

@@ -6,6 +6,7 @@ import com.xuxd.kafka.console.beans.vo.ConsumerDetailVO;
import com.xuxd.kafka.console.beans.vo.ConsumerGroupVO;
import com.xuxd.kafka.console.beans.vo.ConsumerMemberVO;
import com.xuxd.kafka.console.service.ConsumerService;
import java.text.ParseException;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
@@ -22,6 +23,7 @@ import org.apache.kafka.clients.admin.MemberDescription;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.ConsumerGroupState;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Utils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import scala.Tuple2;
@@ -131,6 +133,19 @@ public class ConsumerServiceImpl implements ConsumerService {
return (boolean) tuple2._1() ? ResponseData.create().success() : ResponseData.create().failed(tuple2._2());
}
@Override public ResponseData resetOffsetByDate(String groupId, String topic, String dateStr) {
long timestamp = -1L;
try {
StringBuilder sb = new StringBuilder(dateStr.replace(" ", "T")).append(".000");
timestamp = Utils.getDateTime(sb.toString());
} catch (ParseException e) {
throw new IllegalArgumentException(e);
}
List<TopicPartition> partitions = consumerConsole.listSubscribeTopics(groupId).get(topic);
Tuple2<Object, String> tuple2 = consumerConsole.resetOffsetByTimestamp(groupId, partitions, timestamp);
return (boolean) tuple2._1() ? ResponseData.create().success() : ResponseData.create().failed(tuple2._2());
}
@Override public ResponseData resetPartitionToTargetOffset(String groupId, TopicPartition partition, long offset) {
Tuple2<Object, String> tuple2 = consumerConsole.resetPartitionToTargetOffset(groupId, partition, offset);
return (boolean) tuple2._1() ? ResponseData.create().success() : ResponseData.create().failed(tuple2._2());

View File

@@ -7,12 +7,13 @@ import java.util.{Collections, Properties, Set}
import com.xuxd.kafka.console.config.KafkaConfig
import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo
import org.apache.kafka.clients.admin.{ConsumerGroupDescription, DeleteConsumerGroupsOptions, ListConsumerGroupsOptions, OffsetSpec}
import org.apache.kafka.clients.admin._
import org.apache.kafka.clients.consumer.{ConsumerConfig, OffsetAndMetadata, OffsetResetStrategy}
import org.apache.kafka.common.requests.ListOffsetsResponse
import org.apache.kafka.common.{ConsumerGroupState, TopicPartition}
import scala.beans.BeanProperty
import scala.collection.{Map, mutable}
import scala.collection.{Map, Seq, mutable}
import scala.jdk.CollectionConverters._
/**
@@ -173,6 +174,19 @@ class ConsumerConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaCon
}).asInstanceOf[(Boolean, String)]
}
def resetOffsetByTimestamp(groupId: String, topicPartitions: util.List[TopicPartition],
timestamp: java.lang.Long): (Boolean, String) = {
withAdminClientAndCatchError(admin => {
val logOffsets = getLogTimestampOffsets(admin, groupId, topicPartitions.asScala, timestamp)
admin.alterConsumerGroupOffsets(groupId, logOffsets.asJava).all().get(timeoutMs, TimeUnit.MILLISECONDS)
(true, "")
}, e => {
log.error("resetOffsetByTimestamp error.", e)
(false, e.getMessage)
}).asInstanceOf[(Boolean, String)]
}
/**
*
* @return k: topic, v: list[topic].
@@ -196,7 +210,7 @@ class ConsumerConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaCon
def listSubscribeTopics(groups: util.Set[String]): util.Map[String, util.List[TopicPartition]] = {
val map: util.Map[String, util.List[TopicPartition]] = new util.HashMap[String, util.List[TopicPartition]]()
withAdminClientAndCatchError(admin => {
for(groupId <- groups.asScala) {
for (groupId <- groups.asScala) {
val commitOffs = admin.listConsumerGroupOffsets(
groupId
).partitionsToOffsetAndMetadata.get.asScala
@@ -237,6 +251,49 @@ class ConsumerConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaCon
}).asInstanceOf[Map[TopicPartition, OffsetAndMetadata]]
}
private def getLogTimestampOffsets(admin: Admin, groupId: String, topicPartitions: Seq[TopicPartition],
timestamp: java.lang.Long): Map[TopicPartition, OffsetAndMetadata] = {
val timestampOffsets = topicPartitions.map { topicPartition =>
topicPartition -> OffsetSpec.forTimestamp(timestamp)
}.toMap
val offsets = admin.listOffsets(
timestampOffsets.asJava,
new ListOffsetsOptions().timeoutMs(timeoutMs)
).all.get
val (successfulOffsetsForTimes, unsuccessfulOffsetsForTimes) =
offsets.asScala.partition(_._2.offset != ListOffsetsResponse.UNKNOWN_OFFSET)
val successfulLogTimestampOffsets = successfulOffsetsForTimes.map {
case (topicPartition, listOffsetsResultInfo) => topicPartition -> new OffsetAndMetadata(listOffsetsResultInfo.offset)
}.toMap
unsuccessfulOffsetsForTimes.foreach { entry =>
log.warn(s"\nWarn: Partition " + entry._1.partition() + " from topic " + entry._1.topic() +
" is empty. Falling back to latest known offset.")
}
successfulLogTimestampOffsets ++ getLogEndOffsets(admin, unsuccessfulOffsetsForTimes.keySet.toSeq)
}
private def getLogEndOffsets(admin: Admin,
topicPartitions: Seq[TopicPartition]): Predef.Map[TopicPartition, OffsetAndMetadata] = {
val endOffsets = topicPartitions.map { topicPartition =>
topicPartition -> OffsetSpec.latest
}.toMap
val offsets = admin.listOffsets(
endOffsets.asJava,
new ListOffsetsOptions().timeoutMs(timeoutMs)
).all.get
val res = topicPartitions.map { topicPartition =>
Option(offsets.get(topicPartition)) match {
case Some(listOffsetsResultInfo) => topicPartition -> new OffsetAndMetadata(listOffsetsResultInfo.offset)
case _ =>
throw new IllegalArgumentException
}
}.toMap
res
}
class TopicPartitionConsumeInfo {
@BeanProperty

View File

@@ -40,7 +40,11 @@
</a-button>
</a-popconfirm>
<a-button size="small" type="danger" style="margin-right: 1%"
<a-button
size="small"
type="danger"
style="margin-right: 1%"
@click="openResetOffsetByTimeDialog(k)"
>时间戳
</a-button>
<hr />
@@ -103,6 +107,12 @@
</a-form-item>
</a-form>
</a-modal>
<ResetOffsetByTime
:visible="showResetOffsetByTimeDialog"
:group="group"
:topic="select.topic"
@closeResetOffsetByTimeDialog="closeResetOffsetByTimeDialog"
></ResetOffsetByTime>
</a-spin>
</div>
</a-modal>
@@ -112,9 +122,11 @@
import request from "@/utils/request";
import { KafkaConsumerApi } from "@/utils/api";
import notification from "ant-design-vue/es/notification";
import ResetOffsetByTime from "@/views/group/ResetOffsetByTime";
export default {
name: "ConsumerDetail",
components: { ResetOffsetByTime },
props: {
group: {
type: String,
@@ -139,6 +151,7 @@ export default {
resetPartitionOffsetForm: this.$form.createForm(this, {
name: "resetPartitionOffsetForm",
}),
showResetOffsetByTimeDialog: false,
};
},
watch: {
@@ -221,6 +234,16 @@ export default {
}
});
},
openResetOffsetByTimeDialog(topic) {
this.select.topic = topic;
this.showResetOffsetByTimeDialog = true;
},
closeResetOffsetByTimeDialog(params) {
this.showResetOffsetByTimeDialog = false;
if (params.refresh) {
this.getConsumerDetail();
}
},
},
};

View File

@@ -0,0 +1,116 @@
<template>
<a-modal
title="重置消费位点"
:visible="show"
:width="600"
:mask="false"
:destroyOnClose="true"
:maskClosable="false"
@cancel="handleCancel"
@ok="resetOffset"
okText="提交"
cancelText="取消"
>
<div>
<a-spin :spinning="loading">
<a-form
:form="resetOffsetForm"
:label-col="{ span: 8 }"
:wrapper-col="{ span: 12 }"
>
<a-form-item label="重置消费位点到">
<a-date-picker
show-time
placeholder="选择重置到哪个时间"
:locale="locale"
v-decorator="[
'dateTime',
{
rules: [{ required: true, message: '输入消费位点!' }],
},
]"
/>
</a-form-item>
</a-form>
</a-spin>
</div>
</a-modal>
</template>
<script>
import request from "@/utils/request";
import { KafkaConsumerApi } from "@/utils/api";
import notification from "ant-design-vue/es/notification";
import locale from "ant-design-vue/lib/date-picker/locale/zh_CN";
import moment from "moment";
export default {
name: "ResetOffsetByTime",
props: {
group: {
type: String,
default: "",
},
topic: {
type: String,
default: "",
},
visible: {
type: Boolean,
default: false,
},
},
data() {
return {
locale,
show: this.visible,
loading: false,
resetOffsetForm: this.$form.createForm(this, {
name: "resetOffsetForm",
}),
};
},
watch: {
visible(v) {
this.show = v;
},
},
methods: {
handleCancel() {
this.$emit("closeResetOffsetByTimeDialog", {});
},
resetOffset() {
this.resetOffsetForm.validateFields((e, v) => {
if (e) {
return;
}
v.dateStr = moment(v.dateTime).format("YYYY-MM-DD HH:mm:ss");
const data = { dateStr: v.dateStr };
data.groupId = this.group;
data.topic = this.topic;
data.level = 1;
data.type = 3;
this.loading = true;
request({
url: KafkaConsumerApi.resetOffset.url,
method: KafkaConsumerApi.resetOffset.method,
data: data,
}).then((res) => {
this.loading = false;
if (res.code != 0) {
notification.error({
message: "error",
description: res.msg,
});
} else {
this.$emit("closeResetOffsetByTimeDialog", { refresh: true });
}
});
});
},
},
};
</script>
<style scoped></style>