Skip to content

Commit

Permalink
buffer: create raw pipe-based buffer
Browse files Browse the repository at this point in the history
This uses a pipe to reference kernel memory so we can use splice(2) to
avoid extra data copies. Take an fd in the factory to create it, since
that's the only way to use it efficiently, which is its whole purpose.

Signed-off-by: Josh Durgin <[email protected]>
  • Loading branch information
jdurgin committed Nov 23, 2013
1 parent eb94b8f commit 5021b43
Show file tree
Hide file tree
Showing 3 changed files with 251 additions and 2 deletions.
119 changes: 119 additions & 0 deletions src/common/buffer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,125 @@ static uint32_t simple_spinlock_t buffer_debug_lock = SIMPLE_SPINLOCK_INITIALIZE
};
#endif

#ifdef CEPH_HAVE_SPLICE
class buffer::raw_pipe : public buffer::raw {
public:
raw_pipe(unsigned len) : raw(len), source_consumed(false) {
pipefds[0] = -1;
pipefds[1] = -1;
if (::pipe(pipefds) == -1) {
int r = -errno;
bdout << "raw_pipe: error creating pipe: " << cpp_strerror(r) << bendl;
throw error_code(r);
}
if (set_nonblocking(pipefds) < 0) {
int r = -errno;
bdout << "raw_pipe: error setting nonblocking flag on temp pipe: "
<< cpp_strerror(r) << bendl;
throw error_code(r);
}
inc_total_alloc(len);
bdout << "raw_pipe " << this << " alloc " << len << " "
<< buffer::get_total_alloc() << bendl;
}
~raw_pipe() {
if (data)
delete data;
close_pipe(pipefds);
dec_total_alloc(len);
bdout << "raw_pipe " << this << " free " << (void *)data << " "
<< buffer::get_total_alloc() << bendl;
}
return false;
}
int set_source(int fd, loff_t *off) {
int flags = SPLICE_F_NONBLOCK;
ssize_t r = safe_splice(fd, off, pipefds[1], NULL, len, flags);
if (r < 0) {
bdout << "raw_pipe: error splicing into pipe: " << cpp_strerror(r)
<< bendl;
return r;
}
// update length with actual amount read
len = r;
return 0;
}
// cloning doesn't make sense for pipe-based buffers,
// and is only used by unit tests for other types of buffers
return NULL;
}
char *get_data() {
if (data)
return data;
return copy_pipe(pipefds);
}
private:
int set_nonblocking(int *fds) {
if (::fcntl(fds[0], F_SETFL, O_NONBLOCK) == -1)
return -errno;
if (::fcntl(fds[1], F_SETFL, O_NONBLOCK) == -1)
return -errno;
return 0;
}

void close_pipe(int *fds) {
if (fds[0] >= 0)
TEMP_FAILURE_RETRY(::close(fds[0]));
if (fds[1] >= 0)
TEMP_FAILURE_RETRY(::close(fds[1]));
}
char *copy_pipe(int *fds) {
/* preserve original pipe contents by copying into a temporary
* pipe before reading.
*/
int tmpfd[2];
int r;

assert(!source_consumed);
assert(fds[0] >= 0);

if (::pipe(tmpfd) == -1) {
r = -errno;
bdout << "raw_pipe: error creating temp pipe: " << cpp_strerror(r)
<< bendl;
throw error_code(r);
}
r = set_nonblocking(tmpfd);
if (r < 0) {
bdout << "raw_pipe: error setting nonblocking flag on temp pipe: "
<< cpp_strerror(r) << bendl;
throw error_code(r);
}
int flags = SPLICE_F_NONBLOCK;
if (::tee(fds[0], tmpfd[1], len, flags) == -1) {
r = errno;
bdout << "raw_pipe: error tee'ing into temp pipe: " << cpp_strerror(r)
<< bendl;
close_pipe(tmpfd);
throw error_code(r);
}
data = (char *)malloc(len);
if (!data) {
close_pipe(tmpfd);
throw bad_alloc();
}
r = safe_read(tmpfd[0], data, len);
if (r < (ssize_t)len) {
bdout << "raw_pipe: error reading from temp pipe:" << cpp_strerror(r)
<< bendl;
delete data;
data = NULL;
close_pipe(tmpfd);
throw error_code(r);
}
close_pipe(tmpfd);
return data;
}
bool source_consumed;
int pipefds[2];
};
#endif // CEPH_HAVE_SPLICE

