Skip to content

Commit

Permalink
transport(tcp): set timeout on send sockets (#419)
Browse files Browse the repository at this point in the history
  • Loading branch information
csegarragonz authored Apr 12, 2024
1 parent 709e479 commit 440189f
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 7 deletions.
3 changes: 2 additions & 1 deletion include/faabric/transport/tcp/SocketOptions.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,6 @@ bool isNonBlocking(int connFd);
void setBusyPolling(int connFd);

// Set timeout for blocking sockets
void setTimeoutMs(int connFd, int timeoutMs);
void setRecvTimeoutMs(int connFd, int timeoutMs);
void setSendTimeoutMs(int connFd, int timeoutMs);
}
2 changes: 1 addition & 1 deletion src/transport/tcp/RecvSocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ void RecvSocket::setSocketOptions(int connFd)
}

// Set the timeout
setTimeoutMs(connFd, SocketTimeoutMs);
setRecvTimeoutMs(connFd, SocketTimeoutMs);
#endif
}

Expand Down
3 changes: 2 additions & 1 deletion src/transport/tcp/SendSocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ void SendSocket::setSocketOptions(int connFd)
{
setNoDelay(connFd);
setQuickAck(connFd);
setSendTimeoutMs(connFd, SocketTimeoutMs);
}

void SendSocket::dial()
Expand Down Expand Up @@ -67,7 +68,7 @@ void SendSocket::dial()

void SendSocket::sendOne(const uint8_t* buffer, size_t bufferSize)
{
size_t sent = send(sock.get(), buffer, bufferSize, 0);
size_t sent = ::send(sock.get(), buffer, bufferSize, 0);
if (sent != bufferSize) {
SPDLOG_ERROR(
"TCP client error sending TCP message to {}:{} ({}/{}): {}",
Expand Down
18 changes: 17 additions & 1 deletion src/transport/tcp/SocketOptions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ void setBusyPolling(int connFd)
}
}

void setTimeoutMs(int connFd, int timeoutMs)
void setRecvTimeoutMs(int connFd, int timeoutMs)
{
struct timeval timeVal;
timeVal.tv_sec = timeoutMs / 1000;
Expand All @@ -119,4 +119,20 @@ void setTimeoutMs(int connFd, int timeoutMs)
throw std::runtime_error("Error setting recv timeout");
}
}

void setSendTimeoutMs(int connFd, int timeoutMs)
{
struct timeval timeVal;
timeVal.tv_sec = timeoutMs / 1000;
timeVal.tv_usec = 0;

int ret = ::setsockopt(
connFd, SOL_SOCKET, SO_SNDTIMEO, (const char*)&timeVal, sizeof(timeVal));
if (ret == -1) {
SPDLOG_ERROR("Error setting send timeout for socket {}: {}",
connFd,
std::strerror(errno));
throw std::runtime_error("Error setting send timeout");
}
}
}
8 changes: 5 additions & 3 deletions tests/test/transport/test_tcp_sockets.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,15 +60,16 @@ TEST_CASE("Test setting socket options", "[transport]")
dst.listen();
conn = dst.accept();

// Set options on the socket given by accept (i.e. the send socket)
// Set options on the socket given by accept (i.e. the recv socket)
setReuseAddr(conn);
setNoDelay(conn);
setQuickAck(conn);
setQuickAck(conn);
setBusyPolling(conn);
setNonBlocking(conn);
setBlocking(conn);
setTimeoutMs(conn, SocketTimeoutMs);
setRecvTimeoutMs(conn, SocketTimeoutMs);
setSendTimeoutMs(conn, SocketTimeoutMs);

REQUIRE(!isNonBlocking(conn));

Expand All @@ -88,7 +89,8 @@ TEST_CASE("Test setting socket options", "[transport]")
REQUIRE_THROWS(setBusyPolling(conn));
REQUIRE_THROWS(setNonBlocking(conn));
REQUIRE_THROWS(setBlocking(conn));
REQUIRE_THROWS(setTimeoutMs(conn, SocketTimeoutMs));
REQUIRE_THROWS(setRecvTimeoutMs(conn, SocketTimeoutMs));
REQUIRE_THROWS(setSendTimeoutMs(conn, SocketTimeoutMs));
}

TEST_CASE("Test send/recv one message using raw TCP sockets", "[transport]")
Expand Down

0 comments on commit 440189f

Please sign in to comment.