mirror of
https://github.com/alibaba/higress.git
synced 2026-06-09 04:37:31 +08:00
mcpbridge新增vport元素,以修复服务后端端口不一致导致的兼容性问题 || =mcpbridge added a vport element to fix compatibility issues caused by inconsistent ports in the service backend (#2859)
This commit is contained in:
@@ -81,6 +81,12 @@ func NewWatcher(cache memory.Cache, opts ...WatcherOption) (provider.Watcher, er
|
||||
return w, nil
|
||||
}
|
||||
|
||||
func WithVport(vport *apiv1.RegistryConfig_VPort) WatcherOption {
|
||||
return func(w *watcher) {
|
||||
w.Vport = vport
|
||||
}
|
||||
}
|
||||
|
||||
func WithEurekaFullRefreshInterval(refreshInterval int64) WatcherOption {
|
||||
return func(w *watcher) {
|
||||
if refreshInterval < int64(DefaultFullRefreshIntervalLimit) {
|
||||
@@ -151,6 +157,9 @@ func (w *watcher) Stop() {
|
||||
w.cache.DeleteServiceWrapper(makeHost(serviceName))
|
||||
}
|
||||
w.UpdateService()
|
||||
w.isStop = true
|
||||
close(w.stop)
|
||||
w.Ready(false)
|
||||
}
|
||||
|
||||
func (w *watcher) IsHealthy() bool {
|
||||
@@ -200,7 +209,7 @@ func (w *watcher) subscribe(service *fargo.Application) error {
|
||||
defer w.UpdateService()
|
||||
|
||||
if len(service.Instances) != 0 {
|
||||
se, err := generateServiceEntry(service)
|
||||
se, err := w.generateServiceEntry(service)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -252,10 +261,10 @@ func convertMap(m map[string]interface{}) map[string]string {
|
||||
return result
|
||||
}
|
||||
|
||||
func generateServiceEntry(app *fargo.Application) (*v1alpha3.ServiceEntry, error) {
|
||||
func (w *watcher) generateServiceEntry(app *fargo.Application) (*v1alpha3.ServiceEntry, error) {
|
||||
portList := make([]*v1alpha3.ServicePort, 0)
|
||||
endpoints := make([]*v1alpha3.WorkloadEntry, 0)
|
||||
|
||||
sePort := provider.GetServiceVport(makeHost(app.Name), w.Vport)
|
||||
for _, instance := range app.Instances {
|
||||
protocol := common.HTTP
|
||||
if val, _ := instance.Metadata.GetString("protocol"); val != "" {
|
||||
@@ -269,7 +278,13 @@ func generateServiceEntry(app *fargo.Application) (*v1alpha3.ServiceEntry, error
|
||||
Protocol: protocol.String(),
|
||||
}
|
||||
if len(portList) == 0 {
|
||||
portList = append(portList, port)
|
||||
if sePort != nil {
|
||||
sePort.Name = port.Name
|
||||
sePort.Protocol = port.Protocol
|
||||
portList = append(portList, sePort)
|
||||
} else {
|
||||
portList = append(portList, port)
|
||||
}
|
||||
}
|
||||
endpoint := v1alpha3.WorkloadEntry{
|
||||
Address: instance.IPAddr,
|
||||
|
||||
@@ -196,6 +196,12 @@ func NewWatcher(cache memory.Cache, opts ...WatcherOption) (provider.Watcher, er
|
||||
}
|
||||
}
|
||||
|
||||
func WithVport(vport *apiv1.RegistryConfig_VPort) WatcherOption {
|
||||
return func(w *watcher) {
|
||||
w.Vport = vport
|
||||
}
|
||||
}
|
||||
|
||||
func WithNacosAddressServer(nacosAddressServer string) WatcherOption {
|
||||
return func(w *watcher) {
|
||||
w.NacosAddressServer = nacosAddressServer
|
||||
@@ -529,7 +535,7 @@ func (w *watcher) generateServiceEntry(host string, services []model.Instance) *
|
||||
portList := make([]*v1alpha3.ServicePort, 0)
|
||||
endpoints := make([]*v1alpha3.WorkloadEntry, 0)
|
||||
isDnsService := false
|
||||
|
||||
sePort := provider.GetServiceVport(host, w.Vport)
|
||||
for _, service := range services {
|
||||
protocol := common.HTTP
|
||||
if service.Metadata != nil && service.Metadata["protocol"] != "" {
|
||||
@@ -541,7 +547,13 @@ func (w *watcher) generateServiceEntry(host string, services []model.Instance) *
|
||||
Protocol: protocol.String(),
|
||||
}
|
||||
if len(portList) == 0 {
|
||||
portList = append(portList, port)
|
||||
if sePort != nil {
|
||||
sePort.Name = port.Name
|
||||
sePort.Protocol = port.Protocol
|
||||
portList = append(portList, sePort)
|
||||
} else {
|
||||
portList = append(portList, port)
|
||||
}
|
||||
}
|
||||
if !isValidIP(service.Ip) {
|
||||
isDnsService = true
|
||||
|
||||
@@ -119,6 +119,12 @@ func NewWatcher(cache memory.Cache, opts ...WatcherOption) (provider.Watcher, er
|
||||
return w, nil
|
||||
}
|
||||
|
||||
func WithVport(vport *apiv1.RegistryConfig_VPort) WatcherOption {
|
||||
return func(w *watcher) {
|
||||
w.Vport = vport
|
||||
}
|
||||
}
|
||||
|
||||
func WithNacosNamespaceId(nacosNamespaceId string) WatcherOption {
|
||||
return func(w *watcher) {
|
||||
if nacosNamespaceId == "" {
|
||||
@@ -326,7 +332,7 @@ func (w *watcher) getSubscribeCallback(groupName string, serviceName string) fun
|
||||
func (w *watcher) generateServiceEntry(host string, services []model.SubscribeService) *v1alpha3.ServiceEntry {
|
||||
portList := make([]*v1alpha3.ServicePort, 0)
|
||||
endpoints := make([]*v1alpha3.WorkloadEntry, 0)
|
||||
|
||||
sePort := provider.GetServiceVport(host, w.Vport)
|
||||
for _, service := range services {
|
||||
protocol := common.HTTP
|
||||
if service.Metadata != nil && service.Metadata["protocol"] != "" {
|
||||
@@ -340,7 +346,13 @@ func (w *watcher) generateServiceEntry(host string, services []model.SubscribeSe
|
||||
Protocol: protocol.String(),
|
||||
}
|
||||
if len(portList) == 0 {
|
||||
portList = append(portList, port)
|
||||
if sePort != nil {
|
||||
sePort.Name = port.Name
|
||||
sePort.Protocol = port.Protocol
|
||||
portList = append(portList, sePort)
|
||||
} else {
|
||||
portList = append(portList, port)
|
||||
}
|
||||
}
|
||||
endpoint := v1alpha3.WorkloadEntry{
|
||||
Address: service.Ip,
|
||||
|
||||
@@ -205,6 +205,7 @@ func (r *Reconciler) generateWatcherFromRegistryConfig(registry *apiv1.RegistryC
|
||||
nacos.WithNacosGroups(registry.NacosGroups),
|
||||
nacos.WithNacosRefreshInterval(registry.NacosRefreshInterval),
|
||||
nacos.WithAuthOption(authOption),
|
||||
nacos.WithVport(registry.Vport),
|
||||
)
|
||||
case string(Nacos2), string(Nacos3):
|
||||
watcher, err = nacosv2.NewWatcher(
|
||||
@@ -226,6 +227,7 @@ func (r *Reconciler) generateWatcherFromRegistryConfig(registry *apiv1.RegistryC
|
||||
nacosv2.WithClusterId(r.clusterId),
|
||||
nacosv2.WithNamespace(r.namespace),
|
||||
nacosv2.WithAuthOption(authOption),
|
||||
nacosv2.WithVport(registry.Vport),
|
||||
)
|
||||
case string(Zookeeper):
|
||||
watcher, err = zookeeper.NewWatcher(
|
||||
@@ -266,6 +268,7 @@ func (r *Reconciler) generateWatcherFromRegistryConfig(registry *apiv1.RegistryC
|
||||
eureka.WithDomain(registry.Domain),
|
||||
eureka.WithType(registry.Type),
|
||||
eureka.WithPort(registry.Port),
|
||||
eureka.WithVport(registry.Vport),
|
||||
)
|
||||
default:
|
||||
return nil, errors.New("unsupported registry type:" + registry.Type)
|
||||
|
||||
@@ -15,7 +15,11 @@
|
||||
package registry
|
||||
|
||||
import (
|
||||
apiv1 "github.com/alibaba/higress/api/networking/v1"
|
||||
"istio.io/api/networking/v1alpha3"
|
||||
"istio.io/pkg/log"
|
||||
"net"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
@@ -89,3 +93,29 @@ func ProbeWatcherStatus(host string, port string) WatcherStatus {
|
||||
_ = conn.Close()
|
||||
return Healthy
|
||||
}
|
||||
|
||||
func GetServiceVport(host string, vport *apiv1.RegistryConfig_VPort) *v1alpha3.ServicePort {
|
||||
if vport == nil {
|
||||
log.Warnf("there is no vport exist for: %s, skip", host)
|
||||
return nil
|
||||
}
|
||||
for _, service := range vport.Services {
|
||||
if strings.EqualFold(service.Name, host) && isValidPort(service.Value) {
|
||||
log.Infof("service %s vport exist, use service vport %d", host, service.Value)
|
||||
return &v1alpha3.ServicePort{
|
||||
Number: service.Value,
|
||||
}
|
||||
}
|
||||
}
|
||||
if isValidPort(vport.Default) {
|
||||
log.Infof("there is only default vport exist, use default vport %d", vport.Default)
|
||||
return &v1alpha3.ServicePort{
|
||||
Number: vport.Default,
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func isValidPort(port uint32) bool {
|
||||
return port > 0 && port <= 65535
|
||||
}
|
||||
Reference in New Issue
Block a user