When using io_uring_prep_recv_multishot and IORING_RECVSEND_BUNDLE and IOU_PBUF_RING_INC, data will only be accepted once! #1320

Open
AomaYple opened this issue Jan 4, 2025 · 10 comments


AomaYple commented Jan 4, 2025

As stated in the title, a new io_uring_prep_recv_multishot has to be submitted each time in order to keep receiving data.

Owner

axboe commented Jan 4, 2025

You're going to need to be a LOT more specific on what you're talking about here.

Author

AomaYple commented Jan 5, 2025

#include <arpa/inet.h>
#include <errno.h>
#include <liburing.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

int main() {
    const unsigned entries = 32768;
    struct io_uring ring;
    int result = io_uring_queue_init(entries, &ring, 0);
    if (result != 0) {
        printf("Failed to initialize io_uring queue: %s\n", strerror(-result));
        exit(1);
    }
    const int ringBufferId = 0;
    int error;
    struct io_uring_buf_ring *ringBuffer =
        io_uring_setup_buf_ring(&ring, entries, ringBufferId, IOU_PBUF_RING_INC, &error);
    if (ringBuffer == NULL) {
        printf("Failed to setup buffer ring: %s\n", strerror(-error));
        exit(1);
    }

    char buffer0[16], buffer1[16], buffer2[16];
    unsigned buffer0Offset = 0, buffer1Offset = 0, buffer2Offset = 0;
    const unsigned bufferLength = sizeof(buffer0);
    io_uring_buf_ring_add(ringBuffer, buffer0, bufferLength, 0, io_uring_buf_ring_mask(entries), 0);
    io_uring_buf_ring_add(ringBuffer, buffer1, bufferLength, 1, io_uring_buf_ring_mask(entries), 1);
    io_uring_buf_ring_add(ringBuffer, buffer2, bufferLength, 2, io_uring_buf_ring_mask(entries), 2);
    io_uring_buf_ring_advance(ringBuffer, 3);

    int listenFd = socket(AF_INET, SOCK_STREAM, 0);
    if (listenFd == -1) {
        printf("Failed to create socket: %s\n", strerror(errno));
        exit(1);
    }

    int option = 1;
    result = setsockopt(listenFd, SOL_SOCKET, SO_REUSEADDR, &option, sizeof(option));
    if (result == -1) {
        printf("Failed to set socket option: %s\n", strerror(errno));
        exit(1);
    }

    struct sockaddr_in address = {0};
    address.sin_addr.s_addr = INADDR_ANY;
    address.sin_family = AF_INET;
    address.sin_port = htons(8080);

    result = bind(listenFd, (struct sockaddr *) &address, sizeof(address));
    if (result == -1) {
        printf("Failed to bind socket: %s\n", strerror(errno));
        exit(1);
    }

    result = listen(listenFd, SOMAXCONN);
    if (result == -1) {
        printf("Failed to listen on socket: %s\n", strerror(errno));
        exit(1);
    }

    int clientFd = accept(listenFd, NULL, NULL);
    if (clientFd == -1) {
        printf("Failed to accept connection: %s\n", strerror(errno));
        exit(1);
    }

    for (int i = 0; i != 3; ++i) { // loop 3 times
        struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
        if (sqe == NULL) {
            printf("no sqe available\n");
            exit(1);
        }

        io_uring_prep_recv_multishot(sqe, clientFd, NULL, 0, 0);
        io_uring_sqe_set_flags(sqe, IOSQE_BUFFER_SELECT);
        sqe->ioprio = IORING_RECVSEND_BUNDLE;
        sqe->buf_group = ringBufferId;

        result = io_uring_submit_and_wait(&ring, 1);
        if (result < 0) {
            printf("Failed to submit and wait: %s\n", strerror(-result));
            exit(1);
        }

        int count = 0;
        struct io_uring_cqe *cqe;
        unsigned head;
        io_uring_for_each_cqe(&ring, head, cqe) {
            printf("received %d bytes\n",
                   cqe->res);    // only print once, if remove IORING_RECVSEND_BUNDLE, it will print 3 times

            ++count;
        }
        io_uring_cq_advance(&ring, count);
    }

    result = io_uring_free_buf_ring(&ring, ringBuffer, entries, ringBufferId);
    if (result != 0) {
        printf("Failed to free buffer ring: %s\n", strerror(-result));
        exit(1);
    }

    io_uring_queue_exit(&ring);
}

The program above is the receiving side.

It adds three buffers to the buffer ring, each 16 bytes in size.

A separate sender program then sends 40 bytes to it.

The receiver loops three times.

Each iteration submits a new io_uring_prep_recv_multishot.

But each io_uring_for_each_cqe pass sees only one CQE and prints the received size once.

The first two iterations print 16 bytes and the last one prints 8 bytes, at which point all of the data has been received.

So why can't all of the data be received from a single submission? Why does the request have to be resubmitted manually?

Owner

axboe commented Jan 5, 2025

You should only generate and submit a new recv multishot, when the previous CQE for that multishot request doesn't have IORING_CQE_F_MORE set. If it does, then more completions will be generated by that request.
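
The rule above can be sketched as a small predicate on `cqe->flags`. This is only an illustration: `SKETCH_CQE_F_MORE` and `multishot_needs_rearm` are hypothetical names, and the bit value is assumed to mirror `IORING_CQE_F_MORE` from the liburing headers. Real code would include `<liburing.h>` and use the header's constant.

```c
#include <stdbool.h>
#include <stdint.h>

/* Stand-in for IORING_CQE_F_MORE (assumed to be bit 1 of cqe->flags).
 * In real code, include <liburing.h> and use the header's definition. */
#define SKETCH_CQE_F_MORE (1U << 1)

/* Returns true when the multishot request that produced this CQE has
 * terminated, meaning a new multishot recv has to be submitted.
 * While the MORE flag is set, the same request keeps posting CQEs. */
static bool multishot_needs_rearm(uint32_t cqe_flags) {
    return (cqe_flags & SKETCH_CQE_F_MORE) == 0;
}
```

In a completion loop this would be called as `multishot_needs_rearm(cqe->flags)` before deciding whether to prepare another SQE.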

Owner

axboe commented Jan 5, 2025

That said, the behavior does sound odd. I'll take a look.

Owner

axboe commented Jan 5, 2025

Oh, well I think this one is pretty simple. When you do:

sqe->ioprio = IORING_RECVSEND_BUNDLE;

you're overwriting the multishot flag. Change that to a |= and I suspect it'll look more like what you expect.
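
To make the difference concrete, here is a minimal sketch of what happens to the flag word in each case. It assumes that io_uring_prep_recv_multishot stores its multishot flag in `sqe->ioprio`, as the comment above describes. The `SKETCH_*` macros and both function names are hypothetical stand-ins whose bit values are assumed to mirror `IORING_RECV_MULTISHOT` and `IORING_RECVSEND_BUNDLE`.

```c
/* Stand-ins for the liburing flag bits (assumed values). */
#define SKETCH_RECV_MULTISHOT  (1U << 1) /* IORING_RECV_MULTISHOT */
#define SKETCH_RECVSEND_BUNDLE (1U << 4) /* IORING_RECVSEND_BUNDLE */

/* Plain assignment: the multishot bit set by the prep helper is lost. */
static unsigned ioprio_after_assign(void) {
    unsigned ioprio = 0;
    ioprio |= SKETCH_RECV_MULTISHOT; /* what the prep helper is assumed to do */
    ioprio = SKETCH_RECVSEND_BUNDLE; /* overwrites the whole field */
    return ioprio;
}

/* OR-assignment: both bits survive. */
static unsigned ioprio_after_or(void) {
    unsigned ioprio = 0;
    ioprio |= SKETCH_RECV_MULTISHOT;
    ioprio |= SKETCH_RECVSEND_BUNDLE; /* adds the bundle bit only */
    return ioprio;
}
```

With plain assignment the request is submitted as a single-shot bundle receive, which would be consistent with seeing exactly one CQE per submission.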

Author

AomaYple commented Jan 6, 2025

#include <arpa/inet.h>
#include <errno.h>
#include <liburing.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

void addBuffer(struct io_uring_buf_ring *ringBuffer, int bufferId, int mask);
void submitRecvMultishot(struct io_uring *ring, int clientFd, int ringBufferId);
void forEachCqe(struct io_uring *ring, struct io_uring_buf_ring *ringBuffer, int *bufferId, int mask);

int main() {
    const unsigned entries = 32768;

    struct io_uring ring;
    int result = io_uring_queue_init(entries, &ring, 0);
    if (result != 0) {
        printf("Failed to initialize io_uring queue: %s\n", strerror(-result));
        exit(1);
    }

    const int ringBufferId = 0;
    int error;
    struct io_uring_buf_ring *ringBuffer =
        io_uring_setup_buf_ring(&ring, entries, ringBufferId, IOU_PBUF_RING_INC, &error);
    if (ringBuffer == NULL) {
        printf("Failed to setup buffer ring: %s\n", strerror(-error));
        exit(1);
    }
    int bufferId = 0;

    const int listenFd = socket(AF_INET, SOCK_STREAM, 0);
    if (listenFd == -1) {
        printf("Failed to create socket: %s\n", strerror(errno));
        exit(1);
    }

    const int option = 1;
    result = setsockopt(listenFd, SOL_SOCKET, SO_REUSEADDR, &option, sizeof(option));
    if (result == -1) {
        printf("Failed to set socket option: %s\n", strerror(errno));
        exit(1);
    }

    struct sockaddr_in address = {0};
    address.sin_addr.s_addr = INADDR_ANY;
    address.sin_family = AF_INET;
    address.sin_port = htons(8080);

    result = bind(listenFd, (struct sockaddr *) &address, sizeof(address));
    if (result == -1) {
        printf("Failed to bind socket: %s\n", strerror(errno));
        exit(1);
    }

    result = listen(listenFd, SOMAXCONN);
    if (result == -1) {
        printf("Failed to listen on socket: %s\n", strerror(errno));
        exit(1);
    }

    const int clientFd = accept(listenFd, NULL, NULL);
    if (clientFd == -1) {
        printf("Failed to accept connection: %s\n", strerror(errno));
        exit(1);
    }

    // the client sends 20 bytes

    submitRecvMultishot(&ring, clientFd, ringBufferId);
    forEachCqe(&ring, ringBuffer, &bufferId, io_uring_buf_ring_mask(entries));
    // first: generates ENOBUFS, then a buffer is added

    submitRecvMultishot(&ring, clientFd, ringBufferId);
    forEachCqe(&ring, ringBuffer, &bufferId, io_uring_buf_ring_mask(entries));
    // second: receives 16 bytes, which is not all of the data
    // without IORING_RECVSEND_BUNDLE: two CQEs, one for 16 bytes and one ENOBUFS, then a buffer is added
    // with IORING_RECVSEND_BUNDLE: one CQE for 16 bytes

    submitRecvMultishot(&ring, clientFd, ringBufferId);
    forEachCqe(&ring, ringBuffer, &bufferId, io_uring_buf_ring_mask(entries));
    // third:
    // without IORING_RECVSEND_BUNDLE: one CQE for 4 bytes
    // with IORING_RECVSEND_BUNDLE: one CQE with ENOBUFS, then a buffer is added

    submitRecvMultishot(&ring, clientFd, ringBufferId);
    forEachCqe(&ring, ringBuffer, &bufferId, io_uring_buf_ring_mask(entries));
    // fourth:
    // with IORING_RECVSEND_BUNDLE: one CQE for 4 bytes

    result = io_uring_free_buf_ring(&ring, ringBuffer, entries, ringBufferId);
    if (result != 0) {
        printf("Failed to free buffer ring: %s\n", strerror(-result));
        exit(1);
    }

    io_uring_queue_exit(&ring);
}

void addBuffer(struct io_uring_buf_ring *const ringBuffer, const int bufferId, const int mask) {
    void *buffer = malloc(16);    // allocate 16 bytes
    io_uring_buf_ring_add(ringBuffer, buffer, 16, bufferId, mask, 0);
    io_uring_buf_ring_advance(ringBuffer, 1);
}

void submitRecvMultishot(struct io_uring *const ring, const int clientFd, const int ringBufferId) {
    struct io_uring_sqe *const sqe = io_uring_get_sqe(ring);
    if (sqe == NULL) {
        printf("no sqe available\n");
        exit(1);
    }

    io_uring_prep_recv_multishot(sqe, clientFd, NULL, 0, 0);
    io_uring_sqe_set_flags(sqe, IOSQE_BUFFER_SELECT);
    sqe->ioprio |= IORING_RECVSEND_BUNDLE;
    sqe->buf_group = ringBufferId;

    const int result = io_uring_submit_and_wait(ring, 1);
    if (result < 0) {
        printf("Failed to submit and wait: %s\n", strerror(-result));
        exit(1);
    }
}

void forEachCqe(struct io_uring *const ring, struct io_uring_buf_ring *const ringBuffer, int *const bufferId,
                const int mask) {
    struct io_uring_cqe *cqe;
    unsigned head;
    io_uring_for_each_cqe(ring, head, cqe) {
        printf("received %d bytes\n", cqe->res);
        if (cqe->res == -ENOBUFS) {
            printf("ENOBUFS\n");
            addBuffer(ringBuffer, (*bufferId)++, mask);
        }

        if ((cqe->flags & IORING_CQE_F_BUFFER) != 0) {
            const unsigned currentBufferId = cqe->flags >> IORING_CQE_BUFFER_SHIFT;
            printf("buffer id: %d\n", currentBufferId);
            printf("is more data: %d\n", (cqe->flags & IORING_CQE_F_BUF_MORE) != 0);
        }

        io_uring_cq_advance(ring, 1);
    }

    printf("loop end\n");
}

The client sends 20 bytes to the server. Each buffer in the server-side io_uring_buf_ring is 16 bytes, so receiving all of the data takes at least two buffers.
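
The buffer arithmetic can be checked with a short sketch. `buffers_needed` and `last_chunk_bytes` are hypothetical helpers, and the sketch assumes every buffer except the last is filled completely, which matches the 16 + 4 split described here but is not something the kernel guarantees for a stream socket.

```c
#include <stddef.h>

/* Minimum number of fixed-size buffers needed to hold a payload
 * (ceiling division). Returns 0 for a zero buffer size. */
static size_t buffers_needed(size_t payload_bytes, size_t buffer_bytes) {
    if (buffer_bytes == 0) {
        return 0;
    }
    return (payload_bytes + buffer_bytes - 1) / buffer_bytes;
}

/* Bytes that land in the last buffer, assuming all earlier buffers
 * were filled completely. */
static size_t last_chunk_bytes(size_t payload_bytes, size_t buffer_bytes) {
    if (payload_bytes == 0 || buffer_bytes == 0) {
        return 0;
    }
    size_t remainder = payload_bytes % buffer_bytes;
    return remainder == 0 ? buffer_bytes : remainder;
}
```

For 20 bytes and 16-byte buffers this gives two buffers with 4 bytes in the last one. For the earlier 40-byte test it gives three buffers with 8 bytes in the last one.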

Treat one io_uring_prep_recv_multishot submission followed by one io_uring_for_each_cqe pass as a single execution.

Buffer reuse is ignored here.

There are two scenarios.

  1. IORING_RECVSEND_BUNDLE is not set.

    1. The first execution produces one CQE with -ENOBUFS because there is no buffer yet. A buffer is then added.
    2. The second execution produces two CQEs: the first for 16 received bytes and the second with -ENOBUFS. A buffer
      is then added.
    3. The third execution produces one CQE and receives 4 bytes of data.
  2. IORING_RECVSEND_BUNDLE is set.

    1. The first execution produces one CQE with -ENOBUFS because there is no buffer yet. A buffer is then added.
    2. The second execution produces one CQE and receives 16 bytes.
    3. The third execution produces one CQE with -ENOBUFS because there is no buffer left. A buffer is then added.
    4. The fourth execution produces one CQE and receives 4 bytes of data.

So why do the two cases differ? Why does the IORING_RECVSEND_BUNDLE case need one extra execution?

Owner

axboe commented Jan 6, 2025

I asked this before but you did not answer - what kernel are you using?

Author

AomaYple commented Jan 7, 2025

Arch WSL 6.12.8

Owner

axboe commented Jan 7, 2025

Here's what I see:

IORING_RECVSEND_BUNDLE not set
received -105 bytes
ENOBUFS
loop end
received 16 bytes
buffer id: 0
is more data: 0
received -105 bytes
ENOBUFS
loop end
received 4 bytes
buffer id: 1
is more data: 1
received 0 bytes
loop end
received 0 bytes
loop end
IORING_RECVSEND_BUNDLE set
received -105 bytes
ENOBUFS
loop end
received 16 bytes
buffer id: 0
is more data: 0
loop end
received -105 bytes
ENOBUFS
loop end
received 4 bytes
buffer id: 1
is more data: 1
loop end

which looks as expected? I appreciate the reports, but you never seem to clearly mention how what you're seeing is differing from your expectations. Any bug or issue report should include that, to avoid the other end needing to guess.
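
For reference, the "buffer id" and "is more data" lines in this output come from decoding `cqe->flags`, as the test program's forEachCqe does. The sketch below repeats that decoding in isolation. The `SKETCH_*` macros, `struct buffer_info` and `decode_buffer_info` are hypothetical, and the bit positions are assumed to mirror `IORING_CQE_F_BUFFER`, `IORING_CQE_F_BUF_MORE` and `IORING_CQE_BUFFER_SHIFT` from the liburing headers.

```c
#include <stdbool.h>
#include <stdint.h>

/* Stand-ins for the liburing CQE flag layout (assumed values). */
#define SKETCH_CQE_F_BUFFER     (1U << 0) /* IORING_CQE_F_BUFFER */
#define SKETCH_CQE_F_BUF_MORE   (1U << 4) /* IORING_CQE_F_BUF_MORE */
#define SKETCH_CQE_BUFFER_SHIFT 16        /* IORING_CQE_BUFFER_SHIFT */

struct buffer_info {
    bool has_buffer;    /* a provided buffer was used for this CQE */
    unsigned buffer_id; /* valid only when has_buffer is true */
    bool buf_more;      /* the same buffer may receive further data */
};

/* Extract the provided-buffer details from a CQE flags word. */
static struct buffer_info decode_buffer_info(uint32_t cqe_flags) {
    struct buffer_info info = {false, 0, false};
    if ((cqe_flags & SKETCH_CQE_F_BUFFER) != 0) {
        info.has_buffer = true;
        info.buffer_id = cqe_flags >> SKETCH_CQE_BUFFER_SHIFT;
        info.buf_more = (cqe_flags & SKETCH_CQE_F_BUF_MORE) != 0;
    }
    return info;
}
```

Under these assumptions, the "received 4 bytes / buffer id: 1 / is more data: 1" lines correspond to a flags word with the buffer bit, the buf-more bit, and ID 1 in the upper 16 bits.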

Author

AomaYple commented Jan 8, 2025

It looks like you get the same results as I do. My question is: with IORING_RECVSEND_BUNDLE set, why does the second receive generate only the one data CQE, with no -ENOBUFS CQE after it?

In my other code, adding buffers is triggered mostly by ENOBUFS. If the second receive doesn't generate ENOBUFS, that socket never finishes receiving the rest of the data.
