Skip to content

Commit

Permalink
Services: Improved behavior when the first route to the server doesn'…
Browse files Browse the repository at this point in the history
…t work

Before, the Service Client would only try the very first route to the Service Server. Now, it will go through all routes until it finds one that works.
  • Loading branch information
FlorianReimold committed Feb 1, 2024
1 parent 8b30e33 commit 8a6d046
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 117 deletions.
136 changes: 64 additions & 72 deletions ecal/service/ecal_service/src/client_session_impl_v0.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,88 +109,80 @@ namespace eCAL

void ClientSessionV0::connect_to_endpoint(const asio::ip::tcp::resolver::iterator& resolved_endpoints)
{
// Look for the best endpoint to connect to. If possible, we use a loopback
// endpoint. Otherwise, we just use the first one.


auto endpoint_to_connect_to = resolved_endpoints->endpoint(); // Default to first endpoint
for (auto it = resolved_endpoints; it != asio::ip::tcp::resolver::iterator(); it++)
// Convert the resolved_endpoints iterator to an endpoint sequence
// (i.e. a vector of endpoints)
auto endpoint_sequence = std::make_shared<std::vector<asio::ip::tcp::endpoint>>();
for (auto it = resolved_endpoints; it != asio::ip::tcp::resolver::iterator(); ++it)
{
if (it->endpoint().address().is_loopback())
{
// If we find a loopback endpoint we use that one.
endpoint_to_connect_to = it->endpoint();
break;
}
endpoint_sequence->push_back(*it);
}

ECAL_SERVICE_LOG_DEBUG(logger_, "Successfully resolved endpoint to [" + endpoint_to_string(endpoint_to_connect_to) + "]. Connecting...");

const std::lock_guard<std::mutex> socket_lock(socket_mutex_);
socket_.async_connect(endpoint_to_connect_to
, service_call_queue_strand_.wrap([me = shared_from_this(), endpoint_to_connect_to](asio::error_code ec)
asio::async_connect(socket_
, *endpoint_sequence
, service_call_queue_strand_.wrap([me = shared_from_this(), endpoint_sequence](asio::error_code ec, const asio::ip::tcp::endpoint& endpoint)
{
if (ec)
{
const std::string message = "Failed to connect to endpoint [" + me->address_ + ":" + std::to_string(me->port_) + "]: " + ec.message();
me->logger_(LogLevel::Error, message);
me->handle_connection_loss_error(message);
return;
}
else
{
ECAL_SERVICE_LOG_DEBUG(me->logger_, "Successfully connected to endpoint [" + endpoint.address().to_string() + ":" + std::to_string(endpoint.port()) + "]");

// Disable Nagle's algorithm. Nagles Algorithm will otherwise cause the
// Socket to wait for more data, if it encounters a frame that can still
// fit more data. Obviously, this is an awfull default behaviour, if we
// want to transmit our data in a timely fashion.
{
asio::error_code socket_option_ec;
{
if (ec)
{
const std::string message = "Failed to connect to endpoint [" + endpoint_to_string(endpoint_to_connect_to) + "]: " + ec.message();
me->logger_(LogLevel::Error, message);
me->handle_connection_loss_error(message);
return;
}
else
{
ECAL_SERVICE_LOG_DEBUG(me->logger_, "Successfully connected to endpoint [" + endpoint_to_string(endpoint_to_connect_to) + "]");
const std::lock_guard<std::mutex> socket_lock(me->socket_mutex_);
me->socket_.set_option(asio::ip::tcp::no_delay(true), socket_option_ec);
}
if (socket_option_ec)
{
me->logger_(LogLevel::Warning, "[" + get_connection_info_string(me->socket_) + "] " + "Failed setting tcp::no_delay option: " + socket_option_ec.message());
}
}

// Disable Nagle's algorithm. Nagles Algorithm will otherwise cause the
// Socket to wait for more data, if it encounters a frame that can still
// fit more data. Obviously, this is an awfull default behaviour, if we
// want to transmit our data in a timely fashion.
{
asio::error_code socket_option_ec;
{
const std::lock_guard<std::mutex> socket_lock(me->socket_mutex_);
me->socket_.set_option(asio::ip::tcp::no_delay(true), socket_option_ec);
}
if (socket_option_ec)
{
me->logger_(LogLevel::Warning, "[" + get_connection_info_string(me->socket_) + "] " + "Failed setting tcp::no_delay option: " + socket_option_ec.message());
}
}
const std::string message = "Connected to server. Using protocol version 0.";
me->logger_(LogLevel::Info, "[" + get_connection_info_string(me->socket_) + "] " + message);

const std::string message = "Connected to server. Using protocol version 0.";
me->logger_(LogLevel::Info, "[" + get_connection_info_string(me->socket_) + "] " + message);
{
const std::lock_guard<std::mutex> lock(me->service_state_mutex_);
me->state_ = State::CONNECTED;
}

{
const std::lock_guard<std::mutex> lock(me->service_state_mutex_);
me->state_ = State::CONNECTED;
}
// Call event callback
me->event_callback_(eCAL::service::ClientEventType::Connected, message);

// Call event callback
me->event_callback_(eCAL::service::ClientEventType::Connected, message);
// Start sending service requests, if there are any
{
const std::lock_guard<std::mutex> lock(me->service_state_mutex_);
if (!me->service_call_queue_.empty())
{
// If there are service calls in the queue, we send the next one.
me->service_call_in_progress_ = true;
me->send_next_service_request(me->service_call_queue_.front().request, me->service_call_queue_.front().response_cb);
me->service_call_queue_.pop_front();
}
else
{
// If there are no more service calls to send, we go to error-peeking.
// While error peeking we basically do nothing, except from non-destructively
// reading 1 byte from the socket (i.e. without removing it from the socket).
// This will cause asio / the OS to notify us, when the server closed the connection.

// Start sending service requests, if there are any
{
const std::lock_guard<std::mutex> lock(me->service_state_mutex_);
if (!me->service_call_queue_.empty())
{
// If there are service calls in the queue, we send the next one.
me->service_call_in_progress_ = true;
me->send_next_service_request(me->service_call_queue_.front().request, me->service_call_queue_.front().response_cb);
me->service_call_queue_.pop_front();
}
else
{
// If there are no more service calls to send, we go to error-peeking.
// While error peeking we basically do nothing, except from non-destructively
// reading 1 byte from the socket (i.e. without removing it from the socket).
// This will cause asio / the OS to notify us, when the server closed the connection.

me->service_call_in_progress_ = false;
me->peek_for_error();
}
}
}
}));
me->service_call_in_progress_ = false;
me->peek_for_error();
}
}
}
}));
}

