mirror of
https://github.com/alibaba/higress.git
synced 2026-05-31 08:07:26 +08:00
Feat: upgrade gateway api to latest (#3160)
This commit is contained in:
@@ -23,7 +23,7 @@ import (
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/intstr"
|
||||
inferencev1alpha2 "sigs.k8s.io/gateway-api-inference-extension/api/v1alpha2"
|
||||
inferencev1 "sigs.k8s.io/gateway-api-inference-extension/api/v1"
|
||||
gatewayv1 "sigs.k8s.io/gateway-api/apis/v1"
|
||||
gateway "sigs.k8s.io/gateway-api/apis/v1beta1"
|
||||
|
||||
@@ -63,11 +63,17 @@ func getSupportedControllers() sets.Set[gatewayv1.GatewayController] {
|
||||
}
|
||||
|
||||
type shadowServiceInfo struct {
|
||||
key types.NamespacedName
|
||||
selector map[string]string
|
||||
poolName string
|
||||
poolUID types.UID
|
||||
targetPort int32
|
||||
key types.NamespacedName
|
||||
selector map[string]string
|
||||
poolName string
|
||||
poolUID types.UID
|
||||
// targetPorts is the port number on the pods selected by the selector.
|
||||
// Currently, inference extension only supports a single target port.
|
||||
targetPorts []targetPort
|
||||
}
|
||||
|
||||
type targetPort struct {
|
||||
port int32
|
||||
}
|
||||
|
||||
type extRefInfo struct {
|
||||
@@ -87,19 +93,19 @@ func (i InferencePool) ResourceName() string {
|
||||
}
|
||||
|
||||
func InferencePoolCollection(
|
||||
pools krt.Collection[*inferencev1alpha2.InferencePool],
|
||||
pools krt.Collection[*inferencev1.InferencePool],
|
||||
services krt.Collection[*corev1.Service],
|
||||
httpRoutes krt.Collection[*gateway.HTTPRoute],
|
||||
gateways krt.Collection[*gateway.Gateway],
|
||||
routesByInferencePool krt.Index[string, *gateway.HTTPRoute],
|
||||
c *Controller,
|
||||
opts krt.OptionsBuilder,
|
||||
) (krt.StatusCollection[*inferencev1alpha2.InferencePool, inferencev1alpha2.InferencePoolStatus], krt.Collection[InferencePool]) {
|
||||
) (krt.StatusCollection[*inferencev1.InferencePool, inferencev1.InferencePoolStatus], krt.Collection[InferencePool]) {
|
||||
return krt.NewStatusCollection(pools,
|
||||
func(
|
||||
ctx krt.HandlerContext,
|
||||
pool *inferencev1alpha2.InferencePool,
|
||||
) (*inferencev1alpha2.InferencePoolStatus, *InferencePool) {
|
||||
pool *inferencev1.InferencePool,
|
||||
) (*inferencev1.InferencePoolStatus, *InferencePool) {
|
||||
// Fetch HTTPRoutes that reference this InferencePool once and reuse
|
||||
routeList := krt.Fetch(ctx, httpRoutes, krt.FilterIndex(routesByInferencePool, pool.Namespace+"/"+pool.Name))
|
||||
|
||||
@@ -107,7 +113,7 @@ func InferencePoolCollection(
|
||||
gatewayParents := findGatewayParents(pool, routeList)
|
||||
|
||||
// TODO: If no gateway parents, we should not do anything
|
||||
// note: we stil need to filter out our Status to clean up previous reconciliations
|
||||
// note: we still need to filter out our Status to clean up previous reconciliations
|
||||
|
||||
// Create the InferencePool only if there are Gateways connected
|
||||
var inferencePool *InferencePool
|
||||
@@ -124,20 +130,21 @@ func InferencePoolCollection(
|
||||
}
|
||||
|
||||
// createInferencePoolObject creates the InferencePool object with shadow service and extension ref info
|
||||
func createInferencePoolObject(pool *inferencev1alpha2.InferencePool, gatewayParents sets.Set[types.NamespacedName]) *InferencePool {
|
||||
func createInferencePoolObject(pool *inferencev1.InferencePool, gatewayParents sets.Set[types.NamespacedName]) *InferencePool {
|
||||
// Build extension reference info
|
||||
extRef := extRefInfo{
|
||||
name: string(pool.Spec.ExtensionRef.Name),
|
||||
name: string(pool.Spec.EndpointPickerRef.Name),
|
||||
}
|
||||
if pool.Spec.ExtensionRef.PortNumber != nil {
|
||||
extRef.port = int32(*pool.Spec.ExtensionRef.PortNumber)
|
||||
} else {
|
||||
extRef.port = 9002 // Default port for the inference extension
|
||||
|
||||
if pool.Spec.EndpointPickerRef.Port == nil {
|
||||
log.Errorf("invalid InferencePool %s/%s; endpointPickerRef port is required", pool.Namespace, pool.Name)
|
||||
return nil
|
||||
}
|
||||
if pool.Spec.ExtensionRef.FailureMode != nil {
|
||||
extRef.failureMode = string(*pool.Spec.ExtensionRef.FailureMode)
|
||||
} else {
|
||||
extRef.failureMode = string(inferencev1alpha2.FailClose)
|
||||
extRef.port = int32(pool.Spec.EndpointPickerRef.Port.Number)
|
||||
|
||||
extRef.failureMode = string(inferencev1.EndpointPickerFailClose) // Default failure mode
|
||||
if pool.Spec.EndpointPickerRef.FailureMode != inferencev1.EndpointPickerFailClose {
|
||||
extRef.failureMode = string(pool.Spec.EndpointPickerRef.FailureMode)
|
||||
}
|
||||
|
||||
svcName, err := InferencePoolServiceName(pool.Name)
|
||||
@@ -151,16 +158,20 @@ func createInferencePoolObject(pool *inferencev1alpha2.InferencePool, gatewayPar
|
||||
Name: svcName,
|
||||
Namespace: pool.GetNamespace(),
|
||||
},
|
||||
selector: make(map[string]string, len(pool.Spec.Selector)),
|
||||
poolName: pool.GetName(),
|
||||
targetPort: pool.Spec.TargetPortNumber,
|
||||
poolUID: pool.GetUID(),
|
||||
selector: make(map[string]string, len(pool.Spec.Selector.MatchLabels)),
|
||||
poolName: pool.GetName(),
|
||||
targetPorts: make([]targetPort, 0, len(pool.Spec.TargetPorts)),
|
||||
poolUID: pool.GetUID(),
|
||||
}
|
||||
|
||||
for k, v := range pool.Spec.Selector {
|
||||
for k, v := range pool.Spec.Selector.MatchLabels {
|
||||
shadowSvcInfo.selector[string(k)] = string(v)
|
||||
}
|
||||
|
||||
for _, port := range pool.Spec.TargetPorts {
|
||||
shadowSvcInfo.targetPorts = append(shadowSvcInfo.targetPorts, targetPort{port: int32(port.Number)})
|
||||
}
|
||||
|
||||
return &InferencePool{
|
||||
shadowService: shadowSvcInfo,
|
||||
extRef: extRef,
|
||||
@@ -170,22 +181,22 @@ func createInferencePoolObject(pool *inferencev1alpha2.InferencePool, gatewayPar
|
||||
|
||||
// calculateInferencePoolStatus calculates the complete status for an InferencePool
|
||||
func calculateInferencePoolStatus(
|
||||
pool *inferencev1alpha2.InferencePool,
|
||||
pool *inferencev1.InferencePool,
|
||||
gatewayParents sets.Set[types.NamespacedName],
|
||||
services krt.Collection[*corev1.Service],
|
||||
gateways krt.Collection[*gateway.Gateway],
|
||||
routeList []*gateway.HTTPRoute,
|
||||
) *inferencev1alpha2.InferencePoolStatus {
|
||||
) *inferencev1.InferencePoolStatus {
|
||||
// Calculate status for each gateway parent
|
||||
existingParents := pool.Status.DeepCopy().Parents
|
||||
finalParents := []inferencev1alpha2.PoolStatus{}
|
||||
finalParents := []inferencev1.ParentStatus{}
|
||||
|
||||
// Add existing parents from other controllers (not managed by us)
|
||||
for _, existingParent := range existingParents {
|
||||
gtwName := string(existingParent.GatewayRef.Name)
|
||||
gtwName := string(existingParent.ParentRef.Name)
|
||||
gtwNamespace := pool.Namespace
|
||||
if existingParent.GatewayRef.Namespace != nil {
|
||||
gtwNamespace = string(*existingParent.GatewayRef.Namespace)
|
||||
if existingParent.ParentRef.Namespace != "" {
|
||||
gtwNamespace = string(existingParent.ParentRef.Namespace)
|
||||
}
|
||||
parentKey := types.NamespacedName{
|
||||
Name: gtwName,
|
||||
@@ -208,14 +219,14 @@ func calculateInferencePoolStatus(
|
||||
finalParents = append(finalParents, parentStatus)
|
||||
}
|
||||
|
||||
return &inferencev1alpha2.InferencePoolStatus{
|
||||
return &inferencev1.InferencePoolStatus{
|
||||
Parents: finalParents,
|
||||
}
|
||||
}
|
||||
|
||||
// findGatewayParents finds all Gateway parents that reference this InferencePool through HTTPRoutes
|
||||
func findGatewayParents(
|
||||
pool *inferencev1alpha2.InferencePool,
|
||||
pool *inferencev1.InferencePool,
|
||||
routeList []*gateway.HTTPRoute,
|
||||
) sets.Set[types.NamespacedName] {
|
||||
gatewayParents := sets.New[types.NamespacedName]()
|
||||
@@ -251,7 +262,7 @@ func findGatewayParents(
|
||||
}
|
||||
|
||||
// routeReferencesInferencePool checks if an HTTPRoute references the given InferencePool
|
||||
func routeReferencesInferencePool(route *gateway.HTTPRoute, pool *inferencev1alpha2.InferencePool) bool {
|
||||
func routeReferencesInferencePool(route *gateway.HTTPRoute, pool *inferencev1.InferencePool) bool {
|
||||
for _, rule := range route.Spec.Rules {
|
||||
for _, backendRef := range rule.BackendRefs {
|
||||
if !isInferencePoolBackendRef(backendRef.BackendRef) {
|
||||
@@ -285,17 +296,17 @@ func isInferencePoolBackendRef(backendRef gatewayv1.BackendRef) bool {
|
||||
|
||||
// calculateSingleParentStatus calculates the status for a single gateway parent
|
||||
func calculateSingleParentStatus(
|
||||
pool *inferencev1alpha2.InferencePool,
|
||||
pool *inferencev1.InferencePool,
|
||||
gatewayParent types.NamespacedName,
|
||||
services krt.Collection[*corev1.Service],
|
||||
existingParents []inferencev1alpha2.PoolStatus,
|
||||
existingParents []inferencev1.ParentStatus,
|
||||
routeList []*gateway.HTTPRoute,
|
||||
) inferencev1alpha2.PoolStatus {
|
||||
) inferencev1.ParentStatus {
|
||||
// Find existing status for this parent to preserve some conditions
|
||||
var existingConditions []metav1.Condition
|
||||
for _, existingParent := range existingParents {
|
||||
if string(existingParent.GatewayRef.Name) == gatewayParent.Name &&
|
||||
string(ptr.OrEmpty(existingParent.GatewayRef.Namespace)) == gatewayParent.Namespace {
|
||||
if string(existingParent.ParentRef.Name) == gatewayParent.Name &&
|
||||
string(existingParent.ParentRef.Namespace) == gatewayParent.Namespace {
|
||||
existingConditions = existingParent.Conditions
|
||||
break
|
||||
}
|
||||
@@ -303,8 +314,8 @@ func calculateSingleParentStatus(
|
||||
|
||||
// Filter to only keep conditions we manage
|
||||
filteredConditions := filterUsedConditions(existingConditions,
|
||||
inferencev1alpha2.InferencePoolConditionAccepted,
|
||||
inferencev1alpha2.InferencePoolConditionResolvedRefs)
|
||||
inferencev1.InferencePoolConditionAccepted,
|
||||
inferencev1.InferencePoolConditionResolvedRefs)
|
||||
|
||||
// Calculate Accepted status by checking HTTPRoute parent status
|
||||
acceptedStatus := calculateAcceptedStatus(pool, gatewayParent, routeList)
|
||||
@@ -313,23 +324,23 @@ func calculateSingleParentStatus(
|
||||
resolvedRefsStatus := calculateResolvedRefsStatus(pool, services)
|
||||
|
||||
// Build the final status
|
||||
return inferencev1alpha2.PoolStatus{
|
||||
GatewayRef: inferencev1alpha2.ParentGatewayReference{
|
||||
Group: (*inferencev1alpha2.Group)(&gvk.Gateway.Group),
|
||||
Kind: (*inferencev1alpha2.Kind)(&gvk.Gateway.Kind),
|
||||
Namespace: (*inferencev1alpha2.Namespace)(&gatewayParent.Namespace),
|
||||
Name: inferencev1alpha2.ObjectName(gatewayParent.Name),
|
||||
return inferencev1.ParentStatus{
|
||||
ParentRef: inferencev1.ParentReference{
|
||||
Group: (*inferencev1.Group)(&gvk.Gateway.Group),
|
||||
Kind: inferencev1.Kind(gvk.Gateway.Kind),
|
||||
Namespace: inferencev1.Namespace(gatewayParent.Namespace),
|
||||
Name: inferencev1.ObjectName(gatewayParent.Name),
|
||||
},
|
||||
Conditions: setConditions(pool.Generation, filteredConditions, map[string]*condition{
|
||||
string(inferencev1alpha2.InferencePoolConditionAccepted): acceptedStatus,
|
||||
string(inferencev1alpha2.InferencePoolConditionResolvedRefs): resolvedRefsStatus,
|
||||
string(inferencev1.InferencePoolConditionAccepted): acceptedStatus,
|
||||
string(inferencev1.InferencePoolConditionResolvedRefs): resolvedRefsStatus,
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
// calculateAcceptedStatus determines if the InferencePool is accepted by checking HTTPRoute parent status
|
||||
func calculateAcceptedStatus(
|
||||
pool *inferencev1alpha2.InferencePool,
|
||||
pool *inferencev1.InferencePool,
|
||||
gatewayParent types.NamespacedName,
|
||||
routeList []*gateway.HTTPRoute,
|
||||
) *condition {
|
||||
@@ -359,13 +370,13 @@ func calculateAcceptedStatus(
|
||||
if parentCondition.Type == string(gatewayv1.RouteConditionAccepted) {
|
||||
if parentCondition.Status == metav1.ConditionTrue {
|
||||
return &condition{
|
||||
reason: string(inferencev1alpha2.InferencePoolReasonAccepted),
|
||||
reason: string(inferencev1.InferencePoolReasonAccepted),
|
||||
status: metav1.ConditionTrue,
|
||||
message: "Referenced by an HTTPRoute accepted by the parentRef Gateway",
|
||||
}
|
||||
}
|
||||
return &condition{
|
||||
reason: string(inferencev1alpha2.InferencePoolReasonHTTPRouteNotAccepted),
|
||||
reason: string(inferencev1.InferencePoolReasonHTTPRouteNotAccepted),
|
||||
status: metav1.ConditionFalse,
|
||||
message: fmt.Sprintf("Referenced HTTPRoute %s/%s not accepted by Gateway %s/%s: %s",
|
||||
route.Namespace, route.Name, gatewayParent.Namespace, gatewayParent.Name, parentCondition.Message),
|
||||
@@ -375,7 +386,7 @@ func calculateAcceptedStatus(
|
||||
|
||||
// If no Accepted condition found, treat as unknown (parent is listed in status)
|
||||
return &condition{
|
||||
reason: string(inferencev1alpha2.InferencePoolReasonAccepted),
|
||||
reason: string(inferencev1.InferencePoolReasonAccepted),
|
||||
status: metav1.ConditionUnknown,
|
||||
message: "Referenced by an HTTPRoute unknown parentRef Gateway status",
|
||||
}
|
||||
@@ -386,7 +397,7 @@ func calculateAcceptedStatus(
|
||||
// If we get here, no HTTPRoute was found that references this InferencePool with this gateway as parent
|
||||
// This shouldn't happen in normal operation since we only call this for known gateway parents
|
||||
return &condition{
|
||||
reason: string(inferencev1alpha2.InferencePoolReasonHTTPRouteNotAccepted),
|
||||
reason: string(inferencev1.InferencePoolReasonHTTPRouteNotAccepted),
|
||||
status: metav1.ConditionFalse,
|
||||
message: fmt.Sprintf("No HTTPRoute found referencing this InferencePool with Gateway %s/%s as parent",
|
||||
gatewayParent.Namespace, gatewayParent.Name),
|
||||
@@ -398,42 +409,51 @@ func calculateAcceptedStatus(
|
||||
// * if the extensionRef is defined
|
||||
// * if the service exists in the same namespace as the InferencePool
|
||||
func calculateResolvedRefsStatus(
|
||||
pool *inferencev1alpha2.InferencePool,
|
||||
pool *inferencev1.InferencePool,
|
||||
services krt.Collection[*corev1.Service],
|
||||
) *condition {
|
||||
// defaults to service
|
||||
if pool.Spec.ExtensionRef.Kind != nil && string(*pool.Spec.ExtensionRef.Kind) != gvk.Service.Kind {
|
||||
// Default Kind to Service if unset
|
||||
kind := string(pool.Spec.EndpointPickerRef.Kind)
|
||||
if kind == "" {
|
||||
kind = gvk.Service.Kind
|
||||
}
|
||||
|
||||
if kind != gvk.Service.Kind {
|
||||
return &condition{
|
||||
reason: string(inferencev1alpha2.InferencePoolReasonInvalidExtensionRef),
|
||||
reason: string(inferencev1.InferencePoolReasonInvalidExtensionRef),
|
||||
status: metav1.ConditionFalse,
|
||||
message: "Unsupported ExtensionRef kind " + string(*pool.Spec.ExtensionRef.Kind),
|
||||
message: "Unsupported ExtensionRef kind " + kind,
|
||||
}
|
||||
}
|
||||
if string(pool.Spec.ExtensionRef.Name) == "" {
|
||||
|
||||
name := string(pool.Spec.EndpointPickerRef.Name)
|
||||
if name == "" {
|
||||
return &condition{
|
||||
reason: string(inferencev1alpha2.InferencePoolReasonInvalidExtensionRef),
|
||||
reason: string(inferencev1.InferencePoolReasonInvalidExtensionRef),
|
||||
status: metav1.ConditionFalse,
|
||||
message: "ExtensionRef not defined",
|
||||
}
|
||||
}
|
||||
svc := ptr.Flatten(services.GetKey(fmt.Sprintf("%s/%s", pool.Namespace, pool.Spec.ExtensionRef.Name)))
|
||||
|
||||
svc := ptr.Flatten(services.GetKey(fmt.Sprintf("%s/%s", pool.Namespace, name)))
|
||||
if svc == nil {
|
||||
return &condition{
|
||||
reason: string(inferencev1alpha2.InferencePoolReasonInvalidExtensionRef),
|
||||
reason: string(inferencev1.InferencePoolReasonInvalidExtensionRef),
|
||||
status: metav1.ConditionFalse,
|
||||
message: "Referenced ExtensionRef not found " + string(pool.Spec.ExtensionRef.Name),
|
||||
message: "Referenced ExtensionRef not found " + name,
|
||||
}
|
||||
}
|
||||
|
||||
return &condition{
|
||||
reason: string(inferencev1alpha2.InferencePoolConditionResolvedRefs),
|
||||
reason: string(inferencev1.InferencePoolReasonResolvedRefs),
|
||||
status: metav1.ConditionTrue,
|
||||
message: "Referenced ExtensionRef resolved successfully",
|
||||
}
|
||||
}
|
||||
|
||||
// isDefaultStatusParent checks if this is a default status parent entry
|
||||
func isDefaultStatusParent(parent inferencev1alpha2.PoolStatus) bool {
|
||||
return string(ptr.OrEmpty(parent.GatewayRef.Kind)) == "Status" && parent.GatewayRef.Name == "default"
|
||||
func isDefaultStatusParent(parent inferencev1.ParentStatus) bool {
|
||||
return string(parent.ParentRef.Kind) == "Status" && parent.ParentRef.Name == "default"
|
||||
}
|
||||
|
||||
// isOurManagedGateway checks if a Gateway is managed by one of our supported controllers
|
||||
@@ -447,10 +467,10 @@ func isOurManagedGateway(gateways krt.Collection[*gateway.Gateway], namespace, n
|
||||
return ok
|
||||
}
|
||||
|
||||
func filterUsedConditions(conditions []metav1.Condition, usedConditions ...inferencev1alpha2.InferencePoolConditionType) []metav1.Condition {
|
||||
func filterUsedConditions(conditions []metav1.Condition, usedConditions ...inferencev1.InferencePoolConditionType) []metav1.Condition {
|
||||
var result []metav1.Condition
|
||||
for _, condition := range conditions {
|
||||
if slices.Contains(usedConditions, inferencev1alpha2.InferencePoolConditionType(condition.Type)) {
|
||||
if slices.Contains(usedConditions, inferencev1.InferencePoolConditionType(condition.Type)) {
|
||||
result = append(result, condition)
|
||||
}
|
||||
}
|
||||
@@ -484,6 +504,18 @@ func InferencePoolServiceName(poolName string) (string, error) {
|
||||
}
|
||||
|
||||
func translateShadowServiceToService(existingLabels map[string]string, shadow shadowServiceInfo, extRef extRefInfo) *corev1.Service {
|
||||
// Create the ports used by the shadow service
|
||||
ports := make([]corev1.ServicePort, 0, len(shadow.targetPorts))
|
||||
dummyPort := int32(54321) // Dummy port, not used for anything
|
||||
for i, port := range shadow.targetPorts {
|
||||
ports = append(ports, corev1.ServicePort{
|
||||
Name: "port" + strconv.Itoa(i),
|
||||
Protocol: corev1.ProtocolTCP,
|
||||
Port: dummyPort + int32(i),
|
||||
TargetPort: intstr.FromInt(int(port.port)),
|
||||
})
|
||||
}
|
||||
|
||||
// Create a new service object based on the shadow service info
|
||||
svc := &corev1.Service{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
@@ -501,13 +533,7 @@ func translateShadowServiceToService(existingLabels map[string]string, shadow sh
|
||||
Selector: shadow.selector,
|
||||
Type: corev1.ServiceTypeClusterIP,
|
||||
ClusterIP: corev1.ClusterIPNone, // Headless service
|
||||
Ports: []corev1.ServicePort{ // adding dummy port, not used for anything
|
||||
{
|
||||
Protocol: "TCP",
|
||||
Port: int32(54321),
|
||||
TargetPort: intstr.FromInt(int(shadow.targetPort)),
|
||||
},
|
||||
},
|
||||
Ports: ports,
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user