mirror of
https://github.com/alibaba/higress.git
synced 2026-06-09 04:37:31 +08:00
feat: support KnativeIngress (#524)
This commit is contained in:
@@ -15,21 +15,29 @@
|
||||
package kube
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"time"
|
||||
|
||||
"go.uber.org/atomic"
|
||||
istiokube "istio.io/istio/pkg/kube"
|
||||
apiExtensionsV1 "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/typed/apiextensions/v1"
|
||||
metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
"k8s.io/client-go/rest"
|
||||
clienttesting "k8s.io/client-go/testing"
|
||||
"k8s.io/client-go/tools/clientcmd"
|
||||
kingressclient "knative.dev/networking/pkg/client/clientset/versioned"
|
||||
kingressfake "knative.dev/networking/pkg/client/clientset/versioned/fake"
|
||||
kingressinformer "knative.dev/networking/pkg/client/informers/externalversions"
|
||||
|
||||
higressclient "github.com/alibaba/higress/client/pkg/clientset/versioned"
|
||||
higressfake "github.com/alibaba/higress/client/pkg/clientset/versioned/fake"
|
||||
higressinformer "github.com/alibaba/higress/client/pkg/informers/externalversions"
|
||||
"github.com/alibaba/higress/pkg/config/constants"
|
||||
)
|
||||
|
||||
type Client interface {
|
||||
@@ -40,6 +48,11 @@ type Client interface {
|
||||
|
||||
// HigressInformer returns an informer for the higress client
|
||||
HigressInformer() higressinformer.SharedInformerFactory
|
||||
|
||||
//KIngress return the Knative kube client
|
||||
KIngress() kingressclient.Interface
|
||||
|
||||
KIngressInformer() kingressinformer.SharedInformerFactory
|
||||
}
|
||||
|
||||
type client struct {
|
||||
@@ -48,9 +61,12 @@ type client struct {
|
||||
higress higressclient.Interface
|
||||
higressInformer higressinformer.SharedInformerFactory
|
||||
|
||||
kingress kingressclient.Interface
|
||||
kingressInformer kingressinformer.SharedInformerFactory
|
||||
// If enable, will wait for cache syncs with extremely short delay. This should be used only for tests
|
||||
fastSync bool
|
||||
informerWatchesPending *atomic.Int32
|
||||
fastSync bool
|
||||
informerWatchesPending *atomic.Int32
|
||||
kinformerWatchesPending *atomic.Int32
|
||||
}
|
||||
|
||||
const resyncInterval = 0
|
||||
@@ -62,7 +78,9 @@ func NewFakeClient(objects ...runtime.Object) Client {
|
||||
c.higress = higressfake.NewSimpleClientset()
|
||||
c.higressInformer = higressinformer.NewSharedInformerFactoryWithOptions(c.higress, resyncInterval)
|
||||
c.informerWatchesPending = atomic.NewInt32(0)
|
||||
|
||||
c.kingress = kingressfake.NewSimpleClientset()
|
||||
c.kingressInformer = kingressinformer.NewSharedInformerFactoryWithOptions(c.kingress, resyncInterval)
|
||||
c.kinformerWatchesPending = atomic.NewInt32(0)
|
||||
// https://github.com/kubernetes/kubernetes/issues/95372
|
||||
// There is a race condition in the client fakes, where events that happen between the List and Watch
|
||||
// of an informer are dropped. To avoid this, we explicitly manage the list and watch, ensuring all lists
|
||||
@@ -90,6 +108,27 @@ func NewFakeClient(objects ...runtime.Object) Client {
|
||||
fc := c.higress.(*higressfake.Clientset)
|
||||
fc.PrependReactor("list", "&", listReactor)
|
||||
fc.PrependWatchReactor("*", watchReactor(fc.Tracker()))
|
||||
|
||||
klistReactor := func(action clienttesting.Action) (handled bool, ret runtime.Object, err error) {
|
||||
c.kinformerWatchesPending.Inc()
|
||||
return false, nil, nil
|
||||
}
|
||||
kwatchReactor := func(tracker clienttesting.ObjectTracker) func(action clienttesting.Action) (handled bool, ret watch.Interface, err error) {
|
||||
return func(action clienttesting.Action) (handled bool, ret watch.Interface, err error) {
|
||||
gvr := action.GetResource()
|
||||
ns := action.GetNamespace()
|
||||
watch, err := tracker.Watch(gvr, ns)
|
||||
if err != nil {
|
||||
return false, nil, err
|
||||
}
|
||||
c.kinformerWatchesPending.Dec()
|
||||
return true, watch, nil
|
||||
}
|
||||
}
|
||||
fcknative := c.kingress.(*kingressfake.Clientset)
|
||||
fcknative.PrependReactor("list", "&", klistReactor)
|
||||
fcknative.PrependWatchReactor("*", kwatchReactor(fcknative.Tracker()))
|
||||
|
||||
c.fastSync = true
|
||||
return c
|
||||
}
|
||||
@@ -107,9 +146,28 @@ func NewClient(clientConfig clientcmd.ClientConfig) (Client, error) {
|
||||
return nil, err
|
||||
}
|
||||
c.higressInformer = higressinformer.NewSharedInformerFactory(c.higress, resyncInterval)
|
||||
|
||||
c.kingress, err = kingressclient.NewForConfig(istioClient.RESTConfig())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if CheckKIngressCRDExist(istioClient.RESTConfig()) {
|
||||
c.kingressInformer = kingressinformer.NewSharedInformerFactory(c.kingress, resyncInterval)
|
||||
} else {
|
||||
c.kingressInformer = nil
|
||||
}
|
||||
|
||||
return &c, nil
|
||||
}
|
||||
|
||||
func (c *client) KIngress() kingressclient.Interface {
|
||||
return c.kingress
|
||||
}
|
||||
|
||||
func (c *client) KIngressInformer() kingressinformer.SharedInformerFactory {
|
||||
return c.kingressInformer
|
||||
}
|
||||
|
||||
func (c *client) Higress() higressclient.Interface {
|
||||
return c.higress
|
||||
}
|
||||
@@ -121,6 +179,7 @@ func (c *client) HigressInformer() higressinformer.SharedInformerFactory {
|
||||
func (c *client) RunAndWait(stop <-chan struct{}) {
|
||||
c.Client.RunAndWait(stop)
|
||||
c.higressInformer.Start(stop)
|
||||
|
||||
if c.fastSync {
|
||||
fastWaitForCacheSync(stop, c.higressInformer)
|
||||
_ = wait.PollImmediate(time.Microsecond*100, wait.ForeverTestTimeout, func() (bool, error) {
|
||||
@@ -137,6 +196,27 @@ func (c *client) RunAndWait(stop <-chan struct{}) {
|
||||
} else {
|
||||
c.higressInformer.WaitForCacheSync(stop)
|
||||
}
|
||||
|
||||
if c.kingressInformer != nil {
|
||||
c.kingressInformer.Start(stop)
|
||||
if c.fastSync {
|
||||
fastWaitForCacheSync(stop, c.kingressInformer)
|
||||
_ = wait.PollImmediate(time.Microsecond*100, wait.ForeverTestTimeout, func() (bool, error) {
|
||||
select {
|
||||
case <-stop:
|
||||
return false, fmt.Errorf("channel closed")
|
||||
default:
|
||||
}
|
||||
if c.informerWatchesPending.Load() == 0 {
|
||||
return true, nil
|
||||
}
|
||||
return false, nil
|
||||
})
|
||||
} else {
|
||||
c.kingressInformer.WaitForCacheSync(stop)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
type reflectInformerSync interface {
|
||||
@@ -162,3 +242,23 @@ func fastWaitForCacheSync(stop <-chan struct{}, informerFactory reflectInformerS
|
||||
return true, nil
|
||||
})
|
||||
}
|
||||
|
||||
// Check Knative Ingress CRD
|
||||
func CheckKIngressCRDExist(config *rest.Config) bool {
|
||||
apiExtClientset, err := apiExtensionsV1.NewForConfig(config)
|
||||
if err != nil {
|
||||
fmt.Errorf("failed creating apiExtension Client: %v", err)
|
||||
return false
|
||||
}
|
||||
crdList, err := apiExtClientset.CustomResourceDefinitions().List(context.TODO(), metaV1.ListOptions{})
|
||||
if err != nil {
|
||||
fmt.Errorf("failed listing Custom Resource Definition: %v", err)
|
||||
return false
|
||||
}
|
||||
for _, crd := range crdList.Items {
|
||||
if crd.Name == constants.KnativeIngressCRDName {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user