Supports MCP service configuration protocol and SNI, along with various other fixes. (#1369)

This commit is contained in:
澄潭
2024-10-09 15:54:19 +08:00
committed by GitHub
parent 3ed28f2a66
commit ecf52aecfc
23 changed files with 282 additions and 94 deletions

View File

@@ -237,7 +237,7 @@ func (w *watcher) Stop() {
// clean the cache
suffix := strings.Join([]string{serviceName, w.ConsulDatacenter, w.Type}, common.DotSeparator)
host := strings.ReplaceAll(suffix, common.Underscore, common.Hyphen)
w.cache.DeleteServiceEntryWrapper(host)
w.cache.DeleteServiceWrapper(host)
}
w.isStop = true
close(w.stop)
@@ -295,15 +295,16 @@ func (w *watcher) getSubscribeCallback(serviceName string) func(idx uint64, data
serviceEntry := w.generateServiceEntry(host, services)
if serviceEntry != nil {
log.Infof("consul update serviceEntry %s cache", host)
w.cache.UpdateServiceEntryWrapper(host, &memory.ServiceEntryWrapper{
w.cache.UpdateServiceWrapper(host, &memory.ServiceWrapper{
ServiceEntry: serviceEntry,
ServiceName: serviceName,
Suffix: suffix,
RegistryType: w.Type,
RegistryName: w.Name,
})
} else {
log.Infof("consul serviceEntry %s is nil", host)
//w.cache.DeleteServiceEntryWrapper(host)
//w.cache.DeleteServiceWrapper(host)
}
}
}

View File

@@ -22,14 +22,15 @@ import (
"sync"
"istio.io/api/networking/v1alpha3"
"istio.io/istio/pkg/config/protocol"
"istio.io/pkg/log"
apiv1 "github.com/alibaba/higress/api/networking/v1"
"github.com/alibaba/higress/pkg/common"
ingress "github.com/alibaba/higress/pkg/ingress/kube/common"
"github.com/alibaba/higress/registry"
provider "github.com/alibaba/higress/registry"
"github.com/alibaba/higress/registry/memory"
"github.com/go-errors/errors"
)
type watcher struct {
@@ -48,6 +49,9 @@ func NewWatcher(cache memory.Cache, opts ...WatcherOption) (provider.Watcher, er
for _, opt := range opts {
opt(w)
}
if common.ParseProtocol(w.Protocol) == common.Unsupported {
return nil, errors.Errorf("invalid protocol:%s", w.Protocol)
}
return w, nil
}
@@ -75,17 +79,42 @@ func WithPort(port uint32) WatcherOption {
}
}
func WithProtocol(protocol string) WatcherOption {
return func(w *watcher) {
w.Protocol = protocol
if w.Protocol == "" {
w.Protocol = string(common.HTTP)
}
}
}
func WithSNI(sni string) WatcherOption {
return func(w *watcher) {
w.Sni = sni
}
}
func (w *watcher) Run() {
w.mutex.Lock()
defer w.mutex.Unlock()
host := strings.Join([]string{w.Name, w.Type}, common.DotSeparator)
serviceEntry := w.generateServiceEntry(host)
if serviceEntry != nil {
w.cache.UpdateServiceEntryWrapper(host, &memory.ServiceEntryWrapper{
ServiceName: w.Name,
ServiceEntry: serviceEntry,
Suffix: w.Type,
RegistryType: w.Type,
var destinationRuleWrapper *ingress.WrapperDestinationRule
destinationRule := w.generateDestinationRule(serviceEntry)
if destinationRule != nil {
destinationRuleWrapper = &ingress.WrapperDestinationRule{
DestinationRule: destinationRule,
ServiceKey: ingress.CreateMcpServiceKey(host, int32(w.Port)),
}
}
w.cache.UpdateServiceWrapper(host, &memory.ServiceWrapper{
ServiceName: w.Name,
ServiceEntry: serviceEntry,
Suffix: w.Type,
RegistryType: w.Type,
RegistryName: w.Name,
DestinationRuleWrapper: destinationRuleWrapper,
})
w.UpdateService()
}
@@ -96,7 +125,7 @@ func (w *watcher) Stop() {
w.mutex.Lock()
defer w.mutex.Unlock()
host := strings.Join([]string{w.Name, w.Type}, common.DotSeparator)
w.cache.DeleteServiceEntryWrapper(host)
w.cache.DeleteServiceWrapper(host)
w.Ready(false)
}
@@ -146,8 +175,8 @@ func (w *watcher) generateServiceEntry(host string) *v1alpha3.ServiceEntry {
var ports []*v1alpha3.ServicePort
ports = append(ports, &v1alpha3.ServicePort{
Number: w.Port,
Name: "http",
Protocol: string(protocol.HTTP),
Name: w.Protocol,
Protocol: string(common.ParseProtocol(w.Protocol)),
})
se := &v1alpha3.ServiceEntry{
Hosts: []string{host},
@@ -163,6 +192,34 @@ func (w *watcher) generateServiceEntry(host string) *v1alpha3.ServiceEntry {
return se
}
func (w *watcher) generateDestinationRule(se *v1alpha3.ServiceEntry) *v1alpha3.DestinationRule {
if !common.Protocol(se.Ports[0].Protocol).IsHTTPS() {
return nil
}
sni := w.Sni
// DNS type, automatically sets SNI based on domain name.
if sni == "" && w.Type == string(registry.DNS) && len(se.Endpoints) == 1 {
sni = w.Domain
}
return &v1alpha3.DestinationRule{
Host: se.Hosts[0],
TrafficPolicy: &v1alpha3.TrafficPolicy{
PortLevelSettings: []*v1alpha3.TrafficPolicy_PortTrafficPolicy{
&v1alpha3.TrafficPolicy_PortTrafficPolicy{
Port: &v1alpha3.PortSelector{
Number: se.Ports[0].Number,
},
Tls: &v1alpha3.ClientTLSSettings{
Mode: v1alpha3.ClientTLSSettings_SIMPLE,
Sni: sni,
},
},
},
},
}
}
func (w *watcher) GetRegistryType() string {
return w.RegistryConfig.Type
}

View File

@@ -147,7 +147,7 @@ func (w *watcher) Stop() {
log.Errorf("Failed to unsubscribe service : %v", serviceName)
continue
}
w.cache.DeleteServiceEntryWrapper(makeHost(serviceName))
w.cache.DeleteServiceWrapper(makeHost(serviceName))
}
w.UpdateService()
}
@@ -203,17 +203,18 @@ func (w *watcher) subscribe(service *fargo.Application) error {
if err != nil {
return err
}
w.cache.UpdateServiceEntryWrapper(makeHost(service.Name), &memory.ServiceEntryWrapper{
w.cache.UpdateServiceWrapper(makeHost(service.Name), &memory.ServiceWrapper{
ServiceName: service.Name,
ServiceEntry: se,
Suffix: suffix,
RegistryType: w.Type,
RegistryName: w.Name,
})
return nil
}
if w.updateCacheWhenEmpty {
w.cache.DeleteServiceEntryWrapper(makeHost(service.Name))
w.cache.DeleteServiceWrapper(makeHost(service.Name))
}
return nil

View File

@@ -24,26 +24,28 @@ import (
"istio.io/pkg/log"
"github.com/alibaba/higress/pkg/common"
ingress "github.com/alibaba/higress/pkg/ingress/kube/common"
)
type Cache interface {
UpdateServiceEntryWrapper(service string, data *ServiceEntryWrapper)
DeleteServiceEntryWrapper(service string)
UpdateServiceWrapper(service string, data *ServiceWrapper)
DeleteServiceWrapper(service string)
PurgeStaleService()
UpdateServiceEntryEndpointWrapper(service, ip, regionId, zoneId, protocol string, labels map[string]string)
GetServiceByEndpoints(requestVersions, endpoints map[string]bool, versionKey string, protocol common.Protocol) map[string][]string
GetAllServiceEntry() []*v1alpha3.ServiceEntry
GetAllServiceEntryWrapper() []*ServiceEntryWrapper
GetIncrementalServiceEntryWrapper() (updatedList []*ServiceEntryWrapper, deletedList []*ServiceEntryWrapper)
GetAllServiceWrapper() []*ServiceWrapper
GetAllDestinationRuleWrapper() []*ingress.WrapperDestinationRule
GetIncrementalServiceWrapper() (updatedList []*ServiceWrapper, deletedList []*ServiceWrapper)
RemoveEndpointByIp(ip string)
}
func NewCache() Cache {
return &store{
mux: &sync.RWMutex{},
sew: make(map[string]*ServiceEntryWrapper),
toBeUpdated: make([]*ServiceEntryWrapper, 0),
toBeDeleted: make([]*ServiceEntryWrapper, 0),
sew: make(map[string]*ServiceWrapper),
toBeUpdated: make([]*ServiceWrapper, 0),
toBeDeleted: make([]*ServiceWrapper, 0),
ip2services: make(map[string]map[string]bool),
deferedDelete: make(map[string]struct{}),
}
@@ -51,9 +53,9 @@ func NewCache() Cache {
type store struct {
mux *sync.RWMutex
sew map[string]*ServiceEntryWrapper
toBeUpdated []*ServiceEntryWrapper
toBeDeleted []*ServiceEntryWrapper
sew map[string]*ServiceWrapper
toBeUpdated []*ServiceWrapper
toBeDeleted []*ServiceWrapper
ip2services map[string]map[string]bool
deferedDelete map[string]struct{}
}
@@ -94,7 +96,7 @@ func (s *store) UpdateServiceEntryEndpointWrapper(service, ip, regionId, zoneId,
return
}
func (s *store) UpdateServiceEntryWrapper(service string, data *ServiceEntryWrapper) {
func (s *store) UpdateServiceWrapper(service string, data *ServiceWrapper) {
s.mux.Lock()
defer s.mux.Unlock()
@@ -116,7 +118,7 @@ func (s *store) UpdateServiceEntryWrapper(service string, data *ServiceEntryWrap
log.Infof("ServiceEntry updated, host:%s", service)
}
func (s *store) DeleteServiceEntryWrapper(service string) {
func (s *store) DeleteServiceWrapper(service string) {
s.mux.Lock()
defer s.mux.Unlock()
@@ -199,31 +201,46 @@ func (s *store) GetAllServiceEntry() []*v1alpha3.ServiceEntry {
return seList
}
// GetAllServiceEntryWrapper get all ServiceEntryWrapper in the store for xds push
func (s *store) GetAllServiceEntryWrapper() []*ServiceEntryWrapper {
// GetAllServiceWrapper get all ServiceWrapper in the store for xds push
func (s *store) GetAllServiceWrapper() []*ServiceWrapper {
s.mux.RLock()
defer s.mux.RUnlock()
defer s.cleanUpdateAndDeleteArray()
sewList := make([]*ServiceEntryWrapper, 0)
sewList := make([]*ServiceWrapper, 0)
for _, serviceEntryWrapper := range s.sew {
sewList = append(sewList, serviceEntryWrapper.DeepCopy())
}
return sewList
}
// GetIncrementalServiceEntryWrapper get incremental ServiceEntryWrapper in the store for xds push
func (s *store) GetIncrementalServiceEntryWrapper() ([]*ServiceEntryWrapper, []*ServiceEntryWrapper) {
// GetAllDestinationRuleWrapper get all DestinationRuleWrapper in the store for xds push
func (s *store) GetAllDestinationRuleWrapper() []*ingress.WrapperDestinationRule {
s.mux.RLock()
defer s.mux.RUnlock()
defer s.cleanUpdateAndDeleteArray()
updatedList := make([]*ServiceEntryWrapper, 0)
drwList := make([]*ingress.WrapperDestinationRule, 0)
for _, serviceEntryWrapper := range s.sew {
if serviceEntryWrapper.DestinationRuleWrapper != nil {
drwList = append(drwList, serviceEntryWrapper.DeepCopy().DestinationRuleWrapper)
}
}
return drwList
}
// GetIncrementalServiceWrapper get incremental ServiceWrapper in the store for xds push
func (s *store) GetIncrementalServiceWrapper() ([]*ServiceWrapper, []*ServiceWrapper) {
s.mux.RLock()
defer s.mux.RUnlock()
defer s.cleanUpdateAndDeleteArray()
updatedList := make([]*ServiceWrapper, 0)
for _, serviceEntryWrapper := range s.toBeUpdated {
updatedList = append(updatedList, serviceEntryWrapper.DeepCopy())
}
deletedList := make([]*ServiceEntryWrapper, 0)
deletedList := make([]*ServiceWrapper, 0)
for _, serviceEntryWrapper := range s.toBeDeleted {
deletedList = append(deletedList, serviceEntryWrapper.DeepCopy())
}
@@ -236,7 +253,7 @@ func (s *store) cleanUpdateAndDeleteArray() {
s.toBeDeleted = nil
}
func (s *store) updateIpMap(service string, data *ServiceEntryWrapper) {
func (s *store) updateIpMap(service string, data *ServiceWrapper) {
for _, ep := range data.ServiceEntry.Endpoints {
if s.ip2services[ep.Address] == nil {
s.ip2services[ep.Address] = make(map[string]bool)

View File

@@ -18,27 +18,37 @@ import (
"time"
"istio.io/api/networking/v1alpha3"
"github.com/alibaba/higress/pkg/ingress/kube/common"
)
type ServiceEntryWrapper struct {
ServiceName string
ServiceEntry *v1alpha3.ServiceEntry
Suffix string
RegistryType string
createTime time.Time
type ServiceWrapper struct {
ServiceName string
ServiceEntry *v1alpha3.ServiceEntry
DestinationRuleWrapper *common.WrapperDestinationRule
Suffix string
RegistryType string
RegistryName string
createTime time.Time
}
func (sew *ServiceEntryWrapper) DeepCopy() *ServiceEntryWrapper {
return &ServiceEntryWrapper{
ServiceEntry: sew.ServiceEntry.DeepCopy(),
createTime: sew.GetCreateTime(),
func (sew *ServiceWrapper) DeepCopy() *ServiceWrapper {
res := &ServiceWrapper{}
res = sew
res.ServiceEntry = sew.ServiceEntry.DeepCopy()
res.createTime = sew.GetCreateTime()
if sew.DestinationRuleWrapper != nil {
res.DestinationRuleWrapper = sew.DestinationRuleWrapper
res.DestinationRuleWrapper.DestinationRule = sew.DestinationRuleWrapper.DestinationRule.DeepCopy()
}
return res
}
func (sew *ServiceEntryWrapper) SetCreateTime(createTime time.Time) {
func (sew *ServiceWrapper) SetCreateTime(createTime time.Time) {
sew.createTime = createTime
}
func (sew *ServiceEntryWrapper) GetCreateTime() time.Time {
func (sew *ServiceWrapper) GetCreateTime() time.Time {
return sew.createTime
}

View File

@@ -66,7 +66,7 @@ type watcher struct {
isStop bool
addrProvider *address.NacosAddressProvider
updateCacheWhenEmpty bool
nacosClientConfig *constant.ClientConfig
nacosClientConfig *constant.ClientConfig
authOption provider.AuthOption
}
@@ -413,7 +413,7 @@ func (w *watcher) getSubscribeCallback(groupName string, serviceName string) fun
if err != nil {
if strings.Contains(err.Error(), "hosts is empty") {
if w.updateCacheWhenEmpty {
w.cache.DeleteServiceEntryWrapper(host)
w.cache.DeleteServiceWrapper(host)
}
} else {
log.Errorf("callback error:%v", err)
@@ -425,11 +425,12 @@ func (w *watcher) getSubscribeCallback(groupName string, serviceName string) fun
return
}
serviceEntry := w.generateServiceEntry(host, services)
w.cache.UpdateServiceEntryWrapper(host, &memory.ServiceEntryWrapper{
w.cache.UpdateServiceWrapper(host, &memory.ServiceWrapper{
ServiceName: serviceName,
ServiceEntry: serviceEntry,
Suffix: suffix,
RegistryType: w.Type,
RegistryName: w.Name,
})
}
}
@@ -487,7 +488,7 @@ func (w *watcher) Stop() {
suffix := strings.Join([]string{s[0], w.NacosNamespace, "nacos"}, common.DotSeparator)
suffix = strings.ReplaceAll(suffix, common.Underscore, common.Hyphen)
host := strings.Join([]string{s[1], suffix}, common.DotSeparator)
w.cache.DeleteServiceEntryWrapper(host)
w.cache.DeleteServiceWrapper(host)
}
w.isStop = true

View File

@@ -301,7 +301,7 @@ func (w *watcher) getSubscribeCallback(groupName string, serviceName string) fun
if err != nil {
if strings.Contains(err.Error(), "hosts is empty") {
if w.updateCacheWhenEmpty {
w.cache.DeleteServiceEntryWrapper(host)
w.cache.DeleteServiceWrapper(host)
}
} else {
log.Errorf("callback error:%v", err)
@@ -312,11 +312,12 @@ func (w *watcher) getSubscribeCallback(groupName string, serviceName string) fun
return
}
serviceEntry := w.generateServiceEntry(host, services)
w.cache.UpdateServiceEntryWrapper(host, &memory.ServiceEntryWrapper{
w.cache.UpdateServiceWrapper(host, &memory.ServiceWrapper{
ServiceName: serviceName,
ServiceEntry: serviceEntry,
Suffix: suffix,
RegistryType: w.Type,
RegistryName: w.Name,
})
}
}
@@ -374,7 +375,7 @@ func (w *watcher) Stop() {
suffix := strings.Join([]string{s[0], w.NacosNamespace, w.Type}, common.DotSeparator)
suffix = strings.ReplaceAll(suffix, common.Underscore, common.Hyphen)
host := strings.Join([]string{s[1], suffix}, common.DotSeparator)
w.cache.DeleteServiceEntryWrapper(host)
w.cache.DeleteServiceWrapper(host)
}
w.isStop = true
close(w.stop)

View File

@@ -211,6 +211,8 @@ func (r *Reconciler) generateWatcherFromRegistryConfig(registry *apiv1.RegistryC
direct.WithName(registry.Name),
direct.WithDomain(registry.Domain),
direct.WithPort(registry.Port),
direct.WithProtocol(registry.Protocol),
direct.WithSNI(registry.Sni),
)
case string(Eureka):
watcher, err = eureka.NewWatcher(

View File

@@ -331,11 +331,12 @@ func (w *watcher) DataChange(eventType Event) bool {
se := w.generateServiceEntry(w.serviceEntry[host])
w.seMux.Unlock()
w.cache.UpdateServiceEntryWrapper(host, &memory.ServiceEntryWrapper{
w.cache.UpdateServiceWrapper(host, &memory.ServiceWrapper{
ServiceName: host,
ServiceEntry: se,
Suffix: "zookeeper",
RegistryType: w.Type,
RegistryName: w.Name,
})
w.UpdateService()
} else if eventType.Action == EventTypeDel {
@@ -358,14 +359,15 @@ func (w *watcher) DataChange(eventType Event) bool {
//todo update
if len(se.Endpoints) == 0 {
if !w.keepStaleWhenEmpty {
w.cache.DeleteServiceEntryWrapper(host)
w.cache.DeleteServiceWrapper(host)
}
} else {
w.cache.UpdateServiceEntryWrapper(host, &memory.ServiceEntryWrapper{
w.cache.UpdateServiceWrapper(host, &memory.ServiceWrapper{
ServiceName: host,
ServiceEntry: se,
Suffix: "zookeeper",
RegistryType: w.Type,
RegistryName: w.Name,
})
}
w.UpdateService()
@@ -560,20 +562,22 @@ func (w *watcher) ChildToServiceEntry(children []string, interfaceName, zkPath s
if !reflect.DeepEqual(value, config) {
w.serviceEntry[host] = config
//todo update or create serviceentry
w.cache.UpdateServiceEntryWrapper(host, &memory.ServiceEntryWrapper{
w.cache.UpdateServiceWrapper(host, &memory.ServiceWrapper{
ServiceName: host,
ServiceEntry: se,
Suffix: "zookeeper",
RegistryType: w.Type,
RegistryName: w.Name,
})
}
} else {
w.serviceEntry[host] = config
w.cache.UpdateServiceEntryWrapper(host, &memory.ServiceEntryWrapper{
w.cache.UpdateServiceWrapper(host, &memory.ServiceWrapper{
ServiceName: host,
ServiceEntry: se,
Suffix: "zookeeper",
RegistryType: w.Type,
RegistryName: w.Name,
})
}
}
@@ -708,7 +712,7 @@ func (w *watcher) Stop() {
w.seMux.Lock()
for key := range w.serviceEntry {
w.cache.DeleteServiceEntryWrapper(key)
w.cache.DeleteServiceWrapper(key)
}
w.UpdateService()
w.seMux.Unlock()