diff --git a/src/common/buffer.cc b/src/common/buffer.cc
index 7ac54390943a5..cf20080fe5666 100644
--- a/src/common/buffer.cc
+++ b/src/common/buffer.cc
@@ -228,6 +228,125 @@ static uint32_t simple_spinlock_t buffer_debug_lock = SIMPLE_SPINLOCK_INITIALIZE
   };
 #endif
 
+#ifdef CEPH_HAVE_SPLICE
+  /*
+   * raw buffer whose contents live in a pipe.
+   *
+   * set_source() splices data from a file descriptor into the pipe;
+   * get_data() materializes the contents into a malloc()'ed buffer the
+   * first time it is called, going through a temporary pipe so that the
+   * original pipe contents are preserved.
+   */
+  class buffer::raw_pipe : public buffer::raw {
+  public:
+    /*
+     * Create the backing pipe for a buffer of up to len bytes and put
+     * both ends in non-blocking mode.
+     *
+     * Throws error_code (negative errno) if the pipe cannot be created
+     * or configured.
+     */
+    raw_pipe(unsigned len) : raw(len), source_consumed(false) {
+      pipefds[0] = -1;
+      pipefds[1] = -1;
+      if (::pipe(pipefds) == -1) {
+        int r = -errno;
+        bdout << "raw_pipe: error creating pipe: " << cpp_strerror(r) << bendl;
+        throw error_code(r);
+      }
+      // use the code returned by set_nonblocking() instead of re-reading errno
+      int r = set_nonblocking(pipefds);
+      if (r < 0) {
+        bdout << "raw_pipe: error setting nonblocking flag on pipe: "
+              << cpp_strerror(r) << bendl;
+        // the destructor does not run when the constructor throws, so the
+        // descriptors have to be released here
+        close_pipe(pipefds);
+        throw error_code(r);
+      }
+      inc_total_alloc(len);
+      bdout << "raw_pipe " << this << " alloc " << len << " "
+            << buffer::get_total_alloc() << bendl;
+    }
+    /*
+     * Release the copied data (if any), close the pipe and undo the
+     * allocation accounting.
+     */
+    ~raw_pipe() {
+      // data comes from malloc() in copy_pipe(), so pair it with free()
+      if (data)
+        free(data);
+      close_pipe(pipefds);
+      dec_total_alloc(len);
+      bdout << "raw_pipe " << this << " free " << (void *)data << " "
+            << buffer::get_total_alloc() << bendl;
+    }
+      return false;
+    }
+    /*
+     * Splice up to len bytes from fd into the pipe; off is handed to
+     * safe_splice() unchanged.
+     *
+     * Returns 0 on success, after updating len to the number of bytes
+     * actually spliced, or the negative value returned by safe_splice().
+     */
+    int set_source(int fd, loff_t *off) {
+      int flags = SPLICE_F_NONBLOCK;
+      ssize_t r = safe_splice(fd, off, pipefds[1], NULL, len, flags);
+      if (r < 0) {
+        bdout << "raw_pipe: error splicing into pipe: " << cpp_strerror(r)
+              << bendl;
+        return r;
+      }
+      // update length with actual amount read, keeping the allocation
+      // accounting in step so the destructor's dec_total_alloc(len)
+      // balances what has been counted
+      dec_total_alloc(len);
+      len = r;
+      inc_total_alloc(len);
+      return 0;
+    }
+      // cloning doesn't make sense for pipe-based buffers,
+      // and is only used by unit tests for other types of buffers
+      return NULL;
+    }
+    /*
+     * Return the buffer contents, copying them out of the pipe on the
+     * first call and returning the cached copy afterwards.
+     */
+    char *get_data() {
+      if (data)
+        return data;
+      return copy_pipe(pipefds);
+    }
+  private:
+    /*
+     * Set O_NONBLOCK on both ends of the pipe.
+     * Returns 0 on success or -errno on failure.
+     */
+    int set_nonblocking(int *fds) {
+      if (::fcntl(fds[0], F_SETFL, O_NONBLOCK) == -1)
+        return -errno;
+      if (::fcntl(fds[1], F_SETFL, O_NONBLOCK) == -1)
+        return -errno;
+      return 0;
+    }
+
+    /* Close whichever ends of the pipe hold a non-negative descriptor. */
+    void close_pipe(int *fds) {
+      if (fds[0] >= 0)
+        TEMP_FAILURE_RETRY(::close(fds[0]));
+      if (fds[1] >= 0)
+        TEMP_FAILURE_RETRY(::close(fds[1]));
+    }
+    /*
+     * Copy len bytes out of the pipe into a freshly malloc()'ed buffer
+     * stored in data, and return it.
+     *
+     * Throws error_code (negative errno) on pipe, tee or read failure,
+     * and bad_alloc if the buffer cannot be allocated.  The temporary
+     * pipe is closed on every exit path.
+     */
+    char *copy_pipe(int *fds) {
+      /* preserve original pipe contents by copying into a temporary
+       * pipe before reading.
+       */
+      int tmpfd[2];
+      int r;
+
+      assert(!source_consumed);
+      assert(fds[0] >= 0);
+
+      if (::pipe(tmpfd) == -1) {
+        r = -errno;
+        bdout << "raw_pipe: error creating temp pipe: " << cpp_strerror(r)
+              << bendl;
+        throw error_code(r);
+      }
+      r = set_nonblocking(tmpfd);
+      if (r < 0) {
+        bdout << "raw_pipe: error setting nonblocking flag on temp pipe: "
+              << cpp_strerror(r) << bendl;
+        close_pipe(tmpfd);
+        throw error_code(r);
+      }
+      int flags = SPLICE_F_NONBLOCK;
+      if (::tee(fds[0], tmpfd[1], len, flags) == -1) {
+        // negative errno, like every other error_code thrown here
+        r = -errno;
+        bdout << "raw_pipe: error tee'ing into temp pipe: " << cpp_strerror(r)
+              << bendl;
+        close_pipe(tmpfd);
+        throw error_code(r);
+      }
+      data = (char *)malloc(len);
+      if (!data) {
+        close_pipe(tmpfd);
+        throw bad_alloc();
+      }
+      r = safe_read(tmpfd[0], data, len);
+      if (r < (ssize_t)len) {
+        // a short read yields a byte count, not an errno value; report it
+        // as an I/O error instead of feeding the count to cpp_strerror()
+        if (r >= 0)
+          r = -EIO;
+        bdout << "raw_pipe: error reading from temp pipe:" << cpp_strerror(r)
+              << bendl;
+        free(data);
+        data = NULL;
+        close_pipe(tmpfd);
+        throw error_code(r);
+      }
+      close_pipe(tmpfd);
+      return data;
+    }
+    bool source_consumed;
+    int pipefds[2];
+  };
+#endif // CEPH_HAVE_SPLICE
+
 /*
  * primitive buffer types
  */
