upgrade to istio 1.19 (#1211)

Co-authored-by: CH3CHO <ch3cho@qq.com>
Co-authored-by: rinfx <893383980@qq.com>
This commit is contained in:
澄潭
2024-08-26 09:51:47 +08:00
committed by GitHub
parent a2c2d1d521
commit f7a419770d
401 changed files with 21171 additions and 7255 deletions

View File

@@ -27,23 +27,24 @@ import (
wasm "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/wasm/v3"
httppb "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3"
v3 "github.com/envoyproxy/go-control-plane/envoy/extensions/wasm/v3"
"github.com/gogo/protobuf/jsonpb"
"github.com/gogo/protobuf/types"
"github.com/golang/protobuf/jsonpb"
_struct "github.com/golang/protobuf/ptypes/struct"
"github.com/golang/protobuf/ptypes/wrappers"
"go.uber.org/atomic"
"google.golang.org/protobuf/types/known/anypb"
extensions "istio.io/api/extensions/v1alpha1"
networking "istio.io/api/networking/v1alpha3"
istiotype "istio.io/api/type/v1beta1"
"istio.io/istio/pilot/pkg/model"
networkingutil "istio.io/istio/pilot/pkg/networking/util"
"istio.io/istio/pilot/pkg/util/sets"
istiomodel "istio.io/istio/pilot/pkg/model"
"istio.io/istio/pilot/pkg/util/protoconv"
"istio.io/istio/pkg/cluster"
"istio.io/istio/pkg/config"
"istio.io/istio/pkg/config/constants"
"istio.io/istio/pkg/config/schema/collection"
"istio.io/istio/pkg/config/schema/gvk"
kerrors "k8s.io/apimachinery/pkg/api/errors"
ktypes "k8s.io/apimachinery/pkg/types"
"istio.io/istio/pkg/config/schema/kind"
"istio.io/istio/pkg/log"
"istio.io/istio/pkg/util/sets"
v1 "k8s.io/api/core/v1"
listersv1 "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
@@ -55,6 +56,7 @@ import (
"github.com/alibaba/higress/pkg/ingress/kube/annotations"
"github.com/alibaba/higress/pkg/ingress/kube/common"
"github.com/alibaba/higress/pkg/ingress/kube/configmap"
"github.com/alibaba/higress/pkg/ingress/kube/gateway"
"github.com/alibaba/higress/pkg/ingress/kube/http2rpc"
"github.com/alibaba/higress/pkg/ingress/kube/ingress"
"github.com/alibaba/higress/pkg/ingress/kube/ingressv1"
@@ -69,9 +71,9 @@ import (
)
var (
_ model.ConfigStoreCache = &IngressConfig{}
_ model.IngressStore = &IngressConfig{}
Http2RpcMethodMap = func() map[string]string {
_ istiomodel.ConfigStoreController = &IngressConfig{}
_ istiomodel.IngressStore = &IngressConfig{}
Http2RpcMethodMap = func() map[string]string {
return map[string]string{
"GET": "ALL_GET",
"POST": "ALL_POST",
@@ -95,31 +97,29 @@ const (
)
type IngressConfig struct {
// key: cluster id
remoteIngressControllers map[string]common.IngressController
remoteIngressControllers map[cluster.ID]common.IngressController
remoteGatewayControllers map[cluster.ID]common.GatewayController
mutex sync.RWMutex
ingressRouteCache model.IngressRouteCollection
ingressDomainCache model.IngressDomainCollection
ingressRouteCache istiomodel.IngressRouteCollection
ingressDomainCache istiomodel.IngressDomainCollection
localKubeClient kube.Client
virtualServiceHandlers []model.EventHandler
gatewayHandlers []model.EventHandler
destinationRuleHandlers []model.EventHandler
envoyFilterHandlers []model.EventHandler
serviceEntryHandlers []model.EventHandler
wasmPluginHandlers []model.EventHandler
virtualServiceHandlers []istiomodel.EventHandler
gatewayHandlers []istiomodel.EventHandler
destinationRuleHandlers []istiomodel.EventHandler
envoyFilterHandlers []istiomodel.EventHandler
serviceEntryHandlers []istiomodel.EventHandler
wasmPluginHandlers []istiomodel.EventHandler
watchErrorHandler cache.WatchErrorHandler
cachedEnvoyFilters []config.Config
watchedSecretSet sets.Set
watchedSecretSet sets.Set[string]
RegistryReconciler *reconcile.Reconciler
mcpbridgeReconciled *atomic.Bool
mcpbridgeController mcpbridge.McpBridgeController
mcpbridgeLister netlisterv1.McpBridgeLister
@@ -138,30 +138,33 @@ type IngressConfig struct {
configmapMgr *configmap.ConfigmapMgr
XDSUpdater model.XDSUpdater
XDSUpdater istiomodel.XDSUpdater
annotationHandler annotations.AnnotationHandler
globalGatewayName string
namespace string
clusterId string
clusterId cluster.ID
httpsConfigMgr *cert.ConfigMgr
}
func NewIngressConfig(localKubeClient kube.Client, XDSUpdater model.XDSUpdater, namespace, clusterId string) *IngressConfig {
func NewIngressConfig(localKubeClient kube.Client, xdsUpdater istiomodel.XDSUpdater, namespace string, clusterId cluster.ID) *IngressConfig {
if clusterId == "Kubernetes" {
clusterId = ""
}
config := &IngressConfig{
remoteIngressControllers: make(map[string]common.IngressController),
remoteIngressControllers: make(map[cluster.ID]common.IngressController),
remoteGatewayControllers: make(map[cluster.ID]common.GatewayController),
localKubeClient: localKubeClient,
XDSUpdater: XDSUpdater,
XDSUpdater: xdsUpdater,
annotationHandler: annotations.NewAnnotationHandlerManager(),
clusterId: clusterId,
watchedSecretSet: sets.NewSet(),
globalGatewayName: namespace + "/" + common.CreateConvertedName(clusterId.String(), "global"),
watchedSecretSet: sets.New[string](),
namespace: namespace,
mcpbridgeReconciled: atomic.NewBool(false),
wasmPlugins: make(map[string]*extensions.WasmPlugin),
http2rpcs: make(map[string]*higressv1.Http2Rpc),
}
@@ -181,15 +184,15 @@ func NewIngressConfig(localKubeClient kube.Client, XDSUpdater model.XDSUpdater,
config.http2rpcLister = http2rpcController.Lister()
higressConfigController := configmap.NewController(localKubeClient, clusterId, namespace)
config.configmapMgr = configmap.NewConfigmapMgr(XDSUpdater, namespace, higressConfigController, higressConfigController.Lister())
config.configmapMgr = configmap.NewConfigmapMgr(xdsUpdater, namespace, higressConfigController, higressConfigController.Lister())
httpsConfigMgr, _ := cert.NewConfigMgr(namespace, localKubeClient)
httpsConfigMgr, _ := cert.NewConfigMgr(namespace, localKubeClient.Kube())
config.httpsConfigMgr = httpsConfigMgr
return config
}
func (m *IngressConfig) RegisterEventHandler(kind config.GroupVersionKind, f model.EventHandler) {
func (m *IngressConfig) RegisterEventHandler(kind config.GroupVersionKind, f istiomodel.EventHandler) {
IngressLog.Infof("register resource %v", kind)
switch kind {
case gvk.VirtualService:
@@ -214,9 +217,12 @@ func (m *IngressConfig) RegisterEventHandler(kind config.GroupVersionKind, f mod
for _, remoteIngressController := range m.remoteIngressControllers {
remoteIngressController.RegisterEventHandler(kind, f)
}
for _, remoteGatewayController := range m.remoteGatewayControllers {
remoteGatewayController.RegisterEventHandler(kind, f)
}
}
func (m *IngressConfig) AddLocalCluster(options common.Options) common.IngressController {
func (m *IngressConfig) AddLocalCluster(options common.Options) {
secretController := secret.NewController(m.localKubeClient, options.ClusterId)
secretController.AddEventHandler(m.ReflectSecretChanges)
@@ -227,32 +233,38 @@ func (m *IngressConfig) AddLocalCluster(options common.Options) common.IngressCo
} else {
ingressController = ingressv1.NewController(m.localKubeClient, m.localKubeClient, options, secretController)
}
m.remoteIngressControllers[options.ClusterId] = ingressController
return ingressController
m.remoteGatewayControllers[options.ClusterId] = gateway.NewController(m.localKubeClient, options)
}
func (m *IngressConfig) InitializeCluster(ingressController common.IngressController, stop <-chan struct{}) error {
_ = ingressController.SetWatchErrorHandler(m.watchErrorHandler)
go ingressController.Run(stop)
return nil
}
func (m *IngressConfig) List(typ config.GroupVersionKind, namespace string) ([]config.Config, error) {
func (m *IngressConfig) List(typ config.GroupVersionKind, namespace string) []config.Config {
if typ != gvk.Gateway &&
typ != gvk.VirtualService &&
typ != gvk.DestinationRule &&
typ != gvk.EnvoyFilter &&
typ != gvk.ServiceEntry &&
typ != gvk.WasmPlugin {
return nil, common.ErrUnsupportedOp
return nil
}
var configs = make([]config.Config, 0)
if configsFromIngress := m.listFromIngressControllers(typ, namespace); configsFromIngress != nil {
configs = append(configs, configsFromIngress...)
}
if configsFromGateway := m.listFromGatewayControllers(typ, namespace); configsFromGateway != nil {
configs = append(configs, configsFromGateway...)
}
return configs
}
func (m *IngressConfig) listFromIngressControllers(typ config.GroupVersionKind, namespace string) []config.Config {
// Currently, only support list all namespaces gateways or virtualservices.
if namespace != "" {
IngressLog.Warnf("ingress store only support type %s of all namespace.", typ)
return nil, common.ErrUnsupportedOp
IngressLog.Warnf("ingress store only support type %s of all namespace, request namespace: %s", typ, namespace)
return nil
}
if typ == gvk.EnvoyFilter {
@@ -271,11 +283,11 @@ func (m *IngressConfig) List(typ config.GroupVersionKind, namespace string) ([]c
}
if len(envoyFilters) == 0 {
IngressLog.Infof("resource type %s, configs number %d", typ, len(m.cachedEnvoyFilters))
return m.cachedEnvoyFilters, nil
return m.cachedEnvoyFilters
}
envoyFilters = append(envoyFilters, m.cachedEnvoyFilters...)
IngressLog.Infof("resource type %s, configs number %d", typ, len(envoyFilters))
return envoyFilters, nil
return envoyFilters
}
var configs []config.Config
@@ -291,26 +303,36 @@ func (m *IngressConfig) List(typ config.GroupVersionKind, namespace string) ([]c
IngressLog.Infof("resource type %s, configs number %d", typ, len(wrapperConfigs))
switch typ {
case gvk.Gateway:
return m.convertGateways(wrapperConfigs), nil
return m.convertGateways(wrapperConfigs)
case gvk.VirtualService:
return m.convertVirtualService(wrapperConfigs), nil
return m.convertVirtualService(wrapperConfigs)
case gvk.DestinationRule:
return m.convertDestinationRule(wrapperConfigs), nil
return m.convertDestinationRule(wrapperConfigs)
case gvk.ServiceEntry:
return m.convertServiceEntry(wrapperConfigs), nil
return m.convertServiceEntry(wrapperConfigs)
case gvk.WasmPlugin:
return m.convertWasmPlugin(wrapperConfigs), nil
return m.convertWasmPlugin(wrapperConfigs)
}
return nil, nil
return nil
}
func (m *IngressConfig) listFromGatewayControllers(typ config.GroupVersionKind, namespace string) []config.Config {
var configs []config.Config
for _, gatewayController := range m.remoteGatewayControllers {
if clusterConfigs := gatewayController.List(typ, namespace); clusterConfigs != nil {
configs = append(configs, clusterConfigs...)
}
}
return configs
}
func (m *IngressConfig) createWrapperConfigs(configs []config.Config) []common.WrapperConfig {
var wrapperConfigs []common.WrapperConfig
// Init global context
clusterSecretListers := map[string]listersv1.SecretLister{}
clusterServiceListers := map[string]listersv1.ServiceLister{}
clusterSecretListers := map[cluster.ID]listersv1.SecretLister{}
clusterServiceListers := map[cluster.ID]listersv1.ServiceLister{}
m.mutex.RLock()
for clusterId, controller := range m.remoteIngressControllers {
clusterSecretListers[clusterId] = controller.SecretLister()
@@ -318,7 +340,7 @@ func (m *IngressConfig) createWrapperConfigs(configs []config.Config) []common.W
}
m.mutex.RUnlock()
globalContext := &annotations.GlobalContext{
WatchedSecrets: sets.NewSet(),
WatchedSecrets: sets.New[string](),
ClusterSecretLister: clusterSecretListers,
ClusterServiceList: clusterServiceListers,
}
@@ -389,7 +411,7 @@ func (m *IngressConfig) convertGateways(configs []common.WrapperConfig) []config
Name: common.CreateConvertedName(constants.IstioIngressGatewayName, cleanHost),
Namespace: m.namespace,
Annotations: map[string]string{
common.ClusterIdAnnotation: gateway.ClusterId,
common.ClusterIdAnnotation: gateway.ClusterId.String(),
common.HostAnnotation: gateway.Host,
},
},
@@ -483,7 +505,7 @@ func (m *IngressConfig) convertVirtualService(configs []common.WrapperConfig) []
cleanHost := common.CleanHost(host)
// namespace/name, name format: (istio cluster id)-host
gateways := []string{m.namespace + "/" +
common.CreateConvertedName(m.clusterId, cleanHost),
common.CreateConvertedName(m.clusterId.String(), cleanHost),
common.CreateConvertedName(constants.IstioIngressGatewayName, cleanHost)}
wrapperVS, exist := convertOptions.VirtualServices[host]
@@ -507,7 +529,7 @@ func (m *IngressConfig) convertVirtualService(configs []common.WrapperConfig) []
Name: common.CreateConvertedName(constants.IstioIngressGatewayName, firstRoute.WrapperConfig.Config.Namespace, firstRoute.WrapperConfig.Config.Name, cleanHost),
Namespace: m.namespace,
Annotations: map[string]string{
common.ClusterIdAnnotation: firstRoute.ClusterId,
common.ClusterIdAnnotation: firstRoute.ClusterId.String(),
},
},
Spec: vs,
@@ -810,9 +832,18 @@ func (m *IngressConfig) convertIstioWasmPlugin(obj *higressext.WasmPlugin) (*ext
PluginConfig: obj.PluginConfig,
PluginName: obj.PluginName,
Phase: extensions.PluginPhase(obj.Phase),
FailStrategy: extensions.FailStrategy(obj.FailStrategy),
Priority: obj.Priority,
}
if obj.GetPriority() != nil {
result.Priority = &types.Int64Value{Value: int64(obj.GetPriority().Value)}
if obj.VmConfig != nil {
result.VmConfig = &extensions.VmConfig{}
for _, env := range obj.VmConfig.Env {
result.VmConfig.Env = append(result.VmConfig.Env, &extensions.EnvVar{
Name: env.Name,
ValueFrom: extensions.EnvValueSource(env.ValueFrom),
Value: env.Value,
})
}
}
if result.PluginConfig != nil {
return result, nil
@@ -823,70 +854,71 @@ func (m *IngressConfig) convertIstioWasmPlugin(obj *higressext.WasmPlugin) (*ext
hasValidRule := false
if len(obj.MatchRules) > 0 {
if result.PluginConfig == nil {
result.PluginConfig = &types.Struct{
Fields: map[string]*types.Value{},
result.PluginConfig = &_struct.Struct{
Fields: map[string]*_struct.Value{},
}
}
var ruleValues []*types.Value
var ruleValues []*_struct.Value
for _, rule := range obj.MatchRules {
if rule.ConfigDisable {
continue
}
if rule.Config == nil {
rule.Config = &types.Struct{
Fields: map[string]*types.Value{},
rule.Config = &_struct.Struct{
Fields: map[string]*_struct.Value{},
}
}
v := &types.Value_StructValue{
v := &_struct.Value_StructValue{
StructValue: rule.Config,
}
var matchItems []*types.Value
var matchItems []*_struct.Value
// match ingress
for _, ing := range rule.Ingress {
matchItems = append(matchItems, &types.Value{
Kind: &types.Value_StringValue{
matchItems = append(matchItems, &_struct.Value{
Kind: &_struct.Value_StringValue{
StringValue: ing,
},
})
}
if len(matchItems) > 0 {
v.StructValue.Fields["_match_route_"] = &types.Value{
Kind: &types.Value_ListValue{
ListValue: &types.ListValue{
v.StructValue.Fields["_match_route_"] = &_struct.Value{
Kind: &_struct.Value_ListValue{
ListValue: &_struct.ListValue{
Values: matchItems,
},
},
}
ruleValues = append(ruleValues, &types.Value{
ruleValues = append(ruleValues, &_struct.Value{
Kind: v,
})
continue
}
// match domain
for _, domain := range rule.Domain {
matchItems = append(matchItems, &types.Value{
Kind: &types.Value_StringValue{
matchItems = append(matchItems, &_struct.Value{
Kind: &_struct.Value_StringValue{
StringValue: domain,
},
})
}
if len(matchItems) > 0 {
v.StructValue.Fields["_match_domain_"] = &types.Value{
Kind: &types.Value_ListValue{
ListValue: &types.ListValue{
v.StructValue.Fields["_match_domain_"] = &_struct.Value{
Kind: &_struct.Value_ListValue{
ListValue: &_struct.ListValue{
Values: matchItems,
},
},
}
ruleValues = append(ruleValues, &types.Value{
ruleValues = append(ruleValues, &_struct.Value{
Kind: v,
})
continue
}
// match service
for _, service := range rule.Service {
matchItems = append(matchItems, &types.Value{
Kind: &types.Value_StringValue{
matchItems = append(matchItems, &_struct.Value{
Kind: &_struct.Value_StringValue{
StringValue: service,
},
})
@@ -894,22 +926,22 @@ func (m *IngressConfig) convertIstioWasmPlugin(obj *higressext.WasmPlugin) (*ext
if len(matchItems) == 0 {
return nil, fmt.Errorf("invalid match rule has no match condition, rule:%v", rule)
}
v.StructValue.Fields["_match_service_"] = &types.Value{
Kind: &types.Value_ListValue{
ListValue: &types.ListValue{
v.StructValue.Fields["_match_service_"] = &_struct.Value{
Kind: &_struct.Value_ListValue{
ListValue: &_struct.ListValue{
Values: matchItems,
},
},
}
ruleValues = append(ruleValues, &types.Value{
ruleValues = append(ruleValues, &_struct.Value{
Kind: v,
})
}
if len(ruleValues) > 0 {
hasValidRule = true
result.PluginConfig.Fields["_rules_"] = &types.Value{
Kind: &types.Value_ListValue{
ListValue: &types.ListValue{
result.PluginConfig.Fields["_rules_"] = &_struct.Value{
Kind: &_struct.Value_ListValue{
ListValue: &_struct.ListValue{
Values: ruleValues,
},
},
@@ -941,8 +973,8 @@ func (m *IngressConfig) AddOrUpdateWasmPlugin(clusterNamespacedName util.Cluster
Labels: map[string]string{constants.AlwaysPushLabel: "true"},
}
for _, f := range m.wasmPluginHandlers {
IngressLog.Debug("WasmPlugin triggered update")
f(config.Config{Meta: metadata}, config.Config{Meta: metadata}, model.EventUpdate)
IngressLog.Debug("WasmPlugin triggerd update")
f(config.Config{Meta: metadata}, config.Config{Meta: metadata}, istiomodel.EventUpdate)
}
istioWasmPlugin, err := m.convertIstioWasmPlugin(&wasmPlugin.Spec)
if err != nil {
@@ -983,8 +1015,8 @@ func (m *IngressConfig) DeleteWasmPlugin(clusterNamespacedName util.ClusterNames
Labels: map[string]string{constants.AlwaysPushLabel: "true"},
}
for _, f := range m.wasmPluginHandlers {
IngressLog.Debug("WasmPlugin triggered update")
f(config.Config{Meta: metadata}, config.Config{Meta: metadata}, model.EventDelete)
IngressLog.Debug("WasmPlugin triggerd update")
f(config.Config{Meta: metadata}, config.Config{Meta: metadata}, istiomodel.EventDelete)
}
}
}
@@ -1010,8 +1042,8 @@ func (m *IngressConfig) AddOrUpdateMcpBridge(clusterNamespacedName util.ClusterN
Labels: map[string]string{constants.AlwaysPushLabel: "true"},
}
for _, f := range m.serviceEntryHandlers {
IngressLog.Debug("McpBridge triggered serviceEntry update")
f(config.Config{Meta: metadata}, config.Config{Meta: metadata}, model.EventUpdate)
IngressLog.Debug("McpBridge triggerd serviceEntry update")
f(config.Config{Meta: metadata}, config.Config{Meta: metadata}, istiomodel.EventUpdate)
}
}, m.localKubeClient, m.namespace)
}
@@ -1021,7 +1053,7 @@ func (m *IngressConfig) AddOrUpdateMcpBridge(clusterNamespacedName util.ClusterN
IngressLog.Errorf("Mcpbridge reconcile failed, err:%v", err)
return
}
m.mcpbridgeReconciled.Store(true)
IngressLog.Info("Mcpbridge reconciled")
}
func (m *IngressConfig) DeleteMcpBridge(clusterNamespacedName util.ClusterNamespacedName) {
@@ -1049,15 +1081,15 @@ func (m *IngressConfig) AddOrUpdateHttp2Rpc(clusterNamespacedName util.ClusterNa
m.http2rpcs[clusterNamespacedName.Name] = &http2rpc.Spec
m.mutex.Unlock()
IngressLog.Infof("AddOrUpdateHttp2Rpc http2rpc ingress name %s", clusterNamespacedName.Name)
push := func(kind config.GroupVersionKind) {
m.XDSUpdater.ConfigUpdate(&model.PushRequest{
push := func(gvk config.GroupVersionKind) {
m.XDSUpdater.ConfigUpdate(&istiomodel.PushRequest{
Full: true,
ConfigsUpdated: map[model.ConfigKey]struct{}{{
Kind: kind,
ConfigsUpdated: map[istiomodel.ConfigKey]struct{}{{
Kind: kind.MustFromGVK(gvk),
Name: clusterNamespacedName.Name,
Namespace: clusterNamespacedName.Namespace,
}: {}},
Reason: []model.TriggerReason{"Http2Rpc-AddOrUpdate"},
Reason: istiomodel.NewReasonStats("Http2Rpc-AddOrUpdate"),
})
}
push(gvk.VirtualService)
@@ -1077,16 +1109,16 @@ func (m *IngressConfig) DeleteHttp2Rpc(clusterNamespacedName util.ClusterNamespa
}
m.mutex.Unlock()
if hit {
IngressLog.Infof("Http2Rpc triggered deleted event executed %s", clusterNamespacedName.Name)
push := func(kind config.GroupVersionKind) {
m.XDSUpdater.ConfigUpdate(&model.PushRequest{
IngressLog.Infof("Http2Rpc triggerd deleted event executed %s", clusterNamespacedName.Name)
push := func(gvk config.GroupVersionKind) {
m.XDSUpdater.ConfigUpdate(&istiomodel.PushRequest{
Full: true,
ConfigsUpdated: map[model.ConfigKey]struct{}{{
Kind: kind,
ConfigsUpdated: map[istiomodel.ConfigKey]struct{}{{
Kind: kind.MustFromGVK(gvk),
Name: clusterNamespacedName.Name,
Namespace: clusterNamespacedName.Namespace,
}: {}},
Reason: []model.TriggerReason{"Http2Rpc-Deleted"},
Reason: istiomodel.NewReasonStats("Http2Rpc-Deleted"),
})
}
push(gvk.VirtualService)
@@ -1103,15 +1135,15 @@ func (m *IngressConfig) ReflectSecretChanges(clusterNamespacedName util.ClusterN
m.mutex.RUnlock()
if hit {
push := func(kind config.GroupVersionKind) {
m.XDSUpdater.ConfigUpdate(&model.PushRequest{
push := func(gvk config.GroupVersionKind) {
m.XDSUpdater.ConfigUpdate(&istiomodel.PushRequest{
Full: true,
ConfigsUpdated: map[model.ConfigKey]struct{}{{
Kind: kind,
ConfigsUpdated: map[istiomodel.ConfigKey]struct{}{{
Kind: kind.MustFromGVK(gvk),
Name: clusterNamespacedName.Name,
Namespace: clusterNamespacedName.Namespace,
}: {}},
Reason: []model.TriggerReason{"auth-secret-change"},
Reason: istiomodel.NewReasonStats("auth-secret-change"),
})
}
push(gvk.VirtualService)
@@ -1284,7 +1316,7 @@ func (m *IngressConfig) constructHttp2RpcEnvoyFilter(http2rpcConfig *annotations
}, nil
}
func (m *IngressConfig) constructHttp2RpcMethods(dubbo *higressv1.DubboService) (*types.Struct, error) {
func (m *IngressConfig) constructHttp2RpcMethods(dubbo *higressv1.DubboService) (*_struct.Struct, error) {
httpRouterTemplate := `{
"route": {
"upgrade_configs": [
@@ -1365,9 +1397,12 @@ func (m *IngressConfig) constructHttp2RpcMethods(dubbo *higressv1.DubboService)
return result, nil
}
func buildPatchStruct(config string) *types.Struct {
val := &types.Struct{}
_ = jsonpb.Unmarshal(strings.NewReader(config), val)
func buildPatchStruct(config string) *_struct.Struct {
val := &_struct.Struct{}
err := jsonpb.Unmarshal(strings.NewReader(config), val)
if err != nil {
log.Errorf("jsonpb unmarshal failed: %s", config)
}
return val
}
@@ -1398,7 +1433,7 @@ func constructBasicAuthEnvoyFilter(rules *common.BasicAuthRules, namespace strin
},
},
},
Configuration: networkingutil.MessageToAny(configuration),
Configuration: protoconv.MessageToAny(configuration),
},
}
@@ -1414,7 +1449,7 @@ func constructBasicAuthEnvoyFilter(rules *common.BasicAuthRules, namespace strin
},
}
gogoTypedConfig, err := util.MessageToGoGoStruct(typedConfig)
pbTypedConfig, err := util.MessageToStruct(typedConfig)
if err != nil {
return nil, err
}
@@ -1446,7 +1481,7 @@ func constructBasicAuthEnvoyFilter(rules *common.BasicAuthRules, namespace strin
},
Patch: &networking.EnvoyFilter_Patch{
Operation: networking.EnvoyFilter_Patch_INSERT_AFTER,
Value: gogoTypedConfig,
Value: pbTypedConfig,
},
},
},
@@ -1479,6 +1514,14 @@ func QueryRpcServiceVersion(serviceEntry *memory.ServiceEntryWrapper, serviceNam
}
func (m *IngressConfig) Run(stop <-chan struct{}) {
for _, remoteIngressController := range m.remoteIngressControllers {
_ = remoteIngressController.SetWatchErrorHandler(m.watchErrorHandler)
go remoteIngressController.Run(stop)
}
for _, remoteGatewayController := range m.remoteGatewayControllers {
_ = remoteGatewayController.SetWatchErrorHandler(m.watchErrorHandler)
go remoteGatewayController.Run(stop)
}
go m.mcpbridgeController.Run(stop)
go m.wasmPluginController.Run(stop)
go m.http2rpcController.Run(stop)
@@ -1493,22 +1536,14 @@ func (m *IngressConfig) HasSynced() bool {
return false
}
}
if !m.mcpbridgeController.HasSynced() {
return false
} else {
_, err := m.mcpbridgeController.Get(ktypes.NamespacedName{
Namespace: m.namespace,
Name: DefaultMcpbridgeName,
})
if err != nil {
if !kerrors.IsNotFound(err) {
return false
}
// mcpbridge exist
} else if !m.mcpbridgeReconciled.Load() {
for _, remoteGatewayController := range m.remoteGatewayControllers {
if !remoteGatewayController.HasSynced() {
return false
}
}
if !m.mcpbridgeController.HasSynced() {
return false
}
if !m.wasmPluginController.HasSynced() {
return false
}
@@ -1527,18 +1562,30 @@ func (m *IngressConfig) SetWatchErrorHandler(f func(r *cache.Reflector, err erro
return nil
}
func (m *IngressConfig) GetIngressRoutes() model.IngressRouteCollection {
func (m *IngressConfig) GetIngressRoutes() istiomodel.IngressRouteCollection {
m.mutex.RLock()
defer m.mutex.RUnlock()
return m.ingressRouteCache
}
func (m *IngressConfig) GetIngressDomains() model.IngressDomainCollection {
func (m *IngressConfig) GetIngressDomains() istiomodel.IngressDomainCollection {
m.mutex.RLock()
defer m.mutex.RUnlock()
return m.ingressDomainCache
}
func (m *IngressConfig) CheckIngress(clusterName string) istiomodel.CheckIngressResponse {
return istiomodel.CheckIngressResponse{}
}
func (m *IngressConfig) Services(clusterName string) ([]*v1.Service, error) {
return nil, nil
}
func (m *IngressConfig) IngressControllers() map[string]string {
return nil
}
func (m *IngressConfig) Schemas() collection.Schemas {
return common.IngressIR
}

View File

@@ -21,6 +21,7 @@ import (
"github.com/stretchr/testify/assert"
"google.golang.org/protobuf/proto"
networking "istio.io/api/networking/v1alpha3"
"istio.io/istio/pkg/cluster"
"istio.io/istio/pkg/config"
"istio.io/istio/pkg/config/schema/gvk"
"istio.io/istio/pkg/config/xds"
@@ -127,7 +128,7 @@ func TestConvertGatewaysForIngress(t *testing.T) {
ingressV1Beta1Controller := controllerv1beta1.NewController(fake, fake, v1Beta1Options, nil)
ingressV1Controller := controllerv1.NewController(fake, fake, v1Options, nil)
m := NewIngressConfig(fake, nil, "wakanda", "gw-123-istio")
m.remoteIngressControllers = map[string]common.IngressController{
m.remoteIngressControllers = map[cluster.ID]common.IngressController{
"ingress-v1beta1": ingressV1Beta1Controller,
"ingress-v1": ingressV1Controller,
}

View File

@@ -18,12 +18,15 @@ import (
"sync"
networking "istio.io/api/networking/v1alpha3"
"istio.io/istio/pilot/pkg/model"
"istio.io/istio/pilot/pkg/util/sets"
istiomodel "istio.io/istio/pilot/pkg/model"
"istio.io/istio/pkg/cluster"
"istio.io/istio/pkg/config"
"istio.io/istio/pkg/config/constants"
"istio.io/istio/pkg/config/schema/collection"
"istio.io/istio/pkg/config/schema/gvk"
"istio.io/istio/pkg/config/schema/kind"
"istio.io/istio/pkg/util/sets"
v1 "k8s.io/api/core/v1"
listersv1 "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
@@ -38,40 +41,41 @@ import (
)
var (
_ model.ConfigStoreCache = &KIngressConfig{}
_ model.IngressStore = &KIngressConfig{}
_ istiomodel.ConfigStoreController = &KIngressConfig{}
_ istiomodel.IngressStore = &KIngressConfig{}
)
type KIngressConfig struct {
// key: cluster id
remoteIngressControllers map[string]common.KIngressController
remoteIngressControllers map[cluster.ID]common.KIngressController
mutex sync.RWMutex
ingressRouteCache model.IngressRouteCollection
ingressDomainCache model.IngressDomainCollection
ingressRouteCache istiomodel.IngressRouteCollection
ingressDomainCache istiomodel.IngressDomainCollection
localKubeClient kube.Client
virtualServiceHandlers []model.EventHandler
gatewayHandlers []model.EventHandler
envoyFilterHandlers []model.EventHandler
WatchErrorHandler cache.WatchErrorHandler
virtualServiceHandlers []istiomodel.EventHandler
gatewayHandlers []istiomodel.EventHandler
envoyFilterHandlers []istiomodel.EventHandler
watchErrorHandler cache.WatchErrorHandler
cachedEnvoyFilters []config.Config
watchedSecretSet sets.Set
watchedSecretSet sets.Set[string]
RegistryReconciler *reconcile.Reconciler
XDSUpdater model.XDSUpdater
XDSUpdater istiomodel.XDSUpdater
annotationHandler annotations.AnnotationHandler
globalGatewayName string
namespace string
clusterId string
clusterId cluster.ID
}
func NewKIngressConfig(localKubeClient kube.Client, XDSUpdater model.XDSUpdater, namespace, clusterId string) *KIngressConfig {
func NewKIngressConfig(localKubeClient kube.Client, XDSUpdater istiomodel.XDSUpdater, namespace string, clusterId cluster.ID) *KIngressConfig {
if localKubeClient.KIngressInformer() == nil {
return nil
}
@@ -79,19 +83,19 @@ func NewKIngressConfig(localKubeClient kube.Client, XDSUpdater model.XDSUpdater,
clusterId = ""
}
config := &KIngressConfig{
remoteIngressControllers: make(map[string]common.KIngressController),
remoteIngressControllers: make(map[cluster.ID]common.KIngressController),
localKubeClient: localKubeClient,
XDSUpdater: XDSUpdater,
annotationHandler: annotations.NewAnnotationHandlerManager(),
clusterId: clusterId,
watchedSecretSet: sets.NewSet(),
globalGatewayName: namespace + "/" + common.CreateConvertedName(clusterId.String(), "global"),
watchedSecretSet: sets.New[string](),
namespace: namespace,
}
return config
}
func (m *KIngressConfig) RegisterEventHandler(kind config.GroupVersionKind, f model.EventHandler) {
func (m *KIngressConfig) RegisterEventHandler(kind config.GroupVersionKind, f istiomodel.EventHandler) {
IngressLog.Infof("register resource %v", kind)
switch kind {
case gvk.VirtualService:
@@ -121,24 +125,18 @@ func (m *KIngressConfig) AddLocalCluster(options common.Options) common.KIngress
return ingressController
}
func (m *KIngressConfig) InitializeCluster(ingressController common.KIngressController, stop <-chan struct{}) error {
_ = ingressController.SetWatchErrorHandler(m.WatchErrorHandler)
go ingressController.Run(stop)
return nil
}
func (m *KIngressConfig) List(typ config.GroupVersionKind, namespace string) ([]config.Config, error) {
func (m *KIngressConfig) List(typ config.GroupVersionKind, namespace string) []config.Config {
if typ == gvk.EnvoyFilter || typ == gvk.DestinationRule || typ == gvk.WasmPlugin || typ == gvk.ServiceEntry {
return nil, nil
return nil
}
if typ != gvk.Gateway && typ != gvk.VirtualService {
return nil, common.ErrUnsupportedOp
return nil
}
// Currently, only support list all namespaces gateways or virtualservices.
if namespace != "" {
IngressLog.Warnf("ingress store only support type %s of all namespace.", typ)
return nil, common.ErrUnsupportedOp
return nil
}
var configs []config.Config
@@ -154,19 +152,19 @@ func (m *KIngressConfig) List(typ config.GroupVersionKind, namespace string) ([]
IngressLog.Infof("resource type %s, configs number %d", typ, len(wrapperConfigs))
switch typ {
case gvk.Gateway:
return m.convertGateways(wrapperConfigs), nil
return m.convertGateways(wrapperConfigs)
case gvk.VirtualService:
return m.convertVirtualService(wrapperConfigs), nil
return m.convertVirtualService(wrapperConfigs)
}
return nil, nil
return nil
}
func (m *KIngressConfig) createWrapperConfigs(configs []config.Config) []common.WrapperConfig {
var wrapperConfigs []common.WrapperConfig
// Init global context
clusterSecretListers := map[string]listersv1.SecretLister{}
clusterServiceListers := map[string]listersv1.ServiceLister{}
clusterSecretListers := map[cluster.ID]listersv1.SecretLister{}
clusterServiceListers := map[cluster.ID]listersv1.ServiceLister{}
m.mutex.RLock()
for clusterId, controller := range m.remoteIngressControllers {
clusterSecretListers[clusterId] = controller.SecretLister()
@@ -174,7 +172,7 @@ func (m *KIngressConfig) createWrapperConfigs(configs []config.Config) []common.
}
m.mutex.RUnlock()
globalContext := &annotations.GlobalContext{
WatchedSecrets: sets.NewSet(),
WatchedSecrets: sets.New[string](),
ClusterSecretLister: clusterSecretListers,
ClusterServiceList: clusterServiceListers,
}
@@ -240,7 +238,7 @@ func (m *KIngressConfig) convertGateways(configs []common.WrapperConfig) []confi
Name: common.CreateConvertedName(constants.IstioIngressGatewayName, cleanHost),
Namespace: m.namespace,
Annotations: map[string]string{
common.ClusterIdAnnotation: gateway.ClusterId,
common.ClusterIdAnnotation: gateway.ClusterId.String(),
common.HostAnnotation: gateway.Host,
},
},
@@ -312,7 +310,7 @@ func (m *KIngressConfig) convertVirtualService(configs []common.WrapperConfig) [
cleanHost := common.CleanHost(host)
// namespace/name, name format: (istio cluster id)-host
gateways := []string{m.namespace + "/" +
common.CreateConvertedName(m.clusterId, cleanHost),
common.CreateConvertedName(m.clusterId.String(), cleanHost),
common.CreateConvertedName(constants.IstioIngressGatewayName, cleanHost)}
wrapperVS, exist := convertOptions.VirtualServices[host]
@@ -333,7 +331,7 @@ func (m *KIngressConfig) convertVirtualService(configs []common.WrapperConfig) [
Name: common.CreateConvertedName(constants.IstioIngressGatewayName, firstRoute.WrapperConfig.Config.Namespace, firstRoute.WrapperConfig.Config.Name, cleanHost),
Namespace: m.namespace,
Annotations: map[string]string{
common.ClusterIdAnnotation: firstRoute.ClusterId,
common.ClusterIdAnnotation: firstRoute.ClusterId.String(),
},
},
Spec: vs,
@@ -466,15 +464,15 @@ func (m *KIngressConfig) ReflectSecretChanges(clusterNamespacedName util.Cluster
m.mutex.RUnlock()
if hit {
push := func(kind config.GroupVersionKind) {
m.XDSUpdater.ConfigUpdate(&model.PushRequest{
push := func(gvk config.GroupVersionKind) {
m.XDSUpdater.ConfigUpdate(&istiomodel.PushRequest{
Full: true,
ConfigsUpdated: map[model.ConfigKey]struct{}{{
Kind: kind,
ConfigsUpdated: map[istiomodel.ConfigKey]struct{}{{
Kind: kind.MustFromGVK(gvk),
Name: clusterNamespacedName.Name,
Namespace: clusterNamespacedName.Namespace,
}: {}},
Reason: []model.TriggerReason{"auth-secret-change"},
Reason: istiomodel.NewReasonStats("auth-secret-change"),
})
}
push(gvk.VirtualService)
@@ -482,7 +480,12 @@ func (m *KIngressConfig) ReflectSecretChanges(clusterNamespacedName util.Cluster
}
}
func (m *KIngressConfig) Run(stop <-chan struct{}) {}
func (m *KIngressConfig) Run(stop <-chan struct{}) {
for _, remoteIngressController := range m.remoteIngressControllers {
_ = remoteIngressController.SetWatchErrorHandler(m.watchErrorHandler)
go remoteIngressController.Run(stop)
}
}
func (m *KIngressConfig) HasSynced() bool {
IngressLog.Info("In Kingress Synced.")
@@ -500,22 +503,34 @@ func (m *KIngressConfig) HasSynced() bool {
}
func (m *KIngressConfig) SetWatchErrorHandler(f func(r *cache.Reflector, err error)) error {
m.WatchErrorHandler = f
m.watchErrorHandler = f
return nil
}
func (m *KIngressConfig) GetIngressRoutes() model.IngressRouteCollection {
func (m *KIngressConfig) GetIngressRoutes() istiomodel.IngressRouteCollection {
m.mutex.RLock()
defer m.mutex.RUnlock()
return m.ingressRouteCache
}
func (m *KIngressConfig) GetIngressDomains() model.IngressDomainCollection {
func (m *KIngressConfig) GetIngressDomains() istiomodel.IngressDomainCollection {
m.mutex.RLock()
defer m.mutex.RUnlock()
return m.ingressDomainCache
}
func (m *KIngressConfig) CheckIngress(clusterName string) istiomodel.CheckIngressResponse {
return istiomodel.CheckIngressResponse{}
}
func (m *KIngressConfig) Services(clusterName string) ([]*v1.Service, error) {
return nil, nil
}
func (m *KIngressConfig) IngressControllers() map[string]string {
return nil
}
func (m *KIngressConfig) Schemas() collection.Schemas {
return common.IngressIR
}

View File

@@ -18,8 +18,10 @@ import (
"fmt"
"testing"
"github.com/stretchr/testify/assert"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
networking "istio.io/api/networking/v1alpha3"
"istio.io/istio/pkg/cluster"
"istio.io/istio/pkg/config"
"istio.io/istio/pkg/config/schema/gvk"
"k8s.io/apimachinery/pkg/util/intstr"
@@ -117,7 +119,7 @@ func TestConvertGatewaysForKIngress(t *testing.T) {
}
kingressV1Controller := kcontrollerv1.NewController(fake, fake, v1Options, nil)
m := NewKIngressConfig(fake, nil, "wakanda", "gw-123-istio")
m.remoteIngressControllers = map[string]common.KIngressController{
m.remoteIngressControllers = map[cluster.ID]common.KIngressController{
"kingress": kingressV1Controller,
}
@@ -465,6 +467,13 @@ func TestConvertGatewaysForKIngress(t *testing.T) {
},
}
unexportedIgnoredTypes := []interface{}{
networking.Gateway{},
networking.Server{},
networking.Port{},
networking.ServerTLSSettings{},
}
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
result := m.convertGateways(testCase.inputConfig)
@@ -473,9 +482,14 @@ func TestConvertGatewaysForKIngress(t *testing.T) {
for _, item := range result {
host := common.GetHost(item.Annotations)
fmt.Print(item)
//assert.Equal(t, testCase.expect[host], item)
target[host] = item
//break
}
//assert.Equal(t, testCase.expect, target)
if diff := cmp.Diff(target, testCase.expect, cmpopts.IgnoreUnexported(unexportedIgnoredTypes...)); diff != "" {
t.Errorf("convertGateways() mismatch (-want +got):\n%s", diff)
}
assert.Equal(t, testCase.expect, target)
})
}
}