Skip to content

Commit

Permalink
fix ws empty frame (#218)
Browse files Browse the repository at this point in the history
Signed-off-by: turuslan <[email protected]>
  • Loading branch information
turuslan authored Oct 12, 2023
1 parent 3afa249 commit 28f3bf0
Show file tree
Hide file tree
Showing 7 changed files with 117 additions and 10 deletions.
61 changes: 61 additions & 0 deletions include/libp2p/basic/read.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/**
* Copyright Soramitsu Co., Ltd. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

#ifndef LIBP2P_BASIC_READ_HPP
#define LIBP2P_BASIC_READ_HPP

#include <libp2p/basic/reader.hpp>
#include <libp2p/common/span_size.hpp>
#include <memory>

namespace libp2p {
/// Read exactly `out.size()` bytes
inline void read(const std::shared_ptr<basic::Reader> &reader,
gsl::span<uint8_t> out,
std::function<void(outcome::result<void>)> cb) {
auto post_cb = [](decltype(reader) reader, decltype(cb) &&cb,
outcome::result<size_t> r) {
reader->deferReadCallback(r,
[cb{std::move(cb)}](outcome::result<size_t> r) {
if (r.has_error()) {
cb(r.error());
} else {
cb(outcome::success());
}
});
};
if (out.empty()) {
return post_cb(reader, std::move(cb), outcome::success());
}
// read some bytes
reader->readSome(
out, spanSize(out),
[weak{std::weak_ptr{reader}}, out, cb{std::move(cb)},
post_cb](outcome::result<size_t> n_res) mutable {
auto reader = weak.lock();
if (not reader) {
return;
}
if (n_res.has_error()) {
return post_cb(reader, std::move(cb), n_res.error());
}
auto n = n_res.value();
if (n == 0) {
throw std::logic_error{"libp2p::read zero bytes read"};
}
if (n > spanSize(out)) {
throw std::logic_error{"libp2p::read too much bytes read"};
}
if (n == spanSize(out)) {
// successfully read last bytes
return post_cb(reader, std::move(cb), outcome::success());
}
// read remaining bytes
read(reader, out.subspan(n), std::move(cb));
});
}
} // namespace libp2p

#endif // LIBP2P_BASIC_READ_HPP
27 changes: 27 additions & 0 deletions include/libp2p/basic/read_return_size.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/**
* Copyright Soramitsu Co., Ltd. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

#ifndef LIBP2P_BASIC_READ_RETURN_SIZE_HPP
#define LIBP2P_BASIC_READ_RETURN_SIZE_HPP

#include <libp2p/basic/read.hpp>

namespace libp2p {
/// Read exactly `out.size()` bytes
inline void readReturnSize(const std::shared_ptr<basic::Reader> &reader,
gsl::span<uint8_t> out,
basic::Reader::ReadCallbackFunc cb) {
read(reader, out,
[n{spanSize(out)}, cb{std::move(cb)}](outcome::result<void> r) {
if (r.has_error()) {
cb(r.error());
} else {
cb(n);
}
});
}
} // namespace libp2p

#endif // LIBP2P_BASIC_READ_RETURN_SIZE_HPP
4 changes: 3 additions & 1 deletion include/libp2p/layer/websocket/ssl_connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ namespace libp2p::layer {
} // namespace libp2p::layer

namespace libp2p::connection {
class SslConnection final : public LayerConnection {
class SslConnection final
: public LayerConnection,
public std::enable_shared_from_this<SslConnection> {
public:
SslConnection(std::shared_ptr<boost::asio::io_context> io_context,
std::shared_ptr<LayerConnection> connection,
Expand Down
4 changes: 2 additions & 2 deletions src/layer/websocket/ssl_connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

#include <libp2p/layer/websocket/ssl_connection.hpp>

#include <boost/asio/read.hpp>
#include <libp2p/basic/read_return_size.hpp>
#include <libp2p/common/ambigous_size.hpp>
#include <libp2p/common/asio_buffer.hpp>
#include <libp2p/common/asio_cb.hpp>
Expand Down Expand Up @@ -45,7 +45,7 @@ namespace libp2p::connection {
void SslConnection::read(gsl::span<uint8_t> out, size_t bytes,
libp2p::basic::Reader::ReadCallbackFunc cb) {
ambigousSize(out, bytes);
boost::asio::async_read(ssl_, asioBuffer(out), toAsioCbSize(std::move(cb)));
readReturnSize(shared_from_this(), out, std::move(cb));
}

void SslConnection::readSome(gsl::span<uint8_t> out, size_t bytes,
Expand Down
18 changes: 15 additions & 3 deletions src/layer/websocket/ws_connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

#include <libp2p/layer/websocket/ws_connection.hpp>

#include <boost/asio/read.hpp>
#include <libp2p/basic/read_return_size.hpp>
#include <libp2p/common/ambigous_size.hpp>
#include <libp2p/common/asio_buffer.hpp>
#include <libp2p/common/asio_cb.hpp>
Expand Down Expand Up @@ -129,14 +129,26 @@ namespace libp2p::connection {
libp2p::basic::Reader::ReadCallbackFunc cb) {
ambigousSize(out, bytes);
SL_TRACE(log_, "read {} bytes", bytes);
boost::asio::async_read(ws_, asioBuffer(out), toAsioCbSize(std::move(cb)));
readReturnSize(shared_from_this(), out, std::move(cb));
}

void WsConnection::readSome(gsl::span<uint8_t> out, size_t bytes,
libp2p::basic::Reader::ReadCallbackFunc cb) {
ambigousSize(out, bytes);
SL_TRACE(log_, "read some upto {} bytes", bytes);
ws_.async_read_some(asioBuffer(out), toAsioCbSize(std::move(cb)));
auto on_read = [weak{weak_from_this()}, out, cb{std::move(cb)}](
boost::system::error_code ec, size_t n) mutable {
if (ec) {
cb(ec);
} else if (n != 0) {
cb(n);
} else if (auto self = weak.lock()) {
self->readSome(out, spanSize(out), std::move(cb));
} else {
cb(boost::system::errc::broken_pipe);
}
};
ws_.async_read_some(asioBuffer(out), std::move(on_read));
}

void WsConnection::write(gsl::span<const uint8_t> in, //
Expand Down
7 changes: 5 additions & 2 deletions src/security/tls/tls_connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
#include "tls_connection.hpp"
#include "tls_details.hpp"

#include <libp2p/basic/read_return_size.hpp>
#include <libp2p/common/ambigous_size.hpp>

namespace libp2p::connection {

using TlsError = security::TlsError;
Expand Down Expand Up @@ -139,9 +142,9 @@ namespace libp2p::connection {

void TlsConnection::read(gsl::span<uint8_t> out, size_t bytes,
Reader::ReadCallbackFunc f) {
ambigousSize(out, bytes);
SL_TRACE(log(), "reading {} bytes", bytes);
boost::asio::async_read(socket_, makeBuffer(out, bytes),
closeOnError(*this, std::move(f)));
readReturnSize(shared_from_this(), out, std::move(f));
}

void TlsConnection::readSome(gsl::span<uint8_t> out, size_t bytes,
Expand Down
6 changes: 4 additions & 2 deletions src/transport/tcp/tcp_connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

#include <libp2p/transport/tcp/tcp_connection.hpp>

#include <libp2p/basic/read_return_size.hpp>
#include <libp2p/common/ambigous_size.hpp>
#include <libp2p/transport/tcp/tcp_util.hpp>

#define TRACE_ENABLED 0
Expand Down Expand Up @@ -218,9 +220,9 @@ namespace libp2p::transport {

void TcpConnection::read(gsl::span<uint8_t> out, size_t bytes,
TcpConnection::ReadCallbackFunc cb) {
ambigousSize(out, bytes);
TRACE("{} read {}", debug_str_, bytes);
boost::asio::async_read(socket_, detail::makeBuffer(out, bytes),
closeOnError(*this, std::move(cb)));
readReturnSize(shared_from_this(), out, std::move(cb));
}

void TcpConnection::readSome(gsl::span<uint8_t> out, size_t bytes,
Expand Down

0 comments on commit 28f3bf0

Please sign in to comment.