From 04eedde460225845198e0b60b4c78274ea8e1e10 Mon Sep 17 00:00:00 2001 From: Paul Guyot Date: Wed, 22 Jan 2025 22:16:27 +0100 Subject: [PATCH] Add support for udp multicast Also fix processing of return value of send(2) and sendto(2). Signed-off-by: Paul Guyot --- CHANGELOG.md | 1 + libs/estdlib/src/socket.erl | 8 ++- src/libAtomVM/otp_socket.c | 77 +++++++++++++++++++++----- tests/libs/estdlib/test_udp_socket.erl | 70 +++++++++++++++++++++++ 4 files changed, 140 insertions(+), 16 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index bf2b386b9..7d1c9e85e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -25,6 +25,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Added `externalterm_to_term_with_roots` to efficiently preserve roots when allocating memory for external terms. - Added `erl_epmd` client implementation to epmd using `socket` module - Added support for socket asynchronous API for `recv`, `recvfrom` and `accept`. +- Added support for UDP multicast with socket API. ### Changed diff --git a/libs/estdlib/src/socket.erl b/libs/estdlib/src/socket.erl index 447dbaee6..160f4c10c 100644 --- a/libs/estdlib/src/socket.erl +++ b/libs/estdlib/src/socket.erl @@ -66,10 +66,12 @@ }. -type in_addr() :: {0..255, 0..255, 0..255, 0..255}. -type port_number() :: 0..65535. +-type ip_mreq() :: #{multiaddr := in_addr(), interface := in_addr()}. -type socket_option() :: {socket, reuseaddr | linger | type} - | {otp, recvbuf}. + | {otp, recvbuf} + | {ip, add_membership}. -export_type([ socket/0, @@ -80,7 +82,8 @@ sockaddr_in/0, in_addr/0, port_number/0, - socket_option/0 + socket_option/0, + ip_mreq/0 ]). -define(DEFAULT_BACKLOG, 4). @@ -647,6 +650,7 @@ getopt(_Socket, _SocketOption) -> %% `{socket, reuseaddr}'`boolean()' %% `{socket, linger}'`#{onoff => boolean(), linger => non_neg_integer()}' %% `{otp, recvbuf}'`non_neg_integer()' +%% `{ip, add_membership}'`ip_mreq()' %% %% %% Example: diff --git a/src/libAtomVM/otp_socket.c b/src/libAtomVM/otp_socket.c index fe7a71652..55265f862 100644 --- a/src/libAtomVM/otp_socket.c +++ b/src/libAtomVM/otp_socket.c @@ -194,6 +194,7 @@ static const char *const port_atom = ATOM_STR("\x4", "port"); static const char *const rcvbuf_atom = ATOM_STR("\x6", "rcvbuf"); static const char *const reuseaddr_atom = ATOM_STR("\x9", "reuseaddr"); static const char *const type_atom = ATOM_STR("\x4", "type"); +static const char *const add_membership_atom = ATOM_STR("\xE", "add_membership"); #define CLOSED_FD 0 @@ -221,12 +222,14 @@ enum otp_socket_setopt_level { OtpSocketInvalidSetoptLevel = 0, OtpSocketSetoptLevelSocket, - OtpSocketSetoptLevelOTP + OtpSocketSetoptLevelOTP, + OtpSocketSetoptLevelIP }; static const AtomStringIntPair otp_socket_setopt_level_table[] = { { ATOM_STR("\x6", "socket"), OtpSocketSetoptLevelSocket }, { ATOM_STR("\x3", "otp"), OtpSocketSetoptLevelOTP }, + { ATOM_STR("\x2", "ip"), OtpSocketSetoptLevelIP }, SELECT_INT_DEFAULT(OtpSocketInvalidSetoptLevel) }; @@ -604,7 +607,7 @@ static term nif_socket_open(Context *ctx, int argc, term argv[]) } term socket_term = term_alloc_tuple(2, &ctx->heap); - uint64_t ref_ticks = globalcontext_get_ref_ticks(ctx->global); + uint64_t ref_ticks = globalcontext_get_ref_ticks(global); rsrc_obj->socket_ref_ticks = ref_ticks; term ref = term_from_ref_ticks(ref_ticks, &ctx->heap); term_put_tuple_element(socket_term, 0, obj); @@ -1261,8 +1264,8 @@ static term nif_socket_setopt(Context *ctx, int argc, term argv[]) return OK_ATOM; #endif } else if (globalcontext_is_term_equal_to_atom_string(global, opt, linger_atom)) { - term onoff = interop_kv_get_value(value, onoff_atom, ctx->global); - term linger = interop_kv_get_value(value, linger_atom, ctx->global); + term onoff = interop_kv_get_value(value, onoff_atom, global); + term linger = interop_kv_get_value(value, linger_atom, global); VALIDATE_VALUE(linger, term_is_integer); #if OTP_SOCKET_BSD @@ -1323,6 +1326,52 @@ static term nif_socket_setopt(Context *ctx, int argc, term argv[]) } } +#if OTP_SOCKET_BSD + case OtpSocketSetoptLevelIP: { + term opt = term_get_tuple_element(level_tuple, 1); + if (globalcontext_is_term_equal_to_atom_string(global, opt, add_membership_atom)) { + // socket:setopt(Socket, {ip, add_membership_atom}, Req :: ip_mreq()) + + if (UNLIKELY(!term_is_map(value))) { + TRACE("socket:setopt: ip add_membership_atom value must be a map"); + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); + return make_error_tuple(globalcontext_make_atom(global, invalid_value_atom), ctx); + } + + term multiaddr = interop_kv_get_value(value, ATOM_STR("\x9", "multiaddr"), global); + if (UNLIKELY(!term_is_tuple(multiaddr) || term_get_tuple_arity(multiaddr) != 4)) { + TRACE("socket:setopt: ip add_membership_atom multiaddr value must be an IP addr"); + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); + return make_error_tuple(globalcontext_make_atom(global, invalid_value_atom), ctx); + } + + term interface = interop_kv_get_value(value, ATOM_STR("\x9", "interface"), global); + if (UNLIKELY(!term_is_tuple(interface) || term_get_tuple_arity(interface) != 4)) { + TRACE("socket:setopt: ip add_membership_atom interface value must be an IP addr"); + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); + return make_error_tuple(globalcontext_make_atom(global, invalid_value_atom), ctx); + } + + struct ip_mreq option_value; + option_value.imr_multiaddr.s_addr = htonl(inet_addr4_to_uint32(multiaddr)); + option_value.imr_interface.s_addr = htonl(inet_addr4_to_uint32(interface)); + + int res = setsockopt(rsrc_obj->fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &option_value, sizeof(option_value)); + + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); + if (UNLIKELY(res != 0)) { + return make_errno_tuple(ctx); + } else { + return OK_ATOM; + } + } else { + TRACE("socket:setopt: Unsupported ip option"); + SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); + return make_error_tuple(globalcontext_make_atom(global, invalid_option_atom), ctx); + } + } +#endif + default: { TRACE("socket:setopt: Unsupported level"); SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); @@ -1538,9 +1587,9 @@ static term nif_socket_bind(Context *ctx, int argc, term argv[]) ip_addr_set_loopback(false, &ip_addr); #endif } else if (term_is_map(sockaddr)) { - term port = interop_kv_get_value_default(sockaddr, port_atom, term_from_int(0), ctx->global); + term port = interop_kv_get_value_default(sockaddr, port_atom, term_from_int(0), global); port_u16 = term_to_int(port); - term addr = interop_kv_get_value(sockaddr, addr_atom, ctx->global); + term addr = interop_kv_get_value(sockaddr, addr_atom, global); if (globalcontext_is_term_equal_to_atom_string(global, addr, any_atom)) { #if OTP_SOCKET_BSD serveraddr.sin_addr.s_addr = htonl(INADDR_ANY); @@ -1764,7 +1813,7 @@ static term nif_socket_accept(Context *ctx, int argc, term argv[]) } term socket_term = term_alloc_tuple(2, &ctx->heap); - uint64_t ref_ticks = globalcontext_get_ref_ticks(ctx->global); + uint64_t ref_ticks = globalcontext_get_ref_ticks(global); conn_rsrc_obj->socket_ref_ticks = ref_ticks; term ref = term_from_ref_ticks(ref_ticks, &ctx->heap); term_put_tuple_element(socket_term, 0, new_resource); @@ -1808,7 +1857,7 @@ static term nif_socket_accept(Context *ctx, int argc, term argv[]) // return EAGAIN LWIP_END(); SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); - return make_error_tuple(posix_errno_to_term(EAGAIN, ctx->global), ctx); + return make_error_tuple(posix_errno_to_term(EAGAIN, global), ctx); } LWIP_END(); SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); @@ -2285,13 +2334,13 @@ static ssize_t do_socket_send(struct SocketResource *rsrc_obj, const uint8_t *bu } else { sent_data = send(rsrc_obj->fd, buf, len, 0); } - if (sent_data == 0) { - return SocketClosed; - } if (sent_data < 0) { if (errno == EAGAIN || errno == EWOULDBLOCK) { return SocketWouldBlock; } + if (errno == EBADF || errno == ECONNRESET) { + return SocketClosed; + } return SocketOtherError; } return sent_data; @@ -2430,7 +2479,7 @@ static term nif_socket_send_internal(Context *ctx, int argc, term argv[], bool i RAISE_ERROR(OUT_OF_MEMORY_ATOM); } - term rest = term_maybe_create_sub_binary(data, sent_data, rest_len, &ctx->heap, ctx->global); + term rest = term_maybe_create_sub_binary(data, sent_data, rest_len, &ctx->heap, global); return port_create_tuple2(ctx, OK_ATOM, rest); } else if (sent_data == 0) { @@ -2535,8 +2584,8 @@ static term nif_socket_connect(Context *ctx, int argc, term argv[]) SMP_RWLOCK_RDLOCK(rsrc_obj->socket_lock); term sockaddr = argv[1]; - term port = interop_kv_get_value_default(sockaddr, port_atom, term_from_int(0), ctx->global); - term addr = interop_kv_get_value(sockaddr, addr_atom, ctx->global); + term port = interop_kv_get_value_default(sockaddr, port_atom, term_from_int(0), global); + term addr = interop_kv_get_value(sockaddr, addr_atom, global); if (term_is_invalid_term(addr)) { SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock); RAISE_ERROR(BADARG_ATOM); diff --git a/tests/libs/estdlib/test_udp_socket.erl b/tests/libs/estdlib/test_udp_socket.erl index 99147fa5c..44fa8a6c1 100644 --- a/tests/libs/estdlib/test_udp_socket.erl +++ b/tests/libs/estdlib/test_udp_socket.erl @@ -28,6 +28,24 @@ test() -> ok = test_timeout(), ok = test_nowait(), ok = test_setopt_getopt(), + % Workaround for image limitation on CI + % https://github.com/actions/runner-images/issues/10924 + Platform = erlang:system_info(machine), + System = execute_command(Platform, "uname -s"), + TestMulticast = + case System of + "Darwin\n" -> + Version = execute_command(Platform, "uname -r"), + Version < "25"; + _ -> + true + end, + if + TestMulticast -> + ok = test_multicast(); + true -> + ok + end, ok. -define(PACKET_SIZE, 7). @@ -290,3 +308,55 @@ test_setopt_getopt() -> {error, closed} = socket:getopt(Socket, {socket, type}), {error, closed} = socket:setopt(Socket, {socket, reuseaddr}, true), ok. + +test_multicast() -> + {ok, SocketRecv} = socket:open(inet, dgram, udp), + SocketRecvAddr = #{ + family => inet, addr => {0, 0, 0, 0}, port => 8042 + }, + ok = socket:setopt(SocketRecv, {socket, reuseaddr}, true), + ok = socket:bind(SocketRecv, SocketRecvAddr), + ok = socket:setopt(SocketRecv, {ip, add_membership}, #{ + multiaddr => {224, 0, 0, 42}, interface => {0, 0, 0, 0} + }), + + {ok, SocketSender} = socket:open(inet, dgram, udp), + ok = socket:sendto(SocketSender, <<"42">>, #{ + family => inet, addr => {224, 0, 0, 42}, port => 8042 + }), + {ok, SocketSenderAddr} = socket:sockname(SocketSender), + SocketSenderAddrPort = maps:get(port, SocketSenderAddr), + + {ok, {SocketSenderAddrFrom, <<"42">>}} = socket:recvfrom(SocketRecv, 2, 500), + {error, timeout} = socket:recvfrom(SocketRecv, 2, 0), + SocketSenderAddrPort = maps:get(port, SocketSenderAddrFrom), + + ok = socket:sendto(SocketRecv, <<"43">>, #{ + family => inet, addr => {224, 0, 0, 42}, port => 8042 + }), + {ok, {SocketRecvAddrFrom, <<"43">>}} = socket:recvfrom(SocketRecv, 2, 500), + {error, timeout} = socket:recvfrom(SocketRecv, 2, 0), + 8042 = maps:get(port, SocketRecvAddrFrom), + + ok = socket:close(SocketRecv), + ok = socket:close(SocketSender), + ok. + +execute_command("BEAM", Command) -> + os:cmd(Command); +execute_command("ATOM", Command) -> + {ok, _, Fd} = atomvm:subprocess("/bin/sh", ["sh", "-c", Command], undefined, [stdout]), + Result = loop_read(Fd, []), + ok = atomvm:posix_close(Fd), + Result. + +loop_read(Fd, Acc) -> + case atomvm:posix_read(Fd, 10) of + eof -> + lists:flatten(lists:reverse(Acc)); + {error, eintr} -> + % used with lldb ;-) + loop_read(Fd, Acc); + {ok, Line} -> + loop_read(Fd, [binary_to_list(Line) | Acc]) + end.