diff --git a/bazel/grpc_build_system.bzl b/bazel/grpc_build_system.bzl index d3907c71973bb..3ec0e17959436 100644 --- a/bazel/grpc_build_system.bzl +++ b/bazel/grpc_build_system.bzl @@ -116,7 +116,7 @@ def _update_visibility(visibility): "grpc_public_hdrs": PRIVATE, "grpcpp_gcp_observability": PUBLIC, "grpc_resolver_fake": PRIVATE, - "grpc++_public_hdrs": PUBLIC, + "grpc++_public_hdrs": PRIVATE, "http": PRIVATE, "httpcli": PRIVATE, "iomgr_internal_errqueue": PRIVATE, diff --git a/include/grpc/status.h b/include/grpc/status.h index 378c8bcc52cb1..13fee46abc0d3 100644 --- a/include/grpc/status.h +++ b/include/grpc/status.h @@ -146,7 +146,7 @@ typedef enum { GRPC_STATUS_DATA_LOSS = 15, /** Force users to include a default branch: */ - GRPC_STATUS__DO_NOT_USE = -1 + GRPC_STATUS__DO_NOT_USE = 0x7fffffffu, } grpc_status_code; #ifdef __cplusplus diff --git a/src/core/ext/transport/inproc/legacy_inproc_transport.cc b/src/core/ext/transport/inproc/legacy_inproc_transport.cc index 2f4a6fa8a5e8d..9abf6b75a082d 100644 --- a/src/core/ext/transport/inproc/legacy_inproc_transport.cc +++ b/src/core/ext/transport/inproc/legacy_inproc_transport.cc @@ -185,16 +185,21 @@ struct inproc_stream { if (!server_data) { t->ref(); - inproc_transport* st = t->other_side; - st->ref(); other_side = nullptr; // will get filled in soon - // Pass the client-side stream address to the server-side for a ref - ref("inproc_init_stream:clt"); // ref it now on behalf of server - // side to avoid destruction - GRPC_TRACE_LOG(inproc, INFO) - << "calling accept stream cb " << st->accept_stream_cb << " " - << st->accept_stream_data; - (*st->accept_stream_cb)(st->accept_stream_data, t, this); + inproc_transport* st = t->other_side; + if (st->accept_stream_cb == nullptr) { + cancel_stream_locked(this, + absl::UnavailableError("inproc server closed")); + } else { + st->ref(); + // Pass the client-side stream address to the server-side for a ref + ref("inproc_init_stream:clt"); // ref it now on behalf of server + 
// side to avoid destruction + GRPC_TRACE_LOG(inproc, INFO) + << "calling accept stream cb " << st->accept_stream_cb << " " + << st->accept_stream_data; + (*st->accept_stream_cb)(st->accept_stream_data, t, this); + } } else { // This is the server-side and is being called through accept_stream_cb inproc_stream* cs = const_cast( diff --git a/src/core/lib/transport/metadata_batch.h b/src/core/lib/transport/metadata_batch.h index 87eea4b2a6c8d..2bd5f3d62d851 100644 --- a/src/core/lib/transport/metadata_batch.h +++ b/src/core/lib/transport/metadata_batch.h @@ -369,12 +369,73 @@ struct SimpleIntBasedMetadata : public SimpleIntBasedMetadataBase { }; // grpc-status metadata trait. -struct GrpcStatusMetadata - : public SimpleIntBasedMetadata { +struct GrpcStatusMetadata { + using ValueType = grpc_status_code; + using MementoType = grpc_status_code; + static ValueType MementoToValue(MementoType value) { return value; } + static Slice Encode(ValueType x) { return Slice::FromInt64(x); } + static std::string DisplayValue(ValueType x) { + switch (x) { + case GRPC_STATUS_OK: + return "OK"; + case GRPC_STATUS_CANCELLED: + return "CANCELLED"; + case GRPC_STATUS_UNKNOWN: + return "UNKNOWN"; + case GRPC_STATUS_INVALID_ARGUMENT: + return "INVALID_ARGUMENT"; + case GRPC_STATUS_DEADLINE_EXCEEDED: + return "DEADLINE_EXCEEDED"; + case GRPC_STATUS_NOT_FOUND: + return "NOT_FOUND"; + case GRPC_STATUS_ALREADY_EXISTS: + return "ALREADY_EXISTS"; + case GRPC_STATUS_PERMISSION_DENIED: + return "PERMISSION_DENIED"; + case GRPC_STATUS_RESOURCE_EXHAUSTED: + return "RESOURCE_EXHAUSTED"; + case GRPC_STATUS_FAILED_PRECONDITION: + return "FAILED_PRECONDITION"; + case GRPC_STATUS_ABORTED: + return "ABORTED"; + case GRPC_STATUS_OUT_OF_RANGE: + return "OUT_OF_RANGE"; + case GRPC_STATUS_UNIMPLEMENTED: + return "UNIMPLEMENTED"; + case GRPC_STATUS_INTERNAL: + return "INTERNAL"; + case GRPC_STATUS_UNAVAILABLE: + return "UNAVAILABLE"; + case GRPC_STATUS_DATA_LOSS: + return "DATA_LOSS"; + case 
GRPC_STATUS_UNAUTHENTICATED: + return "UNAUTHENTICATED"; + default: + return absl::StrCat("UNKNOWN(", static_cast(x), ")"); + } + } + static auto DisplayMemento(MementoType x) { return DisplayValue(x); } static constexpr bool kRepeatable = false; static constexpr bool kTransferOnTrailersOnly = false; using CompressionTraits = SmallIntegralValuesCompressor<16>; static absl::string_view key() { return "grpc-status"; } + static grpc_status_code ParseMemento(Slice value, bool, + MetadataParseErrorFn on_error) { + int64_t wire_value; + if (!absl::SimpleAtoi(value.as_string_view(), &wire_value)) { + on_error("not an integer", value); + return GRPC_STATUS_UNKNOWN; + } + if (wire_value < 0) { + on_error("negative value", value); + return GRPC_STATUS_UNKNOWN; + } + if (wire_value >= GRPC_STATUS__DO_NOT_USE) { + on_error("out of range", value); + return GRPC_STATUS_UNKNOWN; + } + return static_cast(wire_value); + } }; // grpc-previous-rpc-attempts metadata trait. diff --git a/src/core/xds/grpc/xds_server_grpc.cc b/src/core/xds/grpc/xds_server_grpc.cc index b6e41374befcc..6b4eb9396992a 100644 --- a/src/core/xds/grpc/xds_server_grpc.cc +++ b/src/core/xds/grpc/xds_server_grpc.cc @@ -39,6 +39,9 @@ constexpr absl::string_view kServerFeatureIgnoreResourceDeletion = constexpr absl::string_view kServerFeatureFailOnDataErrors = "fail_on_data_errors"; +constexpr absl::string_view kServerFeatureResourceTimerIsTransientFailure = + "resource_timer_is_transient_error"; + constexpr absl::string_view kServerFeatureTrustedXdsServer = "trusted_xds_server"; @@ -54,6 +57,12 @@ bool GrpcXdsServer::FailOnDataErrors() const { server_features_.end(); } +bool GrpcXdsServer::ResourceTimerIsTransientFailure() const { + return server_features_.find( + std::string(kServerFeatureResourceTimerIsTransientFailure)) != + server_features_.end(); +} + bool GrpcXdsServer::TrustedXdsServer() const { return server_features_.find(std::string(kServerFeatureTrustedXdsServer)) != server_features_.end(); @@ -135,6 
+144,8 @@ void GrpcXdsServer::JsonPostLoad(const Json& json, const JsonArgs& args, if (feature_json.type() == Json::Type::kString && (feature_json.string() == kServerFeatureIgnoreResourceDeletion || feature_json.string() == kServerFeatureFailOnDataErrors || + feature_json.string() == + kServerFeatureResourceTimerIsTransientFailure || feature_json.string() == kServerFeatureTrustedXdsServer)) { server_features_.insert(feature_json.string()); } diff --git a/src/core/xds/grpc/xds_server_grpc.h b/src/core/xds/grpc/xds_server_grpc.h index 7b58e2a75b11b..e83215a4ec251 100644 --- a/src/core/xds/grpc/xds_server_grpc.h +++ b/src/core/xds/grpc/xds_server_grpc.h @@ -36,6 +36,7 @@ class GrpcXdsServer final : public XdsBootstrap::XdsServer { bool IgnoreResourceDeletion() const override; bool FailOnDataErrors() const override; + bool ResourceTimerIsTransientFailure() const override; bool TrustedXdsServer() const; diff --git a/src/core/xds/xds_client/xds_bootstrap.h b/src/core/xds/xds_client/xds_bootstrap.h index fedc10bc92fe6..4a0859672e2d2 100644 --- a/src/core/xds/xds_client/xds_bootstrap.h +++ b/src/core/xds/xds_client/xds_bootstrap.h @@ -53,6 +53,7 @@ class XdsBootstrap { virtual bool IgnoreResourceDeletion() const = 0; virtual bool FailOnDataErrors() const = 0; + virtual bool ResourceTimerIsTransientFailure() const = 0; virtual bool Equals(const XdsServer& other) const = 0; diff --git a/src/core/xds/xds_client/xds_client.cc b/src/core/xds/xds_client/xds_client.cc index 6905cd26804a8..da09d1db1be55 100644 --- a/src/core/xds/xds_client/xds_client.cc +++ b/src/core/xds/xds_client/xds_client.cc @@ -208,9 +208,16 @@ class XdsClient::XdsChannel::AdsCall final if (state.HasResource()) return; // Start timer. ads_call_ = std::move(ads_call); + Duration timeout = ads_call_->xds_client()->request_timeout_; + if (timeout == Duration::Zero()) { + timeout = XdsDataErrorHandlingEnabled() && + ads_call_->xds_channel() + ->server_.ResourceTimerIsTransientFailure() + ? 
Duration::Seconds(30) + : Duration::Seconds(15); + } timer_handle_ = ads_call_->xds_client()->engine()->RunAfter( - ads_call_->xds_client()->request_timeout_, - [self = Ref(DEBUG_LOCATION, "timer")]() { + timeout, [self = Ref(DEBUG_LOCATION, "timer")]() { ApplicationCallbackExecCtx callback_exec_ctx; ExecCtx exec_ctx; self->OnTimer(); @@ -236,7 +243,15 @@ class XdsClient::XdsChannel::AdsCall final name_.authority, type_->type_url(), name_.key) << "} from xds server"; resource_seen_ = true; - state.SetDoesNotExistOnTimeout(); + if (XdsDataErrorHandlingEnabled() && + ads_call_->xds_channel() + ->server_.ResourceTimerIsTransientFailure()) { + state.SetTimeout( + absl::StrCat("timeout obtaining resource from xDS server ", + ads_call_->xds_channel()->server_uri())); + } else { + state.SetDoesNotExistOnTimeout(); + } ads_call_->xds_client()->NotifyWatchersOnResourceChanged( state.failed_status(), state.watchers(), ReadDelayHandle::NoWait()); @@ -1302,18 +1317,12 @@ void XdsClient::ResourceState::SetNacked(const std::string& version, serialized_proto_.clear(); } client_status_ = ClientResourceStatus::NACKED; - failed_version_ = version; failed_status_ = absl::InvalidArgumentError(absl::StrCat("invalid resource: ", details)); + failed_version_ = version; failed_update_time_ = update_time; } -void XdsClient::ResourceState::SetDoesNotExistOnTimeout() { - client_status_ = ClientResourceStatus::DOES_NOT_EXIST; - failed_status_ = absl::NotFoundError("does not exist"); - failed_version_.clear(); -} - void XdsClient::ResourceState::SetDoesNotExistOnLdsOrCdsDeletion( const std::string& version, Timestamp update_time, bool drop_cached_resource) { @@ -1327,6 +1336,18 @@ void XdsClient::ResourceState::SetDoesNotExistOnLdsOrCdsDeletion( failed_update_time_ = update_time; } +void XdsClient::ResourceState::SetDoesNotExistOnTimeout() { + client_status_ = ClientResourceStatus::DOES_NOT_EXIST; + failed_status_ = absl::NotFoundError("does not exist"); + failed_version_.clear(); +} + +void 
XdsClient::ResourceState::SetTimeout(const std::string& details) { + client_status_ = ClientResourceStatus::TIMEOUT; + failed_status_ = absl::UnavailableError(details); + failed_version_.clear(); +} + absl::string_view XdsClient::ResourceState::CacheStateString() const { switch (client_status_) { case ClientResourceStatus::REQUESTED: @@ -1338,6 +1359,8 @@ absl::string_view XdsClient::ResourceState::CacheStateString() const { return "acked"; case ClientResourceStatus::NACKED: return resource_ != nullptr ? "nacked_but_cached" : "nacked"; + case ClientResourceStatus::TIMEOUT: + return "timeout"; } Crash("unknown resource state"); } diff --git a/src/core/xds/xds_client/xds_client.h b/src/core/xds/xds_client/xds_client.h index 0f0e52a9daec1..88a3d5846711a 100644 --- a/src/core/xds/xds_client/xds_client.h +++ b/src/core/xds/xds_client/xds_client.h @@ -89,7 +89,9 @@ class XdsClient : public DualRefCounted { std::shared_ptr engine, std::unique_ptr metrics_reporter, std::string user_agent_name, std::string user_agent_version, - Duration resource_request_timeout = Duration::Seconds(15)); + // This parameter overrides the timer duration for testing + // purposes only -- do not use in production. + Duration resource_request_timeout = Duration::Zero()); ~XdsClient() override; // Start and cancel watch for a resource. @@ -276,6 +278,9 @@ class XdsClient : public DualRefCounted { ACKED, // Client received this resource and replied with NACK. NACKED, + // Client encountered timeout getting resource from server. + // TODO(roth): Remove explicit value when adding RECEIVED_ERROR. 
+ TIMEOUT = 6, }; static_assert(static_cast(envoy_admin_v3_REQUESTED) == ClientResourceStatus::REQUESTED); @@ -286,6 +291,8 @@ class XdsClient : public DualRefCounted { ClientResourceStatus::ACKED); static_assert(static_cast(envoy_admin_v3_NACKED) == ClientResourceStatus::NACKED); + static_assert(static_cast(envoy_admin_v3_TIMEOUT) == + ClientResourceStatus::TIMEOUT); void AddWatcher(RefCountedPtr watcher) { watchers_.insert(std::move(watcher)); @@ -301,10 +308,11 @@ class XdsClient : public DualRefCounted { Timestamp update_time); void SetNacked(const std::string& version, absl::string_view details, Timestamp update_time, bool drop_cached_resource); - void SetDoesNotExistOnTimeout(); void SetDoesNotExistOnLdsOrCdsDeletion(const std::string& version, Timestamp update_time, bool drop_cached_resource); + void SetDoesNotExistOnTimeout(); + void SetTimeout(const std::string& details); ClientResourceStatus client_status() const { return client_status_; } absl::string_view CacheStateString() const; @@ -332,11 +340,12 @@ class XdsClient : public DualRefCounted { Timestamp update_time_; // The last successfully updated version of the resource. std::string version_; + // Details about the last failed update attempt or transient error. + absl::Status failed_status_; // The rejected version string of the last failed update attempt. std::string failed_version_; - // Details about the last failed update attempt. - absl::Status failed_status_; // Timestamp of the last failed update attempt. + // Used only if failed_version_ is non-empty. 
Timestamp failed_update_time_; }; diff --git a/test/core/end2end/BUILD b/test/core/end2end/BUILD index dfc12e171e68f..dbb547e321849 100644 --- a/test/core/end2end/BUILD +++ b/test/core/end2end/BUILD @@ -272,7 +272,10 @@ grpc_core_end2end_test( grpc_core_end2end_test(name = "http2_stats") -grpc_core_end2end_test(name = "invoke_large_request") +grpc_core_end2end_test( + name = "invoke_large_request", + shard_count = 5, +) grpc_core_end2end_test(name = "keepalive_timeout") diff --git a/test/core/end2end/end2end_tests.h b/test/core/end2end/end2end_tests.h index e4b162013315d..ad6ed91785bb5 100644 --- a/test/core/end2end/end2end_tests.h +++ b/test/core/end2end/end2end_tests.h @@ -719,6 +719,14 @@ core_end2end_test_fuzzer::Msg ParseTestProto(std::string text); void suite##_##name(const grpc_core::CoreTestConfiguration* config, \ core_end2end_test_fuzzer::Msg msg) { \ if (absl::StartsWith(#name, "DISABLED_")) GTEST_SKIP() << "disabled test"; \ + if (!IsEventEngineListenerEnabled() || !IsEventEngineClientEnabled() || \ + !IsEventEngineDnsEnabled()) { \ + GTEST_SKIP() << "fuzzers need event engine"; \ + } \ + if (IsEventEngineDnsNonClientChannelEnabled()) { \ + GTEST_SKIP() << "event_engine_dns_non_client_channel experiment breaks " \ + "fuzzing currently"; \ + } \ CoreEnd2endTest_##suite##_##name(config, &msg).RunTest(); \ grpc_event_engine::experimental::ShutdownDefaultEventEngine(); \ } \ diff --git a/test/core/end2end/fuzzers/api_fuzzer.cc b/test/core/end2end/fuzzers/api_fuzzer.cc index 8d6b8a304cd8f..6c53dc6ecf046 100644 --- a/test/core/end2end/fuzzers/api_fuzzer.cc +++ b/test/core/end2end/fuzzers/api_fuzzer.cc @@ -16,6 +16,7 @@ // // +#include #include #include #include @@ -43,6 +44,7 @@ #include "absl/time/clock.h" #include "absl/time/time.h" #include "fuzztest/fuzztest.h" +#include "gtest/gtest.h" #include "src/core/ext/transport/inproc/inproc_transport.h" #include "src/core/lib/address_utils/parse_address.h" #include "src/core/lib/channel/channel_args.h" @@ -70,12 
+72,6 @@ // IWYU pragma: no_include -//////////////////////////////////////////////////////////////////////////////// -// logging - -bool squelch = true; -bool leak_check = true; - //////////////////////////////////////////////////////////////////////////////// // dns resolution @@ -514,14 +510,31 @@ void ApiFuzzer::DestroyChannel() { } void RunApiFuzzer(const api_fuzzer::Msg& msg) { - if (squelch && !GetEnv("GRPC_TRACE_FUZZER").has_value()) { - grpc_disable_all_absl_logs(); - } ApplyFuzzConfigVars(msg.config_vars()); TestOnlyReloadExperimentsFromConfigVariables(); ApiFuzzer(msg.event_engine_actions()).Run(msg.actions()); } FUZZ_TEST(MyTestSuite, RunApiFuzzer); +auto ParseTestProto(const std::string& proto) { + api_fuzzer::Msg msg; + CHECK(google::protobuf::TextFormat::ParseFromString(proto, &msg)); + return msg; +} + +TEST(MyTestSuite, RunApiFuzzerRegression1) { + RunApiFuzzer(ParseTestProto( + R"pb(actions { create_server {} } + actions { shutdown_server {} } + actions { create_channel { inproc: true } } + actions { + create_call { + method { value: "\364\217\277\277" } + host { value: ")" } + } + } + )pb")); +} + } // namespace testing } // namespace grpc_core diff --git a/test/core/end2end/grpc_core_end2end_test.bzl b/test/core/end2end/grpc_core_end2end_test.bzl index b1f080d0b2635..ee66f602b7d4d 100644 --- a/test/core/end2end/grpc_core_end2end_test.bzl +++ b/test/core/end2end/grpc_core_end2end_test.bzl @@ -100,7 +100,7 @@ def grpc_core_end2end_test(name, shard_count = 1, enable_fuzzing = True, tags = deps = _DEPS + ["end2end_test_lib_no_fuzztest_gtest"], data = _DATA, shard_count = shard_count, - tags = tags, + tags = tags + ["core_end2end_test"], flaky = flaky, ) diff --git a/test/core/test_util/grpc_fuzzer.bzl b/test/core/test_util/grpc_fuzzer.bzl index 2f9b2894c2fff..153864d4f8a49 100644 --- a/test/core/test_util/grpc_fuzzer.bzl +++ b/test/core/test_util/grpc_fuzzer.bzl @@ -21,82 +21,7 @@ simpler and better maintained, and we'll eventually replace existing 
fuzzers with grpc_fuzz_test. """ -load("//bazel:grpc_build_system.bzl", "grpc_cc_proto_library", "grpc_cc_test", "grpc_internal_proto_library") - -def grpc_proto_fuzzer( - name, - corpus, - proto, - owner = "grpc", # @unused - proto_deps = [], - external_deps = [], - srcs = [], - tags = [], - deps = [], - end2end_fuzzer = False, # @unused - data = [], - size = "large", - **kwargs): - """Instantiates a protobuf mutator fuzzer test. - - Args: - name: The name of the test. - corpus: The corpus for the test. - proto: The proto for the test. If empty, it assumes the proto dependency - is already included in the target deps. Otherwise it creates a - new proto_library with name "_{name}_proto" and - cc_proto_library with name "_{name}_cc_proto" and makes the - fuzz target depend on the latter. - proto_deps: Deps for proto. Only used if proto is not empty. - external_deps: External deps. - srcs: The source files for the test. - deps: The dependencies of the test. - data: The data dependencies of the test. - size: The size of the test. - tags: The tags for the test. - owner: The owning team of the test (for auto-bug-filing). - end2end_fuzzer: Flag to enable end2end fuzzers. - This is currently False and ignored - **kwargs: Other arguments to supply to the test. 
- """ - - CORPUS_DIR = native.package_name() + "/" + corpus - deps = deps + ["@com_google_libprotobuf_mutator//:libprotobuf_mutator"] - - if "gtest" not in external_deps: - external_deps = external_deps + ["gtest"] - - if proto != None: - PROTO_LIBRARY = "_%s_proto" % name - grpc_internal_proto_library( - name = PROTO_LIBRARY, - srcs = [proto], - deps = proto_deps, - ) - CC_PROTO_LIBRARY = "_%s_cc_proto" % name - grpc_cc_proto_library( - name = CC_PROTO_LIBRARY, - deps = [PROTO_LIBRARY], - ) - deps = deps + [CC_PROTO_LIBRARY] - - grpc_cc_test( - name = name, - srcs = srcs, - tags = tags + ["grpc-fuzzer", "no-cache"], - deps = deps + select({ - "//:grpc_build_fuzzers": [], - "//conditions:default": ["//test/core/test_util:fuzzer_corpus_test"], - }), - data = data + native.glob([corpus + "/**"]), - external_deps = external_deps, - size = size, - args = select({ - "//:grpc_build_fuzzers": [CORPUS_DIR, "-runs=20000", "-max_total_time=300"], - "//conditions:default": ["--directory=" + CORPUS_DIR], - }), - **kwargs - ) +load("//bazel:grpc_build_system.bzl", "grpc_cc_test") def grpc_fuzz_test(name, srcs = [], deps = [], tags = [], data = [], external_deps = []): """Instantiates a fuzztest based test. diff --git a/test/core/transport/chttp2/hpack_sync_fuzzer.cc b/test/core/transport/chttp2/hpack_sync_fuzzer.cc index cee88c14bc893..970b4216d3c84 100644 --- a/test/core/transport/chttp2/hpack_sync_fuzzer.cc +++ b/test/core/transport/chttp2/hpack_sync_fuzzer.cc @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+#include #include #include #include @@ -225,5 +226,25 @@ void FuzzOneInput(const hpack_sync_fuzzer::Msg& msg) { } FUZZ_TEST(HpackSyncFuzzer, FuzzOneInput); +auto ParseTestProto(const std::string& proto) { + hpack_sync_fuzzer::Msg msg; + CHECK(google::protobuf::TextFormat::ParseFromString(proto, &msg)); + return msg; +} + +TEST(HpackSyncFuzzer, FuzzOneInputRegression1) { + FuzzOneInput(ParseTestProto( + R"pb( + headers { literal_not_idx { key: "grpc-status" value: "72" } } + )pb")); +} + +TEST(HpackSyncFuzzer, FuzzOneInputRegression2) { + FuzzOneInput(ParseTestProto( + R"pb( + headers { literal_not_idx { key: "grpc-status" value: "-1" } } + )pb")); +} + } // namespace } // namespace grpc_core diff --git a/test/core/xds/xds_client_test.cc b/test/core/xds/xds_client_test.cc index 5fb0444f6e12f..53969cd53c2a7 100644 --- a/test/core/xds/xds_client_test.cc +++ b/test/core/xds/xds_client_test.cc @@ -139,14 +139,20 @@ class XdsClientTest : public ::testing::Test { public: explicit FakeXdsServer( absl::string_view server_uri = kDefaultXdsServerUrl, - bool fail_on_data_errors = false) + bool fail_on_data_errors = false, + bool resource_timer_is_transient_failure = false) : server_uri_(server_uri), - fail_on_data_errors_(fail_on_data_errors) {} + fail_on_data_errors_(fail_on_data_errors), + resource_timer_is_transient_failure_( + resource_timer_is_transient_failure) {} const std::string& server_uri() const override { return server_uri_; } bool IgnoreResourceDeletion() const override { return !fail_on_data_errors_; } bool FailOnDataErrors() const override { return fail_on_data_errors_; } + bool ResourceTimerIsTransientFailure() const override { + return resource_timer_is_transient_failure_; + } bool Equals(const XdsServer& other) const override { const auto& o = static_cast(other); return server_uri_ == o.server_uri_ && @@ -159,6 +165,7 @@ class XdsClientTest : public ::testing::Test { private: std::string server_uri_; bool fail_on_data_errors_ = false; + bool 
resource_timer_is_transient_failure_ = false; }; class FakeAuthority : public Authority { @@ -3315,6 +3322,13 @@ TEST_F(XdsClientTest, ConnectionFailsWithCachedResource) { } TEST_F(XdsClientTest, ResourceDoesNotExistUponTimeout) { + event_engine_->SetRunAfterDurationCallback( + [&](grpc_event_engine::experimental::EventEngine::Duration duration) { + grpc_event_engine::experimental::EventEngine::Duration expected = + std::chrono::seconds(15); + EXPECT_EQ(duration, expected) << "Expected: " << expected.count() + << "\nActual: " << duration.count(); + }); InitXdsClient(); // Start a watch for "foo1". auto watcher = StartFooWatch("foo1"); @@ -3416,6 +3430,231 @@ TEST_F(XdsClientTest, ResourceDoesNotExistUponTimeout) { EXPECT_TRUE(stream->IsOrphaned()); } +TEST_F(XdsClientTest, ResourceTimerIsTransientErrorIgnoredUnlessEnabled) { + event_engine_->SetRunAfterDurationCallback( + [&](grpc_event_engine::experimental::EventEngine::Duration duration) { + grpc_event_engine::experimental::EventEngine::Duration expected = + std::chrono::seconds(15); + EXPECT_EQ(duration, expected) << "Expected: " << expected.count() + << "\nActual: " << duration.count(); + }); + InitXdsClient(FakeXdsBootstrap::Builder().SetServers( + {FakeXdsBootstrap::FakeXdsServer(kDefaultXdsServerUrl, false, true)})); + // Start a watch for "foo1". + auto watcher = StartFooWatch("foo1"); + // Watcher should initially not see any resource reported. + EXPECT_FALSE(watcher->HasEvent()); + // Check metric data. + EXPECT_THAT(metrics_reporter_->resource_updates_valid(), + ::testing::ElementsAre()); + EXPECT_THAT(metrics_reporter_->resource_updates_invalid(), + ::testing::ElementsAre()); + EXPECT_THAT(GetResourceCounts(), + ::testing::ElementsAre(::testing::Pair( + ResourceCountLabelsEq(XdsClient::kOldStyleAuthority, + XdsFooResourceType::Get()->type_url(), + "requested"), + 1))); + // XdsClient should have created an ADS stream. 
+ auto stream = WaitForAdsStream(); + ASSERT_TRUE(stream != nullptr); + // XdsClient should have sent a subscription request on the ADS stream. + auto request = WaitForRequest(stream.get()); + ASSERT_TRUE(request.has_value()); + CheckRequest(*request, XdsFooResourceType::Get()->type_url(), + /*version_info=*/"", /*response_nonce=*/"", + /*error_detail=*/absl::OkStatus(), + /*resource_names=*/{"foo1"}); + CheckRequestNode(*request); // Should be present on the first request. + // Do not send a response, but wait for the resource to be reported as + // not existing. + EXPECT_TRUE(watcher->WaitForDoesNotExist()); + // Check metric data. + EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData( + ::testing::ElementsAre(), ::testing::ElementsAre(), ::testing::_)); + EXPECT_THAT(GetResourceCounts(), + ::testing::ElementsAre(::testing::Pair( + ResourceCountLabelsEq(XdsClient::kOldStyleAuthority, + XdsFooResourceType::Get()->type_url(), + "does_not_exist"), + 1))); + // Check CSDS data. + ClientConfig csds = DumpCsds(); + EXPECT_THAT(csds.generic_xds_configs(), + ::testing::ElementsAre(CsdsResourceDoesNotExistOnTimeout( + XdsFooResourceType::Get()->type_url(), "foo1"))); + // Start a new watcher for the same resource. It should immediately + // receive the same does-not-exist notification. + auto watcher2 = StartFooWatch("foo1"); + EXPECT_TRUE(watcher2->WaitForDoesNotExist()); + // Now server sends a response. + stream->SendMessageToClient( + ResponseBuilder(XdsFooResourceType::Get()->type_url()) + .set_version_info("1") + .set_nonce("A") + .AddFooResource(XdsFooResource("foo1", 6)) + .Serialize()); + // XdsClient should have delivered the response to the watchers. 
+ auto resource = watcher->WaitForNextResource(); + ASSERT_NE(resource, nullptr); + EXPECT_EQ(resource->name, "foo1"); + EXPECT_EQ(resource->value, 6); + resource = watcher2->WaitForNextResource(); + ASSERT_NE(resource, nullptr); + EXPECT_EQ(resource->name, "foo1"); + EXPECT_EQ(resource->value, 6); + // Check metric data. + EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData( + ::testing::ElementsAre(::testing::Pair( + ::testing::Pair(kDefaultXdsServerUrl, + XdsFooResourceType::Get()->type_url()), + 1)), + ::testing::ElementsAre(), ::testing::_)); + EXPECT_THAT( + GetResourceCounts(), + ::testing::ElementsAre(::testing::Pair( + ResourceCountLabelsEq(XdsClient::kOldStyleAuthority, + XdsFooResourceType::Get()->type_url(), "acked"), + 1))); + // Check CSDS data. + csds = DumpCsds(); + EXPECT_THAT(csds.generic_xds_configs(), + ::testing::UnorderedElementsAre(CsdsResourceAcked( + XdsFooResourceType::Get()->type_url(), "foo1", + resource->AsJsonString(), "1", TimestampProtoEq(kTime0)))); + // XdsClient should have sent an ACK message to the xDS server. + request = WaitForRequest(stream.get()); + ASSERT_TRUE(request.has_value()); + CheckRequest(*request, XdsFooResourceType::Get()->type_url(), + /*version_info=*/"1", /*response_nonce=*/"A", + /*error_detail=*/absl::OkStatus(), + /*resource_names=*/{"foo1"}); + // Cancel watch. 
+ CancelFooWatch(watcher.get(), "foo1"); + CancelFooWatch(watcher2.get(), "foo1"); + EXPECT_TRUE(stream->IsOrphaned()); +} + +TEST_F(XdsClientTest, ResourceTimerIsTransientFailure) { + testing::ScopedEnvVar env_var("GRPC_EXPERIMENTAL_XDS_DATA_ERROR_HANDLING", + "true"); + event_engine_->SetRunAfterDurationCallback( + [&](grpc_event_engine::experimental::EventEngine::Duration duration) { + grpc_event_engine::experimental::EventEngine::Duration expected = + std::chrono::seconds(30); + EXPECT_EQ(duration, expected) << "Expected: " << expected.count() + << "\nActual: " << duration.count(); + }); + InitXdsClient(FakeXdsBootstrap::Builder().SetServers( + {FakeXdsBootstrap::FakeXdsServer(kDefaultXdsServerUrl, false, true)})); + // Start a watch for "foo1". + auto watcher = StartFooWatch("foo1"); + // Watcher should initially not see any resource reported. + EXPECT_FALSE(watcher->HasEvent()); + // Check metric data. + EXPECT_THAT(metrics_reporter_->resource_updates_valid(), + ::testing::ElementsAre()); + EXPECT_THAT(metrics_reporter_->resource_updates_invalid(), + ::testing::ElementsAre()); + EXPECT_THAT(GetResourceCounts(), + ::testing::ElementsAre(::testing::Pair( + ResourceCountLabelsEq(XdsClient::kOldStyleAuthority, + XdsFooResourceType::Get()->type_url(), + "requested"), + 1))); + // XdsClient should have created an ADS stream. + auto stream = WaitForAdsStream(); + ASSERT_TRUE(stream != nullptr); + // XdsClient should have sent a subscription request on the ADS stream. + auto request = WaitForRequest(stream.get()); + ASSERT_TRUE(request.has_value()); + CheckRequest(*request, XdsFooResourceType::Get()->type_url(), + /*version_info=*/"", /*response_nonce=*/"", + /*error_detail=*/absl::OkStatus(), + /*resource_names=*/{"foo1"}); + CheckRequestNode(*request); // Should be present on the first request. + // Do not send a response, but wait for the resource timer to fire and + // report a timeout error.
+ auto error = watcher->WaitForNextError(); + ASSERT_TRUE(error.has_value()); + EXPECT_EQ(error, absl::UnavailableError(absl::StrCat( + "timeout obtaining resource from xDS server ", + kDefaultXdsServerUrl, " (node ID:xds_client_test)"))); + // Check metric data. + EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData( + ::testing::ElementsAre(), ::testing::ElementsAre(), ::testing::_)); + EXPECT_THAT(GetResourceCounts(), + ::testing::ElementsAre(::testing::Pair( + ResourceCountLabelsEq(XdsClient::kOldStyleAuthority, + XdsFooResourceType::Get()->type_url(), + "timeout"), + 1))); + // Check CSDS data. + ClientConfig csds = DumpCsds(); + EXPECT_THAT( + csds.generic_xds_configs(), + ::testing::UnorderedElementsAre(CsdsResourceEq( + ClientResourceStatus::TIMEOUT, XdsFooResourceType::Get()->type_url(), + "foo1", CsdsNoResourceFields(), + CsdsErrorDetailsOnly( + absl::StrCat("timeout obtaining resource from xDS server ", + kDefaultXdsServerUrl))))); + // Start a new watcher for the same resource. It should immediately + // receive the same timeout error. + auto watcher2 = StartFooWatch("foo1"); + error = watcher2->WaitForNextError(); + ASSERT_TRUE(error.has_value()); + EXPECT_EQ(error, absl::UnavailableError(absl::StrCat( + "timeout obtaining resource from xDS server ", + kDefaultXdsServerUrl, " (node ID:xds_client_test)"))); + // Now server sends a response. + stream->SendMessageToClient( + ResponseBuilder(XdsFooResourceType::Get()->type_url()) + .set_version_info("1") + .set_nonce("A") + .AddFooResource(XdsFooResource("foo1", 6)) + .Serialize()); + // XdsClient should have delivered the response to the watchers. + auto resource = watcher->WaitForNextResource(); + ASSERT_NE(resource, nullptr); + EXPECT_EQ(resource->name, "foo1"); + EXPECT_EQ(resource->value, 6); + resource = watcher2->WaitForNextResource(); + ASSERT_NE(resource, nullptr); + EXPECT_EQ(resource->name, "foo1"); + EXPECT_EQ(resource->value, 6); + // Check metric data.
+ EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData( + ::testing::ElementsAre(::testing::Pair( + ::testing::Pair(kDefaultXdsServerUrl, + XdsFooResourceType::Get()->type_url()), + 1)), + ::testing::ElementsAre(), ::testing::_)); + EXPECT_THAT( + GetResourceCounts(), + ::testing::ElementsAre(::testing::Pair( + ResourceCountLabelsEq(XdsClient::kOldStyleAuthority, + XdsFooResourceType::Get()->type_url(), "acked"), + 1))); + // Check CSDS data. + csds = DumpCsds(); + EXPECT_THAT(csds.generic_xds_configs(), + ::testing::UnorderedElementsAre(CsdsResourceAcked( + XdsFooResourceType::Get()->type_url(), "foo1", + resource->AsJsonString(), "1", TimestampProtoEq(kTime0)))); + // XdsClient should have sent an ACK message to the xDS server. + request = WaitForRequest(stream.get()); + ASSERT_TRUE(request.has_value()); + CheckRequest(*request, XdsFooResourceType::Get()->type_url(), + /*version_info=*/"1", /*response_nonce=*/"A", + /*error_detail=*/absl::OkStatus(), + /*resource_names=*/{"foo1"}); + // Cancel watch. + CancelFooWatch(watcher.get(), "foo1"); + CancelFooWatch(watcher2.get(), "foo1"); + EXPECT_TRUE(stream->IsOrphaned()); +} + TEST_F(XdsClientTest, ResourceDoesNotExistAfterStreamRestart) { InitXdsClient(); // Metrics should initially be empty. diff --git a/tools/distrib/fix_build_deps.py b/tools/distrib/fix_build_deps.py index 4af5168641324..6f893e0f9e0ef 100755 --- a/tools/distrib/fix_build_deps.py +++ b/tools/distrib/fix_build_deps.py @@ -442,7 +442,6 @@ def score_best(proposed, existing): "grpc_yodel_simple_test": lambda **kwargs: None, "grpc_fuzzer": grpc_cc_library, "grpc_fuzz_test": grpc_cc_library, - "grpc_proto_fuzzer": grpc_cc_library, "grpc_proto_library": grpc_proto_library, "grpc_internal_proto_library": grpc_proto_library, "grpc_cc_proto_library": lambda **kwargs: None,