Support nacos discovery (#108)

This commit is contained in:
澄潭
2022-12-14 14:39:06 +08:00
committed by GitHub
parent e01377f3ea
commit bf607ae554
69 changed files with 3314 additions and 319 deletions

View File

@@ -33,17 +33,20 @@ import (
"istio.io/istio/pkg/config/constants"
"istio.io/istio/pkg/config/schema/collection"
"istio.io/istio/pkg/config/schema/gvk"
"istio.io/istio/pkg/kube"
listersv1 "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
netlisterv1 "github.com/alibaba/higress/client/pkg/listers/networking/v1"
"github.com/alibaba/higress/pkg/ingress/kube/annotations"
"github.com/alibaba/higress/pkg/ingress/kube/common"
"github.com/alibaba/higress/pkg/ingress/kube/ingress"
"github.com/alibaba/higress/pkg/ingress/kube/ingressv1"
secretkube "github.com/alibaba/higress/pkg/ingress/kube/secret/kube"
"github.com/alibaba/higress/pkg/ingress/kube/mcpbridge"
"github.com/alibaba/higress/pkg/ingress/kube/secret"
"github.com/alibaba/higress/pkg/ingress/kube/util"
. "github.com/alibaba/higress/pkg/ingress/log"
"github.com/alibaba/higress/pkg/kube"
"github.com/alibaba/higress/registry/reconcile"
)
var (
@@ -65,12 +68,21 @@ type IngressConfig struct {
gatewayHandlers []model.EventHandler
destinationRuleHandlers []model.EventHandler
envoyFilterHandlers []model.EventHandler
serviceEntryHandlers []model.EventHandler
watchErrorHandler cache.WatchErrorHandler
cachedEnvoyFilters []config.Config
watchedSecretSet sets.Set
RegistryReconciler *reconcile.Reconciler
mcpbridgeReconciled bool
mcpbridgeController mcpbridge.McpBridgeController
mcpbridgeLister netlisterv1.McpBridgeLister
XDSUpdater model.XDSUpdater
annotationHandler annotations.AnnotationHandler
@@ -86,7 +98,7 @@ func NewIngressConfig(localKubeClient kube.Client, XDSUpdater model.XDSUpdater,
if clusterId == "Kubernetes" {
clusterId = ""
}
return &IngressConfig{
config := &IngressConfig{
remoteIngressControllers: make(map[string]common.IngressController),
localKubeClient: localKubeClient,
XDSUpdater: XDSUpdater,
@@ -97,15 +109,15 @@ func NewIngressConfig(localKubeClient kube.Client, XDSUpdater model.XDSUpdater,
watchedSecretSet: sets.NewSet(),
namespace: namespace,
}
mcpbridgeController := mcpbridge.NewController(localKubeClient, clusterId)
mcpbridgeController.AddEventHandler(config.AddOrUpdateMcpBridge, config.DeleteMcpBridge)
config.mcpbridgeController = mcpbridgeController
config.mcpbridgeLister = mcpbridgeController.Lister()
return config
}
func (m *IngressConfig) RegisterEventHandler(kind config.GroupVersionKind, f model.EventHandler) {
IngressLog.Infof("register resource %v", kind)
if kind != gvk.VirtualService && kind != gvk.Gateway &&
kind != gvk.DestinationRule && kind != gvk.EnvoyFilter {
return
}
switch kind {
case gvk.VirtualService:
m.virtualServiceHandlers = append(m.virtualServiceHandlers, f)
@@ -118,6 +130,9 @@ func (m *IngressConfig) RegisterEventHandler(kind config.GroupVersionKind, f mod
case gvk.EnvoyFilter:
m.envoyFilterHandlers = append(m.envoyFilterHandlers, f)
case gvk.ServiceEntry:
m.serviceEntryHandlers = append(m.serviceEntryHandlers, f)
}
for _, remoteIngressController := range m.remoteIngressControllers {
@@ -126,7 +141,7 @@ func (m *IngressConfig) RegisterEventHandler(kind config.GroupVersionKind, f mod
}
func (m *IngressConfig) AddLocalCluster(options common.Options) common.IngressController {
secretController := secretkube.NewController(m.localKubeClient, options)
secretController := secret.NewController(m.localKubeClient, options.ClusterId)
secretController.AddEventHandler(m.ReflectSecretChanges)
var ingressController common.IngressController
@@ -165,7 +180,8 @@ func (m *IngressConfig) List(typ config.GroupVersionKind, namespace string) ([]c
if typ != gvk.Gateway &&
typ != gvk.VirtualService &&
typ != gvk.DestinationRule &&
typ != gvk.EnvoyFilter {
typ != gvk.EnvoyFilter &&
typ != gvk.ServiceEntry {
return nil, common.ErrUnsupportedOp
}
@@ -200,6 +216,8 @@ func (m *IngressConfig) List(typ config.GroupVersionKind, namespace string) ([]c
return m.convertVirtualService(wrapperConfigs), nil
case gvk.DestinationRule:
return m.convertDestinationRule(wrapperConfigs), nil
case gvk.ServiceEntry:
return m.convertServiceEntry(wrapperConfigs), nil
}
return nil, nil
@@ -412,7 +430,6 @@ func (m *IngressConfig) convertVirtualService(configs []common.WrapperConfig) []
// We generate some specific envoy filter here to avoid duplicated computation.
m.convertEnvoyFilter(&convertOptions)
return out
}
@@ -467,6 +484,26 @@ func (m *IngressConfig) convertEnvoyFilter(convertOptions *common.ConvertOptions
m.mutex.Unlock()
}
func (m *IngressConfig) convertServiceEntry([]common.WrapperConfig) []config.Config {
if m.RegistryReconciler == nil {
return nil
}
serviceEntries := m.RegistryReconciler.GetAllServiceEntryWrapper()
out := make([]config.Config, 0, len(serviceEntries))
for _, se := range serviceEntries {
out = append(out, config.Config{
Meta: config.Meta{
GroupVersionKind: gvk.ServiceEntry,
Name: se.ServiceEntry.Hosts[0],
Namespace: "mcp",
CreationTimestamp: se.GetCreateTime(),
},
Spec: se.ServiceEntry,
})
}
return out
}
func (m *IngressConfig) convertDestinationRule(configs []common.WrapperConfig) []config.Config {
convertOptions := common.ConvertOptions{
Service2TrafficPolicy: map[common.ServiceKey]*common.WrapperTrafficPolicy{},
@@ -618,6 +655,52 @@ func (m *IngressConfig) applyInternalActiveRedirect(convertOptions *common.Conve
}
}
func (m *IngressConfig) AddOrUpdateMcpBridge(clusterNamespacedName util.ClusterNamespacedName) {
// TODO: get resource name from config
if clusterNamespacedName.Name != "default" || clusterNamespacedName.Namespace != m.namespace {
return
}
mcpbridge, err := m.mcpbridgeLister.McpBridges(clusterNamespacedName.Namespace).Get(clusterNamespacedName.Name)
if err != nil {
IngressLog.Errorf("Mcpbridge is not found, namespace:%s, name:%s",
clusterNamespacedName.Namespace, clusterNamespacedName.Name)
return
}
if m.RegistryReconciler == nil {
m.RegistryReconciler = reconcile.NewReconciler(func() {
metadata := config.Meta{
Name: "mcpbridge-serviceentry",
Namespace: m.namespace,
GroupVersionKind: gvk.ServiceEntry,
// Set this label so that we do not compare configs and just push.
Labels: map[string]string{constants.AlwaysPushLabel: "true"},
}
for _, f := range m.serviceEntryHandlers {
IngressLog.Debug("McpBridge triggerd serviceEntry update")
f(config.Config{Meta: metadata}, config.Config{Meta: metadata}, model.EventUpdate)
}
})
}
reconciler := m.RegistryReconciler
go func() {
reconciler.Reconcile(mcpbridge)
m.mutex.Lock()
m.mcpbridgeReconciled = true
m.mutex.Unlock()
}()
}
func (m *IngressConfig) DeleteMcpBridge(clusterNamespacedName util.ClusterNamespacedName) {
// TODO: get resource name from config
if clusterNamespacedName.Name != "default" || clusterNamespacedName.Namespace != m.namespace {
return
}
if m.RegistryReconciler != nil {
go m.RegistryReconciler.Reconcile(nil)
m.RegistryReconciler = nil
}
}
func (m *IngressConfig) ReflectSecretChanges(clusterNamespacedName util.ClusterNamespacedName) {
var hit bool
m.mutex.RLock()
@@ -785,7 +868,9 @@ func constructBasicAuthEnvoyFilter(rules *common.BasicAuthRules, namespace strin
}, nil
}
func (m *IngressConfig) Run(<-chan struct{}) {}
func (m *IngressConfig) Run(stop <-chan struct{}) {
m.mcpbridgeController.Run(stop)
}
func (m *IngressConfig) HasSynced() bool {
m.mutex.RLock()
@@ -795,7 +880,9 @@ func (m *IngressConfig) HasSynced() bool {
return false
}
}
if !m.mcpbridgeController.HasSynced() || !m.mcpbridgeReconciled {
return false
}
IngressLog.Info("Ingress config controller synced.")
return true
}
@@ -818,7 +905,7 @@ func (m *IngressConfig) GetIngressDomains() model.IngressDomainCollection {
}
func (m *IngressConfig) Schemas() collection.Schemas {
return common.Schemas
return common.IngressIR
}
func (m *IngressConfig) Get(config.GroupVersionKind, string, string) *config.Config {

View File

@@ -24,7 +24,6 @@ import (
"istio.io/istio/pkg/config"
"istio.io/istio/pkg/config/schema/gvk"
"istio.io/istio/pkg/config/xds"
"istio.io/istio/pkg/kube"
ingress "k8s.io/api/networking/v1"
ingressv1beta1 "k8s.io/api/networking/v1beta1"
@@ -32,6 +31,7 @@ import (
"github.com/alibaba/higress/pkg/ingress/kube/common"
controllerv1beta1 "github.com/alibaba/higress/pkg/ingress/kube/ingress"
controllerv1 "github.com/alibaba/higress/pkg/ingress/kube/ingressv1"
"github.com/alibaba/higress/pkg/kube"
)
func TestNormalizeWeightedCluster(t *testing.T) {

View File

@@ -61,6 +61,8 @@ type Ingress struct {
Fallback *FallbackConfig
Auth *AuthConfig
Destination *DestinationConfig
}
func (i *Ingress) NeedRegexMatch() bool {
@@ -129,6 +131,7 @@ func NewAnnotationHandlerManager() AnnotationHandler {
loadBalance{},
fallback{},
auth{},
destination{},
},
gatewayHandlers: []GatewayHandler{
downstreamTLS{},

View File

@@ -0,0 +1,122 @@
// Copyright (c) 2022 Alibaba Group Holding Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package annotations
import (
"bufio"
"strconv"
"strings"
networking "istio.io/api/networking/v1alpha3"
. "github.com/alibaba/higress/pkg/ingress/log"
)
const (
destinationKey = "destination"
)
var _ Parser = destination{}
type DestinationConfig struct {
McpDestination []*networking.HTTPRouteDestination
}
type destination struct{}
func (a destination) Parse(annotations Annotations, config *Ingress, globalContext *GlobalContext) error {
if !needDestinationConfig(annotations) {
return nil
}
value, err := annotations.ParseStringForHigress(destinationKey)
if err != nil {
IngressLog.Errorf("parse destination error %v within ingress %s/%s", err, config.Namespace, config.Name)
return nil
}
lines := splitLines(value)
var destinations []*networking.HTTPRouteDestination
var weightSum int64
for _, line := range lines {
// fmt: [weight] <host>[:port] [subset]
// example: 100% my-svc.DEFAULT-GROUP.xxxx.nacos:8080 v1
pairs := strings.Fields(line)
var weight int64 = 100
var addrIndex int
if strings.HasSuffix(pairs[0], "%") {
weight, err = strconv.ParseInt(strings.TrimSuffix(pairs[0], "%"), 10, 32)
if err != nil {
IngressLog.Errorf("parse destination atoi error %v within ingress %s/%s", err, config.Namespace, config.Name)
return nil
}
addrIndex++
}
weightSum += weight
if len(pairs) < addrIndex+1 {
IngressLog.Errorf("destination %s has no address within ingress %s/%s", value, config.Namespace, config.Name)
return nil
}
address := pairs[addrIndex]
host := address
var port string
colon := strings.LastIndex(address, ":")
if colon != -1 {
host, port = address[:colon], address[colon+1:]
}
var subset string
if len(pairs) >= addrIndex+2 {
subset = pairs[addrIndex+1]
}
dest := &networking.HTTPRouteDestination{
Destination: &networking.Destination{
Host: host,
Subset: subset,
},
Weight: int32(weight),
}
if port != "" {
portNumber, err := strconv.ParseUint(port, 10, 32)
if err != nil {
IngressLog.Errorf("destination addr %s has invalid port %s within ingress %s/%s", address, port, config.Namespace, config.Name)
return nil
}
dest.Destination.Port = &networking.PortSelector{
Number: uint32(portNumber),
}
}
IngressLog.Debugf("destination generated for ingress %s/%s: %v", config.Namespace, config.Name, dest)
destinations = append(destinations, dest)
}
if weightSum != 100 {
IngressLog.Errorf("destination has invalid weight sum %d within ingress %s/%s", weightSum, config.Namespace, config.Name)
return nil
}
config.Destination = &DestinationConfig{
McpDestination: destinations,
}
return nil
}
func needDestinationConfig(annotations Annotations) bool {
return annotations.HasHigress(destinationKey)
}
func splitLines(s string) []string {
var lines []string
sc := bufio.NewScanner(strings.NewReader(s))
for sc.Scan() {
lines = append(lines, sc.Text())
}
return lines
}

View File

@@ -12,23 +12,17 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package secret
package common
import (
listerv1 "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"github.com/alibaba/higress/pkg/ingress/kube/util"
"istio.io/istio/pkg/config/schema/collection"
"istio.io/istio/pkg/config/schema/collections"
)
type Controller interface {
AddEventHandler(func(util.ClusterNamespacedName))
Run(stop <-chan struct{})
HasSynced() bool
Lister() listerv1.SecretLister
Informer() cache.SharedIndexInformer
}
var IngressIR = collection.NewSchemasBuilder().
MustAdd(collections.IstioNetworkingV1Alpha3Destinationrules).
MustAdd(collections.IstioNetworkingV1Alpha3Envoyfilters).
MustAdd(collections.IstioNetworkingV1Alpha3Gateways).
MustAdd(collections.IstioNetworkingV1Alpha3Serviceentries).
MustAdd(collections.IstioNetworkingV1Alpha3Virtualservices).
Build()

View File

@@ -28,9 +28,20 @@ import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/version"
netv1 "github.com/alibaba/higress/client/pkg/apis/networking/v1"
. "github.com/alibaba/higress/pkg/ingress/log"
)
func ValidateBackendResource(resource *v1.TypedLocalObjectReference) bool {
if resource == nil || resource.APIGroup == nil ||
*resource.APIGroup != netv1.SchemeGroupVersion.Group ||
resource.Kind != "McpBridge" || resource.Name != "default" {
IngressLog.Warnf("invalid mcpbridge resource: %v", resource)
return false
}
return true
}
// V1Available check if the "networking/v1" Ingress is available.
func V1Available(client kube.Client) bool {
// check kubernetes version to use new ingress package or not

View File

@@ -0,0 +1,152 @@
// Copyright (c) 2022 Alibaba Group Holding Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package controller
import (
"errors"
"time"
"istio.io/istio/pilot/pkg/model"
"istio.io/istio/pkg/kube/controllers"
kerrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"github.com/alibaba/higress/pkg/ingress/kube/util"
. "github.com/alibaba/higress/pkg/ingress/log"
)
type Controller[lister any] interface {
AddEventHandler(addOrUpdate func(util.ClusterNamespacedName), delete ...func(util.ClusterNamespacedName))
Run(stop <-chan struct{})
HasSynced() bool
Lister() lister
Informer() cache.SharedIndexInformer
}
type GetObjectFunc[lister any] func(lister, types.NamespacedName) (controllers.Object, error)
type CommonController[lister any] struct {
typeName string
queue workqueue.RateLimitingInterface
informer cache.SharedIndexInformer
lister lister
updateHandler func(util.ClusterNamespacedName)
removeHandler func(util.ClusterNamespacedName)
getFunc GetObjectFunc[lister]
clusterId string
}
func NewCommonController[lister any](typeName string, listerObj lister, informer cache.SharedIndexInformer,
getFunc GetObjectFunc[lister], clusterId string) Controller[lister] {
q := workqueue.NewRateLimitingQueue(workqueue.DefaultItemBasedRateLimiter())
handler := controllers.LatestVersionHandlerFuncs(controllers.EnqueueForSelf(q))
informer.AddEventHandler(handler)
return &CommonController[lister]{
typeName: typeName,
queue: q,
lister: listerObj,
informer: informer,
clusterId: clusterId,
getFunc: getFunc,
}
}
func (c *CommonController[lister]) Lister() lister {
return c.lister
}
func (c *CommonController[lister]) Informer() cache.SharedIndexInformer {
return c.informer
}
func (c *CommonController[lister]) AddEventHandler(addOrUpdate func(util.ClusterNamespacedName), delete ...func(util.ClusterNamespacedName)) {
c.updateHandler = addOrUpdate
if len(delete) > 0 {
c.removeHandler = delete[0]
}
}
func (c *CommonController[lister]) Run(stop <-chan struct{}) {
defer utilruntime.HandleCrash()
defer c.queue.ShutDown()
if !cache.WaitForCacheSync(stop, c.HasSynced) {
IngressLog.Errorf("Failed to sync %s controller cache", c.typeName)
return
}
IngressLog.Debugf("%s cache has synced")
go wait.Until(c.worker, time.Second, stop)
<-stop
}
func (c *CommonController[lister]) worker() {
for c.processNextWorkItem() {
}
}
func (c *CommonController[lister]) processNextWorkItem() bool {
key, quit := c.queue.Get()
if quit {
return false
}
defer c.queue.Done(key)
ingressNamespacedName := key.(types.NamespacedName)
IngressLog.Debugf("%s %s push to queue", c.typeName, ingressNamespacedName)
if err := c.onEvent(ingressNamespacedName); err != nil {
IngressLog.Errorf("error processing %s item (%v) (retrying): %v", c.typeName, key, err)
c.queue.AddRateLimited(key)
} else {
c.queue.Forget(key)
}
return true
}
func (c *CommonController[lister]) onEvent(namespacedName types.NamespacedName) error {
if c.getFunc == nil {
return errors.New("getFunc is nil")
}
obj := util.ClusterNamespacedName{
NamespacedName: model.NamespacedName{
Namespace: namespacedName.Namespace,
Name: namespacedName.Name,
},
ClusterId: c.clusterId,
}
_, err := c.getFunc(c.lister, namespacedName)
if err != nil {
if kerrors.IsNotFound(err) {
if c.removeHandler == nil {
return nil
}
c.removeHandler(obj)
} else {
return err
}
}
c.updateHandler(obj)
return nil
}
func (c *CommonController[lister]) HasSynced() bool {
return c.informer.HasSynced()
}

View File

@@ -82,13 +82,14 @@ type controller struct {
// May be nil if ingress class is not supported in the cluster
classes v1beta1.IngressClassInformer
secretController secret.Controller
secretController secret.SecretController
statusSyncer *statusSyncer
}
// NewController creates a new Kubernetes controller
func NewController(localKubeClient, client kubeclient.Client, options common.Options, secretController secret.Controller) common.IngressController {
func NewController(localKubeClient, client kubeclient.Client, options common.Options,
secretController secret.SecretController) common.IngressController {
q := workqueue.NewRateLimitingQueue(workqueue.DefaultItemBasedRateLimiter())
ingressInformer := client.KubeInformer().Networking().V1beta1().Ingresses()
@@ -471,7 +472,7 @@ func (c *controller) ConvertHTTPRoute(convertOptions *common.ConvertOptions, wra
return fmt.Errorf("invalid ingress rule %s:%s in cluster %s, either `defaultBackend` or `rules` must be specified", cfg.Namespace, cfg.Name, c.options.ClusterId)
}
if ingressV1.Backend != nil && ingressV1.Backend.ServiceName != "" {
if ingressV1.Backend != nil && (ingressV1.Backend.ServiceName != "" || ingressV1.Backend.Resource != nil) {
convertOptions.HasDefaultBackend = true
}
@@ -571,7 +572,8 @@ func (c *controller) ConvertHTTPRoute(convertOptions *common.ConvertOptions, wra
// backend service check
var event common.Event
wrapperHttpRoute.HTTPRoute.Route, event = c.backendToRouteDestination(&httpPath.Backend, cfg.Namespace, ingressRouteBuilder)
destinationConfig := wrapper.AnnotationsConfig.Destination
wrapperHttpRoute.HTTPRoute.Route, event = c.backendToRouteDestination(&httpPath.Backend, cfg.Namespace, ingressRouteBuilder, destinationConfig)
if ingressRouteBuilder.Event != common.Normal {
event = ingressRouteBuilder.Event
@@ -752,7 +754,8 @@ func (c *controller) ApplyCanaryIngress(convertOptions *common.ConvertOptions, w
ingressRouteBuilder := convertOptions.IngressRouteCache.New(canary)
// backend service check
var event common.Event
canary.HTTPRoute.Route, event = c.backendToRouteDestination(&httpPath.Backend, cfg.Namespace, ingressRouteBuilder)
destinationConfig := wrapper.AnnotationsConfig.Destination
canary.HTTPRoute.Route, event = c.backendToRouteDestination(&httpPath.Backend, cfg.Namespace, ingressRouteBuilder, destinationConfig)
if event != common.Normal {
common.IncrementInvalidIngress(c.options.ClusterId, event)
ingressRouteBuilder.Event = event
@@ -881,31 +884,40 @@ func (c *controller) ConvertTrafficPolicy(convertOptions *common.ConvertOptions,
}
func (c *controller) createDefaultRoute(wrapper *common.WrapperConfig, backend *ingress.IngressBackend, host string) *common.WrapperHTTPRoute {
if backend == nil || backend.ServiceName == "" {
if backend == nil {
return nil
}
namespace := wrapper.Config.Namespace
var routeDestination []*networking.HTTPRouteDestination
port := &networking.PortSelector{}
if backend.ServicePort.Type == intstr.Int {
port.Number = uint32(backend.ServicePort.IntVal)
if common.ValidateBackendResource(backend.Resource) {
routeDestination = wrapper.AnnotationsConfig.Destination.McpDestination
} else {
resolvedPort, err := resolveNamedPort(backend, namespace, c.serviceLister)
if err != nil {
if backend.ServiceName == "" {
return nil
}
port.Number = uint32(resolvedPort)
}
namespace := wrapper.Config.Namespace
routeDestination := []*networking.HTTPRouteDestination{
{
Destination: &networking.Destination{
Host: util.CreateServiceFQDN(namespace, backend.ServiceName),
Port: port,
port := &networking.PortSelector{}
if backend.ServicePort.Type == intstr.Int {
port.Number = uint32(backend.ServicePort.IntVal)
} else {
resolvedPort, err := resolveNamedPort(backend, namespace, c.serviceLister)
if err != nil {
return nil
}
port.Number = uint32(resolvedPort)
}
routeDestination = []*networking.HTTPRouteDestination{
{
Destination: &networking.Destination{
Host: util.CreateServiceFQDN(namespace, backend.ServiceName),
Port: port,
},
Weight: 100,
},
Weight: 100,
},
}
}
route := &common.WrapperHTTPRoute{
@@ -954,12 +966,15 @@ func isCanaryRoute(canary, route *common.WrapperHTTPRoute) bool {
}
func (c *controller) backendToRouteDestination(backend *ingress.IngressBackend, namespace string,
builder *common.IngressRouteBuilder) ([]*networking.HTTPRouteDestination, common.Event) {
builder *common.IngressRouteBuilder, config *annotations.DestinationConfig) ([]*networking.HTTPRouteDestination, common.Event) {
if backend == nil {
return nil, common.InvalidBackendService
}
if backend.ServiceName == "" {
if config != nil {
return config.McpDestination, common.Normal
}
return nil, common.InvalidBackendService
}

View File

@@ -80,13 +80,13 @@ type controller struct {
serviceLister listerv1.ServiceLister
classes networkingv1.IngressClassInformer
secretController secret.Controller
secretController secret.SecretController
statusSyncer *statusSyncer
}
// NewController creates a new Kubernetes controller
func NewController(localKubeClient, client kubeclient.Client, options common.Options, secretController secret.Controller) common.IngressController {
func NewController(localKubeClient, client kubeclient.Client, options common.Options, secretController secret.SecretController) common.IngressController {
q := workqueue.NewRateLimitingQueue(workqueue.DefaultItemBasedRateLimiter())
ingressInformer := client.KubeInformer().Networking().V1().Ingresses()
@@ -465,8 +465,10 @@ func (c *controller) ConvertHTTPRoute(convertOptions *common.ConvertOptions, wra
return fmt.Errorf("invalid ingress rule %s:%s in cluster %s, either `defaultBackend` or `rules` must be specified", cfg.Namespace, cfg.Name, c.options.ClusterId)
}
if ingressV1.DefaultBackend != nil && ingressV1.DefaultBackend.Service != nil &&
ingressV1.DefaultBackend.Service.Name != "" {
if ingressV1.DefaultBackend != nil &&
((ingressV1.DefaultBackend.Service != nil &&
ingressV1.DefaultBackend.Service.Name != "") ||
ingressV1.DefaultBackend.Resource != nil) {
convertOptions.HasDefaultBackend = true
}
@@ -566,8 +568,8 @@ func (c *controller) ConvertHTTPRoute(convertOptions *common.ConvertOptions, wra
// backend service check
var event common.Event
wrapperHttpRoute.HTTPRoute.Route, event = c.backendToRouteDestination(&httpPath.Backend, cfg.Namespace, ingressRouteBuilder)
destinationConfig := wrapper.AnnotationsConfig.Destination
wrapperHttpRoute.HTTPRoute.Route, event = c.backendToRouteDestination(&httpPath.Backend, cfg.Namespace, ingressRouteBuilder, destinationConfig)
if ingressRouteBuilder.Event != common.Normal {
event = ingressRouteBuilder.Event
}
@@ -748,7 +750,8 @@ func (c *controller) ApplyCanaryIngress(convertOptions *common.ConvertOptions, w
ingressRouteBuilder := convertOptions.IngressRouteCache.New(canary)
// backend service check
var event common.Event
canary.HTTPRoute.Route, event = c.backendToRouteDestination(&httpPath.Backend, cfg.Namespace, ingressRouteBuilder)
destinationConfig := wrapper.AnnotationsConfig.Destination
canary.HTTPRoute.Route, event = c.backendToRouteDestination(&httpPath.Backend, cfg.Namespace, ingressRouteBuilder, destinationConfig)
if event != common.Normal {
common.IncrementInvalidIngress(c.options.ClusterId, event)
ingressRouteBuilder.Event = event
@@ -877,32 +880,38 @@ func (c *controller) ConvertTrafficPolicy(convertOptions *common.ConvertOptions,
}
func (c *controller) createDefaultRoute(wrapper *common.WrapperConfig, backend *ingress.IngressBackend, host string) *common.WrapperHTTPRoute {
if backend == nil || backend.Service == nil || backend.Service.Name == "" {
if backend == nil {
return nil
}
service := backend.Service
namespace := wrapper.Config.Namespace
var routeDestination []*networking.HTTPRouteDestination
port := &networking.PortSelector{}
if service.Port.Number > 0 {
port.Number = uint32(service.Port.Number)
if common.ValidateBackendResource(backend.Resource) {
routeDestination = wrapper.AnnotationsConfig.Destination.McpDestination
} else {
resolvedPort, err := resolveNamedPort(service, namespace, c.serviceLister)
if err != nil {
return nil
}
port.Number = uint32(resolvedPort)
}
service := backend.Service
namespace := wrapper.Config.Namespace
routeDestination := []*networking.HTTPRouteDestination{
{
Destination: &networking.Destination{
Host: util.CreateServiceFQDN(namespace, service.Name),
Port: port,
port := &networking.PortSelector{}
if service.Port.Number > 0 {
port.Number = uint32(service.Port.Number)
} else {
resolvedPort, err := resolveNamedPort(service, namespace, c.serviceLister)
if err != nil {
return nil
}
port.Number = uint32(resolvedPort)
}
routeDestination = []*networking.HTTPRouteDestination{
{
Destination: &networking.Destination{
Host: util.CreateServiceFQDN(namespace, service.Name),
Port: port,
},
Weight: 100,
},
Weight: 100,
},
}
}
route := &common.WrapperHTTPRoute{
@@ -951,16 +960,19 @@ func isCanaryRoute(canary, route *common.WrapperHTTPRoute) bool {
}
func (c *controller) backendToRouteDestination(backend *ingress.IngressBackend, namespace string,
builder *common.IngressRouteBuilder) ([]*networking.HTTPRouteDestination, common.Event) {
if backend == nil || backend.Service == nil {
builder *common.IngressRouteBuilder, config *annotations.DestinationConfig) ([]*networking.HTTPRouteDestination, common.Event) {
if backend == nil || (backend.Service == nil && backend.Resource == nil) {
return nil, common.InvalidBackendService
}
if backend.Service == nil {
if config != nil {
return config.McpDestination, common.Normal
}
return nil, common.InvalidBackendService
}
service := backend.Service
if service.Name == "" {
return nil, common.InvalidBackendService
}
builder.PortName = service.Port.Name
port := &networking.PortSelector{}

View File

@@ -0,0 +1,46 @@
// Copyright (c) 2022 Alibaba Group Holding Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package mcpbridge
import (
"time"
"istio.io/istio/pkg/kube/controllers"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/cache"
v1 "github.com/alibaba/higress/client/pkg/apis/networking/v1"
"github.com/alibaba/higress/client/pkg/clientset/versioned"
informersv1 "github.com/alibaba/higress/client/pkg/informers/externalversions/networking/v1"
listersv1 "github.com/alibaba/higress/client/pkg/listers/networking/v1"
"github.com/alibaba/higress/pkg/ingress/kube/controller"
kubeclient "github.com/alibaba/higress/pkg/kube"
)
type McpBridgeController controller.Controller[listersv1.McpBridgeLister]
func NewController(client kubeclient.Client, clusterId string) McpBridgeController {
informer := client.HigressInformer().InformerFor(&v1.McpBridge{}, func(k versioned.Interface, resync time.Duration) cache.SharedIndexInformer {
return informersv1.NewMcpBridgeInformer(k, metav1.NamespaceAll, resync,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
})
return controller.NewCommonController("mcpbridge", listersv1.NewMcpBridgeLister(informer.GetIndexer()),
informer, GetMcpBridge, clusterId)
}
func GetMcpBridge(lister listersv1.McpBridgeLister, namespacedName types.NamespacedName) (controllers.Object, error) {
return lister.McpBridges(namespacedName.Namespace).Get(namespacedName.Name)
}

View File

@@ -0,0 +1,53 @@
// Copyright (c) 2022 Alibaba Group Holding Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package secret
import (
"time"
kubeclient "istio.io/istio/pkg/kube"
"istio.io/istio/pkg/kube/controllers"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/types"
informersv1 "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
listersv1 "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"github.com/alibaba/higress/pkg/ingress/kube/controller"
)
type SecretController controller.Controller[listersv1.SecretLister]
func NewController(client kubeclient.Client, clusterId string) SecretController {
informer := client.KubeInformer().InformerFor(&v1.Secret{}, func(k kubernetes.Interface, resync time.Duration) cache.SharedIndexInformer {
return informersv1.NewFilteredSecretInformer(
k, metav1.NamespaceAll, resync, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
func(options *metav1.ListOptions) {
options.FieldSelector = fields.AndSelectors(
fields.OneTermNotEqualSelector("type", "helm.sh/release.v1"),
fields.OneTermNotEqualSelector("type", string(v1.SecretTypeServiceAccountToken)),
).String()
},
)
})
return controller.NewCommonController("secret", listersv1.NewSecretLister(informer.GetIndexer()), informer, GetSecret, clusterId)
}
func GetSecret(lister listersv1.SecretLister, namespacedName types.NamespacedName) (controllers.Object, error) {
return lister.Secrets(namespacedName.Namespace).Get(namespacedName.Name)
}

View File

@@ -1,148 +0,0 @@
// Copyright (c) 2022 Alibaba Group Holding Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package kube
import (
"time"
"istio.io/istio/pilot/pkg/model"
kubeclient "istio.io/istio/pkg/kube"
"istio.io/istio/pkg/kube/controllers"
v1 "k8s.io/api/core/v1"
kerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
informersv1 "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
listersv1 "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"github.com/alibaba/higress/pkg/ingress/kube/common"
"github.com/alibaba/higress/pkg/ingress/kube/secret"
"github.com/alibaba/higress/pkg/ingress/kube/util"
. "github.com/alibaba/higress/pkg/ingress/log"
)
var _ secret.Controller = &controller{}
type controller struct {
queue workqueue.RateLimitingInterface
informer cache.SharedIndexInformer
lister listersv1.SecretLister
handler func(util.ClusterNamespacedName)
clusterId string
}
// NewController is copied from NewCredentialsController.
func NewController(client kubeclient.Client, options common.Options) secret.Controller {
q := workqueue.NewRateLimitingQueue(workqueue.DefaultItemBasedRateLimiter())
informer := client.KubeInformer().InformerFor(&v1.Secret{}, func(k kubernetes.Interface, resync time.Duration) cache.SharedIndexInformer {
return informersv1.NewFilteredSecretInformer(
k, metav1.NamespaceAll, resync, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
func(options *metav1.ListOptions) {
options.FieldSelector = fields.AndSelectors(
fields.OneTermNotEqualSelector("type", "helm.sh/release.v1"),
fields.OneTermNotEqualSelector("type", string(v1.SecretTypeServiceAccountToken)),
).String()
},
)
})
handler := controllers.LatestVersionHandlerFuncs(controllers.EnqueueForSelf(q))
informer.AddEventHandler(handler)
return &controller{
queue: q,
informer: informer,
lister: listersv1.NewSecretLister(informer.GetIndexer()),
clusterId: options.ClusterId,
}
}
func (c *controller) Lister() listersv1.SecretLister {
return c.lister
}
func (c *controller) Informer() cache.SharedIndexInformer {
return c.informer
}
func (c *controller) AddEventHandler(f func(util.ClusterNamespacedName)) {
c.handler = f
}
func (c *controller) Run(stop <-chan struct{}) {
defer utilruntime.HandleCrash()
defer c.queue.ShutDown()
if !cache.WaitForCacheSync(stop, c.HasSynced) {
IngressLog.Errorf("Failed to sync secret controller cache")
return
}
go wait.Until(c.worker, time.Second, stop)
<-stop
}
func (c *controller) worker() {
for c.processNextWorkItem() {
}
}
func (c *controller) processNextWorkItem() bool {
key, quit := c.queue.Get()
if quit {
return false
}
defer c.queue.Done(key)
ingressNamespacedName := key.(types.NamespacedName)
IngressLog.Debugf("secret %s push to queue", ingressNamespacedName)
if err := c.onEvent(ingressNamespacedName); err != nil {
IngressLog.Errorf("error processing secret item (%v) (retrying): %v", key, err)
c.queue.AddRateLimited(key)
} else {
c.queue.Forget(key)
}
return true
}
func (c *controller) onEvent(namespacedName types.NamespacedName) error {
_, err := c.lister.Secrets(namespacedName.Namespace).Get(namespacedName.Name)
if err != nil {
if kerrors.IsNotFound(err) {
return nil
} else {
return err
}
}
// We only care about add or update event.
c.handler(util.ClusterNamespacedName{
NamespacedName: model.NamespacedName{
Namespace: namespacedName.Namespace,
Name: namespacedName.Name,
},
ClusterId: c.clusterId,
})
return nil
}
func (c *controller) HasSynced() bool {
return c.informer.HasSynced()
}

View File

@@ -27,6 +27,45 @@ import (
"istio.io/istio/pilot/pkg/xds"
)
type ServiceEntryGenerator struct {
Server *xds.DiscoveryServer
}
func (c ServiceEntryGenerator) Generate(proxy *model.Proxy, push *model.PushContext, w *model.WatchedResource,
updates *model.PushRequest) ([]*any.Any, model.XdsLogDetails, error) {
resources := make([]*any.Any, 0)
configs := push.AllServiceEntries
for _, config := range configs {
body, err := types.MarshalAny(config.Spec.(*networking.ServiceEntry))
if err != nil {
return nil, model.DefaultXdsLogDetails, err
}
createTime, err := types.TimestampProto(config.CreationTimestamp)
if err != nil {
return nil, model.DefaultXdsLogDetails, err
}
resource := &mcp.Resource{
Body: body,
Metadata: &mcp.Metadata{
Name: path.Join(config.Namespace, config.Name),
CreateTime: createTime,
},
}
mcpAny, err := ptypes.MarshalAny(resource)
if err != nil {
return nil, model.DefaultXdsLogDetails, err
}
resources = append(resources, mcpAny)
}
return resources, model.DefaultXdsLogDetails, nil
}
func (c ServiceEntryGenerator) GenerateDeltas(proxy *model.Proxy, push *model.PushContext, updates *model.PushRequest,
w *model.WatchedResource) ([]*any.Any, []string, model.XdsLogDetails, bool, error) {
// TODO: delta implement
return nil, nil, model.DefaultXdsLogDetails, false, nil
}
type VirtualServiceGenerator struct {
Server *xds.DiscoveryServer
}