Files
higress/pkg/ingress/kube/gateway/istio/inferencepool_collection.go
2025-11-26 10:15:00 +08:00

626 lines
22 KiB
Go

// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package istio
import (
"crypto/sha256"
"fmt"
"strconv"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
inferencev1 "sigs.k8s.io/gateway-api-inference-extension/api/v1"
gatewayv1 "sigs.k8s.io/gateway-api/apis/v1"
gateway "sigs.k8s.io/gateway-api/apis/v1beta1"
"istio.io/istio/pkg/config/constants"
"istio.io/istio/pkg/config/schema/gvk"
"istio.io/istio/pkg/kube/kclient"
"istio.io/istio/pkg/kube/krt"
"istio.io/istio/pkg/maps"
"istio.io/istio/pkg/ptr"
"istio.io/istio/pkg/slices"
"istio.io/istio/pkg/util/sets"
)
// Naming limits and label keys used when translating an InferencePool into its shadow Service.
const (
// maxServiceNameLength is the maximum length allowed for a generated shadow Service name
// (enforced by InferencePoolServiceName).
maxServiceNameLength = 63
// hashSize is the number of hex characters of the pool-name hash appended to the Service name.
hashSize = 8
// InferencePoolRefLabel is the label key whose value is the name of the owning InferencePool.
// Its presence on a Service is also what marks the Service as manageable by this controller.
InferencePoolRefLabel = "higress.io/inferencepool-name"
// InferencePoolExtensionRefSvc is the label key whose value is the endpoint picker reference name.
InferencePoolExtensionRefSvc = "higress.io/inferencepool-extension-service"
// InferencePoolExtensionRefPort is the label key whose value is the endpoint picker port number.
InferencePoolExtensionRefPort = "higress.io/inferencepool-extension-port"
// InferencePoolExtensionRefFailureMode is the label key whose value is the endpoint picker failure mode.
InferencePoolExtensionRefFailureMode = "higress.io/inferencepool-extension-failure-mode"
)
// // ManagedLabel is the label used to identify resources managed by this controller
// const ManagedLabel = "inference.x-k8s.io/managed-by"
// ControllerName is the name of this controller for labeling resources it manages.
// NOTE(review): no reference to this constant is visible in this file; confirm it is used elsewhere.
const ControllerName = "inference-controller"
// supportedControllers is the set of controller names found in builtinClasses.
// It is computed once, when the package is initialized.
var supportedControllers = getSupportedControllers()

