Skip to content

Commit

Permalink
lb: apis for async host resolution (#38021)
Browse files Browse the repository at this point in the history
refactor for #38007

This adds async APIs for chooseHost. For now, all endpoints treat async
resolution as a resolution failure, canceling the lookup and proceeding
with a null host.

Risk Level: medium
Testing: in followup
Docs Changes: n/a
Release Notes: n/a

---------

Signed-off-by: Alyssa Wilk <[email protected]>
  • Loading branch information
alyssawilk authored Jan 15, 2025
1 parent 7ab73d0 commit 504a4db
Show file tree
Hide file tree
Showing 66 changed files with 1,300 additions and 1,096 deletions.
62 changes: 61 additions & 1 deletion envoy/upstream/load_balancer.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,41 @@ namespace Upstream {

using ClusterProto = envoy::config::cluster::v3::Cluster;

/*
 * A handle to allow cancelation of asynchronous host selection.
 * If chooseHost returns a HostSelectionResponse with an AsyncHostSelectionHandle
 * handle, and the endpoint does not wish to receive the onAsyncHostSelection
 * call, it must call cancel() on the provided handle.
 *
 * Please note that the AsyncHostSelectionHandle may be deleted after the
 * cancel() call. It is up to the implementation of the asynchronous load
 * balancer to ensure the cancelation state persists until the load balancer
 * checks it.
 */
class AsyncHostSelectionHandle {
public:
// Handles are not copy-assignable.
AsyncHostSelectionHandle& operator=(const AsyncHostSelectionHandle&) = delete;
// Virtual so that a concrete handle can be destroyed through a pointer to this
// interface (HostSelectionResponse owns it via std::unique_ptr).
virtual ~AsyncHostSelectionHandle() = default;
// Cancels the pending asynchronous host selection. Once called, the caller no
// longer expects LoadBalancerContext::onAsyncHostSelection to be invoked.
virtual void cancel() PURE;
};

/*
 * The response to a LoadBalancer::chooseHost call.
 *
 * chooseHost either returns a host directly or, in the case of asynchronous
 * load balancing, returns an AsyncHostSelectionHandle handle.
 *
 * If it returns an AsyncHostSelectionHandle handle, the load balancer guarantees
 * an eventual call to LoadBalancerContext::onAsyncHostSelection unless
 * AsyncHostSelectionHandle::cancel is called.
 */
struct HostSelectionResponse {
  /*
   * Intentionally non-explicit so that call sites can keep returning a bare
   * host (or `{nullptr}`) where a HostSelectionResponse is expected.
   *
   * @param host supplies the synchronously selected host, or nullptr if none.
   * @param cancelable supplies the handle for an in-flight asynchronous
   *        selection, or nullptr when selection completed synchronously.
   */
  HostSelectionResponse(HostConstSharedPtr host,
                        std::unique_ptr<AsyncHostSelectionHandle> cancelable = nullptr)
      // Both arguments are sinks taken by value: move them into place rather
      // than copying, which for the shared pointer avoids an extra atomic
      // reference count increment/decrement on every chooseHost return.
      : host(std::move(host)), cancelable(std::move(cancelable)) {}

  // The selected host; null on failure or while selection is still pending.
  HostConstSharedPtr host;
  // Non-null only when host selection is proceeding asynchronously.
  std::unique_ptr<AsyncHostSelectionHandle> cancelable;
};

/**
* Context information passed to a load balancer to use when choosing a host. Not all load
* balancers make use of all context information.
Expand Down Expand Up @@ -113,6 +148,11 @@ class LoadBalancerContext {
* and return the corresponding host directly.
*/
virtual absl::optional<OverrideHost> overrideHostToSelect() const PURE;

/**
* Called by the load balancer when asynchronous host selection completes.
* @param host supplies the upstream host selected.
*/
virtual void onAsyncHostSelection(HostConstSharedPtr&& host) PURE;
};

/**
Expand All @@ -130,13 +170,33 @@ class LoadBalancer {
public:
virtual ~LoadBalancer() = default;

/*
 * Adapter for call sites that have not been taught about asynchronous host
 * selection yet. If the load balancer started an asynchronous lookup, the
 * lookup is canceled and the outcome is reported exactly as a failed
 * selection would be.
 *
 * @param host_selection supplies the response obtained from chooseHost.
 * @return the host carried by the response.
 */
static HostConstSharedPtr
onlyAllowSynchronousHostSelection(HostSelectionResponse host_selection) {
  // A non-null handle means the selection went asynchronous, which this caller
  // cannot wait for: cancel it so no completion callback is delivered later.
  if (auto& pending_lookup = host_selection.cancelable; pending_lookup != nullptr) {
    pending_lookup->cancel();
  }
  // Hand the host out of the by-value response without an extra copy.
  return std::move(host_selection.host);
}

/**
* Ask the load balancer for the next host to use depending on the underlying LB algorithm.
* @param context supplies the load balancer context. Not all load balancers make use of all
* context information. Load balancers should be written to assume that context information
* is missing and use sensible defaults.
* @return a HostSelectionResponse either containing a host, or AsyncHostSelectionHandle handle.
*
* Please note that asynchronous host selection is not yet fully supported in
* Envoy. All endpoints will treat asynchronous resolution as host resolution
* failure. TODO(alyssawilk) land #38007
*/
virtual HostConstSharedPtr chooseHost(LoadBalancerContext* context) PURE;
virtual HostSelectionResponse chooseHost(LoadBalancerContext* context) PURE;

/**
* Returns a best effort prediction of the next host to be picked, or nullptr if not predictable.
Expand Down
2 changes: 1 addition & 1 deletion envoy/upstream/thread_local_cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ class ThreadLocalCluster {
* @return host the next host selected by the load balancer or null if no host
* is available.
*/
virtual HostConstSharedPtr chooseHost(LoadBalancerContext* context) PURE;
virtual HostSelectionResponse chooseHost(LoadBalancerContext* context) PURE;

/**
* Allocate a load balanced HTTP connection pool for a cluster. This is *per-thread* so that
Expand Down
3 changes: 2 additions & 1 deletion source/common/router/router.cc
Original file line number Diff line number Diff line change
Expand Up @@ -863,7 +863,8 @@ Filter::createConnPool(Upstream::ThreadLocalCluster& thread_local_cluster) {
}
}

Upstream::HostConstSharedPtr host = thread_local_cluster.chooseHost(this);
Upstream::HostConstSharedPtr host = Upstream::LoadBalancer::onlyAllowSynchronousHostSelection(
thread_local_cluster.chooseHost(this));
if (!host) {
return nullptr;
}
Expand Down
3 changes: 2 additions & 1 deletion source/common/tcp_proxy/tcp_proxy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -569,7 +569,8 @@ bool Filter::maybeTunnel(Upstream::ThreadLocalCluster& cluster) {
config_->regexEngine()),
std::unique_ptr<Http::NullRouteImpl>);
}
Upstream::HostConstSharedPtr host = cluster.chooseHost(this);
Upstream::HostConstSharedPtr host =
Upstream::LoadBalancer::onlyAllowSynchronousHostSelection(cluster.chooseHost(this));
if (host) {
generic_conn_pool_ = factory->createGenericConnPool(
host, cluster, config_->tunnelingConfigHelper(), this, *upstream_callbacks_,
Expand Down
22 changes: 13 additions & 9 deletions source/common/upstream/cluster_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1247,7 +1247,7 @@ ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::tcpConnPool(
absl::optional<TcpPoolData>
ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::tcpConnPool(
ResourcePriority priority, LoadBalancerContext* context) {
HostConstSharedPtr host = chooseHost(context);
HostConstSharedPtr host = LoadBalancer::onlyAllowSynchronousHostSelection(chooseHost(context));
if (!host) {
return absl::nullopt;
}
Expand Down Expand Up @@ -1624,7 +1624,9 @@ void ClusterManagerImpl::postThreadLocalHealthFailure(const HostSharedPtr& host)

Host::CreateConnectionData ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::tcpConn(
LoadBalancerContext* context) {
HostConstSharedPtr logical_host = chooseHost(context);
HostConstSharedPtr logical_host =
LoadBalancer::onlyAllowSynchronousHostSelection(chooseHost(context));

if (logical_host) {
auto conn_info = logical_host->createConnection(
parent_.thread_local_dispatcher_, nullptr,
Expand Down Expand Up @@ -2214,23 +2216,25 @@ void ClusterManagerImpl::ThreadLocalClusterManagerImpl::httpConnPoolIsIdle(
}
}

HostConstSharedPtr ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::chooseHost(
HostSelectionResponse ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::chooseHost(
LoadBalancerContext* context) {
auto cross_priority_host_map = priority_set_.crossPriorityHostMap();
HostConstSharedPtr host = HostUtility::selectOverrideHost(cross_priority_host_map.get(),
override_host_statuses_, context);
if (host != nullptr) {
return host;
return {std::move(host)};
}

if (HostUtility::allowLBChooseHost(context)) {
host = lb_->chooseHost(context);
}
if (host) {
return host;
Upstream::HostSelectionResponse host_selection = lb_->chooseHost(context);
if (host_selection.host || host_selection.cancelable) {
return host_selection;
}
}

cluster_info_->trafficStats()->upstream_cx_none_healthy_.inc();
ENVOY_LOG(debug, "no healthy host");
return nullptr;
return {nullptr};
}

HostConstSharedPtr ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::peekAnotherHost(
Expand Down
2 changes: 1 addition & 1 deletion source/common/upstream/cluster_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -582,7 +582,7 @@ class ClusterManagerImpl : public ClusterManager,
const PrioritySet& prioritySet() override { return priority_set_; }
ClusterInfoConstSharedPtr info() override { return cluster_info_; }
LoadBalancer& loadBalancer() override { return *lb_; }
HostConstSharedPtr chooseHost(LoadBalancerContext* context) override;
HostSelectionResponse chooseHost(LoadBalancerContext* context) override;
absl::optional<HttpPoolData> httpConnPool(HostConstSharedPtr host, ResourcePriority priority,
absl::optional<Http::Protocol> downstream_protocol,
LoadBalancerContext* context) override;
Expand Down
2 changes: 2 additions & 0 deletions source/common/upstream/load_balancer_context_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ class LoadBalancerContextBase : public LoadBalancerContext {
}

absl::optional<OverrideHost> overrideHostToSelect() const override { return {}; }

void onAsyncHostSelection(HostConstSharedPtr&&) override {}
};

} // namespace Upstream
Expand Down
6 changes: 3 additions & 3 deletions source/extensions/clusters/aggregate/cluster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ absl::optional<uint32_t> AggregateClusterLoadBalancer::LoadBalancerImpl::hostToL
}
}

Upstream::HostConstSharedPtr
Upstream::HostSelectionResponse
AggregateClusterLoadBalancer::LoadBalancerImpl::chooseHost(Upstream::LoadBalancerContext* context) {
const Upstream::HealthyAndDegradedLoad* priority_loads = nullptr;
if (context != nullptr) {
Expand All @@ -174,12 +174,12 @@ AggregateClusterLoadBalancer::LoadBalancerImpl::chooseHost(Upstream::LoadBalance
return cluster->loadBalancer().chooseHost(&aggregate_context);
}

Upstream::HostConstSharedPtr
Upstream::HostSelectionResponse
AggregateClusterLoadBalancer::chooseHost(Upstream::LoadBalancerContext* context) {
if (load_balancer_) {
return load_balancer_->chooseHost(context);
}
return nullptr;
return {nullptr};
}

Upstream::HostConstSharedPtr
Expand Down
4 changes: 2 additions & 2 deletions source/extensions/clusters/aggregate/cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ class AggregateClusterLoadBalancer : public Upstream::LoadBalancer,
void onClusterRemoval(const std::string& cluster_name) override;

// Upstream::LoadBalancer
Upstream::HostConstSharedPtr chooseHost(Upstream::LoadBalancerContext* context) override;
Upstream::HostSelectionResponse chooseHost(Upstream::LoadBalancerContext* context) override;
Upstream::HostConstSharedPtr peekAnotherHost(Upstream::LoadBalancerContext*) override;
absl::optional<Upstream::SelectedPoolAndConnection>
selectExistingConnection(Upstream::LoadBalancerContext* /*context*/,
Expand All @@ -102,7 +102,7 @@ class AggregateClusterLoadBalancer : public Upstream::LoadBalancer,
priority_context_(priority_context) {}

// Upstream::LoadBalancer
Upstream::HostConstSharedPtr chooseHost(Upstream::LoadBalancerContext* context) override;
Upstream::HostSelectionResponse chooseHost(Upstream::LoadBalancerContext* context) override;
// Preconnecting not yet implemented for extensions.
Upstream::HostConstSharedPtr peekAnotherHost(Upstream::LoadBalancerContext*) override {
return nullptr;
Expand Down
12 changes: 6 additions & 6 deletions source/extensions/clusters/dynamic_forward_proxy/cluster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,8 @@ Cluster::createSubClusterConfig(const std::string& cluster_name, const std::stri
return std::make_pair(true, absl::make_optional(config));
}

Upstream::HostConstSharedPtr Cluster::chooseHost(absl::string_view host,
Upstream::LoadBalancerContext* context) const {
Upstream::HostSelectionResponse Cluster::chooseHost(absl::string_view host,
Upstream::LoadBalancerContext* context) const {
uint16_t default_port = 80;
if (info_->transportSocketMatcher()
.resolve(nullptr, nullptr)
Expand All @@ -211,7 +211,7 @@ Upstream::HostConstSharedPtr Cluster::chooseHost(absl::string_view host,
auto cluster = cm_.getThreadLocalCluster(cluster_name);
if (cluster == nullptr) {
ENVOY_LOG(debug, "cluster='{}' get thread local failed, too short ttl?", cluster_name);
return nullptr;
return {nullptr};
}

return cluster->loadBalancer().chooseHost(context);
Expand Down Expand Up @@ -350,10 +350,10 @@ void Cluster::onDnsHostRemove(const std::string& host) {
updatePriorityState({}, hosts_removed);
}

Upstream::HostConstSharedPtr
Upstream::HostSelectionResponse
Cluster::LoadBalancer::chooseHost(Upstream::LoadBalancerContext* context) {
if (!context) {
return nullptr;
return {nullptr};
}

const Router::StringAccessor* dynamic_host_filter_state = nullptr;
Expand Down Expand Up @@ -394,7 +394,7 @@ Cluster::LoadBalancer::chooseHost(Upstream::LoadBalancerContext* context) {

if (raw_host.empty()) {
ENVOY_LOG(debug, "host empty");
return nullptr;
return {nullptr};
}
std::string host = Common::DynamicForwardProxy::DnsHostInfo::normalizeHostForDfp(raw_host, port);

Expand Down
6 changes: 3 additions & 3 deletions source/extensions/clusters/dynamic_forward_proxy/cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ class Cluster : public Upstream::BaseDynamicClusterImpl,

bool allowCoalescedConnections() const { return allow_coalesced_connections_; }
bool enableSubCluster() const override { return enable_sub_cluster_; }
Upstream::HostConstSharedPtr chooseHost(absl::string_view host,
Upstream::LoadBalancerContext* context) const;
Upstream::HostSelectionResponse chooseHost(absl::string_view host,
Upstream::LoadBalancerContext* context) const;

// Extensions::Common::DynamicForwardProxy::DfpCluster
std::pair<bool, absl::optional<envoy::config::cluster::v3::Cluster>>
Expand Down Expand Up @@ -98,7 +98,7 @@ class Cluster : public Upstream::BaseDynamicClusterImpl,
// DfpLb
Upstream::HostConstSharedPtr findHostByName(const std::string& host) const override;
// Upstream::LoadBalancer
Upstream::HostConstSharedPtr chooseHost(Upstream::LoadBalancerContext* context) override;
Upstream::HostSelectionResponse chooseHost(Upstream::LoadBalancerContext* context) override;
// Preconnecting not implemented.
Upstream::HostConstSharedPtr peekAnotherHost(Upstream::LoadBalancerContext*) override {
return nullptr;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ OriginalDstClusterHandle::~OriginalDstClusterHandle() {
dispatcher.post([cluster = std::move(cluster)]() mutable { cluster.reset(); });
}

HostConstSharedPtr OriginalDstCluster::LoadBalancer::chooseHost(LoadBalancerContext* context) {
HostSelectionResponse OriginalDstCluster::LoadBalancer::chooseHost(LoadBalancerContext* context) {
if (context) {
// Check if filter state override is present, if yes use it before anything else.
Network::Address::InstanceConstSharedPtr dst_host = filterStateOverrideHost(context);
Expand Down Expand Up @@ -91,15 +91,15 @@ HostConstSharedPtr OriginalDstCluster::LoadBalancer::chooseHost(LoadBalancerCont
parent->cluster_->addHost(host);
}
});
return host;
return {host};
} else {
ENVOY_LOG(debug, "Failed to create host for {}.", dst_addr.asString());
}
}
}
// TODO(ramaraochavali): add a stat and move this log line to debug.
ENVOY_LOG(warn, "original_dst_load_balancer: No downstream connection or no original_dst.");
return nullptr;
return {nullptr};
}

Network::Address::InstanceConstSharedPtr
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ class OriginalDstCluster : public ClusterImplBase {
host_map_(parent->cluster_->getCurrentHostMap()) {}

// Upstream::LoadBalancer
HostConstSharedPtr chooseHost(LoadBalancerContext* context) override;
HostSelectionResponse chooseHost(LoadBalancerContext* context) override;
// Preconnecting is not implemented for OriginalDstCluster
HostConstSharedPtr peekAnotherHost(LoadBalancerContext*) override { return nullptr; }
// Pool selection not implemented for OriginalDstCluster
Expand Down
9 changes: 5 additions & 4 deletions source/extensions/clusters/redis/redis_cluster_lb.cc
Original file line number Diff line number Diff line change
Expand Up @@ -126,26 +126,27 @@ Upstream::HostConstSharedPtr chooseRandomHost(const Upstream::HostSetImpl& host_
}
} // namespace

Upstream::HostConstSharedPtr RedisClusterLoadBalancerFactory::RedisClusterLoadBalancer::chooseHost(
Upstream::HostSelectionResponse
RedisClusterLoadBalancerFactory::RedisClusterLoadBalancer::chooseHost(
Envoy::Upstream::LoadBalancerContext* context) {
if (!slot_array_) {
return nullptr;
return {nullptr};
}
absl::optional<uint64_t> hash;
if (context) {
hash = context->computeHashKey();
}

if (!hash) {
return nullptr;
return {nullptr};
}

RedisShardSharedPtr shard;
if (dynamic_cast<const RedisSpecifyShardContextImpl*>(context)) {
if (hash.value() < shard_vector_->size()) {
shard = shard_vector_->at(hash.value());
} else {
return nullptr;
return {nullptr};
}
} else {
shard = shard_vector_->at(
Expand Down
2 changes: 1 addition & 1 deletion source/extensions/clusters/redis/redis_cluster_lb.h
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ class RedisClusterLoadBalancerFactory : public ClusterSlotUpdateCallBack,
random_(random) {}

// Upstream::LoadBalancerBase
Upstream::HostConstSharedPtr chooseHost(Upstream::LoadBalancerContext*) override;
Upstream::HostSelectionResponse chooseHost(Upstream::LoadBalancerContext*) override;
Upstream::HostConstSharedPtr peekAnotherHost(Upstream::LoadBalancerContext*) override {
return nullptr;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,8 @@ uint16_t InstanceImpl::ThreadLocalPool::shardSize() {
for (uint16_t size = 0;; size++) {
Clusters::Redis::RedisSpecifyShardContextImpl lb_context(
size, request, Common::Redis::Client::ReadPolicy::Primary);
Upstream::HostConstSharedPtr host = cluster_->loadBalancer().chooseHost(&lb_context);
Upstream::HostConstSharedPtr host = Upstream::LoadBalancer::onlyAllowSynchronousHostSelection(
cluster_->loadBalancer().chooseHost(&lb_context));
if (!host) {
return size;
}
Expand All @@ -313,7 +314,8 @@ InstanceImpl::ThreadLocalPool::makeRequest(const std::string& key, RespVariant&&
key, config_->enableHashtagging(), is_redis_cluster_, getRequest(request),
transaction.active_ ? Common::Redis::Client::ReadPolicy::Primary : config_->readPolicy());

Upstream::HostConstSharedPtr host = cluster_->loadBalancer().chooseHost(&lb_context);
Upstream::HostConstSharedPtr host = Upstream::LoadBalancer::onlyAllowSynchronousHostSelection(
cluster_->loadBalancer().chooseHost(&lb_context));
if (!host) {
ENVOY_LOG(debug, "host not found: '{}'", key);
return nullptr;
Expand All @@ -336,7 +338,8 @@ InstanceImpl::ThreadLocalPool::makeRequestToShard(uint16_t shard_index, RespVari
shard_index, getRequest(request),
transaction.active_ ? Common::Redis::Client::ReadPolicy::Primary : config_->readPolicy());

Upstream::HostConstSharedPtr host = cluster_->loadBalancer().chooseHost(&lb_context);
Upstream::HostConstSharedPtr host = Upstream::LoadBalancer::onlyAllowSynchronousHostSelection(
cluster_->loadBalancer().chooseHost(&lb_context));
if (!host) {
ENVOY_LOG(debug, "host not found: '{}'", shard_index);
return nullptr;
Expand Down
Loading

0 comments on commit 504a4db

Please sign in to comment.