fix deadlock caused by xds (#50)

This commit is contained in:
澄潭
2022-11-10 16:00:15 +08:00
committed by GitHub
parent cdad934567
commit 2b55b6530a

View File

@@ -0,0 +1,185 @@
diff --git a/pkg/istio-agent/xds_proxy.go b/pkg/istio-agent/xds_proxy.go
index 51766751e1..5b33234024 100644
--- a/pkg/istio-agent/xds_proxy.go
+++ b/pkg/istio-agent/xds_proxy.go
@@ -47,6 +47,7 @@ import (
"istio.io/istio/pilot/pkg/features"
istiogrpc "istio.io/istio/pilot/pkg/grpc"
v3 "istio.io/istio/pilot/pkg/xds/v3"
+ "istio.io/istio/pkg/channels"
"istio.io/istio/pkg/config/constants"
dnsProto "istio.io/istio/pkg/dns/proto"
"istio.io/istio/pkg/istio-agent/health"
@@ -233,24 +234,14 @@ func initXdsProxy(ia *Agent) (*XdsProxy, error) {
// PersistRequest sends a request to the currently connected proxy. Additionally, on any reconnection
// to the upstream XDS request we will resend this request.
func (p *XdsProxy) PersistRequest(req *discovery.DiscoveryRequest) {
- var ch chan *discovery.DiscoveryRequest
- var stop chan struct{}
-
p.connectedMutex.Lock()
- if p.connected != nil {
- ch = p.connected.requestsChan
- stop = p.connected.stopChan
+ // Immediately send if we are currently connect
+ if p.connected != nil && p.connected.requestsChan != nil {
+ p.connected.requestsChan.Put(req)
}
+ // Otherwise place it as our initial request for new connections
p.initialRequest = req
p.connectedMutex.Unlock()
-
- // Immediately send if we are currently connect
- if ch != nil {
- select {
- case ch <- req:
- case <-stop:
- }
- }
}
func (p *XdsProxy) UnregisterStream(c *ProxyConnection) {
@@ -276,9 +267,9 @@ type ProxyConnection struct {
conID uint32
upstreamError chan error
downstreamError chan error
- requestsChan chan *discovery.DiscoveryRequest
+ requestsChan *channels.Unbounded
responsesChan chan *discovery.DiscoveryResponse
- deltaRequestsChan chan *discovery.DeltaDiscoveryRequest
+ deltaRequestsChan *channels.Unbounded
deltaResponsesChan chan *discovery.DeltaDiscoveryResponse
stopChan chan struct{}
downstream adsStream
@@ -290,10 +281,7 @@ type ProxyConnection struct {
// sendRequest is a small wrapper around sending to con.requestsChan. This ensures that we do not
// block forever on
func (con *ProxyConnection) sendRequest(req *discovery.DiscoveryRequest) {
- select {
- case con.requestsChan <- req:
- case <-con.stopChan:
- }
+ con.requestsChan.Put(req)
}
type adsStream interface {
@@ -315,10 +303,29 @@ func (p *XdsProxy) handleStream(downstream adsStream) error {
conID: connectionNumber.Inc(),
upstreamError: make(chan error, 2), // can be produced by recv and send
downstreamError: make(chan error, 2), // can be produced by recv and send
- requestsChan: make(chan *discovery.DiscoveryRequest, 10),
- responsesChan: make(chan *discovery.DiscoveryResponse, 10),
- stopChan: make(chan struct{}),
- downstream: downstream,
+ // Requests channel is unbounded. The Envoy<->XDS Proxy<->Istiod system produces a natural
+ // looping of Recv and Send. Due to backpressure introduce by gRPC natively (that is, Send() can
+ // only send so much data without being Recv'd before it starts blocking), along with the
+ // backpressure provided by our channels, we have a risk of deadlock where both xdsproxy and
+ // Istiod are trying to Send, but both are blocked by gRPC backpressure until Recv() is called.
+ // However, Recv can fail to be called by Send being blocked. This can be triggered by the two
+ // sources in our system (Envoy request and Istiod pushes) producing more events than we can keep
+ // up with.
+ // See https://github.com/istio/istio/issues/39209 for more information
+ //
+ // To prevent these issues, we need to either:
+ // 1. Apply backpressure directly to Envoy requests or Istiod pushes
+ // 2. Make part of the system unbounded
+ //
+ // (1) is challenging because we cannot do a conditional Recv (for Envoy requests), and changing
+ // the control plane requires substantial changes. Instead, we make the requests channel
+ // unbounded. This is the least likely to cause issues as the messages we store here are the
+ // smallest relative to other channels.
+ requestsChan: channels.NewUnbounded(),
+ // Allow a buffer of 1. This ensures we queue up at most 2 (one in process, 1 pending) responses before forwarding.
+ responsesChan: make(chan *discovery.DiscoveryResponse, 1),
+ stopChan: make(chan struct{}),
+ downstream: downstream,
}
p.RegisterStream(con)
@@ -452,7 +459,9 @@ func (p *XdsProxy) handleUpstreamRequest(con *ProxyConnection) {
defer con.upstream.CloseSend() // nolint
for {
select {
- case req := <-con.requestsChan:
+ case requ := <-con.requestsChan.Get():
+ con.requestsChan.Load()
+ req := requ.(*discovery.DiscoveryRequest)
proxyLog.Debugf("request for type url %s", req.TypeUrl)
metrics.XdsProxyRequests.Increment()
if req.TypeUrl == v3.ExtensionConfigurationType {
diff --git a/pkg/istio-agent/xds_proxy_delta.go b/pkg/istio-agent/xds_proxy_delta.go
index 2cd82b4059..c2245f9918 100644
--- a/pkg/istio-agent/xds_proxy_delta.go
+++ b/pkg/istio-agent/xds_proxy_delta.go
@@ -27,6 +27,7 @@ import (
"istio.io/istio/pilot/pkg/features"
istiogrpc "istio.io/istio/pilot/pkg/grpc"
v3 "istio.io/istio/pilot/pkg/xds/v3"
+ "istio.io/istio/pkg/channels"
"istio.io/istio/pkg/istio-agent/metrics"
"istio.io/istio/pkg/wasm"
)
@@ -34,10 +35,7 @@ import (
// sendDeltaRequest is a small wrapper around sending to con.requestsChan. This ensures that we do not
// block forever on
func (con *ProxyConnection) sendDeltaRequest(req *discovery.DeltaDiscoveryRequest) {
- select {
- case con.deltaRequestsChan <- req:
- case <-con.stopChan:
- }
+ con.deltaRequestsChan.Put(req)
}
// requests from envoy
@@ -48,10 +46,11 @@ func (p *XdsProxy) DeltaAggregatedResources(downstream discovery.AggregatedDisco
proxyLog.Debugf("accepted delta xds connection from envoy, forwarding to upstream")
con := &ProxyConnection{
- upstreamError: make(chan error, 2), // can be produced by recv and send
- downstreamError: make(chan error, 2), // can be produced by recv and send
- deltaRequestsChan: make(chan *discovery.DeltaDiscoveryRequest, 10),
- deltaResponsesChan: make(chan *discovery.DeltaDiscoveryResponse, 10),
+ upstreamError: make(chan error, 2), // can be produced by recv and send
+ downstreamError: make(chan error, 2), // can be produced by recv and send
+ deltaRequestsChan: channels.NewUnbounded(),
+ // Allow a buffer of 1. This ensures we queue up at most 2 (one in process, 1 pending) responses before forwarding.
+ deltaResponsesChan: make(chan *discovery.DeltaDiscoveryResponse, 1),
stopChan: make(chan struct{}),
downstreamDeltas: downstream,
}
@@ -190,7 +189,9 @@ func (p *XdsProxy) handleUpstreamDeltaRequest(con *ProxyConnection) {
}()
for {
select {
- case req := <-con.deltaRequestsChan:
+ case requ := <-con.deltaRequestsChan.Get():
+ con.deltaRequestsChan.Load()
+ req := requ.(*discovery.DeltaDiscoveryRequest)
proxyLog.Debugf("delta request for type url %s", req.TypeUrl)
metrics.XdsProxyRequests.Increment()
if req.TypeUrl == v3.ExtensionConfigurationType {
@@ -296,22 +297,10 @@ func sendDownstreamDelta(deltaDownstream discovery.AggregatedDiscoveryService_De
}
func (p *XdsProxy) PersistDeltaRequest(req *discovery.DeltaDiscoveryRequest) {
- var ch chan *discovery.DeltaDiscoveryRequest
- var stop chan struct{}
-
p.connectedMutex.Lock()
if p.connected != nil {
- ch = p.connected.deltaRequestsChan
- stop = p.connected.stopChan
+ p.connected.deltaRequestsChan.Put(req)
}
p.initialDeltaRequest = req
p.connectedMutex.Unlock()
-
- // Immediately send if we are currently connect
- if ch != nil {
- select {
- case ch <- req:
- case <-stop:
- }
- }
}