diff --git a/pkg/ingress/config/ingress_config.go b/pkg/ingress/config/ingress_config.go index b19241de1..7b6a2aa25 100644 --- a/pkg/ingress/config/ingress_config.go +++ b/pkg/ingress/config/ingress_config.go @@ -710,6 +710,7 @@ func (m *IngressConfig) convertIstioWasmPlugin(obj *higressext.WasmPlugin) (*ext if !obj.DefaultConfigDisable { result.PluginConfig = obj.DefaultConfig } + hasValidRule := false if len(obj.MatchRules) > 0 { if result.PluginConfig == nil { result.PluginConfig = &types.Struct{ @@ -769,14 +770,20 @@ func (m *IngressConfig) convertIstioWasmPlugin(obj *higressext.WasmPlugin) (*ext Kind: v, }) } - result.PluginConfig.Fields["_rules_"] = &types.Value{ - Kind: &types.Value_ListValue{ - ListValue: &types.ListValue{ - Values: ruleValues, + if len(ruleValues) > 0 { + hasValidRule = true + result.PluginConfig.Fields["_rules_"] = &types.Value{ + Kind: &types.Value_ListValue{ + ListValue: &types.ListValue{ + Values: ruleValues, + }, }, - }, + } } } + if !hasValidRule && obj.DefaultConfigDisable { + return nil, nil + } return result, nil } @@ -807,6 +814,14 @@ func (m *IngressConfig) AddOrUpdateWasmPlugin(clusterNamespacedName util.Cluster IngressLog.Errorf("invalid wasmPlugin:%s, err:%v", clusterNamespacedName.Name, err) return } + if istioWasmPlugin == nil { + IngressLog.Infof("wasmPlugin:%s will not be transferred to istio since config disabled", + clusterNamespacedName.Name) + m.mutex.Lock() + delete(m.wasmPlugins, clusterNamespacedName.Name) + m.mutex.Unlock() + return + } IngressLog.Debugf("wasmPlugin:%s convert to istioWasmPlugin:%v", clusterNamespacedName.Name, istioWasmPlugin) m.mutex.Lock() m.wasmPlugins[clusterNamespacedName.Name] = istioWasmPlugin diff --git a/registry/direct/watcher.go b/registry/direct/watcher.go new file mode 100644 index 000000000..ed3e3f9a9 --- /dev/null +++ b/registry/direct/watcher.go @@ -0,0 +1,164 @@ +// Copyright (c) 2022 Alibaba Group Holding Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package direct + +import ( + "net" + "regexp" + "strconv" + "strings" + "sync" + + "istio.io/api/networking/v1alpha3" + "istio.io/istio/pkg/config/protocol" + "istio.io/pkg/log" + + apiv1 "github.com/alibaba/higress/api/networking/v1" + "github.com/alibaba/higress/pkg/common" + "github.com/alibaba/higress/registry" + provider "github.com/alibaba/higress/registry" + "github.com/alibaba/higress/registry/memory" +) + +type watcher struct { + provider.BaseWatcher + apiv1.RegistryConfig + cache memory.Cache + mutex sync.Mutex +} + +type WatcherOption func(w *watcher) + +func NewWatcher(cache memory.Cache, opts ...WatcherOption) (provider.Watcher, error) { + w := &watcher{ + cache: cache, + } + for _, opt := range opts { + opt(w) + } + return w, nil +} + +func WithType(t string) WatcherOption { + return func(w *watcher) { + w.Type = t + } +} + +func WithName(name string) WatcherOption { + return func(w *watcher) { + w.Name = name + } +} + +func WithDomain(domain string) WatcherOption { + return func(w *watcher) { + w.Domain = domain + } +} + +func WithPort(port uint32) WatcherOption { + return func(w *watcher) { + w.Port = port + } +} + +func (w *watcher) Run() { + w.mutex.Lock() + defer w.mutex.Unlock() + host := strings.Join([]string{w.Name, w.Type}, common.DotSeparator) + serviceEntry := w.generateServiceEntry(host) + if serviceEntry != nil { + w.cache.UpdateServiceEntryWrapper(host, &memory.ServiceEntryWrapper{ + ServiceName: w.Name, + ServiceEntry: serviceEntry, + Suffix: w.Type, + RegistryType: w.Type, + }) + w.UpdateService() + } + w.Ready(true) +} + +func (w *watcher) Stop() { + w.mutex.Lock() + defer w.mutex.Unlock() + host := strings.Join([]string{w.Name, w.Type}, common.DotSeparator) + w.cache.DeleteServiceEntryWrapper(host) + w.Ready(false) +} + +var domainRegex = regexp.MustCompile(`^(?:[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]{2,6}$`) + +func (w *watcher) generateServiceEntry(host string) *v1alpha3.ServiceEntry { + endpoints := make([]*v1alpha3.WorkloadEntry, 0) + for _, ep := range strings.Split(w.Domain, common.CommaSeparator) { + var endpoint *v1alpha3.WorkloadEntry + if w.Type == string(registry.Static) { + pair := strings.Split(ep, common.ColonSeparator) + if len(pair) != 2 { + log.Errorf("invalid endpoint:%s with static type", ep) + return nil + } + port, err := strconv.ParseUint(pair[1], 10, 32) + if err != nil { + log.Errorf("invalid port:%s of endpoint:%s", pair[1], ep) + return nil + } + if net.ParseIP(pair[0]) == nil { + log.Errorf("invalid ip:%s of endpoint:%s", pair[0], ep) + return nil + } + endpoint = &v1alpha3.WorkloadEntry{ + Address: pair[0], + Ports: map[string]uint32{"http": uint32(port)}, + } + } else if w.Type == string(registry.DNS) { + if !domainRegex.MatchString(ep) { + log.Errorf("invalid domain format:%s", ep) + return nil + } + endpoint = &v1alpha3.WorkloadEntry{ + Address: ep, + } + } else { + log.Errorf("unknown direct service type:%s", w.Type) + return nil + } + endpoints = append(endpoints, endpoint) + } + if len(endpoints) == 0 { + log.Errorf("empty endpoints will not be pushed, host:%s", host) + return nil + } + var ports []*v1alpha3.Port + ports = append(ports, &v1alpha3.Port{ + Number: w.Port, + Name: "http", + Protocol: string(protocol.HTTP), + }) + se := &v1alpha3.ServiceEntry{ + Hosts: []string{host}, + Ports: ports, + Location: v1alpha3.ServiceEntry_MESH_INTERNAL, + Endpoints: endpoints, + } + if w.Type == string(registry.Static) { + se.Resolution = v1alpha3.ServiceEntry_STATIC + } else if w.Type == string(registry.DNS) { + se.Resolution = v1alpha3.ServiceEntry_DNS + } + return se +} diff --git a/registry/memory/cache.go b/registry/memory/cache.go index 346a4105c..b8ed2a373 100644 --- a/registry/memory/cache.go +++ b/registry/memory/cache.go @@ -29,6 +29,7 @@ import ( type Cache interface { UpdateServiceEntryWrapper(service string, data *ServiceEntryWrapper) DeleteServiceEntryWrapper(service string) + PurgeStaleService() UpdateServiceEntryEnpointWrapper(service, ip, regionId, zoneId, protocol string, labels map[string]string) GetServiceByEndpoints(requestVersions, endpoints map[string]bool, versionKey string, protocol common.Protocol) map[string][]string GetAllServiceEntry() []*v1alpha3.ServiceEntry @@ -39,20 +40,22 @@ type Cache interface { func NewCache() Cache { return &store{ - mux: &sync.RWMutex{}, - sew: make(map[string]*ServiceEntryWrapper), - toBeUpdated: make([]*ServiceEntryWrapper, 0), - toBeDeleted: make([]*ServiceEntryWrapper, 0), - ip2services: make(map[string]map[string]bool), + mux: &sync.RWMutex{}, + sew: make(map[string]*ServiceEntryWrapper), + toBeUpdated: make([]*ServiceEntryWrapper, 0), + toBeDeleted: make([]*ServiceEntryWrapper, 0), + ip2services: make(map[string]map[string]bool), + deferedDelete: make(map[string]struct{}), } } type store struct { - mux *sync.RWMutex - sew map[string]*ServiceEntryWrapper - toBeUpdated []*ServiceEntryWrapper - toBeDeleted []*ServiceEntryWrapper - ip2services map[string]map[string]bool + mux *sync.RWMutex + sew map[string]*ServiceEntryWrapper + toBeUpdated []*ServiceEntryWrapper + toBeDeleted []*ServiceEntryWrapper + ip2services map[string]map[string]bool + deferedDelete map[string]struct{} } func (s *store) UpdateServiceEntryEnpointWrapper(service, ip, regionId, zoneId, protocol string, labels map[string]string) { @@ -105,6 +108,12 @@ func (s *store) UpdateServiceEntryWrapper(service string, data *ServiceEntryWrap s.toBeUpdated = append(s.toBeUpdated, data) s.sew[service] = data + // service is updated, should not be deleted + if _, ok := s.deferedDelete[service]; ok { + delete(s.deferedDelete, service) + log.Debugf("service in deferedDelete updated, host:%s", service) + } + log.Infof("ServiceEntry updated, host:%s", service) } func (s *store) DeleteServiceEntryWrapper(service string) { @@ -114,7 +123,18 @@ func (s *store) DeleteServiceEntryWrapper(service string) { if data, exist := s.sew[service]; exist { s.toBeDeleted = append(s.toBeDeleted, data) } - delete(s.sew, service) + s.deferedDelete[service] = struct{}{} +} + +// should only be called when reconcile is done +func (s *store) PurgeStaleService() { + s.mux.Lock() + defer s.mux.Unlock() + for service := range s.deferedDelete { + delete(s.sew, service) + delete(s.deferedDelete, service) + log.Infof("ServiceEntry deleted, host:%s", service) + } } // GetServiceByEndpoints get the list of services of which "address:port" contained by the endpoints diff --git a/registry/nacos/v2/watcher.go b/registry/nacos/v2/watcher.go index 88a3765f5..be76e3a5e 100644 --- a/registry/nacos/v2/watcher.go +++ b/registry/nacos/v2/watcher.go @@ -60,8 +60,6 @@ type watcher struct { RegistryType provider.ServiceRegistryType `json:"registry_type"` Status provider.WatcherStatus `json:"status"` namingClient naming_client.INamingClient - updateHandler provider.ServiceUpdateHandler - readyHandler provider.ReadyHandler cache memory.Cache mutex *sync.Mutex stop chan struct{} @@ -234,7 +232,7 @@ func (w *watcher) Run() { if err != nil { log.Errorf("first fetch services failed, err:%v", err) } else { - w.readyHandler(true) + w.Ready(true) } for { select { @@ -243,7 +241,7 @@ func (w *watcher) Run() { if err != nil { log.Errorf("fetch services failed, err:%v", err) } else { - w.readyHandler(true) + w.Ready(true) } case <-w.stop: return @@ -399,7 +397,7 @@ func (w *watcher) getSubscribeCallback(groupName string, serviceName string) fun host := strings.Join([]string{serviceName, suffix}, common.DotSeparator) return func(services []model.Instance, err error) { - defer w.updateHandler() + defer w.UpdateService() //log.Info("callback", "serviceName", serviceName, "suffix", suffix, "details", services) @@ -484,8 +482,8 @@ func (w *watcher) Stop() { } w.isStop = true - w.stop <- struct{}{} - w.readyHandler(false) + close(w.stop) + w.Ready(false) } func (w *watcher) IsHealthy() bool { @@ -496,14 +494,6 @@ func (w *watcher) GetRegistryType() string { return w.RegistryType.String() } -func (w *watcher) AppendServiceUpdateHandler(f func()) { - w.updateHandler = f -} - -func (w *watcher) ReadyHandler(f func(bool)) { - w.readyHandler = f -} - func shouldSubscribe(serviceName string) bool { prefixFilters := []string{"consumers:"} fullFilters := []string{""} diff --git a/registry/nacos/watcher.go b/registry/nacos/watcher.go index a473ba01d..82506ec74 100644 --- a/registry/nacos/watcher.go +++ b/registry/nacos/watcher.go @@ -58,8 +58,6 @@ type watcher struct { RegistryType provider.ServiceRegistryType `json:"registry_type"` Status provider.WatcherStatus `json:"status"` namingClient naming_client.INamingClient - updateHandler provider.ServiceUpdateHandler - readyHandler provider.ReadyHandler cache memory.Cache mutex *sync.Mutex stop chan struct{} @@ -200,7 +198,7 @@ func (w *watcher) Run() { defer ticker.Stop() w.Status = provider.ProbeWatcherStatus(w.Domain, strconv.FormatUint(uint64(w.Port), 10)) w.fetchAllServices() - w.readyHandler(true) + w.Ready(true) for { select { case <-ticker.C: @@ -218,7 +216,6 @@ func (w *watcher) fetchAllServices() error { return nil } fetchedServices := make(map[string]bool) - for _, groupName := range w.NacosGroups { for page := 1; ; page++ { ss, err := w.namingClient.GetAllServicesInfo(vo.GetAllServiceInfoParam{ @@ -305,7 +302,7 @@ func (w *watcher) getSubscribeCallback(groupName string, serviceName string) fun host := strings.Join([]string{serviceName, suffix}, common.DotSeparator) return func(services []model.SubscribeService, err error) { - defer w.updateHandler() + defer w.UpdateService() //log.Info("callback", "serviceName", serviceName, "suffix", suffix, "details", services) @@ -388,8 +385,8 @@ func (w *watcher) Stop() { w.cache.DeleteServiceEntryWrapper(host) } w.isStop = true - w.stop <- struct{}{} - w.readyHandler(false) + close(w.stop) + w.Ready(false) } func (w *watcher) IsHealthy() bool { @@ -400,14 +397,6 @@ func (w *watcher) GetRegistryType() string { return w.RegistryType.String() } -func (w *watcher) AppendServiceUpdateHandler(f func()) { - w.updateHandler = f -} - -func (w *watcher) ReadyHandler(f func(bool)) { - w.readyHandler = f -} - func shouldSubscribe(serviceName string) bool { prefixFilters := []string{"consumers:"} fullFilters := []string{""} diff --git a/registry/reconcile/reconcile.go b/registry/reconcile/reconcile.go index dab5e590b..a4e214ef7 100644 --- a/registry/reconcile/reconcile.go +++ b/registry/reconcile/reconcile.go @@ -25,6 +25,7 @@ import ( apiv1 "github.com/alibaba/higress/api/networking/v1" v1 "github.com/alibaba/higress/client/pkg/apis/networking/v1" . "github.com/alibaba/higress/registry" + "github.com/alibaba/higress/registry/direct" "github.com/alibaba/higress/registry/memory" "github.com/alibaba/higress/registry/nacos" nacosv2 "github.com/alibaba/higress/registry/nacos/v2" @@ -77,6 +78,26 @@ func (r *Reconciler) Reconcile(mcpbridge *v1.McpBridge) { errHappened := false log.Infof("ReconcileRegistries, toBeCreated: %d, toBeUpdated: %d, toBeDeleted: %d", len(toBeCreated), len(toBeUpdated), len(toBeDeleted)) + for k := range toBeDeleted { + r.watchers[k].Stop() + delete(r.registries, k) + delete(r.watchers, k) + } + for k, v := range toBeUpdated { + r.watchers[k].Stop() + delete(r.registries, k) + delete(r.watchers, k) + watcher, err := r.generateWatcherFromRegistryConfig(v, &wg) + if err != nil { + errHappened = true + log.Errorf("ReconcileRegistries failed, err:%v", err) + continue + } + + go watcher.Run() + r.watchers[k] = watcher + r.registries[k] = v + } for k, v := range toBeCreated { watcher, err := r.generateWatcherFromRegistryConfig(v, &wg) if err != nil { @@ -89,31 +110,12 @@ func (r *Reconciler) Reconcile(mcpbridge *v1.McpBridge) { r.watchers[k] = watcher r.registries[k] = v } - for k, v := range toBeUpdated { - go r.watchers[k].Stop() - delete(r.registries, k) - delete(r.watchers, k) - watcher, err := r.generateWatcherFromRegistryConfig(v, &wg) - if err != nil { - errHappened = true - log.Errorf("ReconcileRegistries failed, err:%v", err) - continue - } - - go watcher.Run() - r.watchers[k] = watcher - r.registries[k] = v - } - for k := range toBeDeleted { - go r.watchers[k].Stop() - delete(r.registries, k) - delete(r.watchers, k) - } if errHappened { log.Error("ReconcileRegistries failed, Init Watchers failed") return } wg.Wait() + r.Cache.PurgeStaleService() log.Infof("Registries is reconciled") } @@ -158,6 +160,14 @@ func (r *Reconciler) generateWatcherFromRegistryConfig(registry *apiv1.RegistryC zookeeper.WithPort(registry.Port), zookeeper.WithZkServicesPath(registry.ZkServicesPath), ) + case string(Static), string(DNS): + watcher, err = direct.NewWatcher( + r.Cache, + direct.WithType(registry.Type), + direct.WithName(registry.Name), + direct.WithDomain(registry.Domain), + direct.WithPort(registry.Port), + ) default: return nil, errors.New("unsupported registry type:" + registry.Type) } @@ -172,7 +182,7 @@ func (r *Reconciler) generateWatcherFromRegistryConfig(registry *apiv1.RegistryC once.Do(func() { wg.Done() if ready { - log.Infof("Registry Watcher is ready, type:%s, name:%s", registry.Type, registry.Name) + log.Infof("Registry Watcher is ready, type:%s, name:%s", registry.Type, registry.Name) } }) }) diff --git a/registry/watcher.go b/registry/watcher.go index 942b15909..129ec0ec9 100644 --- a/registry/watcher.go +++ b/registry/watcher.go @@ -25,6 +25,8 @@ const ( Consul ServiceRegistryType = "consul" Nacos ServiceRegistryType = "nacos" Nacos2 ServiceRegistryType = "nacos2" + Static ServiceRegistryType = "static" + DNS ServiceRegistryType = "dns" Healthy WatcherStatus = "healthy" UnHealthy WatcherStatus = "unhealthy" @@ -52,14 +54,21 @@ type Watcher interface { ReadyHandler(f func(bool)) } -type BaseWatcher struct{} +type BaseWatcher struct { + UpdateService ServiceUpdateHandler + Ready ReadyHandler +} -func (w *BaseWatcher) Run() {} -func (w *BaseWatcher) Stop() {} -func (w *BaseWatcher) IsHealthy() bool { return true } -func (w *BaseWatcher) GetRegistryType() string { return "" } -func (w *BaseWatcher) AppendServiceUpdateHandler(f func()) {} -func (w *BaseWatcher) ReadyHandler(f func(bool)) {} +func (w *BaseWatcher) Run() {} +func (w *BaseWatcher) Stop() {} +func (w *BaseWatcher) IsHealthy() bool { return true } +func (w *BaseWatcher) GetRegistryType() string { return "" } +func (w *BaseWatcher) AppendServiceUpdateHandler(f func()) { + w.UpdateService = f +} +func (w *BaseWatcher) ReadyHandler(f func(bool)) { + w.Ready = f +} type ServiceUpdateHandler func() type ReadyHandler func(bool) diff --git a/registry/zookeeper/watcher.go b/registry/zookeeper/watcher.go index 60a59d94b..c9921cdf4 100644 --- a/registry/zookeeper/watcher.go +++ b/registry/zookeeper/watcher.go @@ -50,8 +50,6 @@ type watcher struct { RegistryType provider.ServiceRegistryType `json:"registry_type"` Status provider.WatcherStatus `json:"status"` serviceRemaind *atomic.Int32 - updateHandler provider.ServiceUpdateHandler - readyHandler provider.ReadyHandler cache memory.Cache mutex *sync.Mutex stop chan struct{} @@ -339,7 +337,7 @@ func (w *watcher) DataChange(eventType Event) bool { Suffix: "zookeeper", RegistryType: w.Type, }) - w.updateHandler() + w.UpdateService() } else if eventType.Action == EventTypeDel { w.seMux.Lock() value, ok := w.serviceEntry[host] @@ -370,7 +368,7 @@ func (w *watcher) DataChange(eventType Event) bool { RegistryType: w.Type, }) } - w.updateHandler() + w.UpdateService() } return true } @@ -580,7 +578,7 @@ func (w *watcher) ChildToServiceEntry(children []string, interfaceName, zkPath s } } w.seMux.Unlock() - w.updateHandler() + w.UpdateService() } } @@ -681,7 +679,7 @@ func (w *watcher) Run() { case <-ticker.C: var needNewFetch bool if w.IsReady() { - w.readyHandler(true) + w.Ready(true) needNewFetch = true } if firstFetchErr != nil || needNewFetch { @@ -712,15 +710,13 @@ func (w *watcher) Stop() { for key := range w.serviceEntry { w.cache.DeleteServiceEntryWrapper(key) } - w.updateHandler() + w.UpdateService() w.seMux.Unlock() - w.stop <- struct{}{} - w.Done <- struct{}{} close(w.stop) close(w.Done) w.zkClient.Close() - w.readyHandler(false) + w.Ready(false) } func (w *watcher) IsHealthy() bool { @@ -731,14 +727,6 @@ func (w *watcher) GetRegistryType() string { return w.RegistryType.String() } -func (w *watcher) AppendServiceUpdateHandler(f func()) { - w.updateHandler = f -} - -func (w *watcher) ReadyHandler(f func(bool)) { - w.readyHandler = f -} - func (w *watcher) IsReady() bool { if w.serviceRemaind == nil { return true