feat: Support adding a proxy server in between when forwarding requests to upstream (#2710)

Co-authored-by: 澄潭 <zty98751@alibaba-inc.com>
This commit is contained in:
Kent Dong
2025-08-15 16:15:42 +08:00
committed by GitHub
parent 995bcc2168
commit 5822868f87
21 changed files with 1097 additions and 253 deletions

View File

@@ -37,6 +37,7 @@ import (
"github.com/alibaba/higress/registry/memory"
"github.com/alibaba/higress/registry/nacos"
nacosv2 "github.com/alibaba/higress/registry/nacos/v2"
"github.com/alibaba/higress/registry/proxy"
"github.com/alibaba/higress/registry/zookeeper"
)
@@ -47,6 +48,7 @@ const (
type Reconciler struct {
memory.Cache
registries map[string]*apiv1.RegistryConfig
proxies map[string]*apiv1.ProxyConfig
watchers map[string]Watcher
serviceUpdate func()
client kube.Client
@@ -58,6 +60,7 @@ func NewReconciler(serviceUpdate func(), client kube.Client, namespace, clusterI
return &Reconciler{
Cache: memory.NewCache(),
registries: make(map[string]*apiv1.RegistryConfig),
proxies: make(map[string]*apiv1.ProxyConfig),
watchers: make(map[string]Watcher),
serviceUpdate: serviceUpdate,
client: client,
@@ -67,11 +70,45 @@ func NewReconciler(serviceUpdate func(), client kube.Client, namespace, clusterI
}
func (r *Reconciler) Reconcile(mcpbridge *v1.McpBridge) error {
newRegistries := make(map[string]*apiv1.RegistryConfig)
var registries []*apiv1.RegistryConfig
var proxies []*apiv1.ProxyConfig
if mcpbridge != nil {
for _, registry := range mcpbridge.Spec.Registries {
newRegistries[path.Join(registry.Type, registry.Name)] = registry
if proxy.NeedToFillProxyListenerPorts(mcpbridge.Spec.Proxies) {
// Make a deep copy of the McpBridge resource to avoid modifying the original one
mcpBridgeForUpdate := mcpbridge.DeepCopy()
if proxy.FillProxyListenerPorts(mcpBridgeForUpdate.Spec.Proxies) {
// Some listener ports are filled, we need to update the resource and reconcile again
mcpBridgeClient := r.client.Higress().NetworkingV1().McpBridges(mcpBridgeForUpdate.Namespace)
if _, err := mcpBridgeClient.Update(context.Background(), mcpBridgeForUpdate, metav1.UpdateOptions{}); err != nil {
return fmt.Errorf("failed to save filled proxy listener ports: %v", err)
}
return nil
}
}
registries = mcpbridge.Spec.Registries
proxies = mcpbridge.Spec.Proxies
}
if err := r.reconcileRegistries(registries); err != nil {
return err
}
if err := r.reconcileProxies(proxies); err != nil {
return err
}
if r.Cache.PurgeStaleItems() {
// Something stale are purged. We need to notify the service update handler
r.serviceUpdate()
}
return nil
}
func (r *Reconciler) reconcileRegistries(registries []*apiv1.RegistryConfig) error {
newRegistries := make(map[string]*apiv1.RegistryConfig)
for _, registry := range registries {
newRegistries[path.Join(registry.Type, registry.Name)] = registry
}
var wg sync.WaitGroup
toBeCreated := make(map[string]*apiv1.RegistryConfig)
@@ -142,7 +179,6 @@ func (r *Reconciler) Reconcile(mcpbridge *v1.McpBridge) error {
case <-readyTimer.C:
return errors.New("ReoncileRegistries failed, waiting for ready timeout")
}
r.Cache.PurgeStaleService()
log.Infof("Registries is reconciled")
return nil
}
@@ -221,6 +257,7 @@ func (r *Reconciler) generateWatcherFromRegistryConfig(registry *apiv1.RegistryC
direct.WithPort(registry.Port),
direct.WithProtocol(registry.Protocol),
direct.WithSNI(registry.Sni),
direct.WithProxyName(registry.ProxyName),
)
case string(Eureka):
watcher, err = eureka.NewWatcher(
@@ -289,6 +326,55 @@ func (r *Reconciler) getAuthOption(registry *apiv1.RegistryConfig) (AuthOption,
return authOption, nil
}
func (r *Reconciler) reconcileProxies(proxies []*apiv1.ProxyConfig) error {
newProxies := make(map[string]*apiv1.ProxyConfig)
for _, p := range proxies {
newProxies[p.Name] = p
}
toBeUpdated := make(map[string]*apiv1.ProxyConfig)
toBeDeleted := make(map[string]*apiv1.ProxyConfig)
for key, newProxy := range newProxies {
if oldProxy, ok := r.proxies[key]; !ok || !reflect.DeepEqual(newProxy, oldProxy) {
toBeUpdated[key] = newProxy
}
}
for key, oldProxy := range r.proxies {
if _, ok := newProxies[key]; !ok {
toBeDeleted[key] = oldProxy
}
}
log.Infof("ReconcileProxies, toBeUpdated: %d, toBeDeleted: %d",
len(toBeUpdated), len(toBeDeleted))
needNotify := false
for k := range toBeDeleted {
r.Cache.DeleteProxyWrapper(k)
delete(r.proxies, k)
needNotify = true
}
for k, v := range toBeUpdated {
proxyWrapper := proxy.BuildProxyWrapper(v)
if proxyWrapper == nil {
continue
}
r.Cache.UpdateProxyWrapper(k, proxyWrapper)
r.proxies[k] = v
needNotify = true
}
if needNotify {
r.serviceUpdate()
}
log.Infof("Proxies are reconciled")
return nil
}
func (r *Reconciler) GetMcpServers() []*higressmcpserver.McpServer {
mcpServersFromMcp := r.GetAllConfigs(higressmcpserver.GvkMcpServer)
servers := make([]*higressmcpserver.McpServer, 0, len(mcpServersFromMcp))