Skip to content

Commit

Permalink
Services: Improved behavior when the first route to the server doesn'…
Browse files Browse the repository at this point in the history
…t work (#1343)

Before, the Service Client would only try the very first route to the Service Server. Now, it will go through all routes until it finds one that works.
  • Loading branch information
FlorianReimold authored Feb 5, 2024
1 parent 8b6a5ed commit 3f103b0
Show file tree
Hide file tree
Showing 4 changed files with 141 additions and 139 deletions.
136 changes: 64 additions & 72 deletions ecal/service/ecal_service/src/client_session_impl_v0.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,88 +109,80 @@ namespace eCAL

void ClientSessionV0::connect_to_endpoint(const asio::ip::tcp::resolver::iterator& resolved_endpoints)
{
// Look for the best endpoint to connect to. If possible, we use a loopback
// endpoint. Otherwise, we just use the first one.


auto endpoint_to_connect_to = resolved_endpoints->endpoint(); // Default to first endpoint
for (auto it = resolved_endpoints; it != asio::ip::tcp::resolver::iterator(); it++)
// Convert the resolved_endpoints iterator to an endpoint sequence
// (i.e. a vector of endpoints)
auto endpoint_sequence = std::make_shared<std::vector<asio::ip::tcp::endpoint>>();
for (auto it = resolved_endpoints; it != asio::ip::tcp::resolver::iterator(); ++it)
{
if (it->endpoint().address().is_loopback())
{
// If we find a loopback endpoint we use that one.
endpoint_to_connect_to = it->endpoint();
break;
}
endpoint_sequence->push_back(*it);
}

ECAL_SERVICE_LOG_DEBUG(logger_, "Successfully resolved endpoint to [" + endpoint_to_string(endpoint_to_connect_to) + "]. Connecting...");

const std::lock_guard<std::mutex> socket_lock(socket_mutex_);
socket_.async_connect(endpoint_to_connect_to
, service_call_queue_strand_.wrap([me = shared_from_this(), endpoint_to_connect_to](asio::error_code ec)
asio::async_connect(socket_
, *endpoint_sequence
, service_call_queue_strand_.wrap([me = shared_from_this(), endpoint_sequence](asio::error_code ec, const asio::ip::tcp::endpoint& endpoint)
{
if (ec)
{
const std::string message = "Failed to connect to endpoint [" + me->address_ + ":" + std::to_string(me->port_) + "]: " + ec.message();
me->logger_(LogLevel::Error, message);
me->handle_connection_loss_error(message);
return;
}
else
{
ECAL_SERVICE_LOG_DEBUG(me->logger_, "Successfully connected to endpoint [" + endpoint.address().to_string() + ":" + std::to_string(endpoint.port()) + "]");

// Disable Nagle's algorithm. Nagle's algorithm will otherwise cause the
// socket to wait for more data if it encounters a frame that can still
// fit more data. Obviously, this is an awful default behaviour if we
// want to transmit our data in a timely fashion.
{
asio::error_code socket_option_ec;
{
if (ec)
{
const std::string message = "Failed to connect to endpoint [" + endpoint_to_string(endpoint_to_connect_to) + "]: " + ec.message();
me->logger_(LogLevel::Error, message);
me->handle_connection_loss_error(message);
return;
}
else
{
ECAL_SERVICE_LOG_DEBUG(me->logger_, "Successfully connected to endpoint [" + endpoint_to_string(endpoint_to_connect_to) + "]");
const std::lock_guard<std::mutex> socket_lock(me->socket_mutex_);
me->socket_.set_option(asio::ip::tcp::no_delay(true), socket_option_ec);
}
if (socket_option_ec)
{
me->logger_(LogLevel::Warning, "[" + get_connection_info_string(me->socket_) + "] " + "Failed setting tcp::no_delay option: " + socket_option_ec.message());
}
}

// Disable Nagle's algorithm. Nagle's algorithm will otherwise cause the
// socket to wait for more data if it encounters a frame that can still
// fit more data. Obviously, this is an awful default behaviour if we
// want to transmit our data in a timely fashion.
{
asio::error_code socket_option_ec;
{
const std::lock_guard<std::mutex> socket_lock(me->socket_mutex_);
me->socket_.set_option(asio::ip::tcp::no_delay(true), socket_option_ec);
}
if (socket_option_ec)
{
me->logger_(LogLevel::Warning, "[" + get_connection_info_string(me->socket_) + "] " + "Failed setting tcp::no_delay option: " + socket_option_ec.message());
}
}
const std::string message = "Connected to server. Using protocol version 0.";
me->logger_(LogLevel::Info, "[" + get_connection_info_string(me->socket_) + "] " + message);

const std::string message = "Connected to server. Using protocol version 0.";
me->logger_(LogLevel::Info, "[" + get_connection_info_string(me->socket_) + "] " + message);
{
const std::lock_guard<std::mutex> lock(me->service_state_mutex_);
me->state_ = State::CONNECTED;
}

{
const std::lock_guard<std::mutex> lock(me->service_state_mutex_);
me->state_ = State::CONNECTED;
}
// Call event callback
me->event_callback_(eCAL::service::ClientEventType::Connected, message);

// Call event callback
me->event_callback_(eCAL::service::ClientEventType::Connected, message);
// Start sending service requests, if there are any
{
const std::lock_guard<std::mutex> lock(me->service_state_mutex_);
if (!me->service_call_queue_.empty())
{
// If there are service calls in the queue, we send the next one.
me->service_call_in_progress_ = true;
me->send_next_service_request(me->service_call_queue_.front().request, me->service_call_queue_.front().response_cb);
me->service_call_queue_.pop_front();
}
else
{
// If there are no more service calls to send, we go to error-peeking.
// While error-peeking we basically do nothing, except for non-destructively
// reading 1 byte from the socket (i.e. without removing it from the socket).
// This will cause asio / the OS to notify us when the server closes the connection.

// Start sending service requests, if there are any
{
const std::lock_guard<std::mutex> lock(me->service_state_mutex_);
if (!me->service_call_queue_.empty())
{
// If there are service calls in the queue, we send the next one.
me->service_call_in_progress_ = true;
me->send_next_service_request(me->service_call_queue_.front().request, me->service_call_queue_.front().response_cb);
me->service_call_queue_.pop_front();
}
else
{
// If there are no more service calls to send, we go to error-peeking.
// While error-peeking we basically do nothing, except for non-destructively
// reading 1 byte from the socket (i.e. without removing it from the socket).
// This will cause asio / the OS to notify us when the server closes the connection.

me->service_call_in_progress_ = false;
me->peek_for_error();
}
}
}
}));
me->service_call_in_progress_ = false;
me->peek_for_error();
}
}
}
}));
}

