Skip to content

Commit

Permalink
buffer: add methods to read and write using zero copy
Browse files Browse the repository at this point in the history
Create explicit methods for testing. Make buffer::list::write_fd() use
zero-copy if all the buffers support it.  Don't automatically handle
reads yet, since we need better detection of read length first.

Signed-off-by: Josh Durgin <[email protected]>
  • Loading branch information
jdurgin committed Nov 23, 2013
1 parent 5021b43 commit 3f6fa05
Show file tree
Hide file tree
Showing 3 changed files with 138 additions and 2 deletions.
102 changes: 100 additions & 2 deletions src/common/buffer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,12 @@ static uint32_t simple_spinlock_t buffer_debug_lock = SIMPLE_SPINLOCK_INITIALIZE
memcpy(c->data, data, len);
return c;
}
// Reports whether this raw buffer can be transferred with zero_copy_to_fd().
// The base implementation always answers false; subclasses that support
// zero-copy transfer override it.
virtual bool can_zero_copy() const {
return false;
}
// Default zero-copy write: not supported for this buffer type.
// Neither fd nor offset is used here; the call always returns -ENOTSUP.
// Subclasses that support zero-copy transfer override it.
virtual int zero_copy_to_fd(int fd, loff_t *offset) {
return -ENOTSUP;
}
// Returns true when the address held in `data` has none of the bits in
// ~CEPH_PAGE_MASK set.
// NOTE(review): presumably ~CEPH_PAGE_MASK selects the offset-within-page
// bits, so this is true exactly when `data` starts on a page boundary --
// confirm against the definition of CEPH_PAGE_MASK.
virtual bool is_page_aligned() {
return ((long)data & ~CEPH_PAGE_MASK) == 0;
}
Expand Down Expand Up @@ -257,6 +263,10 @@ static uint32_t simple_spinlock_t buffer_debug_lock = SIMPLE_SPINLOCK_INITIALIZE
bdout << "raw_pipe " << this << " free " << (void *)data << " "
<< buffer::get_total_alloc() << bendl;
}
// This buffer type always reports that it can be written with
// zero_copy_to_fd().
bool can_zero_copy() const {
return true;
}
// This buffer type never reports itself as page aligned.
bool is_page_aligned() {
return false;
}
int set_source(int fd, loff_t *off) {
Expand All @@ -271,6 +281,19 @@ static uint32_t simple_spinlock_t buffer_debug_lock = SIMPLE_SPINLOCK_INITIALIZE
len = r;
return 0;
}
// Splice `len` bytes from the read end of this buffer's pipe into `fd`.
//
// `offset` is forwarded unchanged to safe_splice_exact() as the output
// offset argument. The pipe may only be drained once: the call asserts
// that the source has not been consumed yet and marks it consumed on
// success.
//
// Returns 0 on success, otherwise the negative value reported by
// safe_splice_exact().
int zero_copy_to_fd(int fd, loff_t *offset) {
  assert(!source_consumed);
  const int splice_flags = SPLICE_F_NONBLOCK;
  const ssize_t rc = safe_splice_exact(pipefds[0], NULL, fd, offset, len,
                                       splice_flags);
  if (rc >= 0) {
    // the data has left the pipe; it cannot be spliced out a second time
    source_consumed = true;
    return 0;
  }
  bdout << "raw_pipe: error splicing from pipe to fd: "
        << cpp_strerror(rc) << bendl;
  return rc;
}
buffer::raw* clone_empty() {
// cloning doesn't make sense for pipe-based buffers,
// and is only used by unit tests for other types of buffers
return NULL;
Expand Down Expand Up @@ -412,6 +435,20 @@ static uint32_t simple_spinlock_t buffer_debug_lock = SIMPLE_SPINLOCK_INITIALIZE
#endif
}

// Create a pipe-backed raw buffer of `len` bytes and fill it from `fd`
// via raw_pipe::set_source(fd, offset).
//
// Returns the newly allocated buffer on success. If set_source() fails,
// the buffer is deleted and the negative result is thrown as an
// error_code. When CEPH_HAVE_SPLICE is not defined, this always throws
// error_code(-ENOTSUP).
buffer::raw* buffer::create_zero_copy(unsigned len, int fd, loff_t *offset) {
#ifdef CEPH_HAVE_SPLICE
  buffer::raw_pipe* pipe_buf = new raw_pipe(len);
  const int rc = pipe_buf->set_source(fd, offset);
  if (rc >= 0)
    return pipe_buf;
  // the source could not be attached: release the buffer before reporting
  delete pipe_buf;
  throw error_code(rc);
#else
  throw error_code(-ENOTSUP);
#endif
}

buffer::ptr::ptr(raw *r) : _raw(r), _off(0), _len(r->len) // no lock needed; this is an unref raw.
{
r->nref.inc();
Expand Down Expand Up @@ -597,6 +634,15 @@ static uint32_t simple_spinlock_t buffer_debug_lock = SIMPLE_SPINLOCK_INITIALIZE
memset(c_str()+o, 0, l);
}

