Skip to content

Commit

Permalink
Merge branch 'envoyproxy:main' into add_capsule_filter
Browse files Browse the repository at this point in the history
  • Loading branch information
ohadvano authored Sep 12, 2023
2 parents 60ae549 + f01b746 commit 8cf11c0
Show file tree
Hide file tree
Showing 45 changed files with 939 additions and 69 deletions.
4 changes: 4 additions & 0 deletions changelogs/current.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,10 @@ new_features:
change: |
added :ref:`record_headers_received_time <envoy_v3_api_field_extensions.filters.http.tap.v3.Tap.record_headers_received_time>`
to control writing request and response headers received time in trace output.
- area: tcp
change: |
added the support to detect and send TCP RST for raw buffer socket based connections. This is currently supported on Linux only.
It can be disabled by the runtime guard ``envoy_reloadable_features_detect_and_raise_rst_tcp_connection``.
- area: upstream
change: |
Expand Down
1 change: 0 additions & 1 deletion ci/run_envoy_docker.sh
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,5 @@ docker run --rm \
-e ENVOY_BUILD_ARCH \
-e SYSTEM_STAGEDISPLAYNAME \
-e SYSTEM_JOBDISPLAYNAME \
-e SYSTEM_PULLREQUEST_PULLREQUESTNUMBER \
"${ENVOY_BUILD_IMAGE}" \
"${START_COMMAND[@]}"
5 changes: 5 additions & 0 deletions contrib/golang/common/go/api/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,9 @@ const (
FlushWriteAndDelay ConnectionCloseType = 2
// Do not write/flush any pending data and immediately raise ConnectionEvent::LocalClose
Abort ConnectionCloseType = 3
// Do not write/flush any pending data and immediately raise
// ConnectionEvent::LocalClose. Envoy will try to close the connection with RST flag.
AbortReset ConnectionCloseType = 4
)

