From ebb261f9780669aba1963f7216ebc653204f659a Mon Sep 17 00:00:00 2001 From: Josh Durgin Date: Wed, 16 Oct 2013 16:23:36 -0700 Subject: [PATCH 01/10] buffer: abstract raw data related methods Create a virtual function that returns the raw data instead of accessing it directly, so raw buffers backed by pipes can be used as buffer::ptrs. Make raw::is_page_aligned() virtual so it will not need to look at the raw data for a pipe-based buffer. Signed-off-by: Josh Durgin --- src/common/buffer.cc | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/src/common/buffer.cc b/src/common/buffer.cc index 493070557159e..9dd98310d97ea 100644 --- a/src/common/buffer.cc +++ b/src/common/buffer.cc @@ -89,6 +89,9 @@ static uint32_t simple_spinlock_t buffer_debug_lock = SIMPLE_SPINLOCK_INITIALIZE raw(const raw &other); const raw& operator=(const raw &other); + virtual char *get_data() { + return data; + } virtual raw* clone_empty() = 0; raw *clone() { raw *c = clone_empty(); @@ -100,7 +103,7 @@ static uint32_t simple_spinlock_t buffer_debug_lock = SIMPLE_SPINLOCK_INITIALIZE return len; } - bool is_page_aligned() { + virtual bool is_page_aligned() { return ((long)data & ~CEPH_PAGE_MASK) == 0; } bool is_n_page_sized() { @@ -375,8 +378,14 @@ static uint32_t simple_spinlock_t buffer_debug_lock = SIMPLE_SPINLOCK_INITIALIZE bool buffer::ptr::at_buffer_tail() const { return _off + _len == _raw->len; } - const char *buffer::ptr::c_str() const { assert(_raw); return _raw->data + _off; } - char *buffer::ptr::c_str() { assert(_raw); return _raw->data + _off; } + const char *buffer::ptr::c_str() const { + assert(_raw); + return _raw->get_data() + _off; + } + char *buffer::ptr::c_str() { + assert(_raw); + return _raw->get_data() + _off; + } unsigned buffer::ptr::unused_tail_length() const { @@ -389,13 +398,13 @@ static uint32_t simple_spinlock_t buffer_debug_lock = SIMPLE_SPINLOCK_INITIALIZE { assert(_raw); assert(n < _len); - return _raw->data[_off + n]; + return 
_raw->get_data()[_off + n]; } char& buffer::ptr::operator[](unsigned n) { assert(_raw); assert(n < _len); - return _raw->data[_off + n]; + return _raw->get_data()[_off + n]; } const char *buffer::ptr::raw_c_str() const { assert(_raw); return _raw->data; } From b8518fea03af9c9873e87d715d1a5d4bb649410e Mon Sep 17 00:00:00 2001 From: Josh Durgin Date: Thu, 17 Oct 2013 16:34:55 -0700 Subject: [PATCH 02/10] buffer: remove unused raw::length() method This was accidentally introduced in 0c23a5624a80903fba7e635e8c44f38a79caf223 Signed-off-by: Josh Durgin --- src/common/buffer.cc | 5 ----- 1 file changed, 5 deletions(-) diff --git a/src/common/buffer.cc b/src/common/buffer.cc index 9dd98310d97ea..2fdb2cab19152 100644 --- a/src/common/buffer.cc +++ b/src/common/buffer.cc @@ -98,11 +98,6 @@ static uint32_t simple_spinlock_t buffer_debug_lock = SIMPLE_SPINLOCK_INITIALIZE memcpy(c->data, data, len); return c; } - - unsigned length() const { - return len; - } - virtual bool is_page_aligned() { return ((long)data & ~CEPH_PAGE_MASK) == 0; } From 30bc0e2791d1593043531988fe1c88500f3d77c3 Mon Sep 17 00:00:00 2001 From: Josh Durgin Date: Sun, 20 Oct 2013 10:46:49 -0700 Subject: [PATCH 03/10] safe_io: add functions for handling splice Like the other functions, these don't handle non-blocking I/O. 
Signed-off-by: Josh Durgin --- configure.ac | 6 ++++++ src/common/safe_io.c | 34 ++++++++++++++++++++++++++++++++++ src/common/safe_io.h | 13 +++++++++++++ 3 files changed, 53 insertions(+) diff --git a/configure.ac b/configure.ac index 58c5e1b94a634..91c623abdaf16 100644 --- a/configure.ac +++ b/configure.ac @@ -536,6 +536,12 @@ AC_CHECK_FUNC([fallocate], []) +# splice/tee +AC_CHECK_FUNC([splice], + [AC_DEFINE([CEPH_HAVE_SPLICE], [], [splice(2) is supported])], + []) + +AC_CHECK_HEADERS([arpa/nameser_compat.h]) AC_CHECK_HEADERS([sys/prctl.h]) AC_CHECK_FUNCS([prctl]) diff --git a/src/common/safe_io.c b/src/common/safe_io.c index afee82edf0778..16cc7293d9cd3 100644 --- a/src/common/safe_io.c +++ b/src/common/safe_io.c @@ -117,6 +117,40 @@ ssize_t safe_pwrite(int fd, const void *buf, size_t count, off_t offset) return 0; } +#ifdef CEPH_HAVE_SPLICE +ssize_t safe_splice(int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, + size_t len, unsigned int flags) +{ + size_t cnt = 0; + + while (cnt < len) { + ssize_t r = splice(fd_in, off_in, fd_out, off_out, len - cnt, flags); + if (r <= 0) { + if (r == 0) { + // EOF + return cnt; + } + if (errno == EINTR) + continue; + return -errno; + } + cnt += r; + } + return cnt; +} + +ssize_t safe_splice_exact(int fd_in, loff_t *off_in, int fd_out, + loff_t *off_out, size_t len, unsigned int flags) +{ + ssize_t ret = safe_splice(fd_in, off_in, fd_out, off_out, len, flags); + if (ret < 0) + return ret; + if ((size_t)ret != len) + return -EDOM; + return 0; +} +#endif + int safe_write_file(const char *base, const char *file, const char *val, size_t vallen) { diff --git a/src/common/safe_io.h b/src/common/safe_io.h index a4c9bc7a72f91..c45589eee7e05 100644 --- a/src/common/safe_io.h +++ b/src/common/safe_io.h @@ -15,6 +15,7 @@ #ifndef CEPH_SAFE_IO #define CEPH_SAFE_IO +#include "acconfig.h" #include "common/compiler_extensions.h" #include @@ -35,6 +36,18 @@ extern "C" { WARN_UNUSED_RESULT; ssize_t safe_pwrite(int fd, const void *buf, 
size_t count, off_t offset) WARN_UNUSED_RESULT; +#ifdef CEPH_HAVE_SPLICE + /* + * Similar to the above (non-exact version) and below (exact version). + * See splice(2) for parameter descriptions. + */ + ssize_t safe_splice(int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, + size_t len, unsigned int flags) + WARN_UNUSED_RESULT; + ssize_t safe_splice_exact(int fd_in, loff_t *off_in, int fd_out, + loff_t *off_out, size_t len, unsigned int flags) + WARN_UNUSED_RESULT; +#endif /* * Same as the above functions, but return -EDOM unless exactly the requested From eb94b8fd52c245bbcf305a0c91462ed0a291bd06 Mon Sep 17 00:00:00 2001 From: Josh Durgin Date: Mon, 21 Oct 2013 08:51:54 -0700 Subject: [PATCH 04/10] buffer: add an exception with an error code This allows e.g. raw buffer constructors to provide more specific information about what failed, as well as a useful error string automatically. Signed-off-by: Josh Durgin --- src/common/buffer.cc | 2 ++ src/include/buffer.h | 4 ++++ 2 files changed, 6 insertions(+) diff --git a/src/common/buffer.cc b/src/common/buffer.cc index 2fdb2cab19152..7ac54390943a5 100644 --- a/src/common/buffer.cc +++ b/src/common/buffer.cc @@ -69,6 +69,8 @@ static uint32_t simple_spinlock_t buffer_debug_lock = SIMPLE_SPINLOCK_INITIALIZE return buffer_cached_crc_adjusted.read(); } + buffer::error_code::error_code(int error) : + buffer::malformed_input(cpp_strerror(error).c_str()), code(error) {} class buffer::raw { public: diff --git a/src/include/buffer.h b/src/include/buffer.h index 0b497a7cf38d6..3987f021601cd 100644 --- a/src/include/buffer.h +++ b/src/include/buffer.h @@ -101,6 +101,10 @@ class buffer { private: char buf[256]; }; + struct error_code : public malformed_input { + explicit error_code(int error); + int code; + }; /// total bytes allocated From 5021b43e0a6ece07b8dd364ce227f60e4916f707 Mon Sep 17 00:00:00 2001 From: Josh Durgin Date: Mon, 21 Oct 2013 08:58:56 -0700 Subject: [PATCH 05/10] buffer: create raw pipe-based buffer This 
uses a pipe to reference kernel memory so we can use splice(2) to avoid extra data copies. Take an fd in the factory to create it, since that's the only way to use it efficiently, which is its whole purpose. Signed-off-by: Josh Durgin --- src/common/buffer.cc | 119 +++++++++++++++++++++++++++++++++++++ src/include/buffer.h | 5 +- src/test/bufferlist.cc | 129 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 251 insertions(+), 2 deletions(-) diff --git a/src/common/buffer.cc b/src/common/buffer.cc index 7ac54390943a5..cf20080fe5666 100644 --- a/src/common/buffer.cc +++ b/src/common/buffer.cc @@ -228,6 +228,125 @@ static uint32_t simple_spinlock_t buffer_debug_lock = SIMPLE_SPINLOCK_INITIALIZE }; #endif +#ifdef CEPH_HAVE_SPLICE + class buffer::raw_pipe : public buffer::raw { + public: + raw_pipe(unsigned len) : raw(len), source_consumed(false) { + pipefds[0] = -1; + pipefds[1] = -1; + if (::pipe(pipefds) == -1) { + int r = -errno; + bdout << "raw_pipe: error creating pipe: " << cpp_strerror(r) << bendl; + throw error_code(r); + } + if (set_nonblocking(pipefds) < 0) { + int r = -errno; + bdout << "raw_pipe: error setting nonblocking flag on temp pipe: " + << cpp_strerror(r) << bendl; + throw error_code(r); + } + inc_total_alloc(len); + bdout << "raw_pipe " << this << " alloc " << len << " " + << buffer::get_total_alloc() << bendl; + } + ~raw_pipe() { + if (data) + delete data; + close_pipe(pipefds); + dec_total_alloc(len); + bdout << "raw_pipe " << this << " free " << (void *)data << " " + << buffer::get_total_alloc() << bendl; + } + return false; + } + int set_source(int fd, loff_t *off) { + int flags = SPLICE_F_NONBLOCK; + ssize_t r = safe_splice(fd, off, pipefds[1], NULL, len, flags); + if (r < 0) { + bdout << "raw_pipe: error splicing into pipe: " << cpp_strerror(r) + << bendl; + return r; + } + // update length with actual amount read + len = r; + return 0; + } + // cloning doesn't make sense for pipe-based buffers, + // and is only used by unit tests 
for other types of buffers + return NULL; + } + char *get_data() { + if (data) + return data; + return copy_pipe(pipefds); + } + private: + int set_nonblocking(int *fds) { + if (::fcntl(fds[0], F_SETFL, O_NONBLOCK) == -1) + return -errno; + if (::fcntl(fds[1], F_SETFL, O_NONBLOCK) == -1) + return -errno; + return 0; + } + + void close_pipe(int *fds) { + if (fds[0] >= 0) + TEMP_FAILURE_RETRY(::close(fds[0])); + if (fds[1] >= 0) + TEMP_FAILURE_RETRY(::close(fds[1])); + } + char *copy_pipe(int *fds) { + /* preserve original pipe contents by copying into a temporary + * pipe before reading. + */ + int tmpfd[2]; + int r; + + assert(!source_consumed); + assert(fds[0] >= 0); + + if (::pipe(tmpfd) == -1) { + r = -errno; + bdout << "raw_pipe: error creating temp pipe: " << cpp_strerror(r) + << bendl; + throw error_code(r); + } + r = set_nonblocking(tmpfd); + if (r < 0) { + bdout << "raw_pipe: error setting nonblocking flag on temp pipe: " + << cpp_strerror(r) << bendl; + throw error_code(r); + } + int flags = SPLICE_F_NONBLOCK; + if (::tee(fds[0], tmpfd[1], len, flags) == -1) { + r = errno; + bdout << "raw_pipe: error tee'ing into temp pipe: " << cpp_strerror(r) + << bendl; + close_pipe(tmpfd); + throw error_code(r); + } + data = (char *)malloc(len); + if (!data) { + close_pipe(tmpfd); + throw bad_alloc(); + } + r = safe_read(tmpfd[0], data, len); + if (r < (ssize_t)len) { + bdout << "raw_pipe: error reading from temp pipe:" << cpp_strerror(r) + << bendl; + delete data; + data = NULL; + close_pipe(tmpfd); + throw error_code(r); + } + close_pipe(tmpfd); + return data; + } + bool source_consumed; + int pipefds[2]; + }; +#endif // CEPH_HAVE_SPLICE + /* * primitive buffer types */ diff --git a/src/include/buffer.h b/src/include/buffer.h index 3987f021601cd..546f728bbdc98 100644 --- a/src/include/buffer.h +++ b/src/include/buffer.h @@ -137,6 +137,7 @@ class buffer { class raw_posix_aligned; class raw_hack_aligned; class raw_char; + class raw_pipe; friend std::ostream& 
operator<<(std::ostream& out, const raw &r); @@ -152,8 +153,8 @@ class buffer { static raw* claim_malloc(unsigned len, char *buf); static raw* create_static(unsigned len, char *buf); static raw* create_page_aligned(unsigned len); - - + static raw* create_zero_copy(unsigned len, int fd, loff_t *offset); + /* * a buffer pointer. references (a subsequence of) a raw buffer. */ diff --git a/src/test/bufferlist.cc b/src/test/bufferlist.cc index 8b6ca269234ff..61db1ca33467f 100644 --- a/src/test/bufferlist.cc +++ b/src/test/bufferlist.cc @@ -29,6 +29,7 @@ #include "include/encoding.h" #include "common/environment.h" #include "common/Clock.h" +#include "common/safe_io.h" #include "gtest/gtest.h" #include "stdlib.h" @@ -141,6 +142,25 @@ TEST(Buffer, constructors) { bufferptr clone = ptr.clone(); EXPECT_EQ(0, ::memcmp(clone.c_str(), ptr.c_str(), len)); } +#ifdef CEPH_HAVE_SPLICE + if (ceph_buffer_track) + EXPECT_EQ(0, buffer::get_total_alloc()); + { + // no fd + EXPECT_THROW(buffer::create_zero_copy(len, -1, NULL), buffer::error_code); + + unsigned zc_len = 4; + ::unlink("testfile"); + ::system("echo ABC > testfile"); + int fd = ::open("testfile", O_RDONLY); + bufferptr ptr(buffer::create_zero_copy(zc_len, fd, NULL)); + EXPECT_EQ(zc_len, ptr.length()); + if (ceph_buffer_track) + EXPECT_EQ(zc_len, (unsigned)buffer::get_total_alloc()); + ::close(fd); + ::unlink("testfile"); + } +#endif if (ceph_buffer_track) EXPECT_EQ(0, buffer::get_total_alloc()); } @@ -153,6 +173,115 @@ TEST(BufferRaw, ostream) { EXPECT_GT(stream.str().size(), stream.str().find("len 1 nref 1)")); } +#ifdef CEPH_HAVE_SPLICE +class TestRawPipe : public ::testing::Test { +protected: + virtual void SetUp() { + len = 4; + ::unlink("testfile"); + ::system("echo ABC > testfile"); + fd = ::open("testfile", O_RDONLY); + assert(fd >= 0); + } + virtual void TearDown() { + ::close(fd); + ::unlink("testfile"); + } + int fd; + unsigned len; +}; + +TEST_F(TestRawPipe, create_zero_copy) { + bufferptr 
ptr(buffer::create_zero_copy(len, fd, NULL)); + EXPECT_EQ(len, ptr.length()); + if (get_env_bool("CEPH_BUFFER_TRACK")) + EXPECT_EQ(len, (unsigned)buffer::get_total_alloc()); +} + +TEST_F(TestRawPipe, c_str_no_fd) { + EXPECT_THROW(bufferptr ptr(buffer::create_zero_copy(len, -1, NULL)), + buffer::error_code); +} + +TEST_F(TestRawPipe, c_str_basic) { + bufferptr ptr = bufferptr(buffer::create_zero_copy(len, fd, NULL)); + EXPECT_EQ(0, memcmp(ptr.c_str(), "ABC\n", len)); + EXPECT_EQ(len, ptr.length()); +} + +TEST_F(TestRawPipe, c_str_twice) { + // make sure we're creating a copy of the data and not consuming it + bufferptr ptr = bufferptr(buffer::create_zero_copy(len, fd, NULL)); + EXPECT_EQ(len, ptr.length()); + EXPECT_EQ(0, memcmp(ptr.c_str(), "ABC\n", len)); + EXPECT_EQ(0, memcmp(ptr.c_str(), "ABC\n", len)); +} + +TEST_F(TestRawPipe, c_str_basic_offset) { + loff_t offset = len - 1; + ::lseek(fd, offset, SEEK_SET); + bufferptr ptr = bufferptr(buffer::create_zero_copy(len - offset, fd, NULL)); + EXPECT_EQ(len - offset, ptr.length()); + EXPECT_EQ(0, memcmp(ptr.c_str(), "\n", len - offset)); +} + +TEST_F(TestRawPipe, c_str_dest_short) { + ::lseek(fd, 1, SEEK_SET); + bufferptr ptr = bufferptr(buffer::create_zero_copy(2, fd, NULL)); + EXPECT_EQ(2u, ptr.length()); + EXPECT_EQ(0, memcmp(ptr.c_str(), "BC", 2)); +} + +TEST_F(TestRawPipe, c_str_source_short) { + ::lseek(fd, 1, SEEK_SET); + bufferptr ptr = bufferptr(buffer::create_zero_copy(len, fd, NULL)); + EXPECT_EQ(len - 1, ptr.length()); + EXPECT_EQ(0, memcmp(ptr.c_str(), "BC\n", len - 1)); +} + +TEST_F(TestRawPipe, c_str_explicit_zero_offset) { + loff_t offset = 0; + ::lseek(fd, 1, SEEK_SET); + bufferptr ptr = bufferptr(buffer::create_zero_copy(len, fd, &offset)); + EXPECT_EQ(len, offset); + EXPECT_EQ(len, ptr.length()); + EXPECT_EQ(0, memcmp(ptr.c_str(), "ABC\n", len)); +} + +TEST_F(TestRawPipe, c_str_explicit_positive_offset) { + loff_t offset = 1; + bufferptr ptr = bufferptr(buffer::create_zero_copy(len - offset, fd, + 
&offset)); + EXPECT_EQ(len, offset); + EXPECT_EQ(len - 1, ptr.length()); + EXPECT_EQ(0, memcmp(ptr.c_str(), "BC\n", len - 1)); +} + +TEST_F(TestRawPipe, c_str_explicit_positive_empty_result) { + loff_t offset = len; + bufferptr ptr = bufferptr(buffer::create_zero_copy(len - offset, fd, + &offset)); + EXPECT_EQ(len, offset); + EXPECT_EQ(0u, ptr.length()); +} + +TEST_F(TestRawPipe, c_str_source_short_explicit_offset) { + loff_t offset = 1; + bufferptr ptr = bufferptr(buffer::create_zero_copy(len, fd, &offset)); + EXPECT_EQ(len, offset); + EXPECT_EQ(len - 1, ptr.length()); + EXPECT_EQ(0, memcmp(ptr.c_str(), "BC\n", len - 1)); +} + +TEST_F(TestRawPipe, c_str_dest_short_explicit_offset) { + loff_t offset = 1; + bufferptr ptr = bufferptr(buffer::create_zero_copy(2, fd, &offset)); + EXPECT_EQ(3, offset); + EXPECT_EQ(2u, ptr.length()); + EXPECT_EQ(0, memcmp(ptr.c_str(), "BC", 2)); +} +#endif // CEPH_HAVE_SPLICE + // // +-----------+ +-----+ // | | | | From 3f6fa05d7224f9b04d849c4a5309a3da01d3cb34 Mon Sep 17 00:00:00 2001 From: Josh Durgin Date: Mon, 21 Oct 2013 12:40:30 -0700 Subject: [PATCH 06/10] buffer: add methods to read and write using zero copy Create explicit methods for testing. Make buffer::list::write_fd() use zero-copy if all the buffers support it. Don't automatically handle reads yet, since we need better detection of read length first. 
Signed-off-by: Josh Durgin --- src/common/buffer.cc | 102 ++++++++++++++++++++++++++++++++++++++++- src/include/buffer.h | 7 +++ src/test/bufferlist.cc | 31 +++++++++++++ 3 files changed, 138 insertions(+), 2 deletions(-) diff --git a/src/common/buffer.cc b/src/common/buffer.cc index cf20080fe5666..ed0c3a96a96f1 100644 --- a/src/common/buffer.cc +++ b/src/common/buffer.cc @@ -100,6 +100,12 @@ static uint32_t simple_spinlock_t buffer_debug_lock = SIMPLE_SPINLOCK_INITIALIZE memcpy(c->data, data, len); return c; } + virtual bool can_zero_copy() const { + return false; + } + virtual int zero_copy_to_fd(int fd, loff_t *offset) { + return -ENOTSUP; + } virtual bool is_page_aligned() { return ((long)data & ~CEPH_PAGE_MASK) == 0; } @@ -257,6 +263,10 @@ static uint32_t simple_spinlock_t buffer_debug_lock = SIMPLE_SPINLOCK_INITIALIZE bdout << "raw_pipe " << this << " free " << (void *)data << " " << buffer::get_total_alloc() << bendl; } + bool can_zero_copy() const { + return true; + } + bool is_page_aligned() { return false; } int set_source(int fd, loff_t *off) { @@ -271,6 +281,19 @@ static uint32_t simple_spinlock_t buffer_debug_lock = SIMPLE_SPINLOCK_INITIALIZE len = r; return 0; } + int zero_copy_to_fd(int fd, loff_t *offset) { + assert(!source_consumed); + int flags = SPLICE_F_NONBLOCK; + ssize_t r = safe_splice_exact(pipefds[0], NULL, fd, offset, len, flags); + if (r < 0) { + bdout << "raw_pipe: error splicing from pipe to fd: " + << cpp_strerror(r) << bendl; + return r; + } + source_consumed = true; + return 0; + } + buffer::raw* clone_empty() { // cloning doesn't make sense for pipe-based buffers, // and is only used by unit tests for other types of buffers return NULL; @@ -412,6 +435,20 @@ static uint32_t simple_spinlock_t buffer_debug_lock = SIMPLE_SPINLOCK_INITIALIZE #endif } + buffer::raw* buffer::create_zero_copy(unsigned len, int fd, loff_t *offset) { +#ifdef CEPH_HAVE_SPLICE + buffer::raw_pipe* buf = new raw_pipe(len); + int r = buf->set_source(fd, offset); + 
if (r < 0) { + delete buf; + throw error_code(r); + } + return buf; +#else + throw error_code(-ENOTSUP); +#endif + } + buffer::ptr::ptr(raw *r) : _raw(r), _off(0), _len(r->len) // no lock needed; this is an unref raw. { r->nref.inc(); @@ -597,6 +634,15 @@ static uint32_t simple_spinlock_t buffer_debug_lock = SIMPLE_SPINLOCK_INITIALIZE memset(c_str()+o, 0, l); } + bool buffer::ptr::can_zero_copy() const + { + return _raw->can_zero_copy(); + } + + int buffer::ptr::zero_copy_to_fd(int fd, loff_t *offset) const + { + return _raw->zero_copy_to_fd(fd, offset); + } // -- buffer::list::iterator -- /* @@ -859,6 +905,16 @@ static uint32_t simple_spinlock_t buffer_debug_lock = SIMPLE_SPINLOCK_INITIALIZE } } + bool buffer::list::can_zero_copy() const + { + for (std::list::const_iterator it = _buffers.begin(); + it != _buffers.end(); + ++it) + if (!it->can_zero_copy()) + return false; + return true; + } + bool buffer::list::is_page_aligned() const { for (std::list::const_iterator it = _buffers.begin(); @@ -1351,7 +1407,7 @@ int buffer::list::read_file(const char *fn, std::string *error) return 0; } -ssize_t buffer::list::read_fd(int fd, size_t len) +ssize_t buffer::list::read_fd(int fd, size_t len) { int s = ROUND_UP_TO(len, CEPH_PAGE_SIZE); bufferptr bp = buffer::create_page_aligned(s); @@ -1363,6 +1419,21 @@ ssize_t buffer::list::read_fd(int fd, size_t len) return ret; } +int buffer::list::read_fd_zero_copy(int fd, size_t len) +{ +#ifdef CEPH_HAVE_SPLICE + try { + bufferptr bp = buffer::create_zero_copy(len, fd, NULL); + append(bp); + } catch (buffer::error_code e) { + return e.code; + } + return 0; +#else + return -ENOTSUP; +#endif +} + int buffer::list::write_file(const char *fn, int mode) { int fd = TEMP_FAILURE_RETRY(::open(fn, O_WRONLY|O_CREAT|O_TRUNC, mode)); @@ -1390,12 +1461,15 @@ int buffer::list::write_file(const char *fn, int mode) int buffer::list::write_fd(int fd) const { + if (can_zero_copy()) + return write_fd_zero_copy(fd); + // use writev! 
iovec iov[IOV_MAX]; int iovlen = 0; ssize_t bytes = 0; - std::list::const_iterator p = _buffers.begin(); + std::list::const_iterator p = _buffers.begin(); while (p != _buffers.end()) { if (p->length() > 0) { iov[iovlen].iov_base = (void *)p->c_str(); @@ -1440,6 +1514,30 @@ int buffer::list::write_fd(int fd) const return 0; } +int buffer::list::write_fd_zero_copy(int fd) const +{ + if (!can_zero_copy()) + return -ENOTSUP; + /* pass offset to each call to avoid races updating the fd seek + * position, since the I/O may be non-blocking + */ + loff_t offset = ::lseek(fd, 0, SEEK_CUR); + loff_t *off_p = &offset; + if (offset < 0 && offset != ESPIPE) + return (int) offset; + if (offset == ESPIPE) + off_p = NULL; + for (std::list::const_iterator it = _buffers.begin(); + it != _buffers.end(); ++it) { + int r = it->zero_copy_to_fd(fd, off_p); + if (r < 0) + return r; + if (off_p) + offset += it->length(); + } + return 0; +} + __u32 buffer::list::crc32c(__u32 crc) const { for (std::list::const_iterator it = _buffers.begin(); diff --git a/src/include/buffer.h b/src/include/buffer.h index 546f728bbdc98..6eefd3494cfc7 100644 --- a/src/include/buffer.h +++ b/src/include/buffer.h @@ -211,6 +211,9 @@ class buffer { memcpy(dest, c_str()+o, l); } + bool can_zero_copy() const; + int zero_copy_to_fd(int fd, loff_t *offset) const; + unsigned wasted(); int cmp(const ptr& o); @@ -310,6 +313,7 @@ class buffer { private: mutable iterator last_p; + int zero_copy_to_fd(int fd) const; public: // cons/des @@ -347,6 +351,7 @@ class buffer { } bool contents_equal(buffer::list& other); + bool can_zero_copy() const; bool is_page_aligned() const; bool is_n_page_sized() const; @@ -434,8 +439,10 @@ class buffer { void hexdump(std::ostream &out) const; int read_file(const char *fn, std::string *error); ssize_t read_fd(int fd, size_t len); + int read_fd_zero_copy(int fd, size_t len); int write_file(const char *fn, int mode=0644); int write_fd(int fd) const; + int write_fd_zero_copy(int fd) const; 
uint32_t crc32c(uint32_t crc) const; }; diff --git a/src/test/bufferlist.cc b/src/test/bufferlist.cc index 61db1ca33467f..4b3ae61672671 100644 --- a/src/test/bufferlist.cc +++ b/src/test/bufferlist.cc @@ -280,6 +280,37 @@ TEST_F(TestRawPipe, c_str_dest_short_explicit_offset) { EXPECT_EQ(2u, ptr.length()); EXPECT_EQ(0, memcmp(ptr.c_str(), "BC", 2)); } + +TEST_F(TestRawPipe, buffer_list_read_fd_zero_copy) { + bufferlist bl; + EXPECT_EQ(-EBADF, bl.read_fd_zero_copy(-1, len)); + bl = bufferlist(); + EXPECT_EQ(0, bl.read_fd_zero_copy(fd, len)); + EXPECT_EQ(len, bl.length()); + EXPECT_EQ(0u, bl.buffers().front().unused_tail_length()); + EXPECT_EQ(1u, bl.buffers().size()); + EXPECT_EQ(len, bl.buffers().front().raw_length()); + EXPECT_EQ(0, memcmp(bl.c_str(), "ABC\n", len)); + EXPECT_TRUE(bl.can_zero_copy()); +} + +TEST_F(TestRawPipe, buffer_list_write_fd_zero_copy) { + ::unlink("testfile_out"); + bufferlist bl; + EXPECT_EQ(0, bl.read_fd_zero_copy(fd, len)); + EXPECT_TRUE(bl.can_zero_copy()); + int out_fd = ::open("testfile_out", O_RDWR|O_CREAT|O_TRUNC, 0600); + EXPECT_EQ(0, bl.write_fd_zero_copy(out_fd)); + struct stat st; + memset(&st, 0, sizeof(st)); + EXPECT_EQ(0, ::stat("testfile_out", &st)); + EXPECT_EQ(len, st.st_size); + char buf[len + 1]; + EXPECT_EQ(len, safe_read(out_fd, buf, len + 1)); + EXPECT_EQ(0, memcmp(buf, "ABC\n", len)); + ::close(out_fd); + ::unlink("testfile_out"); +} #endif // CEPH_HAVE_SPLICE // From be29b3471c3f92d28b7aa05449e89ea83603e295 Mon Sep 17 00:00:00 2001 From: Josh Durgin Date: Fri, 18 Oct 2013 07:46:34 -0700 Subject: [PATCH 07/10] buffer: attempt to size raw_pipe buffers Make sure the requested length is below the maximum pipe size for now, since we're only using one pipe and splicing once into and out of it. The default max is 1MB on recent kernels, so this isn't such a terrible limitation. To get around this we could use multiple pipes, or keep both source and destination fds open at the same time and call splice many times. 
This is more usual usage for splice, but would require a lot more work to restructure the filestore and messenger to handle it. Signed-off-by: Josh Durgin --- configure.ac | 8 +++++ src/common/buffer.cc | 86 ++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 91 insertions(+), 3 deletions(-) diff --git a/configure.ac b/configure.ac index 91c623abdaf16..3c3e227bed86b 100644 --- a/configure.ac +++ b/configure.ac @@ -541,7 +541,15 @@ AC_CHECK_FUNC([splice], [AC_DEFINE([CEPH_HAVE_SPLICE], [], [splice(2) is supported])], []) + AC_CHECK_HEADERS([arpa/nameser_compat.h]) + +AC_COMPILE_IFELSE([AC_LANG_SOURCE([[#include +F_SETPIPE_SZ]])], + [AC_DEFINE([CEPH_HAVE_SETPIPE_SZ], [], [F_SETPIPE_SZ is supported])], + [AC_MSG_NOTICE(["F_SETPIPE_SZ not found, zero-copy may be less efficent"])]) + + AC_CHECK_HEADERS([sys/prctl.h]) AC_CHECK_FUNCS([prctl]) diff --git a/src/common/buffer.cc b/src/common/buffer.cc index ed0c3a96a96f1..650b3fbb331ee 100644 --- a/src/common/buffer.cc +++ b/src/common/buffer.cc @@ -18,6 +18,7 @@ #include "common/errno.h" #include "common/safe_io.h" #include "common/simple_spin.h" +#include "common/strtol.h" #include "include/atomic.h" #include "include/types.h" #include "include/compat.h" @@ -69,6 +70,40 @@ static uint32_t simple_spinlock_t buffer_debug_lock = SIMPLE_SPINLOCK_INITIALIZE return buffer_cached_crc_adjusted.read(); } + atomic_t buffer_max_pipe_size; + int update_max_pipe_size() { +#ifdef CEPH_HAVE_SETPIPE_SZ + char buf[32]; + int r; + std::string err; + struct stat stat_result; + if (::stat("/proc/sys/fs/pipe-max-size", &stat_result) == -1) + return -errno; + r = safe_read_file("/proc/sys/fs/", "pipe-max-size", + buf, sizeof(buf) - 1); + if (r < 0) + return r; + buf[r] = '\0'; + size = strict_strtol(buf, 10, &err); + if (!err.empty()) + return -EIO; + buffer_max_pipe_size.set(size); +#endif + return 0; + } + + size_t get_max_pipe_size() { +#ifdef CEPH_HAVE_SETPIPE_SZ + size_t size = buffer_max_pipe_size.read(); + if (size) + return 
size; + if (update_max_pipe_size() == 0) + return buffer_max_pipe_size.read() +#endif + // this is the max size hardcoded in linux before 2.6.35 + return 65536; + } + buffer::error_code::error_code(int error) : buffer::malformed_input(cpp_strerror(error).c_str()), code(error) {} @@ -238,23 +273,40 @@ static uint32_t simple_spinlock_t buffer_debug_lock = SIMPLE_SPINLOCK_INITIALIZE class buffer::raw_pipe : public buffer::raw { public: raw_pipe(unsigned len) : raw(len), source_consumed(false) { + size_t max = get_max_pipe_size(); + if (len > max) { + bdout << "raw_pipe: requested length " << len + << " > max length " << max << bendl; + throw malformed_input("length larger than max pipe size"); + } pipefds[0] = -1; pipefds[1] = -1; + + int r; if (::pipe(pipefds) == -1) { - int r = -errno; + r = -errno; bdout << "raw_pipe: error creating pipe: " << cpp_strerror(r) << bendl; throw error_code(r); } - if (set_nonblocking(pipefds) < 0) { - int r = -errno; + + r = set_nonblocking(pipefds); + if (r < 0) { bdout << "raw_pipe: error setting nonblocking flag on temp pipe: " << cpp_strerror(r) << bendl; throw error_code(r); } + + r = set_pipe_size(pipefds, len); + if (r < 0) { + bdout << "raw_pipe: could not set pipe size" << bendl; + // continue, since the pipe should become large enough as needed + } + inc_total_alloc(len); bdout << "raw_pipe " << this << " alloc " << len << " " << buffer::get_total_alloc() << bendl; } + ~raw_pipe() { if (data) delete data; @@ -263,12 +315,15 @@ static uint32_t simple_spinlock_t buffer_debug_lock = SIMPLE_SPINLOCK_INITIALIZE bdout << "raw_pipe " << this << " free " << (void *)data << " " << buffer::get_total_alloc() << bendl; } + bool can_zero_copy() const { return true; } + bool is_page_aligned() { return false; } + int set_source(int fd, loff_t *off) { int flags = SPLICE_F_NONBLOCK; ssize_t r = safe_splice(fd, off, pipefds[1], NULL, len, flags); @@ -281,6 +336,7 @@ static uint32_t simple_spinlock_t buffer_debug_lock = 
SIMPLE_SPINLOCK_INITIALIZE len = r; return 0; } + int zero_copy_to_fd(int fd, loff_t *offset) { assert(!source_consumed); int flags = SPLICE_F_NONBLOCK; @@ -293,17 +349,36 @@ static uint32_t simple_spinlock_t buffer_debug_lock = SIMPLE_SPINLOCK_INITIALIZE source_consumed = true; return 0; } + buffer::raw* clone_empty() { // cloning doesn't make sense for pipe-based buffers, // and is only used by unit tests for other types of buffers return NULL; } + char *get_data() { if (data) return data; return copy_pipe(pipefds); } + private: + int set_pipe_size(int *fds, long length) { +#ifdef CEPH_HAVE_SETPIPE_SZ + if (::fcntl(fds[1], F_SETPIPE_SZ, length) == -1) { + int r = -errno; + if (r == -EPERM) { + // pipe limit must have changed - EPERM means we requested + // more than the maximum size as an unprivileged user + update_max_pipe_size(); + throw malformed_input("length larger than new max pipe size"); + } + return r; + } +#endif + return 0; + } + int set_nonblocking(int *fds) { if (::fcntl(fds[0], F_SETFL, O_NONBLOCK) == -1) return -errno; @@ -340,6 +415,11 @@ static uint32_t simple_spinlock_t buffer_debug_lock = SIMPLE_SPINLOCK_INITIALIZE << cpp_strerror(r) << bendl; throw error_code(r); } + r = set_pipe_size(tmpfd, len); + if (r < 0) { + bdout << "raw_pipe: error setting pipe size on temp pipe: " + << cpp_strerror(r) << bendl; + } int flags = SPLICE_F_NONBLOCK; if (::tee(fds[0], tmpfd[1], len, flags) == -1) { r = errno; From 445fb183906a56e3b412537f3756c38af667cb25 Mon Sep 17 00:00:00 2001 From: Josh Durgin Date: Fri, 18 Oct 2013 07:58:10 -0700 Subject: [PATCH 08/10] buffer: try to do zero copy in read_fd Leave the explicit read_fd_zero_copy around as well for testing. 
Signed-off-by: Josh Durgin --- src/common/buffer.cc | 8 ++++++++ src/test/bufferlist.cc | 7 ++++++- 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/src/common/buffer.cc b/src/common/buffer.cc index 650b3fbb331ee..66a43913cc4b4 100644 --- a/src/common/buffer.cc +++ b/src/common/buffer.cc @@ -1489,6 +1489,12 @@ int buffer::list::read_file(const char *fn, std::string *error) ssize_t buffer::list::read_fd(int fd, size_t len) { + // try zero copy first + if (read_fd_zero_copy(fd, len) == 0) { + // TODO fix callers to not require correct read size, which is not + // available for raw_pipe until we actually inspect the data + return 0; + } int s = ROUND_UP_TO(len, CEPH_PAGE_SIZE); bufferptr bp = buffer::create_page_aligned(s); ssize_t ret = safe_read(fd, (void*)bp.c_str(), len); @@ -1507,6 +1513,8 @@ int buffer::list::read_fd_zero_copy(int fd, size_t len) append(bp); } catch (buffer::error_code e) { return e.code; + } catch (buffer::malformed_input) { + return -EIO; } return 0; #else diff --git a/src/test/bufferlist.cc b/src/test/bufferlist.cc index 4b3ae61672671..822b5b4c402a4 100644 --- a/src/test/bufferlist.cc +++ b/src/test/bufferlist.cc @@ -1763,9 +1763,14 @@ TEST(BufferList, read_fd) { bufferlist bl; EXPECT_EQ(-EBADF, bl.read_fd(fd, len)); fd = ::open("testfile", O_RDONLY); +#ifdef CEPH_HAVE_SPLICE + EXPECT_EQ(0, bl.read_fd(fd, len)); + EXPECT_EQ(0u, bl.buffers().front().unused_tail_length()); +#else EXPECT_EQ(len, (unsigned)bl.read_fd(fd, len)); - EXPECT_EQ(len, bl.length()); EXPECT_EQ(CEPH_PAGE_SIZE - len, bl.buffers().front().unused_tail_length()); +#endif + EXPECT_EQ(len, bl.length()); ::close(fd); ::unlink("testfile"); } From 75d4a72086bcfcab5c0d6834e72cbc6e473c801e Mon Sep 17 00:00:00 2001 From: Josh Durgin Date: Fri, 18 Oct 2013 08:23:40 -0700 Subject: [PATCH 09/10] buffer: enable tracking of calls to c_str() Track buffer::ptr::c_str() to catch internal calls that use it, like buffer::ptr::cmp(). 
buffer::list::c_str() will be captured by this as well, since it will do a final buffer::ptr::c_str() and possibly several more if it needs to rebuild into a single raw buffer. Signed-off-by: Josh Durgin --- src/common/buffer.cc | 14 ++++++++++++++ src/include/buffer.h | 4 ++++ 2 files changed, 18 insertions(+) diff --git a/src/common/buffer.cc b/src/common/buffer.cc index 66a43913cc4b4..229625910192f 100644 --- a/src/common/buffer.cc +++ b/src/common/buffer.cc @@ -70,6 +70,16 @@ static uint32_t simple_spinlock_t buffer_debug_lock = SIMPLE_SPINLOCK_INITIALIZE return buffer_cached_crc_adjusted.read(); } + atomic_t buffer_c_str_accesses; + bool buffer_track_c_str = get_env_bool("CEPH_BUFFER_TRACK"); + + void buffer::track_c_str(bool b) { + buffer_track_c_str = b; + } + int buffer::get_c_str_accesses() { + return buffer_c_str_accesses.read(); + } + atomic_t buffer_max_pipe_size; int update_max_pipe_size() { #ifdef CEPH_HAVE_SETPIPE_SZ @@ -613,10 +623,14 @@ static uint32_t simple_spinlock_t buffer_debug_lock = SIMPLE_SPINLOCK_INITIALIZE const char *buffer::ptr::c_str() const { assert(_raw); + if (buffer_track_c_str) + buffer_c_str_accesses.inc(); return _raw->get_data() + _off; } char *buffer::ptr::c_str() { assert(_raw); + if (buffer_track_c_str) + buffer_c_str_accesses.inc(); return _raw->get_data() + _off; } diff --git a/src/include/buffer.h b/src/include/buffer.h index 6eefd3494cfc7..8bd7603c4639f 100644 --- a/src/include/buffer.h +++ b/src/include/buffer.h @@ -120,6 +120,10 @@ class buffer { /// enable/disable tracking of cached crcs static void track_cached_crc(bool b); + /// count of calls to buffer::ptr::c_str() + static int get_c_str_accesses(); + /// enable/disable tracking of buffer::ptr::c_str() calls + static void track_c_str(bool b); private: From 03d63c4b2d63b655924c5657637e85abdef40899 Mon Sep 17 00:00:00 2001 From: Josh Durgin Date: Mon, 25 Nov 2013 14:37:57 -0800 Subject: [PATCH 10/10] buffer: turn off zero-copy reads for now Some users will need to 
be changed so that they no longer require the correct length from bufferlist::read_fd. Signed-off-by: Josh Durgin --- src/common/buffer.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/common/buffer.cc b/src/common/buffer.cc index 229625910192f..24b8182abb43d 100644 --- a/src/common/buffer.cc +++ b/src/common/buffer.cc @@ -1504,7 +1504,7 @@ int buffer::list::read_file(const char *fn, std::string *error) ssize_t buffer::list::read_fd(int fd, size_t len) { // try zero copy first - if (read_fd_zero_copy(fd, len) == 0) { + if (false && read_fd_zero_copy(fd, len) == 0) { // TODO fix callers to not require correct read size, which is not // available for raw_pipe until we actually inspect the data return 0;