mirror of
https://github.com/alibaba/higress.git
synced 2026-05-28 14:47:29 +08:00
feat: Support status sync for Gateway API resources (#1315)
This commit is contained in:
@@ -15,26 +15,37 @@
|
||||
package istio
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
networking "istio.io/api/networking/v1alpha3"
|
||||
"istio.io/istio/pilot/pkg/model"
|
||||
serviceRegistryKube "istio.io/istio/pilot/pkg/serviceregistry/kube"
|
||||
"istio.io/istio/pkg/cluster"
|
||||
"istio.io/istio/pkg/config/host"
|
||||
"istio.io/istio/pkg/config/schema/gvk"
|
||||
"istio.io/istio/pkg/kube"
|
||||
"istio.io/istio/pkg/util/sets"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
kerrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
)
|
||||
|
||||
// GatewayContext contains a minimal subset of push context functionality to be exposed to GatewayAPIControllers
|
||||
type GatewayContext struct {
|
||||
ps *model.PushContext
|
||||
// Start - Updated by Higress
|
||||
client kube.Client
|
||||
domainSuffix string
|
||||
clusterID cluster.ID
|
||||
// End - Updated by Higress
|
||||
}
|
||||
|
||||
func NewGatewayContext(ps *model.PushContext) GatewayContext {
|
||||
return GatewayContext{ps}
|
||||
// Start - Updated by Higress
|
||||
|
||||
func NewGatewayContext(ps *model.PushContext, client kube.Client, domainSuffix string, clusterID cluster.ID) GatewayContext {
|
||||
return GatewayContext{ps, client, domainSuffix, clusterID}
|
||||
}
|
||||
|
||||
// ResolveGatewayInstances attempts to resolve all instances that a gateway will be exposed on.
|
||||
@@ -59,26 +70,20 @@ func (gc GatewayContext) ResolveGatewayInstances(
|
||||
foundExternal := sets.New[string]()
|
||||
foundPending := sets.New[string]()
|
||||
warnings := []string{}
|
||||
|
||||
// Cache endpoints to reduce redundant queries
|
||||
endpointsCache := make(map[string]*corev1.Endpoints)
|
||||
|
||||
for _, g := range gwsvcs {
|
||||
svc, f := gc.ps.ServiceIndex.HostnameAndNamespace[host.Name(g)][namespace]
|
||||
if !f {
|
||||
otherNamespaces := []string{}
|
||||
for ns := range gc.ps.ServiceIndex.HostnameAndNamespace[host.Name(g)] {
|
||||
otherNamespaces = append(otherNamespaces, `"`+ns+`"`) // Wrap in quotes for output
|
||||
}
|
||||
if len(otherNamespaces) > 0 {
|
||||
sort.Strings(otherNamespaces)
|
||||
warnings = append(warnings, fmt.Sprintf("hostname %q not found in namespace %q, but it was found in namespace(s) %v",
|
||||
g, namespace, strings.Join(otherNamespaces, ", ")))
|
||||
} else {
|
||||
warnings = append(warnings, fmt.Sprintf("hostname %q not found", g))
|
||||
}
|
||||
svc := gc.GetService(g, namespace, gvk.Service.Kind)
|
||||
if svc == nil {
|
||||
warnings = append(warnings, fmt.Sprintf("hostname %q not found", g))
|
||||
continue
|
||||
}
|
||||
svcKey := svc.Key()
|
||||
|
||||
for port := range ports {
|
||||
instances := gc.ps.ServiceInstancesByPort(svc, port, nil)
|
||||
if len(instances) > 0 {
|
||||
exists := checkServicePortExists(svc, port)
|
||||
if exists {
|
||||
foundInternal.Insert(fmt.Sprintf("%s:%d", g, port))
|
||||
if svc.Attributes.ClusterExternalAddresses.Len() > 0 {
|
||||
// Fetch external IPs from all clusters
|
||||
@@ -92,22 +97,30 @@ func (gc GatewayContext) ResolveGatewayInstances(
|
||||
}
|
||||
}
|
||||
} else {
|
||||
instancesByPort := gc.ps.ServiceInstances(svcKey)
|
||||
if instancesEmpty(instancesByPort) {
|
||||
endpoints, ok := endpointsCache[g]
|
||||
if !ok {
|
||||
endpoints = gc.GetEndpoints(g, namespace)
|
||||
endpointsCache[g] = endpoints
|
||||
}
|
||||
|
||||
if endpoints == nil {
|
||||
warnings = append(warnings, fmt.Sprintf("no instances found for hostname %q", g))
|
||||
} else {
|
||||
hintPort := sets.New[string]()
|
||||
for _, instances := range instancesByPort {
|
||||
for _, i := range instances {
|
||||
if i.Endpoint.EndpointPort == uint32(port) {
|
||||
hintPort.Insert(strconv.Itoa(i.ServicePort.Port))
|
||||
hintWorkloadPort := false
|
||||
for _, subset := range endpoints.Subsets {
|
||||
for _, subSetPort := range subset.Ports {
|
||||
if subSetPort.Port == int32(port) {
|
||||
hintWorkloadPort = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if hintWorkloadPort {
|
||||
break
|
||||
}
|
||||
}
|
||||
if hintPort.Len() > 0 {
|
||||
if hintWorkloadPort {
|
||||
warnings = append(warnings, fmt.Sprintf(
|
||||
"port %d not found for hostname %q (hint: the service port should be specified, not the workload port. Did you mean one of these ports: %v?)",
|
||||
port, g, sets.SortedList(hintPort)))
|
||||
"port %d not found for hostname %q (hint: the service port should be specified, not the workload port", port, g))
|
||||
} else {
|
||||
warnings = append(warnings, fmt.Sprintf("port %d not found for hostname %q", port, g))
|
||||
}
|
||||
@@ -119,15 +132,60 @@ func (gc GatewayContext) ResolveGatewayInstances(
|
||||
return sets.SortedList(foundInternal), sets.SortedList(foundExternal), sets.SortedList(foundPending), warnings
|
||||
}
|
||||
|
||||
func (gc GatewayContext) GetService(hostname, namespace string) *model.Service {
|
||||
return gc.ps.ServiceIndex.HostnameAndNamespace[host.Name(hostname)][namespace]
|
||||
func (gc GatewayContext) GetService(hostname, namespace, kind string) *model.Service {
|
||||
// Currently only supports type Kubernetes Service
|
||||
if kind != gvk.Service.Kind {
|
||||
log.Warnf("Unsupported kind: expected 'Service', but got '%s'", kind)
|
||||
return nil
|
||||
}
|
||||
serviceName := extractServiceName(hostname)
|
||||
|
||||
svc, err := gc.client.Kube().CoreV1().Services(namespace).Get(context.TODO(), serviceName, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
if kerrors.IsNotFound(err) {
|
||||
return nil
|
||||
}
|
||||
log.Errorf("failed to get service (serviceName: %s, namespace: %s): %v", serviceName, namespace, err)
|
||||
return nil
|
||||
}
|
||||
|
||||
return serviceRegistryKube.ConvertService(*svc, gc.domainSuffix, gc.clusterID)
|
||||
}
|
||||
|
||||
func instancesEmpty(m map[int][]*model.ServiceInstance) bool {
|
||||
for _, instances := range m {
|
||||
if len(instances) > 0 {
|
||||
return false
|
||||
func (gc GatewayContext) GetEndpoints(hostname, namespace string) *corev1.Endpoints {
|
||||
serviceName := extractServiceName(hostname)
|
||||
|
||||
endpoints, err := gc.client.Kube().CoreV1().Endpoints(namespace).Get(context.TODO(), serviceName, metav1.GetOptions{})
|
||||
|
||||
if err != nil {
|
||||
if kerrors.IsNotFound(err) {
|
||||
return nil
|
||||
}
|
||||
log.Errorf("failed to get endpoints (serviceName: %s, namespace: %s): %v", serviceName, namespace, err)
|
||||
return nil
|
||||
}
|
||||
|
||||
return endpoints
|
||||
}
|
||||
|
||||
func checkServicePortExists(svc *model.Service, port int) bool {
|
||||
if svc == nil {
|
||||
return false
|
||||
}
|
||||
for _, svcPort := range svc.Ports {
|
||||
if port == svcPort.Port {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return true
|
||||
return false
|
||||
}
|
||||
|
||||
func extractServiceName(hostName string) string {
|
||||
parts := strings.Split(hostName, ".")
|
||||
if len(parts) >= 4 {
|
||||
return parts[0]
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
// End - Updated by Higress
|
||||
|
||||
Reference in New Issue
Block a user