func (t ConnectionCloseType) String() string {
Expand All @@ -361,6 +364,8 @@ func (t ConnectionCloseType) String() string {
return "FlushWriteAndDelay"
case Abort:
return "Abort"
case AbortReset:
return "AbortReset"
}
return "unknown"
}
Expand Down
1 change: 1 addition & 0 deletions envoy/network/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ envoy_cc_library(
":io_handle_interface",
":post_io_action_interface",
":proxy_protocol_options_lib",
"//envoy/api:io_error_interface",
"//envoy/buffer:buffer_interface",
"//envoy/network:listen_socket_interface",
"//envoy/ssl:connection_interface",
Expand Down
18 changes: 17 additions & 1 deletion envoy/network/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,18 @@ enum class ConnectionCloseType {
// raise ConnectionEvent::LocalClose
FlushWriteAndDelay, // Flush pending write data and delay raising a ConnectionEvent::LocalClose
// until the delayed_close_timeout expires
Abort // Do not write/flush any pending data and immediately raise ConnectionEvent::LocalClose
Abort, // Do not write/flush any pending data and immediately raise ConnectionEvent::LocalClose
AbortReset // Do not write/flush any pending data and immediately raise
// ConnectionEvent::LocalClose. Envoy will try to close the connection with RST flag.
};

/**
* Type of connection close which is detected from the socket.
*/
enum class DetectedCloseType {
Normal, // The normal socket close from Envoy's connection perspective.
LocalReset, // The local reset initiated from Envoy.
RemoteReset, // The peer reset detected by the connection.
};

/**
Expand Down Expand Up @@ -153,6 +164,11 @@ class Connection : public Event::DeferredDeletable,
*/
virtual void close(ConnectionCloseType type, absl::string_view details) PURE;

/**
* @return the detected close type from socket.
*/
virtual DetectedCloseType detectedCloseType() const PURE;

/**
* @return Event::Dispatcher& the dispatcher backing this connection.
*/
Expand Down
15 changes: 15 additions & 0 deletions envoy/network/transport_socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <vector>

#include "envoy/api/io_error.h"
#include "envoy/buffer/buffer.h"
#include "envoy/common/optref.h"
#include "envoy/common/pure.h"
Expand Down Expand Up @@ -37,6 +38,15 @@ enum class ConnectionEvent;
* Result of each I/O event.
*/
struct IoResult {
IoResult(PostIoAction action, uint64_t bytes_processed, bool end_stream_read)
: action_(action), bytes_processed_(bytes_processed), end_stream_read_(end_stream_read),
err_code_(absl::nullopt) {}

IoResult(PostIoAction action, uint64_t bytes_processed, bool end_stream_read,
absl::optional<Api::IoError::IoErrorCode> err_code)
: action_(action), bytes_processed_(bytes_processed), end_stream_read_(end_stream_read),
err_code_(err_code) {}

PostIoAction action_;

/**
Expand All @@ -49,6 +59,11 @@ struct IoResult {
* can only be true for read operations.
*/
bool end_stream_read_;

/**
* The underlying I/O error code.
*/
absl::optional<Api::IoError::IoErrorCode> err_code_;
};

/**
Expand Down
2 changes: 2 additions & 0 deletions envoy/stream_info/stream_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,8 @@ struct LocalCloseReasonValues {
"closing_upstream_tcp_connection_due_to_downstream_remote_close";
const std::string ClosingUpstreamTcpDueToDownstreamLocalClose =
"closing_upstream_tcp_connection_due_to_downstream_local_close";
const std::string ClosingUpstreamTcpDueToDownstreamResetClose =
"closing_upstream_tcp_connection_due_to_downstream_reset_close";
const std::string NonPooledTcpConnectionHostHealthFailure =
"non_pooled_tcp_connection_host_health_failure";
};
Expand Down
5 changes: 5 additions & 0 deletions envoy/tcp/async_tcp_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ class AsyncTcpClient {
*/
virtual void close(Network::ConnectionCloseType type) PURE;

/**
* @return the detected close type from socket.
*/
virtual Network::DetectedCloseType detectedCloseType() const PURE;

/**
* Write data through the client.
* @param data the bufferred data.
Expand Down
5 changes: 3 additions & 2 deletions source/common/common/matchers.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,10 @@ ValueMatcherConstSharedPtr ValueMatcher::create(const envoy::type::matcher::v3::
return std::make_shared<const PresentMatcher>(v.present_match());
case envoy::type::matcher::v3::ValueMatcher::MatchPatternCase::kListMatch:
return std::make_shared<const ListMatcher>(v.list_match());
default:
throw EnvoyException("Uncaught default");
case envoy::type::matcher::v3::ValueMatcher::MatchPatternCase::MATCH_PATTERN_NOT_SET:
break; // Fall through to PANIC.
}
PANIC("unexpected");
}

bool NullMatcher::match(const ProtobufWkt::Value& value) const {
Expand Down
3 changes: 3 additions & 0 deletions source/common/network/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ envoy_cc_library(
"//envoy/event:timer_interface",
"//envoy/network:connection_interface",
"//envoy/network:filter_interface",
"//envoy/network:socket_interface",
"//envoy/server/overload:thread_local_overload_state",
"//source/common/buffer:buffer_lib",
"//source/common/buffer:watermark_buffer_lib",
Expand All @@ -117,6 +118,8 @@ envoy_cc_library(
"//source/common/common:minimal_logger_lib",
"//source/common/event:libevent_lib",
"//source/common/network:listen_socket_lib",
"//source/common/network:socket_option_factory_lib",
"//source/common/runtime:runtime_features_lib",
"//source/common/stream_info:stream_info_lib",
"@envoy_api//envoy/config/core/v3:pkg_cc_proto",
],
Expand Down
58 changes: 58 additions & 0 deletions source/common/network/connection_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@
#include "source/common/network/address_impl.h"
#include "source/common/network/listen_socket_impl.h"
#include "source/common/network/raw_buffer_socket.h"
#include "source/common/network/socket_option_factory.h"
#include "source/common/network/socket_option_impl.h"
#include "source/common/network/utility.h"
#include "source/common/runtime/runtime_features.h"

namespace Envoy {
namespace Network {
Expand Down Expand Up @@ -82,6 +85,10 @@ ConnectionImpl::ConnectionImpl(Event::Dispatcher& dispatcher, ConnectionSocketPt
write_end_stream_(false), current_write_end_stream_(false), dispatch_buffered_data_(false),
transport_wants_read_(false) {

// Keep it as a bool flag to reduce the times calling runtime method..
enable_rst_detect_send_ = Runtime::runtimeFeatureEnabled(
"envoy.reloadable_features.detect_and_raise_rst_tcp_connection");

if (!connected) {
connecting_ = true;
}
Expand Down Expand Up @@ -137,6 +144,23 @@ void ConnectionImpl::close(ConnectionCloseType type) {
uint64_t data_to_write = write_buffer_->length();
ENVOY_CONN_LOG_EVENT(debug, "connection_closing", "closing data_to_write={} type={}", *this,
data_to_write, enumToInt(type));

// RST will be sent only if enable_rst_detect_send_ is true, otherwise it is converted to normal
// ConnectionCloseType::Abort.
if (!enable_rst_detect_send_ && type == ConnectionCloseType::AbortReset) {
type = ConnectionCloseType::Abort;
}

// The connection is closed by Envoy by sending RST, and the connection is closed immediately.
if (type == ConnectionCloseType::AbortReset) {
ENVOY_CONN_LOG(
trace, "connection closing type=AbortReset, setting LocalReset to the detected close type.",
*this);
setDetectedCloseType(DetectedCloseType::LocalReset);
closeSocket(ConnectionEvent::LocalClose);
return;
}

const bool delayed_close_timeout_set = delayed_close_timeout_.count() > 0;
if (data_to_write == 0 || type == ConnectionCloseType::NoFlush ||
type == ConnectionCloseType::Abort || !transport_socket_->canFlushClose()) {
Expand Down Expand Up @@ -236,6 +260,10 @@ bool ConnectionImpl::filterChainWantsData() {
(read_disable_count_ == 1 && read_buffer_->highWatermarkTriggered());
}

void ConnectionImpl::setDetectedCloseType(DetectedCloseType close_type) {
detected_close_type_ = close_type;
}

void ConnectionImpl::closeSocket(ConnectionEvent close_type) {
if (!ConnectionImpl::ioHandle().isOpen()) {
return;
Expand All @@ -262,6 +290,17 @@ void ConnectionImpl::closeSocket(ConnectionEvent close_type) {

connection_stats_.reset();

if (enable_rst_detect_send_ && (detected_close_type_ == DetectedCloseType::RemoteReset ||
detected_close_type_ == DetectedCloseType::LocalReset)) {
const bool ok = Network::Socket::applyOptions(
Network::SocketOptionFactory::buildZeroSoLingerOptions(), *socket_,
envoy::config::core::v3::SocketOption::STATE_LISTENING);
if (!ok) {
ENVOY_LOG_EVERY_POW_2(error, "rst setting so_linger=0 failed on connection {}", id());
}
}

// It is safe to call close() since there is an IO handle check.
socket_->close();

// Call the base class directly as close() is called in the destructor.
Expand Down Expand Up @@ -642,6 +681,15 @@ void ConnectionImpl::onReadReady() {
uint64_t new_buffer_size = read_buffer_->length();
updateReadBufferStats(result.bytes_processed_, new_buffer_size);

// The socket is closed immediately when receiving RST.
if (enable_rst_detect_send_ && result.err_code_.has_value() &&
result.err_code_ == Api::IoError::IoErrorCode::ConnectionReset) {
ENVOY_CONN_LOG(trace, "read: rst close from peer", *this);
setDetectedCloseType(DetectedCloseType::RemoteReset);
closeSocket(ConnectionEvent::RemoteClose);
return;
}

// If this connection doesn't have half-close semantics, translate end_stream into
// a connection close.
if ((!enable_half_close_ && result.end_stream_read_)) {
Expand Down Expand Up @@ -714,6 +762,16 @@ void ConnectionImpl::onWriteReady() {
uint64_t new_buffer_size = write_buffer_->length();
updateWriteBufferStats(result.bytes_processed_, new_buffer_size);

// The socket is closed immediately when receiving RST.
if (enable_rst_detect_send_ && result.err_code_.has_value() &&
result.err_code_ == Api::IoError::IoErrorCode::ConnectionReset) {
// Discard anything in the buffer.
ENVOY_CONN_LOG(debug, "write: rst close from peer.", *this);
setDetectedCloseType(DetectedCloseType::RemoteReset);
closeSocket(ConnectionEvent::RemoteClose);
return;
}

// NOTE: If the delayed_close_timer_ is set, it must only trigger after a delayed_close_timeout_
// period of inactivity from the last write event. Therefore, the timer must be reset to its
// original timeout value unless the socket is going to be closed as a result of the doWrite().
Expand Down
6 changes: 6 additions & 0 deletions source/common/network/connection_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ class ConnectionImpl : public ConnectionImplBase, public TransportSocketCallback

// ScopeTrackedObject
void dumpState(std::ostream& os, int indent_level) const override;
DetectedCloseType detectedCloseType() const override { return detected_close_type_; }

protected:
// A convenience function which returns true if
Expand Down Expand Up @@ -213,6 +214,9 @@ class ConnectionImpl : public ConnectionImplBase, public TransportSocketCallback
// Returns true iff end of stream has been both written and read.
bool bothSidesHalfClosed();

// Set the detected close type for this connection.
void setDetectedCloseType(DetectedCloseType close_type);

static std::atomic<uint64_t> next_global_id_;

std::list<BytesSentCb> bytes_sent_callbacks_;
Expand All @@ -225,6 +229,7 @@ class ConnectionImpl : public ConnectionImplBase, public TransportSocketCallback
uint64_t last_write_buffer_size_{};
Buffer::Instance* current_write_buffer_{};
uint32_t read_disable_count_{0};
DetectedCloseType detected_close_type_{DetectedCloseType::Normal};
bool write_buffer_above_high_watermark_ : 1;
bool detect_early_close_ : 1;
bool enable_half_close_ : 1;
Expand All @@ -239,6 +244,7 @@ class ConnectionImpl : public ConnectionImplBase, public TransportSocketCallback
// read_disable_count_ == 0 to ensure that read resumption happens when remaining bytes are held
// in transport socket internal buffers.
bool transport_wants_read_ : 1;
bool enable_rst_detect_send_ : 1;
};

class ServerConnectionImpl : public ConnectionImpl, virtual public ServerConnection {
Expand Down
4 changes: 4 additions & 0 deletions source/common/network/multi_connection_base_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,10 @@ void MultiConnectionBaseImpl::close(ConnectionCloseType type, absl::string_view
connections_[0]->close(type, details);
}

DetectedCloseType MultiConnectionBaseImpl::detectedCloseType() const {
return connections_[0]->detectedCloseType();
};

Event::Dispatcher& MultiConnectionBaseImpl::dispatcher() const {
ASSERT(&dispatcher_ == &connections_[0]->dispatcher());
return connections_[0]->dispatcher();
Expand Down
1 change: 1 addition & 0 deletions source/common/network/multi_connection_base_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ class MultiConnectionBaseImpl : public ClientConnection,
Event::Dispatcher& dispatcher() const override;
void close(ConnectionCloseType type) override { close(type, ""); }
void close(ConnectionCloseType type, absl::string_view details) override;
DetectedCloseType detectedCloseType() const override;
bool readEnabled() const override;
bool aboveHighWatermark() const override;
void hashKey(std::vector<uint8_t>& hash_key) const override;
Expand Down
16 changes: 10 additions & 6 deletions source/common/network/raw_buffer_socket.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ IoResult RawBufferSocket::doRead(Buffer::Instance& buffer) {
PostIoAction action = PostIoAction::KeepOpen;
uint64_t bytes_read = 0;
bool end_stream = false;
absl::optional<Api::IoError::IoErrorCode> err = absl::nullopt;
do {
Api::IoCallUint64Result result = callbacks_->ioHandle().read(buffer, absl::nullopt);

Expand All @@ -34,21 +35,23 @@ IoResult RawBufferSocket::doRead(Buffer::Instance& buffer) {
}
} else {
// Remote error (might be no data).
ENVOY_CONN_LOG(trace, "read error: {}", callbacks_->connection(),
result.err_->getErrorDetails());
ENVOY_CONN_LOG(trace, "read error: {}, code: {}", callbacks_->connection(),
result.err_->getErrorDetails(), static_cast<int>(result.err_->getErrorCode()));
if (result.err_->getErrorCode() != Api::IoError::IoErrorCode::Again) {
action = PostIoAction::Close;
err = result.err_->getErrorCode();
}
break;
}
} while (true);

return {action, bytes_read, end_stream};
return {action, bytes_read, end_stream, err};
}

IoResult RawBufferSocket::doWrite(Buffer::Instance& buffer, bool end_stream) {
PostIoAction action;
uint64_t bytes_written = 0;
absl::optional<Api::IoError::IoErrorCode> err = absl::nullopt;
ASSERT(!shutdown_ || buffer.length() == 0);
do {
if (buffer.length() == 0) {
Expand All @@ -67,18 +70,19 @@ IoResult RawBufferSocket::doWrite(Buffer::Instance& buffer, bool end_stream) {
ENVOY_CONN_LOG(trace, "write returns: {}", callbacks_->connection(), result.return_value_);
bytes_written += result.return_value_;
} else {
ENVOY_CONN_LOG(trace, "write error: {}", callbacks_->connection(),
result.err_->getErrorDetails());
ENVOY_CONN_LOG(trace, "write error: {}, code: {}", callbacks_->connection(),
result.err_->getErrorDetails(), static_cast<int>(result.err_->getErrorCode()));
if (result.err_->getErrorCode() == Api::IoError::IoErrorCode::Again) {
action = PostIoAction::KeepOpen;
} else {
err = result.err_->getErrorCode();
action = PostIoAction::Close;
}
break;
}
} while (true);

return {action, bytes_written, false};
return {action, bytes_written, false, err};
}

std::string RawBufferSocket::protocol() const { return EMPTY_STRING; }
Expand Down
Loading

0 comments on commit 8cf11c0

Please sign in to comment.