//////////////////////////////////////
Expand Down
83 changes: 38 additions & 45 deletions ecal/service/ecal_service/src/client_session_impl_v1.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,57 +113,50 @@ namespace eCAL

void ClientSessionV1::connect_to_endpoint(const asio::ip::tcp::resolver::iterator& resolved_endpoints)
{
// Look for the best endpoint to connect to. If possible, we use a loopback
// endpoint. Otherwise, we just use the first one.

auto endpoint_to_connect_to = resolved_endpoints->endpoint(); // Default to first endpoint
for (auto it = resolved_endpoints; it != asio::ip::tcp::resolver::iterator(); it++)
// Convert the resolved_endpoints iterator to an endpoint sequence
// (i.e. a vector of endpoints)
auto endpoint_sequence = std::make_shared<std::vector<asio::ip::tcp::endpoint>>();
for (auto it = resolved_endpoints; it != asio::ip::tcp::resolver::iterator(); ++it)
{
if (it->endpoint().address().is_loopback())
{
// If we find a loopback endpoint we use that one.
endpoint_to_connect_to = it->endpoint();
break;
}
endpoint_sequence->push_back(*it);
}

ECAL_SERVICE_LOG_DEBUG(logger_, "Successfully resolved endpoint to [" + endpoint_to_string(endpoint_to_connect_to) + "]. Connecting...");

const std::lock_guard<std::mutex> socket_lock(socket_mutex_);
socket_.async_connect(endpoint_to_connect_to
, service_call_queue_strand_.wrap([me = shared_from_this(), endpoint_to_connect_to](asio::error_code ec)
{
if (ec)
{
const std::string message = "Failed to connect to endpoint [" + endpoint_to_string(endpoint_to_connect_to) + "]: " + ec.message();
me->logger_(LogLevel::Error, message);
me->handle_connection_loss_error(message);
return;
}
else
{
ECAL_SERVICE_LOG_DEBUG(me->logger_, "Successfully connected to endpoint [" + endpoint_to_string(endpoint_to_connect_to) + "]");
asio::async_connect(socket_
, *endpoint_sequence
, service_call_queue_strand_.wrap([me = shared_from_this(), endpoint_sequence](asio::error_code ec, const asio::ip::tcp::endpoint& endpoint)
{
if (ec)
{
const std::string message = "Failed to connect to endpoint [" + me->address_ + ":" + std::to_string(me->port_) + "]: " + ec.message();
me->logger_(LogLevel::Error, message);
me->handle_connection_loss_error(message);
return;
}
else
{
ECAL_SERVICE_LOG_DEBUG(me->logger_, "Successfully connected to endpoint [" + endpoint.address().to_string() + ":" + std::to_string(endpoint.port()) + "]");

// Disable Nagle's algorithm. Nagle's algorithm will otherwise cause the
// socket to wait for more data if it encounters a frame that can still
// fit more data. Obviously, this is an awful default behaviour if we
// want to transmit our data in a timely fashion.
{
asio::error_code socket_option_ec;
{
const std::lock_guard<std::mutex> socket_lock(me->socket_mutex_);
me->socket_.set_option(asio::ip::tcp::no_delay(true), socket_option_ec);
}
if (socket_option_ec)
{
me->logger_(LogLevel::Warning, "[" + get_connection_info_string(me->socket_) + "] " + "Failed setting tcp::no_delay option: " + socket_option_ec.message());
}
}
// Disable Nagle's algorithm. Nagle's algorithm will otherwise cause the
// socket to wait for more data if it encounters a frame that can still
// fit more data. Obviously, this is an awful default behaviour if we
// want to transmit our data in a timely fashion.
{
asio::error_code socket_option_ec;
{
const std::lock_guard<std::mutex> socket_lock(me->socket_mutex_);
me->socket_.set_option(asio::ip::tcp::no_delay(true), socket_option_ec);
}
if (socket_option_ec)
{
me->logger_(LogLevel::Warning, "[" + get_connection_info_string(me->socket_) + "] " + "Failed setting tcp::no_delay option: " + socket_option_ec.message());
}
}

// Start sending the protocol handshake to the server. This will tell us the actual protocol version.
me->send_protocol_handshake_request();
}
}));
// Start sending the protocol handshake to the server. This will tell us the actual protocol version.
me->send_protocol_handshake_request();
}
}));
}

void ClientSessionV1::send_protocol_handshake_request()
Expand Down
6 changes: 6 additions & 0 deletions testing/ecal/clientserver_test/src/atomic_signalable.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,12 @@ class atomic_signalable
return cv.wait_for(lock, timeout, [&]() { return predicate(value); });
}

// Returns a copy of the stored value. The copy is made while the mutex
// is held, so the read is serialized against other accessors that take
// the same mutex.
T get() const
{
  const std::lock_guard<std::mutex> guard(mutex);
  return value;
}

bool operator==(T other) const
{
std::lock_guard<std::mutex> lock(mutex);
Expand Down
Loading

0 comments on commit 3f103b0

Please sign in to comment.