Skip to content

Commit

Permalink
Add observability for pending connections
Browse files Browse the repository at this point in the history
Summary:
# Issue
Thrift currently has no observability into the number of pending connections that are waiting to be accepted by the IO Worker threads.

This observability is especially important during connection storms, when a large number of incoming connections can overload the service and cause OOMs, as was the case for the uCache service (S399591, S400702).

# Change
Introduce observability into pending connections that are enqueued for the IO workers, which process the heavyweight TLS handshake and other connection-acceptance operations.

**Counter prefix:** `thrift.pending_connections`

**Counter types:**
- P50 - median; helps to understand the "average" number of pending connections under normal conditions.
- P99 - captures rare but impactful high-congestion events.
- P100 - reports the maximum number of pending connections observed.

Example values:
 {F1936343509}

Reviewed By: robertroeser

Differential Revision: D64355577

fbshipit-source-id: 935281b926add87adae4d045befd81b2f5286ffc
  • Loading branch information
kaczmarekmhl authored and facebook-github-bot committed Oct 18, 2024
1 parent 909e690 commit 1953621
Show file tree
Hide file tree
Showing 5 changed files with 133 additions and 2 deletions.
2 changes: 2 additions & 0 deletions thrift/lib/cpp/server/TServerObserver.h
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,8 @@ class TServerObserver {
// Hook that receives the resource pool descriptions as a list of strings.
// The default implementation ignores its argument and does nothing.
virtual void resourcePoolsInitialized(
const std::vector<std::string>& /*resourcePoolsDescriptions*/) {}

// Hook that receives the current number of pending connections.
// PendingConnectionsMetrics invokes it with its counter value after the
// counter is updated. The default implementation discards the value.
virtual void pendingConnections(int32_t /*numPendingConnections*/) {}

// The observer has to specify a sample rate for callCompleted notifications.
// Returns the value stored in sampleRate_.
inline uint32_t getSampleRate() const { return sampleRate_; }

Expand Down
1 change: 1 addition & 0 deletions thrift/lib/cpp2/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,7 @@ add_library(
server/ThriftServerConfig.cpp
server/TMConcurrencyController.cpp
server/peeking/TLSHelper.cpp
server/metrics/PendingConnectionsMetrics.cpp
transport/core/RequestStateMachine.cpp
transport/core/RpcMetadataUtil.cpp
transport/core/RpcMetadataPlugins.cpp
Expand Down
23 changes: 21 additions & 2 deletions thrift/lib/cpp2/server/ThriftServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
#include <thrift/lib/cpp2/server/StandardConcurrencyController.h>
#include <thrift/lib/cpp2/server/TMConcurrencyController.h>
#include <thrift/lib/cpp2/server/ThriftProcessor.h>
#include <thrift/lib/cpp2/server/metrics/PendingConnectionsMetrics.h>
#include <thrift/lib/cpp2/transport/core/ManagedConnectionIf.h>
#include <thrift/lib/cpp2/transport/rocket/server/RocketRoutingHandler.h>
#include <thrift/lib/cpp2/transport/rocket/server/RocketServerConnection.h>
Expand Down Expand Up @@ -513,8 +514,9 @@ void ThriftServer::configureIOUring() {
class ThriftServer::ConnectionEventCallback
: public folly::AsyncServerSocket::ConnectionEventCallback {
public:
ConnectionEventCallback(const ThriftServer& thriftServer)
: thriftServer_(thriftServer) {}
// Keeps a reference to the given server and creates the pending-connections
// metrics helper through setupPendingConnectionsMetrics(). That helper reads
// thriftServer_, so it relies on thriftServer_ being declared (and therefore
// initialized) before pendingConnectionsMetrics_.
explicit ConnectionEventCallback(const ThriftServer& thriftServer)
: thriftServer_(thriftServer),
pendingConnectionsMetrics_(setupPendingConnectionsMetrics()) {}

void onConnectionAccepted(
const folly::NetworkSocket,
Expand All @@ -541,20 +543,29 @@ class ThriftServer::ConnectionEventCallback
metadata["error_msg"] = errorMsg;
return metadata;
});
if (pendingConnectionsMetrics_) {
pendingConnectionsMetrics_->onConnectionDropped(errorMsg);
}
}

// Logs the connection_enqueued_acceptor event for the client address and, if
// a metrics helper exists, counts the connection as queued for an IO worker.
void onConnectionEnqueuedForAcceptorCallback(
    const folly::NetworkSocket,
    const folly::SocketAddress& clientAddr) noexcept override {
  THRIFT_CONNECTION_EVENT(connection_enqueued_acceptor)
      .log(thriftServer_, clientAddr);
  if (!pendingConnectionsMetrics_) {
    // No metrics helper was created for this callback; nothing to count.
    return;
  }
  pendingConnectionsMetrics_->onConnectionEnqueuedToIoWorker();
}

// Logs the connection_dequeued_acceptor event for the client address and, if
// a metrics helper exists, removes the connection from the pending count.
void onConnectionDequeuedByAcceptorCallback(
    const folly::NetworkSocket,
    const folly::SocketAddress& clientAddr) noexcept override {
  THRIFT_CONNECTION_EVENT(connection_dequeued_acceptor)
      .log(thriftServer_, clientAddr);
  if (!pendingConnectionsMetrics_) {
    // No metrics helper was created for this callback; nothing to count.
    return;
  }
  pendingConnectionsMetrics_->onConnectionDequedByIoWorker();
}

// No-op: nothing is logged or counted when this callback fires.
void onBackoffStarted() noexcept override {}
Expand All @@ -565,6 +576,14 @@ class ThriftServer::ConnectionEventCallback

private:
const ThriftServer& thriftServer_;
std::shared_ptr<PendingConnectionsMetrics> pendingConnectionsMetrics_;

std::shared_ptr<PendingConnectionsMetrics> setupPendingConnectionsMetrics() {
return thriftServer_.getObserver()
? std::make_shared<PendingConnectionsMetrics>(
thriftServer_.getObserverShared())
: nullptr;
}
};

void ThriftServer::setup() {
Expand Down
67 changes: 67 additions & 0 deletions thrift/lib/cpp2/server/metrics/PendingConnectionsMetrics.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <thrift/lib/cpp2/server/metrics/PendingConnectionsMetrics.h>

// Thrift flag gating the pending-connections metrics, defined with the value
// true. The event handlers in this file return early while it reads false.
THRIFT_FLAG_DEFINE_bool(enable_pending_connections_metrics, true);

namespace {
// Reports whether the enable_pending_connections_metrics flag is currently
// off.
bool isFeatureDisabled() {
  if (THRIFT_FLAG(enable_pending_connections_metrics)) {
    return false;
  }
  return true;
}
} // namespace

namespace apache::thrift {
// Takes shared ownership of the observer that receives the pending-connection
// counts; the argument is moved into observer_.
PendingConnectionsMetrics::PendingConnectionsMetrics(
std::shared_ptr<server::TServerObserver> observer)
: observer_(std::move(observer)) {}

// Records that a connection was queued for an IO worker: bumps the pending
// counter and publishes the new reading. Does nothing while the feature flag
// is off.
void PendingConnectionsMetrics::onConnectionEnqueuedToIoWorker() {
  if (!isFeatureDisabled()) {
    pendingConnections_++;
    onPendingConnectionsChange();
  }
}

// Handles a dropped-connection notification. Only drops whose error message
// contains the queue-expiry text change the counter; every other drop reason
// is ignored. Does nothing while the feature flag is off.
void PendingConnectionsMetrics::onConnectionDropped(
    const std::string& errorMsg) {
  // A connection that expires in the queue before an IO worker accepts it is
  // reported by AsyncServerSocket as dropped. Such a connection never reaches
  // the dequeue path, so the counter has to be decremented here to keep the
  // tracked queue size accurate.
  static constexpr const char* kExpiredInQueueMarker =
      "Exceeded deadline for accepting connection";

  if (isFeatureDisabled()) {
    return;
  }
  const bool expiredWhileQueued =
      errorMsg.find(kExpiredInQueueMarker) != std::string::npos;
  if (!expiredWhileQueued) {
    return;
  }
  pendingConnections_--;
  onPendingConnectionsChange();
}

// Records that an IO worker took a connection off the queue: lowers the
// pending counter and publishes the new reading. Does nothing while the
// feature flag is off.
void PendingConnectionsMetrics::onConnectionDequedByIoWorker() {
  if (!isFeatureDisabled()) {
    pendingConnections_--;
    onPendingConnectionsChange();
  }
}

// Publishes the current pending-connection count to the observer.
//
// The constructor accepts any shared_ptr, including an empty one, so the
// observer is checked before it is dereferenced. Without an observer the
// reading is simply not reported.
void PendingConnectionsMetrics::onPendingConnectionsChange() {
  if (!observer_) {
    return;
  }
  observer_->pendingConnections(pendingConnections_.load());
}

} // namespace apache::thrift
42 changes: 42 additions & 0 deletions thrift/lib/cpp2/server/metrics/PendingConnectionsMetrics.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <thrift/lib/cpp/server/TServerObserver.h>
#include <thrift/lib/cpp2/Flags.h>

namespace apache::thrift {
// This class is responsible for recording metrics associated with pending
// connections and reporting them to a TServerObserver. A pending connection is
// a connection that has been accepted by an Acceptor thread, but has not yet
// been accepted by an IO Worker thread for further processing of heavier
// weight operations like TLS Handshake.
//
// All three event handlers return without touching the counter while the
// enable_pending_connections_metrics Thrift flag reads false.
class PendingConnectionsMetrics {
public:
// Takes shared ownership of the observer that receives the counts.
explicit PendingConnectionsMetrics(
std::shared_ptr<server::TServerObserver> observer);

// Increments the pending count and reports the current value.
void onConnectionEnqueuedToIoWorker();
// Decrements the pending count and reports the current value.
void onConnectionDequedByIoWorker();
// Decrements and reports the pending count only when errorMsg contains
// "Exceeded deadline for accepting connection", i.e. the connection expired
// while still queued. Any other error message leaves the count unchanged.
void onConnectionDropped(const std::string& errorMsg);

private:
// Receiver of the pending-connection counts.
std::shared_ptr<server::TServerObserver> observer_;
// Current number of pending connections, starting at 0.
// NOTE(review): folly::relaxed_atomic presumably uses relaxed memory
// ordering for these updates — confirm against folly.
folly::relaxed_atomic<int32_t> pendingConnections_{0};

// Passes the current counter value to observer_->pendingConnections().
void onPendingConnectionsChange();
};
} // namespace apache::thrift

0 comments on commit 1953621

Please sign in to comment.