// Copyright Istio Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package istio import ( "crypto/sha256" "fmt" "strconv" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" inferencev1 "sigs.k8s.io/gateway-api-inference-extension/api/v1" gatewayv1 "sigs.k8s.io/gateway-api/apis/v1" gateway "sigs.k8s.io/gateway-api/apis/v1beta1" "istio.io/istio/pkg/config/constants" "istio.io/istio/pkg/config/schema/gvk" "istio.io/istio/pkg/kube/kclient" "istio.io/istio/pkg/kube/krt" "istio.io/istio/pkg/maps" "istio.io/istio/pkg/ptr" "istio.io/istio/pkg/slices" "istio.io/istio/pkg/util/sets" ) const ( maxServiceNameLength = 63 hashSize = 8 InferencePoolRefLabel = "higress.io/inferencepool-name" InferencePoolExtensionRefSvc = "higress.io/inferencepool-extension-service" InferencePoolExtensionRefPort = "higress.io/inferencepool-extension-port" InferencePoolExtensionRefFailureMode = "higress.io/inferencepool-extension-failure-mode" ) // // ManagedLabel is the label used to identify resources managed by this controller // const ManagedLabel = "inference.x-k8s.io/managed-by" // ControllerName is the name of this controller for labeling resources it manages const ControllerName = "inference-controller" var supportedControllers = getSupportedControllers() func getSupportedControllers() sets.Set[gatewayv1.GatewayController] { ret := sets.New[gatewayv1.GatewayController]() for _, controller := range builtinClasses { ret.Insert(controller) } return ret } type shadowServiceInfo struct { key types.NamespacedName selector map[string]string poolName string poolUID types.UID // targetPorts is the port number on the pods selected by the selector. // Currently, inference extension only supports a single target port. targetPorts []targetPort } type targetPort struct { port int32 } type extRefInfo struct { name string port int32 failureMode string } type InferencePool struct { shadowService shadowServiceInfo extRef extRefInfo gatewayParents sets.Set[types.NamespacedName] // Gateways that reference this InferencePool } func (i InferencePool) ResourceName() string { return i.shadowService.key.Namespace + "/" + i.shadowService.poolName } func InferencePoolCollection( pools krt.Collection[*inferencev1.InferencePool], services krt.Collection[*corev1.Service], httpRoutes krt.Collection[*gateway.HTTPRoute], gateways krt.Collection[*gateway.Gateway], routesByInferencePool krt.Index[string, *gateway.HTTPRoute], c *Controller, opts krt.OptionsBuilder, ) (krt.StatusCollection[*inferencev1.InferencePool, inferencev1.InferencePoolStatus], krt.Collection[InferencePool]) { return krt.NewStatusCollection(pools, func( ctx krt.HandlerContext, pool *inferencev1.InferencePool, ) (*inferencev1.InferencePoolStatus, *InferencePool) { // Fetch HTTPRoutes that reference this InferencePool once and reuse routeList := krt.Fetch(ctx, httpRoutes, krt.FilterIndex(routesByInferencePool, pool.Namespace+"/"+pool.Name)) // Find gateway parents that reference this InferencePool through HTTPRoutes gatewayParents := findGatewayParents(pool, routeList) // TODO: If no gateway parents, we should not do anything // note: we still need to filter out our Status to clean up previous reconciliations // Create the InferencePool only if there are Gateways connected var inferencePool *InferencePool if len(gatewayParents) > 0 { // Create the InferencePool object inferencePool = createInferencePoolObject(pool, gatewayParents) } // Calculate status status := calculateInferencePoolStatus(pool, gatewayParents, services, gateways, routeList) return status, inferencePool }, opts.WithName("InferenceExtension")...) } // createInferencePoolObject creates the InferencePool object with shadow service and extension ref info func createInferencePoolObject(pool *inferencev1.InferencePool, gatewayParents sets.Set[types.NamespacedName]) *InferencePool { // Build extension reference info extRef := extRefInfo{ name: string(pool.Spec.EndpointPickerRef.Name), } if pool.Spec.EndpointPickerRef.Port == nil { log.Errorf("invalid InferencePool %s/%s; endpointPickerRef port is required", pool.Namespace, pool.Name) return nil } extRef.port = int32(pool.Spec.EndpointPickerRef.Port.Number) extRef.failureMode = string(inferencev1.EndpointPickerFailClose) // Default failure mode if pool.Spec.EndpointPickerRef.FailureMode != inferencev1.EndpointPickerFailClose { extRef.failureMode = string(pool.Spec.EndpointPickerRef.FailureMode) } svcName, err := InferencePoolServiceName(pool.Name) if err != nil { log.Errorf("failed to generate service name for InferencePool %s: %v", pool.Name, err) return nil } shadowSvcInfo := shadowServiceInfo{ key: types.NamespacedName{ Name: svcName, Namespace: pool.GetNamespace(), }, selector: make(map[string]string, len(pool.Spec.Selector.MatchLabels)), poolName: pool.GetName(), targetPorts: make([]targetPort, 0, len(pool.Spec.TargetPorts)), poolUID: pool.GetUID(), } for k, v := range pool.Spec.Selector.MatchLabels { shadowSvcInfo.selector[string(k)] = string(v) } for _, port := range pool.Spec.TargetPorts { shadowSvcInfo.targetPorts = append(shadowSvcInfo.targetPorts, targetPort{port: int32(port.Number)}) } return &InferencePool{ shadowService: shadowSvcInfo, extRef: extRef, gatewayParents: gatewayParents, } } // calculateInferencePoolStatus calculates the complete status for an InferencePool func calculateInferencePoolStatus( pool *inferencev1.InferencePool, gatewayParents sets.Set[types.NamespacedName], services krt.Collection[*corev1.Service], gateways krt.Collection[*gateway.Gateway], routeList []*gateway.HTTPRoute, ) *inferencev1.InferencePoolStatus { // Calculate status for each gateway parent existingParents := pool.Status.DeepCopy().Parents finalParents := []inferencev1.ParentStatus{} // Add existing parents from other controllers (not managed by us) for _, existingParent := range existingParents { gtwName := string(existingParent.ParentRef.Name) gtwNamespace := pool.Namespace if existingParent.ParentRef.Namespace != "" { gtwNamespace = string(existingParent.ParentRef.Namespace) } parentKey := types.NamespacedName{ Name: gtwName, Namespace: gtwNamespace, } isCurrentlyOurs := gatewayParents.Contains(parentKey) // Keep parents that are not ours and not default status parents if !isCurrentlyOurs && !isOurManagedGateway(gateways, gtwNamespace, gtwName) && !isDefaultStatusParent(existingParent) { finalParents = append(finalParents, existingParent) } } // Calculate status for each of our gateway parents for gatewayParent := range gatewayParents { parentStatus := calculateSingleParentStatus(pool, gatewayParent, services, existingParents, routeList) finalParents = append(finalParents, parentStatus) } return &inferencev1.InferencePoolStatus{ Parents: finalParents, } } // findGatewayParents finds all Gateway parents that reference this InferencePool through HTTPRoutes func findGatewayParents( pool *inferencev1.InferencePool, routeList []*gateway.HTTPRoute, ) sets.Set[types.NamespacedName] { gatewayParents := sets.New[types.NamespacedName]() for _, route := range routeList { // Only process routes that reference our InferencePool if !routeReferencesInferencePool(route, pool) { continue } // Check the route's parent status to find accepted gateways for _, parentStatus := range route.Status.Parents { // Only consider parents managed by our supported controllers (from supportedControllers variable) // This filters out parents from other controllers we don't manage if !supportedControllers.Contains(parentStatus.ControllerName) { continue } // Get the gateway namespace (default to route namespace if not specified) gatewayNamespace := route.Namespace if ptr.OrEmpty(parentStatus.ParentRef.Namespace) != "" { gatewayNamespace = string(*parentStatus.ParentRef.Namespace) } gatewayParents.Insert(types.NamespacedName{ Name: string(parentStatus.ParentRef.Name), Namespace: gatewayNamespace, }) } } return gatewayParents } // routeReferencesInferencePool checks if an HTTPRoute references the given InferencePool func routeReferencesInferencePool(route *gateway.HTTPRoute, pool *inferencev1.InferencePool) bool { for _, rule := range route.Spec.Rules { for _, backendRef := range rule.BackendRefs { if !isInferencePoolBackendRef(backendRef.BackendRef) { continue } // Check if this backend ref points to our InferencePool if string(backendRef.BackendRef.Name) != pool.ObjectMeta.Name { continue } // Check namespace match backendRefNamespace := route.Namespace if ptr.OrEmpty(backendRef.BackendRef.Namespace) != "" { backendRefNamespace = string(*backendRef.BackendRef.Namespace) } if backendRefNamespace == pool.Namespace { return true } } } return false } // isInferencePoolBackendRef checks if a BackendRef is pointing to an InferencePool func isInferencePoolBackendRef(backendRef gatewayv1.BackendRef) bool { return ptr.OrEmpty(backendRef.Group) == gatewayv1.Group(gvk.InferencePool.Group) && ptr.OrEmpty(backendRef.Kind) == gatewayv1.Kind(gvk.InferencePool.Kind) } // calculateSingleParentStatus calculates the status for a single gateway parent func calculateSingleParentStatus( pool *inferencev1.InferencePool, gatewayParent types.NamespacedName, services krt.Collection[*corev1.Service], existingParents []inferencev1.ParentStatus, routeList []*gateway.HTTPRoute, ) inferencev1.ParentStatus { // Find existing status for this parent to preserve some conditions var existingConditions []metav1.Condition for _, existingParent := range existingParents { if string(existingParent.ParentRef.Name) == gatewayParent.Name && string(existingParent.ParentRef.Namespace) == gatewayParent.Namespace { existingConditions = existingParent.Conditions break } } // Filter to only keep conditions we manage filteredConditions := filterUsedConditions(existingConditions, inferencev1.InferencePoolConditionAccepted, inferencev1.InferencePoolConditionResolvedRefs) // Calculate Accepted status by checking HTTPRoute parent status acceptedStatus := calculateAcceptedStatus(pool, gatewayParent, routeList) // Calculate ResolvedRefs status resolvedRefsStatus := calculateResolvedRefsStatus(pool, services) // Build the final status return inferencev1.ParentStatus{ ParentRef: inferencev1.ParentReference{ Group: (*inferencev1.Group)(&gvk.Gateway.Group), Kind: inferencev1.Kind(gvk.Gateway.Kind), Namespace: inferencev1.Namespace(gatewayParent.Namespace), Name: inferencev1.ObjectName(gatewayParent.Name), }, Conditions: setConditions(pool.Generation, filteredConditions, map[string]*condition{ string(inferencev1.InferencePoolConditionAccepted): acceptedStatus, string(inferencev1.InferencePoolConditionResolvedRefs): resolvedRefsStatus, }), } } // calculateAcceptedStatus determines if the InferencePool is accepted by checking HTTPRoute parent status func calculateAcceptedStatus( pool *inferencev1.InferencePool, gatewayParent types.NamespacedName, routeList []*gateway.HTTPRoute, ) *condition { // Check if any HTTPRoute references this InferencePool and has this gateway as an accepted parent for _, route := range routeList { // Only process routes that reference our InferencePool if !routeReferencesInferencePool(route, pool) { continue } // Check if this route has our gateway as a parent and if it's accepted for _, parentStatus := range route.Status.Parents { // Only consider parents managed by supported controllers if !supportedControllers.Contains(parentStatus.ControllerName) { continue } // Check if this parent refers to our gateway gatewayNamespace := route.Namespace if ptr.OrEmpty(parentStatus.ParentRef.Namespace) != "" { gatewayNamespace = string(*parentStatus.ParentRef.Namespace) } if string(parentStatus.ParentRef.Name) == gatewayParent.Name && gatewayNamespace == gatewayParent.Namespace { // Check if this parent is accepted for _, parentCondition := range parentStatus.Conditions { if parentCondition.Type == string(gatewayv1.RouteConditionAccepted) { if parentCondition.Status == metav1.ConditionTrue { return &condition{ reason: string(inferencev1.InferencePoolReasonAccepted), status: metav1.ConditionTrue, message: "Referenced by an HTTPRoute accepted by the parentRef Gateway", } } return &condition{ reason: string(inferencev1.InferencePoolReasonHTTPRouteNotAccepted), status: metav1.ConditionFalse, message: fmt.Sprintf("Referenced HTTPRoute %s/%s not accepted by Gateway %s/%s: %s", route.Namespace, route.Name, gatewayParent.Namespace, gatewayParent.Name, parentCondition.Message), } } } // If no Accepted condition found, treat as unknown (parent is listed in status) return &condition{ reason: string(inferencev1.InferencePoolReasonAccepted), status: metav1.ConditionUnknown, message: "Referenced by an HTTPRoute unknown parentRef Gateway status", } } } } // If we get here, no HTTPRoute was found that references this InferencePool with this gateway as parent // This shouldn't happen in normal operation since we only call this for known gateway parents return &condition{ reason: string(inferencev1.InferencePoolReasonHTTPRouteNotAccepted), status: metav1.ConditionFalse, message: fmt.Sprintf("No HTTPRoute found referencing this InferencePool with Gateway %s/%s as parent", gatewayParent.Namespace, gatewayParent.Name), } } // calculateResolvedRefsStatus determines the states of the ExtensionRef // * if the kind is supported // * if the extensionRef is defined // * if the service exists in the same namespace as the InferencePool func calculateResolvedRefsStatus( pool *inferencev1.InferencePool, services krt.Collection[*corev1.Service], ) *condition { // Default Kind to Service if unset kind := string(pool.Spec.EndpointPickerRef.Kind) if kind == "" { kind = gvk.Service.Kind } if kind != gvk.Service.Kind { return &condition{ reason: string(inferencev1.InferencePoolReasonInvalidExtensionRef), status: metav1.ConditionFalse, message: "Unsupported ExtensionRef kind " + kind, } } name := string(pool.Spec.EndpointPickerRef.Name) if name == "" { return &condition{ reason: string(inferencev1.InferencePoolReasonInvalidExtensionRef), status: metav1.ConditionFalse, message: "ExtensionRef not defined", } } svc := ptr.Flatten(services.GetKey(fmt.Sprintf("%s/%s", pool.Namespace, name))) if svc == nil { return &condition{ reason: string(inferencev1.InferencePoolReasonInvalidExtensionRef), status: metav1.ConditionFalse, message: "Referenced ExtensionRef not found " + name, } } return &condition{ reason: string(inferencev1.InferencePoolReasonResolvedRefs), status: metav1.ConditionTrue, message: "Referenced ExtensionRef resolved successfully", } } // isDefaultStatusParent checks if this is a default status parent entry func isDefaultStatusParent(parent inferencev1.ParentStatus) bool { return string(parent.ParentRef.Kind) == "Status" && parent.ParentRef.Name == "default" } // isOurManagedGateway checks if a Gateway is managed by one of our supported controllers // This is used to identify stale parent entries that we previously added but are no longer referenced by HTTPRoutes func isOurManagedGateway(gateways krt.Collection[*gateway.Gateway], namespace, name string) bool { gtw := ptr.Flatten(gateways.GetKey(fmt.Sprintf("%s/%s", namespace, name))) if gtw == nil { return false } _, ok := builtinClasses[gtw.Spec.GatewayClassName] return ok } func filterUsedConditions(conditions []metav1.Condition, usedConditions ...inferencev1.InferencePoolConditionType) []metav1.Condition { var result []metav1.Condition for _, condition := range conditions { if slices.Contains(usedConditions, inferencev1.InferencePoolConditionType(condition.Type)) { result = append(result, condition) } } return result } // generateHash generates an 8-character SHA256 hash of the input string. func generateHash(input string, length int) string { hashBytes := sha256.Sum256([]byte(input)) hashString := fmt.Sprintf("%x", hashBytes) // Convert to hexadecimal string return hashString[:length] // Truncate to desired length } func InferencePoolServiceName(poolName string) (string, error) { ipSeparator := "-ip-" hash := generateHash(poolName, hashSize) svcName := poolName + ipSeparator + hash // Truncate if necessary to meet the Kubernetes naming constraints if len(svcName) > maxServiceNameLength { // Calculate the maximum allowed base name length maxBaseLength := maxServiceNameLength - len(ipSeparator) - hashSize if maxBaseLength < 0 { return "", fmt.Errorf("inference pool name: %s is too long", poolName) } // Truncate the base name and reconstruct the service name truncatedBase := poolName[:maxBaseLength] svcName = truncatedBase + ipSeparator + hash } return svcName, nil } func translateShadowServiceToService(existingLabels map[string]string, shadow shadowServiceInfo, extRef extRefInfo) *corev1.Service { // Create the ports used by the shadow service ports := make([]corev1.ServicePort, 0, len(shadow.targetPorts)) dummyPort := int32(54321) // Dummy port, not used for anything for i, port := range shadow.targetPorts { ports = append(ports, corev1.ServicePort{ Name: "port" + strconv.Itoa(i), Protocol: corev1.ProtocolTCP, Port: dummyPort + int32(i), TargetPort: intstr.FromInt(int(port.port)), }) } // Create a new service object based on the shadow service info svc := &corev1.Service{ ObjectMeta: metav1.ObjectMeta{ Name: shadow.key.Name, Namespace: shadow.key.Namespace, Labels: maps.MergeCopy(map[string]string{ InferencePoolRefLabel: shadow.poolName, InferencePoolExtensionRefSvc: extRef.name, InferencePoolExtensionRefPort: strconv.Itoa(int(extRef.port)), InferencePoolExtensionRefFailureMode: extRef.failureMode, constants.InternalServiceSemantics: constants.ServiceSemanticsInferencePool, }, existingLabels), }, Spec: corev1.ServiceSpec{ Selector: shadow.selector, Type: corev1.ServiceTypeClusterIP, ClusterIP: corev1.ClusterIPNone, // Headless service Ports: ports, }, } svc.SetOwnerReferences([]metav1.OwnerReference{ { APIVersion: gvk.InferencePool.GroupVersion(), Kind: gvk.InferencePool.Kind, Name: shadow.poolName, UID: shadow.poolUID, }, }) return svc } func (c *Controller) reconcileShadowService( svcClient kclient.Client[*corev1.Service], inferencePools krt.Collection[InferencePool], servicesCollection krt.Collection[*corev1.Service], ) func(key types.NamespacedName) error { return func(key types.NamespacedName) error { // Find the InferencePool that matches the key pool := inferencePools.GetKey(key.String()) if pool == nil { // we'll generally ignore these scenarios, since the InferencePool may have been deleted log.Debugf("inferencepool no longer exists", key.String()) return nil } // We found the InferencePool, now we need to translate it to a shadow Service // and check if it exists already existingService := ptr.Flatten(servicesCollection.GetKey(pool.shadowService.key.String())) // Check if we can manage this service var existingLabels map[string]string if existingService != nil { existingLabels = existingService.GetLabels() canManage, _ := c.canManageShadowServiceForInference(existingService) if !canManage { log.Debugf("skipping service %s/%s, already managed by another controller", key.Namespace, key.Name) return nil } } service := translateShadowServiceToService(existingLabels, pool.shadowService, pool.extRef) var err error if existingService == nil { // Create the service if it doesn't exist _, err = svcClient.Create(service) } else { // TODO: Don't overwrite resources: https://github.com/istio/istio/issues/56667 service.ResourceVersion = existingService.ResourceVersion _, err = svcClient.Update(service) } return err } } // canManage checks if a service should be managed by this controller func (c *Controller) canManageShadowServiceForInference(obj *corev1.Service) (bool, string) { if obj == nil { // No object exists, we can manage it return true, "" } _, inferencePoolManaged := obj.GetLabels()[InferencePoolRefLabel] // We can manage if it has no manager or if we are the manager return inferencePoolManaged, obj.GetResourceVersion() } func indexHTTPRouteByInferencePool(o *gateway.HTTPRoute) []string { var keys []string for _, rule := range o.Spec.Rules { for _, backendRef := range rule.BackendRefs { if isInferencePoolBackendRef(backendRef.BackendRef) { // If BackendRef.Namespace is not specified, the backend is in the same namespace as the HTTPRoute's backendRefNamespace := o.Namespace if ptr.OrEmpty(backendRef.BackendRef.Namespace) != "" { backendRefNamespace = string(*backendRef.BackendRef.Namespace) } key := backendRefNamespace + "/" + string(backendRef.Name) keys = append(keys, key) } } } return keys }