// Forwards to the underlying raw buffer's can_zero_copy().
// NOTE(review): _raw is dereferenced without a null check -- presumably
// callers only use this on a ptr that holds a raw buffer; confirm.
bool buffer::ptr::can_zero_copy() const
{
return _raw->can_zero_copy();
}

// Forwards to the underlying raw buffer's zero_copy_to_fd(), passing fd
// and offset through unchanged and returning its result.
// NOTE(review): _raw is dereferenced without a null check -- presumably
// callers only use this on a ptr that holds a raw buffer; confirm.
int buffer::ptr::zero_copy_to_fd(int fd, loff_t *offset) const
{
return _raw->zero_copy_to_fd(fd, offset);
}

// -- buffer::list::iterator --
/*
Expand Down Expand Up @@ -859,6 +905,16 @@ static uint32_t simple_spinlock_t buffer_debug_lock = SIMPLE_SPINLOCK_INITIALIZE
}
}

bool buffer::list::can_zero_copy() const
{
for (std::list<ptr>::const_iterator it = _buffers.begin();
it != _buffers.end();
++it)
if (!it->can_zero_copy())
return false;
return true;
}

bool buffer::list::is_page_aligned() const
{
for (std::list<ptr>::const_iterator it = _buffers.begin();
Expand Down Expand Up @@ -1351,7 +1407,7 @@ int buffer::list::read_file(const char *fn, std::string *error)
return 0;
}

ssize_t buffer::list::read_fd(int fd, size_t len)
ssize_t buffer::list::read_fd(int fd, size_t len)
{
int s = ROUND_UP_TO(len, CEPH_PAGE_SIZE);
bufferptr bp = buffer::create_page_aligned(s);
Expand All @@ -1363,6 +1419,21 @@ ssize_t buffer::list::read_fd(int fd, size_t len)
return ret;
}

// Read `len` bytes from `fd` into a new zero-copy buffer and append it to
// this list.
//
// The buffer is created with buffer::create_zero_copy(len, fd, NULL), i.e.
// no explicit source offset is supplied.
//
// Returns 0 on success, or the code carried by the buffer::error_code that
// create_zero_copy() throws on failure. When CEPH_HAVE_SPLICE is not
// defined, this always returns -ENOTSUP.
int buffer::list::read_fd_zero_copy(int fd, size_t len)
{
#ifdef CEPH_HAVE_SPLICE
  try {
    bufferptr bp = buffer::create_zero_copy(len, fd, NULL);
    append(bp);
  } catch (const buffer::error_code &e) {
    // catch by const reference: no copy of the exception object is made
    // and nothing is sliced if a derived type is ever thrown
    return e.code;
  }
  return 0;
#else
  return -ENOTSUP;
#endif
}

int buffer::list::write_file(const char *fn, int mode)
{
int fd = TEMP_FAILURE_RETRY(::open(fn, O_WRONLY|O_CREAT|O_TRUNC, mode));
Expand Down Expand Up @@ -1390,12 +1461,15 @@ int buffer::list::write_file(const char *fn, int mode)

int buffer::list::write_fd(int fd) const
{
if (can_zero_copy())
return write_fd_zero_copy(fd);

// use writev!
iovec iov[IOV_MAX];
int iovlen = 0;
ssize_t bytes = 0;

std::list<ptr>::const_iterator p = _buffers.begin();
std::list<ptr>::const_iterator p = _buffers.begin();
while (p != _buffers.end()) {
if (p->length() > 0) {
iov[iovlen].iov_base = (void *)p->c_str();
Expand Down Expand Up @@ -1440,6 +1514,30 @@ int buffer::list::write_fd(int fd) const
return 0;
}

// Write every buffer in this list to `fd` using zero-copy transfers.
//
// The current file position of `fd` is looked up once and an explicit
// offset is handed to each ptr::zero_copy_to_fd() call. If `fd` is not
// seekable (lseek() fails with ESPIPE), a NULL offset is passed instead.
//
// Returns 0 on success; -ENOTSUP if any buffer in the list cannot be
// zero-copied; -errno if the file position cannot be determined for a
// reason other than ESPIPE; or the negative value returned by the first
// failing ptr::zero_copy_to_fd() call.
int buffer::list::write_fd_zero_copy(int fd) const
{
  if (!can_zero_copy())
    return -ENOTSUP;
  /* pass offset to each call to avoid races updating the fd seek
   * position, since the I/O may be non-blocking
   */
  loff_t offset = ::lseek(fd, 0, SEEK_CUR);
  loff_t *off_p = &offset;
  if (offset < 0) {
    // lseek() signals failure by returning -1 and storing the cause in
    // errno, so the ESPIPE check has to look at errno, not at the
    // returned offset.
    if (errno != ESPIPE)
      return -errno;
    // fd has no seek position: write without an offset
    off_p = NULL;
  }
  for (std::list<ptr>::const_iterator it = _buffers.begin();
       it != _buffers.end(); ++it) {
    int r = it->zero_copy_to_fd(fd, off_p);
    if (r < 0)
      return r;
    // NOTE(review): if the underlying splice call already advances
    // *off_p, this addition moves the offset twice for multi-buffer
    // lists -- confirm against safe_splice_exact().
    if (off_p)
      offset += it->length();
  }
  return 0;
}

