Skip to content

Commit

Permalink
buffer: attempt to size raw_pipe buffers
Browse files Browse the repository at this point in the history
Make sure the requested length is below the maximum pipe size for now,
since we're only using one pipe and splicing once into and out of
it. The default max is 1MB on recent kernels, so this isn't such a
terrible limitation.

To get around this we could use multiple pipes, or keep both the source and
destination fds open at the same time and call splice many times. That is
the more usual way to use splice, but it would require a lot more work to
restructure the filestore and messenger to handle it.

Signed-off-by: Josh Durgin <[email protected]>
  • Loading branch information
jdurgin committed Nov 23, 2013
1 parent 3f6fa05 commit be29b34
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 3 deletions.
8 changes: 8 additions & 0 deletions configure.ac
Original file line number Diff line number Diff line change
Expand Up @@ -541,7 +541,15 @@ AC_CHECK_FUNC([splice],
[AC_DEFINE([CEPH_HAVE_SPLICE], [], [splice(2) is supported])],
[])


AC_CHECK_HEADERS([arpa/nameser_compat.h])

dnl Detect fcntl(2) F_SETPIPE_SZ support.
dnl The probe must be a translation unit that compiles if and only if the
dnl macro is visible: a bare `F_SETPIPE_SZ` token at file scope is a syntax
dnl error either way, so use #ifndef/#error instead. glibc only exposes
dnl F_SETPIPE_SZ when _GNU_SOURCE is defined, which the C compiler used by
dnl configure does not define on its own.
AC_COMPILE_IFELSE([AC_LANG_SOURCE([[#ifndef _GNU_SOURCE
#define _GNU_SOURCE
#endif
#include <fcntl.h>
#ifndef F_SETPIPE_SZ
#error F_SETPIPE_SZ is not defined
#endif]])],
[AC_DEFINE([CEPH_HAVE_SETPIPE_SZ], [], [F_SETPIPE_SZ is supported])],
[AC_MSG_NOTICE(["F_SETPIPE_SZ not found, zero-copy may be less efficent"])])


AC_CHECK_HEADERS([sys/prctl.h])
AC_CHECK_FUNCS([prctl])

Expand Down
86 changes: 83 additions & 3 deletions src/common/buffer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "common/errno.h"
#include "common/safe_io.h"
#include "common/simple_spin.h"
#include "common/strtol.h"
#include "include/atomic.h"
#include "include/types.h"
#include "include/compat.h"
Expand Down Expand Up @@ -69,6 +70,40 @@ static uint32_t simple_spinlock_t buffer_debug_lock = SIMPLE_SPINLOCK_INITIALIZE
return buffer_cached_crc_adjusted.read();
}

// Cached system-wide maximum pipe size in bytes. get_max_pipe_size()
// treats a value of 0 as "not loaded yet".
atomic_t buffer_max_pipe_size;

/**
 * Refresh buffer_max_pipe_size from /proc/sys/fs/pipe-max-size.
 *
 * Does nothing (and returns 0) when built without CEPH_HAVE_SETPIPE_SZ.
 *
 * @return 0 on success; -errno if the proc file cannot be stat'ed, the
 *         error from safe_read_file() if it cannot be read, or -EIO if
 *         its contents are not a positive decimal number.
 */
int update_max_pipe_size() {
#ifdef CEPH_HAVE_SETPIPE_SZ
  char buf[32];
  int r;
  std::string err;
  struct stat stat_result;
  if (::stat("/proc/sys/fs/pipe-max-size", &stat_result) == -1)
    return -errno;
  r = safe_read_file("/proc/sys/fs/", "pipe-max-size",
                     buf, sizeof(buf) - 1);
  if (r < 0)
    return r;
  buf[r] = '\0';
  // procfs terminates the value with a newline; drop trailing whitespace
  // so a strict parser does not reject it as trailing garbage.
  while (r > 0 && (buf[r - 1] == '\n' || buf[r - 1] == '\r' ||
                   buf[r - 1] == ' ' || buf[r - 1] == '\t'))
    buf[--r] = '\0';
  long size = strict_strtol(buf, 10, &err);
  // A zero or negative limit is meaningless and would be mistaken for
  // "not loaded yet" by get_max_pipe_size(), so treat it as a parse error.
  if (!err.empty() || size <= 0)
    return -EIO;
  buffer_max_pipe_size.set(size);
#endif
  return 0;
}

/**
 * Return the largest size, in bytes, that a pipe may be grown to.
 *
 * With CEPH_HAVE_SETPIPE_SZ the cached value in buffer_max_pipe_size is
 * used, loading it through update_max_pipe_size() on first use. If the
 * value cannot be obtained (or the build lacks F_SETPIPE_SZ support) the
 * historical fixed limit of 64 KiB is returned.
 */
size_t get_max_pipe_size() {
#ifdef CEPH_HAVE_SETPIPE_SZ
  size_t size = buffer_max_pipe_size.read();
  if (size)
    return size;
  if (update_max_pipe_size() == 0) {
    size = buffer_max_pipe_size.read();
    // Only trust a non-zero value; otherwise fall through to the default
    // rather than reporting a maximum of 0 bytes.
    if (size)
      return size;
  }
#endif
  // this is the max size hardcoded in linux before 2.6.35
  return 65536;
}

buffer::error_code::error_code(int error) :
buffer::malformed_input(cpp_strerror(error).c_str()), code(error) {}

Expand Down Expand Up @@ -238,23 +273,40 @@ static uint32_t simple_spinlock_t buffer_debug_lock = SIMPLE_SPINLOCK_INITIALIZE
class buffer::raw_pipe : public buffer::raw {
public:
/**
 * Create a pipe-backed raw buffer able to hold `len` bytes.
 *
 * The pipe is created, both ends are switched to non-blocking mode and
 * the pipe is grown to `len` bytes on a best-effort basis.
 *
 * @param len requested capacity in bytes; must not exceed
 *            get_max_pipe_size()
 * @throws malformed_input if `len` is larger than the maximum pipe size
 *         (also propagated from set_pipe_size())
 * @throws error_code if the pipe cannot be created or made non-blocking
 */
raw_pipe(unsigned len) : raw(len), source_consumed(false) {
  size_t max = get_max_pipe_size();
  if (len > max) {
    bdout << "raw_pipe: requested length " << len
          << " > max length " << max << bendl;
    throw malformed_input("length larger than max pipe size");
  }
  pipefds[0] = -1;
  pipefds[1] = -1;

  int r;
  if (::pipe(pipefds) == -1) {
    r = -errno;
    bdout << "raw_pipe: error creating pipe: " << cpp_strerror(r) << bendl;
    throw error_code(r);
  }

  try {
    r = set_nonblocking(pipefds);
    if (r < 0) {
      bdout << "raw_pipe: error setting nonblocking flag on temp pipe: "
            << cpp_strerror(r) << bendl;
      throw error_code(r);
    }

    r = set_pipe_size(pipefds, len);
  } catch (...) {
    // The destructor does not run for a partially constructed object, so
    // release the descriptors here before letting the exception escape.
    ::close(pipefds[0]);
    ::close(pipefds[1]);
    pipefds[0] = -1;
    pipefds[1] = -1;
    throw;
  }
  if (r < 0) {
    bdout << "raw_pipe: could not set pipe size" << bendl;
    // continue, since the pipe should become large enough as needed
  }

  inc_total_alloc(len);
  bdout << "raw_pipe " << this << " alloc " << len << " "
        << buffer::get_total_alloc() << bendl;
}

~raw_pipe() {
if (data)
delete data;
Expand All @@ -263,12 +315,15 @@ static uint32_t simple_spinlock_t buffer_debug_lock = SIMPLE_SPINLOCK_INITIALIZE
bdout << "raw_pipe " << this << " free " << (void *)data << " "
<< buffer::get_total_alloc() << bendl;
}

// A pipe-backed buffer always reports that it supports zero-copy.
bool can_zero_copy() const
{
  const bool supported = true;
  return supported;
}

// A pipe-backed buffer never reports itself as page aligned.
bool is_page_aligned()
{
  const bool aligned = false;
  return aligned;
}

int set_source(int fd, loff_t *off) {
int flags = SPLICE_F_NONBLOCK;
ssize_t r = safe_splice(fd, off, pipefds[1], NULL, len, flags);
Expand All @@ -281,6 +336,7 @@ static uint32_t simple_spinlock_t buffer_debug_lock = SIMPLE_SPINLOCK_INITIALIZE
len = r;
return 0;
}

int zero_copy_to_fd(int fd, loff_t *offset) {
assert(!source_consumed);
int flags = SPLICE_F_NONBLOCK;
Expand All @@ -293,17 +349,36 @@ static uint32_t simple_spinlock_t buffer_debug_lock = SIMPLE_SPINLOCK_INITIALIZE
source_consumed = true;
return 0;
}

// Pipe-based buffers are not cloneable: cloning has no sensible meaning
// for them, and the only users of clone_empty() are unit tests that
// exercise other buffer types. Always yields a null pointer.
buffer::raw* clone_empty()
{
  buffer::raw *no_clone = NULL;
  return no_clone;
}

// Return the in-memory data pointer. When `data` is still unset the
// result of copy_pipe(pipefds) is returned instead.
char *get_data()
{
  if (!data)
    return copy_pipe(pipefds);
  return data;
}

private:
// Ask the kernel to resize the pipe to `length` bytes through the write
// end, fds[1]. Returns 0 on success, or when F_SETPIPE_SZ support was not
// compiled in, and -errno on failure. An EPERM failure is treated
// specially: the cached maximum is refreshed and malformed_input is thrown.
int set_pipe_size(int *fds, long length) {
#ifdef CEPH_HAVE_SETPIPE_SZ
  if (::fcntl(fds[1], F_SETPIPE_SZ, length) != -1)
    return 0;

  const int err = -errno;
  if (err != -EPERM)
    return err;

  // pipe limit must have changed - EPERM means we requested
  // more than the maximum size as an unprivileged user
  update_max_pipe_size();
  throw malformed_input("length larger than new max pipe size");
#else
  return 0;
#endif
}

int set_nonblocking(int *fds) {
if (::fcntl(fds[0], F_SETFL, O_NONBLOCK) == -1)
return -errno;
Expand Down Expand Up @@ -340,6 +415,11 @@ static uint32_t simple_spinlock_t buffer_debug_lock = SIMPLE_SPINLOCK_INITIALIZE
<< cpp_strerror(r) << bendl;
throw error_code(r);
}
r = set_pipe_size(tmpfd, len);
if (r < 0) {
bdout << "raw_pipe: error setting pipe size on temp pipe: "
<< cpp_strerror(r) << bendl;
}
int flags = SPLICE_F_NONBLOCK;
if (::tee(fds[0], tmpfd[1], len, flags) == -1) {
r = errno;
Expand Down

0 comments on commit be29b34

Please sign in to comment.