diff --git a/.github/workflows/build-and-test-plugin.yaml b/.github/workflows/build-and-test-plugin.yaml index 4cd993d25..53a0ddcb0 100644 --- a/.github/workflows/build-and-test-plugin.yaml +++ b/.github/workflows/build-and-test-plugin.yaml @@ -52,11 +52,9 @@ jobs: uses: actions/cache@v3 with: path: |- - envoy - istio .git/modules - key: ${{ runner.os }}-submodules-new-${{ github.run_id }} - restore-keys: ${{ runner.os }}-submodules-new + key: ${{ runner.os }}-submodules-cache-${{ github.run_id }} + restore-keys: ${{ runner.os }}-submodules-cache - run: git stash # restore patch diff --git a/.github/workflows/build-and-test.yaml b/.github/workflows/build-and-test.yaml index 18566938b..149c7cf52 100644 --- a/.github/workflows/build-and-test.yaml +++ b/.github/workflows/build-and-test.yaml @@ -36,11 +36,9 @@ jobs: uses: actions/cache@v3 with: path: |- - envoy - istio .git/modules - key: ${{ runner.os }}-submodules-new-${{ github.run_id }} - restore-keys: ${{ runner.os }}-submodules-new + key: ${{ runner.os }}-submodules-cache-${{ github.run_id }} + restore-keys: ${{ runner.os }}-submodules-cache - run: git stash # restore patch @@ -82,11 +80,9 @@ jobs: uses: actions/cache@v3 with: path: |- - envoy - istio .git/modules - key: ${{ runner.os }}-submodules-new-${{ github.run_id }} - restore-keys: ${{ runner.os }}-submodules-new + key: ${{ runner.os }}-submodules-cache-${{ github.run_id }} + restore-keys: ${{ runner.os }}-submodules-cache - run: git stash # restore patch @@ -130,11 +126,9 @@ jobs: uses: actions/cache@v3 with: path: |- - envoy - istio .git/modules - key: ${{ runner.os }}-submodules-new-${{ github.run_id }} - restore-keys: ${{ runner.os }}-submodules-new + key: ${{ runner.os }}-submodules-cache-${{ github.run_id }} + restore-keys: ${{ runner.os }}-submodules-cache - run: git stash # restore patch diff --git a/envoy/1.20/patches/envoy/20240116-fix-cve.patch b/envoy/1.20/patches/envoy/20240116-fix-cve.patch new file mode 100644 index 000000000..982f02d6c --- /dev/null +++ b/envoy/1.20/patches/envoy/20240116-fix-cve.patch @@ -0,0 +1,1445 @@ +diff -Naur envoy/source/common/http/conn_manager_config.h envoy-new/source/common/http/conn_manager_config.h +--- envoy/source/common/http/conn_manager_config.h 2024-01-16 23:42:11.191853354 +0800 ++++ envoy-new/source/common/http/conn_manager_config.h 2024-01-16 23:27:25.849358541 +0800 +@@ -62,6 +62,7 @@ + COUNTER(downstream_rq_rejected_via_ip_detection) \ + COUNTER(downstream_rq_response_before_rq_complete) \ + COUNTER(downstream_rq_rx_reset) \ ++ COUNTER(downstream_rq_too_many_premature_resets) \ + COUNTER(downstream_rq_timeout) \ + COUNTER(downstream_rq_header_timeout) \ + COUNTER(downstream_rq_too_large) \ +diff -Naur envoy/source/common/http/conn_manager_impl.cc envoy-new/source/common/http/conn_manager_impl.cc +--- envoy/source/common/http/conn_manager_impl.cc 2024-01-16 23:42:11.307854195 +0800 ++++ envoy-new/source/common/http/conn_manager_impl.cc 2024-01-16 23:27:25.853358572 +0800 +@@ -1,5 +1,6 @@ + #include "source/common/http/conn_manager_impl.h" + ++#include + #include + #include + #include +@@ -52,6 +53,15 @@ + namespace Envoy { + namespace Http { + ++const absl::string_view ConnectionManagerImpl::PrematureResetTotalStreamCountKey = ++ "overload.premature_reset_total_stream_count"; ++const absl::string_view ConnectionManagerImpl::PrematureResetMinStreamLifetimeSecondsKey = ++ "overload.premature_reset_min_stream_lifetime_seconds"; ++// Runtime key for maximum number of requests that can be processed from a single connection per ++// I/O cycle. Requests over this limit are deferred until the next I/O cycle. ++const absl::string_view ConnectionManagerImpl::MaxRequestsPerIoCycle = ++ "http.max_requests_per_io_cycle"; ++ + bool requestWasConnect(const RequestHeaderMapPtr& headers, Protocol protocol) { + if (!headers) { + return false; +@@ -102,8 +112,10 @@ + overload_disable_keepalive_ref_( + overload_state_.getState(Server::OverloadActionNames::get().DisableHttpKeepAlive)), + time_source_(time_source), +- enable_internal_redirects_with_body_(Runtime::runtimeFeatureEnabled( +- "envoy.reloadable_features.internal_redirects_with_body")) {} ++ enable_internal_redirects_with_body_( ++ Runtime::runtimeFeatureEnabled("envoy.reloadable_features.internal_redirects_with_body")), ++ max_requests_during_dispatch_(runtime_.snapshot().getInteger( ++ ConnectionManagerImpl::MaxRequestsPerIoCycle, UINT32_MAX)) {} + + const ResponseHeaderMap& ConnectionManagerImpl::continueHeader() { + static const auto headers = createHeaderMap( +@@ -113,6 +125,12 @@ + + void ConnectionManagerImpl::initializeReadFilterCallbacks(Network::ReadFilterCallbacks& callbacks) { + read_callbacks_ = &callbacks; ++ if (max_requests_during_dispatch_ != UINT32_MAX) { ++ deferred_request_processing_callback_ = ++ callbacks.connection().dispatcher().createSchedulableCallback( ++ [this]() -> void { onDeferredRequestProcessing(); }); ++ } ++ + stats_.named_.downstream_cx_total_.inc(); + stats_.named_.downstream_cx_active_.inc(); + if (read_callbacks_->connection().ssl()) { +@@ -252,6 +270,10 @@ + } + + void ConnectionManagerImpl::doDeferredStreamDestroy(ActiveStream& stream) { ++ ++closed_non_internally_destroyed_requests_; ++ if (isPrematureRstStream(stream)) { ++ ++number_premature_stream_resets_; ++ } + if (stream.max_stream_duration_timer_) { + stream.max_stream_duration_timer_->disableTimer(); + stream.max_stream_duration_timer_ = nullptr; +@@ -277,6 +299,7 @@ + if (connection_idle_timer_ && streams_.empty()) { + connection_idle_timer_->enableTimer(config_.idleTimeout().value()); + } ++ maybeDrainDueToPrematureResets(); + } + + RequestDecoder& ConnectionManagerImpl::newStream(ResponseEncoder& response_encoder, +@@ -356,6 +379,7 @@ + } + + Network::FilterStatus ConnectionManagerImpl::onData(Buffer::Instance& data, bool) { ++ requests_during_dispatch_count_ = 0; + if (!codec_) { + // Http3 codec should have been instantiated by now. + createCodec(data); +@@ -510,6 +534,59 @@ + } + } + ++bool ConnectionManagerImpl::isPrematureRstStream(const ActiveStream& stream) const { ++ // Check if the request was prematurely reset, by comparing its lifetime to the configured ++ // threshold. ++ MonotonicTime current_time = time_source_.monotonicTime(); ++ MonotonicTime request_start_time = stream.filter_manager_.streamInfo().startTimeMonotonic(); ++ std::chrono::nanoseconds duration = ++ std::chrono::duration_cast(current_time - request_start_time); ++ ++ // Check if request lifetime is longer than the premature reset threshold. ++ if (duration.count() > 0) { ++ const uint64_t lifetime = std::chrono::duration_cast(duration).count(); ++ const uint64_t min_lifetime = runtime_.snapshot().getInteger( ++ ConnectionManagerImpl::PrematureResetMinStreamLifetimeSecondsKey, 1); ++ if (lifetime > min_lifetime) { ++ return false; ++ } ++ } ++ ++ // If request has completed before configured threshold, also check if the Envoy proxied the ++ // response from the upstream. Requests without the response status were reset. ++ // TODO(RyanTheOptimist): Possibly support half_closed_local instead. ++ return !stream.filter_manager_.streamInfo().responseCode(); ++} ++ ++// Sends a GOAWAY if too many streams have been reset prematurely on this ++// connection. ++void ConnectionManagerImpl::maybeDrainDueToPrematureResets() { ++ if (!Runtime::runtimeFeatureEnabled( ++ "envoy.restart_features.send_goaway_for_premature_rst_streams") || ++ closed_non_internally_destroyed_requests_ == 0) { ++ return; ++ } ++ ++ const uint64_t limit = ++ runtime_.snapshot().getInteger(ConnectionManagerImpl::PrematureResetTotalStreamCountKey, 500); ++ ++ if (closed_non_internally_destroyed_requests_ < limit) { ++ return; ++ } ++ ++ if (static_cast(number_premature_stream_resets_) / ++ closed_non_internally_destroyed_requests_ < ++ .5) { ++ return; ++ } ++ ++ if (drain_state_ == DrainState::NotDraining) { ++ stats_.named_.downstream_rq_too_many_premature_resets_.inc(); ++ doConnectionClose(Network::ConnectionCloseType::NoFlush, absl::nullopt, ++ "too_many_premature_resets"); ++ } ++} ++ + void ConnectionManagerImpl::onGoAway(GoAwayErrorCode) { + // Currently we do nothing with remote go away frames. In the future we can decide to no longer + // push resources if applicable. +@@ -1093,7 +1170,12 @@ + traceRequest(); + } + +- filter_manager_.decodeHeaders(*request_headers_, end_stream); ++ if (!connection_manager_.shouldDeferRequestProxyingToNextIoCycle()) { ++ filter_manager_.decodeHeaders(*request_headers_, end_stream); ++ } else { ++ state_.deferred_to_next_io_iteration_ = true; ++ state_.deferred_end_stream_ = end_stream; ++ } + + // Reset it here for both global and overridden cases. + resetIdleTimer(); +@@ -1161,8 +1243,15 @@ + connection_manager_.read_callbacks_->connection().dispatcher()); + filter_manager_.maybeEndDecode(end_stream); + filter_manager_.streamInfo().addBytesReceived(data.length()); +- +- filter_manager_.decodeData(data, end_stream); ++ if (!state_.deferred_to_next_io_iteration_) { ++ filter_manager_.decodeData(data, end_stream); ++ } else { ++ if (!deferred_data_) { ++ deferred_data_ = std::make_unique(); ++ } ++ deferred_data_->move(data); ++ state_.deferred_end_stream_ = end_stream; ++ } + } + + void ConnectionManagerImpl::ActiveStream::decodeTrailers(RequestTrailerMapPtr&& trailers) { +@@ -1173,7 +1262,9 @@ + ASSERT(!request_trailers_); + request_trailers_ = std::move(trailers); + filter_manager_.maybeEndDecode(true); +- filter_manager_.decodeTrailers(*request_trailers_); ++ if (!state_.deferred_to_next_io_iteration_) { ++ filter_manager_.decodeTrailers(*request_trailers_); ++ } + } + + void ConnectionManagerImpl::ActiveStream::decodeMetadata(MetadataMapPtr&& metadata_map) { +@@ -1733,5 +1824,61 @@ + connection_manager_.doEndStream(*this); + } + ++bool ConnectionManagerImpl::ActiveStream::onDeferredRequestProcessing() { ++ // TODO(yanavlasov): Merge this with the filter manager continueIteration() method ++ if (!state_.deferred_to_next_io_iteration_) { ++ return false; ++ } ++ state_.deferred_to_next_io_iteration_ = false; ++ bool end_stream = ++ state_.deferred_end_stream_ && deferred_data_ == nullptr && request_trailers_ == nullptr; ++ filter_manager_.decodeHeaders(*request_headers_, end_stream); ++ if (end_stream) { ++ return true; ++ } ++ if (deferred_data_ != nullptr) { ++ end_stream = state_.deferred_end_stream_ && request_trailers_ == nullptr; ++ filter_manager_.decodeData(*deferred_data_, end_stream); ++ } ++ if (request_trailers_ != nullptr) { ++ filter_manager_.decodeTrailers(*request_trailers_); ++ } ++ return true; ++} ++ ++bool ConnectionManagerImpl::shouldDeferRequestProxyingToNextIoCycle() { ++ // Do not defer this stream if stream deferral is disabled ++ if (deferred_request_processing_callback_ == nullptr) { ++ return false; ++ } ++ // Defer this stream if there are already deferred streams, so they are not ++ // processed out of order ++ if (deferred_request_processing_callback_->enabled()) { ++ return true; ++ } ++ ++requests_during_dispatch_count_; ++ bool defer = requests_during_dispatch_count_ > max_requests_during_dispatch_; ++ if (defer) { ++ deferred_request_processing_callback_->scheduleCallbackNextIteration(); ++ } ++ return defer; ++} ++ ++void ConnectionManagerImpl::onDeferredRequestProcessing() { ++ requests_during_dispatch_count_ = 1; // 1 stream is always let through ++ // Streams are inserted at the head of the list. As such process deferred ++ // streams at the back of the list first. ++ for (auto reverse_iter = streams_.rbegin(); reverse_iter != streams_.rend();) { ++ auto& stream_ptr = *reverse_iter; ++ // Move the iterator to the next item in case the `onDeferredRequestProcessing` call removes the ++ // stream from the list. ++ ++reverse_iter; ++ bool was_deferred = stream_ptr->onDeferredRequestProcessing(); ++ if (was_deferred && shouldDeferRequestProxyingToNextIoCycle()) { ++ break; ++ } ++ } ++} ++ + } // namespace Http + } // namespace Envoy +diff -Naur envoy/source/common/http/conn_manager_impl.h envoy-new/source/common/http/conn_manager_impl.h +--- envoy/source/common/http/conn_manager_impl.h 2024-01-16 23:42:11.275853963 +0800 ++++ envoy-new/source/common/http/conn_manager_impl.h 2024-01-16 23:27:25.853358572 +0800 +@@ -115,6 +115,15 @@ + void setClearHopByHopResponseHeaders(bool value) { clear_hop_by_hop_response_headers_ = value; } + bool clearHopByHopResponseHeaders() const { return clear_hop_by_hop_response_headers_; } + ++ // This runtime key configures the number of streams which must be closed on a connection before ++ // envoy will potentially drain a connection due to excessive prematurely reset streams. ++ static const absl::string_view PrematureResetTotalStreamCountKey; ++ ++ // The minimum lifetime of a stream, in seconds, in order not to be considered ++ // prematurely closed. ++ static const absl::string_view PrematureResetMinStreamLifetimeSecondsKey; ++ static const absl::string_view MaxRequestsPerIoCycle; ++ + private: + struct ActiveStream; + class MobileConnectionManagerImpl; +@@ -308,7 +317,8 @@ + struct State { + State() + : codec_saw_local_complete_(false), saw_connection_close_(false), +- successful_upgrade_(false), is_internally_created_(false), decorated_propagate_(true) {} ++ successful_upgrade_(false), is_internally_created_(false), decorated_propagate_(true), ++ deferred_to_next_io_iteration_(false) {} + + bool codec_saw_local_complete_ : 1; // This indicates that local is complete as written all + // the way through to the codec. +@@ -320,6 +330,14 @@ + bool is_internally_created_ : 1; + + bool decorated_propagate_ : 1; ++ ++ // Indicates that sending headers to the filter manager is deferred to the ++ // next I/O cycle. If data or trailers are received when this flag is set ++ // they are deferred too. ++ // TODO(yanavlasov): encapsulate the entire state of deferred streams into a separate ++ // structure, so it can be atomically created and cleared. ++ bool deferred_to_next_io_iteration_ : 1; ++ bool deferred_end_stream_ : 1; + }; + + // Per-stream idle timeout callback. +@@ -347,6 +365,11 @@ + return *tracing_custom_tags_; + } + ++ // Dispatch deferred headers, body and trailers to the filter manager. ++ // Return true if this stream was deferred and dispatched pending headers, body and trailers (if ++ // present). Return false if this stream was not deferred. ++ bool onDeferredRequestProcessing(); ++ + ConnectionManagerImpl& connection_manager_; + // TODO(snowp): It might make sense to move this to the FilterManager to avoid storing it in + // both locations, then refer to the FM when doing stream logs. +@@ -390,6 +413,8 @@ + std::unique_ptr tracing_custom_tags_{nullptr}; + + friend FilterManager; ++ ++ std::unique_ptr deferred_data_; + }; + + using ActiveStreamPtr = std::unique_ptr; +@@ -422,6 +447,18 @@ + void doConnectionClose(absl::optional close_type, + absl::optional response_flag, + absl::string_view details); ++ // Returns true if a RST_STREAM for the given stream is premature. Premature ++ // means the RST_STREAM arrived before response headers were sent and than ++ // the stream was alive for short period of time. This period is specified ++ // by the optional runtime value PrematureResetMinStreamLifetimeSecondsKey, ++ // or one second if that is not present. ++ bool isPrematureRstStream(const ActiveStream& stream) const; ++ // Sends a GOAWAY if both sufficient streams have been closed on a connection ++ // and at least half have been prematurely reset? ++ void maybeDrainDueToPrematureResets(); ++ ++ bool shouldDeferRequestProxyingToNextIoCycle(); ++ void onDeferredRequestProcessing(); + + enum class DrainState { NotDraining, Draining, Closing }; + +@@ -461,6 +498,15 @@ + bool clear_hop_by_hop_response_headers_{true}; + // The number of requests accumulated on the current connection. + uint64_t accumulated_requests_{}; ++ // The number of requests closed on the current connection which were ++ // not internally destroyed ++ uint64_t closed_non_internally_destroyed_requests_{}; ++ // The number of requests that received a premature RST_STREAM, according to ++ // the definition given in `isPrematureRstStream()`. ++ uint64_t number_premature_stream_resets_{0}; ++ uint32_t requests_during_dispatch_count_{0}; ++ const uint32_t max_requests_during_dispatch_{UINT32_MAX}; ++ Event::SchedulableCallbackPtr deferred_request_processing_callback_; + }; + + } // namespace Http +diff -Naur envoy/source/common/runtime/runtime_features.cc envoy-new/source/common/runtime/runtime_features.cc +--- envoy/source/common/runtime/runtime_features.cc 2024-01-16 23:42:11.279853992 +0800 ++++ envoy-new/source/common/runtime/runtime_features.cc 2024-01-16 23:27:25.853358572 +0800 +@@ -100,6 +100,7 @@ + "envoy.reloadable_features.vhds_heartbeats", + "envoy.reloadable_features.wasm_cluster_name_envoy_grpc", + "envoy.reloadable_features.upstream_http2_flood_checks", ++ "envoy.restart_features.send_goaway_for_premature_rst_streams", + "envoy.restart_features.use_apple_api_for_dns_lookups", + "envoy.reloadable_features.header_map_correctly_coalesce_cookies", + "envoy.reloadable_features.sanitize_http_header_referer", +diff -Naur envoy/test/common/http/conn_manager_impl_test_2.cc envoy-new/test/common/http/conn_manager_impl_test_2.cc +--- envoy/test/common/http/conn_manager_impl_test_2.cc 2024-01-16 23:42:11.223853586 +0800 ++++ envoy-new/test/common/http/conn_manager_impl_test_2.cc 2024-01-16 23:27:25.853358572 +0800 +@@ -13,6 +13,7 @@ + using testing::Property; + using testing::Ref; + using testing::Return; ++using testing::ReturnArg; + using testing::ReturnRef; + + namespace Envoy { +@@ -2428,7 +2429,7 @@ + + EXPECT_CALL(*static_cast( + scopedRouteConfigProvider()->config().get()), +- getRouteConfig(_)) ++ getRouteConfig(_, _)) + .Times(2) + .WillRepeatedly(Return(nullptr)); + EXPECT_CALL(*codec_, dispatch(_)).WillOnce(Invoke([&](Buffer::Instance& data) -> Http::Status { +@@ -2460,7 +2461,7 @@ + + EXPECT_CALL(*static_cast( + scopedRouteConfigProvider()->config().get()), +- getRouteConfig(_)) ++ getRouteConfig(_, _)) + .Times(3) + .WillOnce(Return(nullptr)) + .WillOnce(Return(nullptr)) // refreshCachedRoute first time. +@@ -2520,12 +2521,13 @@ + EXPECT_CALL(*route_config2, route(_, _, _, _)).WillRepeatedly(Return(route2)); + EXPECT_CALL(*static_cast( + scopedRouteConfigProvider()->config().get()), +- getRouteConfig(_)) ++ getRouteConfig(_, _)) + // 1. Snap scoped route config; + // 2. refreshCachedRoute (both in decodeHeaders(headers,end_stream); + // 3. then refreshCachedRoute triggered by decoder_filters_[1]->callbacks_->route(). + .Times(3) +- .WillRepeatedly(Invoke([&](const HeaderMap& headers) -> Router::ConfigConstSharedPtr { ++ .WillRepeatedly(Invoke([&](const HeaderMap& headers, ++ const StreamInfo::StreamInfo&) -> Router::ConfigConstSharedPtr { + auto& test_headers = dynamic_cast(headers); + if (test_headers.get_("scope_key") == "foo") { + return route_config1; +@@ -2579,7 +2581,8 @@ + std::shared_ptr fake_cluster1 = + std::make_shared>(); + EXPECT_CALL(cluster_manager_, getThreadLocalCluster(_)).WillOnce(Return(fake_cluster1.get())); +- EXPECT_CALL(*scopedRouteConfigProvider()->config(), getRouteConfig(_)) ++ EXPECT_CALL(*scopedRouteConfigProvider()->config(), ++ getRouteConfig(_, _)) + // 1. decodeHeaders() snapping route config. + // 2. refreshCachedRoute() later in the same decodeHeaders(). + .Times(2); +@@ -2848,5 +2851,240 @@ + EXPECT_EQ(1U, stats_.named_.downstream_rq_rejected_via_ip_detection_.value()); + } + ++// Validate that deferred streams are processed with a variety of ++// headers, data and trailer arriving in the same I/O cycle ++TEST_F(HttpConnectionManagerImplTest, LimitWorkPerIOCycle) { ++ const int kRequestsSentPerIOCycle = 100; ++ EXPECT_CALL(runtime_.snapshot_, getInteger(_, _)).WillRepeatedly(ReturnArg<1>()); ++ // Process 1 request per I/O cycle ++ auto* deferred_request_callback = enableStreamsPerIoLimit(1); ++ setup(false, ""); ++ ++ // Store the basic request encoder during filter chain setup. ++ std::vector> encoder_filters; ++ int decode_headers_call_count = 0; ++ for (int i = 0; i < kRequestsSentPerIOCycle; ++i) { ++ std::shared_ptr filter(new NiceMock()); ++ ++ // Each 4th request is headers only ++ EXPECT_CALL(*filter, decodeHeaders(_, i % 4 == 0 ? true : false)) ++ .WillRepeatedly(Invoke([&](RequestHeaderMap&, bool) -> FilterHeadersStatus { ++ ++decode_headers_call_count; ++ return FilterHeadersStatus::StopIteration; ++ })); ++ ++ // Each 1st request is headers and data only ++ // Each 2nd request is headers, data and trailers ++ if (i % 4 == 1 || i % 4 == 2) { ++ EXPECT_CALL(*filter, decodeData(_, i % 4 == 1 ? true : false)) ++ .WillOnce(Return(FilterDataStatus::StopIterationNoBuffer)); ++ } ++ ++ // Each 3rd request is headers and trailers (no data) ++ if (i % 4 == 2 || i % 4 == 3) { ++ EXPECT_CALL(*filter, decodeTrailers(_)).WillOnce(Return(FilterTrailersStatus::StopIteration)); ++ } ++ ++ EXPECT_CALL(*filter, setDecoderFilterCallbacks(_)); ++ encoder_filters.push_back(std::move(filter)); ++ } ++ ++ uint64_t random_value = 0; ++ EXPECT_CALL(random_, random()).WillRepeatedly(Invoke([&random_value]() { ++ return random_value++; ++ })); ++ ++ EXPECT_CALL(filter_factory_, createFilterChain(_)) ++ .Times(kRequestsSentPerIOCycle) ++ .WillRepeatedly(Invoke([&](FilterChainFactoryCallbacks& callbacks) -> void { ++ static int index = 0; ++ int i = index++; ++ callbacks.addStreamDecoderFilter(encoder_filters[i]); ++ })); ++ ++ EXPECT_CALL(filter_callbacks_.connection_.dispatcher_, deferredDelete_(_)) ++ .Times(kRequestsSentPerIOCycle); ++ ++ std::vector> response_encoders(kRequestsSentPerIOCycle); ++ for (auto& encoder : response_encoders) { ++ EXPECT_CALL(encoder, getStream()).WillRepeatedly(ReturnRef(encoder.stream_)); ++ } ++ ++ EXPECT_CALL(*codec_, dispatch(_)) ++ .WillRepeatedly(Invoke([&](Buffer::Instance& data) -> Http::Status { ++ for (int i = 0; i < kRequestsSentPerIOCycle; ++i) { ++ decoder_ = &conn_manager_->newStream(response_encoders[i]); ++ ++ RequestHeaderMapPtr headers{new TestRequestHeaderMapImpl{ ++ {":authority", "host"}, {":path", "/"}, {":method", "GET"}}}; ++ ++ RequestTrailerMapPtr trailers{ ++ new TestRequestTrailerMapImpl{{"key1", "value1"}, {"key2", "value2"}}}; ++ ++ Buffer::OwnedImpl data("data"); ++ ++ switch (i % 4) { ++ case 0: ++ decoder_->decodeHeaders(std::move(headers), true); ++ break; ++ case 1: ++ decoder_->decodeHeaders(std::move(headers), false); ++ decoder_->decodeData(data, true); ++ break; ++ case 2: ++ decoder_->decodeHeaders(std::move(headers), false); ++ decoder_->decodeData(data, false); ++ decoder_->decodeTrailers(std::move(trailers)); ++ break; ++ case 3: ++ decoder_->decodeHeaders(std::move(headers), false); ++ decoder_->decodeTrailers(std::move(trailers)); ++ break; ++ } ++ } ++ ++ data.drain(4); ++ return Http::okStatus(); ++ })); ++ ++ // Kick off the incoming data. ++ Buffer::OwnedImpl fake_input("1234"); ++ conn_manager_->onData(fake_input, false); ++ ++ EXPECT_TRUE(deferred_request_callback->enabled_); ++ // Only one request should go through the filter chain ++ ASSERT_EQ(decode_headers_call_count, 1); ++ ++ // Let other requests to go through the filter chain. Call expectations will fail ++ // if this is not the case. ++ int deferred_request_count = 0; ++ while (deferred_request_callback->enabled_) { ++ deferred_request_callback->invokeCallback(); ++ ++deferred_request_count; ++ } ++ ++ ASSERT_EQ(deferred_request_count, kRequestsSentPerIOCycle); ++ ++ for (auto& filter : encoder_filters) { ++ ResponseHeaderMapPtr response_headers{new TestResponseHeaderMapImpl{{":status", "200"}}}; ++ filter->callbacks_->streamInfo().setResponseCodeDetails(""); ++ filter->callbacks_->encodeHeaders(std::move(response_headers), true, "details"); ++ } ++ ++ EXPECT_EQ(kRequestsSentPerIOCycle, stats_.named_.downstream_rq_2xx_.value()); ++ EXPECT_EQ(kRequestsSentPerIOCycle, listener_stats_.downstream_rq_2xx_.value()); ++ EXPECT_EQ(kRequestsSentPerIOCycle, stats_.named_.downstream_rq_completed_.value()); ++ EXPECT_EQ(kRequestsSentPerIOCycle, listener_stats_.downstream_rq_completed_.value()); ++} ++ ++TEST_F(HttpConnectionManagerImplTest, StreamDeferralPreservesOrder) { ++ EXPECT_CALL(runtime_.snapshot_, getInteger(_, _)).WillRepeatedly(ReturnArg<1>()); ++ // Process 1 request per I/O cycle ++ auto* deferred_request_callback = enableStreamsPerIoLimit(1); ++ setup(false, ""); ++ ++ std::vector> encoder_filters; ++ int expected_request_id = 0; ++ const Http::LowerCaseString request_id_header(absl::string_view("request-id")); ++ // Two requests are processed in 2 I/O reads ++ const int TotalRequests = 2 * 2; ++ for (int i = 0; i < TotalRequests; ++i) { ++ std::shared_ptr filter(new NiceMock()); ++ ++ EXPECT_CALL(*filter, decodeHeaders(_, true)) ++ .WillRepeatedly(Invoke([&](RequestHeaderMap& headers, bool) -> FilterHeadersStatus { ++ // Check that requests are decoded in expected order ++ int request_id = 0; ++ ASSERT(absl::SimpleAtoi(headers.get(request_id_header)[0]->value().getStringView(), ++ &request_id)); ++ ASSERT(request_id == expected_request_id); ++ ++expected_request_id; ++ return FilterHeadersStatus::StopIteration; ++ })); ++ ++ EXPECT_CALL(*filter, setDecoderFilterCallbacks(_)); ++ encoder_filters.push_back(std::move(filter)); ++ } ++ ++ uint64_t random_value = 0; ++ EXPECT_CALL(random_, random()).WillRepeatedly(Invoke([&random_value]() { ++ return random_value++; ++ })); ++ ++ EXPECT_CALL(filter_factory_, createFilterChain(_)) ++ .Times(TotalRequests) ++ .WillRepeatedly(Invoke([&](FilterChainFactoryCallbacks& callbacks) -> void { ++ static int index = 0; ++ int i = index++; ++ callbacks.addStreamDecoderFilter(encoder_filters[i]); ++ })); ++ ++ EXPECT_CALL(filter_callbacks_.connection_.dispatcher_, deferredDelete_(_)).Times(TotalRequests); ++ ++ std::vector> response_encoders(TotalRequests); ++ for (auto& encoder : response_encoders) { ++ EXPECT_CALL(encoder, getStream()).WillRepeatedly(ReturnRef(encoder.stream_)); ++ } ++ auto response_encoders_iter = response_encoders.begin(); ++ ++ int request_id = 0; ++ EXPECT_CALL(*codec_, dispatch(_)) ++ .WillRepeatedly(Invoke([&](Buffer::Instance& data) -> Http::Status { ++ // The second request should be deferred ++ for (int i = 0; i < 2; ++i) { ++ decoder_ = &conn_manager_->newStream(*response_encoders_iter); ++ ++response_encoders_iter; ++ ++ RequestHeaderMapPtr headers{ ++ new TestRequestHeaderMapImpl{{":authority", "host"}, ++ {":path", "/"}, ++ {":method", "GET"}, ++ {"request-id", absl::StrCat(request_id)}}}; ++ ++ ++request_id; ++ decoder_->decodeHeaders(std::move(headers), true); ++ } ++ ++ data.drain(4); ++ return Http::okStatus(); ++ })); ++ ++ // Kick off the incoming data. ++ Buffer::OwnedImpl fake_input("1234"); ++ conn_manager_->onData(fake_input, false); ++ ++ EXPECT_TRUE(deferred_request_callback->enabled_); ++ // Only one request should go through the filter chain ++ ASSERT_EQ(expected_request_id, 1); ++ ++ // Test arrival of another request. New request is read from the socket before deferred callbacks. ++ Buffer::OwnedImpl fake_input2("1234"); ++ conn_manager_->onData(fake_input2, false); ++ ++ // No requests from the second read should go through as there are deferred stream present ++ ASSERT_EQ(expected_request_id, 1); ++ ++ // Let other requests to go through the filter chain. Call expectations will fail ++ // if this is not the case. ++ int deferred_request_count = 0; ++ while (deferred_request_callback->enabled_) { ++ deferred_request_callback->invokeCallback(); ++ ++deferred_request_count; ++ } ++ ++ ASSERT_EQ(deferred_request_count, TotalRequests); ++ ++ for (auto& filter : encoder_filters) { ++ ResponseHeaderMapPtr response_headers{new TestResponseHeaderMapImpl{{":status", "200"}}}; ++ filter->callbacks_->streamInfo().setResponseCodeDetails(""); ++ filter->callbacks_->encodeHeaders(std::move(response_headers), true, "details"); ++ } ++ ++ EXPECT_EQ(TotalRequests, stats_.named_.downstream_rq_2xx_.value()); ++ EXPECT_EQ(TotalRequests, listener_stats_.downstream_rq_2xx_.value()); ++ EXPECT_EQ(TotalRequests, stats_.named_.downstream_rq_completed_.value()); ++ EXPECT_EQ(TotalRequests, listener_stats_.downstream_rq_completed_.value()); ++} ++ + } // namespace Http + } // namespace Envoy +diff -Naur envoy/test/common/http/conn_manager_impl_test_base.cc envoy-new/test/common/http/conn_manager_impl_test_base.cc +--- envoy/test/common/http/conn_manager_impl_test_base.cc 2024-01-16 23:42:11.223853586 +0800 ++++ envoy-new/test/common/http/conn_manager_impl_test_base.cc 2024-01-16 23:27:25.853358572 +0800 +@@ -77,6 +77,7 @@ + conn_manager_ = std::make_unique( + *this, drain_close_, random_, http_context_, runtime_, local_info_, cluster_manager_, + overload_manager_, test_time_.timeSystem()); ++ + conn_manager_->initializeReadFilterCallbacks(filter_callbacks_); + + if (tracing) { +@@ -294,5 +295,23 @@ + conn_manager_->onData(fake_input, false); + } + ++Event::MockSchedulableCallback* ++HttpConnectionManagerImplTest::enableStreamsPerIoLimit(uint32_t limit) { ++ EXPECT_CALL(runtime_.snapshot_, getInteger("http.max_requests_per_io_cycle", _)) ++ .WillOnce(Return(limit)); ++ ++ // Expect HCM to create and set schedulable callback ++ auto* deferred_request_callback = ++ new Event::MockSchedulableCallback(&filter_callbacks_.connection_.dispatcher_); ++ EXPECT_CALL(*deferred_request_callback, enabled()) ++ .WillRepeatedly( ++ Invoke([deferred_request_callback]() { return deferred_request_callback->enabled_; })); ++ EXPECT_CALL(*deferred_request_callback, scheduleCallbackNextIteration()) ++ .WillRepeatedly( ++ Invoke([deferred_request_callback]() { deferred_request_callback->enabled_ = true; })); ++ ++ return deferred_request_callback; ++} ++ + } // namespace Http + } // namespace Envoy +diff -Naur envoy/test/common/http/conn_manager_impl_test_base.h envoy-new/test/common/http/conn_manager_impl_test_base.h +--- envoy/test/common/http/conn_manager_impl_test_base.h 2024-01-16 23:42:11.223853586 +0800 ++++ envoy-new/test/common/http/conn_manager_impl_test_base.h 2024-01-16 23:27:25.853358572 +0800 +@@ -155,6 +155,8 @@ + } + uint64_t maxRequestsPerConnection() const override { return 0; } + ++ Event::MockSchedulableCallback* enableStreamsPerIoLimit(uint32_t limit); ++ + Envoy::Event::SimulatedTimeSystem test_time_; + NiceMock route_config_provider_; + std::shared_ptr route_config_{new NiceMock()}; +diff -Naur envoy/test/common/http/http2/http2_frame.cc envoy-new/test/common/http/http2/http2_frame.cc +--- envoy/test/common/http/http2/http2_frame.cc 2024-01-16 23:42:11.223853586 +0800 ++++ envoy-new/test/common/http/http2/http2_frame.cc 2024-01-16 23:27:25.853358572 +0800 +@@ -339,7 +339,11 @@ + makeNetworkOrderStreamId(stream_index)); + frame.appendStaticHeader(StaticHeaderIndex::MethodGet); + frame.appendStaticHeader(StaticHeaderIndex::SchemeHttps); +- frame.appendHeaderWithoutIndexing(StaticHeaderIndex::Path, path); ++ if (path.empty() || path == "/") { ++ frame.appendStaticHeader(StaticHeaderIndex::Path); ++ } else { ++ frame.appendHeaderWithoutIndexing(StaticHeaderIndex::Path, path); ++ } + frame.appendHeaderWithoutIndexing(StaticHeaderIndex::Host, host); + frame.adjustPayloadSize(); + return frame; +@@ -363,7 +367,11 @@ + makeNetworkOrderStreamId(stream_index)); + frame.appendStaticHeader(StaticHeaderIndex::MethodPost); + frame.appendStaticHeader(StaticHeaderIndex::SchemeHttps); +- frame.appendHeaderWithoutIndexing(StaticHeaderIndex::Path, path); ++ if (path.empty() || path == "/") { ++ frame.appendStaticHeader(StaticHeaderIndex::Path); ++ } else { ++ frame.appendHeaderWithoutIndexing(StaticHeaderIndex::Path, path); ++ } + frame.appendHeaderWithoutIndexing(StaticHeaderIndex::Host, host); + frame.adjustPayloadSize(); + return frame; +diff -Naur envoy/test/common/http/http2/http2_frame.h envoy-new/test/common/http/http2/http2_frame.h +--- envoy/test/common/http/http2/http2_frame.h 2024-01-16 23:42:11.223853586 +0800 ++++ envoy-new/test/common/http/http2/http2_frame.h 2024-01-16 23:27:25.853358572 +0800 +@@ -209,6 +209,13 @@ + ConstIterator end() const { return data_.end(); } + bool empty() const { return data_.empty(); } + ++ void appendHeaderWithoutIndexing(const Header& header); ++ // This method updates payload length in the HTTP2 header based on the size of the data_ ++ void adjustPayloadSize() { ++ ASSERT(size() >= HeaderSize); ++ setPayloadSize(size() - HeaderSize); ++ } ++ + private: + void buildHeader(Type type, uint32_t payload_size = 0, uint8_t flags = 0, uint32_t stream_id = 0); + void setPayloadSize(uint32_t size); +@@ -228,15 +235,8 @@ + // Headers are directly encoded + void appendStaticHeader(StaticHeaderIndex index); + void appendHeaderWithoutIndexing(StaticHeaderIndex index, absl::string_view value); +- void appendHeaderWithoutIndexing(const Header& header); + void appendEmptyHeader(); + +- // This method updates payload length in the HTTP2 header based on the size of the data_ +- void adjustPayloadSize() { +- ASSERT(size() >= HeaderSize); +- setPayloadSize(size() - HeaderSize); +- } +- + DataContainer data_; + }; + +diff -Naur envoy/test/integration/BUILD envoy-new/test/integration/BUILD +--- envoy/test/integration/BUILD 2024-01-16 23:42:11.287854050 +0800 ++++ envoy-new/test/integration/BUILD 2024-01-16 23:27:25.853358572 +0800 +@@ -370,7 +370,6 @@ + name = "multiplexed_integration_test", + srcs = [ + "multiplexed_integration_test.cc", +- "multiplexed_integration_test.h", + ], + shard_count = 4, + deps = [ +diff -Naur envoy/test/integration/multiplexed_integration_test.cc envoy-new/test/integration/multiplexed_integration_test.cc +--- envoy/test/integration/multiplexed_integration_test.cc 2024-01-16 23:42:11.255853818 +0800 ++++ envoy-new/test/integration/multiplexed_integration_test.cc 2024-01-16 23:27:25.853358572 +0800 +@@ -1,12 +1,14 @@ +-#include "test/integration/multiplexed_integration_test.h" +- + #include ++#include ++#include + #include + + #ifdef ENVOY_ENABLE_QUIC + #include "source/common/quic/client_connection_factory_impl.h" + #endif + ++#include "absl/synchronization/mutex.h" ++ + #include "envoy/config/bootstrap/v3/bootstrap.pb.h" + #include "envoy/config/cluster/v3/cluster.pb.h" + #include "envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.pb.h" +@@ -16,10 +18,12 @@ + #include "source/common/http/header_map_impl.h" + + #include "test/integration/filters/stop_and_continue_filter_config.pb.h" ++#include "test/integration/http_protocol_integration.h" + #include "test/integration/utility.h" + #include "test/mocks/http/mocks.h" + #include "test/test_common/network_utility.h" + #include "test/test_common/printers.h" ++#include "test/test_common/simulated_time_system.h" + #include "test/test_common/utility.h" + + #include "gtest/gtest.h" +@@ -34,30 +38,42 @@ + return; \ + } + +-INSTANTIATE_TEST_SUITE_P(IpVersions, Http2IntegrationTest, ++class MultiplexedIntegrationTest : public HttpProtocolIntegrationTest { ++public: ++ void simultaneousRequest(int32_t request1_bytes, int32_t request2_bytes); ++}; ++ ++INSTANTIATE_TEST_SUITE_P(IpVersions, MultiplexedIntegrationTest, ++ testing::ValuesIn(HttpProtocolIntegrationTest::getProtocolTestParams( ++ {Http::CodecType::HTTP2}, {Http::CodecType::HTTP1})), ++ HttpProtocolIntegrationTest::protocolTestParamsToString); ++ ++class MultiplexedIntegrationTestWithSimulatedTime : public Event::TestUsingSimulatedTime, ++ public MultiplexedIntegrationTest {}; ++ ++INSTANTIATE_TEST_SUITE_P(IpVersions, MultiplexedIntegrationTestWithSimulatedTime, + testing::ValuesIn(HttpProtocolIntegrationTest::getProtocolTestParams( +- {Http::CodecType::HTTP2, Http::CodecType::HTTP3}, +- {Http::CodecType::HTTP1})), ++ {Http::CodecType::HTTP2}, {Http::CodecType::HTTP1})), + HttpProtocolIntegrationTest::protocolTestParamsToString); + +-TEST_P(Http2IntegrationTest, RouterRequestAndResponseWithBodyNoBuffer) { ++TEST_P(MultiplexedIntegrationTest, RouterRequestAndResponseWithBodyNoBuffer) { + testRouterRequestAndResponseWithBody(1024, 512, false, false); + } + +-TEST_P(Http2IntegrationTest, RouterRequestAndResponseWithGiantBodyNoBuffer) { ++TEST_P(MultiplexedIntegrationTest, RouterRequestAndResponseWithGiantBodyNoBuffer) { + config_helper_.addConfigModifier(ConfigHelper::adjustUpstreamTimeoutForTsan); + testRouterRequestAndResponseWithBody(10 * 1024 * 1024, 10 * 1024 * 1024, false, false, nullptr, + TSAN_TIMEOUT_FACTOR * TestUtility::DefaultTimeout); + } + +-TEST_P(Http2IntegrationTest, FlowControlOnAndGiantBody) { ++TEST_P(MultiplexedIntegrationTest, FlowControlOnAndGiantBody) { + config_helper_.addConfigModifier(ConfigHelper::adjustUpstreamTimeoutForTsan); + config_helper_.setBufferLimits(1024, 1024); // Set buffer limits upstream and downstream. + testRouterRequestAndResponseWithBody(10 * 1024 * 1024, 10 * 1024 * 1024, false, false, nullptr, + TSAN_TIMEOUT_FACTOR * TestUtility::DefaultTimeout); + } + +-TEST_P(Http2IntegrationTest, LargeFlowControlOnAndGiantBody) { ++TEST_P(MultiplexedIntegrationTest, LargeFlowControlOnAndGiantBody) { + config_helper_.addConfigModifier(ConfigHelper::adjustUpstreamTimeoutForTsan); + config_helper_.setBufferLimits(128 * 1024, + 128 * 1024); // Set buffer limits upstream and downstream. +@@ -65,24 +81,24 @@ + TSAN_TIMEOUT_FACTOR * TestUtility::DefaultTimeout); + } + +-TEST_P(Http2IntegrationTest, RouterRequestAndResponseWithBodyAndContentLengthNoBuffer) { ++TEST_P(MultiplexedIntegrationTest, RouterRequestAndResponseWithBodyAndContentLengthNoBuffer) { + testRouterRequestAndResponseWithBody(1024, 512, false, true); + } + +-TEST_P(Http2IntegrationTest, RouterRequestAndResponseWithGiantBodyAndContentLengthNoBuffer) { ++TEST_P(MultiplexedIntegrationTest, RouterRequestAndResponseWithGiantBodyAndContentLengthNoBuffer) { + config_helper_.addConfigModifier(ConfigHelper::adjustUpstreamTimeoutForTsan); + testRouterRequestAndResponseWithBody(10 * 1024 * 1024, 10 * 1024 * 1024, false, true, nullptr, + TSAN_TIMEOUT_FACTOR * TestUtility::DefaultTimeout); + } + +-TEST_P(Http2IntegrationTest, FlowControlOnAndGiantBodyWithContentLength) { ++TEST_P(MultiplexedIntegrationTest, FlowControlOnAndGiantBodyWithContentLength) { + config_helper_.addConfigModifier(ConfigHelper::adjustUpstreamTimeoutForTsan); + config_helper_.setBufferLimits(1024, 1024); // Set buffer limits upstream and downstream. + testRouterRequestAndResponseWithBody(10 * 1024 * 1024, 10 * 1024 * 1024, false, true, nullptr, + TSAN_TIMEOUT_FACTOR * TestUtility::DefaultTimeout); + } + +-TEST_P(Http2IntegrationTest, LargeFlowControlOnAndGiantBodyWithContentLength) { ++TEST_P(MultiplexedIntegrationTest, LargeFlowControlOnAndGiantBodyWithContentLength) { + config_helper_.addConfigModifier(ConfigHelper::adjustUpstreamTimeoutForTsan); + config_helper_.setBufferLimits(128 * 1024, + 128 * 1024); // Set buffer limits upstream and downstream. +@@ -90,42 +106,44 @@ + TSAN_TIMEOUT_FACTOR * TestUtility::DefaultTimeout); + } + +-TEST_P(Http2IntegrationTest, RouterHeaderOnlyRequestAndResponseNoBuffer) { ++TEST_P(MultiplexedIntegrationTest, RouterHeaderOnlyRequestAndResponseNoBuffer) { + testRouterHeaderOnlyRequestAndResponse(); + } + +-TEST_P(Http2IntegrationTest, RouterRequestAndResponseLargeHeaderNoBuffer) { ++TEST_P(MultiplexedIntegrationTest, RouterRequestAndResponseLargeHeaderNoBuffer) { + testRouterRequestAndResponseWithBody(1024, 512, true); + } + +-TEST_P(Http2IntegrationTest, RouterUpstreamDisconnectBeforeRequestcomplete) { ++TEST_P(MultiplexedIntegrationTest, RouterUpstreamDisconnectBeforeRequestcomplete) { + testRouterUpstreamDisconnectBeforeRequestComplete(); + } + +-TEST_P(Http2IntegrationTest, RouterUpstreamDisconnectBeforeResponseComplete) { ++TEST_P(MultiplexedIntegrationTest, RouterUpstreamDisconnectBeforeResponseComplete) { + testRouterUpstreamDisconnectBeforeResponseComplete(); + } + +-TEST_P(Http2IntegrationTest, RouterDownstreamDisconnectBeforeRequestComplete) { ++TEST_P(MultiplexedIntegrationTest, RouterDownstreamDisconnectBeforeRequestComplete) { + testRouterDownstreamDisconnectBeforeRequestComplete(); + } + +-TEST_P(Http2IntegrationTest, RouterDownstreamDisconnectBeforeResponseComplete) { ++TEST_P(MultiplexedIntegrationTest, RouterDownstreamDisconnectBeforeResponseComplete) { + testRouterDownstreamDisconnectBeforeResponseComplete(); + } + +-TEST_P(Http2IntegrationTest, RouterUpstreamResponseBeforeRequestComplete) { ++TEST_P(MultiplexedIntegrationTest, RouterUpstreamResponseBeforeRequestComplete) { + testRouterUpstreamResponseBeforeRequestComplete(); + } + +-TEST_P(Http2IntegrationTest, Retry) { testRetry(); } ++TEST_P(MultiplexedIntegrationTest, Retry) { testRetry(); } + +-TEST_P(Http2IntegrationTest, RetryAttemptCount) { testRetryAttemptCountHeader(); } ++TEST_P(MultiplexedIntegrationTest, RetryAttemptCount) { testRetryAttemptCountHeader(); } + +-TEST_P(Http2IntegrationTest, LargeRequestTrailersRejected) { testLargeRequestTrailers(66, 60); } ++TEST_P(MultiplexedIntegrationTest, LargeRequestTrailersRejected) { ++ testLargeRequestTrailers(66, 60); ++} + + // Verify downstream codec stream flush timeout. +-TEST_P(Http2IntegrationTest, CodecStreamIdleTimeout) { ++TEST_P(MultiplexedIntegrationTest, CodecStreamIdleTimeout) { + config_helper_.setBufferLimits(1024, 1024); + config_helper_.addConfigModifier( + [&](envoy::extensions::filters::network::http_connection_manager::v3::HttpConnectionManager& +@@ -161,7 +179,7 @@ + ASSERT_TRUE(response->waitForReset()); + } + +-TEST_P(Http2IntegrationTest, Http2DownstreamKeepalive) { ++TEST_P(MultiplexedIntegrationTest, Http2DownstreamKeepalive) { + // TODO(#16751) Need to support keepalive. + EXCLUDE_DOWNSTREAM_HTTP3; + constexpr uint64_t interval_ms = 1; +@@ -197,6 +215,41 @@ + "@type": type.googleapis.com/google.protobuf.Empty + )EOF"; + ++class Http2MetadataIntegrationTest : public HttpProtocolIntegrationTest { ++public: ++ void SetUp() override { ++ HttpProtocolIntegrationTest::SetUp(); ++ config_helper_.addConfigModifier( ++ [&](envoy::config::bootstrap::v3::Bootstrap& bootstrap) -> void { ++ RELEASE_ASSERT(bootstrap.mutable_static_resources()->clusters_size() >= 1, ""); ++ ConfigHelper::HttpProtocolOptions protocol_options; ++ protocol_options.mutable_explicit_http_config() ++ ->mutable_http2_protocol_options() ++ ->set_allow_metadata(true); ++ ConfigHelper::setProtocolOptions( ++ *bootstrap.mutable_static_resources()->mutable_clusters(0), protocol_options); ++ }); ++ config_helper_.addConfigModifier( ++ [&](envoy::extensions::filters::network::http_connection_manager::v3::HttpConnectionManager& ++ hcm) -> void { hcm.mutable_http2_protocol_options()->set_allow_metadata(true); }); ++ } ++ ++ void testRequestMetadataWithStopAllFilter(); ++ ++ void verifyHeadersOnlyTest(); ++ ++ void runHeaderOnlyTest(bool send_request_body, size_t body_size); ++ ++protected: ++ // Utility function to prepend filters. Note that the filters ++ // are added in reverse order. ++ void prependFilters(std::vector filters) { ++ for (const auto& filter : filters) { ++ config_helper_.prependFilter(filter); ++ } ++ } ++}; ++ + // Verifies metadata can be sent at different locations of the responses. + TEST_P(Http2MetadataIntegrationTest, ProxyMetadataInResponse) { + initialize(); +@@ -888,7 +941,7 @@ + EXPECT_EQ(count * size + added_decoded_data_size * 2, response->body().size()); + } + +-TEST_P(Http2IntegrationTest, GrpcRouterNotFound) { ++TEST_P(MultiplexedIntegrationTest, GrpcRouterNotFound) { + config_helper_.setDefaultHostAndRoute("foo.com", "/found"); + initialize(); + +@@ -901,10 +954,10 @@ + EXPECT_EQ("12", response->headers().getGrpcStatusValue()); + } + +-TEST_P(Http2IntegrationTest, GrpcRetry) { testGrpcRetry(); } ++TEST_P(MultiplexedIntegrationTest, GrpcRetry) { testGrpcRetry(); } + + // Verify the case where there is an HTTP/2 codec/protocol error with an active stream. +-TEST_P(Http2IntegrationTest, CodecErrorAfterStreamStart) { ++TEST_P(MultiplexedIntegrationTest, CodecErrorAfterStreamStart) { + EXCLUDE_DOWNSTREAM_HTTP3; // The HTTP/3 client has no "bad frame" equivalent. + initialize(); + codec_client_ = makeHttpConnection(lookupPort("http")); +@@ -921,7 +974,7 @@ + ASSERT_TRUE(response->waitForEndStream()); + } + +-TEST_P(Http2IntegrationTest, Http2BadMagic) { ++TEST_P(MultiplexedIntegrationTest, Http2BadMagic) { + if (downstreamProtocol() == Http::CodecType::HTTP3) { + // The "magic" payload is an HTTP/2 specific thing. + return; +@@ -937,7 +990,7 @@ + EXPECT_EQ("", response); + } + +-TEST_P(Http2IntegrationTest, BadFrame) { ++TEST_P(MultiplexedIntegrationTest, BadFrame) { + EXCLUDE_DOWNSTREAM_HTTP3; // The HTTP/3 client has no "bad frame" equivalent. + + initialize(); +@@ -953,7 +1006,7 @@ + + // Send client headers, a GoAway and then a body and ensure the full request and + // response are received. +-TEST_P(Http2IntegrationTest, GoAway) { ++TEST_P(MultiplexedIntegrationTest, GoAway) { + config_helper_.prependFilter(ConfigHelper::defaultHealthCheckFilter()); + initialize(); + +@@ -971,14 +1024,75 @@ + EXPECT_EQ("200", response->headers().getStatusValue()); + } + +-TEST_P(Http2IntegrationTest, Trailers) { testTrailers(1024, 2048, false, false); } ++// TODO(rch): Add a unit test which covers internal redirect handling. ++TEST_P(MultiplexedIntegrationTestWithSimulatedTime, GoAwayAfterTooManyResets) { ++ EXCLUDE_DOWNSTREAM_HTTP3; // Need to wait for the server to reset the stream ++ // before opening new one. ++ config_helper_.addRuntimeOverride("envoy.restart_features.send_goaway_for_premature_rst_streams", ++ "true"); ++ const int total_streams = 100; ++ config_helper_.addRuntimeOverride("overload.premature_reset_total_stream_count", ++ absl::StrCat(total_streams)); ++ initialize(); ++ ++ Http::TestRequestHeaderMapImpl headers{ ++ {":method", "GET"}, {":path", "/healthcheck"}, {":scheme", "http"}, {":authority", "host"}}; ++ codec_client_ = makeHttpConnection(lookupPort("http")); ++ for (int i = 0; i < total_streams; ++i) { ++ auto encoder_decoder = codec_client_->startRequest(headers); ++ request_encoder_ = &encoder_decoder.first; ++ auto response = std::move(encoder_decoder.second); ++ codec_client_->sendReset(*request_encoder_); ++ ASSERT_TRUE(response->waitForReset()); ++ } + +-TEST_P(Http2IntegrationTest, TrailersGiantBody) { ++ // Envoy should disconnect client due to premature reset check ++ ASSERT_TRUE(codec_client_->waitForDisconnect()); ++ test_server_->waitForCounterEq("http.config_test.downstream_rq_rx_reset", total_streams); ++ test_server_->waitForCounterEq("http.config_test.downstream_rq_too_many_premature_resets", 1); ++} ++ ++TEST_P(MultiplexedIntegrationTestWithSimulatedTime, DontGoAwayAfterTooManyResetsForLongStreams) { ++ EXCLUDE_DOWNSTREAM_HTTP3; // Need to wait for the server to reset the stream ++ // before opening new one. ++ config_helper_.addRuntimeOverride("envoy.restart_features.send_goaway_for_premature_rst_streams", ++ "true"); ++ const int total_streams = 100; ++ const int stream_lifetime_seconds = 2; ++ config_helper_.addRuntimeOverride("overload.premature_reset_total_stream_count", ++ absl::StrCat(total_streams)); ++ ++ config_helper_.addRuntimeOverride("overload.premature_reset_min_stream_lifetime_seconds", ++ absl::StrCat(stream_lifetime_seconds)); ++ ++ initialize(); ++ ++ Http::TestRequestHeaderMapImpl headers{ ++ {":method", "GET"}, {":path", "/healthcheck"}, {":scheme", "http"}, {":authority", "host"}}; ++ codec_client_ = makeHttpConnection(lookupPort("http")); ++ ++ std::string request_counter = "http.config_test.downstream_rq_total"; ++ std::string reset_counter = "http.config_test.downstream_rq_rx_reset"; ++ for (int i = 0; i < total_streams * 2; ++i) { ++ auto encoder_decoder = codec_client_->startRequest(headers); ++ request_encoder_ = &encoder_decoder.first; ++ auto response = std::move(encoder_decoder.second); ++ test_server_->waitForCounterEq(request_counter, i + 1); ++ timeSystem().advanceTimeWait(std::chrono::seconds(2 * stream_lifetime_seconds)); ++ codec_client_->sendReset(*request_encoder_); ++ ASSERT_TRUE(response->waitForReset()); ++ test_server_->waitForCounterEq(reset_counter, i + 1); ++ } ++} ++ ++TEST_P(MultiplexedIntegrationTest, Trailers) { testTrailers(1024, 2048, false, false); } ++ ++TEST_P(MultiplexedIntegrationTest, TrailersGiantBody) { + testTrailers(1024 * 1024, 1024 * 1024, false, false); + } + + // Ensure if new timeouts are set, legacy timeouts do not apply. +-TEST_P(Http2IntegrationTest, DEPRECATED_FEATURE_TEST(GrpcRequestTimeoutMixedLegacy)) { ++TEST_P(MultiplexedIntegrationTest, DEPRECATED_FEATURE_TEST(GrpcRequestTimeoutMixedLegacy)) { + config_helper_.addConfigModifier( + [&](envoy::extensions::filters::network::http_connection_manager::v3::HttpConnectionManager& + hcm) -> void { +@@ -1011,7 +1125,7 @@ + EXPECT_THAT(waitForAccessLog(access_log_name_), HasSubstr("via_upstream\n")); + } + +-TEST_P(Http2IntegrationTest, GrpcRequestTimeout) { ++TEST_P(MultiplexedIntegrationTest, GrpcRequestTimeout) { + config_helper_.addConfigModifier( + [&](envoy::extensions::filters::network::http_connection_manager::v3::HttpConnectionManager& + hcm) -> void { +@@ -1044,7 +1158,7 @@ + } + + // Interleave two requests and responses and make sure that idle timeout is handled correctly. +-TEST_P(Http2IntegrationTest, IdleTimeoutWithSimultaneousRequests) { ++TEST_P(MultiplexedIntegrationTest, IdleTimeoutWithSimultaneousRequests) { + FakeHttpConnectionPtr fake_upstream_connection1; + FakeHttpConnectionPtr fake_upstream_connection2; + Http::RequestEncoder* encoder1; +@@ -1133,7 +1247,7 @@ + } + + // Test request mirroring / shadowing with an HTTP/2 downstream and a request with a body. +-TEST_P(Http2IntegrationTest, RequestMirrorWithBody) { ++TEST_P(MultiplexedIntegrationTest, RequestMirrorWithBody) { + config_helper_.addConfigModifier( + [&](envoy::extensions::filters::network::http_connection_manager::v3::HttpConnectionManager& + hcm) -> void { +@@ -1181,7 +1295,8 @@ + } + + // Interleave two requests and responses and make sure the HTTP2 stack handles this correctly. +-void Http2IntegrationTest::simultaneousRequest(int32_t request1_bytes, int32_t request2_bytes) { ++void MultiplexedIntegrationTest::simultaneousRequest(int32_t request1_bytes, ++ int32_t request2_bytes) { + FakeHttpConnectionPtr fake_upstream_connection1; + FakeHttpConnectionPtr fake_upstream_connection2; + Http::RequestEncoder* encoder1; +@@ -1250,15 +1365,15 @@ + codec_client_->close(); + } + +-TEST_P(Http2IntegrationTest, SimultaneousRequest) { simultaneousRequest(1024, 512); } ++TEST_P(MultiplexedIntegrationTest, SimultaneousRequest) { simultaneousRequest(1024, 512); } + +-TEST_P(Http2IntegrationTest, SimultaneousRequestWithBufferLimits) { ++TEST_P(MultiplexedIntegrationTest, SimultaneousRequestWithBufferLimits) { + config_helper_.setBufferLimits(1024, 1024); // Set buffer limits upstream and downstream. + simultaneousRequest(1024 * 32, 1024 * 16); + } + + // Test downstream connection delayed close processing. +-TEST_P(Http2IntegrationTest, DelayedCloseAfterBadFrame) { ++TEST_P(MultiplexedIntegrationTest, DelayedCloseAfterBadFrame) { + EXCLUDE_DOWNSTREAM_HTTP3; // Needs HTTP/3 "bad frame" equivalent. + config_helper_.addConfigModifier( + [](envoy::extensions::filters::network::http_connection_manager::v3::HttpConnectionManager& +@@ -1288,7 +1403,7 @@ + } + + // Test disablement of delayed close processing on downstream connections. +-TEST_P(Http2IntegrationTest, DelayedCloseDisabled) { ++TEST_P(MultiplexedIntegrationTest, DelayedCloseDisabled) { + EXCLUDE_DOWNSTREAM_HTTP3; // Needs HTTP/3 "bad frame" equivalent. + config_helper_.addConfigModifier( + [](envoy::extensions::filters::network::http_connection_manager::v3::HttpConnectionManager& +@@ -1315,7 +1430,7 @@ + 0); + } + +-TEST_P(Http2IntegrationTest, PauseAndResume) { ++TEST_P(MultiplexedIntegrationTest, PauseAndResume) { + config_helper_.prependFilter(R"EOF( + name: stop-iteration-and-continue-filter + typed_config: +@@ -1345,7 +1460,7 @@ + ASSERT_TRUE(response->complete()); + } + +-TEST_P(Http2IntegrationTest, PauseAndResumeHeadersOnly) { ++TEST_P(MultiplexedIntegrationTest, PauseAndResumeHeadersOnly) { + config_helper_.prependFilter(R"EOF( + name: stop-iteration-and-continue-filter + typed_config: +@@ -1368,7 +1483,7 @@ + // Verify the case when we have large pending data with empty trailers. It should not introduce + // stack-overflow (on ASan build). This is a regression test for + // https://bugs.chromium.org/p/oss-fuzz/issues/detail?id=24714. +-TEST_P(Http2IntegrationTest, EmptyTrailers) { ++TEST_P(MultiplexedIntegrationTest, EmptyTrailers) { + initialize(); + codec_client_ = makeHttpConnection(lookupPort("http")); + +@@ -1386,7 +1501,22 @@ + ASSERT_TRUE(response->complete()); + } + +-Http2RingHashIntegrationTest::Http2RingHashIntegrationTest() { ++class MultiplexedRingHashIntegrationTest : public HttpProtocolIntegrationTest { ++public: ++ MultiplexedRingHashIntegrationTest(); ++ ++ ~MultiplexedRingHashIntegrationTest() override; ++ ++ void createUpstreams() override; ++ ++ void sendMultipleRequests(int request_bytes, Http::TestRequestHeaderMapImpl headers, ++ std::function cb); ++ ++ std::vector fake_upstream_connections_; ++ int num_upstreams_ = 5; ++}; ++ ++MultiplexedRingHashIntegrationTest::MultiplexedRingHashIntegrationTest() { + config_helper_.addConfigModifier([&](envoy::config::bootstrap::v3::Bootstrap& bootstrap) -> void { + auto* cluster = bootstrap.mutable_static_resources()->mutable_clusters(0); + cluster->clear_load_assignment(); +@@ -1405,7 +1535,7 @@ + }); + } + +-Http2RingHashIntegrationTest::~Http2RingHashIntegrationTest() { ++MultiplexedRingHashIntegrationTest::~MultiplexedRingHashIntegrationTest() { + if (codec_client_) { + codec_client_->close(); + codec_client_ = nullptr; +@@ -1418,13 +1548,13 @@ + } + } + +-void Http2RingHashIntegrationTest::createUpstreams() { ++void MultiplexedRingHashIntegrationTest::createUpstreams() { + for (int i = 0; i < num_upstreams_; i++) { + addFakeUpstream(Http::CodecType::HTTP1); + } + } + +-INSTANTIATE_TEST_SUITE_P(IpVersions, Http2RingHashIntegrationTest, ++INSTANTIATE_TEST_SUITE_P(IpVersions, MultiplexedRingHashIntegrationTest, + testing::ValuesIn(HttpProtocolIntegrationTest::getProtocolTestParams( + {Http::CodecType::HTTP2}, {Http::CodecType::HTTP1})), + HttpProtocolIntegrationTest::protocolTestParamsToString); +@@ -1434,7 +1564,7 @@ + {Http::CodecType::HTTP2}, {Http::CodecType::HTTP2})), + HttpProtocolIntegrationTest::protocolTestParamsToString); + +-void Http2RingHashIntegrationTest::sendMultipleRequests( ++void MultiplexedRingHashIntegrationTest::sendMultipleRequests( + int request_bytes, Http::TestRequestHeaderMapImpl headers, + std::function cb) { + TestRandomGenerator rand; +@@ -1481,7 +1611,7 @@ + } + } + +-TEST_P(Http2RingHashIntegrationTest, CookieRoutingNoCookieNoTtl) { ++TEST_P(MultiplexedRingHashIntegrationTest, CookieRoutingNoCookieNoTtl) { + config_helper_.addConfigModifier( + [&](envoy::extensions::filters::network::http_connection_manager::v3::HttpConnectionManager& + hcm) -> void { +@@ -1515,7 +1645,7 @@ + EXPECT_EQ(served_by.size(), num_upstreams_); + } + +-TEST_P(Http2RingHashIntegrationTest, CookieRoutingNoCookieWithNonzeroTtlSet) { ++TEST_P(MultiplexedRingHashIntegrationTest, CookieRoutingNoCookieWithNonzeroTtlSet) { + config_helper_.addConfigModifier( + [&](envoy::extensions::filters::network::http_connection_manager::v3::HttpConnectionManager& + hcm) -> void { +@@ -1546,7 +1676,7 @@ + EXPECT_EQ(set_cookies.size(), 1); + } + +-TEST_P(Http2RingHashIntegrationTest, CookieRoutingNoCookieWithZeroTtlSet) { ++TEST_P(MultiplexedRingHashIntegrationTest, CookieRoutingNoCookieWithZeroTtlSet) { + config_helper_.addConfigModifier( + [&](envoy::extensions::filters::network::http_connection_manager::v3::HttpConnectionManager& + hcm) -> void { +@@ -1577,7 +1707,7 @@ + EXPECT_EQ(set_cookies.size(), 1); + } + +-TEST_P(Http2RingHashIntegrationTest, CookieRoutingWithCookieNoTtl) { ++TEST_P(MultiplexedRingHashIntegrationTest, CookieRoutingWithCookieNoTtl) { + config_helper_.addConfigModifier( + [&](envoy::extensions::filters::network::http_connection_manager::v3::HttpConnectionManager& + hcm) -> void { +@@ -1609,7 +1739,7 @@ + EXPECT_EQ(served_by.size(), 1); + } + +-TEST_P(Http2RingHashIntegrationTest, CookieRoutingWithCookieWithTtlSet) { ++TEST_P(MultiplexedRingHashIntegrationTest, CookieRoutingWithCookieWithTtlSet) { + config_helper_.addConfigModifier( + [&](envoy::extensions::filters::network::http_connection_manager::v3::HttpConnectionManager& + hcm) -> void { +@@ -1826,7 +1956,7 @@ + "@type": type.googleapis.com/google.protobuf.Empty + )EOF"; + +-TEST_P(Http2IntegrationTest, OnLocalReply) { ++TEST_P(MultiplexedIntegrationTest, OnLocalReply) { + config_helper_.prependFilter(on_local_reply_filter); + initialize(); + +@@ -1860,7 +1990,7 @@ + } + } + +-TEST_P(Http2IntegrationTest, InvalidTrailers) { ++TEST_P(MultiplexedIntegrationTest, InvalidTrailers) { + useAccessLog("%RESPONSE_CODE_DETAILS%"); + autonomous_upstream_ = true; + initialize(); +diff -Naur envoy/test/integration/multiplexed_integration_test.h envoy-new/test/integration/multiplexed_integration_test.h +--- envoy/test/integration/multiplexed_integration_test.h 2024-01-16 23:42:11.255853818 +0800 ++++ envoy-new/test/integration/multiplexed_integration_test.h 1970-01-01 08:00:00.000000000 +0800 +@@ -1,69 +0,0 @@ +-#pragma once +- +-#include +- +-#include "envoy/config/bootstrap/v3/bootstrap.pb.h" +-#include "envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.pb.h" +- +-#include "test/integration/http_protocol_integration.h" +- +-#include "absl/synchronization/mutex.h" +-#include "gtest/gtest.h" +- +-namespace Envoy { +-class Http2IntegrationTest : public HttpProtocolIntegrationTest { +-public: +- void simultaneousRequest(int32_t request1_bytes, int32_t request2_bytes); +- +-protected: +- // Utility function to prepend filters. Note that the filters +- // are added in reverse order. +- void prependFilters(std::vector filters) { +- for (const auto& filter : filters) { +- config_helper_.prependFilter(filter); +- } +- } +-}; +- +-class Http2RingHashIntegrationTest : public Http2IntegrationTest { +-public: +- Http2RingHashIntegrationTest(); +- +- ~Http2RingHashIntegrationTest() override; +- +- void createUpstreams() override; +- +- void sendMultipleRequests(int request_bytes, Http::TestRequestHeaderMapImpl headers, +- std::function cb); +- +- std::vector fake_upstream_connections_; +- int num_upstreams_ = 5; +-}; +- +-class Http2MetadataIntegrationTest : public Http2IntegrationTest { +-public: +- void SetUp() override { +- HttpProtocolIntegrationTest::SetUp(); +- config_helper_.addConfigModifier( +- [&](envoy::config::bootstrap::v3::Bootstrap& bootstrap) -> void { +- RELEASE_ASSERT(bootstrap.mutable_static_resources()->clusters_size() >= 1, ""); +- ConfigHelper::HttpProtocolOptions protocol_options; +- protocol_options.mutable_explicit_http_config() +- ->mutable_http2_protocol_options() +- ->set_allow_metadata(true); +- ConfigHelper::setProtocolOptions( +- *bootstrap.mutable_static_resources()->mutable_clusters(0), protocol_options); +- }); +- config_helper_.addConfigModifier( +- [&](envoy::extensions::filters::network::http_connection_manager::v3::HttpConnectionManager& +- hcm) -> void { hcm.mutable_http2_protocol_options()->set_allow_metadata(true); }); +- } +- +- void testRequestMetadataWithStopAllFilter(); +- +- void verifyHeadersOnlyTest(); +- +- void runHeaderOnlyTest(bool send_request_body, size_t body_size); +-}; +- +-} // namespace Envoy +diff -Naur envoy/test/mocks/router/mocks.cc envoy-new/test/mocks/router/mocks.cc +--- envoy/test/mocks/router/mocks.cc 2024-01-16 23:42:11.287854050 +0800 ++++ envoy-new/test/mocks/router/mocks.cc 2024-01-16 23:27:25.853358572 +0800 +@@ -154,7 +154,7 @@ + MockRouteConfigProviderManager::~MockRouteConfigProviderManager() = default; + + MockScopedConfig::MockScopedConfig() { +- ON_CALL(*this, getRouteConfig(_)).WillByDefault(Return(route_config_)); ++ ON_CALL(*this, getRouteConfig(_, _)).WillByDefault(Return(route_config_)); + } + MockScopedConfig::~MockScopedConfig() = default; + +diff -Naur envoy/test/mocks/router/mocks.h envoy-new/test/mocks/router/mocks.h +--- envoy/test/mocks/router/mocks.h 2024-01-16 23:42:11.303854166 +0800 ++++ envoy-new/test/mocks/router/mocks.h 2024-01-16 23:27:25.853358572 +0800 +@@ -566,7 +566,11 @@ + public: + MockScopedConfig(); + ~MockScopedConfig() override; +- ++#if defined(ALIMESH) ++ MOCK_METHOD(ConfigConstSharedPtr, getRouteConfig, ++ (const Http::HeaderMap& headers, const StreamInfo::StreamInfo&), (const)); ++#else + MOCK_METHOD(ConfigConstSharedPtr, getRouteConfig, (const Http::HeaderMap& headers), (const)); ++#endif + + std::shared_ptr route_config_{new NiceMock()}; + }; + \ No newline at end of file