[RFC] iostream: extended read_exactly2 interface with alignment #5

Open · wants to merge 2 commits into base: ceph-octopus-19.06.0-45-g7744693c
55 changes: 55 additions & 0 deletions include/seastar/core/iostream-impl.hh
@@ -192,6 +192,61 @@ input_stream<CharType>::read_exactly(size_t n) {
}
}

template <typename CharType>
future<temporary_buffer<CharType>>
input_stream<CharType>::read_exactly_part_direct(size_t n, tmp_buf out, size_t completed) {
if (completed == n) {
return make_ready_future<tmp_buf>(std::move(out));
}
assert(!available() && "Still available buffer left");

return _fd.get_direct(out.get_write() + completed, n - completed)
.then([this, n, out = std::move(out), completed] (size_t size, tmp_buf prefetch_buf) mutable {
if (size == 0) {
assert(prefetch_buf.empty() && "EOF should not prefetch");
_eof = true;
return make_ready_future<tmp_buf>();
} else {
auto read_offset = completed + size;
if (bool(prefetch_buf)) {
// with prefetch
assert(read_offset == n && "Prefetch should only happen with a complete read");
_buf = std::move(prefetch_buf);
return make_ready_future<tmp_buf>(std::move(out));
} else {
// no prefetch, and buffer maybe too small
return this->read_exactly_part_direct(n, std::move(out), read_offset);
}
}
});
}

template <typename CharType>
future<temporary_buffer<CharType>>
input_stream<CharType>::read_exactly2(size_t n, uint16_t alignment) {
if (_buf.size() &&
(alignment == DEFAULT_ALIGNMENT /*||
(reinterpret_cast<uintptr_t>(_buf.begin()) & (alignment - 1)) == 0*/)) {
// alignment suffices
if (_buf.size() == n) {
// easy case: steal buffer, return to caller
return make_ready_future<tmp_buf>(std::move(_buf));
} else if (_buf.size() > n) {
// buffer large enough, share it with caller
auto front = _buf.share(0, n);
_buf.trim_front(n);
return make_ready_future<tmp_buf>(std::move(front));
}
// buffer not large enough, we need to create one
}
// no chance to reuse the memory space of the prefetched buffer
auto out = tmp_buf::aligned(alignment, n);
auto len_needs_copy = std::min(available(), n);
std::copy(_buf.get(), _buf.get() + len_needs_copy, out.get_write());
_buf.trim_front(len_needs_copy);
return read_exactly_part_direct(n, std::move(out), len_needs_copy);
}

template <typename CharType>
template <typename Consumer>
GCC6_CONCEPT(requires InputStreamConsumer<Consumer, CharType> || ObsoleteInputStreamConsumer<Consumer, CharType>)
19 changes: 19 additions & 0 deletions include/seastar/core/iostream.hh
@@ -48,6 +48,18 @@ class data_source_impl {
public:
virtual ~data_source_impl() {}
virtual future<temporary_buffer<char>> get() = 0;
virtual future<size_t, temporary_buffer<char>> get_direct(char* buf, size_t size) {
// Default implementation; override when the concrete data_source_impl
// can fill a user-provided buffer with fewer copies.
return get().then([buf, size] (auto read_buf) mutable {
auto len_needs_copy = std::min(read_buf.size(), size);
std::copy(read_buf.get(), read_buf.get() + len_needs_copy, buf);
read_buf.trim_front(len_needs_copy);
return make_ready_future<size_t, temporary_buffer<char>>(
len_needs_copy, std::move(read_buf));
});
}
virtual future<temporary_buffer<char>> skip(uint64_t n);
virtual future<> close() { return make_ready_future<>(); }
};
@@ -62,6 +74,10 @@ public:
data_source(data_source&& x) = default;
data_source& operator=(data_source&& x) = default;
future<temporary_buffer<char>> get() { return _dsi->get(); }
// Fill buf directly, up to size bytes.
// Returns the number of bytes actually read; if the direct buffer is
// completely filled, a prefetched buffer may also be returned.
future<size_t, temporary_buffer<char>> get_direct(char* buf, size_t size) { return _dsi->get_direct(buf, size); }
future<temporary_buffer<char>> skip(uint64_t n) { return _dsi->skip(n); }
future<> close() { return _dsi->close(); }
};
@@ -214,6 +230,8 @@ public:
input_stream(input_stream&&) = default;
input_stream& operator=(input_stream&&) = default;
future<temporary_buffer<CharType>> read_exactly(size_t n);
static constexpr uint16_t DEFAULT_ALIGNMENT = alignof(char);
future<temporary_buffer<CharType>> read_exactly2(size_t n, uint16_t alignment = DEFAULT_ALIGNMENT);

I'm afraid that using read_exactly2() to read big chunks will impose memcpy for DPDK due to the contiguity requirement. The new method returns temporary_buffer, which means only one data pointer and one data size.
If we expect fragmented payloads from DPDK, we should expect a lot of memcpy from read_exactly2().

Member Author:

No, if crimson-OSD supports fragmented payloads (such as SPDK), it should explicitly instruct the messenger to use ceph::net::Socket::read(size) instead of ceph::net::Socket::read_exactly(size, alignment), because ceph::net::Socket::read(size) will return an internally fragmented bufferlist as expected, and IMO it is better renamed to ceph::net::Socket::read_fragmented(size).

Also, the current ceph::net::Socket::read(size) is already optimal for both DPDK stack and POSIX stack if OSD-side supports fragmented DATA payload:

  • for DPDK: it's zero copy.
  • for POSIX: it's zero copy in user-space, and also minimizes syscalls.