__u32 buffer::list::crc32c(__u32 crc) const
{
for (std::list<ptr>::const_iterator it = _buffers.begin();
Expand Down
7 changes: 7 additions & 0 deletions src/include/buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,9 @@ class buffer {
memcpy(dest, c_str()+o, l);
}

bool can_zero_copy() const;
int zero_copy_to_fd(int fd, loff_t *offset) const;

unsigned wasted();

int cmp(const ptr& o);
Expand Down Expand Up @@ -310,6 +313,7 @@ class buffer {

private:
mutable iterator last_p;
int zero_copy_to_fd(int fd) const;

public:
// cons/des
Expand Down Expand Up @@ -347,6 +351,7 @@ class buffer {
}
bool contents_equal(buffer::list& other);

bool can_zero_copy() const;
bool is_page_aligned() const;
bool is_n_page_sized() const;

Expand Down Expand Up @@ -434,8 +439,10 @@ class buffer {
void hexdump(std::ostream &out) const;
int read_file(const char *fn, std::string *error);
ssize_t read_fd(int fd, size_t len);
int read_fd_zero_copy(int fd, size_t len);
int write_file(const char *fn, int mode=0644);
int write_fd(int fd) const;
int write_fd_zero_copy(int fd) const;
uint32_t crc32c(uint32_t crc) const;
};

Expand Down
31 changes: 31 additions & 0 deletions src/test/bufferlist.cc
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,37 @@ TEST_F(TestRawPipe, c_str_dest_short_explicit_offset) {
EXPECT_EQ(2u, ptr.length());
EXPECT_EQ(0, memcmp(ptr.c_str(), "BC", 2));
}

// Exercises bufferlist::read_fd_zero_copy(): first with an invalid
// descriptor, then with the fixture's descriptor.
// NOTE(review): `fd` and `len` come from the TestRawPipe fixture, which is
// not visible here; the memcmp below implies the source data begins with
// "ABC\n" -- confirm against the fixture.
TEST_F(TestRawPipe, buffer_list_read_fd_zero_copy) {
bufferlist bl;
// an invalid descriptor is expected to be reported as -EBADF
EXPECT_EQ(-EBADF, bl.read_fd_zero_copy(-1, len));
// start again from a fresh, empty list
bl = bufferlist();
EXPECT_EQ(0, bl.read_fd_zero_copy(fd, len));
// the whole read is expected to land in exactly one buffer of length
// `len` with no unused tail
EXPECT_EQ(len, bl.length());
EXPECT_EQ(0u, bl.buffers().front().unused_tail_length());
EXPECT_EQ(1u, bl.buffers().size());
EXPECT_EQ(len, bl.buffers().front().raw_length());
EXPECT_EQ(0, memcmp(bl.c_str(), "ABC\n", len));
EXPECT_TRUE(bl.can_zero_copy());
}

// Round trip: read `len` bytes with read_fd_zero_copy(), write them to a
// new file with write_fd_zero_copy(), then check the file's size and
// contents.
// NOTE(review): `fd` and `len` come from the TestRawPipe fixture, which is
// not visible here.
TEST_F(TestRawPipe, buffer_list_write_fd_zero_copy) {
// remove any output file left behind by an earlier run
::unlink("testfile_out");
bufferlist bl;
EXPECT_EQ(0, bl.read_fd_zero_copy(fd, len));
EXPECT_TRUE(bl.can_zero_copy());
// NOTE(review): out_fd is not checked for -1 before it is used
int out_fd = ::open("testfile_out", O_RDWR|O_CREAT|O_TRUNC, 0600);
EXPECT_EQ(0, bl.write_fd_zero_copy(out_fd));
// the output file must be exactly `len` bytes long
struct stat st;
memset(&st, 0, sizeof(st));
EXPECT_EQ(0, ::stat("testfile_out", &st));
EXPECT_EQ(len, st.st_size);
// Read back through the same descriptor, asking for one byte more than
// was written and expecting only `len` bytes to come back.
// NOTE(review): this expects the read to start at the beginning of the
// file, presumably because the zero-copy write used an explicit offset and
// left out_fd's own file position untouched -- confirm.
char buf[len + 1];
EXPECT_EQ(len, safe_read(out_fd, buf, len + 1));
EXPECT_EQ(0, memcmp(buf, "ABC\n", len));
::close(out_fd);
::unlink("testfile_out");
}
#endif // CEPH_HAVE_SPLICE

//
Expand Down

0 comments on commit 3f6fa05

Please sign in to comment.