//////////////////////////////////////
Expand Down
83 changes: 38 additions & 45 deletions ecal/service/ecal_service/src/client_session_impl_v1.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,57 +113,50 @@ namespace eCAL

void ClientSessionV1::connect_to_endpoint(const asio::ip::tcp::resolver::iterator& resolved_endpoints)
{
// Look for the best endpoint to connect to. If possible, we use a loopback
// endpoint. Otherwise, we just use the first one.

auto endpoint_to_connect_to = resolved_endpoints->endpoint(); // Default to first endpoint
for (auto it = resolved_endpoints; it != asio::ip::tcp::resolver::iterator(); it++)
// Convert the resolved_endpoints iterator to an endpoint sequence
// (i.e. a vector of endpoints)
auto endpoint_sequence = std::make_shared<std::vector<asio::ip::tcp::endpoint>>();
for (auto it = resolved_endpoints; it != asio::ip::tcp::resolver::iterator(); ++it)
{
if (it->endpoint().address().is_loopback())
{
// If we find a loopback endpoint we use that one.
endpoint_to_connect_to = it->endpoint();
break;
}
endpoint_sequence->push_back(*it);
}

ECAL_SERVICE_LOG_DEBUG(logger_, "Successfully resolved endpoint to [" + endpoint_to_string(endpoint_to_connect_to) + "]. Connecting...");

const std::lock_guard<std::mutex> socket_lock(socket_mutex_);
socket_.async_connect(endpoint_to_connect_to
, service_call_queue_strand_.wrap([me = shared_from_this(), endpoint_to_connect_to](asio::error_code ec)
{
if (ec)
{
const std::string message = "Failed to connect to endpoint [" + endpoint_to_string(endpoint_to_connect_to) + "]: " + ec.message();
me->logger_(LogLevel::Error, message);
me->handle_connection_loss_error(message);
return;
}
else
{
ECAL_SERVICE_LOG_DEBUG(me->logger_, "Successfully connected to endpoint [" + endpoint_to_string(endpoint_to_connect_to) + "]");
asio::async_connect(socket_
, *endpoint_sequence
, service_call_queue_strand_.wrap([me = shared_from_this(), endpoint_sequence](asio::error_code ec, const asio::ip::tcp::endpoint& endpoint)
{
if (ec)
{
const std::string message = "Failed to connect to endpoint [" + me->address_ + ":" + std::to_string(me->port_) + "]: " + ec.message();
me->logger_(LogLevel::Error, message);
me->handle_connection_loss_error(message);
return;
}
else
{
ECAL_SERVICE_LOG_DEBUG(me->logger_, "Successfully connected to endpoint [" + endpoint.address().to_string() + ":" + std::to_string(endpoint.port()) + "]");

// Disable Nagle's algorithm. Nagles Algorithm will otherwise cause the
// Socket to wait for more data, if it encounters a frame that can still
// fit more data. Obviously, this is an awfull default behaviour, if we
// want to transmit our data in a timely fashion.
{
asio::error_code socket_option_ec;
{
const std::lock_guard<std::mutex> socket_lock(me->socket_mutex_);
me->socket_.set_option(asio::ip::tcp::no_delay(true), socket_option_ec);
}
if (socket_option_ec)
{
me->logger_(LogLevel::Warning, "[" + get_connection_info_string(me->socket_) + "] " + "Failed setting tcp::no_delay option: " + socket_option_ec.message());
}
}
// Disable Nagle's algorithm. Nagles Algorithm will otherwise cause the
// Socket to wait for more data, if it encounters a frame that can still
// fit more data. Obviously, this is an awfull default behaviour, if we
// want to transmit our data in a timely fashion.
{
asio::error_code socket_option_ec;
{
const std::lock_guard<std::mutex> socket_lock(me->socket_mutex_);
me->socket_.set_option(asio::ip::tcp::no_delay(true), socket_option_ec);
}
if (socket_option_ec)
{
me->logger_(LogLevel::Warning, "[" + get_connection_info_string(me->socket_) + "] " + "Failed setting tcp::no_delay option: " + socket_option_ec.message());
}
}

// Start sending the protocol handshake to the server. This will tell us the actual protocol version.
me->send_protocol_handshake_request();
}
}));
// Start sending the protocol handshake to the server. This will tell us the actual protocol version.
me->send_protocol_handshake_request();
}
}));
}

void ClientSessionV1::send_protocol_handshake_request()
Expand Down

0 comments on commit 8a6d046

Please sign in to comment.