Skip to content

Commit

Permalink
aws: Add aws cluster manager capability (#38205)
Browse files Browse the repository at this point in the history
Commit Message: aws: Adds aws cluster manager capability
Additional Description: 

Part 1 of 3 patches to address customer-reported bugs in the AWS
request signing extension when using the route discovery service (RDS).

This component will add cluster management (via a pinned singleton), and
split the cluster creation and management logic from the credential
refresh logic. It will also remove the flawed UUID logic from cluster
naming, which will prevent excessive numbers of clusters being created
during RDS update.

When integrated, this should substantially improve credential retrieval
behaviour following xDS changes, as the clusters will persist for the
life of the server rather than being continually deleted and recreated.

Risk Level: N/A - not integrated
Testing: Unit
Docs Changes:
Release Notes:
Platform Specific Features:
[Optional Runtime guard:]
[Optional Fixes #Issue]
[Optional Fixes commit #PR or SHA]
[Optional Deprecated:]
[Optional [API
Considerations](https://github.com/envoyproxy/envoy/blob/main/api/review_checklist.md):]

---------

Signed-off-by: Nigel Brittain <[email protected]>
  • Loading branch information
nbaws authored Jan 30, 2025
1 parent 3d26e01 commit ca5c700
Show file tree
Hide file tree
Showing 5 changed files with 514 additions and 0 deletions.
15 changes: 15 additions & 0 deletions source/extensions/common/aws/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,21 @@ envoy_cc_library(
],
)

# Library target for the AWS cluster manager (aws_cluster_manager.cc / aws_cluster_manager.h).
envoy_cc_library(
name = "aws_cluster_manager_lib",
srcs = ["aws_cluster_manager.cc"],
hdrs = ["aws_cluster_manager.h"],
deps = [
":utility_lib",
"//envoy/http:message_interface",
"//envoy/singleton:manager_interface",
"//source/common/common:cleanup_lib",
"//source/common/init:target_lib",
"@envoy_api//envoy/config/cluster/v3:pkg_cc_proto",
"@envoy_api//envoy/config/core/v3:pkg_cc_proto",
],
)

envoy_cc_library(
name = "signer_base_impl",
srcs = ["signer_base_impl.cc"],
Expand Down
134 changes: 134 additions & 0 deletions source/extensions/common/aws/aws_cluster_manager.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
#include "source/extensions/common/aws/aws_cluster_manager.h"

namespace Envoy {
namespace Extensions {
namespace Common {
namespace Aws {

// Builds the aws cluster manager and hooks it into the server:
//  - if the init manager has already finished, cluster creation is enabled right away and the
//    cluster update callbacks are registered immediately;
//  - otherwise an init target is registered which, once run, enables cluster creation, registers
//    the cluster update callbacks and creates every cluster queued in the meantime.
// A ShutdownExit lifecycle callback is always registered to drop the cluster update callbacks
// handle.
AwsClusterManager::AwsClusterManager(Server::Configuration::ServerFactoryContext& context)
    : context_(context) {
  const bool server_initialized =
      context_.initManager().state() == Envoy::Init::Manager::State::Initialized;

  if (!server_initialized) {
    // Still initializing: defer cluster creation until the init target is invoked
    init_target_ = std::make_unique<Init::TargetImpl>("aws_cluster_manager", [this]() -> void {
      queue_clusters_.store(false);
      cm_handle_ = context_.clusterManager().addThreadLocalClusterUpdateCallbacks(*this);
      createQueuedClusters();

      // Signal readiness first, then release the target as it is no longer needed
      init_target_->ready();
      init_target_.reset();
    });
    context_.initManager().add(*init_target_);
  } else {
    // Already initialized: clusters can be created as soon as they are added
    queue_clusters_.store(false);
    cm_handle_ = context_.clusterManager().addThreadLocalClusterUpdateCallbacks(*this);
  }

  // This singleton is pinned, so the cluster update callbacks must be removed explicitly before
  // the cluster manager terminates
  shutdown_handle_ = context_.lifecycleNotifier().registerCallback(
      Server::ServerLifecycleNotifier::Stage::ShutdownExit,
      [this](Event::PostCb) { cm_handle_.reset(); });
}

// Registers `cb` to be notified when the managed cluster `cluster_name` comes online.
//
// Returns:
//  - InvalidArgumentError when `cluster_name` is not a managed cluster;
//  - AlreadyExistsError when the cluster is no longer being created; in that case `cb` has
//    already been invoked synchronously before returning and no handle is produced;
//  - otherwise a handle tying `cb` to the cluster's callback list.
absl::StatusOr<AwsManagedClusterUpdateCallbacksHandlePtr>
AwsClusterManager::addManagedClusterUpdateCallbacks(absl::string_view cluster_name,
                                                    AwsManagedClusterUpdateCallbacks& cb) {
  ENVOY_LOG_MISC(debug, "Adding callback for cluster {}", cluster_name);

  const auto entry = managed_clusters_.find(cluster_name);
  if (entry == managed_clusters_.end()) {
    return absl::InvalidArgumentError("Cluster not found");
  }

  CredentialsProviderCluster& cluster = *entry->second;
  if (cluster.is_creating_.load()) {
    // Cluster is not up yet: park the callback until onClusterAddOrUpdate fires for it
    return std::make_unique<AwsManagedClusterUpdateCallbacksHandle>(cb,
                                                                    cluster.update_callbacks_);
  }

  // Cluster is already alive, so notify straight away to start credential retrieval
  ENVOY_LOG_MISC(debug, "Managed cluster {} is ready immediately, calling callback",
                 cluster_name);
  cb.onClusterAddOrUpdate();
  return absl::AlreadyExistsError("Cluster already online");
}

// Cluster manager callback: when `cluster_name` is one of our managed clusters, marks it as no
// longer being created and notifies every callback registered for it. Updates for clusters that
// are not managed here are ignored.
void AwsClusterManager::onClusterAddOrUpdate(absl::string_view cluster_name,
                                             Upstream::ThreadLocalClusterCommand&) {
  const auto it = managed_clusters_.find(cluster_name);
  if (it == managed_clusters_.end()) {
    return;
  }

  // Mark our cluster as ready for use
  auto& managed_cluster = *it->second;
  managed_cluster.is_creating_.store(false);

  auto& callbacks = managed_cluster.update_callbacks_;
  for (auto cb_it = callbacks.begin(); cb_it != callbacks.end();) {
    // The current callback may remove itself from the list (by destroying its handle) while it
    // runs, so step to the next element before invoking it.
    AwsManagedClusterUpdateCallbacks* cb = *cb_it;
    ++cb_it;
    ENVOY_LOG_MISC(debug, "Managed cluster {} is ready, calling callback", cluster_name);
    cb->onClusterAddOrUpdate();
  }
}

// Intentionally empty: no removal handling is required, as we are using the avoid_cds_removal
// flag.
void AwsClusterManager::onClusterRemoval(const std::string&) {}

void AwsClusterManager::createQueuedClusters() {
std::vector<std::string> failed_clusters;
for (const auto& it : managed_clusters_) {
auto cluster_name = it.first;
auto cluster_type = it.second->cluster_type_;
auto uri = it.second->uri_;
auto cluster = Utility::createInternalClusterStatic(cluster_name, cluster_type, uri);
auto status = context_.clusterManager().addOrUpdateCluster(cluster, "", true);
if (!status.ok()) {
ENVOY_LOG_MISC(debug, "Failed to add cluster {} to cluster manager: {}", cluster_name,
status.status().ToString());
failed_clusters.push_back(cluster_name);
}
}
for (const auto& cluster_name : failed_clusters) {
managed_clusters_.erase(cluster_name);
}
}

// Starts managing a cluster named `cluster_name` of discovery type `cluster_type` pointing at
// `uri`.
//
// The cluster is always recorded in managed_clusters_ (flagged as being created). If cluster
// creation is still queued (queue_clusters_ is set), nothing more is done here and the cluster is
// created later by createQueuedClusters(). Otherwise the cluster is submitted to the envoy
// cluster manager immediately.
//
// Returns:
//  - OkStatus when the cluster was recorded (and, if not queued, accepted by the cluster manager);
//  - AlreadyExistsError when `cluster_name` is already managed;
//  - the cluster manager's error status when immediate creation fails, in which case the cluster
//    is removed from managed_clusters_ again.
absl::Status AwsClusterManager::addManagedCluster(
    absl::string_view cluster_name,
    const envoy::config::cluster::v3::Cluster::DiscoveryType cluster_type, absl::string_view uri) {

  if (managed_clusters_.find(cluster_name) != managed_clusters_.end()) {
    ENVOY_LOG_MISC(debug, "Cluster {} already exists, not readding", cluster_name);
    return absl::AlreadyExistsError("Cluster already exists");
  }

  auto new_cluster = std::make_unique<CredentialsProviderCluster>(cluster_type, std::string(uri));
  new_cluster->is_creating_.store(true);
  managed_clusters_.emplace(std::string(cluster_name), std::move(new_cluster));
  ENVOY_LOG_MISC(debug, "Added cluster {} to list, cluster list len {}", cluster_name,
                 managed_clusters_.size());

  if (queue_clusters_) {
    // Creation is deferred; the cluster definition is built when the queue is drained, so there
    // is no point constructing it here.
    return absl::OkStatus();
  }

  const auto cluster = Utility::createInternalClusterStatic(cluster_name, cluster_type, uri);
  const auto status = context_.clusterManager().addOrUpdateCluster(cluster, "", true);
  if (!status.ok()) {
    ENVOY_LOG_MISC(debug, "Failed to add cluster {} to cluster manager: {}", cluster_name,
                   status.status().ToString());
    // Do not keep tracking a cluster the cluster manager refused
    managed_clusters_.erase(cluster_name);
    return status.status();
  }
  return absl::OkStatus();
}

// Looks up the URI recorded for the managed cluster `cluster_name`.
// Returns the URI, or InvalidArgumentError when no managed cluster has that name.
absl::StatusOr<std::string>
AwsClusterManager::getUriFromClusterName(absl::string_view cluster_name) {
  const auto entry = managed_clusters_.find(cluster_name);
  if (entry != managed_clusters_.end()) {
    return entry->second->uri_;
  }
  return absl::InvalidArgumentError("Cluster not found");
}

} // namespace Aws
} // namespace Common
} // namespace Extensions
} // namespace Envoy
129 changes: 129 additions & 0 deletions source/extensions/common/aws/aws_cluster_manager.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
#pragma once

#include <atomic>
#include <list>
#include <memory>
#include <string>
#include <utility>

#include "envoy/common/optref.h"
#include "envoy/common/pure.h"
#include "envoy/config/cluster/v3/cluster.pb.h"
#include "envoy/config/core/v3/base.pb.h"
#include "envoy/singleton/manager.h"
#include "envoy/upstream/cluster_manager.h"

#include "source/common/common/cleanup.h"
#include "source/common/init/target_impl.h"
#include "source/extensions/common/aws/utility.h"

namespace Envoy {
namespace Extensions {
namespace Common {
namespace Aws {

// Callback interface implemented by parties that want to be told when a cluster managed by
// AwsClusterManager is available.
class AwsManagedClusterUpdateCallbacks {
public:
virtual ~AwsManagedClusterUpdateCallbacks() = default;

// Invoked by AwsClusterManager, either when the cluster manager reports an add/update for the
// managed cluster, or immediately on registration if the cluster is already up.
virtual void onClusterAddOrUpdate() PURE;
};

// Handle returned from AwsClusterManager::addManagedClusterUpdateCallbacks. It forwards the
// callback pointer and the owning list to RaiiListElement.
// NOTE(review): presumably RaiiListElement inserts `&cb` into `parent` on construction and removes
// it on destruction (see source/common/common/cleanup.h) - confirm.
class AwsManagedClusterUpdateCallbacksHandle
: public RaiiListElement<AwsManagedClusterUpdateCallbacks*> {
public:
// @param cb callback to register; stored by pointer, so it must outlive this handle.
// @param parent list of registered callbacks this handle is associated with.
AwsManagedClusterUpdateCallbacksHandle(AwsManagedClusterUpdateCallbacks& cb,
std::list<AwsManagedClusterUpdateCallbacks*>& parent)
: RaiiListElement<AwsManagedClusterUpdateCallbacks*>(parent, &cb) {}
};

// Owning pointer to a callback registration handle.
using AwsManagedClusterUpdateCallbacksHandlePtr =
std::unique_ptr<AwsManagedClusterUpdateCallbacksHandle>;

/**
 * Manages clusters for any number of credentials provider instances
 *
 * Credentials providers in async mode require clusters to be created so that they can use the async
 * http client to retrieve credentials. The aws cluster manager is responsible for creating these
 * clusters, and notifying a credential provider when a cluster comes on line so they can begin
 * retrieving credentials.
 *
 * - For InstanceProfileCredentialsProvider, a cluster is created with the uri of the instance
 * metadata service. Only one cluster is required for any number of instantiations of the aws
 * request signing extension.
 *
 * - For ContainerCredentialsProvider (including ECS and EKS), a cluster is created with the uri of
 * the container agent. Only one cluster is required for any number of instantiations of the aws
 * request signing extension.
 *
 * - For WebIdentityCredentialsProvider, a cluster is required for the STS service in any region
 * configured. There may be many WebIdentityCredentialsProvider instances configured, each
 * with their own region, or their own role ARN or role session name. The aws cluster manager will
 * maintain only a single cluster per region, and notify all relevant WebIdentityCredentialsProvider
 * instances when their cluster is ready.
 *
 * - For IAMRolesAnywhere, this behaves similarly to WebIdentityCredentialsProvider, where there may
 * be many instantiations of the credential provider for different roles, regions and profiles. The
 * aws cluster manager will dedupe these clusters as required.
 */
class AwsClusterManager : public Envoy::Singleton::Instance,
                          public Upstream::ClusterUpdateCallbacks {
  // Friend class for testing callbacks
  friend class AwsClusterManagerFriend;

public:
  /**
   * @param context server factory context used to reach the init manager, the cluster manager and
   * the lifecycle notifier. It is held by reference and must outlive this object.
   */
  explicit AwsClusterManager(Server::Configuration::ServerFactoryContext& context);

  /**
   * Add a managed cluster to the aws cluster manager
   * @param cluster_name name under which the cluster is tracked and created
   * @param cluster_type discovery type of the cluster to create
   * @param uri uri the cluster points at
   * @return absl::Status based on whether the cluster could be added to the cluster manager;
   * AlreadyExistsError if a cluster with this name is already managed
   */
  absl::Status
  addManagedCluster(absl::string_view cluster_name,
                    const envoy::config::cluster::v3::Cluster::DiscoveryType cluster_type,
                    absl::string_view uri);

  /**
   * Add a callback to be signaled when a managed cluster comes online. This is used to kick off
   * credential refresh
   * @param cluster_name name of a cluster previously added with addManagedCluster
   * @param cb callback to signal
   * @return RAII handle for the callback while the cluster is still being created.
   * InvalidArgumentError if the cluster is not managed. If the cluster is already online, the
   * callback is invoked immediately and AlreadyExistsError is returned instead of a handle.
   */
  absl::StatusOr<AwsManagedClusterUpdateCallbacksHandlePtr>
  addManagedClusterUpdateCallbacks(absl::string_view cluster_name,
                                   AwsManagedClusterUpdateCallbacks& cb);

  /**
   * Look up the uri a managed cluster was added with
   * @param cluster_name name of a cluster previously added with addManagedCluster
   * @return the uri, or InvalidArgumentError if the cluster is not managed
   */
  absl::StatusOr<std::string> getUriFromClusterName(absl::string_view cluster_name);

private:
  // Callbacks for cluster manager
  void onClusterAddOrUpdate(absl::string_view, Upstream::ThreadLocalClusterCommand&) override;
  void onClusterRemoval(const std::string&) override;

  /**
   * Create all queued clusters, if we were unable to create them in real time due to envoy cluster
   * manager initialization
   */
  void createQueuedClusters();

  // Book-keeping for a single managed cluster
  struct CredentialsProviderCluster {
    // `uri` is taken by value and moved into place to avoid a second copy
    CredentialsProviderCluster(envoy::config::cluster::v3::Cluster::DiscoveryType cluster_type,
                               std::string uri)
        : uri_(std::move(uri)), cluster_type_(cluster_type) {}

    std::string uri_;
    envoy::config::cluster::v3::Cluster::DiscoveryType cluster_type_;
    // Atomic flag for cluster recreate
    std::atomic<bool> is_creating_ = false;
    // Callbacks waiting for this cluster to come online
    std::list<AwsManagedClusterUpdateCallbacks*> update_callbacks_;
  };

  // All managed clusters, keyed by cluster name
  absl::flat_hash_map<std::string, std::unique_ptr<CredentialsProviderCluster>> managed_clusters_;
  // True while cluster creation must be deferred; cleared once clusters can be created directly
  std::atomic<bool> queue_clusters_ = true;
  Server::Configuration::ServerFactoryContext& context_;
  // Registration of this object with the envoy cluster manager's update callbacks
  Upstream::ClusterUpdateCallbacksHandlePtr cm_handle_;
  // Registration of the shutdown callback that releases cm_handle_
  Server::ServerLifecycleNotifier::HandlePtr shutdown_handle_;
  // Init target used to defer cluster creation; only set while the server is initializing
  std::unique_ptr<Init::TargetImpl> init_target_;
};

// Shared-ownership pointer to the aws cluster manager.
using AwsClusterManagerPtr = std::shared_ptr<AwsClusterManager>;
// Optional reference to an AwsClusterManagerPtr (note: wraps the shared pointer itself, not the
// AwsClusterManager it points to).
using AwsClusterManagerOptRef = OptRef<AwsClusterManagerPtr>;

} // namespace Aws
} // namespace Common
} // namespace Extensions
} // namespace Envoy
11 changes: 11 additions & 0 deletions test/extensions/common/aws/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,17 @@ envoy_cc_test(
],
)

# Unit test target for aws_cluster_manager_lib (aws_cluster_manager_test.cc).
envoy_cc_test(
name = "aws_cluster_manager_test",
srcs = ["aws_cluster_manager_test.cc"],
rbe_pool = "6gig",
deps = [
"//source/extensions/common/aws:aws_cluster_manager_lib",
"//test/mocks/server:server_factory_context_mocks",
"@envoy_api//envoy/config/cluster/v3:pkg_cc_proto",
],
)

envoy_cc_test(
name = "sigv4_signer_corpus_test",
srcs = ["sigv4_signer_corpus_test.cc"],
Expand Down
Loading

0 comments on commit ca5c700

Please sign in to comment.