From 5021b43e0a6ece07b8dd364ce227f60e4916f707 Mon Sep 17 00:00:00 2001 From: Josh Durgin Date: Mon, 21 Oct 2013 08:58:56 -0700 Subject: [PATCH] buffer: create raw pipe-based buffer This uses a pipe to reference kernel memory so we can use splice(2) to avoid extra data copies. Take an fd in the factory to create it, since that's the only way to use it efficiently, which is its whole purpose. Signed-off-by: Josh Durgin --- src/common/buffer.cc | 119 +++++++++++++++++++++++++++++++++++++ src/include/buffer.h | 5 +- src/test/bufferlist.cc | 129 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 251 insertions(+), 2 deletions(-) diff --git a/src/common/buffer.cc b/src/common/buffer.cc index 7ac54390943a5..cf20080fe5666 100644 --- a/src/common/buffer.cc +++ b/src/common/buffer.cc @@ -228,6 +228,125 @@ static uint32_t simple_spinlock_t buffer_debug_lock = SIMPLE_SPINLOCK_INITIALIZE }; #endif +#ifdef CEPH_HAVE_SPLICE + class buffer::raw_pipe : public buffer::raw { + public: + raw_pipe(unsigned len) : raw(len), source_consumed(false) { + pipefds[0] = -1; + pipefds[1] = -1; + if (::pipe(pipefds) == -1) { + int r = -errno; + bdout << "raw_pipe: error creating pipe: " << cpp_strerror(r) << bendl; + throw error_code(r); + } + if (set_nonblocking(pipefds) < 0) { + int r = -errno; + bdout << "raw_pipe: error setting nonblocking flag on temp pipe: " + << cpp_strerror(r) << bendl; + throw error_code(r); + } + inc_total_alloc(len); + bdout << "raw_pipe " << this << " alloc " << len << " " + << buffer::get_total_alloc() << bendl; + } + ~raw_pipe() { + if (data) + delete data; + close_pipe(pipefds); + dec_total_alloc(len); + bdout << "raw_pipe " << this << " free " << (void *)data << " " + << buffer::get_total_alloc() << bendl; + } + return false; + } + int set_source(int fd, loff_t *off) { + int flags = SPLICE_F_NONBLOCK; + ssize_t r = safe_splice(fd, off, pipefds[1], NULL, len, flags); + if (r < 0) { + bdout << "raw_pipe: error splicing into pipe: " << cpp_strerror(r) + << bendl; + return r; + } + // update length with actual amount read + len = r; + return 0; + } + // cloning doesn't make sense for pipe-based buffers, + // and is only used by unit tests for other types of buffers + return NULL; + } + char *get_data() { + if (data) + return data; + return copy_pipe(pipefds); + } + private: + int set_nonblocking(int *fds) { + if (::fcntl(fds[0], F_SETFL, O_NONBLOCK) == -1) + return -errno; + if (::fcntl(fds[1], F_SETFL, O_NONBLOCK) == -1) + return -errno; + return 0; + } + + void close_pipe(int *fds) { + if (fds[0] >= 0) + TEMP_FAILURE_RETRY(::close(fds[0])); + if (fds[1] >= 0) + TEMP_FAILURE_RETRY(::close(fds[1])); + } + char *copy_pipe(int *fds) { + /* preserve original pipe contents by copying into a temporary + * pipe before reading. + */ + int tmpfd[2]; + int r; + + assert(!source_consumed); + assert(fds[0] >= 0); + + if (::pipe(tmpfd) == -1) { + r = -errno; + bdout << "raw_pipe: error creating temp pipe: " << cpp_strerror(r) + << bendl; + throw error_code(r); + } + r = set_nonblocking(tmpfd); + if (r < 0) { + bdout << "raw_pipe: error setting nonblocking flag on temp pipe: " + << cpp_strerror(r) << bendl; + throw error_code(r); + } + int flags = SPLICE_F_NONBLOCK; + if (::tee(fds[0], tmpfd[1], len, flags) == -1) { + r = errno; + bdout << "raw_pipe: error tee'ing into temp pipe: " << cpp_strerror(r) + << bendl; + close_pipe(tmpfd); + throw error_code(r); + } + data = (char *)malloc(len); + if (!data) { + close_pipe(tmpfd); + throw bad_alloc(); + } + r = safe_read(tmpfd[0], data, len); + if (r < (ssize_t)len) { + bdout << "raw_pipe: error reading from temp pipe:" << cpp_strerror(r) + << bendl; + delete data; + data = NULL; + close_pipe(tmpfd); + throw error_code(r); + } + close_pipe(tmpfd); + return data; + } + bool source_consumed; + int pipefds[2]; + }; +#endif // CEPH_HAVE_SPLICE + /* * primitive buffer types */ diff --git a/src/include/buffer.h b/src/include/buffer.h index 3987f021601cd..546f728bbdc98 100644 --- a/src/include/buffer.h +++ b/src/include/buffer.h @@ -137,6 +137,7 @@ class buffer { class raw_posix_aligned; class raw_hack_aligned; class raw_char; + class raw_pipe; friend std::ostream& operator<<(std::ostream& out, const raw &r); @@ -152,8 +153,8 @@ class buffer { static raw* claim_malloc(unsigned len, char *buf); static raw* create_static(unsigned len, char *buf); static raw* create_page_aligned(unsigned len); - - + static raw* create_zero_copy(unsigned len, int fd, loff_t *offset); + /* * a buffer pointer. references (a subsequence of) a raw buffer. */ diff --git a/src/test/bufferlist.cc b/src/test/bufferlist.cc index 8b6ca269234ff..61db1ca33467f 100644 --- a/src/test/bufferlist.cc +++ b/src/test/bufferlist.cc @@ -29,6 +29,7 @@ #include "include/encoding.h" #include "common/environment.h" #include "common/Clock.h" +#include "common/safe_io.h" #include "gtest/gtest.h" #include "stdlib.h" @@ -141,6 +142,25 @@ TEST(Buffer, constructors) { bufferptr clone = ptr.clone(); EXPECT_EQ(0, ::memcmp(clone.c_str(), ptr.c_str(), len)); } +#ifdef CEPH_HAVE_SPLICE + if (ceph_buffer_track) + EXPECT_EQ(0, buffer::get_total_alloc()); + { + // no fd + EXPECT_THROW(buffer::create_zero_copy(len, -1, NULL), buffer::error_code); + + unsigned zc_len = 4; + ::unlink("testfile"); + ::system("echo ABC > testfile"); + int fd = ::open("testfile", O_RDONLY); + bufferptr ptr(buffer::create_zero_copy(zc_len, fd, NULL)); + EXPECT_EQ(zc_len, ptr.length()); + if (ceph_buffer_track) + EXPECT_EQ(zc_len, (unsigned)buffer::get_total_alloc()); + ::close(fd); + ::unlink("testfile"); + } +#endif if (ceph_buffer_track) EXPECT_EQ(0, buffer::get_total_alloc()); } @@ -153,6 +173,115 @@ TEST(BufferRaw, ostream) { EXPECT_GT(stream.str().size(), stream.str().find("len 1 nref 1)")); } +#ifdef CEPH_HAVE_SPLICE +class TestRawPipe : public ::testing::Test { +protected: + virtual void SetUp() { + len = 4; + ::unlink("testfile"); + ::system("echo ABC > testfile"); + fd = ::open("testfile", O_RDONLY); + assert(fd >= 0); + } + virtual void TearDown() { + ::close(fd); + ::unlink("testfile"); + } + int fd; + unsigned len; +}; + +TEST_F(TestRawPipe, create_zero_copy) { + bufferptr ptr(buffer::create_zero_copy(len, fd, NULL)); + EXPECT_EQ(len, ptr.length()); + if (get_env_bool("CEPH_BUFFER_TRACK")) + EXPECT_EQ(len, (unsigned)buffer::get_total_alloc()); +} + +TEST_F(TestRawPipe, c_str_no_fd) { + EXPECT_THROW(bufferptr ptr(buffer::create_zero_copy(len, -1, NULL)), + buffer::error_code); +} + +TEST_F(TestRawPipe, c_str_basic) { + bufferptr ptr = bufferptr(buffer::create_zero_copy(len, fd, NULL)); + EXPECT_EQ(0, memcmp(ptr.c_str(), "ABC\n", len)); + EXPECT_EQ(len, ptr.length()); +} + +TEST_F(TestRawPipe, c_str_twice) { + // make sure we're creating a copy of the data and not consuming it + bufferptr ptr = bufferptr(buffer::create_zero_copy(len, fd, NULL)); + EXPECT_EQ(len, ptr.length()); + EXPECT_EQ(0, memcmp(ptr.c_str(), "ABC\n", len)); + EXPECT_EQ(0, memcmp(ptr.c_str(), "ABC\n", len)); +} + +TEST_F(TestRawPipe, c_str_basic_offset) { + loff_t offset = len - 1; + ::lseek(fd, offset, SEEK_SET); + bufferptr ptr = bufferptr(buffer::create_zero_copy(len - offset, fd, NULL)); + EXPECT_EQ(len - offset, ptr.length()); + EXPECT_EQ(0, memcmp(ptr.c_str(), "\n", len - offset)); +} + +TEST_F(TestRawPipe, c_str_dest_short) { + ::lseek(fd, 1, SEEK_SET); + bufferptr ptr = bufferptr(buffer::create_zero_copy(2, fd, NULL)); + EXPECT_EQ(2u, ptr.length()); + EXPECT_EQ(0, memcmp(ptr.c_str(), "BC", 2)); +} + +TEST_F(TestRawPipe, c_str_source_short) { + ::lseek(fd, 1, SEEK_SET); + bufferptr ptr = bufferptr(buffer::create_zero_copy(len, fd, NULL)); + EXPECT_EQ(len - 1, ptr.length()); + EXPECT_EQ(0, memcmp(ptr.c_str(), "BC\n", len - 1)); +} + +TEST_F(TestRawPipe, c_str_explicit_zero_offset) { + loff_t offset = 0; + ::lseek(fd, 1, SEEK_SET); + bufferptr ptr = bufferptr(buffer::create_zero_copy(len, fd, &offset)); + EXPECT_EQ(len, offset); + EXPECT_EQ(len, ptr.length()); + EXPECT_EQ(0, memcmp(ptr.c_str(), "ABC\n", len)); +} + +TEST_F(TestRawPipe, c_str_explicit_positive_offset) { + loff_t offset = 1; + bufferptr ptr = bufferptr(buffer::create_zero_copy(len - offset, fd, + &offset)); + EXPECT_EQ(len, offset); + EXPECT_EQ(len - 1, ptr.length()); + EXPECT_EQ(0, memcmp(ptr.c_str(), "BC\n", len - 1)); +} + +TEST_F(TestRawPipe, c_str_explicit_positive_empty_result) { + loff_t offset = len; + bufferptr ptr = bufferptr(buffer::create_zero_copy(len - offset, fd, + &offset)); + EXPECT_EQ(len, offset); + EXPECT_EQ(0u, ptr.length()); +} + +TEST_F(TestRawPipe, c_str_source_short_explicit_offset) { + loff_t offset = 1; + bufferptr ptr = bufferptr(buffer::create_zero_copy(len, fd, &offset)); + EXPECT_EQ(len, offset); + EXPECT_EQ(len - 1, ptr.length()); + EXPECT_EQ(0, memcmp(ptr.c_str(), "BC\n", len - 1)); +} + +TEST_F(TestRawPipe, c_str_dest_short_explicit_offset) { + loff_t offset = 1; + bufferptr ptr = bufferptr(buffer::create_zero_copy(2, fd, &offset)); + EXPECT_EQ(3, offset); + EXPECT_EQ(2u, ptr.length()); + EXPECT_EQ(0, memcmp(ptr.c_str(), "BC", 2)); +} +#endif // CEPH_HAVE_SPLICE + // // +-----------+ +-----+ // | | | |