If OSD doesn't support fragmented payloads itself (such as kernel), ceph::net::Socket::read_exactly(size) still needs to be used to build up big chunks of aligned payload, regardless of whether the messenger is using Native or POSIX stack.

My point is that whether or not to use fragmented/aligned payloads should be instructed by OSD, not seastar framework. It's our (framework user) specific requirement.


> Also, the current ceph::net::Socket::read(size) is already optimal for both DPDK stack and POSIX stack if OSD-side supports fragmented DATA payload:
>
>   • for DPDK: it's zero copy.
>   • for POSIX: it's zero copy in user-space, and also minimizes syscalls.

I disagree with that. For the POSIX stack the SGL will be terribly fragmented and many syscalls will be issued because of the small, 8 KB-long prefetch buffer. For instance, reading a 4 MB payload requires 4096 KB / 8 KB = 512 fragments and also 512 syscalls.

Member Author (@cyx1231st, Jun 11, 2019):

> because of the small, 8 KB-long prefetch buffer

I think it's a separate issue, and there is already another PR addressing it (#4). My analysis (#4 (comment)) shows that messenger performance is much better with larger chunks (1 MB), as expected. But I still don't know why rados bench disagreed (from kefu).

template <typename Consumer>
GCC6_CONCEPT(requires InputStreamConsumer<Consumer, CharType> || ObsoleteInputStreamConsumer<Consumer, CharType>)
future<> consume(Consumer&& c);
@@ -256,6 +274,7 @@ public:
data_source detach() &&;
private:
future<temporary_buffer<CharType>> read_exactly_part(size_t n, tmp_buf buf, size_t completed);
future<temporary_buffer<CharType>> read_exactly_part_direct(size_t n, tmp_buf buf, size_t completed);
};

// Facilitates data buffering before it's handed over to data_sink.
3 changes: 2 additions & 1 deletion include/seastar/net/posix-stack.hh
@@ -109,9 +109,10 @@ class posix_data_source_impl final : public data_source_impl {
size_t _buf_size;
public:
explicit posix_data_source_impl(lw_shared_ptr<pollable_fd> fd, compat::polymorphic_allocator<char>* allocator=memory::malloc_allocator,
size_t buf_size = 8192) : _buffer_allocator(allocator), _fd(std::move(fd)),
size_t buf_size = 1 << 13) : _buffer_allocator(allocator), _fd(std::move(fd)),
_buf(make_temporary_buffer<char>(_buffer_allocator, buf_size)), _buf_size(buf_size) {}
future<temporary_buffer<char>> get() override;
future<size_t, temporary_buffer<char>> get_direct(char* buf, size_t size) override;
future<> close() override;
};

28 changes: 28 additions & 0 deletions src/net/posix-stack.cc
@@ -315,16 +315,44 @@ posix_ap_server_socket_impl<Transport>::move_connected_socket(socket_address sa,
}
}

static constexpr size_t min_buf_size = 1 << 9;
static constexpr size_t max_buf_size = 1 << 15;

future<temporary_buffer<char>>
posix_data_source_impl::get() {
return _fd->read_some(_buf.get_write(), _buf_size).then([this] (size_t size) {
if (_buf_size == size) {
_buf_size = std::min(max_buf_size, _buf_size << 2);
} else if (size < (_buf_size >> 2)) {
_buf_size = std::max(min_buf_size, _buf_size >> 2);
}
_buf.trim(size);
auto ret = std::move(_buf);
_buf = make_temporary_buffer<char>(_buffer_allocator, _buf_size);
return make_ready_future<temporary_buffer<char>>(std::move(ret));
});
}

future<size_t, temporary_buffer<char>>
posix_data_source_impl::get_direct(char* buf, size_t size) {
if (size > _buf_size / 2) {

Yeah, currently 4096 looks like a reasonable threshold for prefetching in the 1-syscall/msg testing.

Member Author: Yep, it is the same strategy implemented in the current async-messenger.


Is the ::get_direct() good place for such logic? Maybe moving it up is preferred? I'm afraid the name is currently a little bit misleading as the _direct part is actually conditional.

Member Author:

I think I cannot move it up, because this is a special case to reduce syscalls for posix sockets, which is not general to other concrete data_source_impl classes such as native_data_source_impl.

// this was a large read; don't prefetch
return _fd->read_some(buf, size).then([this] (auto read_size) {
if (_buf_size == read_size) {
_buf_size = std::min(max_buf_size, _buf_size << 2);
} else if (read_size < (_buf_size >> 2)) {
_buf_size = std::max(min_buf_size, _buf_size >> 2);
}
return make_ready_future<size_t, temporary_buffer<char>>(
read_size, temporary_buffer<char>());
});
} else {
// read with prefetch, at the cost of an extra memory copy,
// because we prefer fewer system calls.


What about io_uring? Currently it's reasonable to do a lot of extra work just to lower the number of syscalls. However, io_uring is intended to lower the costs of communication between kernel and user-space, and thus I would expect it will move the threshold much lower.

Member Author (@cyx1231st, Jun 11, 2019):

For io_uring I believe we need another concrete data_source_impl class, not posix_data_source_impl. And io_uring also needs a new poller, right?

return data_source_impl::get_direct(buf, size);
}
}

future<> posix_data_source_impl::close() {
_fd->shutdown(SHUT_RD);
return make_ready_future<>();