Support wasm api (#129)

This commit is contained in:
澄潭
2023-01-18 14:56:51 +08:00
committed by GitHub
parent 2133c273e9
commit d40a7c1f34
54 changed files with 3161 additions and 99 deletions

View File

@@ -16,6 +16,8 @@ package config
import (
"encoding/json"
"errors"
"fmt"
"strings"
"sync"
@@ -23,9 +25,12 @@ import (
wasm "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/wasm/v3"
httppb "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3"
v3 "github.com/envoyproxy/go-control-plane/envoy/extensions/wasm/v3"
"github.com/gogo/protobuf/types"
"github.com/golang/protobuf/ptypes/wrappers"
"google.golang.org/protobuf/types/known/anypb"
extensions "istio.io/api/extensions/v1alpha1"
networking "istio.io/api/networking/v1alpha3"
istiotype "istio.io/api/type/v1beta1"
"istio.io/istio/pilot/pkg/model"
networkingutil "istio.io/istio/pilot/pkg/networking/util"
"istio.io/istio/pilot/pkg/util/sets"
@@ -36,6 +41,8 @@ import (
listersv1 "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
higressext "github.com/alibaba/higress/api/extensions/v1alpha1"
extlisterv1 "github.com/alibaba/higress/client/pkg/listers/extensions/v1alpha1"
netlisterv1 "github.com/alibaba/higress/client/pkg/listers/networking/v1"
"github.com/alibaba/higress/pkg/ingress/kube/annotations"
"github.com/alibaba/higress/pkg/ingress/kube/common"
@@ -44,6 +51,7 @@ import (
"github.com/alibaba/higress/pkg/ingress/kube/mcpbridge"
"github.com/alibaba/higress/pkg/ingress/kube/secret"
"github.com/alibaba/higress/pkg/ingress/kube/util"
"github.com/alibaba/higress/pkg/ingress/kube/wasmplugin"
. "github.com/alibaba/higress/pkg/ingress/log"
"github.com/alibaba/higress/pkg/kube"
"github.com/alibaba/higress/registry/reconcile"
@@ -69,6 +77,7 @@ type IngressConfig struct {
destinationRuleHandlers []model.EventHandler
envoyFilterHandlers []model.EventHandler
serviceEntryHandlers []model.EventHandler
wasmPluginHandlers []model.EventHandler
watchErrorHandler cache.WatchErrorHandler
cachedEnvoyFilters []config.Config
@@ -83,6 +92,12 @@ type IngressConfig struct {
mcpbridgeLister netlisterv1.McpBridgeLister
wasmPluginController wasmplugin.WasmPluginController
wasmPluginLister extlisterv1.WasmPluginLister
wasmPlugins map[string]*extensions.WasmPlugin
XDSUpdater model.XDSUpdater
annotationHandler annotations.AnnotationHandler
@@ -109,11 +124,17 @@ func NewIngressConfig(localKubeClient kube.Client, XDSUpdater model.XDSUpdater,
watchedSecretSet: sets.NewSet(),
namespace: namespace,
mcpbridgeReconciled: true,
wasmPlugins: make(map[string]*extensions.WasmPlugin),
}
mcpbridgeController := mcpbridge.NewController(localKubeClient, clusterId)
mcpbridgeController.AddEventHandler(config.AddOrUpdateMcpBridge, config.DeleteMcpBridge)
config.mcpbridgeController = mcpbridgeController
config.mcpbridgeLister = mcpbridgeController.Lister()
wasmPluginController := wasmplugin.NewController(localKubeClient, clusterId)
wasmPluginController.AddEventHandler(config.AddOrUpdateWasmPlugin, config.DeleteWasmPlugin)
config.wasmPluginController = wasmPluginController
config.wasmPluginLister = wasmPluginController.Lister()
return config
}
@@ -134,6 +155,9 @@ func (m *IngressConfig) RegisterEventHandler(kind config.GroupVersionKind, f mod
case gvk.ServiceEntry:
m.serviceEntryHandlers = append(m.serviceEntryHandlers, f)
case gvk.WasmPlugin:
m.wasmPluginHandlers = append(m.wasmPluginHandlers, f)
}
for _, remoteIngressController := range m.remoteIngressControllers {
@@ -169,7 +193,8 @@ func (m *IngressConfig) List(typ config.GroupVersionKind, namespace string) ([]c
typ != gvk.VirtualService &&
typ != gvk.DestinationRule &&
typ != gvk.EnvoyFilter &&
typ != gvk.ServiceEntry {
typ != gvk.ServiceEntry &&
typ != gvk.WasmPlugin {
return nil, common.ErrUnsupportedOp
}
@@ -206,6 +231,8 @@ func (m *IngressConfig) List(typ config.GroupVersionKind, namespace string) ([]c
return m.convertDestinationRule(wrapperConfigs), nil
case gvk.ServiceEntry:
return m.convertServiceEntry(wrapperConfigs), nil
case gvk.WasmPlugin:
return m.convertWasmPlugin(wrapperConfigs), nil
}
return nil, nil
@@ -472,6 +499,23 @@ func (m *IngressConfig) convertEnvoyFilter(convertOptions *common.ConvertOptions
m.mutex.Unlock()
}
func (m *IngressConfig) convertWasmPlugin([]common.WrapperConfig) []config.Config {
m.mutex.RLock()
defer m.mutex.RUnlock()
out := make([]config.Config, 0, len(m.wasmPlugins))
for name, wasmPlugin := range m.wasmPlugins {
out = append(out, config.Config{
Meta: config.Meta{
GroupVersionKind: gvk.WasmPlugin,
Name: name,
Namespace: m.namespace,
},
Spec: wasmPlugin,
})
}
return out
}
func (m *IngressConfig) convertServiceEntry([]common.WrapperConfig) []config.Config {
if m.RegistryReconciler == nil {
return nil
@@ -643,6 +687,153 @@ func (m *IngressConfig) applyInternalActiveRedirect(convertOptions *common.Conve
}
}
func (m *IngressConfig) convertIstioWasmPlugin(obj *higressext.WasmPlugin) (*extensions.WasmPlugin, error) {
result := &extensions.WasmPlugin{
Selector: &istiotype.WorkloadSelector{
MatchLabels: map[string]string{
"higress": m.namespace + "-higress-gateway",
},
},
Url: obj.Url,
Sha256: obj.Sha256,
ImagePullPolicy: extensions.PullPolicy(obj.ImagePullPolicy),
ImagePullSecret: obj.ImagePullSecret,
VerificationKey: obj.VerificationKey,
PluginConfig: obj.PluginConfig,
PluginName: obj.PluginName,
Phase: extensions.PluginPhase(obj.Phase),
Priority: obj.Priority,
}
if result.PluginConfig != nil {
return result, nil
}
result.PluginConfig = obj.DefaultConfig
if len(obj.MatchRules) > 0 {
if result.PluginConfig == nil {
result.PluginConfig = &types.Struct{
Fields: map[string]*types.Value{},
}
}
var ruleValues []*types.Value
for _, rule := range obj.MatchRules {
if rule.Config == nil {
return nil, errors.New("invalid rule has no config")
}
v := &types.Value_StructValue{
StructValue: rule.Config,
}
var matchItems []*types.Value
for _, ing := range rule.Ingress {
matchItems = append(matchItems, &types.Value{
Kind: &types.Value_StringValue{
StringValue: ing,
},
})
}
if len(matchItems) > 0 {
v.StructValue.Fields["_match_route_"] = &types.Value{
Kind: &types.Value_ListValue{
ListValue: &types.ListValue{
Values: matchItems,
},
},
}
ruleValues = append(ruleValues, &types.Value{
Kind: v,
})
continue
}
for _, domain := range rule.Domain {
matchItems = append(matchItems, &types.Value{
Kind: &types.Value_StringValue{
StringValue: domain,
},
})
}
if len(matchItems) == 0 {
return nil, fmt.Errorf("invalid match rule has no match condition, rule:%v", rule)
}
v.StructValue.Fields["_match_domain_"] = &types.Value{
Kind: &types.Value_ListValue{
ListValue: &types.ListValue{
Values: matchItems,
},
},
}
ruleValues = append(ruleValues, &types.Value{
Kind: v,
})
}
result.PluginConfig.Fields["_rules_"] = &types.Value{
Kind: &types.Value_ListValue{
ListValue: &types.ListValue{
Values: ruleValues,
},
},
}
}
return result, nil
}
func (m *IngressConfig) AddOrUpdateWasmPlugin(clusterNamespacedName util.ClusterNamespacedName) {
if clusterNamespacedName.Namespace != m.namespace {
return
}
wasmPlugin, err := m.wasmPluginLister.WasmPlugins(clusterNamespacedName.Namespace).Get(clusterNamespacedName.Name)
if err != nil {
IngressLog.Errorf("wasmPlugin is not found, namespace:%s, name:%s",
clusterNamespacedName.Namespace, clusterNamespacedName.Name)
return
}
metadata := config.Meta{
Name: clusterNamespacedName.Name + "-wasmplugin",
Namespace: clusterNamespacedName.Namespace,
GroupVersionKind: gvk.WasmPlugin,
// Set this label so that we do not compare configs and just push.
Labels: map[string]string{constants.AlwaysPushLabel: "true"},
}
for _, f := range m.wasmPluginHandlers {
IngressLog.Debug("WasmPlugin triggerd update")
f(config.Config{Meta: metadata}, config.Config{Meta: metadata}, model.EventUpdate)
}
istioWasmPlugin, err := m.convertIstioWasmPlugin(&wasmPlugin.Spec)
if err != nil {
IngressLog.Errorf("invalid wasmPlugin:%s, err:%v", clusterNamespacedName.Name, err)
return
}
IngressLog.Debugf("wasmPlugin:%s convert to istioWasmPlugin:%v", clusterNamespacedName.Name, istioWasmPlugin)
m.mutex.Lock()
m.wasmPlugins[clusterNamespacedName.Name] = istioWasmPlugin
m.mutex.Unlock()
}
func (m *IngressConfig) DeleteWasmPlugin(clusterNamespacedName util.ClusterNamespacedName) {
if clusterNamespacedName.Namespace != m.namespace {
return
}
var hit bool
m.mutex.Lock()
if _, ok := m.wasmPlugins[clusterNamespacedName.Name]; ok {
delete(m.wasmPlugins, clusterNamespacedName.Name)
hit = true
}
m.mutex.Unlock()
if hit {
metadata := config.Meta{
Name: clusterNamespacedName.Name + "-wasmplugin",
Namespace: clusterNamespacedName.Namespace,
GroupVersionKind: gvk.WasmPlugin,
// Set this label so that we do not compare configs and just push.
Labels: map[string]string{constants.AlwaysPushLabel: "true"},
}
for _, f := range m.wasmPluginHandlers {
IngressLog.Debug("WasmPlugin triggerd update")
f(config.Config{Meta: metadata}, config.Config{Meta: metadata}, model.EventDelete)
}
}
}
func (m *IngressConfig) AddOrUpdateMcpBridge(clusterNamespacedName util.ClusterNamespacedName) {
// TODO: get resource name from config
if clusterNamespacedName.Name != "default" || clusterNamespacedName.Namespace != m.namespace {
@@ -860,7 +1051,8 @@ func constructBasicAuthEnvoyFilter(rules *common.BasicAuthRules, namespace strin
}
func (m *IngressConfig) Run(stop <-chan struct{}) {
m.mcpbridgeController.Run(stop)
go m.mcpbridgeController.Run(stop)
go m.wasmPluginController.Run(stop)
}
func (m *IngressConfig) HasSynced() bool {
@@ -874,6 +1066,9 @@ func (m *IngressConfig) HasSynced() bool {
if !m.mcpbridgeController.HasSynced() || !m.mcpbridgeReconciled {
return false
}
if !m.wasmPluginController.HasSynced() {
return false
}
IngressLog.Info("Ingress config controller synced.")
return true
}

View File

@@ -25,4 +25,5 @@ var IngressIR = collection.NewSchemasBuilder().
MustAdd(collections.IstioNetworkingV1Alpha3Gateways).
MustAdd(collections.IstioNetworkingV1Alpha3Serviceentries).
MustAdd(collections.IstioNetworkingV1Alpha3Virtualservices).
MustAdd(collections.IstioExtensionsV1Alpha1Wasmplugins).
Build()

View File

@@ -277,12 +277,15 @@ func partMd5(raw string) string {
return encoded[:4] + encoded[len(encoded)-4:]
}
func GenerateUniqueRouteName(route *WrapperHTTPRoute) string {
func GenerateUniqueRouteName(defaultNs string, route *WrapperHTTPRoute) string {
if route.WrapperConfig.Config.Namespace == defaultNs {
return route.WrapperConfig.Config.Name
}
return route.Meta()
}
func GenerateUniqueRouteNameWithSuffix(route *WrapperHTTPRoute, suffix string) string {
return CreateConvertedName(route.Meta(), suffix)
func GenerateUniqueRouteNameWithSuffix(defaultNs string, route *WrapperHTTPRoute, suffix string) string {
return CreateConvertedName(GenerateUniqueRouteName(defaultNs, route), suffix)
}
func SplitServiceFQDN(fqdn string) (string, string, bool) {

View File

@@ -198,7 +198,8 @@ func TestGenerateUniqueRouteName(t *testing.T) {
},
}
assert.Equal(t, "bar/foo", GenerateUniqueRouteName(input))
assert.Equal(t, "bar/foo", GenerateUniqueRouteName("xxx", input))
assert.Equal(t, "foo", GenerateUniqueRouteName("bar", input))
}

View File

@@ -548,7 +548,7 @@ func (c *controller) ConvertHTTPRoute(convertOptions *common.ConvertOptions, wra
}
wrapperHttpRoute.OriginPath = path
wrapperHttpRoute.HTTPRoute.Match = []*networking.HTTPMatchRequest{httpMatch}
wrapperHttpRoute.HTTPRoute.Name = common.GenerateUniqueRouteName(wrapperHttpRoute)
wrapperHttpRoute.HTTPRoute.Name = common.GenerateUniqueRouteName(c.options.SystemNamespace, wrapperHttpRoute)
ingressRouteBuilder := convertOptions.IngressRouteCache.New(wrapperHttpRoute)
@@ -753,7 +753,7 @@ func (c *controller) ApplyCanaryIngress(convertOptions *common.ConvertOptions, w
}
canary.OriginPath = path
canary.HTTPRoute.Match = []*networking.HTTPMatchRequest{httpMatch}
canary.HTTPRoute.Name = common.GenerateUniqueRouteName(canary)
canary.HTTPRoute.Name = common.GenerateUniqueRouteName(c.options.SystemNamespace, canary)
ingressRouteBuilder := convertOptions.IngressRouteCache.New(canary)
// backend service check
@@ -781,7 +781,7 @@ func (c *controller) ApplyCanaryIngress(convertOptions *common.ConvertOptions, w
if byHeader {
IngressLog.Debug("Insert canary route by header")
annotations.ApplyByHeader(canary.HTTPRoute, route.HTTPRoute, canary.WrapperConfig.AnnotationsConfig)
canary.HTTPRoute.Name = common.GenerateUniqueRouteName(canary)
canary.HTTPRoute.Name = common.GenerateUniqueRouteName(c.options.SystemNamespace, canary)
} else {
IngressLog.Debug("Merge canary route by weight")
if route.WeightTotal == 0 {
@@ -809,7 +809,7 @@ func (c *controller) ApplyCanaryIngress(convertOptions *common.ConvertOptions, w
convertOptions.HTTPRoutes[rule.Host] = routes
// Recreate route name.
ingressRouteBuilder.RouteName = common.GenerateUniqueRouteName(canary)
ingressRouteBuilder.RouteName = common.GenerateUniqueRouteName(c.options.SystemNamespace, canary)
convertOptions.IngressRouteCache.Add(ingressRouteBuilder)
} else {
convertOptions.IngressRouteCache.Update(targetRoute)
@@ -935,7 +935,7 @@ func (c *controller) createDefaultRoute(wrapper *common.WrapperConfig, backend *
OriginPathType: common.Prefix,
OriginPath: "/",
}
route.HTTPRoute.Name = common.GenerateUniqueRouteNameWithSuffix(route, "default")
route.HTTPRoute.Name = common.GenerateUniqueRouteNameWithSuffix(c.options.SystemNamespace, route, "default")
return route
}

View File

@@ -544,7 +544,7 @@ func (c *controller) ConvertHTTPRoute(convertOptions *common.ConvertOptions, wra
}
wrapperHttpRoute.OriginPath = path
wrapperHttpRoute.HTTPRoute.Match = []*networking.HTTPMatchRequest{httpMatch}
wrapperHttpRoute.HTTPRoute.Name = common.GenerateUniqueRouteName(wrapperHttpRoute)
wrapperHttpRoute.HTTPRoute.Name = common.GenerateUniqueRouteName(c.options.SystemNamespace, wrapperHttpRoute)
ingressRouteBuilder := convertOptions.IngressRouteCache.New(wrapperHttpRoute)
@@ -750,7 +750,7 @@ func (c *controller) ApplyCanaryIngress(convertOptions *common.ConvertOptions, w
}
canary.OriginPath = path
canary.HTTPRoute.Match = []*networking.HTTPMatchRequest{httpMatch}
canary.HTTPRoute.Name = common.GenerateUniqueRouteName(canary)
canary.HTTPRoute.Name = common.GenerateUniqueRouteName(c.options.SystemNamespace, canary)
ingressRouteBuilder := convertOptions.IngressRouteCache.New(canary)
// backend service check
@@ -778,7 +778,7 @@ func (c *controller) ApplyCanaryIngress(convertOptions *common.ConvertOptions, w
if byHeader {
IngressLog.Debug("Insert canary route by header")
annotations.ApplyByHeader(canary.HTTPRoute, route.HTTPRoute, canary.WrapperConfig.AnnotationsConfig)
canary.HTTPRoute.Name = common.GenerateUniqueRouteName(canary)
canary.HTTPRoute.Name = common.GenerateUniqueRouteName(c.options.SystemNamespace, canary)
} else {
IngressLog.Debug("Merge canary route by weight")
if route.WeightTotal == 0 {
@@ -806,7 +806,7 @@ func (c *controller) ApplyCanaryIngress(convertOptions *common.ConvertOptions, w
convertOptions.HTTPRoutes[rule.Host] = routes
// Recreate route name.
ingressRouteBuilder.RouteName = common.GenerateUniqueRouteName(canary)
ingressRouteBuilder.RouteName = common.GenerateUniqueRouteName(c.options.SystemNamespace, canary)
convertOptions.IngressRouteCache.Add(ingressRouteBuilder)
} else {
convertOptions.IngressRouteCache.Update(targetRoute)
@@ -930,7 +930,7 @@ func (c *controller) createDefaultRoute(wrapper *common.WrapperConfig, backend *
OriginPathType: common.Prefix,
OriginPath: "/",
}
route.HTTPRoute.Name = common.GenerateUniqueRouteNameWithSuffix(route, "default")
route.HTTPRoute.Name = common.GenerateUniqueRouteNameWithSuffix(c.options.SystemNamespace, route, "default")
return route
}

View File

@@ -0,0 +1,46 @@
// Copyright (c) 2022 Alibaba Group Holding Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package wasmplugin
import (
"time"
"istio.io/istio/pkg/kube/controllers"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/cache"
v1 "github.com/alibaba/higress/client/pkg/apis/extensions/v1alpha1"
"github.com/alibaba/higress/client/pkg/clientset/versioned"
informersv1 "github.com/alibaba/higress/client/pkg/informers/externalversions/extensions/v1alpha1"
listersv1 "github.com/alibaba/higress/client/pkg/listers/extensions/v1alpha1"
"github.com/alibaba/higress/pkg/ingress/kube/controller"
kubeclient "github.com/alibaba/higress/pkg/kube"
)
type WasmPluginController controller.Controller[listersv1.WasmPluginLister]
func NewController(client kubeclient.Client, clusterId string) WasmPluginController {
informer := client.HigressInformer().InformerFor(&v1.WasmPlugin{}, func(k versioned.Interface, resync time.Duration) cache.SharedIndexInformer {
return informersv1.NewWasmPluginInformer(k, metav1.NamespaceAll, resync,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
})
return controller.NewCommonController("wasmplugin", listersv1.NewWasmPluginLister(informer.GetIndexer()),
informer, GetWasmPlugin, clusterId)
}
func GetWasmPlugin(lister listersv1.WasmPluginLister, namespacedName types.NamespacedName) (controllers.Object, error) {
return lister.WasmPlugins(namespacedName.Namespace).Get(namespacedName.Name)
}