diff --git a/core/network/CMakeLists.txt b/core/network/CMakeLists.txt index 8166b8b0d2..265cc4c3e6 100644 --- a/core/network/CMakeLists.txt +++ b/core/network/CMakeLists.txt @@ -32,6 +32,7 @@ add_library(network impl/peer_manager_impl.cpp impl/reputation_repository_impl.cpp helpers/scale_message_read_writer.cpp + helpers/compressor/zstd_stream_compressor.cpp adapters/adapter_errors.cpp impl/protocols/protocol_req_pov.cpp warp/cache.cpp diff --git a/core/network/adapters/protobuf_state_response.hpp b/core/network/adapters/protobuf_state_response.hpp index a2569ac522..e27a913993 100644 --- a/core/network/adapters/protobuf_state_response.hpp +++ b/core/network/adapters/protobuf_state_response.hpp @@ -76,4 +76,4 @@ namespace kagome::network { } }; -} // namespace kagome::network +} // namespace kagome::network \ No newline at end of file diff --git a/core/network/helpers/compressor/compressor.h b/core/network/helpers/compressor/compressor.h new file mode 100644 index 0000000000..e819690f5b --- /dev/null +++ b/core/network/helpers/compressor/compressor.h @@ -0,0 +1,22 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include <cstdint> +#include <span> +#include <vector> + +#include "outcome/outcome.hpp" + +namespace kagome::network { +struct ICompressor { + virtual ~ICompressor() = default; + virtual outcome::result<std::vector<uint8_t>> compress(std::span<uint8_t> data) = 0; + virtual outcome::result<std::vector<uint8_t>> decompress(std::span<uint8_t> compressedData) = 0; +}; + +} // namespace kagome::network diff --git a/core/network/helpers/compressor/zstd_error.h b/core/network/helpers/compressor/zstd_error.h new file mode 100644 index 0000000000..b399c51ce1 --- /dev/null +++ b/core/network/helpers/compressor/zstd_error.h @@ -0,0 +1,149 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include <zstd_errors.h> + +#include <qtils/enum_error_code.hpp> + +namespace kagome::network { +enum class ZstdStreamCompressorError { + UNKNOWN, + EXCEPTION, + UNKNOWN_EXCEPTION, + CONTEXT_ERROR, + ERROR_GENERIC, + PREFIX_UNKNOWN, + VERSION_UNSUPPORTED, + PARAMETER_UNKNOWN, + FRAME_PARAMETER_UNSUPPORTED, + FRAME_PARAMETER_WINDOW_TOO_LARGE, + COMPRESSION_PARAMETER_UNSUPPORTED, + INIT_MISSING, + MEMORY_ALLOCATION, + STAGE_WRONG, + DST_SIZE_TOO_SMALL, + SRC_SIZE_WRONG, + CORRUPTION_DETECTED, + CHECKSUM_WRONG, + TABLE_LOG_TOO_LARGE, + MAX_SYMBOL_VALUE_TOO_LARGE, + MAX_SYMBOL_VALUE_TOO_SMALL, + DICTIONARY_CORRUPTED, + DICTIONARY_WRONG, + DICTIONARY_CREATION_FAILED, + MAX_CODE, +}; + +ZstdStreamCompressorError convertErrorCode(ZSTD_ErrorCode errorCode) { + switch (errorCode) { + case ZSTD_error_no_error: + return ZstdStreamCompressorError::UNKNOWN; + case ZSTD_error_GENERIC: + return ZstdStreamCompressorError::ERROR_GENERIC; + case ZSTD_error_prefix_unknown: + return ZstdStreamCompressorError::PREFIX_UNKNOWN; + case ZSTD_error_version_unsupported: + return ZstdStreamCompressorError::VERSION_UNSUPPORTED; + case ZSTD_error_parameter_unsupported: + return ZstdStreamCompressorError::COMPRESSION_PARAMETER_UNSUPPORTED; + case ZSTD_error_frameParameter_unsupported: + return ZstdStreamCompressorError::FRAME_PARAMETER_UNSUPPORTED; + case ZSTD_error_frameParameter_windowTooLarge: + return ZstdStreamCompressorError::FRAME_PARAMETER_WINDOW_TOO_LARGE; + case ZSTD_error_init_missing: + return ZstdStreamCompressorError::INIT_MISSING; + case ZSTD_error_memory_allocation: + return ZstdStreamCompressorError::MEMORY_ALLOCATION; + case ZSTD_error_stage_wrong: + return ZstdStreamCompressorError::STAGE_WRONG; + case ZSTD_error_dstSize_tooSmall: + return ZstdStreamCompressorError::DST_SIZE_TOO_SMALL; + case ZSTD_error_srcSize_wrong: + return ZstdStreamCompressorError::SRC_SIZE_WRONG; + case ZSTD_error_corruption_detected: + return ZstdStreamCompressorError::CORRUPTION_DETECTED; + case ZSTD_error_checksum_wrong: + return ZstdStreamCompressorError::CHECKSUM_WRONG; + case ZSTD_error_tableLog_tooLarge: + return ZstdStreamCompressorError::TABLE_LOG_TOO_LARGE; + case ZSTD_error_maxSymbolValue_tooLarge: + return ZstdStreamCompressorError::MAX_SYMBOL_VALUE_TOO_LARGE; + case ZSTD_error_maxSymbolValue_tooSmall: + return ZstdStreamCompressorError::MAX_SYMBOL_VALUE_TOO_SMALL; + case ZSTD_error_dictionary_corrupted: + return ZstdStreamCompressorError::DICTIONARY_CORRUPTED; + case ZSTD_error_dictionary_wrong: + return ZstdStreamCompressorError::DICTIONARY_WRONG; + case ZSTD_error_dictionaryCreation_failed: + return ZstdStreamCompressorError::DICTIONARY_CREATION_FAILED; + case ZSTD_error_maxCode: + return ZstdStreamCompressorError::MAX_CODE; + default: + return ZstdStreamCompressorError::UNKNOWN; + } +} + +Q_ENUM_ERROR_CODE(ZstdStreamCompressorError) { + using E = decltype(e); + switch (e) { + case E::UNKNOWN: + return "Unknown error"; + case E::CONTEXT_ERROR: + return "Failed to create ZSTD compression context"; + case E::ERROR_GENERIC: + return "Generic error"; + case E::PREFIX_UNKNOWN: + return "Unknown prefix"; + case E::VERSION_UNSUPPORTED: + return "Unsupported version"; + case E::PARAMETER_UNKNOWN: + return "Unknown parameter"; + case E::FRAME_PARAMETER_UNSUPPORTED: + return "Unsupported frame parameter"; + case E::FRAME_PARAMETER_WINDOW_TOO_LARGE: + return "Frame parameter window too large"; + case E::COMPRESSION_PARAMETER_UNSUPPORTED: + return "Unsupported compression parameter"; + case E::INIT_MISSING: + return "Init missing"; + case E::MEMORY_ALLOCATION: + return "Memory allocation error"; + case E::STAGE_WRONG: + return "Wrong stage"; + case E::DST_SIZE_TOO_SMALL: + return "Destination size too small"; + case E::SRC_SIZE_WRONG: + return "Wrong source size"; + case E::CORRUPTION_DETECTED: + return "Corruption detected"; + case E::CHECKSUM_WRONG: + return "Wrong checksum"; + case E::TABLE_LOG_TOO_LARGE: + return "Table log too large"; + case E::MAX_SYMBOL_VALUE_TOO_LARGE: + return "Max symbol value too large"; + case E::MAX_SYMBOL_VALUE_TOO_SMALL: + return "Max symbol value too small"; + case E::DICTIONARY_CORRUPTED: + return "Dictionary corrupted"; + case E::DICTIONARY_WRONG: + return "Wrong dictionary"; + case E::DICTIONARY_CREATION_FAILED: + return "Dictionary creation failed"; + case E::MAX_CODE: + return "Max code"; + case E::EXCEPTION: + return "Exception"; + case E::UNKNOWN_EXCEPTION: + return "Unknown exception"; + default: + return "Unknown error"; + } +} + +} // namespace kagome::network diff --git a/core/network/helpers/compressor/zstd_stream_compressor.cpp b/core/network/helpers/compressor/zstd_stream_compressor.cpp new file mode 100644 index 0000000000..ee0ee28b1f --- /dev/null +++ b/core/network/helpers/compressor/zstd_stream_compressor.cpp @@ -0,0 +1,95 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#include <memory> +#include <exception> + +#include <zstd.h> + +#include "zstd_stream_compressor.h" + +#include "zstd_error.h" + +namespace kagome::network { +outcome::result<std::vector<uint8_t>> ZstdStreamCompressor::compress(std::span<uint8_t> data) try { + std::unique_ptr<ZSTD_CCtx, void(*)(ZSTD_CCtx*)> cctx( + ZSTD_createCCtx(), + [](ZSTD_CCtx* c) { ZSTD_freeCCtx(c); } + ); + + if (!cctx) { + return ZstdStreamCompressorError::CONTEXT_ERROR; + } + + const auto setParameterResult = ZSTD_CCtx_setParameter(cctx.get(), ZSTD_c_compressionLevel, m_compressionLevel); + if (ZSTD_isError(setParameterResult)) { + return convertErrorCode(ZSTD_getErrorCode(setParameterResult)); + } + + const auto maxCompressedSize = ZSTD_compressBound(data.size()); + std::vector<uint8_t> compressedData(maxCompressedSize); + + ZSTD_inBuffer input = { data.data(), data.size(), 0 }; + ZSTD_outBuffer output = { compressedData.data(), compressedData.size(), 0 }; + + while (input.pos < input.size) { + const auto compressResult = ZSTD_compressStream(cctx.get(), &output, &input); + if (ZSTD_isError(compressResult)) { + return convertErrorCode(ZSTD_getErrorCode(compressResult)); + } + } + + size_t remaining; + do { + remaining = ZSTD_endStream(cctx.get(), &output); + if (ZSTD_isError(remaining)) { + return convertErrorCode(ZSTD_getErrorCode(remaining)); + } + } while (remaining > 0); + + compressedData.resize(output.pos); + + return compressedData; +} catch (const std::exception& e) { + return ZstdStreamCompressorError::EXCEPTION; +} +catch (...) { + return ZstdStreamCompressorError::UNKNOWN_EXCEPTION; +} + +outcome::result<std::vector<uint8_t>> ZstdStreamCompressor::decompress(std::span<uint8_t> compressedData) try { + std::unique_ptr<ZSTD_DCtx, void(*)(ZSTD_DCtx*)> dctx( + ZSTD_createDCtx(), + [](ZSTD_DCtx* d) { ZSTD_freeDCtx(d); } + ); + if (dctx == nullptr) { + return ZstdStreamCompressorError::CONTEXT_ERROR; + } + + std::vector<uint8_t> decompressedData; + std::vector<uint8_t> outBuffer(ZSTD_DStreamOutSize()); + + ZSTD_inBuffer input = { compressedData.data(), compressedData.size(), 0 }; + ZSTD_outBuffer output = { outBuffer.data(), outBuffer.size(), 0 }; + + while (input.pos < input.size) { + size_t ret = ZSTD_decompressStream(dctx.get(), &output, &input); + if (ZSTD_isError(ret)) { + return convertErrorCode(ZSTD_getErrorCode(ret)); + } + + decompressedData.insert(decompressedData.end(), outBuffer.data(), outBuffer.data() + output.pos); + output.pos = 0; + } + + return decompressedData; +} catch (const std::exception& e) { + return ZstdStreamCompressorError::EXCEPTION; +} catch (...) { + return ZstdStreamCompressorError::UNKNOWN_EXCEPTION; +} + +} // namespace kagome::network diff --git a/core/network/helpers/compressor/zstd_stream_compressor.h b/core/network/helpers/compressor/zstd_stream_compressor.h new file mode 100644 index 0000000000..05183da764 --- /dev/null +++ b/core/network/helpers/compressor/zstd_stream_compressor.h @@ -0,0 +1,19 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include "compressor.h" +namespace kagome::network { +struct ZstdStreamCompressor : public ICompressor { + ZstdStreamCompressor(int compressionLevel = 3) : m_compressionLevel(compressionLevel) {} + outcome::result<std::vector<uint8_t>> compress(std::span<uint8_t> data) override; + outcome::result<std::vector<uint8_t>> decompress(std::span<uint8_t> compressedData) override; +private: + int m_compressionLevel; +}; + +} // namespace kagome::network diff --git a/core/network/helpers/protobuf_message_read_writer.hpp b/core/network/helpers/protobuf_message_read_writer.hpp index 79e42314fd..5615a1420c 100644 --- a/core/network/helpers/protobuf_message_read_writer.hpp +++ b/core/network/helpers/protobuf_message_read_writer.hpp @@ -16,6 +16,7 @@ #include "network/adapters/protobuf.hpp" #include "network/adapters/uvar.hpp" #include "network/helpers/message_read_writer.hpp" +#include "network/helpers/compressor/compressor.h" #include "scale/scale.hpp" namespace kagome::network { @@ -42,13 +43,22 @@ namespace kagome::network { * @param cb to be called, when the message is read, or error happens */ template <typename MsgType> - void read(ReadCallback<MsgType> &&cb) const { + void read(ReadCallback<MsgType> &&cb, std::shared_ptr<ICompressor> decompressor = nullptr + ) const { read_writer_->read( - [self{shared_from_this()}, cb = std::move(cb)](auto &&read_res) { + [self{shared_from_this()}, cb = std::move(cb), decompressor](auto &&read_res) { if (!read_res) { return cb(read_res.error()); } + if (decompressor) { + std::span<uint8_t> compressed{read_res.value()->begin(), read_res.value()->end()}; + auto compressionRes = decompressor->decompress(compressed); + if (!compressionRes) { + return cb(outcome::failure(compressionRes.error())); + } + *read_res.value() = std::move(compressionRes.value()); + } using ProtobufRW = MessageReadWriter<ProtobufMessageAdapter<MsgType>, NoSink>; @@ -72,7 +82,7 @@ namespace kagome::network { */ template <typename MsgType> void write(const MsgType &msg, - libp2p::basic::Writer::WriteCallbackFunc &&cb) const { + libp2p::basic::Writer::WriteCallbackFunc &&cb, std::shared_ptr<ICompressor> compressor = nullptr) const { using ProtobufRW = MessageReadWriter<ProtobufMessageAdapter<MsgType>, NoSink>; @@ -83,15 +93,33 @@ namespace kagome::network { std::span<uint8_t> data(it.base(), out.size() - std::distance(out.begin(), it)); - read_writer_->write(data, - [self{shared_from_this()}, - out{std::move(out)}, - cb = std::move(cb)](auto &&write_res) { - if (!write_res) { - return cb(write_res.error()); - } - cb(outcome::success()); - }); + if (compressor == nullptr) { + read_writer_->write(data, + [self{shared_from_this()}, + out{std::move(out)}, + cb = std::move(cb)](auto &&write_res) { + if (!write_res) { + return cb(write_res.error()); + } + cb(outcome::success()); + }); + } else { + auto compressionRes = compressor->compress(data); + if (!compressionRes) { + return cb(outcome::failure(compressionRes.error())); + } + auto compressedData = std::move(compressionRes.value()); + std::span<uint8_t> compressedDataSpan(compressedData.data(), compressedData.size()); + read_writer_->write(compressedDataSpan, + [self{shared_from_this()}, + compressedData{std::move(compressedData)}, + cb = std::move(cb)](auto &&write_res) { + if (!write_res) { + return cb(write_res.error()); + } + cb(outcome::success()); + }); + } } }; diff --git a/core/network/impl/protocols/state_protocol_impl.cpp b/core/network/impl/protocols/state_protocol_impl.cpp index 4d6704ffcb..85f40657dc 100644 --- a/core/network/impl/protocols/state_protocol_impl.cpp +++ b/core/network/impl/protocols/state_protocol_impl.cpp @@ -10,8 +10,8 @@ #include "network/adapters/protobuf_state_response.hpp" #include "network/common.hpp" #include "network/helpers/protobuf_message_read_writer.hpp" +#include "network/helpers/compressor/zstd_stream_compressor.h" #include "network/impl/protocols/protocol_error.hpp" - namespace kagome::network { StateProtocolImpl::StateProtocolImpl( @@ -19,11 +19,13 @@ namespace kagome::network { const application::ChainSpec &chain_spec, const blockchain::GenesisBlockHash &genesis_hash, std::shared_ptr<StateProtocolObserver> state_observer) - : base_(kStateProtocolName, - host, - make_protocols(kStateProtocol, genesis_hash, chain_spec), - log::createLogger(kStateProtocolName, "state_protocol")), - state_observer_(std::move(state_observer)) { + : base_(kStateProtocolName + , host + , make_protocols(kStateProtocol, genesis_hash, chain_spec) + , log::createLogger(kStateProtocolName, "state_protocol")) + , state_observer_(std::move(state_observer)) + // , state_response_compressor_(std::make_shared<ZstdStreamCompressor>()) + { BOOST_ASSERT(state_observer_ != nullptr); } @@ -228,7 +230,7 @@ namespace kagome::network { } stream->close([](auto &&...) {}); - }); + }, state_response_compressor_); } void StateProtocolImpl::writeRequest( @@ -275,7 +277,7 @@ namespace kagome::network { }); } - void StateProtocolImpl::readResponse( + void StateProtocolImpl::readResponse( std::shared_ptr<Stream> stream, std::function<void(outcome::result<StateResponse>)> &&response_handler) { auto read_writer = std::make_shared<ProtobufMessageReadWriter>(stream); @@ -317,7 +319,7 @@ namespace kagome::network { stream->reset(); response_handler(std::move(state_response)); - }); + }, state_response_compressor_); } } // namespace kagome::network diff --git a/core/network/impl/protocols/state_protocol_impl.hpp b/core/network/impl/protocols/state_protocol_impl.hpp index 2c1c4fc595..1a61ddeb7b 100644 --- a/core/network/impl/protocols/state_protocol_impl.hpp +++ b/core/network/impl/protocols/state_protocol_impl.hpp @@ -17,6 +17,7 @@ #include "log/logger.hpp" #include "network/impl/protocols/protocol_base_impl.hpp" #include "network/state_protocol_observer.hpp" +#include <network/helpers/compressor/compressor.h> #include "utils/non_copyable.hpp" namespace kagome::blockchain { @@ -75,6 +76,7 @@ namespace kagome::network { inline static const auto kStateProtocolName = "StateProtocol"s; ProtocolBaseImpl base_; std::shared_ptr<StateProtocolObserver> state_observer_; + std::shared_ptr<ICompressor> state_response_compressor_; }; } // namespace kagome::network diff --git a/core/network/protobuf/api.v1.proto b/core/network/protobuf/api.v1.proto index b8f234f385..1e740dec3b 100644 --- a/core/network/protobuf/api.v1.proto +++ b/core/network/protobuf/api.v1.proto @@ -92,6 +92,11 @@ message StateResponse { bytes proof = 2; } +message StateResponseCompressed { + // compressed zstd-stream data representing StateResponse + bytes payload = 1; +} + // A key value state. message KeyValueStateEntry { // Root of for this level, empty length bytes diff --git a/core/network/types/state_response_compressed.hpp b/core/network/types/state_response_compressed.hpp new file mode 100644 index 0000000000..12737a1402 --- /dev/null +++ b/core/network/types/state_response_compressed.hpp @@ -0,0 +1,21 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include <vector> +#include <cstdint> + +namespace kagome::network { + + /** + * Response to the StateRequest but compressed + */ + struct StateResponseCompressed { + /// Compressed state Response + std::vector<uint8_t> data; + }; +} // namespace kagome::network