Config: Implement deferred clusters on worker. (envoyproxy#28702)
* Implement deferred clusters on worker. We initialize certain clusters on
workers inline when there's traffic for those clusters.

Signed-off-by: Kevin Baichoo <[email protected]>
KBaichoo authored Aug 8, 2023
1 parent cce97d2 commit 4aaf17d
Showing 35 changed files with 1,656 additions and 168 deletions.
6 changes: 6 additions & 0 deletions api/envoy/config/bootstrap/v3/bootstrap.proto
@@ -438,6 +438,7 @@ message Admin {
 }

 // Cluster manager :ref:`architecture overview <arch_overview_cluster_manager>`.
+// [#next-free-field: 6]
 message ClusterManager {
   option (udpa.annotations.versioning).previous_message_type =
       "envoy.config.bootstrap.v2.ClusterManager";
@@ -478,6 +479,11 @@ message ClusterManager {
   // <envoy_v3_api_field_config.core.v3.ApiConfigSource.api_type>` :ref:`GRPC
   // <envoy_v3_api_enum_value_config.core.v3.ApiConfigSource.ApiType.GRPC>`.
   core.v3.ApiConfigSource load_stats_config = 4;
+
+  // Whether the ClusterManager will create clusters on the worker threads
+  // inline during requests. This will save memory and CPU cycles in cases where
+  // there are lots of inactive clusters and > 1 worker thread.
+  bool enable_deferred_cluster_creation = 5;
 }

 // Allows you to specify different watchdog configs for different subsystems.
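
The field comment above describes creating clusters on the worker threads inline during requests. A minimal C++ sketch of that idea follows, under stated assumptions: `ToyCluster`, `ToyWorkerClusters`, and all of their methods are hypothetical names invented for illustration, and the real worker-side logic is not part of this excerpt. The sketch only shows the general lazy-initialization pattern: the worker learns about a cluster cheaply and builds the expensive state on first use.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <memory>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

// Hypothetical stand-in for the expensive per-worker cluster state.
struct ToyCluster {
  std::string name;
};

// Hypothetical per-worker registry (not Envoy's class): the main thread
// registers a cheap factory, and the worker builds the real object only
// when a request first asks for that cluster.
class ToyWorkerClusters {
public:
  using Factory = std::function<std::unique_ptr<ToyCluster>()>;

  // Announce a cluster to this worker; nothing is built yet.
  void addDeferred(const std::string& name, Factory factory) {
    inflated_.erase(name); // an update discards any stale inflated state
    deferred_[name] = std::move(factory);
  }

  // Request path: inflate on first use, then reuse the inflated object.
  ToyCluster* get(const std::string& name) {
    auto hit = inflated_.find(name);
    if (hit != inflated_.end()) {
      return hit->second.get();
    }
    auto pending = deferred_.find(name);
    if (pending == deferred_.end()) {
      return nullptr; // unknown cluster
    }
    std::unique_ptr<ToyCluster> cluster = pending->second();
    deferred_.erase(pending);
    ToyCluster* raw = cluster.get();
    inflated_.emplace(name, std::move(cluster));
    return raw;
  }

  std::size_t clustersInflated() const { return inflated_.size(); }
  std::size_t clustersKnown() const { return inflated_.size() + deferred_.size(); }

private:
  std::unordered_map<std::string, Factory> deferred_;
  std::unordered_map<std::string, std::unique_ptr<ToyCluster>> inflated_;
};

// Three clusters are announced, but only one ever receives traffic.
std::size_t inflatedAfterOneRequest() {
  ToyWorkerClusters worker;
  const std::vector<std::string> names = {"a", "b", "c"};
  for (const std::string& name : names) {
    worker.addDeferred(name, [name] {
      return std::unique_ptr<ToyCluster>(new ToyCluster{name});
    });
  }
  worker.get("b");
  worker.get("b"); // second lookup reuses the inflated object
  assert(worker.clustersKnown() == 3);
  return worker.clustersInflated();
}
```

In this toy model the inflated count stays at or below the number of known clusters, which is the same relationship the `clusters_inflated` gauge in this change is documented to have with the added and removed cluster counters.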
8 changes: 8 additions & 0 deletions changelogs/current.yaml
@@ -30,6 +30,14 @@ new_features:
   change: |
     added %RESPONSE_FLAGS_LONG% substitution string, that will output a pascal case string representing the response flags.
     The output response flags will correspond with %RESPONSE_FLAGS%, only with a long textual string representation.
+- area: config
+  change: |
+    Added the capability to defer broadcasting of certain clusters (CDS, EDS) to
+    worker threads from the main thread. This optimization can save a significant
+    amount of memory in cases where there are (1) a large number of workers and
+    (2) a large amount of config, most of which is unused. This capability is
+    guarded by :ref:`enable_deferred_cluster_creation
+    <envoy_v3_api_field_config.bootstrap.v3.ClusterManager.enable_deferred_cluster_creation>`.
 - area: extension_discovery_service
   change: |
     added ECDS support for :ref:` downstream network filters<envoy_v3_api_field_config.listener.v3.Filter.config_discovery>`.
11 changes: 11 additions & 0 deletions docs/root/configuration/upstream/cluster_manager/cluster_stats.rst
@@ -28,6 +28,17 @@ upstreams and control plane xDS clusters.
   active_clusters, Gauge, Number of currently active (warmed) clusters
   warming_clusters, Gauge, Number of currently warming (not active) clusters

+
+In addition to the cluster manager stats, there is a per-worker thread-local
+cluster manager statistics tree rooted at
+*thread_local_cluster_manager.<worker_id>.* with the following statistics.
+
+.. csv-table::
+  :header: Name, Type, Description
+  :widths: 1, 1, 2
+
+  clusters_inflated, Gauge, Number of clusters the worker has initialized. If using cluster deferral this number should be <= (cluster_added - cluster_removed).
+
 .. _config_cluster_stats:

 Every cluster has a statistics tree rooted at *cluster.<name>.* with the following statistics:
10 changes: 6 additions & 4 deletions envoy/upstream/cluster_manager.h
@@ -43,18 +43,20 @@ namespace Upstream {
  * ClusterUpdateCallbacks provide a way to expose Cluster lifecycle events in the
  * ClusterManager.
  */
+using ThreadLocalClusterCommand = std::function<ThreadLocalCluster&()>;
 class ClusterUpdateCallbacks {
 public:
   virtual ~ClusterUpdateCallbacks() = default;

   /**
    * onClusterAddOrUpdate is called when a new cluster is added or an existing cluster
    * is updated in the ClusterManager.
-   * @param cluster is the ThreadLocalCluster that represents the updated
-   * cluster.
+   * @param cluster_name the name of the changed cluster.
+   * @param get_cluster is a callable that will provide the ThreadLocalCluster that represents the
+   * updated cluster. It should be used within the call or discarded.
    */
-  virtual void onClusterAddOrUpdate(ThreadLocalCluster& cluster) PURE;
-
+  virtual void onClusterAddOrUpdate(absl::string_view cluster_name,
+                                    ThreadLocalClusterCommand& get_cluster) PURE;
   /**
    * onClusterRemoval is called when a cluster is removed; the argument is the cluster name.
    * @param cluster_name is the name of the removed cluster.
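
The new `onClusterAddOrUpdate` signature passes a name plus a callable instead of a cluster reference, and the doc comment says the callable should be used within the call or discarded. The sketch below illustrates that contract with stand-in types. Everything prefixed `Toy` is hypothetical, `notifyTwoClusters` is an invented driver, and `const std::string&` replaces `absl::string_view` only to keep the example free of external dependencies.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for ThreadLocalCluster.
struct ToyThreadLocalCluster {
  std::string name;
};

// Same shape as the ThreadLocalClusterCommand alias, over the toy type.
using ToyClusterCommand = std::function<ToyThreadLocalCluster&()>;

// A subscriber that only cares about one cluster.
class ToyWatcher {
public:
  explicit ToyWatcher(std::string watched) : watched_(std::move(watched)) {}

  void onClusterAddOrUpdate(const std::string& cluster_name, ToyClusterCommand& get_cluster) {
    if (cluster_name != watched_) {
      return; // getter discarded: nothing is materialized for this cluster
    }
    // Use the getter inside the call and copy out what is needed; the
    // reference it returns is not assumed to outlive the callback.
    last_seen_ = get_cluster().name;
  }

  const std::string& lastSeen() const { return last_seen_; }

private:
  std::string watched_;
  std::string last_seen_;
};

struct ToyNotifyResult {
  int getter_calls;
  std::string last_seen;
};

// Invented driver: announces two clusters and counts getter invocations.
ToyNotifyResult notifyTwoClusters() {
  ToyWatcher watcher("backend");
  int getter_calls = 0;
  const std::vector<std::string> names = {"frontend", "backend"};
  for (const std::string& name : names) {
    ToyThreadLocalCluster cluster{name}; // lives only for this iteration
    ToyClusterCommand get_cluster = [&]() -> ToyThreadLocalCluster& {
      ++getter_calls;
      return cluster;
    };
    watcher.onClusterAddOrUpdate(name, get_cluster);
  }
  assert(getter_calls <= 2);
  return {getter_calls, watcher.lastSeen()};
}
```

Because the watcher copies the name out rather than keeping the getter or the returned reference, it stays valid after each `cluster` local goes out of scope, and the cluster it does not watch is never materialized.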
12 changes: 7 additions & 5 deletions source/common/upstream/cluster_discovery_manager.cc
@@ -9,13 +9,15 @@ namespace Upstream {

 namespace {

-using ClusterAddedCb = std::function<void(ThreadLocalCluster&)>;
+using ClusterAddedCb = std::function<void(absl::string_view)>;

 class ClusterCallbacks : public ClusterUpdateCallbacks {
 public:
   ClusterCallbacks(ClusterAddedCb cb) : cb_(std::move(cb)) {}

-  void onClusterAddOrUpdate(ThreadLocalCluster& cluster) override { cb_(cluster); };
+  void onClusterAddOrUpdate(absl::string_view cluster_name, ThreadLocalClusterCommand&) override {
+    cb_(cluster_name);
+  };

   void onClusterRemoval(const std::string&) override {}

@@ -28,12 +30,12 @@ class ClusterCallbacks : public ClusterUpdateCallbacks {
 ClusterDiscoveryManager::ClusterDiscoveryManager(
     std::string thread_name, ClusterLifecycleCallbackHandler& lifecycle_callbacks_handler)
     : thread_name_(std::move(thread_name)) {
-  callbacks_ = std::make_unique<ClusterCallbacks>([this](ThreadLocalCluster& cluster) {
+  callbacks_ = std::make_unique<ClusterCallbacks>([this](absl::string_view cluster_name) {
     ENVOY_LOG(trace,
               "cm cdm: starting processing cluster name {} (status {}) from cluster lifecycle "
               "callback in {}",
-              cluster.info()->name(), enumToInt(ClusterDiscoveryStatus::Available), thread_name_);
-    processClusterName(cluster.info()->name(), ClusterDiscoveryStatus::Available);
+              cluster_name, enumToInt(ClusterDiscoveryStatus::Available), thread_name_);
+    processClusterName(cluster_name, ClusterDiscoveryStatus::Available);
   });
   callbacks_handle_ = lifecycle_callbacks_handler.addClusterUpdateCallbacks(*callbacks_);
 }
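
In this file the callback adapter now forwards only the cluster name and leaves the `ThreadLocalClusterCommand` argument unnamed and unused. A small sketch of why that matters follows: bookkeeping keyed by name can be resolved without ever materializing the cluster. `ToyNameOnlyCallbacks` loosely mirrors the shape of `ClusterCallbacks`. `ToyDiscovery`, its waiter counting, and `resolveWithoutInflating` are hypothetical and do not reflect how `ClusterDiscoveryManager` tracks requests internally.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <string>
#include <utility>

// Hypothetical stand-ins for the Envoy types.
struct ToyCluster {
  std::string name;
};
using ToyClusterCommand = std::function<ToyCluster&()>;
using ToyClusterAddedCb = std::function<void(const std::string&)>;

// Adapter in the spirit of ClusterCallbacks: forwards only the name and
// never touches the getter.
class ToyNameOnlyCallbacks {
public:
  explicit ToyNameOnlyCallbacks(ToyClusterAddedCb cb) : cb_(std::move(cb)) {}

  void onClusterAddOrUpdate(const std::string& cluster_name, ToyClusterCommand&) {
    cb_(cluster_name);
  }

private:
  ToyClusterAddedCb cb_;
};

// Invented bookkeeping: counts waiters per cluster name.
class ToyDiscovery {
public:
  ToyDiscovery()
      : callbacks_([this](const std::string& name) { processClusterName(name); }) {}
  ToyDiscovery(const ToyDiscovery&) = delete; // the lambda captures `this`
  ToyDiscovery& operator=(const ToyDiscovery&) = delete;

  void waitFor(const std::string& name) { ++pending_[name]; }
  ToyNameOnlyCallbacks& callbacks() { return callbacks_; }
  int resolved() const { return resolved_; }

private:
  void processClusterName(const std::string& name) {
    auto it = pending_.find(name);
    if (it == pending_.end()) {
      return; // nobody was waiting on this cluster
    }
    resolved_ += it->second;
    pending_.erase(it);
  }

  std::map<std::string, int> pending_;
  int resolved_ = 0;
  ToyNameOnlyCallbacks callbacks_;
};

struct ToyOutcome {
  int resolved;
  int getter_calls;
};

// Two waiters are resolved by name alone; the getter is never invoked.
ToyOutcome resolveWithoutInflating() {
  ToyDiscovery discovery;
  discovery.waitFor("backend");
  discovery.waitFor("backend");
  ToyCluster cluster{"backend"};
  int getter_calls = 0;
  ToyClusterCommand get_cluster = [&]() -> ToyCluster& {
    ++getter_calls;
    return cluster;
  };
  discovery.callbacks().onClusterAddOrUpdate("backend", get_cluster);
  assert(getter_calls == 0);
  return {discovery.resolved(), getter_calls};
}
```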