mirror of
https://github.com/alibaba/higress.git
synced 2026-06-01 08:37:26 +08:00
fix reconcile of mcpbridge (#559)
This commit is contained in:
@@ -152,13 +152,13 @@ build-gateway: prebuild external/package/envoy-amd64.tar.gz external/package/env
|
|||||||
cd external/istio; BUILD_WITH_CONTAINER=1 BUILDX_PLATFORM=true DOCKER_BUILD_VARIANTS=default DOCKER_TARGETS="docker.proxyv2" make docker
|
cd external/istio; BUILD_WITH_CONTAINER=1 BUILDX_PLATFORM=true DOCKER_BUILD_VARIANTS=default DOCKER_TARGETS="docker.proxyv2" make docker
|
||||||
|
|
||||||
build-gateway-local: prebuild external/package/envoy-amd64.tar.gz external/package/envoy-arm64.tar.gz build-pilot
|
build-gateway-local: prebuild external/package/envoy-amd64.tar.gz external/package/envoy-arm64.tar.gz build-pilot
|
||||||
cd external/istio; BUILD_WITH_CONTAINER=1 BUILDX_PLATFORM=false DOCKER_BUILD_VARIANTS=default DOCKER_TARGETS="docker.proxyv2" make docker
|
cd external/istio; rm -rf out/linux_amd64; GOOS_LOCAL=linux TARGET_OS=linux BUILD_WITH_CONTAINER=1 BUILDX_PLATFORM=false DOCKER_BUILD_VARIANTS=default DOCKER_TARGETS="docker.proxyv2" make docker
|
||||||
|
|
||||||
build-istio: prebuild build-pilot
|
build-istio: prebuild build-pilot
|
||||||
cd external/istio; BUILD_WITH_CONTAINER=1 BUILDX_PLATFORM=true DOCKER_BUILD_VARIANTS=default DOCKER_TARGETS="docker.pilot" make docker
|
cd external/istio; BUILD_WITH_CONTAINER=1 BUILDX_PLATFORM=true DOCKER_BUILD_VARIANTS=default DOCKER_TARGETS="docker.pilot" make docker
|
||||||
|
|
||||||
build-istio-local: prebuild build-pilot
|
build-istio-local: prebuild
|
||||||
cd external/istio; BUILD_WITH_CONTAINER=1 BUILDX_PLATFORM=false DOCKER_BUILD_VARIANTS=default DOCKER_TARGETS="docker.pilot" make docker
|
cd external/istio; rm -rf out/linux_amd64; GOOS_LOCAL=linux TARGET_OS=linux BUILD_WITH_CONTAINER=1 BUILDX_PLATFORM=false DOCKER_BUILD_VARIANTS=default DOCKER_TARGETS="docker.pilot" make docker
|
||||||
|
|
||||||
build-wasmplugins:
|
build-wasmplugins:
|
||||||
./tools/hack/build-wasm-plugins.sh
|
./tools/hack/build-wasm-plugins.sh
|
||||||
|
|||||||
@@ -41,6 +41,8 @@ import (
|
|||||||
"istio.io/istio/pkg/config/constants"
|
"istio.io/istio/pkg/config/constants"
|
||||||
"istio.io/istio/pkg/config/schema/collection"
|
"istio.io/istio/pkg/config/schema/collection"
|
||||||
"istio.io/istio/pkg/config/schema/gvk"
|
"istio.io/istio/pkg/config/schema/gvk"
|
||||||
|
kerrors "k8s.io/apimachinery/pkg/api/errors"
|
||||||
|
ktypes "k8s.io/apimachinery/pkg/types"
|
||||||
listersv1 "k8s.io/client-go/listers/core/v1"
|
listersv1 "k8s.io/client-go/listers/core/v1"
|
||||||
"k8s.io/client-go/tools/cache"
|
"k8s.io/client-go/tools/cache"
|
||||||
|
|
||||||
@@ -86,6 +88,10 @@ var (
|
|||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
DefaultMcpbridgeName = "default"
|
||||||
|
)
|
||||||
|
|
||||||
type IngressConfig struct {
|
type IngressConfig struct {
|
||||||
// key: cluster id
|
// key: cluster id
|
||||||
remoteIngressControllers map[string]common.IngressController
|
remoteIngressControllers map[string]common.IngressController
|
||||||
@@ -155,7 +161,7 @@ func NewIngressConfig(localKubeClient kube.Client, XDSUpdater model.XDSUpdater,
|
|||||||
common.CreateConvertedName(clusterId, "global"),
|
common.CreateConvertedName(clusterId, "global"),
|
||||||
watchedSecretSet: sets.NewSet(),
|
watchedSecretSet: sets.NewSet(),
|
||||||
namespace: namespace,
|
namespace: namespace,
|
||||||
mcpbridgeReconciled: atomic.NewBool(true),
|
mcpbridgeReconciled: atomic.NewBool(false),
|
||||||
wasmPlugins: make(map[string]*extensions.WasmPlugin),
|
wasmPlugins: make(map[string]*extensions.WasmPlugin),
|
||||||
http2rpcs: make(map[string]*higressv1.Http2Rpc),
|
http2rpcs: make(map[string]*higressv1.Http2Rpc),
|
||||||
}
|
}
|
||||||
@@ -939,7 +945,7 @@ func (m *IngressConfig) DeleteWasmPlugin(clusterNamespacedName util.ClusterNames
|
|||||||
|
|
||||||
func (m *IngressConfig) AddOrUpdateMcpBridge(clusterNamespacedName util.ClusterNamespacedName) {
|
func (m *IngressConfig) AddOrUpdateMcpBridge(clusterNamespacedName util.ClusterNamespacedName) {
|
||||||
// TODO: get resource name from config
|
// TODO: get resource name from config
|
||||||
if clusterNamespacedName.Name != "default" || clusterNamespacedName.Namespace != m.namespace {
|
if clusterNamespacedName.Name != DefaultMcpbridgeName || clusterNamespacedName.Namespace != m.namespace {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
mcpbridge, err := m.mcpbridgeLister.McpBridges(clusterNamespacedName.Namespace).Get(clusterNamespacedName.Name)
|
mcpbridge, err := m.mcpbridgeLister.McpBridges(clusterNamespacedName.Namespace).Get(clusterNamespacedName.Name)
|
||||||
@@ -948,7 +954,6 @@ func (m *IngressConfig) AddOrUpdateMcpBridge(clusterNamespacedName util.ClusterN
|
|||||||
clusterNamespacedName.Namespace, clusterNamespacedName.Name)
|
clusterNamespacedName.Namespace, clusterNamespacedName.Name)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
m.mcpbridgeReconciled.Store(false)
|
|
||||||
if m.RegistryReconciler == nil {
|
if m.RegistryReconciler == nil {
|
||||||
m.RegistryReconciler = reconcile.NewReconciler(func() {
|
m.RegistryReconciler = reconcile.NewReconciler(func() {
|
||||||
metadata := config.Meta{
|
metadata := config.Meta{
|
||||||
@@ -1404,8 +1409,21 @@ func (m *IngressConfig) HasSynced() bool {
|
|||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if !m.mcpbridgeController.HasSynced() || !m.mcpbridgeReconciled.Load() {
|
if !m.mcpbridgeController.HasSynced() {
|
||||||
return false
|
return false
|
||||||
|
} else {
|
||||||
|
_, err := m.mcpbridgeController.Get(ktypes.NamespacedName{
|
||||||
|
Namespace: m.namespace,
|
||||||
|
Name: DefaultMcpbridgeName,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
if !kerrors.IsNotFound(err) {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
// mcpbridge exist
|
||||||
|
} else if !m.mcpbridgeReconciled.Load() {
|
||||||
|
return false
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if !m.wasmPluginController.HasSynced() {
|
if !m.wasmPluginController.HasSynced() {
|
||||||
return false
|
return false
|
||||||
|
|||||||
@@ -502,7 +502,7 @@ func (m *KIngressConfig) HasSynced() bool {
|
|||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
IngressLog.Info("Ingress config controller synced.")
|
IngressLog.Info("KIngress config controller synced.")
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -40,6 +40,8 @@ type Controller[lister any] interface {
|
|||||||
|
|
||||||
Lister() lister
|
Lister() lister
|
||||||
|
|
||||||
|
Get(types.NamespacedName) (controllers.Object, error)
|
||||||
|
|
||||||
Informer() cache.SharedIndexInformer
|
Informer() cache.SharedIndexInformer
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -147,6 +149,10 @@ func (c *CommonController[lister]) onEvent(namespacedName types.NamespacedName)
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *CommonController[lister]) Get(namespacedName types.NamespacedName) (controllers.Object, error) {
|
||||||
|
return c.getFunc(c.lister, namespacedName)
|
||||||
|
}
|
||||||
|
|
||||||
func (c *CommonController[lister]) HasSynced() bool {
|
func (c *CommonController[lister]) HasSynced() bool {
|
||||||
return c.informer.HasSynced()
|
return c.informer.HasSynced()
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -21,6 +21,7 @@ import (
|
|||||||
"path"
|
"path"
|
||||||
"reflect"
|
"reflect"
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
"istio.io/pkg/log"
|
"istio.io/pkg/log"
|
||||||
|
|
||||||
@@ -38,6 +39,10 @@ import (
|
|||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
DefaultReadyTimeout = time.Second * 60
|
||||||
|
)
|
||||||
|
|
||||||
type Reconciler struct {
|
type Reconciler struct {
|
||||||
memory.Cache
|
memory.Cache
|
||||||
registries map[string]*apiv1.RegistryConfig
|
registries map[string]*apiv1.RegistryConfig
|
||||||
@@ -123,7 +128,17 @@ func (r *Reconciler) Reconcile(mcpbridge *v1.McpBridge) error {
|
|||||||
if errHappened {
|
if errHappened {
|
||||||
return errors.New("ReconcileRegistries failed, Init Watchers failed")
|
return errors.New("ReconcileRegistries failed, Init Watchers failed")
|
||||||
}
|
}
|
||||||
|
var ready = make(chan struct{})
|
||||||
|
readyTimer := time.NewTimer(DefaultReadyTimeout)
|
||||||
|
go func() {
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
|
ready <- struct{}{}
|
||||||
|
}()
|
||||||
|
select {
|
||||||
|
case <-ready:
|
||||||
|
case <-readyTimer.C:
|
||||||
|
return errors.New("ReoncileRegistries failed, waiting for ready timeout")
|
||||||
|
}
|
||||||
r.Cache.PurgeStaleService()
|
r.Cache.PurgeStaleService()
|
||||||
log.Infof("Registries is reconciled")
|
log.Infof("Registries is reconciled")
|
||||||
return nil
|
return nil
|
||||||
|
|||||||
Reference in New Issue
Block a user