diff --git a/src/main/java/com/xuxd/kafka/console/service/impl/ConsumerServiceImpl.java b/src/main/java/com/xuxd/kafka/console/service/impl/ConsumerServiceImpl.java index 116c841..20bbaf7 100644 --- a/src/main/java/com/xuxd/kafka/console/service/impl/ConsumerServiceImpl.java +++ b/src/main/java/com/xuxd/kafka/console/service/impl/ConsumerServiceImpl.java @@ -111,7 +111,7 @@ public class ConsumerServiceImpl implements ConsumerService { } // consumer message and commit offset. - + consumerConsole.consumeMessageDoNothing(groupId, topic); // reset consume offset to 0. return ResponseData.create().success(); diff --git a/src/main/scala/kafka/console/ConsumerConsole.scala b/src/main/scala/kafka/console/ConsumerConsole.scala index 658974f..7f71d4a 100644 --- a/src/main/scala/kafka/console/ConsumerConsole.scala +++ b/src/main/scala/kafka/console/ConsumerConsole.scala @@ -1,13 +1,14 @@ package kafka.console +import java.time.Duration import java.util import java.util.concurrent.TimeUnit -import java.util.{Collections, Set} +import java.util.{Collections, Properties, Set} import com.xuxd.kafka.console.config.KafkaConfig import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo import org.apache.kafka.clients.admin.{ConsumerGroupDescription, DeleteConsumerGroupsOptions, ListConsumerGroupsOptions, OffsetSpec} -import org.apache.kafka.clients.consumer.OffsetAndMetadata +import org.apache.kafka.clients.consumer.{ConsumerConfig, OffsetAndMetadata} import org.apache.kafka.common.{ConsumerGroupState, TopicPartition} import scala.beans.BeanProperty @@ -112,6 +113,20 @@ class ConsumerConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaCon res } + def consumeMessageDoNothing(groupId: String, topic: String): Unit = { + val props = new Properties() + props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId) + props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") + + withConsumerAndCatchError(consumer => { + consumer.subscribe(Collections.singletonList(topic)) + consumer.poll(Duration.ofSeconds(1)) + consumer.commitSync() + }, e=> { + log.error("subscribe error", e) + }, props) + } + private def describeConsumerGroups(groupIds: util.Set[String]): mutable.Map[String, ConsumerGroupDescription] = { withAdminClientAndCatchError(admin => { admin.describeConsumerGroups(groupIds).describedGroups().asScala.map { diff --git a/ui/src/views/group/Group.vue b/ui/src/views/group/Group.vue index ad5bef9..01b486e 100644 --- a/ui/src/views/group/Group.vue +++ b/ui/src/views/group/Group.vue @@ -213,8 +213,11 @@ export default { openAddSubscriptionDialog() { this.showAddSubscriptionDialog = true; }, - closeAddSubscriptionDialog() { + closeAddSubscriptionDialog(res) { this.showAddSubscriptionDialog = false; + if (res.refresh) { + this.getConsumerGroupList(); + } }, }, created() {