新建订阅
This commit is contained in:
@@ -111,9 +111,14 @@ public class ConsumerServiceImpl implements ConsumerService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// consumer message and commit offset.
|
// consumer message and commit offset.
|
||||||
consumerConsole.consumeMessageDoNothing(groupId, topic);
|
Tuple2<Object, String> tuple21 = consumerConsole.consumeMessageDoNothing(groupId, topic);
|
||||||
|
if (!(boolean) tuple21._1()) {
|
||||||
|
return ResponseData.create().failed(tuple21._2());
|
||||||
|
}
|
||||||
|
|
||||||
// reset consume offset to 0.
|
// reset consume offset to earliest.
|
||||||
return ResponseData.create().success();
|
Tuple2<Object, String> tuple2 = consumerConsole.resetOffsetToEarliest(groupId, topic);
|
||||||
|
|
||||||
|
return (boolean) tuple2._1() ? ResponseData.create().success() : ResponseData.create().failed(tuple2._2());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -84,10 +84,16 @@ class ConsumerConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaCon
|
|||||||
val topicPartitionConsumeInfoMap = commitOffsets.keySet.map(topicPartition => {
|
val topicPartitionConsumeInfoMap = commitOffsets.keySet.map(topicPartition => {
|
||||||
val t = new TopicPartitionConsumeInfo
|
val t = new TopicPartitionConsumeInfo
|
||||||
t.topicPartition = topicPartition
|
t.topicPartition = topicPartition
|
||||||
t.logEndOffset = endOffsets.get(t.topicPartition).get.offset()
|
|
||||||
t.consumerOffset = getPartitionOffset(t.topicPartition).get
|
|
||||||
t.lag = t.logEndOffset - t.consumerOffset
|
|
||||||
t.groupId = consumerGroup.groupId()
|
t.groupId = consumerGroup.groupId()
|
||||||
|
t.consumerOffset = getPartitionOffset(t.topicPartition).get
|
||||||
|
endOffsets.get(t.topicPartition) match {
|
||||||
|
case None => t.lag = -1
|
||||||
|
case Some(v) => {
|
||||||
|
t.logEndOffset = v.offset()
|
||||||
|
t.lag = t.logEndOffset - t.consumerOffset
|
||||||
|
}
|
||||||
|
}
|
||||||
|
t.lag = t.logEndOffset - t.consumerOffset
|
||||||
(topicPartition, t)
|
(topicPartition, t)
|
||||||
}).toMap
|
}).toMap
|
||||||
|
|
||||||
@@ -113,18 +119,41 @@ class ConsumerConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaCon
|
|||||||
res
|
res
|
||||||
}
|
}
|
||||||
|
|
||||||
def consumeMessageDoNothing(groupId: String, topic: String): Unit = {
|
def consumeMessageDoNothing(groupId: String, topic: String): (Boolean, String) = {
|
||||||
val props = new Properties()
|
val props = new Properties()
|
||||||
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
|
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
|
||||||
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
|
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
|
||||||
|
|
||||||
withConsumerAndCatchError(consumer => {
|
withConsumerAndCatchError(consumer => {
|
||||||
consumer.subscribe(Collections.singletonList(topic))
|
consumer.subscribe(Collections.singletonList(topic))
|
||||||
consumer.poll(Duration.ofSeconds(1))
|
for (i <- 1 to 2) {
|
||||||
|
consumer.poll(Duration.ofSeconds(1))
|
||||||
|
}
|
||||||
consumer.commitSync()
|
consumer.commitSync()
|
||||||
}, e=> {
|
(true, "")
|
||||||
|
}, e => {
|
||||||
log.error("subscribe error", e)
|
log.error("subscribe error", e)
|
||||||
}, props)
|
(false, e.getMessage)
|
||||||
|
}, props).asInstanceOf[(Boolean, String)]
|
||||||
|
}
|
||||||
|
|
||||||
|
def resetOffsetToEarliest(groupId: String, topic: String): (Boolean, String) = {
|
||||||
|
val props = new Properties()
|
||||||
|
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
|
||||||
|
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
|
||||||
|
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
|
||||||
|
withConsumerAndCatchError(consumer => {
|
||||||
|
consumer.subscribe(Collections.singleton(topic))
|
||||||
|
consumer.poll(0)
|
||||||
|
val partitions = consumer.partitionsFor(topic).asScala.map(p => new TopicPartition(topic, p.partition())).toList
|
||||||
|
consumer.seekToBeginning(partitions.asJava)
|
||||||
|
partitions.foreach(consumer.position(_))
|
||||||
|
consumer.commitSync()
|
||||||
|
(true, "")
|
||||||
|
}, e => {
|
||||||
|
log.error("resetOffsetToEarliest error", e)
|
||||||
|
(false, e.getMessage)
|
||||||
|
}, props).asInstanceOf[(Boolean, String)]
|
||||||
}
|
}
|
||||||
|
|
||||||
private def describeConsumerGroups(groupIds: util.Set[String]): mutable.Map[String, ConsumerGroupDescription] = {
|
private def describeConsumerGroups(groupIds: util.Set[String]): mutable.Map[String, ConsumerGroupDescription] = {
|
||||||
|
|||||||
Reference in New Issue
Block a user