From dc3e496aa0e8421253ee91dc3147843a5c265c19 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=BE=84=E6=BD=AD?= Date: Fri, 8 Sep 2023 15:59:22 +0800 Subject: [PATCH] fix a concurrency issue of mcprbidge reconcile (#511) --- pkg/ingress/config/ingress_config.go | 23 +++++++++++------------ registry/reconcile/reconcile.go | 6 +++--- 2 files changed, 14 insertions(+), 15 deletions(-) diff --git a/pkg/ingress/config/ingress_config.go b/pkg/ingress/config/ingress_config.go index d77c8416c..07bd7a599 100644 --- a/pkg/ingress/config/ingress_config.go +++ b/pkg/ingress/config/ingress_config.go @@ -29,6 +29,7 @@ import ( "github.com/gogo/protobuf/jsonpb" "github.com/gogo/protobuf/types" "github.com/golang/protobuf/ptypes/wrappers" + "go.uber.org/atomic" "google.golang.org/protobuf/types/known/anypb" extensions "istio.io/api/extensions/v1alpha1" networking "istio.io/api/networking/v1alpha3" @@ -109,7 +110,7 @@ type IngressConfig struct { RegistryReconciler *reconcile.Reconciler - mcpbridgeReconciled bool + mcpbridgeReconciled *atomic.Bool mcpbridgeController mcpbridge.McpBridgeController @@ -154,7 +155,7 @@ func NewIngressConfig(localKubeClient kube.Client, XDSUpdater model.XDSUpdater, common.CreateConvertedName(clusterId, "global"), watchedSecretSet: sets.NewSet(), namespace: namespace, - mcpbridgeReconciled: true, + mcpbridgeReconciled: atomic.NewBool(true), wasmPlugins: make(map[string]*extensions.WasmPlugin), http2rpcs: make(map[string]*higressv1.Http2Rpc), } @@ -947,9 +948,7 @@ func (m *IngressConfig) AddOrUpdateMcpBridge(clusterNamespacedName util.ClusterN clusterNamespacedName.Namespace, clusterNamespacedName.Name) return } - m.mutex.Lock() - m.mcpbridgeReconciled = false - m.mutex.Unlock() + m.mcpbridgeReconciled.Store(false) if m.RegistryReconciler == nil { m.RegistryReconciler = reconcile.NewReconciler(func() { metadata := config.Meta{ @@ -966,12 +965,12 @@ func (m *IngressConfig) AddOrUpdateMcpBridge(clusterNamespacedName util.ClusterN }, m.localKubeClient, m.namespace) } reconciler := m.RegistryReconciler - go func() { - reconciler.Reconcile(mcpbridge) - m.mutex.Lock() - m.mcpbridgeReconciled = true - m.mutex.Unlock() - }() + err = reconciler.Reconcile(mcpbridge) + if err != nil { + IngressLog.Errorf("Mcpbridge reconcile failed, err:%v", err) + return + } + m.mcpbridgeReconciled.Store(true) } func (m *IngressConfig) DeleteMcpBridge(clusterNamespacedName util.ClusterNamespacedName) { @@ -1405,7 +1404,7 @@ func (m *IngressConfig) HasSynced() bool { return false } } - if !m.mcpbridgeController.HasSynced() || !m.mcpbridgeReconciled { + if !m.mcpbridgeController.HasSynced() || !m.mcpbridgeReconciled.Load() { return false } if !m.wasmPluginController.HasSynced() { diff --git a/registry/reconcile/reconcile.go b/registry/reconcile/reconcile.go index 8928f43d4..c2fb8e78c 100644 --- a/registry/reconcile/reconcile.go +++ b/registry/reconcile/reconcile.go @@ -58,7 +58,7 @@ func NewReconciler(serviceUpdate func(), client kube.Client, namespace string) * } } -func (r *Reconciler) Reconcile(mcpbridge *v1.McpBridge) { +func (r *Reconciler) Reconcile(mcpbridge *v1.McpBridge) error { newRegistries := make(map[string]*apiv1.RegistryConfig) if mcpbridge != nil { for _, registry := range mcpbridge.Spec.Registries { @@ -121,12 +121,12 @@ func (r *Reconciler) Reconcile(mcpbridge *v1.McpBridge) { r.registries[k] = v } if errHappened { - log.Error("ReconcileRegistries failed, Init Watchers failed") - return + return errors.New("ReconcileRegistries failed, Init Watchers failed") } wg.Wait() r.Cache.PurgeStaleService() log.Infof("Registries is reconciled") + return nil } func (r *Reconciler) generateWatcherFromRegistryConfig(registry *apiv1.RegistryConfig, wg *sync.WaitGroup) (Watcher, error) {