diff --git a/Makefile b/Makefile index b2edb95e8..18471e2bf 100644 --- a/Makefile +++ b/Makefile @@ -91,7 +91,8 @@ LIBC_BOTTOM_HALF_OMIT_SOURCES := \ $(LIBC_BOTTOM_HALF_SOURCES)/listen.c \ $(LIBC_BOTTOM_HALF_SOURCES)/accept-wasip2.c \ $(LIBC_BOTTOM_HALF_SOURCES)/shutdown.c \ - $(LIBC_BOTTOM_HALF_SOURCES)/sockopt.c + $(LIBC_BOTTOM_HALF_SOURCES)/sockopt.c \ + $(LIBC_BOTTOM_HALF_SOURCES)/poll-wasip2.c LIBC_BOTTOM_HALF_ALL_SOURCES := $(filter-out $(LIBC_BOTTOM_HALF_OMIT_SOURCES),$(LIBC_BOTTOM_HALF_ALL_SOURCES)) # Omit p2-specific headers from include-all.c test. INCLUDE_ALL_CLAUSES := -not -name wasip2.h -not -name descriptor_table.h diff --git a/expected/wasm32-wasip2/defined-symbols.txt b/expected/wasm32-wasip2/defined-symbols.txt index 772a486b9..3b11fb413 100644 --- a/expected/wasm32-wasip2/defined-symbols.txt +++ b/expected/wasm32-wasip2/defined-symbols.txt @@ -1012,6 +1012,7 @@ poll_method_pollable_ready poll_poll poll_pollable_drop_borrow poll_pollable_drop_own +poll_wasip2 posix_close posix_fadvise posix_fallocate diff --git a/libc-bottom-half/cloudlibc/src/libc/poll/poll.c b/libc-bottom-half/cloudlibc/src/libc/poll/poll.c index cde4e81c2..0ee14791d 100644 --- a/libc-bottom-half/cloudlibc/src/libc/poll/poll.c +++ b/libc-bottom-half/cloudlibc/src/libc/poll/poll.c @@ -7,7 +7,7 @@ #include #include -int poll(struct pollfd *fds, size_t nfds, int timeout) { +static int poll_wasip1(struct pollfd *fds, size_t nfds, int timeout) { // Construct events for poll(). size_t maxevents = 2 * nfds + 1; __wasi_subscription_t subscriptions[maxevents]; @@ -127,3 +127,48 @@ int poll(struct pollfd *fds, size_t nfds, int timeout) { } return retval; } + +#ifdef __wasilibc_use_wasip2 +#include + +int poll_wasip2(struct pollfd *fds, size_t nfds, int timeout); + +int poll(struct pollfd* fds, nfds_t nfds, int timeout) +{ + bool found_socket = false; + bool found_non_socket = false; + for (size_t i = 0; i < nfds; ++i) { + descriptor_table_entry_t* entry; + if (descriptor_table_get_ref(fds[i].fd, &entry)) { + found_socket = true; + } else { + found_non_socket = true; + } + } + + if (found_socket) { + if (found_non_socket) { + // We currently don't support polling a mix of non-sockets and + // sockets here (though you can do it by using the host APIs + // directly), and we probably won't until we've migrated entirely to + // WASI 0.2. + errno = ENOTSUP; + return -1; + } + + return poll_wasip2(fds, nfds, timeout); + } else if (found_non_socket) { + return poll_wasip1(fds, nfds, timeout); + } else if (timeout >= 0) { + return poll_wasip2(fds, nfds, timeout); + } else { + errno = ENOTSUP; + return -1; + } +} +#else // not __wasilibc_use_wasip2 +int poll(struct pollfd* fds, nfds_t nfds, int timeout) +{ + return poll_wasip1(fds, nfds, timeout); +} +#endif // not __wasilibc_use_wasip2 diff --git a/libc-bottom-half/cloudlibc/src/libc/sys/select/pselect.c b/libc-bottom-half/cloudlibc/src/libc/sys/select/pselect.c index fdc470ea8..3b479edfb 100644 --- a/libc-bottom-half/cloudlibc/src/libc/sys/select/pselect.c +++ b/libc-bottom-half/cloudlibc/src/libc/sys/select/pselect.c @@ -8,6 +8,7 @@ #include #include +#include int pselect(int nfds, fd_set *restrict readfds, fd_set *restrict writefds, fd_set *restrict errorfds, const struct timespec *restrict timeout, @@ -33,93 +34,66 @@ int pselect(int nfds, fd_set *restrict readfds, fd_set *restrict writefds, if (writefds == NULL) writefds = ∅ - // Determine the maximum number of events. - size_t maxevents = readfds->__nfds + writefds->__nfds + 1; - __wasi_subscription_t subscriptions[maxevents]; - size_t nsubscriptions = 0; - - // Convert the readfds set. + struct pollfd poll_fds[readfds->__nfds + writefds->__nfds]; + size_t poll_nfds = 0; + for (size_t i = 0; i < readfds->__nfds; ++i) { int fd = readfds->__fds[i]; if (fd < nfds) { - __wasi_subscription_t *subscription = &subscriptions[nsubscriptions++]; - *subscription = (__wasi_subscription_t){ - .userdata = fd, - .u.tag = __WASI_EVENTTYPE_FD_READ, - .u.u.fd_read.file_descriptor = fd, - }; + poll_fds[poll_nfds++] = (struct pollfd){ + .fd = fd, + .events = POLLRDNORM, + .revents = 0 + }; } } - - // Convert the writefds set. + for (size_t i = 0; i < writefds->__nfds; ++i) { int fd = writefds->__fds[i]; if (fd < nfds) { - __wasi_subscription_t *subscription = &subscriptions[nsubscriptions++]; - *subscription = (__wasi_subscription_t){ - .userdata = fd, - .u.tag = __WASI_EVENTTYPE_FD_WRITE, - .u.u.fd_write.file_descriptor = fd, + poll_fds[poll_nfds++] = (struct pollfd){ + .fd = fd, + .events = POLLWRNORM, + .revents = 0 }; } } - // Create extra event for the timeout. - if (timeout != NULL) { - __wasi_subscription_t *subscription = &subscriptions[nsubscriptions++]; - *subscription = (__wasi_subscription_t){ - .u.tag = __WASI_EVENTTYPE_CLOCK, - .u.u.clock.id = __WASI_CLOCKID_REALTIME, - }; - if (!timespec_to_timestamp_clamp(timeout, &subscription->u.u.clock.timeout)) { + int poll_timeout; + if (timeout) { + uint64_t timeout_u64; + if (!timespec_to_timestamp_clamp(timeout, &timeout_u64) ) { errno = EINVAL; return -1; } - } - // Execute poll(). - size_t nevents; - __wasi_event_t events[nsubscriptions]; - __wasi_errno_t error = - __wasi_poll_oneoff(subscriptions, events, nsubscriptions, &nevents); - if (error != 0) { - // WASI's poll requires at least one subscription, or else it returns - // `EINVAL`. Since a `pselect` with nothing to wait for is valid in POSIX, - // return `ENOTSUP` to indicate that we don't support that case. - // - // Wasm has no signal handling, so if none of the user-provided `pollfd` - // elements, nor the timeout, led us to producing even one subscription - // to wait for, there would be no way for the poll to wake up. WASI - // returns `EINVAL` in this case, but for users of `poll`, `ENOTSUP` is - // more likely to be understood. - if (nsubscriptions == 0) - errno = ENOTSUP; - else - errno = error; - return -1; - } + // Convert nanoseconds to milliseconds: + timeout_u64 /= 1000000; - // Test for EBADF. - for (size_t i = 0; i < nevents; ++i) { - const __wasi_event_t *event = &events[i]; - if ((event->type == __WASI_EVENTTYPE_FD_READ || - event->type == __WASI_EVENTTYPE_FD_WRITE) && - event->error == __WASI_ERRNO_BADF) { - errno = EBADF; - return -1; + if (timeout_u64 > INT_MAX) { + timeout_u64 = INT_MAX; } + + poll_timeout = (int) timeout_u64; + } else { + poll_timeout = -1; + }; + + if (poll(poll_fds, poll_nfds, poll_timeout) < 0) { + return -1; } - // Clear and set entries in the result sets. FD_ZERO(readfds); FD_ZERO(writefds); - for (size_t i = 0; i < nevents; ++i) { - const __wasi_event_t *event = &events[i]; - if (event->type == __WASI_EVENTTYPE_FD_READ) { - readfds->__fds[readfds->__nfds++] = event->userdata; - } else if (event->type == __WASI_EVENTTYPE_FD_WRITE) { - writefds->__fds[writefds->__nfds++] = event->userdata; + for (size_t i = 0; i < poll_nfds; ++i) { + struct pollfd* pollfd = poll_fds + i; + if ((pollfd->revents & POLLRDNORM) != 0) { + readfds->__fds[readfds->__nfds++] = pollfd->fd; + } + if ((pollfd->revents & POLLWRNORM) != 0) { + writefds->__fds[writefds->__nfds++] = pollfd->fd; } } + return readfds->__nfds + writefds->__nfds; } diff --git a/libc-bottom-half/sources/__wasilibc_fd_renumber.c b/libc-bottom-half/sources/__wasilibc_fd_renumber.c index 47992e91f..7690d1359 100644 --- a/libc-bottom-half/sources/__wasilibc_fd_renumber.c +++ b/libc-bottom-half/sources/__wasilibc_fd_renumber.c @@ -15,10 +15,85 @@ int __wasilibc_fd_renumber(int fd, int newfd) { return 0; } +#ifdef __wasilibc_use_wasip2 +#include + +void drop_tcp_socket(tcp_socket_t socket) { + switch (socket.state.tag) { + case TCP_SOCKET_STATE_UNBOUND: + case TCP_SOCKET_STATE_BOUND: + case TCP_SOCKET_STATE_CONNECTING: + case TCP_SOCKET_STATE_LISTENING: + case TCP_SOCKET_STATE_CONNECT_FAILED: + // No additional resources to drop. + break; + case TCP_SOCKET_STATE_CONNECTED: { + tcp_socket_state_connected_t connection = socket.state.connected; + + poll_pollable_drop_own(connection.input_pollable); + poll_pollable_drop_own(connection.output_pollable); + streams_input_stream_drop_own(connection.input); + streams_output_stream_drop_own(connection.output); + break; + } + default: /* unreachable */ abort(); + } + + poll_pollable_drop_own(socket.socket_pollable); + tcp_tcp_socket_drop_own(socket.socket); +} + +void drop_udp_socket_streams(udp_socket_streams_t streams) { + poll_pollable_drop_own(streams.incoming_pollable); + poll_pollable_drop_own(streams.outgoing_pollable); + udp_incoming_datagram_stream_drop_own(streams.incoming); + udp_outgoing_datagram_stream_drop_own(streams.outgoing); +} + +void drop_udp_socket(udp_socket_t socket) { + switch (socket.state.tag) { + case UDP_SOCKET_STATE_UNBOUND: + case UDP_SOCKET_STATE_BOUND_NOSTREAMS: + // No additional resources to drop. + break; + case UDP_SOCKET_STATE_BOUND_STREAMING: + drop_udp_socket_streams(socket.state.bound_streaming.streams); + break; + case UDP_SOCKET_STATE_CONNECTED: { + drop_udp_socket_streams(socket.state.connected.streams); + break; + } + default: /* unreachable */ abort(); + } + + poll_pollable_drop_own(socket.socket_pollable); + udp_udp_socket_drop_own(socket.socket); +} +#endif // __wasilibc_use_wasip2 + int close(int fd) { // Scan the preopen fds before making any changes. __wasilibc_populate_preopens(); +#ifdef __wasilibc_use_wasip2 + descriptor_table_entry_t entry; + if (descriptor_table_remove(fd, &entry)) { + + switch (entry.tag) + { + case DESCRIPTOR_TABLE_ENTRY_TCP_SOCKET: + drop_tcp_socket(entry.tcp_socket); + break; + case DESCRIPTOR_TABLE_ENTRY_UDP_SOCKET: + drop_udp_socket(entry.udp_socket); + break; + default: /* unreachable */ abort(); + } + + return 0; + } +#endif // __wasilibc_use_wasip2 + __wasi_errno_t error = __wasi_fd_close(fd); if (error != 0) { errno = error; diff --git a/libc-bottom-half/sources/poll-wasip2.c b/libc-bottom-half/sources/poll-wasip2.c new file mode 100644 index 000000000..be7809c66 --- /dev/null +++ b/libc-bottom-half/sources/poll-wasip2.c @@ -0,0 +1,257 @@ +#include +#include + +#include + +typedef struct { + poll_own_pollable_t pollable; + struct pollfd *pollfd; + descriptor_table_entry_t *entry; + short events; +} state_t; + +int poll_wasip2(struct pollfd *fds, size_t nfds, int timeout) +{ + int event_count = 0; + for (size_t i = 0; i < nfds; ++i) { + fds[i].revents = 0; + } + + size_t max_pollables = (2 * nfds) + 1; + state_t states[max_pollables]; + size_t state_index = 0; + for (size_t i = 0; i < nfds; ++i) { + struct pollfd *pollfd = fds + i; + descriptor_table_entry_t *entry; + if (descriptor_table_get_ref(pollfd->fd, &entry)) { + switch (entry->tag) { + case DESCRIPTOR_TABLE_ENTRY_TCP_SOCKET: { + tcp_socket_t *socket = &(entry->tcp_socket); + switch (socket->state.tag) { + case TCP_SOCKET_STATE_CONNECTING: + case TCP_SOCKET_STATE_LISTENING: { + if ((pollfd->events & + (POLLRDNORM | POLLWRNORM)) != 0) { + states[state_index++] = (state_t){ + .pollable = + socket->socket_pollable, + .pollfd = pollfd, + .entry = entry, + .events = pollfd->events + }; + } + break; + } + + case TCP_SOCKET_STATE_CONNECTED: { + if ((pollfd->events & POLLRDNORM) != + 0) { + states[state_index++] = (state_t){ + .pollable = + socket->state + .connected + .input_pollable, + .pollfd = pollfd, + .entry = entry, + .events = POLLRDNORM + }; + } + if ((pollfd->events & POLLWRNORM) != + 0) { + states[state_index++] = (state_t){ + .pollable = + socket->state + .connected + .output_pollable, + .pollfd = pollfd, + .entry = entry, + .events = POLLWRNORM + }; + } + break; + } + + case TCP_SOCKET_STATE_CONNECT_FAILED: { + if (pollfd->revents == 0) { + ++event_count; + } + pollfd->revents |= pollfd->events; + break; + } + + default: + errno = ENOTSUP; + return -1; + } + break; + } + + case DESCRIPTOR_TABLE_ENTRY_UDP_SOCKET: { + udp_socket_t *socket = &(entry->udp_socket); + switch (socket->state.tag) { + case UDP_SOCKET_STATE_UNBOUND: + case UDP_SOCKET_STATE_BOUND_NOSTREAMS: { + if (pollfd->revents == 0) { + ++event_count; + } + pollfd->revents |= pollfd->events; + break; + } + + case UDP_SOCKET_STATE_BOUND_STREAMING: + case UDP_SOCKET_STATE_CONNECTED: { + udp_socket_streams_t *streams; + if (socket->state.tag == + UDP_SOCKET_STATE_BOUND_STREAMING) { + streams = &( + socket->state + .bound_streaming + .streams); + } else { + streams = &( + socket->state.connected + .streams); + } + if ((pollfd->events & POLLRDNORM) != + 0) { + states[state_index++] = (state_t){ + .pollable = + streams->incoming_pollable, + .pollfd = pollfd, + .entry = entry, + .events = POLLRDNORM + }; + } + if ((pollfd->events & POLLWRNORM) != + 0) { + states[state_index++] = (state_t){ + .pollable = + streams->outgoing_pollable, + .pollfd = pollfd, + .entry = entry, + .events = POLLWRNORM + }; + } + break; + } + + default: + errno = ENOTSUP; + return -1; + } + break; + } + + default: + errno = ENOTSUP; + return -1; + } + } else { + abort(); + } + } + + if (event_count > 0 && timeout != 0) { + return event_count; + } + + poll_borrow_pollable_t pollables[state_index + 1]; + for (size_t i = 0; i < state_index; ++i) { + pollables[i] = poll_borrow_pollable(states[i].pollable); + } + + poll_own_pollable_t timeout_pollable; + size_t pollable_count = state_index; + if (timeout >= 0) { + timeout_pollable = monotonic_clock_subscribe_duration( + ((monotonic_clock_duration_t)timeout) * 1000000); + pollables[pollable_count++] = + poll_borrow_pollable(timeout_pollable); + } + + wasip2_list_u32_t ready; + poll_list_borrow_pollable_t list = { + .ptr = (poll_borrow_pollable_t *)&pollables, + .len = pollable_count + }; + poll_poll(&list, &ready); + + for (size_t i = 0; i < ready.len; ++i) { + size_t index = ready.ptr[i]; + if (index < state_index) { + state_t *state = &states[index]; + if (state->entry->tag == + DESCRIPTOR_TABLE_ENTRY_TCP_SOCKET && + state->entry->tcp_socket.state.tag == + TCP_SOCKET_STATE_CONNECTING) { + tcp_socket_t *socket = + &(state->entry->tcp_socket); + tcp_borrow_tcp_socket_t borrow = + tcp_borrow_tcp_socket(socket->socket); + tcp_tuple2_own_input_stream_own_output_stream_t + tuple; + tcp_error_code_t error; + if (tcp_method_tcp_socket_finish_connect( + borrow, &tuple, &error)) { + streams_borrow_input_stream_t + input_stream_borrow = + streams_borrow_input_stream( + tuple.f0); + streams_own_pollable_t input_pollable = + streams_method_input_stream_subscribe( + input_stream_borrow); + streams_borrow_output_stream_t + output_stream_borrow = + streams_borrow_output_stream( + tuple.f1); + streams_own_pollable_t output_pollable = + streams_method_output_stream_subscribe( + output_stream_borrow); + socket->state = + (tcp_socket_state_t){ .tag = TCP_SOCKET_STATE_CONNECTED, + .connected = { + .input_pollable = + input_pollable, + .input = + tuple.f0, + .output_pollable = + output_pollable, + .output = + tuple.f1, + } }; + if (state->pollfd->revents == 0) { + ++event_count; + } + state->pollfd->revents |= state->events; + } else if (error == + NETWORK_ERROR_CODE_WOULD_BLOCK) { + // No events yet -- application will need to poll again + } else { + socket->state = + (tcp_socket_state_t){ .tag = TCP_SOCKET_STATE_CONNECT_FAILED, + .connect_failed = { + .error_code = + error, + } }; + if (state->pollfd->revents == 0) { + ++event_count; + } + state->pollfd->revents |= state->events; + } + } else { + if (state->pollfd->revents == 0) { + ++event_count; + } + state->pollfd->revents |= state->events; + } + } + } + + wasip2_list_u32_free(&ready); + + if (timeout >= 0) { + poll_pollable_drop_own(timeout_pollable); + } + + return event_count; +}