diff --git a/CHANGELOG.md b/CHANGELOG.md
index ef64169b3..661788e0e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Added `code:all_loaded/0` and `code:all_available/0`
- Added `erlang:split_binary/2`
- Added support for external pids and encoded pids in external terms
+- Added support for socket asynchronous API for `recv` and `recvfrom`.
## [0.6.6] - Unreleased
diff --git a/libs/estdlib/src/gen_tcp_socket.erl b/libs/estdlib/src/gen_tcp_socket.erl
index 6d808c417..86079a42e 100644
--- a/libs/estdlib/src/gen_tcp_socket.erl
+++ b/libs/estdlib/src/gen_tcp_socket.erl
@@ -339,10 +339,7 @@ handle_cast(_Request, State) ->
{noreply, State}.
%% @hidden
-handle_info({select, _Socket, Ref, ready_input}, State) ->
- ?LOG_DEBUG("handle_info [~p], ~p]", [
- {select, _Socket, Ref, ready_input}, State
- ]),
+handle_info({'$socket', _Socket, select, Ref}, State) ->
%% TODO cancel timer
case maps:get(Ref, State#state.pending_selects, undefined) of
undefined ->
@@ -366,6 +363,28 @@ handle_info({select, _Socket, Ref, ready_input}, State) ->
pending_selects = maps:remove(Ref, State#state.pending_selects)
}}
end;
+handle_info({'$socket', Socket, abort, {Ref, closed}}, State) ->
+ %% TODO cancel timer
+ case maps:get(Ref, State#state.pending_selects, undefined) of
+ undefined ->
+ ?LOG_WARNING("Unable to find select ref ~p in pending selects", [Ref]),
+ socket:nif_select_stop(Socket),
+ {noreply, State};
+ {accept, From, _AcceptingProc, _Timeout} ->
+ socket:nif_select_stop(Socket),
+ gen_server:reply(From, {error, closed}),
+ {noreply, State};
+ active ->
+ WrappedSocket = {?GEN_TCP_MONIKER, self(), ?MODULE},
+ State#state.controlling_process ! {tcp_closed, WrappedSocket},
+ {noreply, State};
+ {passive, From, _Length, _Timeout} ->
+ socket:nif_select_stop(Socket),
+ gen_server:reply(From, {error, closed}),
+ {noreply, State#state{
+ pending_selects = maps:remove(Ref, State#state.pending_selects)
+ }}
+ end;
handle_info({timeout, Ref, From}, State) ->
?LOG_DEBUG("handle_info [~p], ~p]", [
{timeout, Ref, From}, State
diff --git a/libs/estdlib/src/gen_udp_socket.erl b/libs/estdlib/src/gen_udp_socket.erl
index 56aa27fe5..c3a561e53 100644
--- a/libs/estdlib/src/gen_udp_socket.erl
+++ b/libs/estdlib/src/gen_udp_socket.erl
@@ -242,7 +242,7 @@ handle_cast(_Request, State) ->
{noreply, State}.
%% @hidden
-handle_info({select, _Socket, Ref, ready_input}, State) ->
+handle_info({'$socket', _Socket, select, Ref}, State) ->
case maps:get(Ref, State#state.pending_selects, undefined) of
undefined ->
?LOG_INFO("Unable to find select ref ~p in pending selects", [Ref]),
diff --git a/libs/estdlib/src/socket.erl b/libs/estdlib/src/socket.erl
index c7f95c807..0de6087f9 100644
--- a/libs/estdlib/src/socket.erl
+++ b/libs/estdlib/src/socket.erl
@@ -38,6 +38,7 @@
send/2,
sendto/3,
setopt/3,
+ getopt/2,
connect/2,
shutdown/2
]).
@@ -66,7 +67,9 @@
-type in_addr() :: {0..255, 0..255, 0..255, 0..255}.
-type port_number() :: 0..65535.
--type socket_option() :: {socket, reuseaddr} | {socket, linger}.
+-type socket_option() ::
+ {socket, reuseaddr | linger | type}
+ | {otp, recvbuf}.
-export_type([
socket/0,
@@ -242,7 +245,7 @@ accept(Socket, Timeout) ->
case ?MODULE:nif_select_read(Socket, Ref) of
ok ->
receive
- {select, _AcceptedSocket, Ref, ready_input} ->
+ {'$socket', Socket, select, Ref} ->
case ?MODULE:nif_accept(Socket) of
{error, closed} = E ->
?MODULE:nif_select_stop(Socket),
@@ -250,14 +253,15 @@ accept(Socket, Timeout) ->
R ->
R
end;
- {closed, Ref} ->
+ {'$socket', Socket, abort, {Ref, closed}} ->
% socket was closed by another process
% TODO: we need to handle:
% (a) SELECT_STOP being scheduled
- % (b) flush of messages as we can have both
- % {closed, Ref} and {select, _, Ref, _} in the
+ % (b) flush of messages as we can have both in the
% queue
- {error, closed}
+ {error, closed};
+ Other ->
+ {error, {accept, unexpected, Other, {'$socket', Socket, select, Ref}}}
after Timeout ->
{error, timeout}
end;
@@ -296,25 +300,60 @@ recv(Socket, Length) ->
%% `{ok, Data} = socket:recv(ConnectedSocket)'
%% @end
%%-----------------------------------------------------------------------------
--spec recv(Socket :: socket(), Length :: non_neg_integer(), Timeout :: timeout()) ->
- {ok, Data :: binary()} | {error, Reason :: term()}.
+-spec recv(
+ Socket :: socket(), Length :: non_neg_integer(), Timeout :: timeout() | nowait | reference()
+) ->
+ {ok, Data :: binary()}
+ | {select, {select_info, recvfrom, reference()}}
+ | {select, {{select_info, recvfrom, reference()}, Data :: binary()}}
+ | {error, Reason :: term()}.
+recv(Socket, Length, 0) ->
+ recv0_noselect(Socket, Length);
+recv(Socket, 0, Timeout) when is_integer(Timeout) orelse Timeout =:= infinity ->
+ recv0(Socket, 0, Timeout);
+recv(Socket, Length, nowait) ->
+ recv0_nowait(Socket, Length, erlang:make_ref());
+recv(Socket, Length, Ref) when is_reference(Ref) ->
+ recv0_nowait(Socket, Length, Ref);
recv(Socket, Length, Timeout) ->
+ case ?MODULE:getopt(Socket, {socket, type}) of
+ {ok, stream} when Timeout =/= infinity ->
+ recv0_r(Socket, Length, Timeout, erlang:system_time(millisecond) + Timeout, []);
+ {ok, stream} when Timeout =:= infinity ->
+ recv0_r(Socket, Length, Timeout, undefined, []);
+ _ ->
+ recv0(Socket, Length, Timeout)
+ end.
+
+recv0_noselect(Socket, Length) ->
+ case ?MODULE:nif_recv(Socket, Length) of
+ {error, _} = E ->
+ E;
+ {ok, Data} when Length =:= 0 orelse byte_size(Data) =:= Length ->
+ {ok, Data};
+ {ok, Data} ->
+ case ?MODULE:getopt(Socket, {socket, type}) of
+ {ok, stream} ->
+ {error, {timeout, Data}};
+ {ok, dgram} ->
+ {ok, Data}
+ end
+ end.
+
+recv0(Socket, Length, Timeout) ->
Ref = erlang:make_ref(),
- ?TRACE("select read for recv. self=~p ref=~p~n", [self(), Ref]),
case ?MODULE:nif_select_read(Socket, Ref) of
ok ->
receive
- {select, _AcceptedSocket, Ref, ready_input} ->
+ {'$socket', Socket, select, Ref} ->
case ?MODULE:nif_recv(Socket, Length) of
{error, _} = E ->
?MODULE:nif_select_stop(Socket),
E;
- % TODO: Assemble data to have more if Length > byte_size(Data)
- % as long as timeout did not expire
{ok, Data} ->
{ok, Data}
end;
- {closed, Ref} ->
+ {'$socket', Socket, abort, {Ref, closed}} ->
% socket was closed by another process
% TODO: see above in accept/2
{error, closed}
@@ -325,6 +364,72 @@ recv(Socket, Length, Timeout) ->
Error
end.
+recv0_nowait(Socket, Length, Ref) ->
+ case ?MODULE:nif_recv(Socket, Length) of
+ {error, timeout} ->
+ case ?MODULE:nif_select_read(Socket, Ref) of
+ ok ->
+ {select, {select_info, recv, Ref}};
+ {error, _} = Error1 ->
+ Error1
+ end;
+ {error, _} = E ->
+ E;
+ {ok, Data} when byte_size(Data) < Length ->
+ case ?MODULE:getopt(Socket, {socket, type}) of
+ {ok, stream} ->
+ case ?MODULE:nif_select_read(Socket, Ref) of
+ ok ->
+ {select, {{select_info, recv, Ref}, Data}};
+ {error, _} = Error1 ->
+ Error1
+ end;
+ {ok, dgram} ->
+ {ok, Data}
+ end;
+ {ok, Data} ->
+ {ok, Data}
+ end.
+
+recv0_r(Socket, Length, Timeout, EndQuery, Acc) ->
+ Ref = erlang:make_ref(),
+ case ?MODULE:nif_select_read(Socket, Ref) of
+ ok ->
+ receive
+ {'$socket', Socket, select, Ref} ->
+ case ?MODULE:nif_recv(Socket, Length) of
+ {error, _} = E ->
+ ?MODULE:nif_select_stop(Socket),
+ E;
+ {ok, Data} ->
+ NewAcc = [Data | Acc],
+ Remaining = Length - byte_size(Data),
+ case Remaining of
+ 0 ->
+ {ok, list_to_binary(lists:reverse(NewAcc))};
+ _ ->
+ NewTimeout =
+ case Timeout of
+ infinity -> infinity;
+ _ -> EndQuery - erlang:system_time(millisecond)
+ end,
+ recv0_r(Socket, Remaining, NewTimeout, EndQuery, NewAcc)
+ end
+ end;
+ {'$socket', Socket, abort, {Ref, closed}} ->
+ % socket was closed by another process
+ % TODO: see above in accept/2
+ {error, closed}
+ after Timeout ->
+ case Acc of
+ [] -> {error, timeout};
+ _ -> {error, {timeout, list_to_binary(lists:reverse(Acc))}}
+ end
+ end;
+ {error, _Reason} = Error ->
+ Error
+ end.
+
%%-----------------------------------------------------------------------------
%% @equiv socket:recvfrom(Socket, 0)
%% @end
@@ -367,25 +472,43 @@ recvfrom(Socket, Length) ->
%% bytes are available and return these bytes.
%% @end
%%-----------------------------------------------------------------------------
--spec recvfrom(Socket :: socket(), Length :: non_neg_integer(), Timeout :: timeout()) ->
- {ok, {Address :: sockaddr(), Data :: binary()}} | {error, Reason :: term()}.
+-spec recvfrom(
+ Socket :: socket(), Length :: non_neg_integer(), Timeout :: timeout() | nowait | reference()
+) ->
+ {ok, {Address :: sockaddr(), Data :: binary()}}
+ | {select, {select_info, recvfrom, reference()}}
+ | {error, Reason :: term()}.
+recvfrom(Socket, Length, 0) ->
+ recvfrom0_noselect(Socket, Length);
+recvfrom(Socket, Length, nowait) ->
+ recvfrom0_nowait(Socket, Length, erlang:make_ref());
+recvfrom(Socket, Length, Ref) when is_reference(Ref) ->
+ recvfrom0_nowait(Socket, Length, Ref);
recvfrom(Socket, Length, Timeout) ->
+ recvfrom0(Socket, Length, Timeout).
+
+recvfrom0_noselect(Socket, Length) ->
+ case ?MODULE:nif_recvfrom(Socket, Length) of
+ {error, _} = E ->
+ E;
+ {ok, {_Address, _Data}} = Reply ->
+ Reply
+ end.
+
+recvfrom0(Socket, Length, Timeout) ->
Ref = erlang:make_ref(),
- ?TRACE("select read for recvfrom. self=~p ref=~p", [self(), Ref]),
case ?MODULE:nif_select_read(Socket, Ref) of
ok ->
receive
- {select, _AcceptedSocket, Ref, ready_input} ->
+ {'$socket', Socket, select, Ref} ->
case ?MODULE:nif_recvfrom(Socket, Length) of
{error, _} = E ->
?MODULE:nif_select_stop(Socket),
E;
- % TODO: Assemble data to have more if Length > byte_size(Data)
- % as long as timeout did not expire
- {ok, {Address, Data}} ->
- {ok, {Address, Data}}
+ {ok, {_Address, _Data}} = Reply ->
+ Reply
end;
- {closed, Ref} ->
+ {'$socket', Socket, abort, {Ref, closed}} ->
% socket was closed by another process
% TODO: see above in accept/2
{error, closed}
@@ -396,6 +519,21 @@ recvfrom(Socket, Length, Timeout) ->
Error
end.
+recvfrom0_nowait(Socket, Length, Ref) ->
+ case ?MODULE:nif_recvfrom(Socket, Length) of
+ {error, timeout} ->
+ case ?MODULE:nif_select_read(Socket, Ref) of
+ ok ->
+ {select, {select_info, recvfrom, Ref}};
+ {error, _} = SelectError ->
+ SelectError
+ end;
+ {error, _} = RecvError ->
+ RecvError;
+ {ok, {_Address, _Data}} = Reply ->
+ Reply
+ end.
+
%%-----------------------------------------------------------------------------
%% @param Socket the socket
%% @param Data the data to send
@@ -443,11 +581,32 @@ sendto(Socket, Data, Dest) when is_binary(Data) ->
sendto(Socket, Data, Dest) ->
?MODULE:nif_sendto(Socket, erlang:iolist_to_binary(Data), Dest).
+%%-----------------------------------------------------------------------------
+%% @param Socket the socket
+%% @param SocketOption the option
+%% @returns `{ok, Value}' if successful; `{error, Reason}', otherwise.
+%% @doc Get a socket option.
+%%
+%% Currently, the following options are supported:
+%%
+%% `{socket, type}' | `type()' |
+%%
+%%
+%% Example:
+%%
+%% `{ok, stream} = socket:getopt(ListeningSocket, {socket, type})'
+%% @end
+%%-----------------------------------------------------------------------------
+-spec getopt(Socket :: socket(), SocketOption :: socket_option()) ->
+ {ok, Value :: term()} | {error, Reason :: term()}.
+getopt(_Socket, _SocketOption) ->
+ erlang:nif_error(undefined).
+
%%-----------------------------------------------------------------------------
%% @param Socket the socket
%% @param SocketOption the option
%% @param Value the option value
-%% @returns `{ok, Address}' if successful; `{error, Reason}', otherwise.
+%% @returns `ok' if successful; `{error, Reason}', otherwise.
%% @doc Set a socket option.
%%
%% Set an option on a socket.
@@ -456,6 +615,7 @@ sendto(Socket, Data, Dest) ->
%%
%% `{socket, reuseaddr}' | `boolean()' |
%% `{socket, linger}' | `#{onoff => boolean(), linger => non_neg_integer()}' |
+%% `{otp, recvbuf}' | `non_neg_integer()' |
%%
%%
%% Example:
@@ -465,7 +625,7 @@ sendto(Socket, Data, Dest) ->
%% @end
%%-----------------------------------------------------------------------------
-spec setopt(Socket :: socket(), SocketOption :: socket_option(), Value :: term()) ->
- ok | {error, Reason :: term()}.
+ ok | {error, any()}.
setopt(_Socket, _SocketOption, _Value) ->
erlang:nif_error(undefined).
diff --git a/libs/estdlib/src/ssl.erl b/libs/estdlib/src/ssl.erl
index 95919765d..46385b066 100644
--- a/libs/estdlib/src/ssl.erl
+++ b/libs/estdlib/src/ssl.erl
@@ -189,9 +189,9 @@ handshake_loop(SSLContext, Socket) ->
case socket:nif_select_read(Socket, Ref) of
ok ->
receive
- {select, _SocketResource, Ref, ready_input} ->
+ {'$socket', Socket, select, Ref} ->
handshake_loop(SSLContext, Socket);
- {closed, Ref} ->
+ {'$socket', Socket, abort, {Ref, closed}} ->
ok = socket:close(Socket),
{error, closed}
end;
@@ -242,9 +242,9 @@ close_notify_loop(SSLContext, Socket) ->
case socket:nif_select_read(Socket, Ref) of
ok ->
receive
- {select, _SocketResource, Ref, ready_input} ->
+ {'$socket', Socket, select, Ref} ->
close_notify_loop(SSLContext, Socket);
- {closed, Ref} ->
+ {'$socket', Socket, abort, {Ref, closed}} ->
ok = socket:close(Socket),
{error, closed}
end;
@@ -274,9 +274,9 @@ send({SSLContext, Socket} = SSLSocket, Binary) ->
case socket:nif_select_read(Socket, Ref) of
ok ->
receive
- {select, _SocketResource, Ref, ready_input} ->
+ {'$socket', Socket, select, Ref} ->
send(SSLSocket, Binary);
- {closed, Ref} ->
+ {'$socket', Socket, abort, {Ref, closed}} ->
{error, closed}
end;
{error, _Reason} = Error ->
@@ -309,9 +309,9 @@ recv0({SSLContext, Socket} = SSLSocket, Length, Remaining, Acc) ->
case socket:nif_select_read(Socket, Ref) of
ok ->
receive
- {select, _SocketResource, Ref, ready_input} ->
+ {'$socket', Socket, select, Ref} ->
recv0(SSLSocket, Length, Remaining, Acc);
- {closed, Ref} ->
+ {'$socket', Socket, abort, {Ref, closed}} ->
{error, closed}
end;
{error, _Reason} = Error ->
diff --git a/src/libAtomVM/defaultatoms.c b/src/libAtomVM/defaultatoms.c
index 228dd79fc..4f71b36c4 100644
--- a/src/libAtomVM/defaultatoms.c
+++ b/src/libAtomVM/defaultatoms.c
@@ -165,6 +165,13 @@ static const char *const global_atom = "\x6" "global";
static const char *const nonode_at_nohost_atom = "\xD" "nonode@nohost";
static const char *const net_kernel_atom = "\xA" "net_kernel";
+static const char *const dollar_socket_atom = "\x7" "$socket";
+static const char *const abort_atom = "\x5" "abort";
+static const char *const family_atom = "\x6" "family";
+static const char *const inet_atom = "\x4" "inet";
+static const char *const timeout_atom = "\x7" "timeout";
+
+
void defaultatoms_init(GlobalContext *glb)
{
int ok = 1;
@@ -314,6 +321,12 @@ void defaultatoms_init(GlobalContext *glb)
ok &= globalcontext_insert_atom(glb, nonode_at_nohost_atom) == NONODE_AT_NOHOST_ATOM_INDEX;
ok &= globalcontext_insert_atom(glb, net_kernel_atom) == NET_KERNEL_ATOM_INDEX;
+ ok &= globalcontext_insert_atom(glb, dollar_socket_atom) == DOLLAR_SOCKET_ATOM_INDEX;
+ ok &= globalcontext_insert_atom(glb, abort_atom) == ABORT_ATOM_INDEX;
+ ok &= globalcontext_insert_atom(glb, family_atom) == FAMILY_ATOM_INDEX;
+ ok &= globalcontext_insert_atom(glb, inet_atom) == INET_ATOM_INDEX;
+ ok &= globalcontext_insert_atom(glb, timeout_atom) == TIMEOUT_ATOM_INDEX;
+
if (!ok) {
AVM_ABORT();
}
diff --git a/src/libAtomVM/defaultatoms.h b/src/libAtomVM/defaultatoms.h
index 75b3ac9a5..f5ebbc551 100644
--- a/src/libAtomVM/defaultatoms.h
+++ b/src/libAtomVM/defaultatoms.h
@@ -174,7 +174,13 @@ extern "C" {
#define NONODE_AT_NOHOST_ATOM_INDEX 112
#define NET_KERNEL_ATOM_INDEX 113
-#define PLATFORM_ATOMS_BASE_INDEX 114
+#define DOLLAR_SOCKET_ATOM_INDEX 114
+#define ABORT_ATOM_INDEX 115
+#define FAMILY_ATOM_INDEX 116
+#define INET_ATOM_INDEX 117
+#define TIMEOUT_ATOM_INDEX 118
+
+#define PLATFORM_ATOMS_BASE_INDEX 119
#define FALSE_ATOM TERM_FROM_ATOM_INDEX(FALSE_ATOM_INDEX)
#define TRUE_ATOM TERM_FROM_ATOM_INDEX(TRUE_ATOM_INDEX)
@@ -323,6 +329,12 @@ extern "C" {
#define NONODE_AT_NOHOST_ATOM TERM_FROM_ATOM_INDEX(NONODE_AT_NOHOST_ATOM_INDEX)
#define NET_KERNEL_ATOM TERM_FROM_ATOM_INDEX(NET_KERNEL_ATOM_INDEX)
+#define DOLLAR_SOCKET_ATOM TERM_FROM_ATOM_INDEX(DOLLAR_SOCKET_ATOM_INDEX)
+#define ABORT_ATOM TERM_FROM_ATOM_INDEX(ABORT_ATOM_INDEX)
+#define FAMILY_ATOM TERM_FROM_ATOM_INDEX(FAMILY_ATOM_INDEX)
+#define INET_ATOM TERM_FROM_ATOM_INDEX(INET_ATOM_INDEX)
+#define TIMEOUT_ATOM TERM_FROM_ATOM_INDEX(TIMEOUT_ATOM_INDEX)
+
void defaultatoms_init(GlobalContext *glb);
void platform_defaultatoms_init(GlobalContext *glb);
diff --git a/src/libAtomVM/erl_nif.h b/src/libAtomVM/erl_nif.h
index 23d3f256a..b13114d77 100644
--- a/src/libAtomVM/erl_nif.h
+++ b/src/libAtomVM/erl_nif.h
@@ -215,6 +215,32 @@ ERL_NIF_TERM enif_make_resource(ErlNifEnv *env, void *obj);
*/
int enif_select(ErlNifEnv *env, ErlNifEvent event, enum ErlNifSelectFlags mode, void *obj, const ErlNifPid *pid, ERL_NIF_TERM ref);
+/**
+ * @brief Variant of `enif_select` where sent message is `msg` instead of default.
+ *
+ * @param env current environment
+ * @param event event object (typically a file descriptor)
+ * @param obj resource object working as a container of the event object.
+ * @param pid process id to send a message to or NULL to use the current process (from `env`)
+ * @param msg message to send (copied).
+ * @param msg_env must be NULL.
+ * @return a negative value on failure, 0 or flags on success.
+ */
+int enif_select_read(ErlNifEnv *env, ErlNifEvent event, void *obj, const ErlNifPid *pid, ERL_NIF_TERM msg, ErlNifEnv *msg_env);
+
+/**
+ * @brief Variant of `enif_select` where sent message is `msg` instead of default.
+ *
+ * @param env current environment
+ * @param event event object (typically a file descriptor)
+ * @param obj resource object working as a container of the event object.
+ * @param pid process id to send a message to or NULL to use the current process (from `env`)
+ * @param msg message to send (copied).
+ * @param msg_env must be NULL.
+ * @return a negative value on failure, 0 or flags on success.
+ */
+int enif_select_write(ErlNifEnv *env, ErlNifEvent event, void *obj, const ErlNifPid *pid, ERL_NIF_TERM msg, ErlNifEnv *msg_env);
+
/**
* @brief Monitor a process by using a resource object.
* @details The monitor is automatically removed after being triggered or if the
diff --git a/src/libAtomVM/globalcontext.c b/src/libAtomVM/globalcontext.c
index f10001828..b27eb127a 100644
--- a/src/libAtomVM/globalcontext.c
+++ b/src/libAtomVM/globalcontext.c
@@ -334,6 +334,18 @@ bool globalcontext_process_exists(GlobalContext *glb, int32_t process_id)
return p != NULL;
}
+enum SendMessageResult globalcontext_post_message(GlobalContext *glb, int32_t process_id, Message *m)
+{
+ Context *p = globalcontext_get_process_lock(glb, process_id);
+ enum SendMessageResult result = SEND_MESSAGE_PROCESS_NOT_FOUND;
+ if (p) {
+ mailbox_post_message(p, &m->base);
+ globalcontext_get_process_unlock(glb, p);
+ result = SEND_MESSAGE_OK;
+ }
+ return result;
+}
+
void globalcontext_send_message(GlobalContext *glb, int32_t process_id, term t)
{
Context *p = globalcontext_get_process_lock(glb, process_id);
@@ -352,15 +364,17 @@ void globalcontext_send_message_nolock(GlobalContext *glb, int32_t process_id, t
}
#ifdef AVM_TASK_DRIVER_ENABLED
-void globalcontext_send_message_from_task(GlobalContext *glb, int32_t process_id, enum MessageType type, term t)
+static inline enum SendMessageResult globalcontext_send_message_from_task_common(GlobalContext *glb, int32_t process_id, MailboxMessage *message, enum MessageType type, term t)
{
- MailboxMessage *message = NULL;
+ enum SendMessageResult result = SEND_MESSAGE_PROCESS_NOT_FOUND;
bool postponed = false;
#ifndef AVM_NO_SMP
Context *p = NULL;
if (globalcontext_get_process_trylock(glb, process_id, &p)) {
if (p) {
- message = mailbox_message_create_from_term(type, t);
+ if (message == NULL) {
+ message = mailbox_message_create_from_term(type, t);
+ }
// Ensure we can acquire the spinlock
if (smp_spinlock_trylock(&glb->processes_spinlock)) {
// We can send the message.
@@ -371,6 +385,7 @@ void globalcontext_send_message_from_task(GlobalContext *glb, int32_t process_id
postponed = true;
}
globalcontext_get_process_unlock(glb, p);
+ result = SEND_MESSAGE_OK;
}
} else {
postponed = true;
@@ -397,7 +412,20 @@ void globalcontext_send_message_from_task(GlobalContext *glb, int32_t process_id
} while (!ATOMIC_COMPARE_EXCHANGE_WEAK_PTR(&glb->message_queue, ¤t_first, queued_item));
// Make sure the scheduler is busy
sys_signal(glb);
+
+ result = SEND_MESSAGE_OK;
}
+ return result;
+}
+
+enum SendMessageResult globalcontext_post_message_from_task(GlobalContext *glb, int32_t process_id, Message *message)
+{
+ return globalcontext_send_message_from_task_common(glb, process_id, &message->base, NormalMessage, term_nil());
+}
+
+void globalcontext_send_message_from_task(GlobalContext *glb, int32_t process_id, enum MessageType type, term t)
+{
+ globalcontext_send_message_from_task_common(glb, process_id, NULL, type, t);
}
static inline void globalcontext_process_message_queue(GlobalContext *glb)
diff --git a/src/libAtomVM/globalcontext.h b/src/libAtomVM/globalcontext.h
index 1aadb2406..375f3a7a3 100644
--- a/src/libAtomVM/globalcontext.h
+++ b/src/libAtomVM/globalcontext.h
@@ -71,6 +71,11 @@ typedef struct GlobalContext GlobalContext;
typedef struct MailboxMessage MailboxMessage;
#endif
+#ifndef TYPEDEF_MESSAGE
+#define TYPEDEF_MESSAGE
+typedef struct Message Message;
+#endif
+
struct MessageQueueItem
{
struct MessageQueueItem *next;
@@ -165,6 +170,12 @@ struct GlobalContext
void *platform_data;
};
+enum SendMessageResult
+{
+ SEND_MESSAGE_OK = 0,
+ SEND_MESSAGE_PROCESS_NOT_FOUND = 1
+};
+
/**
* @brief Creates a new GlobalContext
*
@@ -250,6 +261,18 @@ void globalcontext_send_message(GlobalContext *glb, int32_t process_id, term t);
*/
void globalcontext_send_message_nolock(GlobalContext *glb, int32_t process_id, term t);
+/**
+ * @brief Post a mailbox message to a process identified by its id.
+ * @details This function is only used by enif_select_read/enif_select_write to
+ * post a message that is built before.
+ *
+ * @param glb the global context (that owns the process table).
+ * @param process_id the local process id.
+ * @param m the mailbox message to send.
+ * @return SEND_MESSAGE_OK if the message was sent (and ownership transfered).
+ */
+enum SendMessageResult globalcontext_post_message(GlobalContext *glb, int32_t process_id, Message *m);
+
#ifdef AVM_TASK_DRIVER_ENABLED
/**
* @brief Send a message to a process identified by its id. This variant is to
@@ -267,6 +290,19 @@ void globalcontext_send_message_nolock(GlobalContext *glb, int32_t process_id, t
*/
void globalcontext_send_message_from_task(GlobalContext *glb, int32_t process_id, enum MessageType type, term t);
+/**
+ * @brief Post a mailbox message to a process identified by its id. Variant
+ * to be used from task drivers.
+ * @details This function is only used by enif_select_read/enif_select_write to
+ * post a message that is built before.
+ *
+ * @param glb the global context (that owns the process table).
+ * @param process_id the local process id.
+ * @param m the mailbox message to send.
+ * @return SEND_MESSAGE_OK if the message was sent (and ownership transfered).
+ */
+enum SendMessageResult globalcontext_post_message_from_task(GlobalContext *glb, int32_t process_id, Message *m);
+
/**
* @brief Enqueue a refc binary from a task driver, to be refc decremented
* later from the scheduler.
diff --git a/src/libAtomVM/inet.c b/src/libAtomVM/inet.c
index e3a858fd3..54227362e 100644
--- a/src/libAtomVM/inet.c
+++ b/src/libAtomVM/inet.c
@@ -44,6 +44,11 @@ enum inet_type inet_atom_to_type(term type, GlobalContext *global)
return interop_atom_term_select_int(inet_type_table, type, global);
}
+term inet_type_to_atom(enum inet_type type, GlobalContext *global)
+{
+ return interop_atom_term_select_atom(inet_type_table, (int) type, global);
+}
+
static const AtomStringIntPair inet_protocol_table[] = {
{ ATOM_STR("\x2", "ip"), InetIpProtocol },
{ ATOM_STR("\x3", "tcp"), InetTcpProtocol },
diff --git a/src/libAtomVM/inet.h b/src/libAtomVM/inet.h
index e422bcd50..1b2490bc7 100644
--- a/src/libAtomVM/inet.h
+++ b/src/libAtomVM/inet.h
@@ -59,6 +59,14 @@ enum inet_type
*/
enum inet_type inet_atom_to_type(term type, GlobalContext *global);
+/**
+ * @brief Convert an inet type to an atom
+ * @param type the inet type
+ * @param global the global context
+ * @returns an atom representing the inet type
+ */
+term inet_type_to_atom(enum inet_type type, GlobalContext *global);
+
enum inet_protocol
{
InetInvalidProtocol = 0,
diff --git a/src/libAtomVM/mailbox.c b/src/libAtomVM/mailbox.c
index 9d78bb1db..bcd75a75b 100644
--- a/src/libAtomVM/mailbox.c
+++ b/src/libAtomVM/mailbox.c
@@ -126,6 +126,16 @@ void mailbox_message_dispose(MailboxMessage *m, Heap *heap)
}
}
+// Dispose message. Normal / signal messages are not destroyed, instead they
+// are appended to the current heap.
+void mailbox_message_dispose_unsent(Message *m, GlobalContext *global, bool from_task)
+{
+ term mso_list = m->storage[STORAGE_MSO_LIST_INDEX];
+ HeapFragment *fragment = mailbox_message_to_heap_fragment(m, m->heap_end);
+ memory_sweep_mso_list(mso_list, global, from_task);
+ memory_destroy_heap_fragment(fragment);
+}
+
void mailbox_destroy(Mailbox *mbox, Heap *heap)
{
MailboxMessage *msg = mbox->outer_first;
@@ -191,13 +201,13 @@ inline void mailbox_enqueue_message(Context *c, MailboxMessage *m)
} while (!ATOMIC_COMPARE_EXCHANGE_WEAK_PTR(&c->mailbox.outer_first, ¤t_first, m));
}
-static void mailbox_post_message(Context *c, MailboxMessage *m)
+void mailbox_post_message(Context *c, MailboxMessage *m)
{
mailbox_enqueue_message(c, m);
scheduler_signal_message(c);
}
#else
-static void mailbox_post_message(Context *c, MailboxMessage *m)
+void mailbox_post_message(Context *c, MailboxMessage *m)
{
m->next = c->mailbox.outer_first;
c->mailbox.outer_first = m;
@@ -231,6 +241,12 @@ MailboxMessage *mailbox_message_create_from_term(enum MessageType type, term t)
}
}
+Message *mailbox_message_create_normal_message_from_term(term t)
+{
+ MailboxMessage *message = mailbox_message_create_from_term(NormalMessage, t);
+ return CONTAINER_OF(message, Message, base);
+}
+
void mailbox_send(Context *c, term t)
{
MailboxMessage *msg = mailbox_message_create_from_term(NormalMessage, t);
diff --git a/src/libAtomVM/mailbox.h b/src/libAtomVM/mailbox.h
index 64d53a6f4..68fb92ae1 100644
--- a/src/libAtomVM/mailbox.h
+++ b/src/libAtomVM/mailbox.h
@@ -57,6 +57,11 @@ struct Context;
typedef struct Context Context;
#endif
+#ifndef TYPEDEF_GLOBALCONTEXT
+#define TYPEDEF_GLOBALCONTEXT
+typedef struct GlobalContext GlobalContext;
+#endif
+
struct Heap;
#ifndef TYPEDEF_HEAP
@@ -69,7 +74,10 @@ typedef struct Heap Heap;
typedef struct MailboxMessage MailboxMessage;
#endif
+#ifndef TYPEDEF_MESSAGE
+#define TYPEDEF_MESSAGE
typedef struct Message Message;
+#endif
enum MessageType
{
@@ -238,6 +246,17 @@ void mailbox_send_ref_signal(Context *c, enum MessageType type, uint64_t ref_tic
*/
void mailbox_send_empty_body_signal(Context *c, enum MessageType type);
+/**
+ * @brief Post a message.
+ *
+ * @details Post a message to a given context. Context gets ownership of the
+ * created message.
+ *
+ * @param c the process context.
+ * @param m the mailbox message to send
+ */
+void mailbox_post_message(Context *c, MailboxMessage *m);
+
#ifdef AVM_TASK_DRIVER_ENABLED
/**
* @brief Enqueue message
@@ -341,6 +360,23 @@ MailboxMessage *mailbox_message_create_from_term(enum MessageType type, term t);
*/
void mailbox_message_dispose(MailboxMessage *m, Heap *heap);
+/**
+ * @brief Allocate and serialize a term to a normal message.
+ *
+ * @details Can be called from a task or even ISR (provided malloc works).
+ * @param t the term that will be sent
+ */
+Message *mailbox_message_create_normal_message_from_term(term t);
+
+/**
+ * @brief Dispose an unsent normal message. The message will be freed.
+ *
+ * @param m the message to dispose.
+ * @param global the global context
+ * @param from_task boolean, true if called from a task, false otherwise
+ */
+void mailbox_message_dispose_unsent(Message *m, GlobalContext *global, bool from_task);
+
/**
* @brief Remove next message from mailbox.
*
diff --git a/src/libAtomVM/otp_socket.c b/src/libAtomVM/otp_socket.c
index 1b76a155e..612537935 100644
--- a/src/libAtomVM/otp_socket.c
+++ b/src/libAtomVM/otp_socket.c
@@ -33,6 +33,7 @@
#include
#include
#include
+#include
#include
#include
#include
@@ -148,10 +149,14 @@ static void udp_recv_cb(void *arg, struct udp_pcb *pcb, struct pbuf *p, const ip
struct SocketResource
{
int fd;
- uint64_t ref_ticks;
+ uint64_t socket_ref_ticks;
+ uint64_t select_ref_ticks;
int32_t selecting_process_id;
ErlNifMonitor selecting_process_monitor;
size_t buf_size;
+#ifndef AVM_NO_SMP
+ RWLock *socket_lock;
+#endif
};
#elif OTP_SOCKET_LWIP
struct SocketResource
@@ -162,7 +167,8 @@ struct SocketResource
struct tcp_pcb *tcp_pcb;
struct udp_pcb *udp_pcb;
};
- uint64_t ref_ticks;
+ uint64_t socket_ref_ticks;
+ uint64_t select_ref_ticks;
int32_t selecting_process_id; // trapped or selecting
ErlNifMonitor selecting_process_monitor;
bool linger_on;
@@ -170,6 +176,9 @@ struct SocketResource
size_t pos;
struct ListHead received_list;
size_t buf_size;
+#ifndef AVM_NO_SMP
+ RWLock *socket_lock;
+#endif
};
#endif
@@ -183,6 +192,7 @@ static const char *const onoff_atom = ATOM_STR("\x5", "onoff");
static const char *const port_atom = ATOM_STR("\x4", "port");
static const char *const rcvbuf_atom = ATOM_STR("\x6", "rcvbuf");
static const char *const reuseaddr_atom = ATOM_STR("\x9", "reuseaddr");
+static const char *const type_atom = ATOM_STR("\x4", "type");
#define CLOSED_FD 0
@@ -227,6 +237,9 @@ static const AtomStringIntPair otp_socket_setopt_level_table[] = {
static ErlNifResourceType *socket_resource_type;
+#define SOCKET_MAKE_SELECT_NOTIFICATION_SIZE (TUPLE_SIZE(4) + REF_SIZE + TUPLE_SIZE(2) + REF_SIZE + TERM_BOXED_RESOURCE_SIZE)
+static term socket_make_select_notification(struct SocketResource *rsrc_obj, Heap *heap);
+
//
// resource operations
//
@@ -261,6 +274,9 @@ static void socket_dtor(ErlNifEnv *caller_env, void *obj)
}
LWIP_END();
#endif
+#ifndef AVM_NO_SMP
+ smp_rwlock_destroy(rsrc_obj->socket_lock);
+#endif
}
#if OTP_SOCKET_BSD
@@ -295,11 +311,18 @@ static void socket_down(ErlNifEnv *caller_env, void *obj, ErlNifPid *pid, ErlNif
TRACE("socket_down called on process_id=%i\n", (int) *pid);
#endif
+ // Increment the reference count so the resource doesn't go away
+ // (enif_select will decrement the ref count)
+ struct RefcBinary *rsrc_refc = refc_binary_from_data(rsrc_obj);
+ refc_binary_increment_refcount(rsrc_refc);
+ SMP_RWLOCK_WRLOCK(rsrc_obj->socket_lock);
+
#if OTP_SOCKET_BSD
if (rsrc_obj->selecting_process_id != INVALID_PROCESS_ID) {
// Monitor fired, so make sure we don't try to demonitor in select_stop
// as it could crash trying to reacquire lock on process table
rsrc_obj->selecting_process_id = INVALID_PROCESS_ID;
+ refc_binary_increment_refcount(rsrc_refc);
enif_select(caller_env, rsrc_obj->fd, ERL_NIF_SELECT_STOP, rsrc_obj, NULL, term_nil());
}
#elif OTP_SOCKET_LWIP
@@ -323,6 +346,9 @@ static void socket_down(ErlNifEnv *caller_env, void *obj, ErlNifPid *pid, ErlNif
LWIP_END();
}
#endif
+
+ SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock);
+ refc_binary_decrement_refcount(rsrc_refc, caller_env->global);
}
static const ErlNifResourceTypeInit SocketResourceTypeInit = {
@@ -336,12 +362,40 @@ static const ErlNifResourceTypeInit SocketResourceTypeInit = {
.down = socket_down,
};
+// Make a notification message, using SOCKET_MAKE_SELECT_NOTIFICATION_SIZE on heap
+static term socket_make_select_notification(struct SocketResource *rsrc_obj, Heap *heap)
+{
+ term notification = term_alloc_tuple(4, heap);
+ term_put_tuple_element(notification, 0, DOLLAR_SOCKET_ATOM);
+ term socket_tuple = term_alloc_tuple(2, heap);
+ term_put_tuple_element(socket_tuple, 0, term_from_resource(rsrc_obj, heap));
+ struct RefcBinary *rsrc_refc = refc_binary_from_data(rsrc_obj);
+ refc_binary_increment_refcount(rsrc_refc);
+ term socket_ref;
+ if (rsrc_obj->socket_ref_ticks == 0) {
+ socket_ref = UNDEFINED_ATOM;
+ } else {
+ socket_ref = term_from_ref_ticks(rsrc_obj->socket_ref_ticks, heap);
+ }
+ term_put_tuple_element(socket_tuple, 1, socket_ref);
+ term_put_tuple_element(notification, 1, socket_tuple);
+ term_put_tuple_element(notification, 2, SELECT_ATOM);
+ term select_ref;
+ if (rsrc_obj->select_ref_ticks == 0) {
+ select_ref = UNDEFINED_ATOM;
+ } else {
+ select_ref = term_from_ref_ticks(rsrc_obj->select_ref_ticks, heap);
+ }
+ term_put_tuple_element(notification, 3, select_ref);
+ return notification;
+}
+
// select emulation for lwIP that doesn't have select.
#if OTP_SOCKET_LWIP
static void select_event_send_notification_from_nif(struct SocketResource *rsrc_obj, Context *locked_ctx)
{
- BEGIN_WITH_STACK_HEAP(SELECT_EVENT_NOTIFICATION_SIZE, heap)
- term notification = select_event_make_notification(rsrc_obj, rsrc_obj->ref_ticks, false, &heap);
+ BEGIN_WITH_STACK_HEAP(SOCKET_MAKE_SELECT_NOTIFICATION_SIZE, heap)
+ term notification = socket_make_select_notification(rsrc_obj, &heap);
mailbox_send(locked_ctx, notification);
END_WITH_STACK_HEAP(heap, locked_ctx->global)
}
@@ -350,8 +404,8 @@ static void select_event_send_notification_from_handler(struct SocketResource *r
{
struct RefcBinary *rsrc_refc = refc_binary_from_data(rsrc_obj);
GlobalContext *global = rsrc_refc->resource_type->global;
- BEGIN_WITH_STACK_HEAP(SELECT_EVENT_NOTIFICATION_SIZE, heap)
- term notification = select_event_make_notification(rsrc_obj, rsrc_obj->ref_ticks, false, &heap);
+ BEGIN_WITH_STACK_HEAP(SOCKET_MAKE_SELECT_NOTIFICATION_SIZE, heap)
+ term notification = socket_make_select_notification(rsrc_obj, &heap);
globalcontext_send_message(global, process_id, notification);
END_WITH_STACK_HEAP(heap, global)
}
@@ -502,6 +556,13 @@ static term nif_socket_open(Context *ctx, int argc, term argv[])
RAISE_ERROR(OUT_OF_MEMORY_ATOM);
}
+#ifndef AVM_NO_SMP
+ rsrc_obj->socket_lock = smp_rwlock_create();
+ if (IS_NULL_PTR(rsrc_obj->socket_lock)) {
+ free(rsrc_obj);
+ RAISE_ERROR(OUT_OF_MEMORY_ATOM);
+ }
+#endif
#if OTP_SOCKET_BSD
rsrc_obj->fd = socket(domain, type, protocol);
if (UNLIKELY(rsrc_obj->fd == -1 || rsrc_obj->fd == CLOSED_FD)) {
@@ -512,6 +573,14 @@ static term nif_socket_open(Context *ctx, int argc, term argv[])
TRACE("nif_socket_open: Created socket fd=%i\n", rsrc_obj->fd);
rsrc_obj->selecting_process_id = INVALID_PROCESS_ID;
+ if (type != SOCK_STREAM) {
+ // TCP sockets are made non-blocking after connect, for now.
+ if (UNLIKELY(fcntl(rsrc_obj->fd, F_SETFL, O_NONBLOCK) != 0)) {
+ AVM_LOGE(TAG, "Unable to configure fd=%d to be non blocking.", rsrc_obj->fd);
+ return make_errno_tuple(ctx);
+ }
+ }
+
#elif OTP_SOCKET_LWIP
if (domain == PF_INET && type == SOCK_STREAM && protocol == IPPROTO_TCP) {
LWIP_BEGIN();
@@ -565,6 +634,7 @@ static term nif_socket_open(Context *ctx, int argc, term argv[])
term socket_term = term_alloc_tuple(2, &ctx->heap);
uint64_t ref_ticks = globalcontext_get_ref_ticks(ctx->global);
+ rsrc_obj->socket_ref_ticks = ref_ticks;
term ref = term_from_ref_ticks(ref_ticks, &ctx->heap);
term_put_tuple_element(socket_term, 0, obj);
term_put_tuple_element(socket_term, 1, ref);
@@ -604,21 +674,27 @@ bool term_is_otp_socket(term socket_term)
// close
//
-static int send_closed_notification(Context *ctx, struct SocketResource *rsrc_obj)
+static int send_closed_notification(Context *ctx, term socket_term, int32_t selecting_process_id, struct SocketResource *rsrc_obj)
{
- // send a {closed, Ref | undefined} message to the pid
- if (UNLIKELY(memory_ensure_free(ctx, TUPLE_SIZE(2) + REF_SIZE) != MEMORY_GC_OK)) {
+ // send a {'$socket', Socket, abort, {Ref | undefined, closed}} message to the pid
+ if (UNLIKELY(memory_ensure_free_with_roots(ctx, TUPLE_SIZE(4) + TUPLE_SIZE(2) + REF_SIZE, 1, &socket_term, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) {
AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.", __FILE__, __LINE__);
return -1;
}
+ term socket_tuple = term_alloc_tuple(4, &ctx->heap);
+ term_put_tuple_element(socket_tuple, 0, DOLLAR_SOCKET_ATOM);
+ term_put_tuple_element(socket_tuple, 1, socket_term);
+ term_put_tuple_element(socket_tuple, 2, ABORT_ATOM);
+
term error_tuple = term_alloc_tuple(2, &ctx->heap);
- term_put_tuple_element(error_tuple, 0, CLOSED_ATOM);
- term ref = (rsrc_obj->ref_ticks == 0) ? UNDEFINED_ATOM : term_from_ref_ticks(rsrc_obj->ref_ticks, &ctx->heap);
- term_put_tuple_element(error_tuple, 1, ref);
+ term ref = (rsrc_obj->select_ref_ticks == 0) ? UNDEFINED_ATOM : term_from_ref_ticks(rsrc_obj->select_ref_ticks, &ctx->heap);
+ term_put_tuple_element(error_tuple, 0, ref);
+ term_put_tuple_element(error_tuple, 1, CLOSED_ATOM);
+ term_put_tuple_element(socket_tuple, 3, error_tuple);
- TRACE("nif_socket_close: Sending msg to process %i, rsrc_obj = %p\n", (int) rsrc_obj->selecting_process_id, (void *) rsrc_obj);
- globalcontext_send_message(ctx->global, rsrc_obj->selecting_process_id, error_tuple);
+ TRACE("nif_socket_close: Sending msg to process %i, rsrc_obj = %p\n", (int) selecting_process_id, (void *) rsrc_obj);
+ globalcontext_send_message(ctx->global, selecting_process_id, socket_tuple);
return 0;
}
@@ -641,53 +717,61 @@ static term nif_socket_close(Context *ctx, int argc, term argv[])
if (UNLIKELY(!term_to_otp_socket(argv[0], &rsrc_obj, ctx))) {
RAISE_ERROR(BADARG_ATOM);
}
+
+ SMP_RWLOCK_WRLOCK(rsrc_obj->socket_lock);
+
#if OTP_SOCKET_BSD
if (rsrc_obj->fd) {
+ // In POSIX with BSD sockets, if a file descriptor being monitored by
+ // select() is closed in another thread, the result is unspecified.
+ // select may continue.
+ //
+ // However, in Erlang, asynchronous sockets support closing from another
+ // process, as documented in specification of the abort message.
+
+ // So we handle closing a socket while another process is selecting
if (rsrc_obj->selecting_process_id != INVALID_PROCESS_ID) {
- //
- // If we are in select, then stop selecting
- //
+ // Save process id as socket_stop may be called by enif_select.
+ int32_t selecting_process_id = rsrc_obj->selecting_process_id;
+ // Stop selecting.
int stop_res = enif_select(erl_nif_env_from_context(ctx), rsrc_obj->fd, ERL_NIF_SELECT_STOP, rsrc_obj, NULL, term_nil());
if (UNLIKELY(stop_res < 0)) {
+ SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock);
RAISE_ERROR(BADARG_ATOM);
}
- // TODO: check if stop_res & ERL_NIF_SELECT_STOP_CALLED or ERL_NIF_SELECT_STOP_SCHEDULED
- // following what OTP does. Indeed, if we have ERL_NIF_SELECT_STOP_SCHEDULED, we should not close the socket now
- //
- // If there is a process (other than ourself) who is waiting on select
- // the send a {closed, Ref} message to it, so that it can break
+ // If there is a selecting process who may be waiting on select,
+ // send a closed notification to it, so that it can break
// out of its receive statement.
//
- if (rsrc_obj->selecting_process_id != ctx->process_id) {
-
- // send a {closed, Ref | undefined} message to the pid
- if (UNLIKELY(send_closed_notification(ctx, rsrc_obj) < 0)) {
+ // When using asynchronous API, the selecting process can be the
+ // calling process. In this case we don't send any notification.
+ //
+ if (selecting_process_id != ctx->process_id) {
+ // send a {'$socket', Socket, abort, {Ref | undefined, closed}} message to the pid
+ if (UNLIKELY(send_closed_notification(ctx, argv[0], selecting_process_id, rsrc_obj) < 0)) {
+ SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock);
RAISE_ERROR(OUT_OF_MEMORY_ATOM);
}
- } else {
- AVM_LOGW(TAG, "Selectable socket %i was closed but no known pid is waiting. This shouldn't happen.", rsrc_obj->fd);
}
}
+ // Eventually close the socket
int res = close(rsrc_obj->fd);
if (UNLIKELY(res != 0)) {
AVM_LOGW(TAG, "Failed to close socket %i", res);
}
-
- TRACE("nif_socket_close: Clearing pid for socket fd=%i\n", rsrc_obj->fd);
rsrc_obj->fd = CLOSED_FD;
- rsrc_obj->selecting_process_id = INVALID_PROCESS_ID;
- rsrc_obj->ref_ticks = 0;
} else {
TRACE("Double close on socket fd %i", rsrc_obj->fd);
}
#elif OTP_SOCKET_LWIP
- // If the socket is being selected by another process, send a closed tuple.
+ // If the socket is being selected by another process, send a closed notification.
if (rsrc_obj->socket_state & SocketStateSelectingRead
&& rsrc_obj->selecting_process_id != INVALID_PROCESS_ID
&& rsrc_obj->selecting_process_id != ctx->process_id) {
- // send a {closed, Ref | undefined} message to the pid
- if (UNLIKELY(send_closed_notification(ctx, rsrc_obj) < 0)) {
+ // send a {'$socket', Socket, abort, {Ref | undefined, closed}} message to the pid
+ if (UNLIKELY(send_closed_notification(ctx, argv[0], rsrc_obj->selecting_process_id, rsrc_obj) < 0)) {
+ SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock);
RAISE_ERROR(OUT_OF_MEMORY_ATOM);
}
}
@@ -721,7 +805,8 @@ static term nif_socket_close(Context *ctx, int argc, term argv[])
}
#endif
- rsrc_obj->buf_size = DEFAULT_BUFFER_SIZE;
+
+ SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock);
return OK_ATOM;
}
@@ -744,6 +829,13 @@ static struct SocketResource *make_accepted_socket_resource(struct tcp_pcb *newp
conn_rsrc_obj->linger_on = false;
conn_rsrc_obj->linger_sec = 0;
conn_rsrc_obj->buf_size = DEFAULT_BUFFER_SIZE;
+#ifndef AVM_NO_SMP
+ conn_rsrc_obj->socket_lock = smp_rwlock_create();
+ if (IS_NULL_PTR(conn_rsrc_obj->socket_lock)) {
+ free(conn_rsrc_obj);
+ return NULL;
+ }
+#endif
list_init(&conn_rsrc_obj->received_list);
tcp_arg(newpcb, conn_rsrc_obj);
@@ -905,6 +997,8 @@ static term nif_socket_select_read(Context *ctx, int argc, term argv[])
RAISE_ERROR(BADARG_ATOM);
}
+ SMP_RWLOCK_WRLOCK(rsrc_obj->socket_lock);
+
ErlNifEnv *env = erl_nif_env_from_context(ctx);
if (rsrc_obj->selecting_process_id != ctx->process_id && rsrc_obj->selecting_process_id != INVALID_PROCESS_ID) {
// demonitor can fail if process is gone.
@@ -915,24 +1009,36 @@ static term nif_socket_select_read(Context *ctx, int argc, term argv[])
// if select fails than to stop select if monitor fails
if (rsrc_obj->selecting_process_id != ctx->process_id) {
if (UNLIKELY(enif_monitor_process(env, rsrc_obj, &ctx->process_id, &rsrc_obj->selecting_process_monitor) != 0)) {
+ SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock);
RAISE_ERROR(NOPROC_ATOM);
}
rsrc_obj->selecting_process_id = ctx->process_id;
}
- rsrc_obj->ref_ticks = (select_ref_term == UNDEFINED_ATOM) ? 0 : term_to_ref_ticks(select_ref_term);
+ rsrc_obj->select_ref_ticks = (select_ref_term == UNDEFINED_ATOM) ? 0 : term_to_ref_ticks(select_ref_term);
#if OTP_SOCKET_BSD
TRACE("rsrc_obj->fd=%i\n", (int) rsrc_obj->fd);
- if (UNLIKELY(enif_select(erl_nif_env_from_context(ctx), rsrc_obj->fd, ERL_NIF_SELECT_READ, rsrc_obj, &ctx->process_id, select_ref_term) < 0)) {
- enif_demonitor_process(env, rsrc_obj, &rsrc_obj->selecting_process_monitor);
- rsrc_obj->selecting_process_id = INVALID_PROCESS_ID;
- RAISE_ERROR(BADARG_ATOM);
+ // The socket may be closed here.
+ if (rsrc_obj->fd == CLOSED_FD) {
+ send_closed_notification(ctx, argv[0], ctx->process_id, rsrc_obj);
+ } else {
+ // Ensure the resource we're working on isn't garbage collected now.
+ if (UNLIKELY(memory_ensure_free_with_roots(ctx, SOCKET_MAKE_SELECT_NOTIFICATION_SIZE, 2, argv, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) {
+ AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.", __FILE__, __LINE__);
+ SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock);
+ RAISE_ERROR(OUT_OF_MEMORY_ATOM);
+ }
+ term notification = socket_make_select_notification(rsrc_obj, &ctx->heap);
+ if (UNLIKELY(enif_select_read(erl_nif_env_from_context(ctx), rsrc_obj->fd, rsrc_obj, &ctx->process_id, notification, NULL) < 0)) {
+ enif_demonitor_process(env, rsrc_obj, &rsrc_obj->selecting_process_monitor);
+ rsrc_obj->selecting_process_id = INVALID_PROCESS_ID;
+ SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock);
+ RAISE_ERROR(BADARG_ATOM);
+ }
}
- TRACE("nif_socket_select: Setting pid for socket fd %i to %i\n", (int) rsrc_obj->fd, (int) ctx->process_id);
-
#elif OTP_SOCKET_LWIP
LWIP_BEGIN();
switch (rsrc_obj->socket_state) {
@@ -974,11 +1080,13 @@ static term nif_socket_select_read(Context *ctx, int argc, term argv[])
default:
enif_demonitor_process(env, rsrc_obj, &rsrc_obj->selecting_process_monitor);
LWIP_END();
+ SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock);
RAISE_ERROR(BADARG_ATOM);
}
LWIP_END();
#endif
+ SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock);
return OK_ATOM;
}
@@ -993,21 +1101,112 @@ static term nif_socket_select_stop(Context *ctx, int argc, term argv[])
if (UNLIKELY(!term_to_otp_socket(argv[0], &rsrc_obj, ctx))) {
RAISE_ERROR(BADARG_ATOM);
}
+ // Avoid the race condition with select object here.
+ SMP_RWLOCK_WRLOCK(rsrc_obj->socket_lock);
+ rsrc_obj->selecting_process_id = INVALID_PROCESS_ID;
#if OTP_SOCKET_BSD
if (UNLIKELY(enif_select(erl_nif_env_from_context(ctx), rsrc_obj->fd, ERL_NIF_SELECT_STOP, rsrc_obj, NULL, term_nil()) < 0)) {
RAISE_ERROR(BADARG_ATOM);
}
-
- return OK_ATOM;
#elif OTP_SOCKET_LWIP
LWIP_BEGIN();
if (rsrc_obj->socket_state & SocketStateSelectingRead) {
rsrc_obj->socket_state &= ~SocketStateSelectingRead;
}
LWIP_END();
+#endif
+ SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock);
return OK_ATOM;
+}
+
+//
+// getopt
+//
+
+static term nif_socket_getopt(Context *ctx, int argc, term argv[])
+{
+ TRACE("nif_socket_getopt\n");
+ UNUSED(argc);
+
+ VALIDATE_VALUE(argv[0], term_is_otp_socket);
+
+ GlobalContext *global = ctx->global;
+
+ struct SocketResource *rsrc_obj;
+ if (UNLIKELY(!term_to_otp_socket(argv[0], &rsrc_obj, ctx))) {
+ RAISE_ERROR(BADARG_ATOM);
+ }
+
+ SMP_RWLOCK_RDLOCK(rsrc_obj->socket_lock);
+
+#if OTP_SOCKET_BSD
+ if (rsrc_obj->fd == 0) {
+#elif OTP_SOCKET_LWIP
+ if (rsrc_obj->socket_state == SocketStateClosed) {
+#endif
+ SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock);
+ return make_error_tuple(CLOSED_ATOM, ctx);
+ }
+ term level_tuple = argv[1];
+
+ term level = term_get_tuple_element(level_tuple, 0);
+ int level_val = interop_atom_term_select_int(otp_socket_setopt_level_table, level, global);
+ switch (level_val) {
+ case OtpSocketSetoptLevelSocket: {
+ term opt = term_get_tuple_element(level_tuple, 1);
+ if (globalcontext_is_term_equal_to_atom_string(global, opt, type_atom)) {
+ enum inet_type type;
+#if OTP_SOCKET_BSD
+ int option_value;
+ socklen_t option_len = sizeof(option_value);
+ int res = getsockopt(rsrc_obj->fd, SOL_SOCKET, SO_TYPE, &option_value, &option_len);
+ if (UNLIKELY(res != 0)) {
+ SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock);
+ return make_errno_tuple(ctx);
+ } else {
+ switch (option_value) {
+ case SOCK_STREAM:
+ type = InetStreamType;
+ break;
+ case SOCK_DGRAM:
+ type = InetDgramType;
+ break;
+ default:
+ SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock);
+ RAISE_ERROR(BADARG_ATOM);
+ }
+ }
+#elif OTP_SOCKET_LWIP
+ LWIP_BEGIN();
+ if (rsrc_obj->socket_state & SocketStateTCP) {
+ type = InetStreamType;
+ } else {
+ type = InetDgramType;
+ }
+ LWIP_END();
#endif
+ if (UNLIKELY(memory_ensure_free_with_roots(ctx, TUPLE_SIZE(2), 1, argv, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) {
+ AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.", __FILE__, __LINE__);
+ SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock);
+ RAISE_ERROR(OUT_OF_MEMORY_ATOM);
+ }
+ term result = term_alloc_tuple(2, &ctx->heap);
+ term_put_tuple_element(result, 0, OK_ATOM);
+ term_put_tuple_element(result, 1, inet_type_to_atom(type, global));
+ SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock);
+ return result;
+ } else {
+ SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock);
+ RAISE_ERROR(BADARG_ATOM);
+ }
+ }
+ default: {
+ AVM_LOGE(TAG, "socket:getopt: Unsupported level");
+ SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock);
+ RAISE_ERROR(BADARG_ATOM);
+ }
+ }
}
//
@@ -1028,12 +1227,15 @@ static term nif_socket_setopt(Context *ctx, int argc, term argv[])
RAISE_ERROR(BADARG_ATOM);
}
+ SMP_RWLOCK_RDLOCK(rsrc_obj->socket_lock);
+
#if OTP_SOCKET_BSD
if (rsrc_obj->fd == 0) {
#elif OTP_SOCKET_LWIP
if (rsrc_obj->socket_state == SocketStateClosed) {
#endif
- return make_error_tuple(posix_errno_to_term(EBADF, global), ctx);
+ SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock);
+ return make_error_tuple(CLOSED_ATOM, ctx);
}
term level_tuple = argv[1];
term value = argv[2];
@@ -1049,6 +1251,7 @@ static term nif_socket_setopt(Context *ctx, int argc, term argv[])
int option_value = (value == TRUE_ATOM);
#if OTP_SOCKET_BSD
int res = setsockopt(rsrc_obj->fd, SOL_SOCKET, SO_REUSEADDR, &option_value, sizeof(int));
+ SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock);
if (UNLIKELY(res != 0)) {
return make_errno_tuple(ctx);
} else {
@@ -1070,6 +1273,7 @@ static term nif_socket_setopt(Context *ctx, int argc, term argv[])
}
}
LWIP_END();
+ SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock);
return OK_ATOM;
#endif
} else if (globalcontext_is_term_equal_to_atom_string(global, opt, linger_atom)) {
@@ -1082,6 +1286,7 @@ static term nif_socket_setopt(Context *ctx, int argc, term argv[])
sl.l_onoff = (onoff == TRUE_ATOM);
sl.l_linger = term_to_int(linger);
int res = setsockopt(rsrc_obj->fd, SOL_SOCKET, SO_LINGER, &sl, sizeof(sl));
+ SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock);
if (UNLIKELY(res != 0)) {
return make_errno_tuple(ctx);
} else {
@@ -1090,6 +1295,7 @@ static term nif_socket_setopt(Context *ctx, int argc, term argv[])
#elif OTP_SOCKET_LWIP
rsrc_obj->linger_on = (onoff == TRUE_ATOM);
rsrc_obj->linger_sec = term_to_int(linger);
+ SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock);
return OK_ATOM;
#endif
// TODO add more as needed
@@ -1099,6 +1305,7 @@ static term nif_socket_setopt(Context *ctx, int argc, term argv[])
// AVM_LOGW(TAG, "Failed to set TCP_NODELAY.");
// }
} else {
+ SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock);
RAISE_ERROR(BADARG_ATOM);
}
@@ -1109,27 +1316,32 @@ static term nif_socket_setopt(Context *ctx, int argc, term argv[])
// TODO support the atom `default` as a value to roll back to the default buffer size
if (UNLIKELY(!term_is_integer(value))) {
- AVM_LOGE(TAG, "socket:setopt: otp rcvbuf value must be an integer");
+ TRACE("socket:setopt: otp rcvbuf value must be an integer");
+ SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock);
return make_error_tuple(globalcontext_make_atom(global, invalid_value_atom), ctx);
}
avm_int_t buf_size = term_to_int(value);
if (UNLIKELY(buf_size < 0)) {
- AVM_LOGE(TAG, "socket:setopt: otp rcvbuf value may not be negative");
+ TRACE("socket:setopt: otp rcvbuf value may not be negative");
+ SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock);
return make_error_tuple(globalcontext_make_atom(global, invalid_value_atom), ctx);
}
rsrc_obj->buf_size = (size_t) buf_size;
+ SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock);
return OK_ATOM;
} else {
- AVM_LOGE(TAG, "socket:setopt: Unsupported otp option");
+ TRACE("socket:setopt: Unsupported otp option");
+ SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock);
return make_error_tuple(globalcontext_make_atom(global, invalid_option_atom), ctx);
}
}
default: {
- AVM_LOGE(TAG, "socket:setopt: Unsupported level");
+ TRACE("socket:setopt: Unsupported level");
+ SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock);
RAISE_ERROR(BADARG_ATOM);
}
}
@@ -1153,11 +1365,14 @@ static term nif_socket_sockname(Context *ctx, int argc, term argv[])
if (UNLIKELY(!term_to_otp_socket(argv[0], &rsrc_obj, ctx))) {
RAISE_ERROR(BADARG_ATOM);
}
+ SMP_RWLOCK_RDLOCK(rsrc_obj->socket_lock);
+
#if OTP_SOCKET_BSD
if (rsrc_obj->fd == 0) {
#elif OTP_SOCKET_LWIP
if (rsrc_obj->socket_state == SocketStateClosed) {
#endif
+ SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock);
return make_error_tuple(posix_errno_to_term(EBADF, global), ctx);
}
@@ -1168,6 +1383,7 @@ static term nif_socket_sockname(Context *ctx, int argc, term argv[])
if (UNLIKELY(res != 0)) {
AVM_LOGE(TAG, "Unable to getsockname: fd=%i res=%i.", rsrc_obj->fd, res);
+ SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock);
return make_errno_tuple(ctx);
}
uint32_t ip4_u32 = ntohl(addr.sin_addr.s_addr);
@@ -1185,6 +1401,7 @@ static term nif_socket_sockname(Context *ctx, int argc, term argv[])
}
LWIP_END();
#endif
+ SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock);
// {ok, #{addr => {a,b,c,d}, port => integer()}}
if (UNLIKELY(memory_ensure_free(ctx, TUPLE_SIZE(2) + term_map_size_in_terms(2) + TUPLE_SIZE(4)) != MEMORY_GC_OK)) {
@@ -1221,19 +1438,24 @@ static term nif_socket_peername(Context *ctx, int argc, term argv[])
RAISE_ERROR(BADARG_ATOM);
}
+ SMP_RWLOCK_RDLOCK(rsrc_obj->socket_lock);
#if OTP_SOCKET_BSD
if (rsrc_obj->fd == 0) {
+ SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock);
return make_error_tuple(posix_errno_to_term(EBADF, global), ctx);
}
#elif OTP_SOCKET_LWIP
if (rsrc_obj->socket_state == SocketStateClosed) {
+ SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock);
return make_error_tuple(posix_errno_to_term(EBADF, global), ctx);
}
if (rsrc_obj->socket_state & SocketStateUDP) {
// TODO: handle "connected" UDP sockets
+ SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock);
return make_error_tuple(posix_errno_to_term(EOPNOTSUPP, global), ctx);
}
if ((rsrc_obj->socket_state & SocketStateTCPListening) == SocketStateTCPListening) {
+ SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock);
return make_error_tuple(posix_errno_to_term(ENOTCONN, global), ctx);
}
#endif
@@ -1245,14 +1467,17 @@ static term nif_socket_peername(Context *ctx, int argc, term argv[])
if (UNLIKELY(res != 0)) {
AVM_LOGE(TAG, "Unable to getpeername: fd=%i res=%i.", rsrc_obj->fd, res);
+ SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock);
return make_errno_tuple(ctx);
}
+ SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock);
uint32_t ip4_u32 = ntohl(addr.sin_addr.s_addr);
uint16_t port_u16 = ntohs(addr.sin_port);
#elif OTP_SOCKET_LWIP
// TODO: support peername for "connected" UDP sockets
uint32_t ip4_u32 = ntohl(ip_addr_get_ip4_u32(&rsrc_obj->tcp_pcb->remote_ip));
uint16_t port_u16 = rsrc_obj->tcp_pcb->remote_port;
+ SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock);
#endif
// {ok, #{addr => {a,b,c,d}, port => integer()}}
@@ -1289,13 +1514,17 @@ static term nif_socket_bind(Context *ctx, int argc, term argv[])
if (UNLIKELY(!term_to_otp_socket(argv[0], &rsrc_obj, ctx))) {
RAISE_ERROR(BADARG_ATOM);
}
+
+ SMP_RWLOCK_RDLOCK(rsrc_obj->socket_lock);
#if OTP_SOCKET_BSD
TRACE("rsrc_obj->fd=%i\n", (int) rsrc_obj->fd);
if (rsrc_obj->fd == 0) {
+ SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock);
return make_error_tuple(posix_errno_to_term(EBADF, global), ctx);
}
#elif OTP_SOCKET_LWIP
if (rsrc_obj->socket_state == SocketStateClosed) {
+ SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock);
return make_error_tuple(posix_errno_to_term(EBADF, global), ctx);
}
#endif
@@ -1353,6 +1582,7 @@ static term nif_socket_bind(Context *ctx, int argc, term argv[])
serveraddr.sin_port = htons(port_u16);
socklen_t address_len = sizeof(serveraddr);
int res = bind(rsrc_obj->fd, (struct sockaddr *) &serveraddr, address_len);
+ SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock);
if (UNLIKELY(res != 0)) {
AVM_LOGE(TAG, "Unable to bind socket: res=%i.", res);
return make_errno_tuple(ctx);
@@ -1366,6 +1596,7 @@ static term nif_socket_bind(Context *ctx, int argc, term argv[])
} else {
res = udp_bind(rsrc_obj->udp_pcb, &ip_addr, port_u16);
}
+ SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock);
if (UNLIKELY(res != ERR_OK)) {
AVM_LOGE(TAG, "Unable to bind socket: res=%i.", res);
return make_lwip_err_tuple(res, ctx);
@@ -1393,15 +1624,20 @@ static term nif_socket_listen(Context *ctx, int argc, term argv[])
if (UNLIKELY(!term_to_otp_socket(argv[0], &rsrc_obj, ctx))) {
RAISE_ERROR(BADARG_ATOM);
}
+
+ SMP_RWLOCK_RDLOCK(rsrc_obj->socket_lock);
#if OTP_SOCKET_BSD
if (rsrc_obj->fd == 0) {
+ SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock);
return make_error_tuple(posix_errno_to_term(EBADF, global), ctx);
}
#elif OTP_SOCKET_LWIP
if (rsrc_obj->socket_state == SocketStateClosed) {
+ SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock);
return make_error_tuple(posix_errno_to_term(EBADF, global), ctx);
}
if (rsrc_obj->socket_state & SocketStateUDP) {
+ SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock);
return make_error_tuple(posix_errno_to_term(EPROTOTYPE, global), ctx);
}
#endif
@@ -1410,6 +1646,7 @@ static term nif_socket_listen(Context *ctx, int argc, term argv[])
#if OTP_SOCKET_BSD
int res = listen(rsrc_obj->fd, backlog);
+ SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock);
if (UNLIKELY(res != 0)) {
AVM_LOGE(TAG, "Unable to listen on socket: res=%i.", res);
return make_errno_tuple(ctx);
@@ -1437,6 +1674,7 @@ static term nif_socket_listen(Context *ctx, int argc, term argv[])
tcp_accept(new_pcb, tcp_accept_cb);
rsrc_obj->tcp_pcb = new_pcb;
rsrc_obj->socket_state = SocketStateTCPListening;
+ SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock);
return OK_ATOM;
#endif
}
@@ -1452,6 +1690,7 @@ static term make_accepted_socket_term(struct SocketResource *conn_rsrc_obj, Heap
term socket_term = term_alloc_tuple(2, heap);
uint64_t ref_ticks = globalcontext_get_ref_ticks(global);
+ conn_rsrc_obj->socket_ref_ticks = ref_ticks;
term ref = term_from_ref_ticks(ref_ticks, heap);
term_put_tuple_element(socket_term, 0, obj);
term_put_tuple_element(socket_term, 1, ref);
@@ -1472,19 +1711,25 @@ static term nif_socket_accept(Context *ctx, int argc, term argv[])
if (UNLIKELY(!term_to_otp_socket(argv[0], &rsrc_obj, ctx))) {
RAISE_ERROR(BADARG_ATOM);
}
+
+ SMP_RWLOCK_RDLOCK(rsrc_obj->socket_lock);
#if OTP_SOCKET_BSD
if (rsrc_obj->fd == 0) {
+ SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock);
return make_error_tuple(posix_errno_to_term(EBADF, global), ctx);
}
#elif OTP_SOCKET_LWIP
if (rsrc_obj->socket_state & SocketStateClosed) {
+ SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock);
return make_error_tuple(posix_errno_to_term(EBADF, global), ctx);
}
if (rsrc_obj->socket_state & SocketStateUDP) {
+ SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock);
return make_error_tuple(posix_errno_to_term(EOPNOTSUPP, global), ctx);
}
// Only listening is allowed
if ((rsrc_obj->socket_state & SocketStateTCPListening) != SocketStateTCPListening) {
+ SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock);
return make_error_tuple(posix_errno_to_term(EINVAL, global), ctx);
}
#endif
@@ -1492,33 +1737,51 @@ static term nif_socket_accept(Context *ctx, int argc, term argv[])
#if OTP_SOCKET_BSD
struct sockaddr_in clientaddr;
socklen_t clientlen = sizeof(clientaddr);
+ if (UNLIKELY(fcntl(rsrc_obj->fd, F_SETFL, O_NONBLOCK) != 0)) {
+ AVM_LOGE(TAG, "Unable to configure fd=%d to be non blocking.", rsrc_obj->fd);
+ SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock);
+ return make_errno_tuple(ctx);
+ }
int fd = accept(rsrc_obj->fd, (struct sockaddr *) &clientaddr, &clientlen);
+ SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock);
if (UNLIKELY(fd == -1 || fd == CLOSED_FD)) {
AVM_LOGE(TAG, "Unable to accept on socket %i.", rsrc_obj->fd);
int err = errno;
term reason = (err == ECONNABORTED) ? CLOSED_ATOM : posix_errno_to_term(err, global);
return make_error_tuple(reason, ctx);
} else {
-
+ if (UNLIKELY(fcntl(fd, F_SETFL, O_NONBLOCK) != 0)) {
+ AVM_LOGE(TAG, "Unable to configure fd=%d to be non blocking.", fd);
+ close(fd);
+ return make_errno_tuple(ctx);
+ }
struct SocketResource *conn_rsrc_obj = enif_alloc_resource(socket_resource_type, sizeof(struct SocketResource));
conn_rsrc_obj->fd = fd;
conn_rsrc_obj->selecting_process_id = INVALID_PROCESS_ID;
conn_rsrc_obj->buf_size = DEFAULT_BUFFER_SIZE;
+#ifndef AVM_NO_SMP
+ conn_rsrc_obj->socket_lock = smp_rwlock_create();
+ if (IS_NULL_PTR(conn_rsrc_obj->socket_lock)) {
+ free(conn_rsrc_obj);
+ RAISE_ERROR(OUT_OF_MEMORY_ATOM);
+ }
+#endif
TRACE("nif_socket_accept: Created socket on accept fd=%i\n", rsrc_obj->fd);
- term obj = enif_make_resource(erl_nif_env_from_context(ctx), conn_rsrc_obj);
+ term new_resource = enif_make_resource(erl_nif_env_from_context(ctx), conn_rsrc_obj);
enif_release_resource(conn_rsrc_obj);
size_t requested_size = TUPLE_SIZE(2) + TUPLE_SIZE(2) + REF_SIZE;
- if (UNLIKELY(memory_ensure_free_with_roots(ctx, requested_size, 1, &obj, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) {
+ if (UNLIKELY(memory_ensure_free_with_roots(ctx, requested_size, 1, &new_resource, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) {
AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.", __FILE__, __LINE__);
RAISE_ERROR(OUT_OF_MEMORY_ATOM);
}
term socket_term = term_alloc_tuple(2, &ctx->heap);
uint64_t ref_ticks = globalcontext_get_ref_ticks(ctx->global);
+ conn_rsrc_obj->socket_ref_ticks = ref_ticks;
term ref = term_from_ref_ticks(ref_ticks, &ctx->heap);
- term_put_tuple_element(socket_term, 0, obj);
+ term_put_tuple_element(socket_term, 0, new_resource);
term_put_tuple_element(socket_term, 1, ref);
term result = term_alloc_tuple(2, &ctx->heap);
@@ -1536,12 +1799,14 @@ static term nif_socket_accept(Context *ctx, int argc, term argv[])
if (IS_NULL_PTR(new_resource)) {
AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.", __FILE__, __LINE__);
LWIP_END();
+ SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock);
RAISE_ERROR(OUT_OF_MEMORY_ATOM);
}
size_t requested_size = TERM_BOXED_RESOURCE_SIZE + TUPLE_SIZE(2) + TUPLE_SIZE(2) + REF_SIZE;
- if (UNLIKELY(memory_ensure_free(ctx, requested_size) != MEMORY_GC_OK)) {
+ if (UNLIKELY(memory_ensure_free_with_roots(ctx, requested_size, 1, argv, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) {
AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.", __FILE__, __LINE__);
LWIP_END();
+ SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock);
RAISE_ERROR(OUT_OF_MEMORY_ATOM);
}
@@ -1555,9 +1820,12 @@ static term nif_socket_accept(Context *ctx, int argc, term argv[])
term_put_tuple_element(result, 1, socket_term);
} else {
// return EAGAIN
+ LWIP_END();
+ SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock);
return make_error_tuple(posix_errno_to_term(EAGAIN, ctx->global), ctx);
}
LWIP_END();
+ SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock);
return result;
#endif
}
@@ -1586,7 +1854,7 @@ static size_t copy_pbuf_data(struct pbuf *src, size_t offset, size_t count, uint
}
#endif
-ssize_t socket_recv(struct SocketResource *rsrc_obj, uint8_t *buf, size_t len, int flags, term *from, Heap *heap)
+static ssize_t do_socket_recv(struct SocketResource *rsrc_obj, uint8_t *buf, size_t len, int flags, term *from, Heap *heap)
{
#if OTP_SOCKET_BSD
//
@@ -1604,9 +1872,10 @@ ssize_t socket_recv(struct SocketResource *rsrc_obj, uint8_t *buf, size_t len, i
term address = inet_make_addr4(ntohl(addr.sin_addr.s_addr), heap);
term port_number = term_from_int(ntohs(addr.sin_port));
- term map = term_alloc_map(2, heap);
+ term map = term_alloc_map(3, heap);
term_set_map_assoc(map, 0, ADDR_ATOM, address);
- term_set_map_assoc(map, 1, PORT_ATOM, port_number);
+ term_set_map_assoc(map, 1, FAMILY_ATOM, INET_ATOM);
+ term_set_map_assoc(map, 2, PORT_ATOM, port_number);
*from = map;
} else {
res = recv(rsrc_obj->fd, buf, len, flags);
@@ -1691,7 +1960,7 @@ ssize_t socket_recv(struct SocketResource *rsrc_obj, uint8_t *buf, size_t len, i
term port_number = term_from_int(port_u16);
term map = term_alloc_map(2, heap);
- term_set_map_assoc(map, 0, globalcontext_make_atom(global, addr_atom), address);
+ term_set_map_assoc(map, 0, ADDR_ATOM, address);
term_set_map_assoc(map, 1, PORT_ATOM, port_number);
*from = map;
@@ -1706,8 +1975,16 @@ ssize_t socket_recv(struct SocketResource *rsrc_obj, uint8_t *buf, size_t len, i
#endif
}
+ssize_t socket_recv(struct SocketResource *rsrc_obj, uint8_t *buf, size_t len, int flags, term *from, Heap *heap)
+{
+ SMP_RWLOCK_RDLOCK(rsrc_obj->socket_lock);
+ ssize_t result = do_socket_recv(rsrc_obj, buf, len, flags, from, heap);
+ SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock);
+ return result;
+}
+
#if OTP_SOCKET_BSD
-static term nif_socket_recv_with_peek(Context *ctx, struct SocketResource *rsrc_obj, size_t len, bool is_recvfrom)
+static term nif_socket_recv_with_peek(Context *ctx, term resource_term, struct SocketResource *rsrc_obj, size_t len, bool is_recvfrom)
{
TRACE("nif_socket_recv_with_peek\n");
@@ -1718,6 +1995,12 @@ static term nif_socket_recv_with_peek(Context *ctx, struct SocketResource *rsrc_
TRACE("%li bytes available.\n", (long int) res);
if (res < 0) {
AVM_LOGI(TAG, "Unable to receive data on fd %i. errno=%i", rsrc_obj->fd, errno);
+ if (errno == EAGAIN) {
+ return make_error_tuple(TIMEOUT_ATOM, ctx);
+ } else if (errno == ECONNRESET) {
+ TRACE("Peer closed connection.");
+ return make_error_tuple(CLOSED_ATOM, ctx);
+ }
return make_errno_tuple(ctx);
} else if (res == 0) {
TRACE("Peer closed socket %i.\n", rsrc_obj->fd);
@@ -1732,7 +2015,8 @@ static term nif_socket_recv_with_peek(Context *ctx, struct SocketResource *rsrc_
size_t ensure_packet_avail = term_binary_data_size_in_terms(buffer_size) + BINARY_HEADER_SIZE;
size_t requested_size = TUPLE_SIZE(2) + ensure_packet_avail + (is_recvfrom ? (TUPLE_SIZE(2) + INET_ADDR4_TUPLE_SIZE + TERM_MAP_SIZE(2)) : 0);
- if (UNLIKELY(memory_ensure_free(ctx, requested_size) != MEMORY_GC_OK)) {
+ // Because resource is locked, we must ensure it's not garbage collected
+ if (UNLIKELY(memory_ensure_free_with_roots(ctx, requested_size, 1, &resource_term, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) {
AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.\n", __FILE__, __LINE__);
RAISE_ERROR(OUT_OF_MEMORY_ATOM);
}
@@ -1765,7 +2049,7 @@ static term nif_socket_recv_with_peek(Context *ctx, struct SocketResource *rsrc_
}
}
-static term nif_socket_recv_without_peek(Context *ctx, struct SocketResource *rsrc_obj, size_t len, bool is_recvfrom)
+static term nif_socket_recv_without_peek(Context *ctx, term resource_term, struct SocketResource *rsrc_obj, size_t len, bool is_recvfrom)
{
TRACE("nif_socket_recv_without_peek\n");
@@ -1779,29 +2063,32 @@ static term nif_socket_recv_without_peek(Context *ctx, struct SocketResource *rs
RAISE_ERROR(OUT_OF_MEMORY_ATOM);
} else {
-
- term map = term_invalid_term();
+ term roots[2];
+ roots[0] = resource_term;
+ roots[1] = term_invalid_term();
if (is_recvfrom) {
- if (UNLIKELY(memory_ensure_free(ctx, INET_ADDR4_TUPLE_SIZE + TERM_MAP_SIZE(2)) != MEMORY_GC_OK)) {
+ // Because resource is locked, we must ensure it's not garbage collected
+ if (UNLIKELY(memory_ensure_free_with_roots(ctx, INET_ADDR4_TUPLE_SIZE + TERM_MAP_SIZE(2), 1, &resource_term, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) {
free(buffer);
AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.", __FILE__, __LINE__);
RAISE_ERROR(OUT_OF_MEMORY_ATOM);
}
}
- ssize_t res = socket_recv(rsrc_obj, buffer, buffer_size, 0, is_recvfrom ? &map : NULL, &ctx->heap);
+ ssize_t res = socket_recv(rsrc_obj, buffer, buffer_size, 0, is_recvfrom ? &roots[1] : NULL, &ctx->heap);
if (res < 0) {
int err = errno;
- term reason = (err == ECONNRESET) ? globalcontext_make_atom(global, ATOM_STR("\xA", "econnreset")) : posix_errno_to_term(err, global);
-
if (err == ECONNRESET) {
- AVM_LOGI(TAG, "Peer closed connection.");
+ TRACE("Peer closed connection.");
+ return make_error_tuple(CLOSED_ATOM, ctx);
+ } else if (err == EAGAIN) {
+ return make_error_tuple(TIMEOUT_ATOM, ctx);
} else {
- AVM_LOGE(TAG, "Unable to read data on socket %i. errno=%i", rsrc_obj->fd, errno);
+ TRACE("Unable to read data on socket %i. errno=%i", rsrc_obj->fd, errno);
}
- return make_error_tuple(reason, ctx);
+ return make_errno_tuple(ctx);
}
if (res == 0) {
@@ -1818,7 +2105,8 @@ static term nif_socket_recv_without_peek(Context *ctx, struct SocketResource *rs
size_t ensure_packet_avail = term_binary_data_size_in_terms(len) + BINARY_HEADER_SIZE;
size_t requested_size = TUPLE_SIZE(2) + ensure_packet_avail + (is_recvfrom ? TUPLE_SIZE(2) : 0);
- if (UNLIKELY(memory_ensure_free_with_roots(ctx, requested_size, is_recvfrom ? 1 : 0, &map, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) {
+ // Because resource is locked, we must ensure it's not garbage collected
+ if (UNLIKELY(memory_ensure_free_with_roots(ctx, requested_size, is_recvfrom ? 2 : 1, roots, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) {
free(buffer);
AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.", __FILE__, __LINE__);
RAISE_ERROR(OUT_OF_MEMORY_ATOM);
@@ -1828,7 +2116,7 @@ static term nif_socket_recv_without_peek(Context *ctx, struct SocketResource *rs
term payload;
if (is_recvfrom) {
- term tuple = port_heap_create_tuple2(&ctx->heap, map, data);
+ term tuple = port_heap_create_tuple2(&ctx->heap, roots[1], data);
payload = port_heap_create_ok_tuple(&ctx->heap, tuple);
} else {
payload = port_heap_create_ok_tuple(&ctx->heap, data);
@@ -1841,7 +2129,7 @@ static term nif_socket_recv_without_peek(Context *ctx, struct SocketResource *rs
#elif OTP_SOCKET_LWIP
-static term nif_socket_recv_lwip(Context *ctx, struct SocketResource *rsrc_obj, size_t len, bool is_recvfrom)
+static term nif_socket_recv_lwip(Context *ctx, term resource_term, struct SocketResource *rsrc_obj, size_t len, bool is_recvfrom)
{
TRACE("nif_socket_recv_lwip\n");
@@ -1894,7 +2182,8 @@ static term nif_socket_recv_lwip(Context *ctx, struct SocketResource *rsrc_obj,
size_t ensure_packet_avail = term_binary_data_size_in_terms(len) + BINARY_HEADER_SIZE;
size_t requested_size = REF_SIZE + 2 * TUPLE_SIZE(2) + ensure_packet_avail + (is_recvfrom ? (TUPLE_SIZE(2) + INET_ADDR4_TUPLE_SIZE + TERM_MAP_SIZE(2)) : 0);
- if (UNLIKELY(memory_ensure_free(ctx, requested_size) != MEMORY_GC_OK)) {
+ // Because resource is locked, we must ensure it's not garbage collected
+ if (UNLIKELY(memory_ensure_free_with_roots(ctx, requested_size, 1, &resource_term, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) {
AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.\n", __FILE__, __LINE__);
RAISE_ERROR(OUT_OF_MEMORY_ATOM);
}
@@ -1935,28 +2224,36 @@ static term nif_socket_recv_internal(Context *ctx, term argv[], bool is_recvfrom
if (UNLIKELY(!term_to_otp_socket(argv[0], &rsrc_obj, ctx))) {
RAISE_ERROR(BADARG_ATOM);
}
+
+ SMP_RWLOCK_RDLOCK(rsrc_obj->socket_lock);
#if OTP_SOCKET_BSD
if (rsrc_obj->fd == 0) {
+ SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock);
return make_error_tuple(posix_errno_to_term(EBADF, ctx->global), ctx);
}
#elif OTP_SOCKET_LWIP
if (rsrc_obj->socket_state & SocketStateClosed) {
+ SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock);
return make_error_tuple(posix_errno_to_term(EBADF, ctx->global), ctx);
}
if (rsrc_obj->socket_state & SocketStateListening) {
+ SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock);
return make_error_tuple(posix_errno_to_term(EOPNOTSUPP, ctx->global), ctx);
}
#endif
+ term result;
#if OTP_SOCKET_BSD
if (otp_socket_platform_supports_peek()) {
- return nif_socket_recv_with_peek(ctx, rsrc_obj, len, is_recvfrom);
+ result = nif_socket_recv_with_peek(ctx, argv[0], rsrc_obj, len, is_recvfrom);
} else {
- return nif_socket_recv_without_peek(ctx, rsrc_obj, len, is_recvfrom);
+ result = nif_socket_recv_without_peek(ctx, argv[0], rsrc_obj, len, is_recvfrom);
}
#elif OTP_SOCKET_LWIP
- return nif_socket_recv_lwip(ctx, rsrc_obj, len, is_recvfrom);
+ result = nif_socket_recv_lwip(ctx, argv[0], rsrc_obj, len, is_recvfrom);
#endif
+ SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock);
+ return result;
}
static term nif_socket_recv(Context *ctx, int argc, term argv[])
@@ -1978,7 +2275,7 @@ static term nif_socket_recvfrom(Context *ctx, int argc, term argv[])
//
// send/sendto
//
-ssize_t socket_send(struct SocketResource *rsrc_obj, const uint8_t *buf, size_t len, term dest)
+static ssize_t do_socket_send(struct SocketResource *rsrc_obj, const uint8_t *buf, size_t len, term dest)
{
ssize_t sent_data = -1;
#if OTP_SOCKET_BSD
@@ -2084,6 +2381,14 @@ ssize_t socket_send(struct SocketResource *rsrc_obj, const uint8_t *buf, size_t
#endif
}
+ssize_t socket_send(struct SocketResource *rsrc_obj, const uint8_t *buf, size_t len, term dest)
+{
+ SMP_RWLOCK_RDLOCK(rsrc_obj->socket_lock);
+ ssize_t result = do_socket_send(rsrc_obj, buf, len, dest);
+ SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock);
+ return result;
+}
+
static term nif_socket_send_internal(Context *ctx, int argc, term argv[], bool is_sendto)
{
TRACE("nif_socket_send_internal\n");
@@ -2099,15 +2404,19 @@ static term nif_socket_send_internal(Context *ctx, int argc, term argv[], bool i
RAISE_ERROR(BADARG_ATOM);
}
+ SMP_RWLOCK_RDLOCK(rsrc_obj->socket_lock);
#if OTP_SOCKET_BSD
if (rsrc_obj->fd == 0) {
+ SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock);
return make_error_tuple(posix_errno_to_term(EBADF, global), ctx);
}
#elif OTP_SOCKET_LWIP
if (rsrc_obj->socket_state == SocketStateClosed) {
+ SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock);
return make_error_tuple(posix_errno_to_term(EBADF, global), ctx);
}
if (rsrc_obj->socket_state & SocketStateListening) {
+ SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock);
return make_error_tuple(posix_errno_to_term(EOPNOTSUPP, global), ctx);
}
#endif
@@ -2121,7 +2430,8 @@ static term nif_socket_send_internal(Context *ctx, int argc, term argv[], bool i
const uint8_t *buf = (const uint8_t *) term_binary_data(data);
size_t len = term_binary_size(data);
- ssize_t sent_data = socket_send(rsrc_obj, buf, len, dest);
+ ssize_t sent_data = do_socket_send(rsrc_obj, buf, len, dest);
+ SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock);
// {ok, RestData} | {error, Reason}
@@ -2147,7 +2457,7 @@ static term nif_socket_send_internal(Context *ctx, int argc, term argv[], bool i
return port_create_tuple2(ctx, OK_ATOM, data);
} else {
- AVM_LOGE(TAG, "Unable to send data: res=%zi.", sent_data);
+ TRACE("Unable to send data: res=%zi.", sent_data);
return make_error_tuple(CLOSED_ATOM, ctx);
}
}
@@ -2239,27 +2549,33 @@ static term nif_socket_connect(Context *ctx, int argc, term argv[])
RAISE_ERROR(BADARG_ATOM);
}
+ SMP_RWLOCK_RDLOCK(rsrc_obj->socket_lock);
term sockaddr = argv[1];
term port = interop_kv_get_value_default(sockaddr, port_atom, term_from_int(0), ctx->global);
term addr = interop_kv_get_value(sockaddr, addr_atom, ctx->global);
if (term_is_invalid_term(addr)) {
+ SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock);
RAISE_ERROR(BADARG_ATOM);
}
avm_int_t port_number = term_to_int(port);
if (port_number < 0 || port_number > 65535) {
+ SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock);
RAISE_ERROR(BADARG_ATOM);
}
#if OTP_SOCKET_BSD
if (rsrc_obj->fd == 0) {
+ SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock);
return make_error_tuple(posix_errno_to_term(EBADF, global), ctx);
}
#elif OTP_SOCKET_LWIP
if (rsrc_obj->socket_state == SocketStateClosed) {
+ SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock);
return make_error_tuple(posix_errno_to_term(EBADF, global), ctx);
}
if (((rsrc_obj->socket_state & SocketStateTCPListening) == SocketStateTCPListening)
|| ((rsrc_obj->socket_state & SocketStateTCPConnected) == SocketStateTCPConnected)) {
+ SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock);
return make_error_tuple(posix_errno_to_term(EOPNOTSUPP, global), ctx);
}
#endif
@@ -2284,16 +2600,25 @@ static term nif_socket_connect(Context *ctx, int argc, term argv[])
if (errno == EINPROGRESS) {
// TODO make connect non-blocking
+ SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock);
return UNDEFINED_ATOM;
} else {
+ SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock);
AVM_LOGE(TAG, "Unable to connect: res=%i errno=%i", res, errno);
return make_error_tuple(CLOSED_ATOM, ctx);
}
} else if (res == 0) {
+ if (UNLIKELY(fcntl(rsrc_obj->fd, F_SETFL, O_NONBLOCK) != 0)) {
+ AVM_LOGE(TAG, "Unable to configure fd=%d to be non blocking.", rsrc_obj->fd);
+ SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock);
+ return make_errno_tuple(ctx);
+ }
+ SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock);
return OK_ATOM;
} else {
// won't happen according to connect(2)
+ SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock);
return UNDEFINED_ATOM;
}
#elif OTP_SOCKET_LWIP
@@ -2312,11 +2637,13 @@ static term nif_socket_connect(Context *ctx, int argc, term argv[])
RAISE_ERROR(OUT_OF_MEMORY_ATOM);
}
if (rsrc_obj->socket_state & SocketStateUDP) {
+ SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock);
return OK_ATOM;
} else {
rsrc_obj->selecting_process_id = ctx->process_id;
// Trap caller waiting for completion
context_update_flags(ctx, ~NoFlags, Trap);
+ SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock);
return term_invalid_term();
}
#endif
@@ -2341,6 +2668,7 @@ static term nif_socket_shutdown(Context *ctx, int argc, term argv[])
RAISE_ERROR(BADARG_ATOM);
}
+ SMP_RWLOCK_RDLOCK(rsrc_obj->socket_lock);
int how;
int val = interop_atom_term_select_int(otp_socket_shutdown_direction_table, argv[1], global);
switch (val) {
@@ -2358,6 +2686,7 @@ static term nif_socket_shutdown(Context *ctx, int argc, term argv[])
break;
default:
+ SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock);
RAISE_ERROR(BADARG_ATOM);
}
@@ -2366,6 +2695,7 @@ static term nif_socket_shutdown(Context *ctx, int argc, term argv[])
#elif OTP_SOCKET_LWIP
if (rsrc_obj->socket_state == SocketStateClosed) {
#endif
+ SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock);
return make_error_tuple(posix_errno_to_term(EBADF, global), ctx);
}
@@ -2375,6 +2705,7 @@ static term nif_socket_shutdown(Context *ctx, int argc, term argv[])
int res = shutdown(rsrc_obj->fd, how);
if (res < 0) {
AVM_LOGE(TAG, "Unable to shut down socket: res=%i errno=%i", res, errno);
+ SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock);
return make_errno_tuple(ctx);
}
#elif OTP_SOCKET_LWIP
@@ -2391,6 +2722,7 @@ static term nif_socket_shutdown(Context *ctx, int argc, term argv[])
}
LWIP_END();
#endif
+ SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock);
return result;
}
@@ -2410,6 +2742,10 @@ static const struct Nif socket_select_stop_nif = {
.base.type = NIFFunctionType,
.nif_ptr = nif_socket_select_stop
};
+static const struct Nif socket_getopt_nif = {
+ .base.type = NIFFunctionType,
+ .nif_ptr = nif_socket_getopt
+};
static const struct Nif socket_setopt_nif = {
.base.type = NIFFunctionType,
.nif_ptr = nif_socket_setopt
@@ -2480,6 +2816,10 @@ const struct Nif *otp_socket_nif_get_nif(const char *nifname)
TRACE("Resolved platform nif %s ...\n", nifname);
return &socket_select_stop_nif;
}
+ if (strcmp("getopt/2", rest) == 0) {
+ TRACE("Resolved platform nif %s ...\n", nifname);
+ return &socket_getopt_nif;
+ }
if (strcmp("setopt/3", rest) == 0) {
TRACE("Resolved platform nif %s ...\n", nifname);
return &socket_setopt_nif;
diff --git a/src/libAtomVM/resources.c b/src/libAtomVM/resources.c
index f19eb3834..fe6aaa58c 100644
--- a/src/libAtomVM/resources.c
+++ b/src/libAtomVM/resources.c
@@ -116,19 +116,20 @@ ERL_NIF_TERM enif_make_resource(ErlNifEnv *env, void *obj)
return term_from_resource(obj, &env->heap);
}
-int enif_select(ErlNifEnv *env, ErlNifEvent event, enum ErlNifSelectFlags mode, void *obj, const ErlNifPid *pid, ERL_NIF_TERM ref)
+static void enif_select_event_message_dispose(Message *message, GlobalContext *global, bool from_task)
{
- if (!(mode & (ERL_NIF_SELECT_STOP | ERL_NIF_SELECT_READ | ERL_NIF_SELECT_WRITE))) {
- return ERL_NIF_SELECT_BADARG;
- }
- if (UNLIKELY(mode & (ERL_NIF_SELECT_READ | ERL_NIF_SELECT_WRITE) && !term_is_local_reference(ref) && ref != UNDEFINED_ATOM)) {
- return ERL_NIF_SELECT_BADARG;
+ if (message) {
+ mailbox_message_dispose_unsent(message, global, from_task);
}
+}
+static int enif_select_common(ErlNifEnv *env, ErlNifEvent event, enum ErlNifSelectFlags mode, void *obj, const ErlNifPid *pid, ERL_NIF_TERM ref, Message *message)
+{
+ GlobalContext *global = env->global;
struct RefcBinary *resource = refc_binary_from_data(obj);
// Search for event and obj
struct ListHead *item;
- struct ListHead *select_events = synclist_wrlock(&env->global->select_events);
+ struct ListHead *select_events = synclist_wrlock(&global->select_events);
struct SelectEvent *select_event = NULL;
LIST_FOR_EACH (item, select_events) {
select_event = GET_LIST_ENTRY(item, struct SelectEvent, head);
@@ -139,19 +140,20 @@ int enif_select(ErlNifEnv *env, ErlNifEvent event, enum ErlNifSelectFlags mode,
}
if (mode & ERL_NIF_SELECT_STOP) {
if (select_event == NULL) {
- synclist_unlock(&env->global->select_events);
+ synclist_unlock(&global->select_events);
return ERL_NIF_SELECT_INVALID_EVENT;
}
bool was_read = select_event->read;
bool was_write = select_event->write;
if (!was_read && !was_write) {
list_remove(&select_event->head);
- synclist_unlock(&env->global->select_events);
+ synclist_unlock(&global->select_events);
// We can call stop now.
if (resource->resource_type->stop) {
resource->resource_type->stop(env, obj, event, true);
}
- refc_binary_decrement_refcount(resource, env->global);
+ refc_binary_decrement_refcount(resource, global);
+ enif_select_event_message_dispose(select_event->message, global, false);
free((void *) select_event);
return ERL_NIF_SELECT_STOP_CALLED;
}
@@ -161,13 +163,13 @@ int enif_select(ErlNifEnv *env, ErlNifEvent event, enum ErlNifSelectFlags mode,
select_event->close = 1;
select_event->read = 0;
select_event->write = 0;
- synclist_unlock(&env->global->select_events);
+ synclist_unlock(&global->select_events);
// Platform loop should check close flag after unregister is called
if (was_read) {
- sys_unregister_select_event(env->global, event, false);
+ sys_unregister_select_event(global, event, false);
}
if (was_write) {
- sys_unregister_select_event(env->global, event, true);
+ sys_unregister_select_event(global, event, true);
}
return ERL_NIF_SELECT_STOP_SCHEDULED;
}
@@ -179,31 +181,60 @@ int enif_select(ErlNifEnv *env, ErlNifEvent event, enum ErlNifSelectFlags mode,
}
select_event->event = event;
select_event->resource = resource;
+ select_event->message = NULL;
+ select_event->ref_ticks = 0;
// Resource is used in select_event, so we increase refcount.
refc_binary_increment_refcount(resource);
list_init(&select_event->head);
list_append(select_events, &select_event->head);
}
- // Second read or second write overwrite ref & pid.
- if (ref == UNDEFINED_ATOM) {
+ // Second read or second write overwrite ref/message & pid.
+ enif_select_event_message_dispose(select_event->message, global, false);
+ if (message) {
+ select_event->message = message;
select_event->ref_ticks = 0;
} else {
- select_event->ref_ticks = term_to_ref_ticks(ref);
+ if (ref == UNDEFINED_ATOM) {
+ select_event->ref_ticks = 0;
+ } else {
+ select_event->ref_ticks = term_to_ref_ticks(ref);
+ }
}
select_event->local_pid = *pid;
select_event->read = mode & ERL_NIF_SELECT_READ;
select_event->write = mode & ERL_NIF_SELECT_WRITE;
select_event->close = 0;
- synclist_unlock(&env->global->select_events);
+ synclist_unlock(&global->select_events);
if (select_event->read) {
- sys_register_select_event(env->global, event, false);
+ sys_register_select_event(global, event, false);
}
if (select_event->write) {
- sys_register_select_event(env->global, event, true);
+ sys_register_select_event(global, event, true);
}
return 0;
}
+int enif_select(ErlNifEnv *env, ErlNifEvent event, enum ErlNifSelectFlags mode, void *obj, const ErlNifPid *pid, ERL_NIF_TERM ref)
+{
+ if (!(mode & (ERL_NIF_SELECT_STOP | ERL_NIF_SELECT_READ | ERL_NIF_SELECT_WRITE))) {
+ return ERL_NIF_SELECT_BADARG;
+ }
+ if (UNLIKELY(mode & (ERL_NIF_SELECT_READ | ERL_NIF_SELECT_WRITE) && !term_is_local_reference(ref) && ref != UNDEFINED_ATOM)) {
+ return ERL_NIF_SELECT_BADARG;
+ }
+ return enif_select_common(env, event, mode, obj, pid, ref, NULL);
+}
+
+int enif_select_read(ErlNifEnv *env, ErlNifEvent event, void *obj, const ErlNifPid *pid, ERL_NIF_TERM msg, ErlNifEnv *msg_env)
+{
+ if (UNLIKELY(msg_env != NULL)) {
+ return ERL_NIF_SELECT_BADARG;
+ }
+ Message *message = mailbox_message_create_normal_message_from_term(msg);
+ enum ErlNifSelectFlags mode = ERL_NIF_SELECT_READ;
+ return enif_select_common(env, event, mode, obj, pid, term_nil(), message);
+}
+
term select_event_make_notification(void *rsrc_obj, uint64_t ref_ticks, bool is_write, Heap *heap)
{
term notification = term_alloc_tuple(4, heap);
@@ -224,19 +255,33 @@ term select_event_make_notification(void *rsrc_obj, uint64_t ref_ticks, bool is_
static void select_event_send_notification(struct SelectEvent *select_event, bool is_write, GlobalContext *global)
{
- BEGIN_WITH_STACK_HEAP(SELECT_EVENT_NOTIFICATION_SIZE, heap)
- term notification = select_event_make_notification(select_event->resource->data, select_event->ref_ticks, is_write, &heap);
+ if (select_event->message) {
+ enum SendMessageResult result;
+#ifdef AVM_SELECT_IN_TASK
+ result = globalcontext_post_message_from_task(global, select_event->local_pid, select_event->message);
+#else
+ result = globalcontext_post_message(global, select_event->local_pid, select_event->message);
+#endif
+ if (result == SEND_MESSAGE_OK) {
+ // Ownership was properly transfered.
+ // Otherwise, it will be destroyed when we have a context (when enif_select is called with stop for example)
+ select_event->message = NULL;
+ }
+ } else {
+ BEGIN_WITH_STACK_HEAP(SELECT_EVENT_NOTIFICATION_SIZE, heap)
+ term notification = select_event_make_notification(select_event->resource->data, select_event->ref_ticks, is_write, &heap);
#ifdef AVM_SELECT_IN_TASK
- globalcontext_send_message_from_task(global, select_event->local_pid, NormalMessage, notification);
+ globalcontext_send_message_from_task(global, select_event->local_pid, NormalMessage, notification);
#else
- globalcontext_send_message(global, select_event->local_pid, notification);
+ globalcontext_send_message(global, select_event->local_pid, notification);
#endif
+ END_WITH_STACK_HEAP(heap, global)
+ }
if (is_write) {
select_event->write = 0;
} else {
select_event->read = 0;
}
- END_WITH_STACK_HEAP(heap, global)
sys_unregister_select_event(global, select_event->event, is_write);
}
@@ -279,6 +324,7 @@ static inline void select_event_destroy(struct SelectEvent *select_event, Global
#else
refc_binary_decrement_refcount(select_event->resource, global);
#endif
+ enif_select_event_message_dispose(select_event->message, global, true);
free((void *) select_event);
}
diff --git a/src/libAtomVM/resources.h b/src/libAtomVM/resources.h
index b9f2c7b5f..f9ba20162 100644
--- a/src/libAtomVM/resources.h
+++ b/src/libAtomVM/resources.h
@@ -25,6 +25,7 @@
#include "erl_nif.h"
#include "list.h"
+#include "mailbox.h"
#include "memory.h"
#ifdef __cplusplus
@@ -70,6 +71,7 @@ struct SelectEvent
bool close;
int32_t local_pid;
uint64_t ref_ticks;
+ Message *message;
};
static inline void resource_type_destroy(struct ResourceType *resource_type)
diff --git a/src/platforms/esp32/test/main/test_erl_sources/test_ssl.erl b/src/platforms/esp32/test/main/test_erl_sources/test_ssl.erl
index 15f7a835e..42449c135 100644
--- a/src/platforms/esp32/test/main/test_erl_sources/test_ssl.erl
+++ b/src/platforms/esp32/test/main/test_erl_sources/test_ssl.erl
@@ -70,9 +70,9 @@ handshake_loop(SSLContext, Socket) ->
case socket:nif_select_read(Socket, Ref) of
ok ->
receive
- {select, _SocketResource, Ref, ready_input} ->
+ {'$socket', Socket, select, Ref} ->
handshake_loop(SSLContext, Socket);
- {closed, Ref} ->
+ {'$socket', Socket, abort, {Ref, closed}} ->
ok = socket:close(Socket),
{error, closed}
end;
@@ -98,9 +98,9 @@ send_loop(SSLContext, Socket, Binary) ->
case socket:nif_select_read(Socket, Ref) of
ok ->
receive
- {select, _SocketResource, Ref, ready_input} ->
+ {'$socket', Socket, select, Ref} ->
send_loop(SSLContext, Socket, Binary);
- {closed, Ref} ->
+ {'$socket', Socket, abort, {Ref, closed}} ->
{error, closed}
end;
{error, _Reason} = Error ->
@@ -124,9 +124,9 @@ recv_loop(SSLContext, Socket, Remaining, Acc) ->
case socket:nif_select_read(Socket, Ref) of
ok ->
receive
- {select, _SocketResource, Ref, ready_input} ->
+ {'$socket', Socket, select, Ref} ->
recv_loop(SSLContext, Socket, Remaining, Acc);
- {closed, Ref} ->
+ {'$socket', Socket, abort, {Ref, closed}} ->
{error, closed}
end;
{error, _Reason} = Error ->
@@ -147,9 +147,9 @@ close_notify_loop(SSLContext, Socket) ->
case socket:nif_select_read(Socket, Ref) of
ok ->
receive
- {select, _SocketResource, Ref, ready_input} ->
+ {'$socket', Socket, select, Ref} ->
close_notify_loop(SSLContext, Socket);
- {closed, Ref} ->
+ {'$socket', Socket, abort, {Ref, closed}} ->
{error, closed}
end;
{error, _Reason} = Error ->
diff --git a/src/platforms/rp2/tests/test_erl_sources/CMakeLists.txt b/src/platforms/rp2/tests/test_erl_sources/CMakeLists.txt
index 6007ce23f..773a555dc 100644
--- a/src/platforms/rp2/tests/test_erl_sources/CMakeLists.txt
+++ b/src/platforms/rp2/tests/test_erl_sources/CMakeLists.txt
@@ -18,6 +18,7 @@
# SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later
#
+include(ExternalProject)
ExternalProject_Add(HostAtomVM
SOURCE_DIR ../../../../../../
INSTALL_COMMAND cmake -E echo "Skipping install step."
diff --git a/tests/libs/estdlib/test_tcp_socket.erl b/tests/libs/estdlib/test_tcp_socket.erl
index f49c5c1e4..fa154af17 100644
--- a/tests/libs/estdlib/test_tcp_socket.erl
+++ b/tests/libs/estdlib/test_tcp_socket.erl
@@ -27,7 +27,9 @@ test() ->
ok = test_shutdown(),
ok = test_close_by_another_process(),
ok = test_buf_size(),
- ok = test_override_buf_size(),
+ ok = test_timeout(),
+ ok = test_nowait(),
+ ok = test_setopt_getopt(),
case get_otp_version() of
atomvm ->
ok = test_abandon_select();
@@ -36,6 +38,8 @@ test() ->
end,
ok.
+-define(PACKET_SIZE, 7).
+
test_echo_server() ->
etest:flush_msg_queue(),
@@ -44,7 +48,7 @@ test_echo_server() ->
test_send_receive(Port, 10),
- close_listen_socket(ListenSocket).
+ ok = close_listen_socket(ListenSocket).
%%
%% test_shutdown
@@ -63,12 +67,12 @@ test_shutdown() ->
id(ok).
test_shutdown_of_client_sockets(Port) ->
- ok = test_shutdown_of_side(Port, write),
- ok = test_shutdown_of_side(Port, read_write),
- ok = test_shutdown_of_side(Port, read),
+ ok = test_shutdown_of_side(Port, write, <<"echo:01">>),
+ ok = test_shutdown_of_side(Port, read_write, <<"echo:02">>),
+ ok = test_shutdown_of_side(Port, read, <<"echo:03">>),
id(ok).
-test_shutdown_of_side(Port, Side) ->
+test_shutdown_of_side(Port, Side, Packet) ->
{ok, Socket} = socket:open(inet, stream, tcp),
ok = try_connect(Socket, Port, 10),
@@ -76,21 +80,25 @@ test_shutdown_of_side(Port, Side) ->
case Side of
read ->
%% read on the socket should fail
- socket:send(Socket, erlang:atom_to_binary(Side, latin1)),
+ socket:send(Socket, Packet),
case catch (socket:recv(Socket)) of
{error, _} ->
ok;
{ok, Data} ->
- %% On some Linux kernels, shutdown is not guaranteed to
- %% result in an error on read.
+ %% On some Linux kernels, shutdown doesn't return an error
+ %% until all buffered data is read.
%% C.f. https://stackoverflow.com/questions/740817/behavior-of-shutdownsock-shut-rd-with-tcp
- erlang:display({warning, expected_error_on_recv, Side, Data}),
- % error({expected_error_on_recv, Side, Data})
- ok
+ %% Second recv will fail
+ case catch (socket:recv(Socket)) of
+ {error, _} ->
+ ok;
+ {ok, Data} ->
+ error({expected_error_on_recv, Side, Data})
+ end
end;
_ ->
%% write on the socket should fail
- case catch (socket:send(Socket, erlang:atom_to_binary(Side, latin1))) of
+ case catch (socket:send(Socket, Packet)) of
{error, _} ->
ok;
{ok, Data} ->
@@ -122,25 +130,7 @@ test_close_by_another_process() ->
timer:sleep(10),
- ok = close_listen_socket(ListenSocket),
-
- id(ok).
-
-check_receive(Socket, Packet, Length, Expect) ->
- case socket:send(Socket, Packet) of
- ok ->
- ok =
- case socket:recv(Socket, Length) of
- {ok, Expect} ->
- ok;
- Error ->
- io:format("Unexpected value on recv: ~p~n", [Error]),
- Error
- end;
- {error, Reason} = Error ->
- io:format("Error on send: ~p~n", [Reason]),
- Error
- end.
+ ok = close_listen_socket(ListenSocket).
test_buf_size() ->
etest:flush_msg_queue(),
@@ -156,50 +146,26 @@ test_buf_size() ->
{error, _} = socket:setopt(Socket, {otp, rcvbuf}, not_an_int),
{error, _} = socket:setopt(Socket, {otp, rcvbuf}, -1),
- %% limit the recv buffer size to 10 bytes
- ok = socket:setopt(Socket, {otp, rcvbuf}, 10),
-
- Packet = "012345678901234567890123456789",
+ %% limit the recv buffer size to 5 bytes
+ ok = socket:setopt(Socket, {otp, rcvbuf}, 5),
+ true = 5 < ?PACKET_SIZE,
%% we should only be able to receive
- ok = check_receive(Socket, Packet, 0, <<"0123456789">>),
- ok = check_receive(Socket, Packet, 0, <<"0123456789">>),
- ok = check_receive(Socket, Packet, 0, <<"0123456789">>),
-
- timer:sleep(10),
-
- ok = close_client_socket(Socket),
-
- ok = close_listen_socket(ListenSocket),
-
- id(ok).
-
-test_override_buf_size() ->
- etest:flush_msg_queue(),
-
- Port = 44404,
- ListenSocket = start_echo_server(Port),
-
- {ok, Socket} = socket:open(inet, stream, tcp),
- ok = try_connect(Socket, Port, 10),
-
- %% limit the recv buffer size to 10 bytes
- ok = socket:setopt(Socket, {otp, rcvbuf}, 10),
-
- Packet = "012345678901234567890123456789",
+ ok = socket:send(Socket, <<"echo:01">>),
+ {ok, <<"echo:">>} = socket:recv(Socket, 0, 5000),
+ {ok, <<"01">>} = socket:recv(Socket, 0, 5000),
+ ok = socket:send(Socket, <<"echo:02">>),
+ {ok, <<"echo:">>} = socket:recv(Socket, 0, 5000),
+ {ok, <<"02">>} = socket:recv(Socket, 0, 5000),
%% verify that the socket:recv length parameter takes
%% precedence over the default
- ok = check_receive(Socket, Packet, 15, <<"012345678901234">>),
- ok = check_receive(Socket, Packet, 15, <<"567890123456789">>),
-
- timer:sleep(10),
+ ok = socket:send(Socket, <<"echo:03">>),
+ {ok, <<"echo:03">>} = socket:recv(Socket, ?PACKET_SIZE, 5000),
ok = close_client_socket(Socket),
- ok = close_listen_socket(ListenSocket),
-
- id(ok).
+ ok = close_listen_socket(ListenSocket).
%%
%% echo_server
@@ -244,18 +210,18 @@ accept(Pid, ListenSocket) ->
end.
echo(Pid, Socket) ->
- case socket:recv(Socket) of
- {ok, Packet} ->
- % Pid ! {packet_received, Packet},
- ok =
- case socket:send(Socket, Packet) of
- ok ->
- ok;
- E ->
- %% TODO support returning Rest when Packet > buffer_size
- {unexpected_reply_from_send, E}
- end,
- % Pid ! {packet_echoed, Packet},
+ case socket:recv(Socket, ?PACKET_SIZE) of
+ {ok, <<"echo:", _/binary>> = Packet} ->
+ ok = socket:send(Socket, Packet),
+ echo(Pid, Socket);
+ {ok, <<"wait:", _/binary>> = Packet} ->
+ timer:sleep(500),
+ ok = socket:send(Socket, Packet),
+ echo(Pid, Socket);
+ {ok, <<"chnk:", Rest/binary>>} ->
+ ok = socket:send(Socket, <<"chnk:">>),
+ timer:sleep(500),
+ ok = socket:send(Socket, Rest),
echo(Pid, Socket);
%% estdlib TODO
{error, closed} ->
@@ -265,6 +231,9 @@ echo(Pid, Socket) ->
{error, econnreset} ->
Pid ! recv_terminated,
ok;
+ {error, {closed, <<"read">>}} ->
+ Pid ! recv_terminated,
+ ok;
SomethingElse ->
error({unexpected_return_from_recv, SomethingElse})
end.
@@ -274,23 +243,13 @@ close_listen_socket(ListenSocket) ->
%% Close the socket, and wait for a signal that we came out of accept
%%
ok = socket:close(ListenSocket),
- receive
- accept_terminated ->
- ok
- after 1000 ->
- %%
- %% Closing the listening socket from another process may in some
- %% cases not result in the blocking accept to break out of its
- %% call with an expected return value. In this case, we will
- %% allow the wait for the `accept_terminated` message to fail
- %% and simply warn the user. See TODO comment to this effect in
- %% `nif_socket_close` function in `otp_socket.c`
- %%
- erlang:display({warning, timeout, waiting, accept_terminated})
- % throw({timeout, waiting, accept_terminated})
- end,
-
- ok.
+ ok =
+ receive
+ accept_terminated ->
+ ok
+ after 1000 ->
+ {error, {timeout, accept_terminated}}
+ end.
%%
%% send_receive loop
@@ -312,7 +271,7 @@ close_client_socket(Socket) ->
receive
recv_terminated ->
ok
- after 1000 ->
+ after 2000 ->
throw({timeout, waiting, recv_terminated})
end.
@@ -330,7 +289,8 @@ try_connect(Socket, Port, Tries) ->
send_receive_loop(_Socket, 0) ->
ok;
send_receive_loop(Socket, I) ->
- Packet = pid_to_list(self()) ++ ":" ++ integer_to_list(I),
+ Packet = list_to_binary(io_lib:format("echo:~2.10.0B", [I])),
+ ?PACKET_SIZE = byte_size(Packet),
case socket:send(Socket, Packet) of
ok ->
case socket:recv(Socket) of
@@ -345,6 +305,170 @@ send_receive_loop(Socket, I) ->
Error
end.
+receive_loop_nowait(Socket, Packet) when byte_size(Packet) > 0 ->
+ case socket:recv(Socket, byte_size(Packet), nowait) of
+ {ok, ReceivedPacket} when ReceivedPacket =:= Packet ->
+ ok;
+ {select, {select_info, recv, SelectHandle}} when is_reference(SelectHandle) ->
+ receive
+ {'$socket', Socket, select, SelectHandle} ->
+ receive_loop_nowait(Socket, Packet)
+ after 5000 ->
+ {error, timeout}
+ end;
+ {select, {{select_info, recv, SelectHandle}, Data}} when is_reference(SelectHandle) ->
+ {Data, Rest} = split_binary(Packet, byte_size(Data)),
+ receive
+ {'$socket', Socket, select, SelectHandle} ->
+ receive_loop_nowait(Socket, Rest)
+ after 5000 ->
+ {error, timeout}
+ end;
+ {error, _} = Error ->
+ io:format("Error on recv: ~p~n", [Error]),
+ Error
+ end.
+
+receive_loop_nowait_ref(Socket, Packet) when byte_size(Packet) > 0 ->
+ Ref = make_ref(),
+ case socket:recv(Socket, byte_size(Packet), Ref) of
+ {ok, ReceivedPacket} when ReceivedPacket =:= Packet ->
+ ok;
+ {select, {select_info, recv, Ref}} ->
+ receive
+ {'$socket', Socket, select, Ref} ->
+ receive_loop_nowait_ref(Socket, Packet)
+ after 5000 ->
+ {error, timeout}
+ end;
+ {select, {{select_info, recv, Ref}, Data}} ->
+ {Data, Rest} = split_binary(Packet, byte_size(Data)),
+ receive
+ {'$socket', Socket, select, Ref} ->
+ receive_loop_nowait_ref(Socket, Rest)
+ after 5000 ->
+ {error, timeout}
+ end;
+ {error, _} = Error ->
+ io:format("Error on recv: ~p~n", [Error]),
+ Error
+ end.
+
+test_timeout() ->
+ etest:flush_msg_queue(),
+
+ Port = 44404,
+ ListenSocket = start_echo_server(Port),
+
+ {ok, Socket} = socket:open(inet, stream, tcp),
+ ok = try_connect(Socket, Port, 10),
+
+ % receive of two chunks with an infinity timeout
+ Packet0 = <<"chnk:00">>,
+ ok = socket:send(Socket, Packet0),
+ {ok, Packet0} = socket:recv(Socket, ?PACKET_SIZE, infinity),
+
+ % receive of two chunks with a large timeout
+ Packet1 = <<"chnk:01">>,
+ ok = socket:send(Socket, Packet1),
+ {ok, Packet1} = socket:recv(Socket, ?PACKET_SIZE, 5000),
+
+ % receive of two chunks with a small timeout causing a timeout error
+ Packet2 = <<"chnk:02">>,
+ ok = socket:send(Socket, Packet2),
+ {error, Timeout02} = socket:recv(Socket, ?PACKET_SIZE, 250),
+ case Timeout02 of
+ {timeout, <<"chnk:">>} ->
+ % AtomVM usually does return partial data
+ {ok, <<"02">>} = socket:recv(Socket, 2, infinity);
+ timeout ->
+ % BEAM OTP-27 seems to never return partial data
+ {ok, <<"chnk:02">>} = socket:recv(Socket, ?PACKET_SIZE, infinity)
+ end,
+
+ % receive of two chunks with a null timeout causing a timeout error
+ Packet3 = <<"chnk:03">>,
+ ok = socket:send(Socket, Packet3),
+ timer:sleep(250),
+ case socket:recv(Socket, ?PACKET_SIZE, 0) of
+ {ok, <<"chnk:">>} ->
+ % BEAM OTP-22 to OTP-24 returns this on Linux on the CI.
+ {ok, <<"03">>} = socket:recv(Socket, 2);
+ {error, Timeout03} ->
+ case Timeout03 of
+ {timeout, <<"chnk:">>} ->
+ % BEAM OTP-27 seems to always return partial data
+ % AtomVM usually does
+ {ok, <<"03">>} = socket:recv(Socket, 2);
+ timeout ->
+ % Depending on scheduling, AtomVM may return no partial data
+ {ok, <<"chnk:03">>} = socket:recv(Socket, ?PACKET_SIZE)
+ end
+ end,
+
+ % Test recv
+ ok = socket:send(Socket, <<"wait:01">>),
+ {error, timeout} = socket:recv(Socket, 0, 100),
+ {ok, <<"wait:01">>} = socket:recv(Socket, 0, 5000),
+
+ ok = socket:send(Socket, <<"wait:02">>),
+ {error, timeout} = socket:recv(Socket, ?PACKET_SIZE, 0),
+ {ok, <<"wait:02">>} = socket:recv(Socket, ?PACKET_SIZE, 5000),
+
+ ok = socket:send(Socket, <<"wait:03">>),
+ {error, Timeout04} = socket:recv(Socket, 2 * ?PACKET_SIZE, 5000),
+ ok =
+ case Timeout04 of
+ {timeout, <<"wait:03">>} ->
+ % AtomVM usually does return partial data
+ ok;
+ timeout ->
+ % BEAM OTP-27 seems to never return partial data
+ ok
+ end,
+
+ ok = close_client_socket(Socket),
+ ok = close_listen_socket(ListenSocket).
+
+test_nowait() ->
+ ok = test_nowait(fun receive_loop_nowait/2),
+ ok = test_nowait(fun receive_loop_nowait_ref/2),
+ ok.
+
+test_nowait(ReceiveFun) ->
+ etest:flush_msg_queue(),
+
+ Port = 44404,
+ ListenSocket = start_echo_server(Port),
+
+ {ok, Socket} = socket:open(inet, stream, tcp),
+ ok = try_connect(Socket, Port, 10),
+
+ Packet0 = <<"echo:00">>,
+ ok = socket:send(Socket, Packet0),
+ ok = ReceiveFun(Socket, Packet0),
+
+ Packet1 = <<"wait:00">>,
+ ok = socket:send(Socket, Packet1),
+ ok = ReceiveFun(Socket, Packet1),
+
+ Packet2 = <<"chnk:00">>,
+ ok = socket:send(Socket, Packet2),
+ ok = ReceiveFun(Socket, Packet2),
+
+ ok = close_client_socket(Socket),
+
+ ok = close_listen_socket(ListenSocket).
+
+test_setopt_getopt() ->
+ {ok, Socket} = socket:open(inet, stream, tcp),
+ {ok, stream} = socket:getopt(Socket, {socket, type}),
+ ok = socket:setopt(Socket, {socket, reuseaddr}, true),
+ ok = socket:close(Socket),
+ {error, closed} = socket:getopt(Socket, {socket, type}),
+ {error, closed} = socket:setopt(Socket, {socket, reuseaddr}, true),
+ ok.
+
%%
%% abandon_select test
%%
diff --git a/tests/libs/estdlib/test_udp_socket.erl b/tests/libs/estdlib/test_udp_socket.erl
index 4ca20d687..99147fa5c 100644
--- a/tests/libs/estdlib/test_udp_socket.erl
+++ b/tests/libs/estdlib/test_udp_socket.erl
@@ -23,80 +23,270 @@
-export([test/0]).
test() ->
- ok = test_echo_server(),
+ ok = test_echo(),
+ ok = test_buf_size(),
+ ok = test_timeout(),
+ ok = test_nowait(),
+ ok = test_setopt_getopt(),
ok.
-test_echo_server() ->
- Port = 44405,
- {ok, ReceiveSocket} = socket:open(inet, dgram, udp),
+-define(PACKET_SIZE, 7).
+
+start_echo_server(Port) ->
+ {ok, Socket} = socket:open(inet, dgram, udp),
- ok = socket:setopt(ReceiveSocket, {socket, reuseaddr}, true),
- ok = socket:setopt(ReceiveSocket, {socket, linger}, #{onoff => true, linger => 0}),
+ ok = socket:setopt(Socket, {socket, reuseaddr}, true),
+ ok = socket:setopt(Socket, {socket, linger}, #{onoff => true, linger => 0}),
- ok = socket:bind(ReceiveSocket, #{
+ ok = socket:bind(Socket, #{
family => inet, addr => loopback, port => Port
}),
- Self = self(),
- spawn(fun() ->
- Self ! ready,
- receive_loop(Self, ReceiveSocket)
- end),
-
- receive
- ready ->
- ok
- end,
-
- test_send_receive(Port, 10),
-
- %%
- %% Close the socket, and wait for a signal that we came out of recvfrom
- %%
- ok = socket:close(ReceiveSocket),
- receive
- recv_terminated -> ok
- after 1000 ->
- %% This is UDP, so raising an error might not be fair here.
- %% Let's just log instead.
- erlang:display({innocuous_udp_timeout, waiting, recv_terminated})
- end,
- ok.
+ {Pid, MonitorRef} = spawn_opt(
+ fun() ->
+ echo_server_loop(Socket)
+ end,
+ [monitor]
+ ),
-receive_loop(Pid, ReceiveSocket) ->
- case socket:recvfrom(ReceiveSocket) of
- {ok, {_Source, Packet}} ->
- Pid ! {received, Packet},
- receive_loop(Pid, ReceiveSocket);
+ {Pid, MonitorRef, Socket}.
+
+echo_server_loop(Socket) ->
+ case socket:recvfrom(Socket, 0, 5000) of
+ {ok, {Source, <<"echo:", _/binary>> = Packet}} ->
+ ok = socket:sendto(Socket, Packet, Source),
+ echo_server_loop(Socket);
+ {ok, {Source, <<"wait:", _/binary>> = Packet}} ->
+ timer:sleep(500),
+ ok = socket:sendto(Socket, Packet, Source),
+ echo_server_loop(Socket);
+ {ok, {Source, <<"chnk:", Rest/binary>>}} ->
+ ok = socket:sendto(Socket, <<"chnk:">>, Source),
+ ok = socket:sendto(Socket, Rest, Source),
+ echo_server_loop(Socket);
{error, closed} ->
- Pid ! recv_terminated;
+ ok;
SomethingElse ->
- Pid ! recv_terminated,
error({unexpected_return_from_recv, SomethingElse})
end.
-test_send_receive(Port, N) ->
+stop_echo_server({Pid, MonitorRef, Socket}) ->
+ % We stop the server by closing the packet.
+ ok = socket:close(Socket),
+ normal =
+ receive
+ {'DOWN', MonitorRef, process, Pid, Reason} -> Reason
+ end,
+ ok.
+
+test_echo() ->
+ Port = 44405,
+ EchoServer = start_echo_server(Port),
+ Dest = #{family => inet, addr => {127, 0, 0, 1}, port => Port},
+ {ok, Socket} = socket:open(inet, dgram, udp),
+
+ % Test recvfrom
+ ok = socket:sendto(Socket, <<"echo:01">>, Dest),
+ {ok, {Dest, <<"echo:01">>}} = socket:recvfrom(Socket, 0, 5000),
+
+ % Test recv
+ ok = socket:sendto(Socket, <<"echo:02">>, Dest),
+ {ok, <<"echo:02">>} = socket:recv(Socket, 0, 5000),
+
+ % Test loopback
+ ok = socket:sendto(Socket, <<"echo:03">>, #{family => inet, addr => loopback, port => Port}),
+ {ok, {Dest, <<"echo:03">>}} = socket:recvfrom(Socket, 0, 5000),
+
+ % Chunk means two packets with UDP
+ ok = socket:sendto(Socket, <<"chnk:01">>, Dest),
+ timer:sleep(200),
+ {ok, {Dest, <<"chnk:">>}} = socket:recvfrom(Socket, 0, 5000),
+ {ok, {Dest, <<"01">>}} = socket:recvfrom(Socket, 0, 5000),
+
+ % Chunk means two packets with UDP, including with recv
+ ok = socket:sendto(Socket, <<"chnk:02">>, Dest),
+ timer:sleep(200),
+ {ok, <<"chnk:">>} = socket:recv(Socket, 0, 5000),
+ {ok, <<"02">>} = socket:recv(Socket, 0, 5000),
+
+ ok = socket:close(Socket),
+ ok = stop_echo_server(EchoServer).
+
+test_buf_size() ->
+ Port = 44405,
+ EchoServer = start_echo_server(Port),
+ Dest = #{family => inet, addr => {127, 0, 0, 1}, port => Port},
+ {ok, Socket} = socket:open(inet, dgram, udp),
+
+ %% try a few failures first
+ {error, _} = socket:setopt(Socket, {otp, badopt}, any_value),
+ {error, _} = socket:setopt(Socket, {otp, rcvbuf}, not_an_int),
+ {error, _} = socket:setopt(Socket, {otp, rcvbuf}, -1),
+
+ %% limit the recv buffer size to 5 bytes
+ ok = socket:setopt(Socket, {otp, rcvbuf}, 5),
+ true = 5 < ?PACKET_SIZE,
+
+ %% we should only be able to receive
+ ok = socket:sendto(Socket, <<"echo:01">>, Dest),
+ {ok, {Dest, <<"echo:">>}} = socket:recvfrom(Socket, 0, 5000),
+ {error, timeout} = socket:recvfrom(Socket, 0, 0),
+ ok = socket:sendto(Socket, <<"echo:01">>, Dest),
+ {ok, {Dest, <<"echo:">>}} = socket:recvfrom(Socket, 0, 5000),
+ {error, timeout} = socket:recvfrom(Socket, 0, 0),
+
+ %% verify that the socket:recv length parameter takes
+ %% precedence over the default
+ ok = socket:sendto(Socket, <<"echo:03">>, Dest),
+ {ok, {Dest, <<"echo:03">>}} = socket:recvfrom(Socket, ?PACKET_SIZE, 5000),
+
+ ok = socket:close(Socket),
+ ok = stop_echo_server(EchoServer).
+
+test_timeout() ->
+ Port = 44405,
+ EchoServer = start_echo_server(Port),
+ Dest = #{family => inet, addr => {127, 0, 0, 1}, port => Port},
+ {ok, Socket} = socket:open(inet, dgram, udp),
+
+ % Test recvfrom
+ ok = socket:sendto(Socket, <<"wait:01">>, Dest),
+ {error, timeout} = socket:recvfrom(Socket, 0, 100),
+ {ok, {Dest, <<"wait:01">>}} = socket:recvfrom(Socket, 0, 5000),
+
+ ok = socket:sendto(Socket, <<"wait:02">>, Dest),
+ {error, timeout} = socket:recvfrom(Socket, ?PACKET_SIZE, 0),
+ {ok, {Dest, <<"wait:02">>}} = socket:recvfrom(Socket, ?PACKET_SIZE, 5000),
+
+ ok = socket:sendto(Socket, <<"wait:03">>, Dest),
+ {error, timeout} = socket:recvfrom(Socket, 0, 0),
+ {ok, {Dest, <<"wait:03">>}} = socket:recvfrom(Socket, 10, infinity),
+
+ % Test recv
+ ok = socket:sendto(Socket, <<"wait:01">>, Dest),
+ {error, timeout} = socket:recv(Socket, 0, 100),
+ {ok, <<"wait:01">>} = socket:recv(Socket, 0, 5000),
+
+ ok = socket:sendto(Socket, <<"wait:02">>, Dest),
+ {error, timeout} = socket:recv(Socket, ?PACKET_SIZE, 0),
+ {ok, <<"wait:02">>} = socket:recv(Socket, ?PACKET_SIZE, 5000),
+
+ ok = socket:sendto(Socket, <<"wait:03">>, Dest),
+ {error, timeout} = socket:recv(Socket, 2 * ?PACKET_SIZE, 0),
+ ok =
+ case socket:recv(Socket, 2 * ?PACKET_SIZE, 5000) of
+ {ok, <<"wait:03">>} ->
+ ok;
+ % https://github.com/erlang/otp/issues/9172
+ {error, {timeout, <<"wait:03">>}} ->
+ "BEAM" = erlang:system_info(machine),
+ case erlang:system_info(otp_release) of
+ "26" -> ok;
+ "27" -> ok
+ end,
+ ok
+ end,
+
+ ok = socket:close(Socket),
+ ok = stop_echo_server(EchoServer).
+
+test_nowait() ->
+ ok = test_nowait(fun receive_loop_nowait/2),
+ ok = test_nowait(fun receive_loop_nowait_ref/2),
+ ok = test_nowait(fun receive_loop_recvfrom_nowait/2),
+ ok = test_nowait(fun receive_loop_recvfrom_nowait_ref/2),
+ ok.
+
+test_nowait(ReceiveFun) ->
+ etest:flush_msg_queue(),
+
+ Port = 44404,
+ EchoServer = start_echo_server(Port),
+ Dest = #{family => inet, addr => {127, 0, 0, 1}, port => Port},
{ok, Socket} = socket:open(inet, dgram, udp),
- ok = loop(Socket, Port, N),
+ Packet0 = <<"echo:00">>,
+ ok = socket:sendto(Socket, Packet0, Dest),
+ ok = ReceiveFun(Socket, Packet0),
+
+ Packet1 = <<"wait:00">>,
+ ok = socket:sendto(Socket, Packet1, Dest),
+ ok = ReceiveFun(Socket, Packet1),
+
+ ok = socket:close(Socket),
+ ok = stop_echo_server(EchoServer).
+
+receive_loop_nowait(Socket, Packet) ->
+ case socket:recv(Socket, byte_size(Packet), nowait) of
+ {ok, ReceivedPacket} when ReceivedPacket =:= Packet ->
+ ok;
+ {select, {select_info, recv, SelectHandle}} when is_reference(SelectHandle) ->
+ receive
+ {'$socket', Socket, select, SelectHandle} ->
+ receive_loop_nowait(Socket, Packet)
+ after 5000 ->
+ {error, timeout}
+ end;
+ {error, _} = Error ->
+ io:format("Error on recv: ~p~n", [Error]),
+ Error
+ end.
+
+receive_loop_nowait_ref(Socket, Packet) ->
+ Ref = make_ref(),
+ case socket:recv(Socket, byte_size(Packet), Ref) of
+ {ok, ReceivedPacket} when ReceivedPacket =:= Packet ->
+ ok;
+ {select, {select_info, recv, Ref}} ->
+ receive
+ {'$socket', Socket, select, Ref} ->
+ receive_loop_nowait_ref(Socket, Packet)
+ after 5000 ->
+ {error, timeout}
+ end;
+ {error, _} = Error ->
+ io:format("Error on recv: ~p~n", [Error]),
+ Error
+ end.
- %%
- %% Close the socket
- %%
- ok = socket:close(Socket).
+receive_loop_recvfrom_nowait(Socket, Packet) ->
+ case socket:recvfrom(Socket, byte_size(Packet), nowait) of
+ {ok, {_Source, ReceivedPacket}} when ReceivedPacket =:= Packet ->
+ ok;
+ {select, {select_info, recvfrom, SelectHandle}} when is_reference(SelectHandle) ->
+ receive
+ {'$socket', Socket, select, SelectHandle} ->
+ receive_loop_nowait(Socket, Packet)
+ after 5000 ->
+ {error, timeout}
+ end;
+ {error, _} = Error ->
+ io:format("Error on recv: ~p~n", [Error]),
+ Error
+ end.
-loop(_Socket, _Port, 0) ->
- ok;
-loop(Socket, Port, I) ->
- Packet = pid_to_list(self()) ++ ":" ++ integer_to_list(I),
- Dest = #{family => inet, addr => loopback, port => Port},
- case socket:sendto(Socket, Packet, Dest) of
- ok ->
+receive_loop_recvfrom_nowait_ref(Socket, Packet) ->
+ Ref = make_ref(),
+ case socket:recvfrom(Socket, byte_size(Packet), Ref) of
+ {ok, {_Source, ReceivedPacket}} when ReceivedPacket =:= Packet ->
+ ok;
+ {select, {select_info, recvfrom, Ref}} ->
receive
- {received, _Packet} ->
- loop(Socket, Port, I - 1)
+ {'$socket', Socket, select, Ref} ->
+ receive_loop_nowait_ref(Socket, Packet)
+ after 5000 ->
+ {error, timeout}
end;
- {error, _Reason} = Error ->
- io:format("Error on sendto: ~p~n", [Error]),
+ {error, _} = Error ->
+ io:format("Error on recv: ~p~n", [Error]),
Error
end.
+
+test_setopt_getopt() ->
+ {ok, Socket} = socket:open(inet, dgram, udp),
+ {ok, dgram} = socket:getopt(Socket, {socket, type}),
+ ok = socket:setopt(Socket, {socket, reuseaddr}, true),
+ ok = socket:close(Socket),
+ {error, closed} = socket:getopt(Socket, {socket, type}),
+ {error, closed} = socket:setopt(Socket, {socket, reuseaddr}, true),
+ ok.