Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[pull] master from grpc:master #575

Merged
merged 7 commits into from
Jan 30, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bazel/grpc_build_system.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ def _update_visibility(visibility):
"grpc_public_hdrs": PRIVATE,
"grpcpp_gcp_observability": PUBLIC,
"grpc_resolver_fake": PRIVATE,
"grpc++_public_hdrs": PUBLIC,
"grpc++_public_hdrs": PRIVATE,
"http": PRIVATE,
"httpcli": PRIVATE,
"iomgr_internal_errqueue": PRIVATE,
Expand Down
2 changes: 1 addition & 1 deletion include/grpc/status.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ typedef enum {
GRPC_STATUS_DATA_LOSS = 15,

/** Force users to include a default branch: */
GRPC_STATUS__DO_NOT_USE = -1
GRPC_STATUS__DO_NOT_USE = 0x7fffffffu,
} grpc_status_code;

#ifdef __cplusplus
Expand Down
23 changes: 14 additions & 9 deletions src/core/ext/transport/inproc/legacy_inproc_transport.cc
Original file line number Diff line number Diff line change
Expand Up @@ -185,16 +185,21 @@ struct inproc_stream {

if (!server_data) {
t->ref();
inproc_transport* st = t->other_side;
st->ref();
other_side = nullptr; // will get filled in soon
// Pass the client-side stream address to the server-side for a ref
ref("inproc_init_stream:clt"); // ref it now on behalf of server
// side to avoid destruction
GRPC_TRACE_LOG(inproc, INFO)
<< "calling accept stream cb " << st->accept_stream_cb << " "
<< st->accept_stream_data;
(*st->accept_stream_cb)(st->accept_stream_data, t, this);
inproc_transport* st = t->other_side;
if (st->accept_stream_cb == nullptr) {
cancel_stream_locked(this,
absl::UnavailableError("inproc server closed"));
} else {
st->ref();
// Pass the client-side stream address to the server-side for a ref
ref("inproc_init_stream:clt"); // ref it now on behalf of server
// side to avoid destruction
GRPC_TRACE_LOG(inproc, INFO)
<< "calling accept stream cb " << st->accept_stream_cb << " "
<< st->accept_stream_data;
(*st->accept_stream_cb)(st->accept_stream_data, t, this);
}
} else {
// This is the server-side and is being called through accept_stream_cb
inproc_stream* cs = const_cast<inproc_stream*>(
Expand Down
65 changes: 63 additions & 2 deletions src/core/lib/transport/metadata_batch.h
Original file line number Diff line number Diff line change
Expand Up @@ -369,12 +369,73 @@ struct SimpleIntBasedMetadata : public SimpleIntBasedMetadataBase<Int> {
};

// grpc-status metadata trait.
struct GrpcStatusMetadata
: public SimpleIntBasedMetadata<grpc_status_code, GRPC_STATUS_UNKNOWN> {
struct GrpcStatusMetadata {
  // The parsed (memento) form and the exposed value are both the raw status
  // enum, so converting between them is the identity.
  using ValueType = grpc_status_code;
  using MementoType = grpc_status_code;
  static ValueType MementoToValue(MementoType value) { return value; }

  // Serializes the numeric status code through Slice::FromInt64.
  static Slice Encode(ValueType x) { return Slice::FromInt64(x); }

  // Returns the symbolic name of a known status code. Any other value is
  // rendered as "UNKNOWN(<number>)" so that unexpected codes stay visible.
  static std::string DisplayValue(ValueType x) {
    switch (x) {
      case GRPC_STATUS_OK:
        return "OK";
      case GRPC_STATUS_CANCELLED:
        return "CANCELLED";
      case GRPC_STATUS_UNKNOWN:
        return "UNKNOWN";
      case GRPC_STATUS_INVALID_ARGUMENT:
        return "INVALID_ARGUMENT";
      case GRPC_STATUS_DEADLINE_EXCEEDED:
        return "DEADLINE_EXCEEDED";
      case GRPC_STATUS_NOT_FOUND:
        return "NOT_FOUND";
      case GRPC_STATUS_ALREADY_EXISTS:
        return "ALREADY_EXISTS";
      case GRPC_STATUS_PERMISSION_DENIED:
        return "PERMISSION_DENIED";
      case GRPC_STATUS_RESOURCE_EXHAUSTED:
        return "RESOURCE_EXHAUSTED";
      case GRPC_STATUS_FAILED_PRECONDITION:
        return "FAILED_PRECONDITION";
      case GRPC_STATUS_ABORTED:
        return "ABORTED";
      case GRPC_STATUS_OUT_OF_RANGE:
        return "OUT_OF_RANGE";
      case GRPC_STATUS_UNIMPLEMENTED:
        return "UNIMPLEMENTED";
      case GRPC_STATUS_INTERNAL:
        return "INTERNAL";
      case GRPC_STATUS_UNAVAILABLE:
        return "UNAVAILABLE";
      case GRPC_STATUS_DATA_LOSS:
        return "DATA_LOSS";
      case GRPC_STATUS_UNAUTHENTICATED:
        return "UNAUTHENTICATED";
      default:
        return absl::StrCat("UNKNOWN(", static_cast<int>(x), ")");
    }
  }
  // Mementos are plain status codes, so they display exactly like values.
  static auto DisplayMemento(MementoType x) { return DisplayValue(x); }

  static constexpr bool kRepeatable = false;
  static constexpr bool kTransferOnTrailersOnly = false;
  using CompressionTraits = SmallIntegralValuesCompressor<16>;
  static absl::string_view key() { return "grpc-status"; }

  // Parses the textual wire value into a status code.
  //
  // The value must be an integer in [0, GRPC_STATUS__DO_NOT_USE). Anything
  // else is reported through `on_error` with a short reason and mapped to
  // GRPC_STATUS_UNKNOWN. The unnamed bool parameter is ignored.
  static grpc_status_code ParseMemento(Slice value, bool,
                                       MetadataParseErrorFn on_error) {
    int64_t parsed;
    // Reason for rejecting the value; stays null when the value is valid.
    const char* rejection = nullptr;
    if (!absl::SimpleAtoi(value.as_string_view(), &parsed)) {
      rejection = "not an integer";
    } else if (parsed < 0) {
      rejection = "negative value";
    } else if (parsed >= GRPC_STATUS__DO_NOT_USE) {
      rejection = "out of range";
    }
    if (rejection != nullptr) {
      on_error(rejection, value);
      return GRPC_STATUS_UNKNOWN;
    }
    return static_cast<grpc_status_code>(parsed);
  }
};

// grpc-previous-rpc-attempts metadata trait.
Expand Down
11 changes: 11 additions & 0 deletions src/core/xds/grpc/xds_server_grpc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ constexpr absl::string_view kServerFeatureIgnoreResourceDeletion =
constexpr absl::string_view kServerFeatureFailOnDataErrors =
"fail_on_data_errors";

// Server-feature name accepted by GrpcXdsServer::JsonPostLoad() and looked up
// by GrpcXdsServer::ResourceTimerIsTransientFailure().
// NOTE(review): the identifier says "TransientFailure" while the string ends
// in "transient_error"; presumably the string is the externally defined
// feature name -- confirm before renaming either side.
constexpr absl::string_view kServerFeatureResourceTimerIsTransientFailure =
"resource_timer_is_transient_error";

constexpr absl::string_view kServerFeatureTrustedXdsServer =
"trusted_xds_server";

Expand All @@ -54,6 +57,12 @@ bool GrpcXdsServer::FailOnDataErrors() const {
server_features_.end();
}

// Returns true when this server's feature set contains the
// "resource_timer_is_transient_error" feature.
bool GrpcXdsServer::ResourceTimerIsTransientFailure() const {
  // The feature name is a string_view constant, so it is materialized as a
  // std::string for the lookup.
  const std::string feature_name(kServerFeatureResourceTimerIsTransientFailure);
  const auto match = server_features_.find(feature_name);
  return match != server_features_.end();
}

bool GrpcXdsServer::TrustedXdsServer() const {
return server_features_.find(std::string(kServerFeatureTrustedXdsServer)) !=
server_features_.end();
Expand Down Expand Up @@ -135,6 +144,8 @@ void GrpcXdsServer::JsonPostLoad(const Json& json, const JsonArgs& args,
if (feature_json.type() == Json::Type::kString &&
(feature_json.string() == kServerFeatureIgnoreResourceDeletion ||
feature_json.string() == kServerFeatureFailOnDataErrors ||
feature_json.string() ==
kServerFeatureResourceTimerIsTransientFailure ||
feature_json.string() == kServerFeatureTrustedXdsServer)) {
server_features_.insert(feature_json.string());
}
Expand Down
1 change: 1 addition & 0 deletions src/core/xds/grpc/xds_server_grpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class GrpcXdsServer final : public XdsBootstrap::XdsServer {

bool IgnoreResourceDeletion() const override;
bool FailOnDataErrors() const override;
bool ResourceTimerIsTransientFailure() const override;

bool TrustedXdsServer() const;

Expand Down
1 change: 1 addition & 0 deletions src/core/xds/xds_client/xds_bootstrap.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ class XdsBootstrap {
virtual bool IgnoreResourceDeletion() const = 0;

virtual bool FailOnDataErrors() const = 0;
virtual bool ResourceTimerIsTransientFailure() const = 0;

virtual bool Equals(const XdsServer& other) const = 0;

Expand Down
43 changes: 33 additions & 10 deletions src/core/xds/xds_client/xds_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -208,9 +208,16 @@ class XdsClient::XdsChannel::AdsCall final
if (state.HasResource()) return;
// Start timer.
ads_call_ = std::move(ads_call);
Duration timeout = ads_call_->xds_client()->request_timeout_;
if (timeout == Duration::Zero()) {
timeout = XdsDataErrorHandlingEnabled() &&
ads_call_->xds_channel()
->server_.ResourceTimerIsTransientFailure()
? Duration::Seconds(30)
: Duration::Seconds(15);
}
timer_handle_ = ads_call_->xds_client()->engine()->RunAfter(
ads_call_->xds_client()->request_timeout_,
[self = Ref(DEBUG_LOCATION, "timer")]() {
timeout, [self = Ref(DEBUG_LOCATION, "timer")]() {
ApplicationCallbackExecCtx callback_exec_ctx;
ExecCtx exec_ctx;
self->OnTimer();
Expand All @@ -236,7 +243,15 @@ class XdsClient::XdsChannel::AdsCall final
name_.authority, type_->type_url(), name_.key)
<< "} from xds server";
resource_seen_ = true;
state.SetDoesNotExistOnTimeout();
if (XdsDataErrorHandlingEnabled() &&
ads_call_->xds_channel()
->server_.ResourceTimerIsTransientFailure()) {
state.SetTimeout(
absl::StrCat("timeout obtaining resource from xDS server ",
ads_call_->xds_channel()->server_uri()));
} else {
state.SetDoesNotExistOnTimeout();
}
ads_call_->xds_client()->NotifyWatchersOnResourceChanged(
state.failed_status(), state.watchers(),
ReadDelayHandle::NoWait());
Expand Down Expand Up @@ -1302,18 +1317,12 @@ void XdsClient::ResourceState::SetNacked(const std::string& version,
serialized_proto_.clear();
}
client_status_ = ClientResourceStatus::NACKED;
failed_version_ = version;
failed_status_ =
absl::InvalidArgumentError(absl::StrCat("invalid resource: ", details));
failed_version_ = version;
failed_update_time_ = update_time;
}

// Moves the resource into the DOES_NOT_EXIST state with a NotFound status.
void XdsClient::ResourceState::SetDoesNotExistOnTimeout() {
  // This transition carries no version, so forget any earlier failed one.
  failed_version_.clear();
  failed_status_ = absl::NotFoundError("does not exist");
  client_status_ = ClientResourceStatus::DOES_NOT_EXIST;
}

void XdsClient::ResourceState::SetDoesNotExistOnLdsOrCdsDeletion(
const std::string& version, Timestamp update_time,
bool drop_cached_resource) {
Expand All @@ -1327,6 +1336,18 @@ void XdsClient::ResourceState::SetDoesNotExistOnLdsOrCdsDeletion(
failed_update_time_ = update_time;
}

// Moves the resource into the DOES_NOT_EXIST state with a NotFound status.
void XdsClient::ResourceState::SetDoesNotExistOnTimeout() {
  // This transition carries no version, so forget any earlier failed one.
  failed_version_.clear();
  failed_status_ = absl::NotFoundError("does not exist");
  client_status_ = ClientResourceStatus::DOES_NOT_EXIST;
}

// Moves the resource into the TIMEOUT state. `details` becomes the message
// of the Unavailable status stored as the failure status.
void XdsClient::ResourceState::SetTimeout(const std::string& details) {
  // This transition carries no version, so forget any earlier failed one.
  failed_version_.clear();
  failed_status_ = absl::UnavailableError(details);
  client_status_ = ClientResourceStatus::TIMEOUT;
}

absl::string_view XdsClient::ResourceState::CacheStateString() const {
switch (client_status_) {
case ClientResourceStatus::REQUESTED:
Expand All @@ -1338,6 +1359,8 @@ absl::string_view XdsClient::ResourceState::CacheStateString() const {
return "acked";
case ClientResourceStatus::NACKED:
return resource_ != nullptr ? "nacked_but_cached" : "nacked";
case ClientResourceStatus::TIMEOUT:
return "timeout";
}
Crash("unknown resource state");
}
Expand Down
17 changes: 13 additions & 4 deletions src/core/xds/xds_client/xds_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,9 @@ class XdsClient : public DualRefCounted<XdsClient> {
std::shared_ptr<grpc_event_engine::experimental::EventEngine> engine,
std::unique_ptr<XdsMetricsReporter> metrics_reporter,
std::string user_agent_name, std::string user_agent_version,
Duration resource_request_timeout = Duration::Seconds(15));
// This parameter overrides the timer duration for testing
// purposes only -- do not use in production.
Duration resource_request_timeout = Duration::Zero());
~XdsClient() override;

// Start and cancel watch for a resource.
Expand Down Expand Up @@ -276,6 +278,9 @@ class XdsClient : public DualRefCounted<XdsClient> {
ACKED,
// Client received this resource and replied with NACK.
NACKED,
// Client encountered timeout getting resource from server.
// TODO(roth): Remove explicit value when adding RECEIVED_ERROR.
TIMEOUT = 6,
};
static_assert(static_cast<ClientResourceStatus>(envoy_admin_v3_REQUESTED) ==
ClientResourceStatus::REQUESTED);
Expand All @@ -286,6 +291,8 @@ class XdsClient : public DualRefCounted<XdsClient> {
ClientResourceStatus::ACKED);
static_assert(static_cast<ClientResourceStatus>(envoy_admin_v3_NACKED) ==
ClientResourceStatus::NACKED);
static_assert(static_cast<ClientResourceStatus>(envoy_admin_v3_TIMEOUT) ==
ClientResourceStatus::TIMEOUT);

void AddWatcher(RefCountedPtr<ResourceWatcherInterface> watcher) {
watchers_.insert(std::move(watcher));
Expand All @@ -301,10 +308,11 @@ class XdsClient : public DualRefCounted<XdsClient> {
Timestamp update_time);
void SetNacked(const std::string& version, absl::string_view details,
Timestamp update_time, bool drop_cached_resource);
void SetDoesNotExistOnTimeout();
void SetDoesNotExistOnLdsOrCdsDeletion(const std::string& version,
Timestamp update_time,
bool drop_cached_resource);
void SetDoesNotExistOnTimeout();
void SetTimeout(const std::string& details);

ClientResourceStatus client_status() const { return client_status_; }
absl::string_view CacheStateString() const;
Expand Down Expand Up @@ -332,11 +340,12 @@ class XdsClient : public DualRefCounted<XdsClient> {
Timestamp update_time_;
// The last successfully updated version of the resource.
std::string version_;
// Details about the last failed update attempt or transient error.
absl::Status failed_status_;
// The rejected version string of the last failed update attempt.
std::string failed_version_;
// Details about the last failed update attempt.
absl::Status failed_status_;
// Timestamp of the last failed update attempt.
// Used only if failed_version_ is non-empty.
Timestamp failed_update_time_;
};

Expand Down
5 changes: 4 additions & 1 deletion test/core/end2end/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,10 @@ grpc_core_end2end_test(

grpc_core_end2end_test(name = "http2_stats")

grpc_core_end2end_test(name = "invoke_large_request")
# invoke_large_request end2end test, split across 5 shards.
# NOTE(review): presumably sharded to keep per-shard runtime down -- confirm.
grpc_core_end2end_test(
name = "invoke_large_request",
shard_count = 5,
)

grpc_core_end2end_test(name = "keepalive_timeout")

Expand Down
8 changes: 8 additions & 0 deletions test/core/end2end/end2end_tests.h
Original file line number Diff line number Diff line change
Expand Up @@ -719,6 +719,14 @@ core_end2end_test_fuzzer::Msg ParseTestProto(std::string text);
void suite##_##name(const grpc_core::CoreTestConfiguration* config, \
core_end2end_test_fuzzer::Msg msg) { \
if (absl::StartsWith(#name, "DISABLED_")) GTEST_SKIP() << "disabled test"; \
if (!IsEventEngineListenerEnabled() || !IsEventEngineClientEnabled() || \
!IsEventEngineDnsEnabled()) { \
GTEST_SKIP() << "fuzzers need event engine"; \
} \
if (IsEventEngineDnsNonClientChannelEnabled()) { \
GTEST_SKIP() << "event_engine_dns_non_client_channel experiment breaks " \
"fuzzing currently"; \
} \
CoreEnd2endTest_##suite##_##name(config, &msg).RunTest(); \
grpc_event_engine::experimental::ShutdownDefaultEventEngine(); \
} \
Expand Down
Loading
Loading