mirror of
https://github.com/alibaba/higress.git
synced 2026-06-24 09:45:16 +08:00
refactor registry & add direct service discovery through staticIP or DNS (#261)
This commit is contained in:
@@ -710,6 +710,7 @@ func (m *IngressConfig) convertIstioWasmPlugin(obj *higressext.WasmPlugin) (*ext
|
|||||||
if !obj.DefaultConfigDisable {
|
if !obj.DefaultConfigDisable {
|
||||||
result.PluginConfig = obj.DefaultConfig
|
result.PluginConfig = obj.DefaultConfig
|
||||||
}
|
}
|
||||||
|
hasValidRule := false
|
||||||
if len(obj.MatchRules) > 0 {
|
if len(obj.MatchRules) > 0 {
|
||||||
if result.PluginConfig == nil {
|
if result.PluginConfig == nil {
|
||||||
result.PluginConfig = &types.Struct{
|
result.PluginConfig = &types.Struct{
|
||||||
@@ -769,14 +770,20 @@ func (m *IngressConfig) convertIstioWasmPlugin(obj *higressext.WasmPlugin) (*ext
|
|||||||
Kind: v,
|
Kind: v,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
result.PluginConfig.Fields["_rules_"] = &types.Value{
|
if len(ruleValues) > 0 {
|
||||||
Kind: &types.Value_ListValue{
|
hasValidRule = true
|
||||||
ListValue: &types.ListValue{
|
result.PluginConfig.Fields["_rules_"] = &types.Value{
|
||||||
Values: ruleValues,
|
Kind: &types.Value_ListValue{
|
||||||
|
ListValue: &types.ListValue{
|
||||||
|
Values: ruleValues,
|
||||||
|
},
|
||||||
},
|
},
|
||||||
},
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if !hasValidRule && obj.DefaultConfigDisable {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
return result, nil
|
return result, nil
|
||||||
|
|
||||||
}
|
}
|
||||||
@@ -807,6 +814,14 @@ func (m *IngressConfig) AddOrUpdateWasmPlugin(clusterNamespacedName util.Cluster
|
|||||||
IngressLog.Errorf("invalid wasmPlugin:%s, err:%v", clusterNamespacedName.Name, err)
|
IngressLog.Errorf("invalid wasmPlugin:%s, err:%v", clusterNamespacedName.Name, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
if istioWasmPlugin == nil {
|
||||||
|
IngressLog.Infof("wasmPlugin:%s will not be transferred to istio since config disabled",
|
||||||
|
clusterNamespacedName.Name)
|
||||||
|
m.mutex.Lock()
|
||||||
|
delete(m.wasmPlugins, clusterNamespacedName.Name)
|
||||||
|
m.mutex.Unlock()
|
||||||
|
return
|
||||||
|
}
|
||||||
IngressLog.Debugf("wasmPlugin:%s convert to istioWasmPlugin:%v", clusterNamespacedName.Name, istioWasmPlugin)
|
IngressLog.Debugf("wasmPlugin:%s convert to istioWasmPlugin:%v", clusterNamespacedName.Name, istioWasmPlugin)
|
||||||
m.mutex.Lock()
|
m.mutex.Lock()
|
||||||
m.wasmPlugins[clusterNamespacedName.Name] = istioWasmPlugin
|
m.wasmPlugins[clusterNamespacedName.Name] = istioWasmPlugin
|
||||||
|
|||||||
164
registry/direct/watcher.go
Normal file
164
registry/direct/watcher.go
Normal file
@@ -0,0 +1,164 @@
|
|||||||
|
// Copyright (c) 2022 Alibaba Group Holding Ltd.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package direct
|
||||||
|
|
||||||
|
import (
|
||||||
|
"net"
|
||||||
|
"regexp"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"istio.io/api/networking/v1alpha3"
|
||||||
|
"istio.io/istio/pkg/config/protocol"
|
||||||
|
"istio.io/pkg/log"
|
||||||
|
|
||||||
|
apiv1 "github.com/alibaba/higress/api/networking/v1"
|
||||||
|
"github.com/alibaba/higress/pkg/common"
|
||||||
|
"github.com/alibaba/higress/registry"
|
||||||
|
provider "github.com/alibaba/higress/registry"
|
||||||
|
"github.com/alibaba/higress/registry/memory"
|
||||||
|
)
|
||||||
|
|
||||||
|
type watcher struct {
|
||||||
|
provider.BaseWatcher
|
||||||
|
apiv1.RegistryConfig
|
||||||
|
cache memory.Cache
|
||||||
|
mutex sync.Mutex
|
||||||
|
}
|
||||||
|
|
||||||
|
type WatcherOption func(w *watcher)
|
||||||
|
|
||||||
|
func NewWatcher(cache memory.Cache, opts ...WatcherOption) (provider.Watcher, error) {
|
||||||
|
w := &watcher{
|
||||||
|
cache: cache,
|
||||||
|
}
|
||||||
|
for _, opt := range opts {
|
||||||
|
opt(w)
|
||||||
|
}
|
||||||
|
return w, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func WithType(t string) WatcherOption {
|
||||||
|
return func(w *watcher) {
|
||||||
|
w.Type = t
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func WithName(name string) WatcherOption {
|
||||||
|
return func(w *watcher) {
|
||||||
|
w.Name = name
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func WithDomain(domain string) WatcherOption {
|
||||||
|
return func(w *watcher) {
|
||||||
|
w.Domain = domain
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func WithPort(port uint32) WatcherOption {
|
||||||
|
return func(w *watcher) {
|
||||||
|
w.Port = port
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *watcher) Run() {
|
||||||
|
w.mutex.Lock()
|
||||||
|
defer w.mutex.Unlock()
|
||||||
|
host := strings.Join([]string{w.Name, w.Type}, common.DotSeparator)
|
||||||
|
serviceEntry := w.generateServiceEntry(host)
|
||||||
|
if serviceEntry != nil {
|
||||||
|
w.cache.UpdateServiceEntryWrapper(host, &memory.ServiceEntryWrapper{
|
||||||
|
ServiceName: w.Name,
|
||||||
|
ServiceEntry: serviceEntry,
|
||||||
|
Suffix: w.Type,
|
||||||
|
RegistryType: w.Type,
|
||||||
|
})
|
||||||
|
w.UpdateService()
|
||||||
|
}
|
||||||
|
w.Ready(true)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *watcher) Stop() {
|
||||||
|
w.mutex.Lock()
|
||||||
|
defer w.mutex.Unlock()
|
||||||
|
host := strings.Join([]string{w.Name, w.Type}, common.DotSeparator)
|
||||||
|
w.cache.DeleteServiceEntryWrapper(host)
|
||||||
|
w.Ready(false)
|
||||||
|
}
|
||||||
|
|
||||||
|
var domainRegex = regexp.MustCompile(`^(?:[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]{2,6}$`)
|
||||||
|
|
||||||
|
func (w *watcher) generateServiceEntry(host string) *v1alpha3.ServiceEntry {
|
||||||
|
endpoints := make([]*v1alpha3.WorkloadEntry, 0)
|
||||||
|
for _, ep := range strings.Split(w.Domain, common.CommaSeparator) {
|
||||||
|
var endpoint *v1alpha3.WorkloadEntry
|
||||||
|
if w.Type == string(registry.Static) {
|
||||||
|
pair := strings.Split(ep, common.ColonSeparator)
|
||||||
|
if len(pair) != 2 {
|
||||||
|
log.Errorf("invalid endpoint:%s with static type", ep)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
port, err := strconv.ParseUint(pair[1], 10, 32)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("invalid port:%s of endpoint:%s", pair[1], ep)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if net.ParseIP(pair[0]) == nil {
|
||||||
|
log.Errorf("invalid ip:%s of endpoint:%s", pair[0], ep)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
endpoint = &v1alpha3.WorkloadEntry{
|
||||||
|
Address: pair[0],
|
||||||
|
Ports: map[string]uint32{"http": uint32(port)},
|
||||||
|
}
|
||||||
|
} else if w.Type == string(registry.DNS) {
|
||||||
|
if !domainRegex.MatchString(ep) {
|
||||||
|
log.Errorf("invalid domain format:%s", ep)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
endpoint = &v1alpha3.WorkloadEntry{
|
||||||
|
Address: ep,
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
log.Errorf("unknown direct service type:%s", w.Type)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
endpoints = append(endpoints, endpoint)
|
||||||
|
}
|
||||||
|
if len(endpoints) == 0 {
|
||||||
|
log.Errorf("empty endpoints will not be pushed, host:%s", host)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
var ports []*v1alpha3.Port
|
||||||
|
ports = append(ports, &v1alpha3.Port{
|
||||||
|
Number: w.Port,
|
||||||
|
Name: "http",
|
||||||
|
Protocol: string(protocol.HTTP),
|
||||||
|
})
|
||||||
|
se := &v1alpha3.ServiceEntry{
|
||||||
|
Hosts: []string{host},
|
||||||
|
Ports: ports,
|
||||||
|
Location: v1alpha3.ServiceEntry_MESH_INTERNAL,
|
||||||
|
Endpoints: endpoints,
|
||||||
|
}
|
||||||
|
if w.Type == string(registry.Static) {
|
||||||
|
se.Resolution = v1alpha3.ServiceEntry_STATIC
|
||||||
|
} else if w.Type == string(registry.DNS) {
|
||||||
|
se.Resolution = v1alpha3.ServiceEntry_DNS
|
||||||
|
}
|
||||||
|
return se
|
||||||
|
}
|
||||||
@@ -29,6 +29,7 @@ import (
|
|||||||
type Cache interface {
|
type Cache interface {
|
||||||
UpdateServiceEntryWrapper(service string, data *ServiceEntryWrapper)
|
UpdateServiceEntryWrapper(service string, data *ServiceEntryWrapper)
|
||||||
DeleteServiceEntryWrapper(service string)
|
DeleteServiceEntryWrapper(service string)
|
||||||
|
PurgeStaleService()
|
||||||
UpdateServiceEntryEnpointWrapper(service, ip, regionId, zoneId, protocol string, labels map[string]string)
|
UpdateServiceEntryEnpointWrapper(service, ip, regionId, zoneId, protocol string, labels map[string]string)
|
||||||
GetServiceByEndpoints(requestVersions, endpoints map[string]bool, versionKey string, protocol common.Protocol) map[string][]string
|
GetServiceByEndpoints(requestVersions, endpoints map[string]bool, versionKey string, protocol common.Protocol) map[string][]string
|
||||||
GetAllServiceEntry() []*v1alpha3.ServiceEntry
|
GetAllServiceEntry() []*v1alpha3.ServiceEntry
|
||||||
@@ -39,20 +40,22 @@ type Cache interface {
|
|||||||
|
|
||||||
func NewCache() Cache {
|
func NewCache() Cache {
|
||||||
return &store{
|
return &store{
|
||||||
mux: &sync.RWMutex{},
|
mux: &sync.RWMutex{},
|
||||||
sew: make(map[string]*ServiceEntryWrapper),
|
sew: make(map[string]*ServiceEntryWrapper),
|
||||||
toBeUpdated: make([]*ServiceEntryWrapper, 0),
|
toBeUpdated: make([]*ServiceEntryWrapper, 0),
|
||||||
toBeDeleted: make([]*ServiceEntryWrapper, 0),
|
toBeDeleted: make([]*ServiceEntryWrapper, 0),
|
||||||
ip2services: make(map[string]map[string]bool),
|
ip2services: make(map[string]map[string]bool),
|
||||||
|
deferedDelete: make(map[string]struct{}),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type store struct {
|
type store struct {
|
||||||
mux *sync.RWMutex
|
mux *sync.RWMutex
|
||||||
sew map[string]*ServiceEntryWrapper
|
sew map[string]*ServiceEntryWrapper
|
||||||
toBeUpdated []*ServiceEntryWrapper
|
toBeUpdated []*ServiceEntryWrapper
|
||||||
toBeDeleted []*ServiceEntryWrapper
|
toBeDeleted []*ServiceEntryWrapper
|
||||||
ip2services map[string]map[string]bool
|
ip2services map[string]map[string]bool
|
||||||
|
deferedDelete map[string]struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *store) UpdateServiceEntryEnpointWrapper(service, ip, regionId, zoneId, protocol string, labels map[string]string) {
|
func (s *store) UpdateServiceEntryEnpointWrapper(service, ip, regionId, zoneId, protocol string, labels map[string]string) {
|
||||||
@@ -105,6 +108,12 @@ func (s *store) UpdateServiceEntryWrapper(service string, data *ServiceEntryWrap
|
|||||||
|
|
||||||
s.toBeUpdated = append(s.toBeUpdated, data)
|
s.toBeUpdated = append(s.toBeUpdated, data)
|
||||||
s.sew[service] = data
|
s.sew[service] = data
|
||||||
|
// service is updated, should not be deleted
|
||||||
|
if _, ok := s.deferedDelete[service]; ok {
|
||||||
|
delete(s.deferedDelete, service)
|
||||||
|
log.Debugf("service in deferedDelete updated, host:%s", service)
|
||||||
|
}
|
||||||
|
log.Infof("ServiceEntry updated, host:%s", service)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *store) DeleteServiceEntryWrapper(service string) {
|
func (s *store) DeleteServiceEntryWrapper(service string) {
|
||||||
@@ -114,7 +123,18 @@ func (s *store) DeleteServiceEntryWrapper(service string) {
|
|||||||
if data, exist := s.sew[service]; exist {
|
if data, exist := s.sew[service]; exist {
|
||||||
s.toBeDeleted = append(s.toBeDeleted, data)
|
s.toBeDeleted = append(s.toBeDeleted, data)
|
||||||
}
|
}
|
||||||
delete(s.sew, service)
|
s.deferedDelete[service] = struct{}{}
|
||||||
|
}
|
||||||
|
|
||||||
|
// should only be called when reconcile is done
|
||||||
|
func (s *store) PurgeStaleService() {
|
||||||
|
s.mux.Lock()
|
||||||
|
defer s.mux.Unlock()
|
||||||
|
for service := range s.deferedDelete {
|
||||||
|
delete(s.sew, service)
|
||||||
|
delete(s.deferedDelete, service)
|
||||||
|
log.Infof("ServiceEntry deleted, host:%s", service)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetServiceByEndpoints get the list of services of which "address:port" contained by the endpoints
|
// GetServiceByEndpoints get the list of services of which "address:port" contained by the endpoints
|
||||||
|
|||||||
@@ -60,8 +60,6 @@ type watcher struct {
|
|||||||
RegistryType provider.ServiceRegistryType `json:"registry_type"`
|
RegistryType provider.ServiceRegistryType `json:"registry_type"`
|
||||||
Status provider.WatcherStatus `json:"status"`
|
Status provider.WatcherStatus `json:"status"`
|
||||||
namingClient naming_client.INamingClient
|
namingClient naming_client.INamingClient
|
||||||
updateHandler provider.ServiceUpdateHandler
|
|
||||||
readyHandler provider.ReadyHandler
|
|
||||||
cache memory.Cache
|
cache memory.Cache
|
||||||
mutex *sync.Mutex
|
mutex *sync.Mutex
|
||||||
stop chan struct{}
|
stop chan struct{}
|
||||||
@@ -234,7 +232,7 @@ func (w *watcher) Run() {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("first fetch services failed, err:%v", err)
|
log.Errorf("first fetch services failed, err:%v", err)
|
||||||
} else {
|
} else {
|
||||||
w.readyHandler(true)
|
w.Ready(true)
|
||||||
}
|
}
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
@@ -243,7 +241,7 @@ func (w *watcher) Run() {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("fetch services failed, err:%v", err)
|
log.Errorf("fetch services failed, err:%v", err)
|
||||||
} else {
|
} else {
|
||||||
w.readyHandler(true)
|
w.Ready(true)
|
||||||
}
|
}
|
||||||
case <-w.stop:
|
case <-w.stop:
|
||||||
return
|
return
|
||||||
@@ -399,7 +397,7 @@ func (w *watcher) getSubscribeCallback(groupName string, serviceName string) fun
|
|||||||
host := strings.Join([]string{serviceName, suffix}, common.DotSeparator)
|
host := strings.Join([]string{serviceName, suffix}, common.DotSeparator)
|
||||||
|
|
||||||
return func(services []model.Instance, err error) {
|
return func(services []model.Instance, err error) {
|
||||||
defer w.updateHandler()
|
defer w.UpdateService()
|
||||||
|
|
||||||
//log.Info("callback", "serviceName", serviceName, "suffix", suffix, "details", services)
|
//log.Info("callback", "serviceName", serviceName, "suffix", suffix, "details", services)
|
||||||
|
|
||||||
@@ -484,8 +482,8 @@ func (w *watcher) Stop() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
w.isStop = true
|
w.isStop = true
|
||||||
w.stop <- struct{}{}
|
close(w.stop)
|
||||||
w.readyHandler(false)
|
w.Ready(false)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *watcher) IsHealthy() bool {
|
func (w *watcher) IsHealthy() bool {
|
||||||
@@ -496,14 +494,6 @@ func (w *watcher) GetRegistryType() string {
|
|||||||
return w.RegistryType.String()
|
return w.RegistryType.String()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *watcher) AppendServiceUpdateHandler(f func()) {
|
|
||||||
w.updateHandler = f
|
|
||||||
}
|
|
||||||
|
|
||||||
func (w *watcher) ReadyHandler(f func(bool)) {
|
|
||||||
w.readyHandler = f
|
|
||||||
}
|
|
||||||
|
|
||||||
func shouldSubscribe(serviceName string) bool {
|
func shouldSubscribe(serviceName string) bool {
|
||||||
prefixFilters := []string{"consumers:"}
|
prefixFilters := []string{"consumers:"}
|
||||||
fullFilters := []string{""}
|
fullFilters := []string{""}
|
||||||
|
|||||||
@@ -58,8 +58,6 @@ type watcher struct {
|
|||||||
RegistryType provider.ServiceRegistryType `json:"registry_type"`
|
RegistryType provider.ServiceRegistryType `json:"registry_type"`
|
||||||
Status provider.WatcherStatus `json:"status"`
|
Status provider.WatcherStatus `json:"status"`
|
||||||
namingClient naming_client.INamingClient
|
namingClient naming_client.INamingClient
|
||||||
updateHandler provider.ServiceUpdateHandler
|
|
||||||
readyHandler provider.ReadyHandler
|
|
||||||
cache memory.Cache
|
cache memory.Cache
|
||||||
mutex *sync.Mutex
|
mutex *sync.Mutex
|
||||||
stop chan struct{}
|
stop chan struct{}
|
||||||
@@ -200,7 +198,7 @@ func (w *watcher) Run() {
|
|||||||
defer ticker.Stop()
|
defer ticker.Stop()
|
||||||
w.Status = provider.ProbeWatcherStatus(w.Domain, strconv.FormatUint(uint64(w.Port), 10))
|
w.Status = provider.ProbeWatcherStatus(w.Domain, strconv.FormatUint(uint64(w.Port), 10))
|
||||||
w.fetchAllServices()
|
w.fetchAllServices()
|
||||||
w.readyHandler(true)
|
w.Ready(true)
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
@@ -218,7 +216,6 @@ func (w *watcher) fetchAllServices() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
fetchedServices := make(map[string]bool)
|
fetchedServices := make(map[string]bool)
|
||||||
|
|
||||||
for _, groupName := range w.NacosGroups {
|
for _, groupName := range w.NacosGroups {
|
||||||
for page := 1; ; page++ {
|
for page := 1; ; page++ {
|
||||||
ss, err := w.namingClient.GetAllServicesInfo(vo.GetAllServiceInfoParam{
|
ss, err := w.namingClient.GetAllServicesInfo(vo.GetAllServiceInfoParam{
|
||||||
@@ -305,7 +302,7 @@ func (w *watcher) getSubscribeCallback(groupName string, serviceName string) fun
|
|||||||
host := strings.Join([]string{serviceName, suffix}, common.DotSeparator)
|
host := strings.Join([]string{serviceName, suffix}, common.DotSeparator)
|
||||||
|
|
||||||
return func(services []model.SubscribeService, err error) {
|
return func(services []model.SubscribeService, err error) {
|
||||||
defer w.updateHandler()
|
defer w.UpdateService()
|
||||||
|
|
||||||
//log.Info("callback", "serviceName", serviceName, "suffix", suffix, "details", services)
|
//log.Info("callback", "serviceName", serviceName, "suffix", suffix, "details", services)
|
||||||
|
|
||||||
@@ -388,8 +385,8 @@ func (w *watcher) Stop() {
|
|||||||
w.cache.DeleteServiceEntryWrapper(host)
|
w.cache.DeleteServiceEntryWrapper(host)
|
||||||
}
|
}
|
||||||
w.isStop = true
|
w.isStop = true
|
||||||
w.stop <- struct{}{}
|
close(w.stop)
|
||||||
w.readyHandler(false)
|
w.Ready(false)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *watcher) IsHealthy() bool {
|
func (w *watcher) IsHealthy() bool {
|
||||||
@@ -400,14 +397,6 @@ func (w *watcher) GetRegistryType() string {
|
|||||||
return w.RegistryType.String()
|
return w.RegistryType.String()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *watcher) AppendServiceUpdateHandler(f func()) {
|
|
||||||
w.updateHandler = f
|
|
||||||
}
|
|
||||||
|
|
||||||
func (w *watcher) ReadyHandler(f func(bool)) {
|
|
||||||
w.readyHandler = f
|
|
||||||
}
|
|
||||||
|
|
||||||
func shouldSubscribe(serviceName string) bool {
|
func shouldSubscribe(serviceName string) bool {
|
||||||
prefixFilters := []string{"consumers:"}
|
prefixFilters := []string{"consumers:"}
|
||||||
fullFilters := []string{""}
|
fullFilters := []string{""}
|
||||||
|
|||||||
@@ -25,6 +25,7 @@ import (
|
|||||||
apiv1 "github.com/alibaba/higress/api/networking/v1"
|
apiv1 "github.com/alibaba/higress/api/networking/v1"
|
||||||
v1 "github.com/alibaba/higress/client/pkg/apis/networking/v1"
|
v1 "github.com/alibaba/higress/client/pkg/apis/networking/v1"
|
||||||
. "github.com/alibaba/higress/registry"
|
. "github.com/alibaba/higress/registry"
|
||||||
|
"github.com/alibaba/higress/registry/direct"
|
||||||
"github.com/alibaba/higress/registry/memory"
|
"github.com/alibaba/higress/registry/memory"
|
||||||
"github.com/alibaba/higress/registry/nacos"
|
"github.com/alibaba/higress/registry/nacos"
|
||||||
nacosv2 "github.com/alibaba/higress/registry/nacos/v2"
|
nacosv2 "github.com/alibaba/higress/registry/nacos/v2"
|
||||||
@@ -77,6 +78,26 @@ func (r *Reconciler) Reconcile(mcpbridge *v1.McpBridge) {
|
|||||||
errHappened := false
|
errHappened := false
|
||||||
log.Infof("ReconcileRegistries, toBeCreated: %d, toBeUpdated: %d, toBeDeleted: %d",
|
log.Infof("ReconcileRegistries, toBeCreated: %d, toBeUpdated: %d, toBeDeleted: %d",
|
||||||
len(toBeCreated), len(toBeUpdated), len(toBeDeleted))
|
len(toBeCreated), len(toBeUpdated), len(toBeDeleted))
|
||||||
|
for k := range toBeDeleted {
|
||||||
|
r.watchers[k].Stop()
|
||||||
|
delete(r.registries, k)
|
||||||
|
delete(r.watchers, k)
|
||||||
|
}
|
||||||
|
for k, v := range toBeUpdated {
|
||||||
|
r.watchers[k].Stop()
|
||||||
|
delete(r.registries, k)
|
||||||
|
delete(r.watchers, k)
|
||||||
|
watcher, err := r.generateWatcherFromRegistryConfig(v, &wg)
|
||||||
|
if err != nil {
|
||||||
|
errHappened = true
|
||||||
|
log.Errorf("ReconcileRegistries failed, err:%v", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
go watcher.Run()
|
||||||
|
r.watchers[k] = watcher
|
||||||
|
r.registries[k] = v
|
||||||
|
}
|
||||||
for k, v := range toBeCreated {
|
for k, v := range toBeCreated {
|
||||||
watcher, err := r.generateWatcherFromRegistryConfig(v, &wg)
|
watcher, err := r.generateWatcherFromRegistryConfig(v, &wg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -89,31 +110,12 @@ func (r *Reconciler) Reconcile(mcpbridge *v1.McpBridge) {
|
|||||||
r.watchers[k] = watcher
|
r.watchers[k] = watcher
|
||||||
r.registries[k] = v
|
r.registries[k] = v
|
||||||
}
|
}
|
||||||
for k, v := range toBeUpdated {
|
|
||||||
go r.watchers[k].Stop()
|
|
||||||
delete(r.registries, k)
|
|
||||||
delete(r.watchers, k)
|
|
||||||
watcher, err := r.generateWatcherFromRegistryConfig(v, &wg)
|
|
||||||
if err != nil {
|
|
||||||
errHappened = true
|
|
||||||
log.Errorf("ReconcileRegistries failed, err:%v", err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
go watcher.Run()
|
|
||||||
r.watchers[k] = watcher
|
|
||||||
r.registries[k] = v
|
|
||||||
}
|
|
||||||
for k := range toBeDeleted {
|
|
||||||
go r.watchers[k].Stop()
|
|
||||||
delete(r.registries, k)
|
|
||||||
delete(r.watchers, k)
|
|
||||||
}
|
|
||||||
if errHappened {
|
if errHappened {
|
||||||
log.Error("ReconcileRegistries failed, Init Watchers failed")
|
log.Error("ReconcileRegistries failed, Init Watchers failed")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
|
r.Cache.PurgeStaleService()
|
||||||
log.Infof("Registries is reconciled")
|
log.Infof("Registries is reconciled")
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -158,6 +160,14 @@ func (r *Reconciler) generateWatcherFromRegistryConfig(registry *apiv1.RegistryC
|
|||||||
zookeeper.WithPort(registry.Port),
|
zookeeper.WithPort(registry.Port),
|
||||||
zookeeper.WithZkServicesPath(registry.ZkServicesPath),
|
zookeeper.WithZkServicesPath(registry.ZkServicesPath),
|
||||||
)
|
)
|
||||||
|
case string(Static), string(DNS):
|
||||||
|
watcher, err = direct.NewWatcher(
|
||||||
|
r.Cache,
|
||||||
|
direct.WithType(registry.Type),
|
||||||
|
direct.WithName(registry.Name),
|
||||||
|
direct.WithDomain(registry.Domain),
|
||||||
|
direct.WithPort(registry.Port),
|
||||||
|
)
|
||||||
default:
|
default:
|
||||||
return nil, errors.New("unsupported registry type:" + registry.Type)
|
return nil, errors.New("unsupported registry type:" + registry.Type)
|
||||||
}
|
}
|
||||||
@@ -172,7 +182,7 @@ func (r *Reconciler) generateWatcherFromRegistryConfig(registry *apiv1.RegistryC
|
|||||||
once.Do(func() {
|
once.Do(func() {
|
||||||
wg.Done()
|
wg.Done()
|
||||||
if ready {
|
if ready {
|
||||||
log.Infof("Registry Watcher is ready, type:%s, name:%s", registry.Type, registry.Name)
|
log.Infof("Registry Watcher is ready, type:%s, name:%s", registry.Type, registry.Name)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|||||||
@@ -25,6 +25,8 @@ const (
|
|||||||
Consul ServiceRegistryType = "consul"
|
Consul ServiceRegistryType = "consul"
|
||||||
Nacos ServiceRegistryType = "nacos"
|
Nacos ServiceRegistryType = "nacos"
|
||||||
Nacos2 ServiceRegistryType = "nacos2"
|
Nacos2 ServiceRegistryType = "nacos2"
|
||||||
|
Static ServiceRegistryType = "static"
|
||||||
|
DNS ServiceRegistryType = "dns"
|
||||||
Healthy WatcherStatus = "healthy"
|
Healthy WatcherStatus = "healthy"
|
||||||
UnHealthy WatcherStatus = "unhealthy"
|
UnHealthy WatcherStatus = "unhealthy"
|
||||||
|
|
||||||
@@ -52,14 +54,21 @@ type Watcher interface {
|
|||||||
ReadyHandler(f func(bool))
|
ReadyHandler(f func(bool))
|
||||||
}
|
}
|
||||||
|
|
||||||
type BaseWatcher struct{}
|
type BaseWatcher struct {
|
||||||
|
UpdateService ServiceUpdateHandler
|
||||||
|
Ready ReadyHandler
|
||||||
|
}
|
||||||
|
|
||||||
func (w *BaseWatcher) Run() {}
|
func (w *BaseWatcher) Run() {}
|
||||||
func (w *BaseWatcher) Stop() {}
|
func (w *BaseWatcher) Stop() {}
|
||||||
func (w *BaseWatcher) IsHealthy() bool { return true }
|
func (w *BaseWatcher) IsHealthy() bool { return true }
|
||||||
func (w *BaseWatcher) GetRegistryType() string { return "" }
|
func (w *BaseWatcher) GetRegistryType() string { return "" }
|
||||||
func (w *BaseWatcher) AppendServiceUpdateHandler(f func()) {}
|
func (w *BaseWatcher) AppendServiceUpdateHandler(f func()) {
|
||||||
func (w *BaseWatcher) ReadyHandler(f func(bool)) {}
|
w.UpdateService = f
|
||||||
|
}
|
||||||
|
func (w *BaseWatcher) ReadyHandler(f func(bool)) {
|
||||||
|
w.Ready = f
|
||||||
|
}
|
||||||
|
|
||||||
type ServiceUpdateHandler func()
|
type ServiceUpdateHandler func()
|
||||||
type ReadyHandler func(bool)
|
type ReadyHandler func(bool)
|
||||||
|
|||||||
@@ -50,8 +50,6 @@ type watcher struct {
|
|||||||
RegistryType provider.ServiceRegistryType `json:"registry_type"`
|
RegistryType provider.ServiceRegistryType `json:"registry_type"`
|
||||||
Status provider.WatcherStatus `json:"status"`
|
Status provider.WatcherStatus `json:"status"`
|
||||||
serviceRemaind *atomic.Int32
|
serviceRemaind *atomic.Int32
|
||||||
updateHandler provider.ServiceUpdateHandler
|
|
||||||
readyHandler provider.ReadyHandler
|
|
||||||
cache memory.Cache
|
cache memory.Cache
|
||||||
mutex *sync.Mutex
|
mutex *sync.Mutex
|
||||||
stop chan struct{}
|
stop chan struct{}
|
||||||
@@ -339,7 +337,7 @@ func (w *watcher) DataChange(eventType Event) bool {
|
|||||||
Suffix: "zookeeper",
|
Suffix: "zookeeper",
|
||||||
RegistryType: w.Type,
|
RegistryType: w.Type,
|
||||||
})
|
})
|
||||||
w.updateHandler()
|
w.UpdateService()
|
||||||
} else if eventType.Action == EventTypeDel {
|
} else if eventType.Action == EventTypeDel {
|
||||||
w.seMux.Lock()
|
w.seMux.Lock()
|
||||||
value, ok := w.serviceEntry[host]
|
value, ok := w.serviceEntry[host]
|
||||||
@@ -370,7 +368,7 @@ func (w *watcher) DataChange(eventType Event) bool {
|
|||||||
RegistryType: w.Type,
|
RegistryType: w.Type,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
w.updateHandler()
|
w.UpdateService()
|
||||||
}
|
}
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
@@ -580,7 +578,7 @@ func (w *watcher) ChildToServiceEntry(children []string, interfaceName, zkPath s
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
w.seMux.Unlock()
|
w.seMux.Unlock()
|
||||||
w.updateHandler()
|
w.UpdateService()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -681,7 +679,7 @@ func (w *watcher) Run() {
|
|||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
var needNewFetch bool
|
var needNewFetch bool
|
||||||
if w.IsReady() {
|
if w.IsReady() {
|
||||||
w.readyHandler(true)
|
w.Ready(true)
|
||||||
needNewFetch = true
|
needNewFetch = true
|
||||||
}
|
}
|
||||||
if firstFetchErr != nil || needNewFetch {
|
if firstFetchErr != nil || needNewFetch {
|
||||||
@@ -712,15 +710,13 @@ func (w *watcher) Stop() {
|
|||||||
for key := range w.serviceEntry {
|
for key := range w.serviceEntry {
|
||||||
w.cache.DeleteServiceEntryWrapper(key)
|
w.cache.DeleteServiceEntryWrapper(key)
|
||||||
}
|
}
|
||||||
w.updateHandler()
|
w.UpdateService()
|
||||||
w.seMux.Unlock()
|
w.seMux.Unlock()
|
||||||
|
|
||||||
w.stop <- struct{}{}
|
|
||||||
w.Done <- struct{}{}
|
|
||||||
close(w.stop)
|
close(w.stop)
|
||||||
close(w.Done)
|
close(w.Done)
|
||||||
w.zkClient.Close()
|
w.zkClient.Close()
|
||||||
w.readyHandler(false)
|
w.Ready(false)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *watcher) IsHealthy() bool {
|
func (w *watcher) IsHealthy() bool {
|
||||||
@@ -731,14 +727,6 @@ func (w *watcher) GetRegistryType() string {
|
|||||||
return w.RegistryType.String()
|
return w.RegistryType.String()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *watcher) AppendServiceUpdateHandler(f func()) {
|
|
||||||
w.updateHandler = f
|
|
||||||
}
|
|
||||||
|
|
||||||
func (w *watcher) ReadyHandler(f func(bool)) {
|
|
||||||
w.readyHandler = f
|
|
||||||
}
|
|
||||||
|
|
||||||
func (w *watcher) IsReady() bool {
|
func (w *watcher) IsReady() bool {
|
||||||
if w.serviceRemaind == nil {
|
if w.serviceRemaind == nil {
|
||||||
return true
|
return true
|
||||||
|
|||||||
Reference in New Issue
Block a user