diff --git a/src/include/buffer.h b/src/include/buffer.h
index 3987f021601cd..546f728bbdc98 100644
--- a/src/include/buffer.h
+++ b/src/include/buffer.h
@@ -137,6 +137,7 @@ class buffer {
   class raw_posix_aligned;
   class raw_hack_aligned;
   class raw_char;
+  class raw_pipe;
 
   friend std::ostream& operator<<(std::ostream& out, const raw &r);
 
@@ -152,8 +153,8 @@ class buffer {
   static raw* claim_malloc(unsigned len, char *buf);
   static raw* create_static(unsigned len, char *buf);
   static raw* create_page_aligned(unsigned len);
-
-
+  static raw* create_zero_copy(unsigned len, int fd, loff_t *offset);
+
   /*
    * a buffer pointer. references (a subsequence of) a raw buffer.
    */
diff --git a/src/test/bufferlist.cc b/src/test/bufferlist.cc
index 8b6ca269234ff..61db1ca33467f 100644
--- a/src/test/bufferlist.cc
+++ b/src/test/bufferlist.cc
@@ -29,6 +29,7 @@
 #include "include/encoding.h"
 #include "common/environment.h"
 #include "common/Clock.h"
+#include "common/safe_io.h"
 
 #include "gtest/gtest.h"
 #include "stdlib.h"
@@ -141,6 +142,25 @@ TEST(Buffer, constructors) {
     bufferptr clone = ptr.clone();
     EXPECT_EQ(0, ::memcmp(clone.c_str(), ptr.c_str(), len));
   }
+#ifdef CEPH_HAVE_SPLICE
+  if (ceph_buffer_track)
+    EXPECT_EQ(0, buffer::get_total_alloc());
+  {
+    // no fd
+    EXPECT_THROW(buffer::create_zero_copy(len, -1, NULL), buffer::error_code);
+
+    unsigned zc_len = 4;
+    ::unlink("testfile");
+    ::system("echo ABC > testfile");
+    int fd = ::open("testfile", O_RDONLY);
+    bufferptr ptr(buffer::create_zero_copy(zc_len, fd, NULL));
+    EXPECT_EQ(zc_len, ptr.length());
+    if (ceph_buffer_track)
+      EXPECT_EQ(zc_len, (unsigned)buffer::get_total_alloc());
+    ::close(fd);
+    ::unlink("testfile");
+  }
+#endif
   if (ceph_buffer_track)
     EXPECT_EQ(0, buffer::get_total_alloc());
 }
@@ -153,6 +173,115 @@ TEST(BufferRaw, ostream) {
   EXPECT_GT(stream.str().size(), stream.str().find("len 1 nref 1)"));
 }
 
+#ifdef CEPH_HAVE_SPLICE
+class TestRawPipe : public ::testing::Test {
+protected:
+  virtual void SetUp() {
+    len = 4;
+    ::unlink("testfile");
+    ::system("echo ABC > testfile");
+    fd = ::open("testfile", O_RDONLY);
+    assert(fd >= 0);
+  }
+  virtual void TearDown() {
+    ::close(fd);
+    ::unlink("testfile");
+  }
+  int fd;
+  unsigned len;
+};
+
+TEST_F(TestRawPipe, create_zero_copy) {
+  bufferptr ptr(buffer::create_zero_copy(len, fd, NULL));
+  EXPECT_EQ(len, ptr.length());
+  if (get_env_bool("CEPH_BUFFER_TRACK"))
+    EXPECT_EQ(len, (unsigned)buffer::get_total_alloc());
+}
+
+TEST_F(TestRawPipe, c_str_no_fd) {
+  EXPECT_THROW(bufferptr ptr(buffer::create_zero_copy(len, -1, NULL)),
+               buffer::error_code);
+}
+
+TEST_F(TestRawPipe, c_str_basic) {
+  bufferptr ptr = bufferptr(buffer::create_zero_copy(len, fd, NULL));
+  EXPECT_EQ(0, memcmp(ptr.c_str(), "ABC\n", len));
+  EXPECT_EQ(len, ptr.length());
+}
+
+TEST_F(TestRawPipe, c_str_twice) {
+  // make sure we're creating a copy of the data and not consuming it
+  bufferptr ptr = bufferptr(buffer::create_zero_copy(len, fd, NULL));
+  EXPECT_EQ(len, ptr.length());
+  EXPECT_EQ(0, memcmp(ptr.c_str(), "ABC\n", len));
+  EXPECT_EQ(0, memcmp(ptr.c_str(), "ABC\n", len));
+}
+
+TEST_F(TestRawPipe, c_str_basic_offset) {
+  loff_t offset = len - 1;
+  ::lseek(fd, offset, SEEK_SET);
+  bufferptr ptr = bufferptr(buffer::create_zero_copy(len - offset, fd, NULL));
+  EXPECT_EQ(len - offset, ptr.length());
+  EXPECT_EQ(0, memcmp(ptr.c_str(), "\n", len - offset));
+}
+
+TEST_F(TestRawPipe, c_str_dest_short) {
+  ::lseek(fd, 1, SEEK_SET);
+  bufferptr ptr = bufferptr(buffer::create_zero_copy(2, fd, NULL));
+  EXPECT_EQ(2u, ptr.length());
+  EXPECT_EQ(0, memcmp(ptr.c_str(), "BC", 2));
+}
+
+TEST_F(TestRawPipe, c_str_source_short) {
+  ::lseek(fd, 1, SEEK_SET);
+  bufferptr ptr = bufferptr(buffer::create_zero_copy(len, fd, NULL));
+  EXPECT_EQ(len - 1, ptr.length());
+  EXPECT_EQ(0, memcmp(ptr.c_str(), "BC\n", len - 1));
+}
+
+TEST_F(TestRawPipe, c_str_explicit_zero_offset) {
+  loff_t offset = 0;
+  ::lseek(fd, 1, SEEK_SET);
+  bufferptr ptr = bufferptr(buffer::create_zero_copy(len, fd, &offset));
+  EXPECT_EQ(len, offset);
+  EXPECT_EQ(len, ptr.length());
+  EXPECT_EQ(0, memcmp(ptr.c_str(), "ABC\n", len));
+}
+
+TEST_F(TestRawPipe, c_str_explicit_positive_offset) {
+  loff_t offset = 1;
+  bufferptr ptr = bufferptr(buffer::create_zero_copy(len - offset, fd,
+                                                     &offset));
+  EXPECT_EQ(len, offset);
+  EXPECT_EQ(len - 1, ptr.length());
+  EXPECT_EQ(0, memcmp(ptr.c_str(), "BC\n", len - 1));
+}
+
+TEST_F(TestRawPipe, c_str_explicit_positive_empty_result) {
+  loff_t offset = len;
+  bufferptr ptr = bufferptr(buffer::create_zero_copy(len - offset, fd,
+                                                     &offset));
+  EXPECT_EQ(len, offset);
+  EXPECT_EQ(0u, ptr.length());
+}
+
+TEST_F(TestRawPipe, c_str_source_short_explicit_offset) {
+  loff_t pos = 1;  // ask for len bytes although fewer remain past pos
+  bufferptr bp(buffer::create_zero_copy(len, fd, &pos));
+  EXPECT_EQ(len, pos);
+  EXPECT_EQ(len - 1, bp.length());
+  EXPECT_EQ(0, memcmp(bp.c_str(), "BC\n", len - 1));
+}
+
+TEST_F(TestRawPipe, c_str_dest_short_explicit_offset) {
+  loff_t pos = 1;  // ask for just 2 of the bytes available past pos
+  bufferptr bp(buffer::create_zero_copy(2, fd, &pos));
+  EXPECT_EQ(3, pos);
+  EXPECT_EQ(2u, bp.length());
+  EXPECT_EQ(0, memcmp(bp.c_str(), "BC", 2));
+}
+#endif // CEPH_HAVE_SPLICE
+
 //
 // +-----------+ +-----+
 // | | | |