// getSupportedControllers gathers every controller name stored as a value in
// builtinClasses into a set.
func getSupportedControllers() sets.Set[gatewayv1.GatewayController] {
	controllers := sets.New[gatewayv1.GatewayController]()
	for className := range builtinClasses {
		controllers.Insert(builtinClasses[className])
	}
	return controllers
}
// shadowServiceInfo carries the data needed to build the shadow Service that
// represents an InferencePool.
type shadowServiceInfo struct {
// key is the namespace/name of the shadow Service.
key types.NamespacedName
// selector is the pod selector copied from the pool's Spec.Selector.MatchLabels.
selector map[string]string
// poolName is the name of the InferencePool this Service is derived from.
poolName string
// poolUID is the UID of the InferencePool; it is used in the Service's owner reference.
poolUID types.UID
// targetPorts is the port number on the pods selected by the selector.
// Currently, inference extension only supports a single target port.
targetPorts []targetPort
}
// targetPort is a single pod port taken from the pool's Spec.TargetPorts.
type targetPort struct {
// port is the port number on the selected pods.
port int32
}
// extRefInfo describes the pool's endpoint picker reference (Spec.EndpointPickerRef).
type extRefInfo struct {
// name is the name of the referenced endpoint picker.
name string
// port is the port number of the endpoint picker.
port int32
// failureMode is the endpoint picker failure mode, stored in its string form.
failureMode string
}
// InferencePool is the translated, internal view of an InferencePool resource
// that is emitted by InferencePoolCollection.
type InferencePool struct {
// shadowService describes the shadow Service to create for the pool.
shadowService shadowServiceInfo
// extRef describes the pool's endpoint picker reference.
extRef extRefInfo
gatewayParents sets.Set[types.NamespacedName] // Gateways that reference this InferencePool
}
// ResourceName returns the identifier of this InferencePool in
// "<namespace>/<pool name>" form.
func (i InferencePool) ResourceName() string {
	svc := i.shadowService
	return svc.key.Namespace + "/" + svc.poolName
}
// InferencePoolCollection builds, for every InferencePool resource, both its
// status and (when at least one Gateway references it through an HTTPRoute)
// the translated InferencePool object.
//
// The returned status collection holds the computed InferencePoolStatus per
// pool; the second collection holds the translated InferencePool values.
func InferencePoolCollection(
	pools krt.Collection[*inferencev1.InferencePool],
	services krt.Collection[*corev1.Service],
	httpRoutes krt.Collection[*gateway.HTTPRoute],
	gateways krt.Collection[*gateway.Gateway],
	routesByInferencePool krt.Index[string, *gateway.HTTPRoute],
	c *Controller,
	opts krt.OptionsBuilder,
) (krt.StatusCollection[*inferencev1.InferencePool, inferencev1.InferencePoolStatus], krt.Collection[InferencePool]) {
	translate := func(
		ctx krt.HandlerContext,
		pool *inferencev1.InferencePool,
	) (*inferencev1.InferencePoolStatus, *InferencePool) {
		// Look up the HTTPRoutes indexed under this pool a single time; the
		// result is shared by parent discovery and status calculation.
		indexKey := pool.Namespace + "/" + pool.Name
		referencingRoutes := krt.Fetch(ctx, httpRoutes, krt.FilterIndex(routesByInferencePool, indexKey))

		// Gateways reaching this pool through those routes.
		parents := findGatewayParents(pool, referencingRoutes)

		// TODO: If no gateway parents, we should not do anything
		// note: the status is still computed below so that entries written by
		// earlier reconciliations get cleaned up.
		var translated *InferencePool
		if len(parents) != 0 {
			translated = createInferencePoolObject(pool, parents)
		}

		return calculateInferencePoolStatus(pool, parents, services, gateways, referencingRoutes), translated
	}
	return krt.NewStatusCollection(pools, translate, opts.WithName("InferenceExtension")...)
}
// createInferencePoolObject translates an InferencePool resource into the
// internal InferencePool value, holding the shadow Service description, the
// endpoint picker reference and the Gateways that reference the pool.
//
// It returns nil (after logging an error) when the endpoint picker port is
// missing or when no shadow Service name can be generated for the pool.
func createInferencePoolObject(pool *inferencev1.InferencePool, gatewayParents sets.Set[types.NamespacedName]) *InferencePool {
	// Build extension reference info
	extRef := extRefInfo{
		name: string(pool.Spec.EndpointPickerRef.Name),
	}
	if pool.Spec.EndpointPickerRef.Port == nil {
		log.Errorf("invalid InferencePool %s/%s; endpointPickerRef port is required", pool.Namespace, pool.Name)
		return nil
	}
	extRef.port = int32(pool.Spec.EndpointPickerRef.Port.Number)

	// Fall back to FailClose only when the pool does not specify a failure
	// mode. Comparing against FailClose (as done previously) overwrote the
	// default with an empty string whenever the field was unset.
	extRef.failureMode = string(inferencev1.EndpointPickerFailClose)
	if pool.Spec.EndpointPickerRef.FailureMode != "" {
		extRef.failureMode = string(pool.Spec.EndpointPickerRef.FailureMode)
	}

	svcName, err := InferencePoolServiceName(pool.Name)
	if err != nil {
		log.Errorf("failed to generate service name for InferencePool %s: %v", pool.Name, err)
		return nil
	}

	shadowSvcInfo := shadowServiceInfo{
		key: types.NamespacedName{
			Name:      svcName,
			Namespace: pool.GetNamespace(),
		},
		selector:    make(map[string]string, len(pool.Spec.Selector.MatchLabels)),
		poolName:    pool.GetName(),
		targetPorts: make([]targetPort, 0, len(pool.Spec.TargetPorts)),
		poolUID:     pool.GetUID(),
	}

	// Copy the selector and target ports into plain Go types.
	for k, v := range pool.Spec.Selector.MatchLabels {
		shadowSvcInfo.selector[string(k)] = string(v)
	}
	for _, port := range pool.Spec.TargetPorts {
		shadowSvcInfo.targetPorts = append(shadowSvcInfo.targetPorts, targetPort{port: int32(port.Number)})
	}

	return &InferencePool{
		shadowService:  shadowSvcInfo,
		extRef:         extRef,
		gatewayParents: gatewayParents,
	}
}
// calculateInferencePoolStatus assembles the full status of an InferencePool.
//
// Parent entries already present in the pool's status are carried over only
// when they are not one of the current gatewayParents, do not point at a
// Gateway recognised by isOurManagedGateway, and are not the default
// placeholder entry. A freshly computed entry is then appended for every
// Gateway in gatewayParents.
func calculateInferencePoolStatus(
	pool *inferencev1.InferencePool,
	gatewayParents sets.Set[types.NamespacedName],
	services krt.Collection[*corev1.Service],
	gateways krt.Collection[*gateway.Gateway],
	routeList []*gateway.HTTPRoute,
) *inferencev1.InferencePoolStatus {
	previous := pool.Status.DeepCopy().Parents
	merged := []inferencev1.ParentStatus{}

	// Carry over entries owned by other controllers.
	for _, prev := range previous {
		ref := prev.ParentRef
		// An empty parent namespace means "same namespace as the pool".
		ns := pool.Namespace
		if ref.Namespace != "" {
			ns = string(ref.Namespace)
		}
		name := string(ref.Name)

		if gatewayParents.Contains(types.NamespacedName{Namespace: ns, Name: name}) {
			// Recomputed below.
			continue
		}
		if isOurManagedGateway(gateways, ns, name) {
			// Stale entry for one of our Gateways that no longer references the pool.
			continue
		}
		if isDefaultStatusParent(prev) {
			continue
		}
		merged = append(merged, prev)
	}

	// Append a recomputed entry for each Gateway that currently references the pool.
	// NOTE(review): this ranges over a set, so the order of these entries may
	// differ between runs — confirm that consumers do not depend on ordering.
	for gw := range gatewayParents {
		merged = append(merged, calculateSingleParentStatus(pool, gw, services, previous, routeList))
	}

	return &inferencev1.InferencePoolStatus{Parents: merged}
}
// findGatewayParents collects the Gateways that reach the given InferencePool
// through the supplied HTTPRoutes. A Gateway is included when it appears in the
// parent status of a route that references the pool and that status entry was
// written by one of the supportedControllers.
func findGatewayParents(
	pool *inferencev1.InferencePool,
	routeList []*gateway.HTTPRoute,
) sets.Set[types.NamespacedName] {
	found := sets.New[types.NamespacedName]()
	for _, route := range routeList {
		// Skip routes that do not actually point at this pool.
		if !routeReferencesInferencePool(route, pool) {
			continue
		}
		for _, ps := range route.Status.Parents {
			// Ignore status entries written by controllers we do not manage.
			if !supportedControllers.Contains(ps.ControllerName) {
				continue
			}
			// The parent namespace defaults to the route's namespace.
			gw := types.NamespacedName{
				Namespace: route.Namespace,
				Name:      string(ps.ParentRef.Name),
			}
			if ns := ptr.OrEmpty(ps.ParentRef.Namespace); ns != "" {
				gw.Namespace = string(ns)
			}
			found.Insert(gw)
		}
	}
	return found
}
// routeReferencesInferencePool reports whether any backendRef of the HTTPRoute
// is an InferencePool reference whose name and (defaulted) namespace match the
// given pool.
func routeReferencesInferencePool(route *gateway.HTTPRoute, pool *inferencev1.InferencePool) bool {
	for _, rule := range route.Spec.Rules {
		for _, ref := range rule.BackendRefs {
			target := ref.BackendRef
			// Must be an InferencePool reference carrying the pool's name.
			if !isInferencePoolBackendRef(target) || string(target.Name) != pool.ObjectMeta.Name {
				continue
			}
			// A missing backend namespace means the route's own namespace.
			ns := route.Namespace
			if explicit := ptr.OrEmpty(target.Namespace); explicit != "" {
				ns = string(explicit)
			}
			if ns == pool.Namespace {
				return true
			}
		}
	}
	return false
}
// isInferencePoolBackendRef reports whether the BackendRef's group and kind
// are those of an InferencePool. Unset group or kind are treated as empty.
func isInferencePoolBackendRef(backendRef gatewayv1.BackendRef) bool {
	if ptr.OrEmpty(backendRef.Group) != gatewayv1.Group(gvk.InferencePool.Group) {
		return false
	}
	return ptr.OrEmpty(backendRef.Kind) == gatewayv1.Kind(gvk.InferencePool.Kind)
}
// calculateSingleParentStatus builds the status entry for one Gateway parent
// of the InferencePool.
//
// Conditions already recorded for that Gateway in existingParents are used as
// the starting point (restricted to the Accepted and ResolvedRefs types), and
// both conditions are then set from freshly computed values.
func calculateSingleParentStatus(
	pool *inferencev1.InferencePool,
	gatewayParent types.NamespacedName,
	services krt.Collection[*corev1.Service],
	existingParents []inferencev1.ParentStatus,
	routeList []*gateway.HTTPRoute,
) inferencev1.ParentStatus {
	// Find existing status for this parent to preserve some conditions
	var existingConditions []metav1.Condition
	for _, existingParent := range existingParents {
		// An empty parent namespace refers to the pool's namespace, matching
		// how calculateInferencePoolStatus resolves existing parents. Without
		// this default such an entry could never match gatewayParent.
		existingNamespace := pool.Namespace
		if existingParent.ParentRef.Namespace != "" {
			existingNamespace = string(existingParent.ParentRef.Namespace)
		}
		if string(existingParent.ParentRef.Name) == gatewayParent.Name &&
			existingNamespace == gatewayParent.Namespace {
			existingConditions = existingParent.Conditions
			break
		}
	}

	// Filter to only keep conditions we manage
	filteredConditions := filterUsedConditions(existingConditions,
		inferencev1.InferencePoolConditionAccepted,
		inferencev1.InferencePoolConditionResolvedRefs)

	// Calculate Accepted status by checking HTTPRoute parent status
	acceptedStatus := calculateAcceptedStatus(pool, gatewayParent, routeList)

	// Calculate ResolvedRefs status
	resolvedRefsStatus := calculateResolvedRefsStatus(pool, services)

	// Build the final status
	return inferencev1.ParentStatus{
		ParentRef: inferencev1.ParentReference{
			Group:     (*inferencev1.Group)(&gvk.Gateway.Group),
			Kind:      inferencev1.Kind(gvk.Gateway.Kind),
			Namespace: inferencev1.Namespace(gatewayParent.Namespace),
			Name:      inferencev1.ObjectName(gatewayParent.Name),
		},
		Conditions: setConditions(pool.Generation, filteredConditions, map[string]*condition{
			string(inferencev1.InferencePoolConditionAccepted):     acceptedStatus,
			string(inferencev1.InferencePoolConditionResolvedRefs): resolvedRefsStatus,
		}),
	}
}
// calculateAcceptedStatus derives the Accepted condition of the InferencePool
// for one Gateway parent from the parent statuses of the HTTPRoutes that
// reference the pool.
//
// The pool is reported as accepted as soon as ANY referencing route is accepted
// by the Gateway. Previously the function returned on the first route/parent
// entry naming the Gateway, so a rejected route could mask an accepted one and
// the outcome depended on the order of routeList. When no route is accepted,
// the first non-accepted (False or Unknown) result encountered is returned;
// when no route names the Gateway at all, a False condition is returned.
func calculateAcceptedStatus(
	pool *inferencev1.InferencePool,
	gatewayParent types.NamespacedName,
	routeList []*gateway.HTTPRoute,
) *condition {
	var firstNonAccepted *condition
	for _, route := range routeList {
		// Only process routes that reference our InferencePool
		if !routeReferencesInferencePool(route, pool) {
			continue
		}
		for _, parentStatus := range route.Status.Parents {
			// Only consider parents managed by supported controllers
			if !supportedControllers.Contains(parentStatus.ControllerName) {
				continue
			}
			// The parent namespace defaults to the route's namespace.
			gatewayNamespace := route.Namespace
			if ns := ptr.OrEmpty(parentStatus.ParentRef.Namespace); ns != "" {
				gatewayNamespace = string(ns)
			}
			if string(parentStatus.ParentRef.Name) != gatewayParent.Name || gatewayNamespace != gatewayParent.Namespace {
				continue
			}

			cond := inferencePoolAcceptedFromRouteParent(route, gatewayParent, parentStatus.Conditions)
			if cond.status == metav1.ConditionTrue {
				return cond
			}
			if firstNonAccepted == nil {
				firstNonAccepted = cond
			}
		}
	}
	if firstNonAccepted != nil {
		return firstNonAccepted
	}

	// If we get here, no HTTPRoute was found that references this InferencePool with this gateway as parent
	// This shouldn't happen in normal operation since we only call this for known gateway parents
	return &condition{
		reason: string(inferencev1.InferencePoolReasonHTTPRouteNotAccepted),
		status: metav1.ConditionFalse,
		message: fmt.Sprintf("No HTTPRoute found referencing this InferencePool with Gateway %s/%s as parent",
			gatewayParent.Namespace, gatewayParent.Name),
	}
}

