diff --git a/src/main/java/com/xuxd/kafka/console/controller/ConfigController.java b/src/main/java/com/xuxd/kafka/console/controller/ConfigController.java index 69f5f9e..d11ca37 100644 --- a/src/main/java/com/xuxd/kafka/console/controller/ConfigController.java +++ b/src/main/java/com/xuxd/kafka/console/controller/ConfigController.java @@ -46,6 +46,16 @@ public class ConfigController { return configService.getTopicConfig(topic); } + @PostMapping("/topic") + public Object setTopicConfig(@RequestBody AlterConfigDTO dto) { + return configService.alterTopicConfig(dto.getEntity(), dto.to(), AlterType.SET); + } + + @DeleteMapping("/topic") + public Object deleteTopicConfig(@RequestBody AlterConfigDTO dto) { + return configService.alterTopicConfig(dto.getEntity(), dto.to(), AlterType.DELETE); + } + @GetMapping("/broker") public Object getBrokerConfig(String brokerId) { return configService.getBrokerConfig(brokerId); diff --git a/src/main/java/com/xuxd/kafka/console/service/ConfigService.java b/src/main/java/com/xuxd/kafka/console/service/ConfigService.java index dc10a23..182866d 100644 --- a/src/main/java/com/xuxd/kafka/console/service/ConfigService.java +++ b/src/main/java/com/xuxd/kafka/console/service/ConfigService.java @@ -17,4 +17,6 @@ public interface ConfigService { ResponseData getBrokerConfig(String brokerId); ResponseData alterBrokerConfig(String brokerId, ConfigEntry entry, AlterType type); + + ResponseData alterTopicConfig(String topic, ConfigEntry entry, AlterType type); } diff --git a/src/main/java/com/xuxd/kafka/console/service/impl/ConfigServiceImpl.java b/src/main/java/com/xuxd/kafka/console/service/impl/ConfigServiceImpl.java index 0af307e..d6fc418 100644 --- a/src/main/java/com/xuxd/kafka/console/service/impl/ConfigServiceImpl.java +++ b/src/main/java/com/xuxd/kafka/console/service/impl/ConfigServiceImpl.java @@ -49,4 +49,18 @@ public class ConfigServiceImpl implements ConfigService { return (boolean) tuple2._1() ? ResponseData.create().success() : ResponseData.create().failed(tuple2._2()); } + @Override + public ResponseData alterTopicConfig(String topic, ConfigEntry entry, AlterType type) { + Tuple2 tuple2 = null; + switch (type) { + case SET: + tuple2 = configConsole.setTopicConfig(topic, entry); + break; + case DELETE: + tuple2 = configConsole.deleteTopicConfig(topic, entry); + break; + } + return (boolean) tuple2._1() ? ResponseData.create().success() : ResponseData.create().failed(tuple2._2()); + } + } diff --git a/src/main/scala/kafka/console/ConfigConsole.scala b/src/main/scala/kafka/console/ConfigConsole.scala index 541ecfe..fbf5d62 100644 --- a/src/main/scala/kafka/console/ConfigConsole.scala +++ b/src/main/scala/kafka/console/ConfigConsole.scala @@ -39,6 +39,14 @@ class ConfigConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConfi alterConfig(ConfigType.Broker, broker, entry, AlterConfigOp.OpType.DELETE) } + def setTopicConfig(topic: String, entry: ConfigEntry): (Boolean, String) = { + alterConfig(ConfigType.Topic, topic, entry, AlterConfigOp.OpType.SET) + } + + def deleteTopicConfig(topic: String, entry: ConfigEntry): (Boolean, String) = { + alterConfig(ConfigType.Topic, topic, entry, AlterConfigOp.OpType.DELETE) + } + def getConfig(entityType: String, entityName: String): List[ConfigEntry] = { getResourceConfig(entityType, entityName, false).asJava }