fix a concurrency issue of mcprbidge reconcile (#511)

This commit is contained in:
澄潭
2023-09-08 15:59:22 +08:00
committed by GitHub
parent 8747e1ddad
commit dc3e496aa0
2 changed files with 14 additions and 15 deletions

View File

@@ -29,6 +29,7 @@ import (
"github.com/gogo/protobuf/jsonpb" "github.com/gogo/protobuf/jsonpb"
"github.com/gogo/protobuf/types" "github.com/gogo/protobuf/types"
"github.com/golang/protobuf/ptypes/wrappers" "github.com/golang/protobuf/ptypes/wrappers"
"go.uber.org/atomic"
"google.golang.org/protobuf/types/known/anypb" "google.golang.org/protobuf/types/known/anypb"
extensions "istio.io/api/extensions/v1alpha1" extensions "istio.io/api/extensions/v1alpha1"
networking "istio.io/api/networking/v1alpha3" networking "istio.io/api/networking/v1alpha3"
@@ -109,7 +110,7 @@ type IngressConfig struct {
RegistryReconciler *reconcile.Reconciler RegistryReconciler *reconcile.Reconciler
mcpbridgeReconciled bool mcpbridgeReconciled *atomic.Bool
mcpbridgeController mcpbridge.McpBridgeController mcpbridgeController mcpbridge.McpBridgeController
@@ -154,7 +155,7 @@ func NewIngressConfig(localKubeClient kube.Client, XDSUpdater model.XDSUpdater,
common.CreateConvertedName(clusterId, "global"), common.CreateConvertedName(clusterId, "global"),
watchedSecretSet: sets.NewSet(), watchedSecretSet: sets.NewSet(),
namespace: namespace, namespace: namespace,
mcpbridgeReconciled: true, mcpbridgeReconciled: atomic.NewBool(true),
wasmPlugins: make(map[string]*extensions.WasmPlugin), wasmPlugins: make(map[string]*extensions.WasmPlugin),
http2rpcs: make(map[string]*higressv1.Http2Rpc), http2rpcs: make(map[string]*higressv1.Http2Rpc),
} }
@@ -947,9 +948,7 @@ func (m *IngressConfig) AddOrUpdateMcpBridge(clusterNamespacedName util.ClusterN
clusterNamespacedName.Namespace, clusterNamespacedName.Name) clusterNamespacedName.Namespace, clusterNamespacedName.Name)
return return
} }
m.mutex.Lock() m.mcpbridgeReconciled.Store(false)
m.mcpbridgeReconciled = false
m.mutex.Unlock()
if m.RegistryReconciler == nil { if m.RegistryReconciler == nil {
m.RegistryReconciler = reconcile.NewReconciler(func() { m.RegistryReconciler = reconcile.NewReconciler(func() {
metadata := config.Meta{ metadata := config.Meta{
@@ -966,12 +965,12 @@ func (m *IngressConfig) AddOrUpdateMcpBridge(clusterNamespacedName util.ClusterN
}, m.localKubeClient, m.namespace) }, m.localKubeClient, m.namespace)
} }
reconciler := m.RegistryReconciler reconciler := m.RegistryReconciler
go func() { err = reconciler.Reconcile(mcpbridge)
reconciler.Reconcile(mcpbridge) if err != nil {
m.mutex.Lock() IngressLog.Errorf("Mcpbridge reconcile failed, err:%v", err)
m.mcpbridgeReconciled = true return
m.mutex.Unlock() }
}() m.mcpbridgeReconciled.Store(true)
} }
func (m *IngressConfig) DeleteMcpBridge(clusterNamespacedName util.ClusterNamespacedName) { func (m *IngressConfig) DeleteMcpBridge(clusterNamespacedName util.ClusterNamespacedName) {
@@ -1405,7 +1404,7 @@ func (m *IngressConfig) HasSynced() bool {
return false return false
} }
} }
if !m.mcpbridgeController.HasSynced() || !m.mcpbridgeReconciled { if !m.mcpbridgeController.HasSynced() || !m.mcpbridgeReconciled.Load() {
return false return false
} }
if !m.wasmPluginController.HasSynced() { if !m.wasmPluginController.HasSynced() {

View File

@@ -58,7 +58,7 @@ func NewReconciler(serviceUpdate func(), client kube.Client, namespace string) *
} }
} }
func (r *Reconciler) Reconcile(mcpbridge *v1.McpBridge) { func (r *Reconciler) Reconcile(mcpbridge *v1.McpBridge) error {
newRegistries := make(map[string]*apiv1.RegistryConfig) newRegistries := make(map[string]*apiv1.RegistryConfig)
if mcpbridge != nil { if mcpbridge != nil {
for _, registry := range mcpbridge.Spec.Registries { for _, registry := range mcpbridge.Spec.Registries {
@@ -121,12 +121,12 @@ func (r *Reconciler) Reconcile(mcpbridge *v1.McpBridge) {
r.registries[k] = v r.registries[k] = v
} }
if errHappened { if errHappened {
log.Error("ReconcileRegistries failed, Init Watchers failed") return errors.New("ReconcileRegistries failed, Init Watchers failed")
return
} }
wg.Wait() wg.Wait()
r.Cache.PurgeStaleService() r.Cache.PurgeStaleService()
log.Infof("Registries is reconciled") log.Infof("Registries is reconciled")
return nil
} }
func (r *Reconciler) generateWatcherFromRegistryConfig(registry *apiv1.RegistryConfig, wg *sync.WaitGroup) (Watcher, error) { func (r *Reconciler) generateWatcherFromRegistryConfig(registry *apiv1.RegistryConfig, wg *sync.WaitGroup) (Watcher, error) {