// inferencePoolAcceptedFromRouteParent maps the conditions of a single route
// parent status entry (already known to name gatewayParent) to the pool's
// Accepted condition: True/False according to the route's Accepted condition,
// or Unknown when the entry carries no Accepted condition.
func inferencePoolAcceptedFromRouteParent(
	route *gateway.HTTPRoute,
	gatewayParent types.NamespacedName,
	conditions []metav1.Condition,
) *condition {
	for _, parentCondition := range conditions {
		if parentCondition.Type != string(gatewayv1.RouteConditionAccepted) {
			continue
		}
		if parentCondition.Status == metav1.ConditionTrue {
			return &condition{
				reason:  string(inferencev1.InferencePoolReasonAccepted),
				status:  metav1.ConditionTrue,
				message: "Referenced by an HTTPRoute accepted by the parentRef Gateway",
			}
		}
		return &condition{
			reason: string(inferencev1.InferencePoolReasonHTTPRouteNotAccepted),
			status: metav1.ConditionFalse,
			message: fmt.Sprintf("Referenced HTTPRoute %s/%s not accepted by Gateway %s/%s: %s",
				route.Namespace, route.Name, gatewayParent.Namespace, gatewayParent.Name, parentCondition.Message),
		}
	}
	// If no Accepted condition found, treat as unknown (parent is listed in status)
	return &condition{
		reason:  string(inferencev1.InferencePoolReasonAccepted),
		status:  metav1.ConditionUnknown,
		message: "Referenced by an HTTPRoute unknown parentRef Gateway status",
	}
}
// calculateResolvedRefsStatus derives the ResolvedRefs condition from the
// pool's endpoint picker reference. The reference resolves when:
//   - its kind is Service (an empty kind is treated as Service),
//   - its name is set, and
//   - a Service with that name is found in the pool's namespace.
//
// Any failed check yields a False condition with the InvalidExtensionRef reason.
func calculateResolvedRefsStatus(
	pool *inferencev1.InferencePool,
	services krt.Collection[*corev1.Service],
) *condition {
	// invalid builds the failure condition shared by every rejection path.
	invalid := func(msg string) *condition {
		return &condition{
			reason:  string(inferencev1.InferencePoolReasonInvalidExtensionRef),
			status:  metav1.ConditionFalse,
			message: msg,
		}
	}

	refKind := string(pool.Spec.EndpointPickerRef.Kind)
	if refKind == "" {
		refKind = gvk.Service.Kind
	}
	if refKind != gvk.Service.Kind {
		return invalid("Unsupported ExtensionRef kind " + refKind)
	}

	refName := string(pool.Spec.EndpointPickerRef.Name)
	if refName == "" {
		return invalid("ExtensionRef not defined")
	}

	// The referenced Service is looked up in the pool's own namespace.
	serviceKey := pool.Namespace + "/" + refName
	if ptr.Flatten(services.GetKey(serviceKey)) == nil {
		return invalid("Referenced ExtensionRef not found " + refName)
	}

	return &condition{
		reason:  string(inferencev1.InferencePoolReasonResolvedRefs),
		status:  metav1.ConditionTrue,
		message: "Referenced ExtensionRef resolved successfully",
	}
}
// isDefaultStatusParent reports whether the entry is the default placeholder
// parent, i.e. its ParentRef has kind "Status" and name "default".
func isDefaultStatusParent(parent inferencev1.ParentStatus) bool {
	ref := parent.ParentRef
	if string(ref.Kind) != "Status" {
		return false
	}
	return ref.Name == "default"
}
// isOurManagedGateway reports whether the Gateway namespace/name exists in the
// gateways collection and uses a GatewayClass listed in builtinClasses.
// Callers use it to recognise stale parent entries that were added earlier for
// a Gateway that no longer references the pool through an HTTPRoute.
func isOurManagedGateway(gateways krt.Collection[*gateway.Gateway], namespace, name string) bool {
	gw := ptr.Flatten(gateways.GetKey(namespace + "/" + name))
	if gw == nil {
		return false
	}
	if _, known := builtinClasses[gw.Spec.GatewayClassName]; known {
		return true
	}
	return false
}
// filterUsedConditions returns, in their original order, the conditions whose
// Type is one of usedConditions. The result is nil when nothing matches.
func filterUsedConditions(conditions []metav1.Condition, usedConditions ...inferencev1.InferencePoolConditionType) []metav1.Condition {
	var kept []metav1.Condition
	for idx := range conditions {
		if !slices.Contains(usedConditions, inferencev1.InferencePoolConditionType(conditions[idx].Type)) {
			continue
		}
		kept = append(kept, conditions[idx])
	}
	return kept
}
// generateHash returns the first length characters of the lowercase
// hexadecimal SHA-256 digest of input.
//
// length is clamped to the range [0, 64] (64 being the full digest length), so
// out-of-range values no longer cause a slice-bounds panic.
func generateHash(input string, length int) string {
	digest := sha256.Sum256([]byte(input))
	hexDigest := fmt.Sprintf("%x", digest) // Convert to hexadecimal string
	if length < 0 {
		length = 0
	}
	if length > len(hexDigest) {
		length = len(hexDigest)
	}
	return hexDigest[:length] // Truncate to desired length
}
// InferencePoolServiceName derives the shadow Service name for a pool:
// "<poolName>-ip-<hash>", where hash is the first hashSize characters of the
// pool name's hash. When that would exceed maxServiceNameLength, the pool name
// part is truncated so that the separator and hash still fit.
//
// An error is returned if the separator and hash alone cannot fit within
// maxServiceNameLength.
func InferencePoolServiceName(poolName string) (string, error) {
	const separator = "-ip-"
	suffix := separator + generateHash(poolName, hashSize)

	// Common case: the full pool name fits.
	if len(poolName)+len(suffix) <= maxServiceNameLength {
		return poolName + suffix, nil
	}

	// Space left for the pool name once the separator and hash are reserved.
	budget := maxServiceNameLength - len(separator) - hashSize
	if budget < 0 {
		return "", fmt.Errorf("inference pool name: %s is too long", poolName)
	}
	return poolName[:budget] + suffix, nil
}
// translateShadowServiceToService builds the headless shadow Service for an
// InferencePool.
//
// existingLabels are the labels currently on the Service (nil when it does not
// exist yet); they are preserved, except that the labels managed by this
// controller always take the freshly computed values. shadow supplies the
// Service name, namespace, selector, target ports and owner information;
// extRef supplies the endpoint picker details stored as labels.
func translateShadowServiceToService(existingLabels map[string]string, shadow shadowServiceInfo, extRef extRefInfo) *corev1.Service {
	// Create the ports used by the shadow service
	ports := make([]corev1.ServicePort, 0, len(shadow.targetPorts))
	dummyPort := int32(54321) // Dummy port, not used for anything
	for i, port := range shadow.targetPorts {
		ports = append(ports, corev1.ServicePort{
			Name:       "port" + strconv.Itoa(i),
			Protocol:   corev1.ProtocolTCP,
			Port:       dummyPort + int32(i),
			TargetPort: intstr.FromInt(int(port.port)),
		})
	}

	managedLabels := map[string]string{
		InferencePoolRefLabel:                shadow.poolName,
		InferencePoolExtensionRefSvc:         extRef.name,
		InferencePoolExtensionRefPort:        strconv.Itoa(int(extRef.port)),
		InferencePoolExtensionRefFailureMode: extRef.failureMode,
		constants.InternalServiceSemantics:   constants.ServiceSemanticsInferencePool,
	}

	// Create a new service object based on the shadow service info
	svc := &corev1.Service{
		ObjectMeta: metav1.ObjectMeta{
			Name:      shadow.key.Name,
			Namespace: shadow.key.Namespace,
			// The managed labels are passed as the override argument so they
			// win over values already on the Service. With the arguments the
			// other way round, a stale label (for example an old extension
			// port) on the existing Service would never be refreshed.
			Labels: maps.MergeCopy(existingLabels, managedLabels),
		},
		Spec: corev1.ServiceSpec{
			Selector:  shadow.selector,
			Type:      corev1.ServiceTypeClusterIP,
			ClusterIP: corev1.ClusterIPNone, // Headless service
			Ports:     ports,
		},
	}

	// Tie the Service to the pool through an owner reference.
	svc.SetOwnerReferences([]metav1.OwnerReference{
		{
			APIVersion: gvk.InferencePool.GroupVersion(),
			Kind:       gvk.InferencePool.Kind,
			Name:       shadow.poolName,
			UID:        shadow.poolUID,
		},
	})

	return svc
}
// reconcileShadowService returns the reconcile function for shadow Services.
//
// Given the namespace/name of an InferencePool, the returned function looks up
// the translated pool, builds its shadow Service and creates it, or updates it
// when it already exists. It does nothing when the pool is not found or when
// the existing Service is not manageable by this controller. The error from the
// create/update call is returned.
func (c *Controller) reconcileShadowService(
	svcClient kclient.Client[*corev1.Service],
	inferencePools krt.Collection[InferencePool],
	servicesCollection krt.Collection[*corev1.Service],
) func(key types.NamespacedName) error {
	return func(key types.NamespacedName) error {
		// Find the InferencePool that matches the key
		pool := inferencePools.GetKey(key.String())
		if pool == nil {
			// we'll generally ignore these scenarios, since the InferencePool may have been deleted
			// (the format verb was previously missing, so the key was rendered as an %!(EXTRA ...) suffix)
			log.Debugf("inferencepool %s no longer exists", key.String())
			return nil
		}

		// We found the InferencePool, now we need to translate it to a shadow Service
		// and check if it exists already
		svcKey := pool.shadowService.key
		existingService := ptr.Flatten(servicesCollection.GetKey(svcKey.String()))

		// Check if we can manage this service
		var existingLabels map[string]string
		if existingService != nil {
			existingLabels = existingService.GetLabels()
			// The second result (the resource version) is not needed here.
			if canManage, _ := c.canManageShadowServiceForInference(existingService); !canManage {
				// Log the Service's own key; key is the pool's key, whose name differs.
				log.Debugf("skipping service %s/%s, already managed by another controller", svcKey.Namespace, svcKey.Name)
				return nil
			}
		}

		service := translateShadowServiceToService(existingLabels, pool.shadowService, pool.extRef)

		if existingService == nil {
			// Create the service if it doesn't exist
			_, err := svcClient.Create(service)
			return err
		}

		// TODO: Don't overwrite resources: https://github.com/istio/istio/issues/56667
		service.ResourceVersion = existingService.ResourceVersion
		_, err := svcClient.Update(service)
		return err
	}
}
// canManageShadowServiceForInference reports whether this controller may
// manage the given Service, together with the Service's resource version.
//
// A nil Service is always manageable (there is nothing to conflict with) and
// yields an empty resource version. An existing Service is manageable only
// when it carries the InferencePoolRefLabel label.
func (c *Controller) canManageShadowServiceForInference(obj *corev1.Service) (bool, string) {
	if obj != nil {
		_, labelled := obj.GetLabels()[InferencePoolRefLabel]
		return labelled, obj.GetResourceVersion()
	}
	// No object exists, we can manage it
	return true, ""
}
// indexHTTPRouteByInferencePool returns the index keys of an HTTPRoute: one
// "<namespace>/<name>" key per backendRef that points at an InferencePool.
// Keys are emitted in rule/backendRef order and are not de-duplicated.
func indexHTTPRouteByInferencePool(o *gateway.HTTPRoute) []string {
	var keys []string
	for _, rule := range o.Spec.Rules {
		for _, ref := range rule.BackendRefs {
			if !isInferencePoolBackendRef(ref.BackendRef) {
				continue
			}
			// A backendRef without a namespace lives in the route's namespace.
			ns := o.Namespace
			if explicit := ptr.OrEmpty(ref.BackendRef.Namespace); explicit != "" {
				ns = string(explicit)
			}
			keys = append(keys, ns+"/"+string(ref.Name))
		}
	}
	return keys
}