Skip to content

Commit

Permalink
Previously, if an emulated TCP socket is connect()ed and one attempts…
Browse files Browse the repository at this point in the history
… to send() back to back right after connecting, the send would fail because the socket is not yet actually connected.

It is a bit hard to imagine where that behavior would be useful.

So this PR changes this behavior to be identical to how connectionless UDP sockets are emulated: instead, connect() calls are always pretended to succeed (since we cannot synchronously establish if the connect would fail, so presume it'll work), and all send() calls that are issued while the socket is connecting will be queued to be sent after the socket actually connects.
  • Loading branch information
juj committed Sep 26, 2024
1 parent a0fc53c commit 3f02d35
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 17 deletions.
37 changes: 20 additions & 17 deletions src/library_sockfs.js
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ addToLibrary({
addr,
port,
socket: ws,
dgram_send_queue: []
msg_send_queue: []
};

SOCKFS.websocket_sock_ops.addPeer(sock, peer);
Expand All @@ -254,7 +254,7 @@ addToLibrary({
#if SOCKET_DEBUG
dbg('websocket queuing port message (port ' + sock.sport + ')');
#endif
peer.dgram_send_queue.push(new Uint8Array([
peer.msg_send_queue.push(new Uint8Array([
255, 255, 255, 255,
'p'.charCodeAt(0), 'o'.charCodeAt(0), 'r'.charCodeAt(0), 't'.charCodeAt(0),
((sock.sport & 0xff00) >> 8) , (sock.sport & 0xff)
Expand Down Expand Up @@ -283,13 +283,13 @@ addToLibrary({
Module['websocket'].emit('open', sock.stream.fd);

try {
var queued = peer.dgram_send_queue.shift();
var queued = peer.msg_send_queue.shift();
while (queued) {
#if SOCKET_DEBUG
dbg('websocket sending queued data (' + queued.byteLength + ' bytes): ' + [Array.prototype.slice.call(new Uint8Array(queued))]);
#endif
peer.socket.send(queued);
queued = peer.dgram_send_queue.shift();
queued = peer.msg_send_queue.shift();
}
} catch (e) {
// not much we can do here in the way of proper error handling as we've already
Expand Down Expand Up @@ -493,8 +493,9 @@ addToLibrary({
sock.daddr = peer.addr;
sock.dport = peer.port;

// always "fail" in non-blocking mode
throw new FS.ErrnoError({{{ cDefs.EINPROGRESS }}});
// because we cannot synchronously block to wait for the WebSocket
// connection to complete, we return here pretending that the connection
// was a success.
},
listen(sock, backlog) {
if (!ENVIRONMENT_IS_NODE) {
Expand Down Expand Up @@ -605,8 +606,10 @@ addToLibrary({
if (sock.type === {{{ cDefs.SOCK_STREAM }}}) {
if (!dest || dest.socket.readyState === dest.socket.CLOSING || dest.socket.readyState === dest.socket.CLOSED) {
throw new FS.ErrnoError({{{ cDefs.ENOTCONN }}});
#if SOCKET_DEBUG
} else if (dest.socket.readyState === dest.socket.CONNECTING) {
throw new FS.ErrnoError({{{ cDefs.EAGAIN }}});
dbg('socket sendmsg called while socket is still connecting.');
#endif
}
}

Expand All @@ -631,21 +634,21 @@ addToLibrary({
}
#endif

// if we're emulating a connection-less dgram socket and don't have
// a cached connection, queue the buffer to send upon connect and
// lie, saying the data was sent now.
if (sock.type === {{{ cDefs.SOCK_DGRAM }}}) {
if (!dest || dest.socket.readyState !== dest.socket.OPEN) {
// if we're not connected, open a new connection
// if we don't have a cached connectionless UDP datagram connection, or
// the TCP socket is still connecting, queue the message to be sent upon
// connect, and lie, saying the data was sent now.
if (!dest || dest.socket.readyState !== dest.socket.OPEN) {
// if we're not connected, open a new connection
if (sock.type === {{{ cDefs.SOCK_DGRAM }}}) {
if (!dest || dest.socket.readyState === dest.socket.CLOSING || dest.socket.readyState === dest.socket.CLOSED) {
dest = SOCKFS.websocket_sock_ops.createPeer(sock, addr, port);
}
}
#if SOCKET_DEBUG
dbg('websocket queuing (' + length + ' bytes): ' + [Array.prototype.slice.call(new Uint8Array(data))]);
dbg('websocket queuing (' + length + ' bytes): ' + [Array.prototype.slice.call(new Uint8Array(data))]);
#endif
dest.dgram_send_queue.push(data);
return length;
}
dest.msg_send_queue.push(data);
return length;
}

try {
Expand Down
65 changes: 65 additions & 0 deletions test/sockets/send_immediately_after_connect.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// This test verifies that calling send() back to back right after calling
// connect() succeeds.

#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <emscripten/html5.h>
#include <string.h>
#include <stdio.h>

int sock;

EM_BOOL wait_for_recv(double, void *) {
// Poll read from the socket to see what we have received
char buf[1024] = {};
recv(sock, buf, sizeof(buf)-1, 0);
if (strlen(buf) > 0)
{
printf("%s\n", buf);
if (!strcmp(buf, "Hello"))
{
printf("Got hello, test finished.\n");
#ifdef REPORT_RESULT
REPORT_RESULT(0);
#endif
}
}
return EM_TRUE;
}

int main() {
// Connect socket to a WebSocket echo server
sockaddr_in addr = {
.sin_family = AF_INET,
.sin_port = htons(8089)
};
inet_pton(AF_INET, "127.0.0.1", &addr.sin_addr);
sock = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
if (sock < 0)
{
printf("socket() failed to error %d\n", sock);
return sock;
}

// Connect to echo server.
int error = connect(sock, (sockaddr*)&addr, sizeof(addr));
if (error)
{
printf("connect() failed to error %d\n", error);
return error;
}

// Immediately send a message back-to-back from connecting to the socket
const char *msg = "Hello";
ssize_t bytes = send(sock, msg, strlen(msg), 0);
if (bytes != strlen(msg))
{
printf("send() failed to send %d bytes. Return value: %d\n", (int)strlen(msg), (int)bytes);
return bytes;
}

// Asynchronously wait until we get the message echoed back
emscripten_set_timeout_loop(wait_for_recv, 0, 0);
return 0;
}
5 changes: 5 additions & 0 deletions test/test_sockets.py
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,11 @@ def test_posix_proxy_sockets(self):
# Build and run the TCP echo client program with Emscripten
self.btest_exit('websocket/tcp_echo_client.c', args=['-lwebsocket', '-sPROXY_POSIX_SOCKETS', '-pthread', '-sPROXY_TO_PTHREAD'])

# Test that multiple pthreads calling send() on the same socket produces correct ordering semantics.
def test_sockets_send_immediately_after_connect(self):
with NodeJsWebSocketEchoServerProcess():
self.btest('sockets/send_immediately_after_connect.cpp', args=['-DSOCKET_DEBUG'], expected='0')


class sockets64(sockets):
def setUp(self):
Expand Down

0 comments on commit 3f02d35

Please sign in to comment.