diff --git a/istio/1.12/patches/istio/20221110-fix-deadlock.patch b/istio/1.12/patches/istio/20221110-fix-deadlock.patch new file mode 100644 index 000000000..dfcfdffad --- /dev/null +++ b/istio/1.12/patches/istio/20221110-fix-deadlock.patch @@ -0,0 +1,185 @@ +diff --git a/pkg/istio-agent/xds_proxy.go b/pkg/istio-agent/xds_proxy.go +index 51766751e1..5b33234024 100644 +--- a/pkg/istio-agent/xds_proxy.go ++++ b/pkg/istio-agent/xds_proxy.go +@@ -47,6 +47,7 @@ import ( + "istio.io/istio/pilot/pkg/features" + istiogrpc "istio.io/istio/pilot/pkg/grpc" + v3 "istio.io/istio/pilot/pkg/xds/v3" ++ "istio.io/istio/pkg/channels" + "istio.io/istio/pkg/config/constants" + dnsProto "istio.io/istio/pkg/dns/proto" + "istio.io/istio/pkg/istio-agent/health" +@@ -233,24 +234,14 @@ func initXdsProxy(ia *Agent) (*XdsProxy, error) { + // PersistRequest sends a request to the currently connected proxy. Additionally, on any reconnection + // to the upstream XDS request we will resend this request. + func (p *XdsProxy) PersistRequest(req *discovery.DiscoveryRequest) { +- var ch chan *discovery.DiscoveryRequest +- var stop chan struct{} +- + p.connectedMutex.Lock() +- if p.connected != nil { +- ch = p.connected.requestsChan +- stop = p.connected.stopChan ++ // Immediately send if we are currently connect ++ if p.connected != nil && p.connected.requestsChan != nil { ++ p.connected.requestsChan.Put(req) + } ++ // Otherwise place it as our initial request for new connections + p.initialRequest = req + p.connectedMutex.Unlock() +- +- // Immediately send if we are currently connect +- if ch != nil { +- select { +- case ch <- req: +- case <-stop: +- } +- } + } + + func (p *XdsProxy) UnregisterStream(c *ProxyConnection) { +@@ -276,9 +267,9 @@ type ProxyConnection struct { + conID uint32 + upstreamError chan error + downstreamError chan error +- requestsChan chan *discovery.DiscoveryRequest ++ requestsChan *channels.Unbounded + responsesChan chan *discovery.DiscoveryResponse +- deltaRequestsChan chan *discovery.DeltaDiscoveryRequest ++ deltaRequestsChan *channels.Unbounded + deltaResponsesChan chan *discovery.DeltaDiscoveryResponse + stopChan chan struct{} + downstream adsStream +@@ -290,10 +281,7 @@ type ProxyConnection struct { + // sendRequest is a small wrapper around sending to con.requestsChan. This ensures that we do not + // block forever on + func (con *ProxyConnection) sendRequest(req *discovery.DiscoveryRequest) { +- select { +- case con.requestsChan <- req: +- case <-con.stopChan: +- } ++ con.requestsChan.Put(req) + } + + type adsStream interface { +@@ -315,10 +303,29 @@ func (p *XdsProxy) handleStream(downstream adsStream) error { + conID: connectionNumber.Inc(), + upstreamError: make(chan error, 2), // can be produced by recv and send + downstreamError: make(chan error, 2), // can be produced by recv and send +- requestsChan: make(chan *discovery.DiscoveryRequest, 10), +- responsesChan: make(chan *discovery.DiscoveryResponse, 10), +- stopChan: make(chan struct{}), +- downstream: downstream, ++ // Requests channel is unbounded. The Envoy<->XDS Proxy<->Istiod system produces a natural ++ // looping of Recv and Send. Due to backpressure introduce by gRPC natively (that is, Send() can ++ // only send so much data without being Recv'd before it starts blocking), along with the ++ // backpressure provided by our channels, we have a risk of deadlock where both xdsproxy and ++ // Istiod are trying to Send, but both are blocked by gRPC backpressure until Recv() is called. ++ // However, Recv can fail to be called by Send being blocked. This can be triggered by the two ++ // sources in our system (Envoy request and Istiod pushes) producing more events than we can keep ++ // up with. ++ // See https://github.com/istio/istio/issues/39209 for more information ++ // ++ // To prevent these issues, we need to either: ++ // 1. Apply backpressure directly to Envoy requests or Istiod pushes ++ // 2. Make part of the system unbounded ++ // ++ // (1) is challenging because we cannot do a conditional Recv (for Envoy requests), and changing ++ // the control plane requires substantial changes. Instead, we make the requests channel ++ // unbounded. This is the least likely to cause issues as the messages we store here are the ++ // smallest relative to other channels. ++ requestsChan: channels.NewUnbounded(), ++ // Allow a buffer of 1. This ensures we queue up at most 2 (one in process, 1 pending) responses before forwarding. ++ responsesChan: make(chan *discovery.DiscoveryResponse, 1), ++ stopChan: make(chan struct{}), ++ downstream: downstream, + } + + p.RegisterStream(con) +@@ -452,7 +459,9 @@ func (p *XdsProxy) handleUpstreamRequest(con *ProxyConnection) { + defer con.upstream.CloseSend() // nolint + for { + select { +- case req := <-con.requestsChan: ++ case requ := <-con.requestsChan.Get(): ++ con.requestsChan.Load() ++ req := requ.(*discovery.DiscoveryRequest) + proxyLog.Debugf("request for type url %s", req.TypeUrl) + metrics.XdsProxyRequests.Increment() + if req.TypeUrl == v3.ExtensionConfigurationType { +diff --git a/pkg/istio-agent/xds_proxy_delta.go b/pkg/istio-agent/xds_proxy_delta.go +index 2cd82b4059..c2245f9918 100644 +--- a/pkg/istio-agent/xds_proxy_delta.go ++++ b/pkg/istio-agent/xds_proxy_delta.go +@@ -27,6 +27,7 @@ import ( + "istio.io/istio/pilot/pkg/features" + istiogrpc "istio.io/istio/pilot/pkg/grpc" + v3 "istio.io/istio/pilot/pkg/xds/v3" ++ "istio.io/istio/pkg/channels" + "istio.io/istio/pkg/istio-agent/metrics" + "istio.io/istio/pkg/wasm" + ) +@@ -34,10 +35,7 @@ import ( + // sendDeltaRequest is a small wrapper around sending to con.requestsChan. This ensures that we do not + // block forever on + func (con *ProxyConnection) sendDeltaRequest(req *discovery.DeltaDiscoveryRequest) { +- select { +- case con.deltaRequestsChan <- req: +- case <-con.stopChan: +- } ++ con.deltaRequestsChan.Put(req) + } + + // requests from envoy +@@ -48,10 +46,11 @@ func (p *XdsProxy) DeltaAggregatedResources(downstream discovery.AggregatedDisco + proxyLog.Debugf("accepted delta xds connection from envoy, forwarding to upstream") + + con := &ProxyConnection{ +- upstreamError: make(chan error, 2), // can be produced by recv and send +- downstreamError: make(chan error, 2), // can be produced by recv and send +- deltaRequestsChan: make(chan *discovery.DeltaDiscoveryRequest, 10), +- deltaResponsesChan: make(chan *discovery.DeltaDiscoveryResponse, 10), ++ upstreamError: make(chan error, 2), // can be produced by recv and send ++ downstreamError: make(chan error, 2), // can be produced by recv and send ++ deltaRequestsChan: channels.NewUnbounded(), ++ // Allow a buffer of 1. This ensures we queue up at most 2 (one in process, 1 pending) responses before forwarding. ++ deltaResponsesChan: make(chan *discovery.DeltaDiscoveryResponse, 1), + stopChan: make(chan struct{}), + downstreamDeltas: downstream, + } +@@ -190,7 +189,9 @@ func (p *XdsProxy) handleUpstreamDeltaRequest(con *ProxyConnection) { + }() + for { + select { +- case req := <-con.deltaRequestsChan: ++ case requ := <-con.deltaRequestsChan.Get(): ++ con.deltaRequestsChan.Load() ++ req := requ.(*discovery.DeltaDiscoveryRequest) + proxyLog.Debugf("delta request for type url %s", req.TypeUrl) + metrics.XdsProxyRequests.Increment() + if req.TypeUrl == v3.ExtensionConfigurationType { +@@ -296,22 +297,10 @@ func sendDownstreamDelta(deltaDownstream discovery.AggregatedDiscoveryService_De + } + + func (p *XdsProxy) PersistDeltaRequest(req *discovery.DeltaDiscoveryRequest) { +- var ch chan *discovery.DeltaDiscoveryRequest +- var stop chan struct{} +- + p.connectedMutex.Lock() + if p.connected != nil { +- ch = p.connected.deltaRequestsChan +- stop = p.connected.stopChan ++ p.connected.deltaRequestsChan.Put(req) + } + p.initialDeltaRequest = req + p.connectedMutex.Unlock() +- +- // Immediately send if we are currently connect +- if ch != nil { +- select { +- case ch <- req: +- case <-stop: +- } +- } + }