feat: Support only watching key resources in one namespace (#1821)

This commit is contained in:
Kent Dong
2025-03-03 15:40:44 +08:00
committed by GitHub
parent 988e2c1fa7
commit 188914a16b
17 changed files with 116 additions and 51 deletions

View File

@@ -15,21 +15,33 @@
package http2rpc
import (
"istio.io/istio/pkg/cluster"
"time"
"istio.io/istio/pkg/kube/controllers"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/cache"
v1 "github.com/alibaba/higress/client/pkg/apis/networking/v1"
"github.com/alibaba/higress/client/pkg/clientset/versioned"
informersv1 "github.com/alibaba/higress/client/pkg/informers/externalversions/networking/v1"
listersv1 "github.com/alibaba/higress/client/pkg/listers/networking/v1"
"github.com/alibaba/higress/pkg/ingress/kube/common"
"github.com/alibaba/higress/pkg/ingress/kube/controller"
kubeclient "github.com/alibaba/higress/pkg/kube"
)
type Http2RpcController controller.Controller[listersv1.Http2RpcLister]
func NewController(client kubeclient.Client, clusterId cluster.ID) Http2RpcController {
informer := client.HigressInformer().Networking().V1().Http2Rpcs().Informer()
return controller.NewCommonController("http2rpc", client.HigressInformer().Networking().V1().Http2Rpcs().Lister(),
informer, GetHttp2Rpc, clusterId)
func NewController(client kubeclient.Client, options common.Options) Http2RpcController {
var informer cache.SharedIndexInformer
if options.WatchNamespace == "" {
informer = client.HigressInformer().Networking().V1().Http2Rpcs().Informer()
} else {
informer = client.HigressInformer().InformerFor(&v1.Http2Rpc{}, func(client versioned.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
return informersv1.NewHttp2RpcInformer(client, options.WatchNamespace, resyncPeriod, nil)
})
}
return controller.NewCommonController("http2rpc", listersv1.NewHttp2RpcLister(informer.GetIndexer()), informer, GetHttp2Rpc, options.ClusterId)
}
func GetHttp2Rpc(lister listersv1.Http2RpcLister, namespacedName types.NamespacedName) (controllers.Object, error) {

View File

@@ -100,7 +100,7 @@ type controller struct {
// NewController creates a new Kubernetes controller
func NewController(localKubeClient, client kubeclient.Client, options common.Options,
secretController secret.SecretController) common.IngressController {
opts := ktypes.InformerOptions{}
opts := ktypes.InformerOptions{Namespace: options.WatchNamespace}
ingressInformer := util.GetInformerFiltered(client, opts, gvrIngressV1Beta1, &ingress.Ingress{},
func(options metav1.ListOptions) (runtime.Object, error) {
return client.Kube().NetworkingV1beta1().Ingresses(opts.Namespace).List(context.Background(), options)

View File

@@ -54,7 +54,7 @@ func TestIngressControllerApplies(t *testing.T) {
options := common.Options{IngressClass: "mse", ClusterId: ""}
secretController := secret.NewController(localKubeClient, options.ClusterId)
secretController := secret.NewController(localKubeClient, options)
ingressController := NewController(localKubeClient, client, options, secretController)
testcases := map[string]func(*testing.T, common.IngressController){
@@ -253,7 +253,7 @@ func TestIngressControllerConventions(t *testing.T) {
options := common.Options{IngressClass: "mse", ClusterId: "", EnableStatus: true}
secretController := secret.NewController(localKubeClient, options.ClusterId)
secretController := secret.NewController(localKubeClient, options)
ingressController := NewController(localKubeClient, client, options, secretController)
testcases := map[string]func(*testing.T, common.IngressController){
@@ -1142,7 +1142,7 @@ func TestIngressControllerProcessing(t *testing.T) {
options := common.Options{IngressClass: "mse", ClusterId: "", EnableStatus: true}
secretController := secret.NewController(localKubeClient, options.ClusterId)
secretController := secret.NewController(localKubeClient, options)
opts := ktypes.InformerOptions{}
ingressInformer := util.GetInformerFiltered(fakeClient, opts, gvrIngressV1Beta1, &ingress.Ingress{},

View File

@@ -92,7 +92,7 @@ type controller struct {
// NewController creates a new Kubernetes controller
func NewController(localKubeClient, client kubeclient.Client, options common.Options, secretController secret.SecretController) common.IngressController {
opts := ktypes.InformerOptions{}
opts := ktypes.InformerOptions{Namespace: options.WatchNamespace}
ingressInformer := schemakubeclient.GetInformerFilteredFromGVR(client, opts, gvr.Ingress)
ingressLister := networkinglister.NewIngressLister(ingressInformer.Informer.GetIndexer())
serviceInformer := schemakubeclient.GetInformerFilteredFromGVR(client, opts, gvr.Service)

View File

@@ -21,6 +21,7 @@ import (
"sort"
"strings"
"sync"
"time"
"github.com/hashicorp/go-multierror"
networking "istio.io/api/networking/v1alpha3"
@@ -43,7 +44,9 @@ import (
listerv1 "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
ingress "knative.dev/networking/pkg/apis/networking/v1alpha1"
networkingv1alpha1 "knative.dev/networking/pkg/client/listers/networking/v1alpha1"
"knative.dev/networking/pkg/client/clientset/versioned"
informernetworkingv1alpha1 "knative.dev/networking/pkg/client/informers/externalversions/networking/v1alpha1"
listernetworkingv1alpha1 "knative.dev/networking/pkg/client/listers/networking/v1alpha1"
"github.com/alibaba/higress/pkg/ingress/kube/annotations"
"github.com/alibaba/higress/pkg/ingress/kube/common"
@@ -76,7 +79,7 @@ type controller struct {
ingresses map[string]*ingress.Ingress
ingressInformer cache.SharedInformer
ingressLister networkingv1alpha1.IngressLister
ingressLister listernetworkingv1alpha1.IngressLister
serviceInformer informerfactory.StartableInformer
serviceLister listerv1.ServiceLister
secretController secret.SecretController
@@ -86,16 +89,23 @@ type controller struct {
// NewController creates a new Kubernetes controller
func NewController(localKubeClient, client kube.Client, options common.Options,
secretController secret.SecretController) common.KIngressController {
//var namespace string = "default"
ingressInformer := client.KIngressInformer().Networking().V1alpha1().Ingresses()
serviceInformer := schemakubeclient.GetInformerFilteredFromGVR(client, ktypes.InformerOptions{}, gvr.Service)
var ingressInformer cache.SharedIndexInformer
if options.WatchNamespace == "" {
ingressInformer = client.KIngressInformer().Networking().V1alpha1().Ingresses().Informer()
} else {
ingressInformer = client.KIngressInformer().InformerFor(&ingress.Ingress{}, func(c versioned.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
return informernetworkingv1alpha1.NewIngressInformer(c, options.WatchNamespace, resyncPeriod, nil)
})
}
ingressLister := listernetworkingv1alpha1.NewIngressLister(ingressInformer.GetIndexer())
serviceInformer := schemakubeclient.GetInformerFilteredFromGVR(client, ktypes.InformerOptions{Namespace: options.WatchNamespace}, gvr.Service)
serviceLister := listerv1.NewServiceLister(serviceInformer.Informer.GetIndexer())
c := &controller{
options: options,
ingresses: make(map[string]*ingress.Ingress),
ingressInformer: ingressInformer.Informer(),
ingressLister: ingressInformer.Lister(),
ingressInformer: ingressInformer,
ingressLister: ingressLister,
serviceInformer: serviceInformer,
serviceLister: serviceLister,
secretController: secretController,

View File

@@ -154,7 +154,7 @@ func TestKIngressControllerConventions(t *testing.T) {
options := common.Options{IngressClass: "mse", ClusterId: "", EnableStatus: true}
secretController := secret.NewController(localKubeClient, options.ClusterId)
secretController := secret.NewController(localKubeClient, options)
ingressController := NewController(localKubeClient, client, options, secretController)
testcases := map[string]func(*testing.T, common.KIngressController){

View File

@@ -15,21 +15,33 @@
package mcpbridge
import (
"istio.io/istio/pkg/cluster"
"time"
"istio.io/istio/pkg/kube/controllers"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/cache"
v1 "github.com/alibaba/higress/client/pkg/apis/networking/v1"
"github.com/alibaba/higress/client/pkg/clientset/versioned"
informersv1 "github.com/alibaba/higress/client/pkg/informers/externalversions/networking/v1"
listersv1 "github.com/alibaba/higress/client/pkg/listers/networking/v1"
"github.com/alibaba/higress/pkg/ingress/kube/common"
"github.com/alibaba/higress/pkg/ingress/kube/controller"
kubeclient "github.com/alibaba/higress/pkg/kube"
)
type McpBridgeController controller.Controller[listersv1.McpBridgeLister]
func NewController(client kubeclient.Client, clusterId cluster.ID) McpBridgeController {
informer := client.HigressInformer().Networking().V1().McpBridges().Informer()
return controller.NewCommonController("mcpbridge", client.HigressInformer().Networking().V1().McpBridges().Lister(),
informer, GetMcpBridge, clusterId)
func NewController(client kubeclient.Client, options common.Options) McpBridgeController {
var informer cache.SharedIndexInformer
if options.WatchNamespace == "" {
informer = client.HigressInformer().Networking().V1().McpBridges().Informer()
} else {
informer = client.HigressInformer().InformerFor(&v1.McpBridge{}, func(client versioned.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
return informersv1.NewMcpBridgeInformer(client, options.WatchNamespace, resyncPeriod, nil)
})
}
return controller.NewCommonController("mcpbridge", listersv1.NewMcpBridgeLister(informer.GetIndexer()), informer, GetMcpBridge, options.ClusterId)
}
func GetMcpBridge(lister listersv1.McpBridgeLister, namespacedName types.NamespacedName) (controllers.Object, error) {

View File

@@ -15,15 +15,14 @@
package secret
import (
"github.com/alibaba/higress/pkg/ingress/kube/common"
"github.com/alibaba/higress/pkg/ingress/kube/controller"
"istio.io/istio/pkg/cluster"
"istio.io/istio/pkg/config/schema/gvr"
schemakubeclient "istio.io/istio/pkg/config/schema/kubeclient"
kubeclient "istio.io/istio/pkg/kube"
"istio.io/istio/pkg/kube/controllers"
ktypes "istio.io/istio/pkg/kube/kubetypes"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/types"
listersv1 "k8s.io/client-go/listers/core/v1"
@@ -31,17 +30,17 @@ import (
type SecretController controller.Controller[listersv1.SecretLister]
func NewController(client kubeclient.Client, clusterId cluster.ID) SecretController {
func NewController(client kubeclient.Client, options common.Options) SecretController {
opts := ktypes.InformerOptions{
Namespace: metav1.NamespaceAll,
Cluster: clusterId,
Namespace: options.WatchNamespace,
Cluster: options.ClusterId,
FieldSelector: fields.AndSelectors(
fields.OneTermNotEqualSelector("type", "helm.sh/release.v1"),
fields.OneTermNotEqualSelector("type", string(v1.SecretTypeServiceAccountToken)),
).String(),
}
informer := schemakubeclient.GetInformerFilteredFromGVR(client, opts, gvr.Secret)
return controller.NewCommonController("secret", listersv1.NewSecretLister(informer.Informer.GetIndexer()), informer.Informer, GetSecret, clusterId)
return controller.NewCommonController("secret", listersv1.NewSecretLister(informer.Informer.GetIndexer()), informer.Informer, GetSecret, options.ClusterId)
}
func GetSecret(lister listersv1.SecretLister, namespacedName types.NamespacedName) (controllers.Object, error) {

View File

@@ -16,6 +16,7 @@ package secret
import (
"context"
"github.com/alibaba/higress/pkg/ingress/kube/common"
"reflect"
"sync"
"testing"
@@ -43,7 +44,7 @@ var period = time.Second
func TestController(t *testing.T) {
client := kubeclient.NewFakeClient()
ctrl := NewController(client, "fake-cluster")
ctrl := NewController(client, common.Options{ClusterId: "fake-cluster"})
stop := make(chan struct{})
t.Cleanup(func() {

View File

@@ -15,21 +15,33 @@
package wasmplugin
import (
"istio.io/istio/pkg/cluster"
"time"
"istio.io/istio/pkg/kube/controllers"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/cache"
v1 "github.com/alibaba/higress/client/pkg/apis/extensions/v1alpha1"
"github.com/alibaba/higress/client/pkg/clientset/versioned"
informersv1 "github.com/alibaba/higress/client/pkg/informers/externalversions/extensions/v1alpha1"
listersv1 "github.com/alibaba/higress/client/pkg/listers/extensions/v1alpha1"
"github.com/alibaba/higress/pkg/ingress/kube/common"
"github.com/alibaba/higress/pkg/ingress/kube/controller"
kubeclient "github.com/alibaba/higress/pkg/kube"
)
type WasmPluginController controller.Controller[listersv1.WasmPluginLister]
func NewController(client kubeclient.Client, clusterId cluster.ID) WasmPluginController {
informer := client.HigressInformer().Extensions().V1alpha1().WasmPlugins().Informer()
return controller.NewCommonController("wasmplugin", client.HigressInformer().Extensions().V1alpha1().WasmPlugins().Lister(),
informer, GetWasmPlugin, clusterId)
func NewController(client kubeclient.Client, options common.Options) WasmPluginController {
var informer cache.SharedIndexInformer
if options.WatchNamespace == "" {
informer = client.HigressInformer().Extensions().V1alpha1().WasmPlugins().Informer()
} else {
informer = client.HigressInformer().InformerFor(&v1.WasmPlugin{}, func(client versioned.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
return informersv1.NewWasmPluginInformer(client, options.WatchNamespace, resyncPeriod, nil)
})
}
return controller.NewCommonController("wasmplugin", listersv1.NewWasmPluginLister(informer.GetIndexer()), informer, GetWasmPlugin, options.ClusterId)
}
func GetWasmPlugin(lister listersv1.WasmPluginLister, namespacedName types.NamespacedName) (controllers.Object, error) {