集群同步-》同步消费位移
This commit is contained in:
@@ -64,12 +64,12 @@ public class OperationServiceImpl implements OperationService {
|
|||||||
|
|
||||||
@Override public ResponseData minOffsetAlignment(String groupId, String topic, Properties thatProps) {
|
@Override public ResponseData minOffsetAlignment(String groupId, String topic, Properties thatProps) {
|
||||||
|
|
||||||
Tuple2<Map<TopicPartition, Object>, Map<TopicPartition, Object>> tuple2 = operationConsole.checkAndFetchMinOffset(groupId, topic, thatProps);
|
Tuple2<Map<TopicPartition, Object>, Map<TopicPartition, Object>> tuple2 = operationConsole.checkAndFetchOffset(groupId, topic, thatProps);
|
||||||
Map<TopicPartition, Object> thisMinOffset = tuple2._1();
|
Map<TopicPartition, Object> thisMaxOffset = tuple2._1();
|
||||||
Map<TopicPartition, Object> thatMinOffset = tuple2._2();
|
Map<TopicPartition, Object> thatMinOffset = tuple2._2();
|
||||||
|
|
||||||
JsonObject thisJson = new JsonObject(), thatJson = new JsonObject();
|
JsonObject thisJson = new JsonObject(), thatJson = new JsonObject();
|
||||||
thisMinOffset.forEach((k, v) -> {
|
thisMaxOffset.forEach((k, v) -> {
|
||||||
thisJson.addProperty(String.valueOf(k.partition()), v.toString());
|
thisJson.addProperty(String.valueOf(k.partition()), v.toString());
|
||||||
});
|
});
|
||||||
thatMinOffset.forEach((k, v) -> {
|
thatMinOffset.forEach((k, v) -> {
|
||||||
|
|||||||
@@ -96,7 +96,7 @@ class OperationConsole(config: KafkaConfig, topicConsole: TopicConsole,
|
|||||||
import java.util
|
import java.util
|
||||||
|
|
||||||
def syncConsumerOffset(groupId: String, topic: String, props: Properties,
|
def syncConsumerOffset(groupId: String, topic: String, props: Properties,
|
||||||
thisMinOffset: util.Map[TopicPartition, Long],
|
thisMaxOffset: util.Map[TopicPartition, Long],
|
||||||
thatMinOffset: util.Map[TopicPartition, Long]): (Boolean, String) = {
|
thatMinOffset: util.Map[TopicPartition, Long]): (Boolean, String) = {
|
||||||
val thatAdmin = createAdminClient(props)
|
val thatAdmin = createAdminClient(props)
|
||||||
try {
|
try {
|
||||||
@@ -143,9 +143,9 @@ class OperationConsole(config: KafkaConfig, topicConsole: TopicConsole,
|
|||||||
}
|
}
|
||||||
})
|
})
|
||||||
// get from this kafka: min offset of topic
|
// get from this kafka: min offset of topic
|
||||||
val thisMinTopicOffset = thisMinOffset.asScala
|
val thisMaxTopicOffset = thisMaxOffset.asScala
|
||||||
|
|
||||||
for ((t: TopicPartition, o: Long) <- thisMinTopicOffset) {
|
for ((t: TopicPartition, o: Long) <- thisMaxTopicOffset) {
|
||||||
val res = consumerConsole.resetPartitionToTargetOffset(groupId, t, o + thatDiffOffset.get(t).get)
|
val res = consumerConsole.resetPartitionToTargetOffset(groupId, t, o + thatDiffOffset.get(t).get)
|
||||||
if (!res._1) {
|
if (!res._1) {
|
||||||
log.error(s"reset $t offset failed")
|
log.error(s"reset $t offset failed")
|
||||||
@@ -164,14 +164,14 @@ class OperationConsole(config: KafkaConfig, topicConsole: TopicConsole,
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* check partition consistency and fetch the min offset for the topic.
|
* check partition consistency and fetch the min offset for the that kafka topic, max offset for this kafka.
|
||||||
*
|
*
|
||||||
* @param groupId group id.
|
* @param groupId group id.
|
||||||
* @param topic topic.
|
* @param topic topic.
|
||||||
* @param props other kafka cluster config.
|
* @param props other kafka cluster config.
|
||||||
* @return _1: this min offset, _2: that min offset.
|
* @return _1: this max offset, _2: that min offset.
|
||||||
*/
|
*/
|
||||||
def checkAndFetchMinOffset(groupId: String, topic: String,
|
def checkAndFetchOffset(groupId: String, topic: String,
|
||||||
props: Properties): (util.Map[TopicPartition, Long], util.Map[TopicPartition, Long]) = {
|
props: Properties): (util.Map[TopicPartition, Long], util.Map[TopicPartition, Long]) = {
|
||||||
val thatAdmin = createAdminClient(props)
|
val thatAdmin = createAdminClient(props)
|
||||||
val thatConsumer = new KafkaConsumer(props, new ByteArrayDeserializer, new ByteArrayDeserializer)
|
val thatConsumer = new KafkaConsumer(props, new ByteArrayDeserializer, new ByteArrayDeserializer)
|
||||||
@@ -186,9 +186,9 @@ class OperationConsole(config: KafkaConfig, topicConsole: TopicConsole,
|
|||||||
throw new IllegalStateException("topic partition inconsistent.")
|
throw new IllegalStateException("topic partition inconsistent.")
|
||||||
}
|
}
|
||||||
val thatMinTopicOffset = thatConsumer.beginningOffsets(thatTopicPartitions.asJava)
|
val thatMinTopicOffset = thatConsumer.beginningOffsets(thatTopicPartitions.asJava)
|
||||||
val thisMinTopicOffset = topicConsole.getTopicOffset(topic, thisTopicPartitions.asJava)._1
|
val thisMaxTopicOffset = topicConsole.getTopicOffset(topic, thisTopicPartitions.asJava)._2
|
||||||
|
|
||||||
(thisMinTopicOffset, thatMinTopicOffset).asInstanceOf[(util.Map[TopicPartition, Long], util.Map[TopicPartition, Long])]
|
(thisMaxTopicOffset, thatMinTopicOffset).asInstanceOf[(util.Map[TopicPartition, Long], util.Map[TopicPartition, Long])]
|
||||||
} finally {
|
} finally {
|
||||||
thatAdmin.close()
|
thatAdmin.close()
|
||||||
thatConsumer.close()
|
thatConsumer.close()
|
||||||
|
|||||||
Reference in New Issue
Block a user