/*
* primitive buffer types
*/
Expand Down
5 changes: 3 additions & 2 deletions src/include/buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ class buffer {
class raw_posix_aligned;
class raw_hack_aligned;
class raw_char;
class raw_pipe;

friend std::ostream& operator<<(std::ostream& out, const raw &r);

Expand All @@ -152,8 +153,8 @@ class buffer {
static raw* claim_malloc(unsigned len, char *buf);
static raw* create_static(unsigned len, char *buf);
static raw* create_page_aligned(unsigned len);

static raw* create_zero_copy(unsigned len, int fd, loff_t *offset);

/*
* a buffer pointer. references (a subsequence of) a raw buffer.
*/
Expand Down
129 changes: 129 additions & 0 deletions src/test/bufferlist.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "include/encoding.h"
#include "common/environment.h"
#include "common/Clock.h"
#include "common/safe_io.h"

#include "gtest/gtest.h"
#include "stdlib.h"
Expand Down Expand Up @@ -141,6 +142,25 @@ TEST(Buffer, constructors) {
bufferptr clone = ptr.clone();
EXPECT_EQ(0, ::memcmp(clone.c_str(), ptr.c_str(), len));
}
#ifdef CEPH_HAVE_SPLICE
if (ceph_buffer_track)
EXPECT_EQ(0, buffer::get_total_alloc());
{
// no fd
EXPECT_THROW(buffer::create_zero_copy(len, -1, NULL), buffer::error_code);

unsigned zc_len = 4;
::unlink("testfile");
::system("echo ABC > testfile");
int fd = ::open("testfile", O_RDONLY);
bufferptr ptr(buffer::create_zero_copy(zc_len, fd, NULL));
EXPECT_EQ(zc_len, ptr.length());
if (ceph_buffer_track)
EXPECT_EQ(zc_len, (unsigned)buffer::get_total_alloc());
::close(fd);
::unlink("testfile");
}
#endif
if (ceph_buffer_track)
EXPECT_EQ(0, buffer::get_total_alloc());
}
Expand All @@ -153,6 +173,115 @@ TEST(BufferRaw, ostream) {
EXPECT_GT(stream.str().size(), stream.str().find("len 1 nref 1)"));
}

#ifdef CEPH_HAVE_SPLICE
class TestRawPipe : public ::testing::Test {
protected:
virtual void SetUp() {
len = 4;
::unlink("testfile");
::system("echo ABC > testfile");
fd = ::open("testfile", O_RDONLY);
assert(fd >= 0);
}
virtual void TearDown() {
::close(fd);
::unlink("testfile");
}
int fd;
unsigned len;
};

TEST_F(TestRawPipe, create_zero_copy) {
bufferptr ptr(buffer::create_zero_copy(len, fd, NULL));
EXPECT_EQ(len, ptr.length());
if (get_env_bool("CEPH_BUFFER_TRACK"))
EXPECT_EQ(len, (unsigned)buffer::get_total_alloc());
}

TEST_F(TestRawPipe, c_str_no_fd) {
EXPECT_THROW(bufferptr ptr(buffer::create_zero_copy(len, -1, NULL)),
buffer::error_code);
}

TEST_F(TestRawPipe, c_str_basic) {
bufferptr ptr = bufferptr(buffer::create_zero_copy(len, fd, NULL));
EXPECT_EQ(0, memcmp(ptr.c_str(), "ABC\n", len));
EXPECT_EQ(len, ptr.length());
}

TEST_F(TestRawPipe, c_str_twice) {
// make sure we're creating a copy of the data and not consuming it
bufferptr ptr = bufferptr(buffer::create_zero_copy(len, fd, NULL));
EXPECT_EQ(len, ptr.length());
EXPECT_EQ(0, memcmp(ptr.c_str(), "ABC\n", len));
EXPECT_EQ(0, memcmp(ptr.c_str(), "ABC\n", len));
}

TEST_F(TestRawPipe, c_str_basic_offset) {
loff_t offset = len - 1;
::lseek(fd, offset, SEEK_SET);
bufferptr ptr = bufferptr(buffer::create_zero_copy(len - offset, fd, NULL));
EXPECT_EQ(len - offset, ptr.length());
EXPECT_EQ(0, memcmp(ptr.c_str(), "\n", len - offset));
}

TEST_F(TestRawPipe, c_str_dest_short) {
::lseek(fd, 1, SEEK_SET);
bufferptr ptr = bufferptr(buffer::create_zero_copy(2, fd, NULL));
EXPECT_EQ(2u, ptr.length());
EXPECT_EQ(0, memcmp(ptr.c_str(), "BC", 2));
}

TEST_F(TestRawPipe, c_str_source_short) {
::lseek(fd, 1, SEEK_SET);
bufferptr ptr = bufferptr(buffer::create_zero_copy(len, fd, NULL));
EXPECT_EQ(len - 1, ptr.length());
EXPECT_EQ(0, memcmp(ptr.c_str(), "BC\n", len - 1));
}

TEST_F(TestRawPipe, c_str_explicit_zero_offset) {
loff_t offset = 0;
::lseek(fd, 1, SEEK_SET);
bufferptr ptr = bufferptr(buffer::create_zero_copy(len, fd, &offset));
EXPECT_EQ(len, offset);
EXPECT_EQ(len, ptr.length());
EXPECT_EQ(0, memcmp(ptr.c_str(), "ABC\n", len));
}

TEST_F(TestRawPipe, c_str_explicit_positive_offset) {
loff_t offset = 1;
bufferptr ptr = bufferptr(buffer::create_zero_copy(len - offset, fd,
&offset));
EXPECT_EQ(len, offset);
EXPECT_EQ(len - 1, ptr.length());
EXPECT_EQ(0, memcmp(ptr.c_str(), "BC\n", len - 1));
}

TEST_F(TestRawPipe, c_str_explicit_positive_empty_result) {
loff_t offset = len;
bufferptr ptr = bufferptr(buffer::create_zero_copy(len - offset, fd,
&offset));
EXPECT_EQ(len, offset);
EXPECT_EQ(0u, ptr.length());
}

TEST_F(TestRawPipe, c_str_source_short_explicit_offset) {
loff_t offset = 1;
bufferptr ptr = bufferptr(buffer::create_zero_copy(len, fd, &offset));
EXPECT_EQ(len, offset);
EXPECT_EQ(len - 1, ptr.length());
EXPECT_EQ(0, memcmp(ptr.c_str(), "BC\n", len - 1));
}

TEST_F(TestRawPipe, c_str_dest_short_explicit_offset) {
loff_t offset = 1;
bufferptr ptr = bufferptr(buffer::create_zero_copy(2, fd, &offset));
EXPECT_EQ(3, offset);
EXPECT_EQ(2u, ptr.length());
EXPECT_EQ(0, memcmp(ptr.c_str(), "BC", 2));
}
#endif // CEPH_HAVE_SPLICE

//
// +-----------+ +-----+
// | | | |
Expand Down

0 comments on commit 5021b43

Please sign in to comment.