Skip to content

Commit

Permalink
listener: removing some exceptions (envoyproxy#35695)
Browse files Browse the repository at this point in the history
Risk Level: low
Testing: updated tests
Docs Changes: n/a
Release Notes: n/a
envoyproxy/envoy-mobile#176

Signed-off-by: Alyssa Wilk <[email protected]>
  • Loading branch information
alyssawilk authored Sep 12, 2024
1 parent 15d290c commit 26861ac
Show file tree
Hide file tree
Showing 16 changed files with 143 additions and 118 deletions.
3 changes: 2 additions & 1 deletion envoy/network/listener.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,9 @@ class ListenSocketFactory {
* workers. For example, the actual listen() call, post listen socket options, etc. This is done
* so that all error handling can occur on the main thread and the gap between performing these
* actions and using the socket is minimized.
* @return a status indicating if an error occurred.
*/
virtual void doFinalPreWorkerInit() PURE;
virtual absl::Status doFinalPreWorkerInit() PURE;
};

/**
Expand Down
4 changes: 3 additions & 1 deletion envoy/server/listener_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -226,8 +226,10 @@ class ListenerManager {
* Start all workers accepting new connections on all added listeners.
* @param guard_dog supplies the optional guard dog to use for thread watching.
* @param callback supplies the callback to complete server initialization.
* @return a status indicating if the operation succeeded.
*/
virtual void startWorkers(OptRef<GuardDog> guard_dog, std::function<void()> callback) PURE;
virtual absl::Status startWorkers(OptRef<GuardDog> guard_dog,
std::function<void()> callback) PURE;

/**
* Stop all listeners from accepting new connections without actually removing any of them. This
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@ class ApiListenerManagerImpl : public ListenerManager, Logger::Loggable<Logger::
}
uint64_t numConnections() const override { return 0; }
bool removeListener(const std::string&) override { return true; }
void startWorkers(OptRef<GuardDog>, std::function<void()> callback) override { callback(); }
absl::Status startWorkers(OptRef<GuardDog>, std::function<void()> callback) override {
callback();
return absl::OkStatus();
}
void stopListeners(StopListenersType, const Network::ExtraShutdownListenerOptions&) override {}
void stopWorkers() override {}
void beginListenerUpdate() override {}
Expand Down
80 changes: 43 additions & 37 deletions source/common/listener_manager/listener_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -187,33 +187,30 @@ Network::SocketSharedPtr ListenSocketFactoryImpl::getListenSocket(uint32_t worke
return sockets_[worker_index];
}

void ListenSocketFactoryImpl::doFinalPreWorkerInit() {
absl::Status ListenSocketFactoryImpl::doFinalPreWorkerInit() {
if (bind_type_ == ListenerComponentFactory::BindType::NoBind ||
socket_type_ != Network::Socket::Type::Stream) {
return;
return absl::OkStatus();
}

ASSERT(!sockets_.empty());
auto listen_and_apply_options = [](Envoy::Network::SocketSharedPtr socket, int tcp_backlog_size) {
const auto rc = socket->ioHandle().listen(tcp_backlog_size);
if (rc.return_value_ != 0) {
throwEnvoyExceptionOrPanic(fmt::format("cannot listen() errno={}", rc.errno_));
return absl::InvalidArgumentError(fmt::format("cannot listen() errno={}", rc.errno_));
}
if (!Network::Socket::applyOptions(socket->options(), *socket,
envoy::config::core::v3::SocketOption::STATE_LISTENING)) {
std::string message =
fmt::format("cannot set post-listen socket option on socket: {}",
socket->connectionInfoProvider().localAddress()->asString());
#ifdef ENVOY_DISABLE_EXCEPTIONS
PANIC(message);
#else
throw Network::SocketOptionException(message);
#endif
return absl::InvalidArgumentError(message);
}
return absl::OkStatus();
};
// On all platforms we should listen on the first socket.
auto iterator = sockets_.begin();
listen_and_apply_options(*iterator, tcp_backlog_size_);
RETURN_IF_NOT_OK(listen_and_apply_options(*iterator, tcp_backlog_size_));
++iterator;
#ifndef WIN32
// With this implementation on Windows we only accept
Expand All @@ -222,9 +219,10 @@ void ListenSocketFactoryImpl::doFinalPreWorkerInit() {
// TODO(davinci26): We should update the behavior when socket duplication
// does not cause accepts to hang in the OS.
for (; iterator != sockets_.end(); ++iterator) {
listen_and_apply_options(*iterator, tcp_backlog_size_);
RETURN_IF_NOT_OK(listen_and_apply_options(*iterator, tcp_backlog_size_));
}
#endif
return absl::OkStatus();
}

namespace {
Expand Down Expand Up @@ -326,7 +324,7 @@ ListenerImpl::ListenerImpl(const envoy::config::listener::v3::Listener& config,
auto address_or_error = Network::Address::resolveProtoAddress(config.address());
THROW_IF_NOT_OK_REF(address_or_error.status());
auto address = std::move(address_or_error.value());
checkIpv4CompatAddress(address, config.address());
THROW_IF_NOT_OK(checkIpv4CompatAddress(address, config.address()));
addresses_.emplace_back(address);
address_opts_list.emplace_back(std::ref(config.socket_options()));

Expand All @@ -342,7 +340,7 @@ ListenerImpl::ListenerImpl(const envoy::config::listener::v3::Listener& config,
Network::Address::resolveProtoAddress(config.additional_addresses(i).address());
THROW_IF_NOT_OK_REF(addresses_or_error.status());
auto additional_address = std::move(addresses_or_error.value());
checkIpv4CompatAddress(address, config.additional_addresses(i).address());
THROW_IF_NOT_OK(checkIpv4CompatAddress(address, config.additional_addresses(i).address()));
addresses_.emplace_back(additional_address);
if (config.additional_addresses(i).has_socket_options()) {
address_opts_list.emplace_back(
Expand All @@ -367,11 +365,11 @@ ListenerImpl::ListenerImpl(const envoy::config::listener::v3::Listener& config,
addresses_, listener_factory_context_->parentFactoryContext(), initManager()),

buildAccessLog(config);
validateConfig();
THROW_IF_NOT_OK(validateConfig());

// buildUdpListenerFactory() must come before buildListenSocketOptions() because the UDP
// listener factory can provide additional options.
buildUdpListenerFactory(config, parent_.server_.options().concurrency());
THROW_IF_NOT_OK(buildUdpListenerFactory(config, parent_.server_.options().concurrency()));
buildListenSocketOptions(config, address_opts_list);
createListenerFilterFactories(config);
validateFilterChains(config);
Expand All @@ -380,7 +378,7 @@ ListenerImpl::ListenerImpl(const envoy::config::listener::v3::Listener& config,
buildSocketOptions(config);
buildOriginalDstListenerFilter(config);
buildProxyProtocolListenerFilter(config);
buildInternalListener(config);
THROW_IF_NOT_OK(buildInternalListener(config));
}
if (!workers_started_) {
// Initialize dynamic_init_manager_ from Server's init manager if it's not initialized.
Expand Down Expand Up @@ -443,11 +441,11 @@ ListenerImpl::ListenerImpl(ListenerImpl& origin,
missing_listener_config_stats_({ALL_MISSING_LISTENER_CONFIG_STATS(
POOL_COUNTER(listener_factory_context_->listenerScope()))}) {
buildAccessLog(config);
validateConfig();
THROW_IF_NOT_OK(validateConfig());
createListenerFilterFactories(config);
validateFilterChains(config);
buildFilterChains(config);
buildInternalListener(config);
THROW_IF_NOT_OK(buildInternalListener(config));
if (socket_type_ == Network::Socket::Type::Stream) {
// Apply the options below only for TCP.
buildSocketOptions(config);
Expand All @@ -457,37 +455,40 @@ ListenerImpl::ListenerImpl(ListenerImpl& origin,
}
}

void ListenerImpl::checkIpv4CompatAddress(const Network::Address::InstanceConstSharedPtr& address,
const envoy::config::core::v3::Address& proto_address) {
absl::Status
ListenerImpl::checkIpv4CompatAddress(const Network::Address::InstanceConstSharedPtr& address,
const envoy::config::core::v3::Address& proto_address) {
if ((address->type() == Network::Address::Type::Ip &&
proto_address.socket_address().ipv4_compat()) &&
(address->ip()->version() != Network::Address::IpVersion::v6 ||
(!address->ip()->isAnyAddress() &&
address->ip()->ipv6()->v4CompatibleAddress() == nullptr))) {
throwEnvoyExceptionOrPanic(fmt::format(
return absl::InvalidArgumentError(fmt::format(
"Only IPv6 address '::' or valid IPv4-mapped IPv6 address can set ipv4_compat: {}",
address->asStringView()));
}
return absl::OkStatus();
}

void ListenerImpl::validateConfig() {
absl::Status ListenerImpl::validateConfig() {
if (mptcp_enabled_) {
if (socket_type_ != Network::Socket::Type::Stream) {
throwEnvoyExceptionOrPanic(
return absl::InvalidArgumentError(
fmt::format("listener {}: enable_mptcp can only be used with TCP listeners", name_));
}
for (auto& address : addresses_) {
if (address->type() != Network::Address::Type::Ip) {
throwEnvoyExceptionOrPanic(
return absl::InvalidArgumentError(
fmt::format("listener {}: enable_mptcp can only be used with IP addresses", name_));
}
}
if (!Api::OsSysCallsSingleton::get().supportsMptcp()) {
throwEnvoyExceptionOrPanic(fmt::format(
return absl::InvalidArgumentError(fmt::format(
"listener {}: enable_mptcp is set but MPTCP is not supported by the operating system",
name_));
}
}
return absl::OkStatus();
}

void ListenerImpl::buildAccessLog(const envoy::config::listener::v3::Listener& config) {
Expand All @@ -498,10 +499,11 @@ void ListenerImpl::buildAccessLog(const envoy::config::listener::v3::Listener& c
}
}

void ListenerImpl::buildInternalListener(const envoy::config::listener::v3::Listener& config) {
absl::Status
ListenerImpl::buildInternalListener(const envoy::config::listener::v3::Listener& config) {
if (config.has_internal_listener()) {
if (config.has_address() || !config.additional_addresses().empty()) {
throwEnvoyExceptionOrPanic(
return absl::InvalidArgumentError(
fmt::format("error adding listener '{}': address should not be used "
"when an internal listener config is provided",
name_));
Expand All @@ -513,11 +515,11 @@ void ListenerImpl::buildInternalListener(const envoy::config::listener::v3::List
|| (config.has_freebind() && config.freebind().value()) || config.has_tcp_backlog_size() ||
config.has_tcp_fast_open_queue_length() ||
(config.has_transparent() && config.transparent().value())) {
throwEnvoyExceptionOrPanic(fmt::format(
return absl::InvalidArgumentError(fmt::format(
"error adding listener named '{}': has unsupported tcp listener feature", name_));
}
if (!config.socket_options().empty()) {
throwEnvoyExceptionOrPanic(
return absl::InvalidArgumentError(
fmt::format("error adding listener named '{}': does not support socket option", name_));
}
std::shared_ptr<Network::InternalListenerRegistry> internal_listener_registry =
Expand All @@ -534,7 +536,7 @@ void ListenerImpl::buildInternalListener(const envoy::config::listener::v3::List
"type.googleapis.com/"
"envoy.extensions.bootstrap.internal_listener.v3.InternalListener";
})) {
throwEnvoyExceptionOrPanic(fmt::format(
return absl::InvalidArgumentError(fmt::format(
"error adding listener named '{}': InternalListener bootstrap extension is mandatory",
name_));
}
Expand All @@ -548,11 +550,12 @@ void ListenerImpl::buildInternalListener(const envoy::config::listener::v3::List
[](const envoy::config::listener::v3::AdditionalAddress& proto_address) {
return proto_address.address().has_envoy_internal_address();
})) {
throwEnvoyExceptionOrPanic(
return absl::InvalidArgumentError(
fmt::format("error adding listener named '{}': use internal_listener "
"field instead of address for internal listeners",
name_));
}
return absl::OkStatus();
}

bool ListenerImpl::buildUdpListenerWorkerRouter(const Network::Address::Instance& address,
Expand All @@ -569,13 +572,14 @@ bool ListenerImpl::buildUdpListenerWorkerRouter(const Network::Address::Instance
return true;
}

void ListenerImpl::buildUdpListenerFactory(const envoy::config::listener::v3::Listener& config,
uint32_t concurrency) {
absl::Status
ListenerImpl::buildUdpListenerFactory(const envoy::config::listener::v3::Listener& config,
uint32_t concurrency) {
if (socket_type_ != Network::Socket::Type::Datagram) {
return;
return absl::OkStatus();
}
if (!reuse_port_ && concurrency > 1) {
throwEnvoyExceptionOrPanic(
return absl::InvalidArgumentError(
"Listening on UDP when concurrency is > 1 without the SO_REUSEPORT "
"socket option results in "
"unstable packet proxying. Configure the reuse_port listener option or "
Expand All @@ -593,8 +597,9 @@ void ListenerImpl::buildUdpListenerFactory(const envoy::config::listener::v3::Li
if (config.udp_listener_config().has_quic_options()) {
#ifdef ENVOY_ENABLE_QUIC
if (config.has_connection_balance_config()) {
throwEnvoyExceptionOrPanic("connection_balance_config is configured for QUIC listener which "
"doesn't work with connection balancer.");
return absl::InvalidArgumentError(
"connection_balance_config is configured for QUIC listener which "
"doesn't work with connection balancer.");
}
udp_listener_config_->listener_factory_ = std::make_unique<Quic::ActiveQuicListenerFactory>(
config.udp_listener_config().quic_options(), concurrency, quic_stat_names_,
Expand All @@ -611,7 +616,7 @@ void ListenerImpl::buildUdpListenerFactory(const envoy::config::listener::v3::Li
}
#endif
#else
throwEnvoyExceptionOrPanic("QUIC is configured but not enabled in the build.");
return absl::InvalidArgumentError("QUIC is configured but not enabled in the build.");
#endif
} else {
udp_listener_config_->listener_factory_ =
Expand All @@ -620,6 +625,7 @@ void ListenerImpl::buildUdpListenerFactory(const envoy::config::listener::v3::Li
if (udp_listener_config_->writer_factory_ == nullptr) {
udp_listener_config_->writer_factory_ = std::make_unique<Network::UdpDefaultWriterFactory>();
}
return absl::OkStatus();
}

void ListenerImpl::buildListenSocketOptions(
Expand Down
14 changes: 7 additions & 7 deletions source/common/listener_manager/listener_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ class ListenSocketFactoryImpl : public Network::ListenSocketFactory,
socket->close();
}
}
void doFinalPreWorkerInit() override;
absl::Status doFinalPreWorkerInit() override;

private:
ListenSocketFactoryImpl(const ListenSocketFactoryImpl& factory_to_clone);
Expand Down Expand Up @@ -387,12 +387,12 @@ class ListenerImpl final : public Network::ListenerConfig,
const std::string& name, bool added_via_api, bool workers_started, uint64_t hash);
// Helpers for constructor.
void buildAccessLog(const envoy::config::listener::v3::Listener& config);
void buildInternalListener(const envoy::config::listener::v3::Listener& config);
void validateConfig();
absl::Status buildInternalListener(const envoy::config::listener::v3::Listener& config);
absl::Status validateConfig();
bool buildUdpListenerWorkerRouter(const Network::Address::Instance& address,
uint32_t concurrency);
void buildUdpListenerFactory(const envoy::config::listener::v3::Listener& config,
uint32_t concurrency);
absl::Status buildUdpListenerFactory(const envoy::config::listener::v3::Listener& config,
uint32_t concurrency);
void buildListenSocketOptions(const envoy::config::listener::v3::Listener& config,
std::vector<std::reference_wrapper<const Protobuf::RepeatedPtrField<
envoy::config::core::v3::SocketOption>>>& address_opts_list);
Expand All @@ -404,8 +404,8 @@ class ListenerImpl final : public Network::ListenerConfig,
void buildSocketOptions(const envoy::config::listener::v3::Listener& config);
void buildOriginalDstListenerFilter(const envoy::config::listener::v3::Listener& config);
void buildProxyProtocolListenerFilter(const envoy::config::listener::v3::Listener& config);
void checkIpv4CompatAddress(const Network::Address::InstanceConstSharedPtr& address,
const envoy::config::core::v3::Address& proto_address);
absl::Status checkIpv4CompatAddress(const Network::Address::InstanceConstSharedPtr& address,
const envoy::config::core::v3::Address& proto_address);

void addListenSocketOptions(Network::Socket::OptionsSharedPtr& options,
const Network::Socket::OptionsSharedPtr& append_options) {
Expand Down
16 changes: 13 additions & 3 deletions source/common/listener_manager/listener_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -735,7 +735,12 @@ ListenerManagerImpl::listeners(ListenerState state) {
bool ListenerManagerImpl::doFinalPreWorkerListenerInit(ListenerImpl& listener) {
TRY_ASSERT_MAIN_THREAD {
for (auto& socket_factory : listener.listenSocketFactories()) {
socket_factory->doFinalPreWorkerInit();
absl::Status success = (socket_factory->doFinalPreWorkerInit());
if (!success.ok()) {
ENVOY_LOG(error, "final pre-worker listener init for listener '{}' failed: {}",
listener.name(), success.message());
return false;
}
}
return true;
}
Expand Down Expand Up @@ -930,7 +935,8 @@ bool ListenerManagerImpl::removeListenerInternal(const std::string& name,
return true;
}

void ListenerManagerImpl::startWorkers(OptRef<GuardDog> guard_dog, std::function<void()> callback) {
absl::Status ListenerManagerImpl::startWorkers(OptRef<GuardDog> guard_dog,
std::function<void()> callback) {
ENVOY_LOG(info, "all dependencies initialized. starting workers");
ASSERT(!workers_started_);
workers_started_ = true;
Expand All @@ -952,7 +958,10 @@ void ListenerManagerImpl::startWorkers(OptRef<GuardDog> guard_dog, std::function
auto& listener = *listener_it;
listener_it++;

if (!doFinalPreWorkerListenerInit(*listener)) {
absl::StatusOr<bool> init_status = doFinalPreWorkerListenerInit(*listener);
RETURN_IF_NOT_OK_REF(init_status.status());

if (!*init_status) {
incListenerCreateFailureStat();
removeListenerInternal(listener->name(), false);
continue;
Expand Down Expand Up @@ -983,6 +992,7 @@ void ListenerManagerImpl::startWorkers(OptRef<GuardDog> guard_dog, std::function
stats_.workers_started_.set(1);
callback();
}
return absl::OkStatus();
}

void ListenerManagerImpl::stopListener(Network::ListenerConfig& listener,
Expand Down
2 changes: 1 addition & 1 deletion source/common/listener_manager/listener_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ class ListenerManagerImpl : public ListenerManager, Logger::Loggable<Logger::Id:
listeners(ListenerState state = ListenerState::ACTIVE) override;
uint64_t numConnections() const override;
bool removeListener(const std::string& listener_name) override;
void startWorkers(OptRef<GuardDog> guard_dog, std::function<void()> callback) override;
absl::Status startWorkers(OptRef<GuardDog> guard_dog, std::function<void()> callback) override;
void stopListeners(StopListenersType stop_listeners_type,
const Network::ExtraShutdownListenerOptions& options) override;
void stopWorkers() override;
Expand Down
2 changes: 1 addition & 1 deletion source/server/admin/admin.h
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ class AdminImpl : public Admin,
}
Network::ListenSocketFactoryPtr clone() const override { return nullptr; }
void closeAllSockets() override {}
void doFinalPreWorkerInit() override {}
absl::Status doFinalPreWorkerInit() override { return absl::OkStatus(); }

private:
Network::SocketSharedPtr socket_;
Expand Down
Loading

0 comments on commit 26861ac

Please sign in to comment.