diff --git a/src/main/java/com/xuxd/kafka/console/beans/dto/SyncDataDTO.java b/src/main/java/com/xuxd/kafka/console/beans/dto/SyncDataDTO.java new file mode 100644 index 0000000..4a71b7c --- /dev/null +++ b/src/main/java/com/xuxd/kafka/console/beans/dto/SyncDataDTO.java @@ -0,0 +1,22 @@ +package com.xuxd.kafka.console.beans.dto; + +import java.util.Properties; +import lombok.Data; + +/** + * kafka-console-ui. + * + * @author xuxd + * @date 2021-10-24 23:10:47 + **/ +@Data +public class SyncDataDTO { + + private String address; + + private String groupId; + + private String topic; + + private Properties properties = new Properties(); +} diff --git a/src/main/java/com/xuxd/kafka/console/config/KafkaConfiguration.java b/src/main/java/com/xuxd/kafka/console/config/KafkaConfiguration.java index 9d1ed83..0900b59 100644 --- a/src/main/java/com/xuxd/kafka/console/config/KafkaConfiguration.java +++ b/src/main/java/com/xuxd/kafka/console/config/KafkaConfiguration.java @@ -5,6 +5,7 @@ import kafka.console.ConfigConsole; import kafka.console.ConsumerConsole; import kafka.console.KafkaAclConsole; import kafka.console.KafkaConfigConsole; +import kafka.console.OperationConsole; import kafka.console.TopicConsole; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -47,4 +48,10 @@ public class KafkaConfiguration { public ConfigConsole configConsole(KafkaConfig config) { return new ConfigConsole(config); } + + @Bean + public OperationConsole operationConsole(KafkaConfig config, TopicConsole topicConsole, + ConsumerConsole consumerConsole) { + return new OperationConsole(config, topicConsole, consumerConsole); + } } diff --git a/src/main/java/com/xuxd/kafka/console/controller/ConsumerController.java b/src/main/java/com/xuxd/kafka/console/controller/ConsumerController.java index 83ab071..0de784a 100644 --- a/src/main/java/com/xuxd/kafka/console/controller/ConsumerController.java +++ b/src/main/java/com/xuxd/kafka/console/controller/ConsumerController.java @@ -111,4 +111,9 @@ public class ConsumerController { public Object getGroupIdList() { return consumerService.getGroupIdList(); } + + @GetMapping("/topic/list") + public Object getSubscribeTopicList(@RequestParam String groupId) { + return consumerService.getSubscribeTopicList(groupId); + } } diff --git a/src/main/java/com/xuxd/kafka/console/controller/OperationController.java b/src/main/java/com/xuxd/kafka/console/controller/OperationController.java new file mode 100644 index 0000000..3dec9ee --- /dev/null +++ b/src/main/java/com/xuxd/kafka/console/controller/OperationController.java @@ -0,0 +1,30 @@ +package com.xuxd.kafka.console.controller; + +import com.xuxd.kafka.console.beans.dto.SyncDataDTO; +import com.xuxd.kafka.console.service.OperationService; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +/** + * kafka-console-ui. + * + * @author xuxd + * @date 2021-10-24 23:13:28 + **/ +@RestController +@RequestMapping("/op") +public class OperationController { + + @Autowired + private OperationService operationService; + + @PostMapping("/sync/consumer/offset") + public Object syncConsumerOffset(@RequestBody SyncDataDTO dto) { + dto.getProperties().put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, dto.getAddress()); + return operationService.syncConsumerOffset(dto.getGroupId(), dto.getTopic(), dto.getProperties()); + } +} diff --git a/src/main/java/com/xuxd/kafka/console/service/ConsumerService.java b/src/main/java/com/xuxd/kafka/console/service/ConsumerService.java index 06ffed7..eb1e2da 100644 --- a/src/main/java/com/xuxd/kafka/console/service/ConsumerService.java +++ b/src/main/java/com/xuxd/kafka/console/service/ConsumerService.java @@ -30,4 +30,6 @@ public interface ConsumerService { ResponseData resetPartitionToTargetOffset(String groupId, TopicPartition partition, long offset); ResponseData getGroupIdList(); + + ResponseData getSubscribeTopicList(String groupId); } diff --git a/src/main/java/com/xuxd/kafka/console/service/OperationService.java b/src/main/java/com/xuxd/kafka/console/service/OperationService.java new file mode 100644 index 0000000..1f5364b --- /dev/null +++ b/src/main/java/com/xuxd/kafka/console/service/OperationService.java @@ -0,0 +1,15 @@ +package com.xuxd.kafka.console.service; + +import com.xuxd.kafka.console.beans.ResponseData; +import java.util.Properties; + +/** + * kafka-console-ui. + * + * @author xuxd + * @date 2021-10-24 23:12:43 + **/ +public interface OperationService { + + ResponseData syncConsumerOffset(String groupId, String topic, Properties thatProps); +} diff --git a/src/main/java/com/xuxd/kafka/console/service/impl/ConsumerServiceImpl.java b/src/main/java/com/xuxd/kafka/console/service/impl/ConsumerServiceImpl.java index 666dd64..7b8f75c 100644 --- a/src/main/java/com/xuxd/kafka/console/service/impl/ConsumerServiceImpl.java +++ b/src/main/java/com/xuxd/kafka/console/service/impl/ConsumerServiceImpl.java @@ -138,4 +138,8 @@ public class ConsumerServiceImpl implements ConsumerService { Set stateGroup = consumerConsole.getConsumerGroupIdList(null); return ResponseData.create().data(stateGroup).success(); } + + @Override public ResponseData getSubscribeTopicList(String groupId) { + return ResponseData.create().data(consumerConsole.listSubscribeTopics(groupId).keySet()).success(); + } } diff --git a/src/main/java/com/xuxd/kafka/console/service/impl/OperationServiceImpl.java b/src/main/java/com/xuxd/kafka/console/service/impl/OperationServiceImpl.java new file mode 100644 index 0000000..4955e96 --- /dev/null +++ b/src/main/java/com/xuxd/kafka/console/service/impl/OperationServiceImpl.java @@ -0,0 +1,29 @@ +package com.xuxd.kafka.console.service.impl; + +import com.xuxd.kafka.console.beans.ResponseData; +import com.xuxd.kafka.console.service.OperationService; +import java.util.Properties; +import kafka.console.OperationConsole; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; +import scala.Tuple2; + +/** + * kafka-console-ui. + * + * @author xuxd + * @date 2021-10-24 23:12:54 + **/ +@Service +public class OperationServiceImpl implements OperationService { + + @Autowired + private OperationConsole operationConsole; + + @Override public ResponseData syncConsumerOffset(String groupId, String topic, Properties thatProps) { + + Tuple2 tuple2 = operationConsole.syncConsumerOffset(groupId, topic, thatProps); + + return (boolean) tuple2._1() ? ResponseData.create().success() : ResponseData.create().failed(tuple2._2()); + } +} diff --git a/src/main/scala/kafka/console/ConsumerConsole.scala b/src/main/scala/kafka/console/ConsumerConsole.scala index 0843d34..8125a4b 100644 --- a/src/main/scala/kafka/console/ConsumerConsole.scala +++ b/src/main/scala/kafka/console/ConsumerConsole.scala @@ -173,6 +173,18 @@ class ConsumerConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaCon }).asInstanceOf[(Boolean, String)] } + def listSubscribeTopics(groupId: String): util.Map[String, util.List[TopicPartition]] = { + val commitOffs = getCommittedOffsets(groupId) + val map: util.Map[String, util.List[TopicPartition]] = new util.HashMap[String, util.List[TopicPartition]]() + for (t <- commitOffs.keySet) { + if (!map.containsKey(t.topic())) { + map.put(t.topic(), new util.ArrayList[TopicPartition]()) + } + map.get(t.topic()).add(t) + } + map + } + private def describeConsumerGroups(groupIds: util.Set[String]): mutable.Map[String, ConsumerGroupDescription] = { withAdminClientAndCatchError(admin => { admin.describeConsumerGroups(groupIds).describedGroups().asScala.map { diff --git a/src/main/scala/kafka/console/KafkaConsole.scala b/src/main/scala/kafka/console/KafkaConsole.scala index cb4e9b2..049e574 100644 --- a/src/main/scala/kafka/console/KafkaConsole.scala +++ b/src/main/scala/kafka/console/KafkaConsole.scala @@ -65,6 +65,10 @@ class KafkaConsole(config: KafkaConfig) { } } + protected def createAdminClient(props: Properties): Admin = { + Admin.create(props) + } + private def createAdminClient(): Admin = { Admin.create(getProps()) } diff --git a/src/main/scala/kafka/console/OperationConsole.scala b/src/main/scala/kafka/console/OperationConsole.scala new file mode 100644 index 0000000..5c85642 --- /dev/null +++ b/src/main/scala/kafka/console/OperationConsole.scala @@ -0,0 +1,39 @@ +package kafka.console + +import java.util.Properties +import java.util.concurrent.TimeUnit + +import com.xuxd.kafka.console.config.KafkaConfig + +import scala.jdk.CollectionConverters.{ListHasAsScala, MapHasAsScala} + +/** + * kafka-console-ui. + * + * @author xuxd + * @date 2021-10-24 23:23:30 + * */ +class OperationConsole(config: KafkaConfig, topicConsole: TopicConsole, + consumerConsole: ConsumerConsole) extends KafkaConsole(config: KafkaConfig) with Logging { + + def syncConsumerOffset(groupId: String, topic: String, props: Properties): (Boolean, String) = { + val thatAdmin = createAdminClient(props) + try { + val thisTopicPartitions = consumerConsole.listSubscribeTopics(groupId).get(topic).asScala.sortBy(_.partition()) + + val thatTopicPartitions = thatAdmin.listConsumerGroupOffsets( + groupId + ).partitionsToOffsetAndMetadata.get(timeoutMs, TimeUnit.MILLISECONDS).asScala.filter(_._1.topic().equals(topic)).keySet.toList.sortBy(_.partition()) + if (thatTopicPartitions != thisTopicPartitions) { + throw new IllegalStateException("topic partition inconsistent.") + } + + } catch { + case ex => throw ex + } finally { + thatAdmin.close() + } + + (true, "") + } +} diff --git a/ui/src/utils/api.js b/ui/src/utils/api.js index 073691e..7aa8886 100644 --- a/ui/src/utils/api.js +++ b/ui/src/utils/api.js @@ -112,6 +112,10 @@ export const KafkaConsumerApi = { url: "/consumer/group/id/list", method: "get", }, + getSubscribeTopicList: { + url: "/consumer/topic/list", + method: "get", + }, }; export const KafkaClusterApi = { @@ -120,3 +124,10 @@ export const KafkaClusterApi = { method: "get", }, }; + +export const KafkaOpApi = { + syncConsumerOffset: { + url: "/op/sync/consumer/offset", + method: "post", + }, +}; diff --git a/ui/src/views/op/SyncConsumerOffset.vue b/ui/src/views/op/SyncConsumerOffset.vue index 2203abc..acc17e1 100644 --- a/ui/src/views/op/SyncConsumerOffset.vue +++ b/ui/src/views/op/SyncConsumerOffset.vue @@ -19,6 +19,7 @@ > - + + + - 提交 + 提交 @@ -70,8 +78,9 @@