diff --git a/Cargo.toml b/Cargo.toml
index c61276942..33d8f37cb 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -54,10 +54,11 @@ lto = true
codegen-units = 1
[workspace.package]
-authors = ["Setheum Labs", "Open Web3 Foundation"]
+authors = [ "Open Web3 Foundation", "Setheum Labs", "Slixon Technologies"]
edition = "2021"
homepage = "https://setheum.xyz"
repository = "https://github.com/Setheum-Labs/Setheum"
+version = "0.9.81-dev"
# The list of dependencies below (which can be both direct and indirect dependencies) are crates
# that are suspected to be CPU-intensive, and that are unlikely to require debugging (as some of
@@ -202,7 +203,7 @@ orml-xtokens = { path = "orml/xtokens", default-features = false }
wasm-bencher = { git = "https://github.com/open-web3-stack/wasm-bencher", branch = "polkadot-v1.3.0", default-features = false }
# Setheum (WASM)
-primitives = { package = "setheum-primitives", path = "blockchain/primitives", default-features = false }
+primitives = { package = "setheum-primitives", path = "primitives", default-features = false }
runtime-common = { path = "blockchain/runtime/common", default-features = false }
qingdao-runtime = { path = "blockchain/runtime/qingdao", default-features = false }
setheum-runtime = { path = "blockchain/runtime/setheum", default-features = false }
diff --git a/Makefile b/Makefile
index d6cbf97e9..0d9473590 100644
--- a/Makefile
+++ b/Makefile
@@ -1,128 +1,116 @@
-.PHONY: toolchain
-toolchain:
- ./scripts/init.sh
-
-.PHONY: init
-init:
- make toolchain
- make submodule
-
-.PHONY: submodule
-submodule:
- git submodule update --init --recursive
-
-.PHONY: release
-release:
- make toolchain
- rm -rf target/
- cargo build --manifest-path node/Cargo.toml --features with-ethereum-compatibility --release
-
-.PHONY: build
-build:
- cargo build --manifest-path node/Cargo.toml --features runtime-benchmarks,with-ethereum-compatibility --release
-
-.PHONY: wasm
-wasm:
- cargo build -p setheum-runtime --features with-ethereum-compatibility --release
-
-.PHONY: genesis
-genesis:
- make build
- ./target/release/setheum-node build-spec --chain testnet-new > blockchain/bin/resources/chain_spec_testnet.json
- ./target/release/setheum-node build-spec --chain mainnet-new > blockchain/bin/resources/chain_spec_mainnet.json
- ./target/release/setheum-node build-spec --chain testnet-new --raw > blockchain/bin/resources/chain_spec_testnet_raw.json
- ./target/release/setheum-node build-spec --chain mainnet-new --raw > blockchain/bin/resources/chain_spec_mainnet_raw.json
-
-.PHONY: check
-check:
- SKIP_WASM_BUILD=1 cargo check
-
-.PHONY: check-all
-check-all: check-runtime check-benchmarks
-
-.PHONY: check-runtime
-check-runtime:
- SKIP_WASM_BUILD= cargo check --features with-ethereum-compatibility --tests --all
-
-.PHONY: clippy
-clippy:
- SKIP_WASM_BUILD=1 cargo clippy -- -D warnings -A clippy::from-over-into -A clippy::unnecessary-cast -A clippy::identity-op -A clippy::upper-case-acronyms
-
-.PHONY: watch
-watch:
- SKIP_WASM_BUILD=1 cargo watch -c -x build
-
-.PHONY: test
-test:
- SKIP_WASM_BUILD=1 cargo test --features with-ethereum-compatibility --all
-
-.PHONY: check-tests
-check-tests:
- SKIP_WASM_BUILD= cargo check --features with-ethereum-compatibility --tests --all
-
-.PHONY: debug
-debug:
- cargo build && RUST_LOG=debug RUST_BACKTRACE=1 rust-gdb --args target/debug/setheum-node --dev --tmp -lruntime=debug
-
-.PHONY: run
-run:
- RUST_BACKTRACE=1 cargo run --manifest-path node/Cargo.toml --features with-ethereum-compatibility -- --dev --tmp
-
-.PHONY: log
-log:
- RUST_BACKTRACE=1 RUST_LOG=debug cargo run --manifest-path node/Cargo.toml --features with-ethereum-compatibility -- --dev --tmp
-
-.PHONY: noeth
-noeth:
- RUST_BACKTRACE=1 cargo run -- --dev --tmp
-
-.PHONY: check-benchmarks
-check-benchmarks:
- SKIP_WASM_BUILD= cargo check --features runtime-benchmarks --no-default-features --target=wasm32-unknown-unknown -p setheum-runtime
-
-.PHONY: test-benchmarking
-test-benchmarking:
- cargo test --features bench --package module-evm
- cargo test --features runtime-benchmarks --features with-ethereum-compatibility --features --all benchmarking
-
-.PHONY: bench
-bench:
- SKIP_WASM_BUILD=1 cargo test --manifest-path node/Cargo.toml --features runtime-benchmarks,with-ethereum-compatibility benchmarking
-
-.PHONY: benchmark
-benchmark:
- cargo run --release --features=runtime-benchmarks --features=with-ethereum-compatibility -- benchmark --chain=dev --steps=50 --repeat=20 '--pallet=*' '--extrinsic=*' --execution=wasm --wasm-execution=compiled --heap-pages=4096 --template=.maintain/runtime-weight-template.hbs --output=./runtime/src/weights/
-
-.PHONY: doc
-doc:
- SKIP_WASM_BUILD=1 cargo doc --open
-
-.PHONY: cargo-update
-cargo-update:
- cargo update
- cargo update --manifest-path node/Cargo.toml
- make test
-
-.PHONY: purge
-purge: target/debug/setheum-node
- target/debug/setheum-node purge-chain --dev -y
-
-.PHONY: restart
-restart: purge run
-
-.PHONY: fork
-fork:
- npm i --prefix fork fork
-ifeq (,$(wildcard fork/data))
- mkdir fork/data
-endif
- cp target/release/setheum-node fork/data/binary
- cp target/release/wbuild/setheum-runtime/setheum_runtime.compact.wasm fork/data/runtime.wasm
- cp blockchain/bin/resources/types.json fork/data/schema.json
- cp blockchain/bin/resources/chain_spec_$(chain)_raw.json fork/data/genesis.json
- cd fork && npm start && cd ..
-
-.PHONY: generate-tokens
-generate-tokens:
- cargo test -p setheum-primitives -- --ignored
- cd blockchain/predeploy-contracts && yarn && yarn run generate-bytecode
+.PHONY: toolchain
+toolchain:
+ ./scripts/init.sh
+
+.PHONY: init
+init:
+ make toolchain
+ make submodule
+
+.PHONY: submodule
+submodule:
+ git submodule update --init --recursive
+
+.PHONY: release
+release:
+ make toolchain
+ rm -rf target/
+ cargo build --manifest-path node/Cargo.toml --features with-ethereum-compatibility --release
+
+.PHONY: build
+build:
+ cargo build --manifest-path node/Cargo.toml --features runtime-benchmarks,with-ethereum-compatibility --release
+
+.PHONY: wasm
+wasm:
+ cargo build -p setheum-runtime --features with-ethereum-compatibility --release
+
+.PHONY: genesis
+genesis:
+ make build
+ ./target/release/setheum-node build-spec --chain testnet-new > blockchain/bin/resources/chain_spec_testnet.json
+ ./target/release/setheum-node build-spec --chain mainnet-new > blockchain/bin/resources/chain_spec_mainnet.json
+ ./target/release/setheum-node build-spec --chain testnet-new --raw > blockchain/bin/resources/chain_spec_testnet_raw.json
+ ./target/release/setheum-node build-spec --chain mainnet-new --raw > blockchain/bin/resources/chain_spec_mainnet_raw.json
+
+.PHONY: check
+check:
+ SKIP_WASM_BUILD=1 cargo check
+
+.PHONY: check-all
+check-all: check-runtime check-benchmarks
+
+.PHONY: check-runtime
+check-runtime:
+ SKIP_WASM_BUILD= cargo check --features with-ethereum-compatibility --tests --all
+
+.PHONY: clippy
+clippy:
+ SKIP_WASM_BUILD=1 cargo clippy -- -D warnings -A clippy::from-over-into -A clippy::unnecessary-cast -A clippy::identity-op -A clippy::upper-case-acronyms
+
+.PHONY: watch
+watch:
+ SKIP_WASM_BUILD=1 cargo watch -c -x build
+
+.PHONY: test
+test:
+ SKIP_WASM_BUILD=1 cargo test --features with-ethereum-compatibility --all
+
+.PHONY: check-tests
+check-tests:
+ SKIP_WASM_BUILD= cargo check --features with-ethereum-compatibility --tests --all
+
+.PHONY: debug
+debug:
+ cargo build && RUST_LOG=debug RUST_BACKTRACE=1 rust-gdb --args target/debug/setheum-node --dev --tmp -lruntime=debug
+
+.PHONY: run
+run:
+ RUST_BACKTRACE=1 cargo run --manifest-path node/Cargo.toml --features with-ethereum-compatibility -- --dev --tmp
+
+.PHONY: log
+log:
+ RUST_BACKTRACE=1 RUST_LOG=debug cargo run --manifest-path node/Cargo.toml --features with-ethereum-compatibility -- --dev --tmp
+
+.PHONY: noeth
+noeth:
+ RUST_BACKTRACE=1 cargo run -- --dev --tmp
+
+.PHONY: check-benchmarks
+check-benchmarks:
+ SKIP_WASM_BUILD= cargo check --features runtime-benchmarks --no-default-features --target=wasm32-unknown-unknown -p setheum-runtime
+
+.PHONY: test-benchmarking
+test-benchmarking:
+ cargo test --features bench --package module-evm
+	cargo test --features runtime-benchmarks --features with-ethereum-compatibility --all benchmarking
+
+.PHONY: bench
+bench:
+ SKIP_WASM_BUILD=1 cargo test --manifest-path node/Cargo.toml --features runtime-benchmarks,with-ethereum-compatibility benchmarking
+
+.PHONY: benchmark
+benchmark:
+ cargo run --release --features=runtime-benchmarks --features=with-ethereum-compatibility -- benchmark --chain=dev --steps=50 --repeat=20 '--pallet=*' '--extrinsic=*' --execution=wasm --wasm-execution=compiled --heap-pages=4096 --template=.maintain/runtime-weight-template.hbs --output=./runtime/src/weights/
+
+.PHONY: doc
+doc:
+ SKIP_WASM_BUILD=1 cargo doc --open
+
+.PHONY: cargo-update
+cargo-update:
+ cargo update
+ cargo update --manifest-path node/Cargo.toml
+ make test
+
+.PHONY: purge
+purge: target/debug/setheum-node
+ target/debug/setheum-node purge-chain --dev -y
+
+.PHONY: restart
+restart: purge run
+
+.PHONY: generate-tokens
+generate-tokens:
+ cargo test -p setheum-primitives -- --ignored
+ cd blockchain/predeploy-contracts && yarn && yarn run generate-bytecode
diff --git a/README.md b/README.md
index 65f2f4792..038f48205 100644
--- a/README.md
+++ b/README.md
@@ -38,10 +38,10 @@ Setheum's Blockchain Network node Implementation in Rust, ready for hacking :roc
- [Setheum - Powering The New Internet](#setheum---powering-the-new-internet)
- [1.0. Introduction](#10-introduction)
- [1.1. Setheum Chain](#11-setheum-chain)
- - [1.2. Ethical DeFi](#12-ethicaldefi)
- - [1.2.1 Edfis - Ethical DeFi Swap](#121-edfis---ethical-defi-swap)
- - [1.2.2 The Setter Stablecoin](#122-the-setter-stablecoin)
- - [1.2.3 The Slick USD Stablecoin](#123-the-slick-usd-stablecoin)
+ - [1.2. Ethical DeFi](#12-ethical-defi)
+ - [1.2.1. Edfis - Ethical DeFi Swap](#121-edfis---ethical-defi-swap)
+ - [1.2.2. The Setter Stablecoin](#122-the-setter-stablecoin)
+ - [1.2.3. The Slick USD Stablecoin](#123-the-slick-usd-stablecoin)
- [2.0. Getting Started](#20-getting-started)
- [2.1. Build](#21-build)
- [2.2. Run](#22-run)
@@ -363,10 +363,10 @@ project adheres to the [Contributor Covenant Code of Conduct](./CODE_OF_CONDUCT.
### 8.1. ToDo List
-Note> Before adding/removing a TODO, please carefully read the [TODO.md file](./TODO.md)
+Note> Before adding/removing a TODO, please carefully read the [TODO.md file](./docs/TODO.md)
-Whenever you write a TODO in any file, please add a reference to it [here](./TODO.md).
-Whenever you remove a TODO in any file, please remove its reference from [here](./TODO.md).
+Whenever you write a TODO in any file, please add a reference to it [here](./docs/TODO.md).
+Whenever you remove a TODO in any file, please remove its reference from [here](./docs/TODO.md).
## 9.0. License
diff --git a/blockchain/aggregator/Cargo.toml b/blockchain/aggregator/Cargo.toml
new file mode 100644
index 000000000..c70026392
--- /dev/null
+++ b/blockchain/aggregator/Cargo.toml
@@ -0,0 +1,16 @@
+[package]
+name = "aggregator"
+version = "0.6.0"
+authors.workspace = true
+edition.workspace = true
+homepage.workspace = true
+repository.workspace = true
+
+[dependencies]
+aleph-bft-rmc = { workspace = true }
+aleph-bft-types = { workspace = true }
+async-trait = { workspace = true }
+futures = { workspace = true }
+log = { workspace = true }
+parity-scale-codec = { workspace = true, features = ["derive"] }
+tokio = { workspace = true, features = [ "sync", "macros", "time", "rt-multi-thread" ] }
diff --git a/blockchain/aggregator/README.md b/blockchain/aggregator/README.md
new file mode 100644
index 000000000..72eba174b
--- /dev/null
+++ b/blockchain/aggregator/README.md
@@ -0,0 +1,7 @@
+# Aggregator
+
+## Overview
+
+This crate provides an AlephBFT Block Signature Aggregator
+
+Synchronize with [Aleph Aggregator](https://github.com/Cardinal-Cryptography/aleph-node/tree/main/aggregator)
diff --git a/blockchain/aggregator/TODO.md b/blockchain/aggregator/TODO.md
new file mode 100644
index 000000000..fb25575ad
--- /dev/null
+++ b/blockchain/aggregator/TODO.md
@@ -0,0 +1,56 @@
+# To-Do List
+
+This list contains all TODOs in the Repo
+
+
+
+- [To-Do List](#to-do-list)
+ - [1. Guidelines](#1-guidelines)
+ - [2. Contribution](#2-contribution)
+ - [3. Lists](#3-lists)
+ - [4. Tasks](#4-tasks)
+
+
+
+## 1. Guidelines
+
+Note: Before you write a ToDo in this repo, please read the below guidelines carefully.
+
+Whenever you write a ToDo, you need to follow this standard syntax
+
+```rust
+//TODO:[file_name:task_number] - task_details
+```
+
+for example:
+
+```rust
+//TODO:[TODO.md:0] - Add Todo Guidelines
+```
+
+Note > the `//TODO:[filename:task_number] - ` is what we call the `task_prefix`.
+
+Whenever adding/writing a Task/ToDo, you need to describe the task on this list. Whenever you write a TODO in any file, add a reference to it here. Please make sure the task reference here is titled correctly and as detailed as possible.
+
+Whenever you `complete` a task/TODO from any file, please tick/complete its reference here and make sure you do it in the same `commit` that completes the task.
+
+Whenever a task is cancelled (discontinued or not needed for w/e reason), please note in the details why it is cancelled, make sure you do it in the same `commit` that removes/cancels the TODO, and add this `-C` as a suffix to its `file_name` in the list here, for example:
+
+```rust
+//TODO:[TODO.md-C:0] - Add Todo Guidelines
+```
+
+## 2. Contribution
+
+You can contribute to this list by completing tasks or by adding tasks(TODOs) that are currently in the repo but not on the list. You can also contribute by updating old tasks to the new Standard.
+
+## 3. Lists
+
+Each package/module/directory has its own `TODO.md`.
+
+## 4. Tasks
+
+These tasks are just for this file specifically.
+
+- [x] [[TODO.md:0] - Add TODO.md File](TODO.md): Add a TODO.md file to organise TODOs in the repo.
+- [x] [[TODO.md:1] - Add a `task_title`](/TODO.md/#tasks): Add a `task_title`.
diff --git a/blockchain/aggregator/aggregator.rs b/blockchain/aggregator/aggregator.rs
new file mode 100644
index 000000000..649cd425e
--- /dev/null
+++ b/blockchain/aggregator/aggregator.rs
@@ -0,0 +1,295 @@
+// بِسْمِ اللَّهِ الرَّحْمَنِ الرَّحِيم
+
+// This file is part of Setheum.
+
+// Copyright (C) 2019-Present Setheum Labs.
+// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
+
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see .
+
+use std::{
+ collections::{HashMap, HashSet, VecDeque},
+ fmt::Debug,
+ time::Instant,
+};
+
+use aleph_bft_rmc::{DoublingDelayScheduler, MultiKeychain, Multisigned, Service as RmcService};
+use aleph_bft_types::Recipient;
+use log::{debug, info, trace, warn};
+
+use crate::{Hash, ProtocolSink, RmcNetworkData, SignableHash};
+
+#[derive(Debug, PartialEq, Eq)]
+pub enum AggregatorError {
+ NoHashFound,
+ DuplicateHash,
+}
+
+pub enum IOError {
+ NetworkChannelClosed,
+}
+
+pub type AggregatorResult<R> = Result<R, AggregatorError>;
+pub type IOResult = Result<(), IOError>;
+type Rmc<H, MK> =
+    RmcService<SignableHash<H>, MK, DoublingDelayScheduler<RmcNetworkData<H, MK::Signature, MK::PartialMultisignature>>>;
+
+/// A wrapper around an `rmc::Multicast` returning the signed hashes in the order of the [`Multicast::start_multicast`] calls.
+pub struct BlockSignatureAggregator<H: Hash + Copy, PMS> {
+    signatures: HashMap<H, PMS>,
+    hash_queue: VecDeque<H>,
+    started_hashes: HashSet<H>,
+    last_change: Instant,
+}
+
+impl<H: Hash + Copy, PMS> Default for BlockSignatureAggregator<H, PMS> {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+impl<H: Hash + Copy, PMS> BlockSignatureAggregator<H, PMS> {
+ pub fn new() -> Self {
+ BlockSignatureAggregator {
+ signatures: HashMap::new(),
+ hash_queue: VecDeque::new(),
+ started_hashes: HashSet::new(),
+ last_change: Instant::now(),
+ }
+ }
+
+ fn on_start(&mut self, hash: H) -> AggregatorResult<()> {
+ if !self.started_hashes.insert(hash) {
+ return Err(AggregatorError::DuplicateHash);
+ }
+ if self.hash_queue.is_empty() {
+ self.last_change = Instant::now();
+ }
+ self.hash_queue.push_back(hash);
+
+ Ok(())
+ }
+
+ fn on_multisigned_hash(&mut self, hash: H, signature: PMS) {
+ debug!(target: "aleph-aggregator", "New multisigned_hash {:?}.", hash);
+ self.signatures.insert(hash, signature);
+ }
+
+ fn try_pop_hash(&mut self) -> AggregatorResult<(H, PMS)> {
+ match self.hash_queue.pop_front() {
+ Some(hash) => {
+ if let Some(multisignature) = self.signatures.remove(&hash) {
+ self.last_change = Instant::now();
+ Ok((hash, multisignature))
+ } else {
+ self.hash_queue.push_front(hash);
+ Err(AggregatorError::NoHashFound)
+ }
+ }
+ None => Err(AggregatorError::NoHashFound),
+ }
+ }
+
+ pub fn status_report(&self) {
+ let mut status = String::from("Block Signature Aggregator status report: ");
+
+ status.push_str(&format!(
+ "started hashes - {:?}; ",
+ self.started_hashes.len()
+ ));
+
+ status.push_str(&format!(
+ "collected signatures - {:?}; ",
+ self.signatures.len()
+ ));
+
+ status.push_str(&format!("hashes in queue - {:?}; ", self.hash_queue.len()));
+
+ if let Some(hash) = self.hash_queue.front() {
+ status.push_str(&format!(
+ "front of hash queue - {} for - {:.2} s; ",
+ hash,
+ Instant::now()
+ .saturating_duration_since(self.last_change)
+ .as_secs_f64()
+ ));
+ }
+
+ info!(target: "aleph-aggregator", "{}", status);
+ }
+}
+
+pub struct IO<
+    H: Hash + Copy,
+    N: ProtocolSink<RmcNetworkData<H, MK::Signature, MK::PartialMultisignature>>,
+    MK: MultiKeychain,
+> {
+    network: N,
+    rmc_service: Rmc<H, MK>,
+    aggregator: BlockSignatureAggregator<H, MK::PartialMultisignature>,
+    multisigned_events: VecDeque<Multisigned<SignableHash<H>, MK>>,
+}
+
+impl<
+        H: Copy + Hash,
+        N: ProtocolSink<RmcNetworkData<H, MK::Signature, MK::PartialMultisignature>>,
+        MK: MultiKeychain,
+    > IO<H, N, MK>
+{
+ pub fn new(
+ network: N,
+        rmc_service: Rmc<H, MK>,
+        aggregator: BlockSignatureAggregator<H, MK::PartialMultisignature>,
+ ) -> Self {
+ IO {
+ network,
+ rmc_service,
+ aggregator,
+ multisigned_events: VecDeque::new(),
+ }
+ }
+
+ pub fn status_report(&self) {
+ self.aggregator.status_report()
+ }
+
+ pub async fn start_aggregation(&mut self, hash: H) {
+ debug!(target: "aleph-aggregator", "Started aggregation for block hash {:?}", hash);
+ if let Err(AggregatorError::DuplicateHash) = self.aggregator.on_start(hash) {
+ debug!(target: "aleph-aggregator", "Aggregation already started for block hash {:?}, ignoring.", hash);
+ return;
+ }
+ if let Some(multisigned) = self.rmc_service.start_rmc(SignableHash::new(hash)) {
+ self.multisigned_events.push_back(multisigned);
+ }
+ }
+
+ async fn wait_for_next_signature(&mut self) -> IOResult {
+ loop {
+ if let Some(multisigned) = self.multisigned_events.pop_front() {
+ let unchecked = multisigned.into_unchecked();
+ let signature = unchecked.signature();
+ self.aggregator
+ .on_multisigned_hash(unchecked.into_signable().get_hash(), signature);
+ return Ok(());
+ }
+ tokio::select! {
+ message_from_rmc = self.rmc_service.next_message() => {
+ trace!(target: "aleph-aggregator", "Our rmc message {:?}.", message_from_rmc);
+ if let Err(e) = self.network.send(message_from_rmc, Recipient::Everyone) {
+ warn!(target: "aleph-aggregator", "failed broadcasting a message from rmc: {:?}", e);
+ }
+ }
+ message_from_network = self.network.next() => match message_from_network {
+ Some(message) => {
+ trace!(target: "aleph-aggregator", "Received message for rmc: {:?}", message);
+ if let Some(multisigned) = self.rmc_service.process_message(message) {
+ self.multisigned_events.push_back(multisigned);
+ }
+ },
+ None => {
+ // In case the network is down we can terminate (?).
+ return Err(IOError::NetworkChannelClosed);
+ }
+ }
+ }
+ }
+ }
+
+ pub async fn next_multisigned_hash(&mut self) -> Option<(H, MK::PartialMultisignature)> {
+ loop {
+ trace!(target: "aleph-aggregator", "Entering next_multisigned_hash loop.");
+ match self.aggregator.try_pop_hash() {
+ Ok(res) => {
+ return Some(res);
+ }
+ Err(AggregatorError::NoHashFound) => { /* ignored */ }
+ Err(AggregatorError::DuplicateHash) => {
+ warn!(
+ target: "aleph-aggregator",
+ "Unexpected aggregator exception in IO: DuplicateHash",
+ )
+ }
+ }
+
+ if self.wait_for_next_signature().await.is_err() {
+ warn!(target: "aleph-aggregator", "the network channel closed");
+ return None;
+ }
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use std::{
+ fmt::{Debug, Display, Formatter},
+ hash::Hash,
+ };
+
+ use parity_scale_codec::{Decode, Encode};
+
+ use crate::aggregator::{AggregatorError, BlockSignatureAggregator};
+
+ #[derive(Hash, PartialEq, Eq, Clone, Copy, Encode, Decode, Debug)]
+ struct MockHash(pub [u8; 32]);
+
+ impl AsRef<[u8]> for MockHash {
+ fn as_ref(&self) -> &[u8] {
+ &self.0
+ }
+ }
+
+ impl Display for MockHash {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ std::fmt::Debug::fmt(&self.0, f)
+ }
+ }
+ type TestMultisignature = usize;
+ const TEST_SIGNATURE: TestMultisignature = 42;
+
+    fn build_aggregator() -> BlockSignatureAggregator<MockHash, TestMultisignature> {
+ BlockSignatureAggregator::new()
+ }
+
+ fn build_hash(b0: u8) -> MockHash {
+ let mut bytes = [0u8; 32];
+ bytes[0] = b0;
+ MockHash(bytes)
+ }
+
+ #[test]
+ fn returns_with_matching_multisigned_hash() {
+ let mut aggregator = build_aggregator();
+ let res = aggregator.on_start(build_hash(0));
+ assert!(res.is_ok());
+
+ aggregator.on_multisigned_hash(build_hash(0), TEST_SIGNATURE);
+
+ let res = aggregator.try_pop_hash();
+ assert!(res.is_ok());
+ }
+
+ #[test]
+ fn doesnt_return_without_matching_multisigned_hash() {
+ let mut aggregator = build_aggregator();
+ let res = aggregator.on_start(build_hash(0));
+ assert!(res.is_ok());
+
+ aggregator.on_multisigned_hash(build_hash(1), TEST_SIGNATURE);
+
+ let res = aggregator.try_pop_hash();
+ assert_eq!(res, Err(AggregatorError::NoHashFound));
+ }
+}
diff --git a/blockchain/aggregator/src/aggregator.rs b/blockchain/aggregator/src/aggregator.rs
new file mode 100644
index 000000000..649cd425e
--- /dev/null
+++ b/blockchain/aggregator/src/aggregator.rs
@@ -0,0 +1,295 @@
+// بِسْمِ اللَّهِ الرَّحْمَنِ الرَّحِيم
+
+// This file is part of Setheum.
+
+// Copyright (C) 2019-Present Setheum Labs.
+// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
+
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see .
+
+use std::{
+ collections::{HashMap, HashSet, VecDeque},
+ fmt::Debug,
+ time::Instant,
+};
+
+use aleph_bft_rmc::{DoublingDelayScheduler, MultiKeychain, Multisigned, Service as RmcService};
+use aleph_bft_types::Recipient;
+use log::{debug, info, trace, warn};
+
+use crate::{Hash, ProtocolSink, RmcNetworkData, SignableHash};
+
+#[derive(Debug, PartialEq, Eq)]
+pub enum AggregatorError {
+ NoHashFound,
+ DuplicateHash,
+}
+
+pub enum IOError {
+ NetworkChannelClosed,
+}
+
+pub type AggregatorResult<R> = Result<R, AggregatorError>;
+pub type IOResult = Result<(), IOError>;
+type Rmc<H, MK> =
+    RmcService<SignableHash<H>, MK, DoublingDelayScheduler<RmcNetworkData<H, MK::Signature, MK::PartialMultisignature>>>;
+
+/// A wrapper around an `rmc::Multicast` returning the signed hashes in the order of the [`Multicast::start_multicast`] calls.
+pub struct BlockSignatureAggregator<H: Hash + Copy, PMS> {
+    signatures: HashMap<H, PMS>,
+    hash_queue: VecDeque<H>,
+    started_hashes: HashSet<H>,
+    last_change: Instant,
+}
+
+impl<H: Hash + Copy, PMS> Default for BlockSignatureAggregator<H, PMS> {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+impl<H: Hash + Copy, PMS> BlockSignatureAggregator<H, PMS> {
+ pub fn new() -> Self {
+ BlockSignatureAggregator {
+ signatures: HashMap::new(),
+ hash_queue: VecDeque::new(),
+ started_hashes: HashSet::new(),
+ last_change: Instant::now(),
+ }
+ }
+
+ fn on_start(&mut self, hash: H) -> AggregatorResult<()> {
+ if !self.started_hashes.insert(hash) {
+ return Err(AggregatorError::DuplicateHash);
+ }
+ if self.hash_queue.is_empty() {
+ self.last_change = Instant::now();
+ }
+ self.hash_queue.push_back(hash);
+
+ Ok(())
+ }
+
+ fn on_multisigned_hash(&mut self, hash: H, signature: PMS) {
+ debug!(target: "aleph-aggregator", "New multisigned_hash {:?}.", hash);
+ self.signatures.insert(hash, signature);
+ }
+
+ fn try_pop_hash(&mut self) -> AggregatorResult<(H, PMS)> {
+ match self.hash_queue.pop_front() {
+ Some(hash) => {
+ if let Some(multisignature) = self.signatures.remove(&hash) {
+ self.last_change = Instant::now();
+ Ok((hash, multisignature))
+ } else {
+ self.hash_queue.push_front(hash);
+ Err(AggregatorError::NoHashFound)
+ }
+ }
+ None => Err(AggregatorError::NoHashFound),
+ }
+ }
+
+ pub fn status_report(&self) {
+ let mut status = String::from("Block Signature Aggregator status report: ");
+
+ status.push_str(&format!(
+ "started hashes - {:?}; ",
+ self.started_hashes.len()
+ ));
+
+ status.push_str(&format!(
+ "collected signatures - {:?}; ",
+ self.signatures.len()
+ ));
+
+ status.push_str(&format!("hashes in queue - {:?}; ", self.hash_queue.len()));
+
+ if let Some(hash) = self.hash_queue.front() {
+ status.push_str(&format!(
+ "front of hash queue - {} for - {:.2} s; ",
+ hash,
+ Instant::now()
+ .saturating_duration_since(self.last_change)
+ .as_secs_f64()
+ ));
+ }
+
+ info!(target: "aleph-aggregator", "{}", status);
+ }
+}
+
+pub struct IO<
+    H: Hash + Copy,
+    N: ProtocolSink<RmcNetworkData<H, MK::Signature, MK::PartialMultisignature>>,
+    MK: MultiKeychain,
+> {
+    network: N,
+    rmc_service: Rmc<H, MK>,
+    aggregator: BlockSignatureAggregator<H, MK::PartialMultisignature>,
+    multisigned_events: VecDeque<Multisigned<SignableHash<H>, MK>>,
+}
+
+impl<
+        H: Copy + Hash,
+        N: ProtocolSink<RmcNetworkData<H, MK::Signature, MK::PartialMultisignature>>,
+        MK: MultiKeychain,
+    > IO<H, N, MK>
+{
+ pub fn new(
+ network: N,
+        rmc_service: Rmc<H, MK>,
+        aggregator: BlockSignatureAggregator<H, MK::PartialMultisignature>,
+ ) -> Self {
+ IO {
+ network,
+ rmc_service,
+ aggregator,
+ multisigned_events: VecDeque::new(),
+ }
+ }
+
+ pub fn status_report(&self) {
+ self.aggregator.status_report()
+ }
+
+ pub async fn start_aggregation(&mut self, hash: H) {
+ debug!(target: "aleph-aggregator", "Started aggregation for block hash {:?}", hash);
+ if let Err(AggregatorError::DuplicateHash) = self.aggregator.on_start(hash) {
+ debug!(target: "aleph-aggregator", "Aggregation already started for block hash {:?}, ignoring.", hash);
+ return;
+ }
+ if let Some(multisigned) = self.rmc_service.start_rmc(SignableHash::new(hash)) {
+ self.multisigned_events.push_back(multisigned);
+ }
+ }
+
+ async fn wait_for_next_signature(&mut self) -> IOResult {
+ loop {
+ if let Some(multisigned) = self.multisigned_events.pop_front() {
+ let unchecked = multisigned.into_unchecked();
+ let signature = unchecked.signature();
+ self.aggregator
+ .on_multisigned_hash(unchecked.into_signable().get_hash(), signature);
+ return Ok(());
+ }
+ tokio::select! {
+ message_from_rmc = self.rmc_service.next_message() => {
+ trace!(target: "aleph-aggregator", "Our rmc message {:?}.", message_from_rmc);
+ if let Err(e) = self.network.send(message_from_rmc, Recipient::Everyone) {
+ warn!(target: "aleph-aggregator", "failed broadcasting a message from rmc: {:?}", e);
+ }
+ }
+ message_from_network = self.network.next() => match message_from_network {
+ Some(message) => {
+ trace!(target: "aleph-aggregator", "Received message for rmc: {:?}", message);
+ if let Some(multisigned) = self.rmc_service.process_message(message) {
+ self.multisigned_events.push_back(multisigned);
+ }
+ },
+ None => {
+ // In case the network is down we can terminate (?).
+ return Err(IOError::NetworkChannelClosed);
+ }
+ }
+ }
+ }
+ }
+
+ pub async fn next_multisigned_hash(&mut self) -> Option<(H, MK::PartialMultisignature)> {
+ loop {
+ trace!(target: "aleph-aggregator", "Entering next_multisigned_hash loop.");
+ match self.aggregator.try_pop_hash() {
+ Ok(res) => {
+ return Some(res);
+ }
+ Err(AggregatorError::NoHashFound) => { /* ignored */ }
+ Err(AggregatorError::DuplicateHash) => {
+ warn!(
+ target: "aleph-aggregator",
+ "Unexpected aggregator exception in IO: DuplicateHash",
+ )
+ }
+ }
+
+ if self.wait_for_next_signature().await.is_err() {
+ warn!(target: "aleph-aggregator", "the network channel closed");
+ return None;
+ }
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use std::{
+ fmt::{Debug, Display, Formatter},
+ hash::Hash,
+ };
+
+ use parity_scale_codec::{Decode, Encode};
+
+ use crate::aggregator::{AggregatorError, BlockSignatureAggregator};
+
+ #[derive(Hash, PartialEq, Eq, Clone, Copy, Encode, Decode, Debug)]
+ struct MockHash(pub [u8; 32]);
+
+ impl AsRef<[u8]> for MockHash {
+ fn as_ref(&self) -> &[u8] {
+ &self.0
+ }
+ }
+
+ impl Display for MockHash {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ std::fmt::Debug::fmt(&self.0, f)
+ }
+ }
+ type TestMultisignature = usize;
+ const TEST_SIGNATURE: TestMultisignature = 42;
+
+    fn build_aggregator() -> BlockSignatureAggregator<MockHash, TestMultisignature> {
+ BlockSignatureAggregator::new()
+ }
+
+ fn build_hash(b0: u8) -> MockHash {
+ let mut bytes = [0u8; 32];
+ bytes[0] = b0;
+ MockHash(bytes)
+ }
+
+ #[test]
+ fn returns_with_matching_multisigned_hash() {
+ let mut aggregator = build_aggregator();
+ let res = aggregator.on_start(build_hash(0));
+ assert!(res.is_ok());
+
+ aggregator.on_multisigned_hash(build_hash(0), TEST_SIGNATURE);
+
+ let res = aggregator.try_pop_hash();
+ assert!(res.is_ok());
+ }
+
+ #[test]
+ fn doesnt_return_without_matching_multisigned_hash() {
+ let mut aggregator = build_aggregator();
+ let res = aggregator.on_start(build_hash(0));
+ assert!(res.is_ok());
+
+ aggregator.on_multisigned_hash(build_hash(1), TEST_SIGNATURE);
+
+ let res = aggregator.try_pop_hash();
+ assert_eq!(res, Err(AggregatorError::NoHashFound));
+ }
+}
diff --git a/blockchain/aggregator/src/lib.rs b/blockchain/aggregator/src/lib.rs
new file mode 100644
index 000000000..344f7143f
--- /dev/null
+++ b/blockchain/aggregator/src/lib.rs
@@ -0,0 +1,76 @@
+// بِسْمِ اللَّهِ الرَّحْمَنِ الرَّحِيم
+
+// This file is part of Setheum.
+
+// Copyright (C) 2019-Present Setheum Labs.
+// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
+
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see .
+
+//! This crate provides an AlephBFT Block Signature Aggregator
+//! Synchronize with [Aleph Aggregator Clique](https://github.com/Cardinal-Cryptography/aleph-node/tree/main/aggregator)
+
+use std::{
+ fmt::{Debug, Display},
+ hash::Hash as StdHash,
+};
+
+use aleph_bft_rmc::{Message as RmcMessage, Signable};
+use aleph_bft_types::Recipient;
+use parity_scale_codec::{Codec, Decode, Encode};
+
+mod aggregator;
+
+pub use crate::aggregator::{BlockSignatureAggregator, IO};
+
+pub type RmcNetworkData<H, S, SS> = RmcMessage<SignableHash<H>, S, SS>;
+
+/// A convenience trait for gathering all of the desired hash characteristics.
+pub trait Hash: AsRef<[u8]> + StdHash + Eq + Clone + Codec + Debug + Display + Send + Sync {}
+
+impl<T: AsRef<[u8]> + StdHash + Eq + Clone + Codec + Debug + Display + Send + Sync> Hash for T {}
+
+/// A wrapper allowing block hashes to be signed.
+#[derive(PartialEq, Eq, StdHash, Clone, Debug, Default, Encode, Decode)]
+pub struct SignableHash<H: Hash> {
+ hash: H,
+}
+
+impl<H: Hash> SignableHash<H> {
+ pub fn new(hash: H) -> Self {
+ Self { hash }
+ }
+
+ pub fn get_hash(&self) -> H {
+ self.hash.clone()
+ }
+}
+
+impl<H: Hash> Signable for SignableHash<H> {
+ type Hash = H;
+ fn hash(&self) -> Self::Hash {
+ self.hash.clone()
+ }
+}
+
+#[derive(Debug)]
+pub enum NetworkError {
+ SendFail,
+}
+
+#[async_trait::async_trait]
+pub trait ProtocolSink<D>: Send + Sync {
+    async fn next(&mut self) -> Option<D>;
+    fn send(&self, data: D, recipient: Recipient) -> Result<(), NetworkError>;
+}
diff --git a/blockchain/clique/Cargo.toml b/blockchain/clique/Cargo.toml
new file mode 100644
index 000000000..a067375db
--- /dev/null
+++ b/blockchain/clique/Cargo.toml
@@ -0,0 +1,38 @@
+[package]
+name = "network-clique"
+version = "0.6.0"
+authors.workspace = true
+edition.workspace = true
+homepage.workspace = true
+repository.workspace = true
+
+[dependencies]
+rate-limiter = { workspace = true }
+
+async-trait = { workspace = true }
+bytes = { workspace = true }
+parity-scale-codec = { workspace = true, features = ["std", "derive"] }
+derive_more = { workspace = true }
+env_logger = { workspace = true }
+futures = { workspace = true }
+futures-timer = { workspace = true }
+hash-db = { workspace = true }
+ip_network = { workspace = true }
+log = { workspace = true }
+lru = { workspace = true }
+rand = { workspace = true }
+serde = { workspace = true }
+substrate-prometheus-endpoint = { workspace = true }
+tiny-bip39 = { workspace = true }
+tokio = { workspace = true, features = [
+ "sync",
+ "macros",
+ "time",
+ "rt-multi-thread",
+ "io-util",
+ "net",
+] }
+
+[dev-dependencies]
+aleph-bft-types = { workspace = true }
+aleph-bft-mock = { workspace = true }
diff --git a/blockchain/clique/README.md b/blockchain/clique/README.md
new file mode 100644
index 000000000..8bdb49c15
--- /dev/null
+++ b/blockchain/clique/README.md
@@ -0,0 +1,5 @@
+# Network Clique
+
+A library to establish network connection between every pair of entities.
+
+Synchronize with [Aleph Network Clique](https://github.com/Cardinal-Cryptography/aleph-node/tree/main/clique)
diff --git a/blockchain/rpc/TODO.md b/blockchain/clique/TODO.md
similarity index 93%
rename from blockchain/rpc/TODO.md
rename to blockchain/clique/TODO.md
index 96e340c79..c8c94c137 100644
--- a/blockchain/rpc/TODO.md
+++ b/blockchain/clique/TODO.md
@@ -8,7 +8,7 @@ This list contains all TODOs in the Repo
- [1. Introduction](#1-guidelines)
- [2. Contribution](#2-contribution)
- [3. Lists](#3-lists)
- - [4. Tasks](#3-tasks)
+ - [4. Tasks](#4-tasks)
@@ -46,7 +46,7 @@ You can contribute to this list by completing tasks or by adding tasks(TODOs) th
## 3. Lists
-Each module/crate has its own `TODO.md`.
+Each package/module/directory has its own `TODO.md`.
## 4. Tasks
diff --git a/blockchain/clique/src/crypto.rs b/blockchain/clique/src/crypto.rs
new file mode 100644
index 000000000..8d690f4e8
--- /dev/null
+++ b/blockchain/clique/src/crypto.rs
@@ -0,0 +1,45 @@
+// بِسْمِ اللَّهِ الرَّحْمَنِ الرَّحِيم
+
+// This file is part of Setheum.
+
+// Copyright (C) 2019-Present Setheum Labs.
+// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
+
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+use std::{fmt::Display, hash::Hash};
+
+use parity_scale_codec::Codec;
+
+/// A public key for signature verification.
+pub trait PublicKey:
+ Send + Sync + Eq + Clone + AsRef<[u8]> + Display + Hash + Codec + 'static
+{
+ type Signature: Send + Sync + Clone + Codec;
+
+ /// Verify whether the message has been signed with the associated private key.
+ fn verify(&self, message: &[u8], signature: &Self::Signature) -> bool;
+}
+
+/// Secret key for signing messages, with an associated public key.
+pub trait SecretKey: Clone + Send + Sync + 'static {
+ type Signature: Send + Sync + Clone + Codec;
+ type PublicKey: PublicKey;
+
+ /// Produce a signature for the provided message.
+ fn sign(&self, message: &[u8]) -> Self::Signature;
+
+ /// Return the associated public key.
+ fn public_key(&self) -> Self::PublicKey;
+}
diff --git a/blockchain/clique/src/incoming.rs b/blockchain/clique/src/incoming.rs
new file mode 100644
index 000000000..44473ea4b
--- /dev/null
+++ b/blockchain/clique/src/incoming.rs
@@ -0,0 +1,114 @@
+// بِسْمِ اللَّهِ الرَّحْمَنِ الرَّحِيم
+
+// This file is part of Setheum.
+
+// Copyright (C) 2019-Present Setheum Labs.
+// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
+
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+use std::fmt::{Display, Error as FmtError, Formatter};
+
+use futures::channel::{mpsc, oneshot};
+use log::{debug, info};
+
+use crate::{
+ metrics::Metrics,
+ protocols::{protocol, ProtocolError, ProtocolNegotiationError, ResultForService},
+ Data, PublicKey, SecretKey, Splittable, LOG_TARGET,
+};
+
+enum IncomingError {
+ ProtocolNegotiationError(ProtocolNegotiationError),
+ ProtocolError(ProtocolError),
+}
+
+impl Display for IncomingError {
+ fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> {
+ use IncomingError::*;
+ match self {
+ ProtocolNegotiationError(e) => write!(f, "protocol negotiation error: {e}"),
+ ProtocolError(e) => write!(f, "protocol error: {e}"),
+ }
+ }
+}
+
+impl From for IncomingError {
+ fn from(e: ProtocolNegotiationError) -> Self {
+ IncomingError::ProtocolNegotiationError(e)
+ }
+}
+
+impl From> for IncomingError {
+ fn from(e: ProtocolError) -> Self {
+ IncomingError::ProtocolError(e)
+ }
+}
+
+async fn manage_incoming(
+ secret_key: SK,
+ stream: S,
+ result_for_parent: mpsc::UnboundedSender>,
+ data_for_user: mpsc::UnboundedSender,
+ authorization_requests_sender: mpsc::UnboundedSender<(SK::PublicKey, oneshot::Sender)>,
+ metrics: Metrics,
+) -> Result<(), IncomingError> {
+ debug!(
+ target: LOG_TARGET,
+ "Performing incoming protocol negotiation."
+ );
+ let (stream, protocol) = protocol(stream).await?;
+ debug!(target: LOG_TARGET, "Negotiated protocol, running.");
+ Ok(protocol
+ .manage_incoming(
+ stream,
+ secret_key,
+ result_for_parent,
+ data_for_user,
+ authorization_requests_sender,
+ metrics,
+ )
+ .await?)
+}
+
+/// Manage an incoming connection. After the handshake it will send the recognized PublicKey to
+/// the parent, together with an exit channel for this process. When this channel is dropped the
+/// process ends. Whenever data arrives on this connection it will be passed to the user. Any
+/// failures in receiving data result in the process stopping, we assume the other side will
+/// reestablish it if necessary.
+pub async fn incoming(
+ secret_key: SK,
+ stream: S,
+ result_for_parent: mpsc::UnboundedSender>,
+ data_for_user: mpsc::UnboundedSender,
+ authorization_requests_sender: mpsc::UnboundedSender<(SK::PublicKey, oneshot::Sender)>,
+ metrics: Metrics,
+) {
+ let addr = stream.peer_address_info();
+ if let Err(e) = manage_incoming(
+ secret_key,
+ stream,
+ result_for_parent,
+ data_for_user,
+ authorization_requests_sender,
+ metrics,
+ )
+ .await
+ {
+ info!(
+ target: LOG_TARGET,
+ "Incoming connection from {} failed: {}.", addr, e
+ );
+ }
+}
diff --git a/blockchain/clique/src/io.rs b/blockchain/clique/src/io.rs
new file mode 100644
index 000000000..e636ca762
--- /dev/null
+++ b/blockchain/clique/src/io.rs
@@ -0,0 +1,232 @@
+// بِسْمِ اللَّهِ الرَّحْمَنِ الرَّحِيم
+
+// This file is part of Setheum.
+
+// Copyright (C) 2019-Present Setheum Labs.
+// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
+
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+use std::{
+ fmt::{Display, Error as FmtError, Formatter},
+ io::Error as IoError,
+};
+
+use parity_scale_codec::DecodeAll;
+use tokio::io::{AsyncReadExt, AsyncWriteExt};
+
+use crate::Data;
+
+// We allow sending up to 16MiB, that should be enough forever.
+pub const MAX_DATA_SIZE: u32 = 16 * 1024 * 1024;
+
+/// A general error when sending or receiving data.
+#[derive(Debug)]
+pub enum Error {
+ ConnectionClosed(IoError),
+ DataTooLong(u32),
+}
+
+impl Display for Error {
+ fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> {
+ use Error::*;
+ match self {
+ ConnectionClosed(e) => write!(f, "connection unexpectedly closed: {e}"),
+ DataTooLong(length) => write!(
+ f,
+ "encoded data too long - {length} bytes, the limit is {MAX_DATA_SIZE}"
+ ),
+ }
+ }
+}
+
+/// An error when sending data.
+#[derive(Debug)]
+pub struct SendError(Error);
+
+impl Display for SendError {
+ fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> {
+ write!(f, "{}", self.0)
+ }
+}
+
+impl From for SendError {
+ fn from(e: Error) -> Self {
+ SendError(e)
+ }
+}
+
+/// An error when receiving data.
+#[derive(Debug)]
+pub enum ReceiveError {
+ Error(Error),
+ DataCorrupted,
+}
+
+impl Display for ReceiveError {
+ fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> {
+ use ReceiveError::*;
+ match self {
+ Error(e) => write!(f, "{e}"),
+ DataCorrupted => write!(f, "received corrupted data"),
+ }
+ }
+}
+
+impl From for ReceiveError {
+ fn from(e: Error) -> Self {
+ ReceiveError::Error(e)
+ }
+}
+
+/// Sends some data using the stream.
+pub async fn send_data(
+ mut stream: S,
+ data: D,
+) -> Result {
+ let encoded = data.encode();
+ let len = u32::try_from(encoded.len()).map_err(|_| Error::DataTooLong(u32::MAX))?;
+ if len > MAX_DATA_SIZE {
+ return Err(Error::DataTooLong(len).into());
+ }
+ let encoded_len = len.to_le_bytes().to_vec();
+ stream
+ .write_all(&encoded_len)
+ .await
+ .map_err(Error::ConnectionClosed)?;
+ stream
+ .write_all(&encoded)
+ .await
+ .map_err(Error::ConnectionClosed)?;
+ Ok(stream)
+}
+
+/// Attempts to receive some data using the stream.
+pub async fn receive_data(
+ mut stream: S,
+) -> Result<(S, D), ReceiveError> {
+ let mut buf = [0; 4];
+ stream
+ .read_exact(&mut buf[..])
+ .await
+ .map_err(Error::ConnectionClosed)?;
+ let len = u32::from_le_bytes(buf);
+ if len > MAX_DATA_SIZE {
+ return Err(Error::DataTooLong(len).into());
+ }
+ let mut buf: Vec = vec![0; len as usize];
+ stream
+ .read_exact(&mut buf[..])
+ .await
+ .map_err(Error::ConnectionClosed)?;
+ let data = D::decode_all(&mut &buf[..]).map_err(|_| ReceiveError::DataCorrupted)?;
+ Ok((stream, data))
+}
+
+#[cfg(test)]
+mod tests {
+ use tokio::io::{duplex, AsyncWriteExt};
+
+ use super::{receive_data, send_data, Error, ReceiveError, SendError, MAX_DATA_SIZE};
+
+ #[tokio::test]
+ async fn sends_and_receives_correct_data() {
+ let (sender, receiver) = duplex(4096);
+ let data: Vec = vec![4, 3, 43];
+ let _sender = send_data(sender, data.clone())
+ .await
+ .expect("data should send");
+ let (_receiver, received_data) = receive_data(receiver).await.expect("should receive data");
+ let received_data: Vec = received_data;
+ assert_eq!(data, received_data);
+ }
+
+ #[tokio::test]
+ async fn fails_to_receive_from_dropped_connection() {
+ let (_, receiver) = duplex(4096);
+ match receive_data::<_, i32>(receiver).await {
+ Err(e) => match e {
+ ReceiveError::Error(Error::ConnectionClosed(_)) => (),
+ e => panic!("unexpected error: {e}"),
+ },
+ _ => panic!("received data from a dropped stream!"),
+ }
+ }
+
+ #[tokio::test]
+ async fn fails_to_send_to_dropped_connection() {
+ let (sender, _) = duplex(4096);
+ let data: Vec = vec![4, 3, 43];
+ match send_data(sender, data.clone()).await {
+ Err(e) => match e {
+ SendError(Error::ConnectionClosed(_)) => (),
+ e => panic!("unexpected error: {e}"),
+ },
+ _ => panic!("send data to a dropped stream!"),
+ }
+ }
+
+ #[tokio::test]
+ async fn fails_to_send_too_big_message() {
+ let (sender, _) = duplex(4096);
+ let data: Vec = vec![
+ 43;
+ (MAX_DATA_SIZE + 1)
+ .try_into()
+ .expect("why are you running tests on a 16 bit machine? o.0")
+ ];
+ match send_data(sender, data.clone()).await {
+ Err(e) => match e {
+ SendError(Error::DataTooLong(_)) => (),
+ e => panic!("unexpected error: {e}"),
+ },
+ _ => panic!("send data to a dropped stream!"),
+ }
+ }
+
+ #[tokio::test]
+ async fn fails_to_receive_too_much_data() {
+ let (mut sender, receiver) = duplex(4096);
+ let too_long = MAX_DATA_SIZE + 43;
+ let payload = too_long.to_le_bytes().to_vec();
+ sender
+ .write_all(&payload)
+ .await
+ .expect("sending should work");
+ match receive_data::<_, i32>(receiver).await {
+ Err(e) => match e {
+ ReceiveError::Error(Error::DataTooLong(long)) => assert_eq!(long, too_long),
+ e => panic!("unexpected error: {e}"),
+ },
+ _ => panic!("received too long data!"),
+ }
+ }
+
+ #[tokio::test]
+ async fn fails_to_decode_empty_data() {
+ let (mut sender, receiver) = duplex(4096);
+ let payload = 0u32.to_le_bytes().to_vec();
+ sender
+ .write_all(&payload)
+ .await
+ .expect("sending should work");
+ match receive_data::<_, i32>(receiver).await {
+ Err(e) => match e {
+ ReceiveError::DataCorrupted => (),
+ e => panic!("unexpected error: {e}"),
+ },
+ _ => panic!("decoded no data into something?!"),
+ }
+ }
+}
diff --git a/blockchain/clique/src/lib.rs b/blockchain/clique/src/lib.rs
new file mode 100644
index 000000000..770e13197
--- /dev/null
+++ b/blockchain/clique/src/lib.rs
@@ -0,0 +1,267 @@
+// بِسْمِ اللَّهِ الرَّحْمَنِ الرَّحِيم
+
+// This file is part of Setheum.
+
+// Copyright (C) 2019-Present Setheum Labs.
+// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
+
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+//! A network for maintaining direct connections between all nodes.
+
+use std::{
+ fmt::{Debug, Display},
+ hash::Hash,
+ pin::Pin,
+};
+
+use parity_scale_codec::Codec;
+use tokio::io::{AsyncRead, AsyncWrite};
+
+mod crypto;
+mod incoming;
+mod io;
+mod manager;
+pub mod metrics;
+pub mod mock;
+mod outgoing;
+mod protocols;
+mod rate_limiting;
+mod service;
+#[cfg(test)]
+mod testing;
+
+pub use crypto::{PublicKey, SecretKey};
+pub use rate_limiting::{RateLimitingDialer, RateLimitingListener};
+pub use service::{Service, SpawnHandleT};
+
+const LOG_TARGET: &str = "network-clique";
+/// A basic alias for properties we expect basic data to satisfy.
+pub trait Data: Clone + Codec + Send + Sync + 'static {}
+
+impl Data for D {}
+
+/// Represents the id of an arbitrary node.
+pub trait PeerId: PartialEq + Eq + Clone + Debug + Display + Hash + Codec + Send {
+ /// This function is used for logging. It implements a shorter version of `to_string` for ids implementing display.
+ fn to_short_string(&self) -> String {
+ let id = format!("{self}");
+ if id.len() <= 12 {
+ return id;
+ }
+
+ let prefix: String = id.chars().take(4).collect();
+
+ let suffix: String = id.chars().skip(id.len().saturating_sub(8)).collect();
+
+ format!("{}…{}", &prefix, &suffix)
+ }
+}
+
+/// Represents the address of an arbitrary node.
+pub trait AddressingInformation: Debug + Hash + Codec + Clone + Eq + Send + Sync + 'static {
+ type PeerId: PeerId;
+
+ /// Returns the peer id associated with this address.
+ fn peer_id(&self) -> Self::PeerId;
+
+ /// Verify the information.
+ fn verify(&self) -> bool;
+
+ // Address, *only* for debugging purposes.
+ fn address(&self) -> String;
+}
+
+/// Abstraction for requesting own network addressing information.
+pub trait NetworkIdentity {
+ type PeerId: PeerId;
+ type AddressingInformation: AddressingInformation;
+
+ /// The external identity of this node.
+ fn identity(&self) -> Self::AddressingInformation;
+}
+
+/// Network represents an interface for opening and closing connections with other nodes,
+/// and sending direct messages between them.
+///
+/// Note on Network reliability and security: it is neither assumed that the sent messages must be
+/// always delivered, nor the established connections must be secure in any way. The Network
+/// implementation might fail to deliver any specific message, so messages have to be resent while
+/// they still should be delivered.
+#[async_trait::async_trait]
+pub trait Network: Send + 'static {
+ /// Add the peer to the set of connected peers.
+ fn add_connection(&mut self, peer: PK, address: A);
+
+ /// Remove the peer from the set of connected peers and close the connection.
+ fn remove_connection(&mut self, peer: PK);
+
+ /// Send a message to a single peer.
+ /// This function should be implemented in a non-blocking manner.
+ fn send(&self, data: D, recipient: PK);
+
+ /// Receive a message from the network.
+ /// This method's implementation must be cancellation safe.
+ async fn next(&mut self) -> Option;
+}
+
+pub type PeerAddressInfo = String;
+
+/// Reports address of the peer that we are connected to.
+pub trait ConnectionInfo {
+ /// Return the address of the peer that we are connected to.
+ fn peer_address_info(&self) -> PeerAddressInfo;
+}
+
+/// A stream that can be split into a sending and receiving part.
+pub trait Splittable: AsyncWrite + AsyncRead + ConnectionInfo + Unpin + Send {
+ type Sender: AsyncWrite + ConnectionInfo + Unpin + Send;
+ type Receiver: AsyncRead + ConnectionInfo + Unpin + Send;
+
+ /// Split into the sending and receiving part.
+ fn split(self) -> (Self::Sender, Self::Receiver);
+}
+
+/// Can use addresses to connect to a peer.
+#[async_trait::async_trait]
+pub trait Dialer: Clone + Send + 'static {
+ type Connection: Splittable;
+ type Error: Display + Send;
+
+ /// Attempt to connect to a peer using the provided addressing information.
+ async fn connect(&mut self, address: A) -> Result;
+}
+
+use log::info;
+use tokio::net::{
+ tcp::{OwnedReadHalf, OwnedWriteHalf},
+ TcpListener, TcpStream,
+};
+
+/// Accepts new connections. Usually will be created listening on a specific interface and this is
+/// just the result.
+#[async_trait::async_trait]
+pub trait Listener {
+ type Connection: Splittable + 'static;
+ type Error: Display;
+
+ /// Returns the next incoming connection.
+ async fn accept(&mut self) -> Result;
+}
+
+impl ConnectionInfo for TcpStream {
+ fn peer_address_info(&self) -> String {
+ match self.peer_addr() {
+ Ok(addr) => addr.to_string(),
+ Err(e) => format!("unknown address: {e}"),
+ }
+ }
+}
+
+impl ConnectionInfo for OwnedWriteHalf {
+ fn peer_address_info(&self) -> String {
+ match self.peer_addr() {
+ Ok(addr) => addr.to_string(),
+ Err(e) => e.to_string(),
+ }
+ }
+}
+
+impl ConnectionInfo for OwnedReadHalf {
+ fn peer_address_info(&self) -> String {
+ match self.peer_addr() {
+ Ok(addr) => addr.to_string(),
+ Err(e) => e.to_string(),
+ }
+ }
+}
+
+impl Splittable for TcpStream {
+ type Sender = OwnedWriteHalf;
+ type Receiver = OwnedReadHalf;
+
+ fn split(self) -> (Self::Sender, Self::Receiver) {
+ let (receiver, sender) = self.into_split();
+ (sender, receiver)
+ }
+}
+
+#[async_trait::async_trait]
+impl Listener for TcpListener {
+ type Connection = TcpStream;
+ type Error = std::io::Error;
+
+ async fn accept(&mut self) -> Result {
+ let stream = TcpListener::accept(self).await.map(|(stream, _)| stream)?;
+ if stream.set_linger(None).is_err() {
+ info!(target: LOG_TARGET, "stream.set_linger(None) failed.");
+ };
+ Ok(stream)
+ }
+}
+
+pub struct Splitted(I, O);
+
+impl AsyncRead for Splitted {
+ fn poll_read(
+ mut self: Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ buf: &mut tokio::io::ReadBuf<'_>,
+ ) -> std::task::Poll> {
+ Pin::new(&mut self.0).poll_read(cx, buf)
+ }
+}
+
+impl AsyncWrite for Splitted {
+ fn poll_write(
+ mut self: Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ buf: &[u8],
+ ) -> std::task::Poll> {
+ Pin::new(&mut self.1).poll_write(cx, buf)
+ }
+
+ fn poll_flush(
+ mut self: Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ ) -> std::task::Poll> {
+ Pin::new(&mut self.1).poll_flush(cx)
+ }
+
+ fn poll_shutdown(
+ mut self: Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ ) -> std::task::Poll> {
+ Pin::new(&mut self.1).poll_shutdown(cx)
+ }
+}
+
+impl ConnectionInfo for Splitted {
+ fn peer_address_info(&self) -> PeerAddressInfo {
+ self.1.peer_address_info()
+ }
+}
+
+impl<
+ I: AsyncRead + ConnectionInfo + Unpin + Send,
+ O: AsyncWrite + ConnectionInfo + Unpin + Send,
+ > Splittable for Splitted
+{
+ type Sender = O;
+ type Receiver = I;
+
+ fn split(self) -> (Self::Sender, Self::Receiver) {
+ (self.1, self.0)
+ }
+}
diff --git a/blockchain/clique/src/manager/direction.rs b/blockchain/clique/src/manager/direction.rs
new file mode 100644
index 000000000..045297676
--- /dev/null
+++ b/blockchain/clique/src/manager/direction.rs
@@ -0,0 +1,239 @@
+// بِسْمِ اللَّهِ الرَّحْمَنِ الرَّحِيم
+
+// This file is part of Setheum.
+
+// Copyright (C) 2019-Present Setheum Labs.
+// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
+
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+use std::{
+ collections::{HashMap, HashSet},
+ ops::BitXor,
+};
+
+use crate::{
+ metrics::{Event, Metrics},
+ Data, PublicKey,
+};
+
+/// Data about peers we know and whether we should connect to them or they to us. For the former
+/// case also keeps the peers' addresses.
+pub struct DirectedPeers {
+ own_id: PK,
+ outgoing: HashMap,
+ incoming: HashSet,
+ metrics: Metrics,
+}
+
+/// Whether we should call the remote or the other way around. We xor the peer ids and based on the
+/// parity of the sum of bits of the result decide whether the caller should be the smaller or
+// greater lexicographically. They are never equal, because cryptography.
+fn should_we_call(own_id: &[u8], remote_id: &[u8]) -> bool {
+ let xor_sum_parity = (own_id.iter().fold(0u8, BitXor::bitxor)
+ ^ remote_id.iter().fold(0u8, BitXor::bitxor))
+ .count_ones()
+ % 2;
+ match xor_sum_parity == 0 {
+ true => own_id < remote_id,
+ false => own_id > remote_id,
+ }
+}
+
+impl DirectedPeers {
+ /// Create a new set of peers directed using our own peer id.
+ pub fn new(own_id: PK, metrics: Metrics) -> Self {
+ DirectedPeers {
+ own_id,
+ outgoing: HashMap::new(),
+ incoming: HashSet::new(),
+ metrics,
+ }
+ }
+
+ /// Add a peer to the list of peers we want to stay connected to, or
+ /// update the address if the peer was already added.
+ /// Returns whether we should start attempts at connecting with the peer, which is the case
+ /// exactly when the peer is one with which we should attempt connections AND it was added for
+ /// the first time.
+ pub fn add_peer(&mut self, peer_id: PK, address: A) -> bool {
+ use Event::*;
+ match should_we_call(self.own_id.as_ref(), peer_id.as_ref()) {
+ true => match self.outgoing.insert(peer_id, address).is_none() {
+ true => {
+ self.metrics.report_event(NewOutgoing);
+ true
+ }
+ false => false,
+ },
+ false => {
+ // We discard the address here, as we will never want to call this peer anyway,
+ // so we don't need it.
+ if self.incoming.insert(peer_id) {
+ self.metrics.report_event(NewIncoming);
+ }
+ false
+ }
+ }
+ }
+
+ /// Return the address of the given peer, or None if we shouldn't attempt connecting with the peer.
+ pub fn peer_address(&self, peer_id: &PK) -> Option {
+ self.outgoing.get(peer_id).cloned()
+ }
+
+ /// Whether we should be maintaining a connection with this peer.
+ pub fn interested(&self, peer_id: &PK) -> bool {
+ self.incoming.contains(peer_id) || self.outgoing.contains_key(peer_id)
+ }
+
+ /// Iterator over the peers we want connections from.
+ pub fn incoming_peers(&self) -> impl Iterator- {
+ self.incoming.iter()
+ }
+
+ /// Iterator over the peers we want to connect to.
+ pub fn outgoing_peers(&self) -> impl Iterator
- {
+ self.outgoing.keys()
+ }
+
+ /// Remove a peer from the list of peers that we want to stay connected with, whether the
+ /// connection was supposed to be incoming or outgoing.
+ pub fn remove_peer(&mut self, peer_id: &PK) {
+ use Event::*;
+ if self.incoming.remove(peer_id) {
+ self.metrics.report_event(DelIncoming);
+ }
+ if self.outgoing.remove(peer_id).is_some() {
+ self.metrics.report_event(DelOutgoing);
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::DirectedPeers;
+ use crate::{
+ metrics::Metrics,
+ mock::{key, MockPublicKey},
+ };
+
+ type Address = String;
+
+ fn container_with_id() -> (DirectedPeers, MockPublicKey) {
+ let (id, _) = key();
+ let container = DirectedPeers::new(id.clone(), Metrics::noop());
+ (container, id)
+ }
+
+ fn some_address() -> Address {
+ String::from("43.43.43.43:43000")
+ }
+
+ #[test]
+ fn exactly_one_direction_attempts_connections() {
+ let (mut container0, id0) = container_with_id();
+ let (mut container1, id1) = container_with_id();
+ let address = some_address();
+ assert!(container0.add_peer(id1, address.clone()) != container1.add_peer(id0, address));
+ }
+
+ fn container_with_added_connecting_peer(
+ ) -> (DirectedPeers, MockPublicKey) {
+ let (mut container0, id0) = container_with_id();
+ let (mut container1, id1) = container_with_id();
+ let address = some_address();
+ match container0.add_peer(id1.clone(), address.clone()) {
+ true => (container0, id1),
+ false => {
+ container1.add_peer(id0.clone(), address);
+ (container1, id0)
+ }
+ }
+ }
+
+ fn container_with_added_nonconnecting_peer(
+ ) -> (DirectedPeers, MockPublicKey) {
+ let (mut container0, id0) = container_with_id();
+ let (mut container1, id1) = container_with_id();
+ let address = some_address();
+ match container0.add_peer(id1.clone(), address.clone()) {
+ false => (container0, id1),
+ true => {
+ container1.add_peer(id0.clone(), address);
+ (container1, id0)
+ }
+ }
+ }
+
+ #[test]
+ fn no_connecting_on_subsequent_add() {
+ let (mut container0, id1) = container_with_added_connecting_peer();
+ let address = some_address();
+ assert!(!container0.add_peer(id1, address));
+ }
+
+ #[test]
+ fn peer_address_when_connecting() {
+ let (container0, id1) = container_with_added_connecting_peer();
+ assert!(container0.peer_address(&id1).is_some());
+ }
+
+ #[test]
+ fn no_peer_address_when_nonconnecting() {
+ let (container0, id1) = container_with_added_nonconnecting_peer();
+ assert!(container0.peer_address(&id1).is_none());
+ }
+
+ #[test]
+ fn interested_in_connecting() {
+ let (container0, id1) = container_with_added_connecting_peer();
+ assert!(container0.interested(&id1));
+ }
+
+ #[test]
+ fn interested_in_nonconnecting() {
+ let (container0, id1) = container_with_added_nonconnecting_peer();
+ assert!(container0.interested(&id1));
+ }
+
+ #[test]
+ fn uninterested_in_unknown() {
+ let (container0, _) = container_with_id();
+ let (_, id1) = container_with_id();
+ assert!(!container0.interested(&id1));
+ }
+
+ #[test]
+ fn connecting_are_outgoing() {
+ let (container0, id1) = container_with_added_connecting_peer();
+ assert_eq!(container0.outgoing_peers().collect::>(), vec![&id1]);
+ assert_eq!(container0.incoming_peers().next(), None);
+ }
+
+ #[test]
+ fn nonconnecting_are_incoming() {
+ let (container0, id1) = container_with_added_nonconnecting_peer();
+ assert_eq!(container0.incoming_peers().collect::>(), vec![&id1]);
+ assert_eq!(container0.outgoing_peers().next(), None);
+ }
+
+ #[test]
+ fn uninterested_in_removed() {
+ let (mut container0, id1) = container_with_added_connecting_peer();
+ assert!(container0.interested(&id1));
+ container0.remove_peer(&id1);
+ assert!(!container0.interested(&id1));
+ }
+}
diff --git a/blockchain/clique/src/manager/mod.rs b/blockchain/clique/src/manager/mod.rs
new file mode 100644
index 000000000..a9bf02720
--- /dev/null
+++ b/blockchain/clique/src/manager/mod.rs
@@ -0,0 +1,357 @@
+// بِسْمِ اللَّهِ الرَّحْمَنِ الرَّحِيم
+
+// This file is part of Setheum.
+
+// Copyright (C) 2019-Present Setheum Labs.
+// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
+
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+use std::{
+ collections::{HashMap, HashSet},
+ fmt::{Display, Error as FmtError, Formatter},
+};
+
+use futures::channel::mpsc;
+
+use crate::{metrics::Metrics, Data, PeerId, PublicKey};
+
+mod direction;
+use direction::DirectedPeers;
+
+/// Error during sending data through the Manager.
+/// Returned by `Manager::send_to`.
+#[derive(Debug, PartialEq, Eq)]
+pub enum SendError {
+    /// Outgoing network connection closed
+    ConnectionClosed,
+    /// Peer not added to the manager
+    PeerNotFound,
+}
+
+impl Display for SendError {
+    /// Human-readable form of the error, used in logs.
+    fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> {
+        let description = match self {
+            SendError::ConnectionClosed => "worker dead",
+            SendError::PeerNotFound => "peer not found",
+        };
+        write!(f, "{}", description)
+    }
+}
+
+/// Possible results of adding connections.
+/// Returned by `Manager::add_connection`.
+#[derive(Debug, PartialEq, Eq)]
+pub enum AddResult {
+    /// We do not want to maintain a connection with this peer.
+    Uninterested,
+    /// Connection added.
+    Added,
+    /// Old connection replaced with new one.
+    Replaced,
+}
+
+/// Snapshot of the manager's connections, split by direction and by whether
+/// the wanted connection is actually established ("missing" otherwise).
+// Generic parameter restored: the sanitized diff had stripped `<PK>` from every
+// `HashSet` here and the bound from the struct itself.
+pub struct ManagerStatus<PK: PeerId> {
+    outgoing_peers: HashSet<PK>,
+    missing_outgoing: HashSet<PK>,
+    incoming_peers: HashSet<PK>,
+    missing_incoming: HashSet<PK>,
+}
+
+impl<PK: PeerId> ManagerStatus<PK> {
+    /// Build a status snapshot: every wanted peer is classified as present or
+    /// missing depending on whether an open connection to it exists.
+    fn new<A: Data, D: Data>(manager: &Manager<PK, A, D>) -> Self {
+        let mut incoming_peers = HashSet::new();
+        let mut missing_incoming = HashSet::new();
+        let mut outgoing_peers = HashSet::new();
+        let mut missing_outgoing = HashSet::new();
+
+        for peer in manager.wanted.incoming_peers() {
+            match manager.active_connection(peer) {
+                true => incoming_peers.insert(peer.clone()),
+                false => missing_incoming.insert(peer.clone()),
+            };
+        }
+        for peer in manager.wanted.outgoing_peers() {
+            match manager.active_connection(peer) {
+                true => outgoing_peers.insert(peer.clone()),
+                false => missing_outgoing.insert(peer.clone()),
+            };
+        }
+        ManagerStatus {
+            incoming_peers,
+            missing_incoming,
+            outgoing_peers,
+            missing_outgoing,
+        }
+    }
+
+    // Total number of incoming connections we expect, established or not.
+    fn wanted_incoming(&self) -> usize {
+        self.incoming_peers.len() + self.missing_incoming.len()
+    }
+
+    // Total number of outgoing connections we attempt, established or not.
+    fn wanted_outgoing(&self) -> usize {
+        self.outgoing_peers.len() + self.missing_outgoing.len()
+    }
+}
+
+/// Render a set of peer ids as a comma-separated list of their short forms.
+fn pretty_peer_id_set<PK: PeerId>(set: &HashSet<PK>) -> String {
+    set.iter()
+        .map(|peer_id| peer_id.to_short_string())
+        .collect::<Vec<_>>()
+        .join(", ")
+}
+
+impl<PK: PeerId> Display for ManagerStatus<PK> {
+    fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> {
+        let wanted_incoming = self.wanted_incoming();
+        let wanted_outgoing = self.wanted_outgoing();
+        if wanted_incoming + wanted_outgoing == 0 {
+            return write!(f, "not maintaining any connections; ");
+        }
+
+        match wanted_incoming {
+            0 => write!(f, "not expecting any incoming connections; ")?,
+            _ => {
+                write!(f, "expecting {wanted_incoming:?} incoming connections; ")?;
+                match self.incoming_peers.is_empty() {
+                    // We warn about the lack of incoming connections, because this is relatively
+                    // likely to be a common misconfiguration; much less the case for outgoing.
+                    true => write!(f, "WARNING! No incoming peers even though we expected them, maybe connecting to us is impossible; ")?,
+                    false => write!(
+                        f,
+                        "have - {:?} [{}]; ",
+                        self.incoming_peers.len(),
+                        pretty_peer_id_set(&self.incoming_peers),
+                    )?,
+                }
+                if !self.missing_incoming.is_empty() {
+                    write!(
+                        f,
+                        "missing - {:?} [{}]; ",
+                        self.missing_incoming.len(),
+                        pretty_peer_id_set(&self.missing_incoming),
+                    )?;
+                }
+            }
+        }
+
+        match wanted_outgoing {
+            0 => write!(f, "not attempting any outgoing connections; ")?,
+            _ => {
+                write!(f, "attempting {wanted_outgoing:?} outgoing connections; ")?;
+                if !self.outgoing_peers.is_empty() {
+                    write!(
+                        f,
+                        "have - {:?} [{}]; ",
+                        self.outgoing_peers.len(),
+                        pretty_peer_id_set(&self.outgoing_peers),
+                    )?;
+                }
+                if !self.missing_outgoing.is_empty() {
+                    write!(
+                        f,
+                        "missing - {:?} [{}]; ",
+                        self.missing_outgoing.len(),
+                        pretty_peer_id_set(&self.missing_outgoing),
+                    )?;
+                }
+            }
+        }
+
+        Ok(())
+    }
+}
+
+/// Network component responsible for holding the list of peers that we
+/// want to connect to or let them connect to us, and managing the established
+/// connections.
+pub struct Manager<PK: PeerId, A: Data, D: Data> {
+    // Which peers we want to be connected with, and which way.
+    wanted: DirectedPeers<PK, A>,
+    // The peers we are connected with. We ensure that this is always a subset of what we want.
+    have: HashMap<PK, mpsc::UnboundedSender<D>>,
+}
+
+impl<PK: PeerId, A: Data, D: Data> Manager<PK, A, D> {
+    /// Create a new Manager with empty list of peers.
+    pub fn new(own_id: PK, metrics: Metrics) -> Self {
+        Manager {
+            wanted: DirectedPeers::new(own_id, metrics),
+            have: HashMap::new(),
+        }
+    }
+
+    // A connection counts as active only if we hold a sender to it that is not closed.
+    fn active_connection(&self, peer_id: &PK) -> bool {
+        self.have
+            .get(peer_id)
+            .map(|sender| !sender.is_closed())
+            .unwrap_or(false)
+    }
+
+    /// Add a peer to the list of peers we want to stay connected to, or
+    /// update the address if the peer was already added.
+    /// Returns whether we should start attempts at connecting with the peer, which depends on the
+    /// coordinated pseudorandom decision on the direction of the connection and whether this was
+    /// added for the first time.
+    pub fn add_peer(&mut self, peer_id: PK, address: A) -> bool {
+        self.wanted.add_peer(peer_id, address)
+    }
+
+    /// Return the address of the given peer, or None if we shouldn't attempt connecting with the peer.
+    pub fn peer_address(&self, peer_id: &PK) -> Option<A> {
+        self.wanted.peer_address(peer_id)
+    }
+
+    /// Add an established connection with a known peer, but only if the peer is among the peers we want to be connected to.
+    pub fn add_connection(
+        &mut self,
+        peer_id: PK,
+        data_for_network: mpsc::UnboundedSender<D>,
+    ) -> AddResult {
+        use AddResult::*;
+        if !self.wanted.interested(&peer_id) {
+            return Uninterested;
+        }
+        match self.have.insert(peer_id, data_for_network) {
+            Some(_) => Replaced,
+            None => Added,
+        }
+    }
+
+    /// Remove a peer from the list of peers that we want to stay connected with.
+    /// Close any incoming and outgoing connections that were established.
+    pub fn remove_peer(&mut self, peer_id: &PK) {
+        self.wanted.remove_peer(peer_id);
+        self.have.remove(peer_id);
+    }
+
+    /// Send data to a peer.
+    /// Returns error if there is no outgoing connection to the peer,
+    /// or if the connection is dead.
+    pub fn send_to(&mut self, peer_id: &PK, data: D) -> Result<(), SendError> {
+        self.have
+            .get(peer_id)
+            .ok_or(SendError::PeerNotFound)?
+            .unbounded_send(data)
+            .map_err(|_| SendError::ConnectionClosed)
+    }
+
+    /// A status of the manager, to be displayed somewhere.
+    pub fn status_report(&self) -> ManagerStatus<PK> {
+        ManagerStatus::new(self)
+    }
+
+    /// Whether we want to maintain a connection with this peer.
+    pub fn is_authorized(&self, public_key: &PK) -> bool {
+        self.wanted.interested(public_key)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use futures::{channel::mpsc, StreamExt};
+
+    use super::{AddResult::*, Manager, SendError};
+    use crate::{
+        metrics::Metrics,
+        mock::{key, MockPublicKey},
+    };
+
+    type Data = String;
+    type Address = String;
+
+    #[test]
+    fn add_remove() {
+        let (own_id, _) = key();
+        // Turbofish restored; the sanitized diff had `Manager::::new`.
+        let mut manager = Manager::<MockPublicKey, Address, Data>::new(own_id, Metrics::noop());
+        let (peer_id, _) = key();
+        let (peer_id_b, _) = key();
+        let address = String::from("43.43.43.43:43000");
+        // add new peer - might return either true or false, depending on the ids
+        let attempting_connections = manager.add_peer(peer_id.clone(), address.clone());
+        // add known peer - always returns false
+        assert!(!manager.add_peer(peer_id.clone(), address.clone()));
+        // get address
+        match attempting_connections {
+            true => assert_eq!(manager.peer_address(&peer_id), Some(address)),
+            false => assert_eq!(manager.peer_address(&peer_id), None),
+        }
+        // try to get address of an unknown peer
+        assert_eq!(manager.peer_address(&peer_id_b), None);
+        // remove peer
+        manager.remove_peer(&peer_id);
+        // try to get address of removed peer
+        assert_eq!(manager.peer_address(&peer_id), None);
+        // remove again
+        manager.remove_peer(&peer_id);
+        // remove unknown peer
+        manager.remove_peer(&peer_id_b);
+    }
+
+    #[tokio::test]
+    async fn send_receive() {
+        let (mut connecting_id, _) = key();
+        let mut connecting_manager =
+            Manager::<MockPublicKey, Address, Data>::new(connecting_id.clone(), Metrics::noop());
+        let (mut listening_id, _) = key();
+        let mut listening_manager =
+            Manager::<MockPublicKey, Address, Data>::new(listening_id.clone(), Metrics::noop());
+        let data = String::from("DATA");
+        let address = String::from("43.43.43.43:43000");
+        let (tx, _rx) = mpsc::unbounded();
+        // try add unknown peer
+        assert_eq!(
+            connecting_manager.add_connection(listening_id.clone(), tx),
+            Uninterested
+        );
+        // sending should fail
+        assert_eq!(
+            connecting_manager.send_to(&listening_id, data.clone()),
+            Err(SendError::PeerNotFound)
+        );
+        // add peer, this time for real
+        if connecting_manager.add_peer(listening_id.clone(), address.clone()) {
+            assert!(!listening_manager.add_peer(connecting_id.clone(), address.clone()))
+        } else {
+            // We need to switch the names around, because the connection was randomly the
+            // other way around.
+            std::mem::swap(&mut connecting_id, &mut listening_id);
+            std::mem::swap(&mut connecting_manager, &mut listening_manager);
+            assert!(connecting_manager.add_peer(listening_id.clone(), address.clone()));
+        }
+        // add outgoing to connecting
+        let (tx, mut rx) = mpsc::unbounded();
+        assert_eq!(
+            connecting_manager.add_connection(listening_id.clone(), tx),
+            Added
+        );
+        // send and receive connecting
+        assert!(connecting_manager
+            .send_to(&listening_id, data.clone())
+            .is_ok());
+        assert_eq!(data, rx.next().await.expect("should receive"));
+        // add incoming to listening
+        let (tx, mut rx) = mpsc::unbounded();
+        assert_eq!(
+            listening_manager.add_connection(connecting_id.clone(), tx),
+            Added
+        );
+        // send and receive listening
+        assert!(listening_manager
+            .send_to(&connecting_id, data.clone())
+            .is_ok());
+        assert_eq!(data, rx.next().await.expect("should receive"));
+        // remove peer
+        listening_manager.remove_peer(&connecting_id);
+        // receiving should fail
+        assert!(rx.next().await.is_none());
+    }
+}
diff --git a/blockchain/clique/src/metrics.rs b/blockchain/clique/src/metrics.rs
new file mode 100644
index 000000000..0e72d8f49
--- /dev/null
+++ b/blockchain/clique/src/metrics.rs
@@ -0,0 +1,119 @@
+// بِسْمِ اللَّهِ الرَّحْمَنِ الرَّحِيم
+
+// This file is part of Setheum.
+
+// Copyright (C) 2019-Present Setheum Labs.
+// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
+
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+use substrate_prometheus_endpoint::{register, Gauge, PrometheusError, Registry, U64};
+
+/// Connection-count metrics; either backed by Prometheus gauges or a no-op.
+#[derive(Clone)]
+pub enum Metrics {
+    Prometheus {
+        // Gauge type parameter restored; the sanitized diff stripped `<U64>`
+        // (the `U64` marker is imported from substrate_prometheus_endpoint above).
+        incoming_connections: Gauge<U64>,
+        missing_incoming_connections: Gauge<U64>,
+        outgoing_connections: Gauge<U64>,
+        missing_outgoing_connections: Gauge<U64>,
+    },
+    Noop,
+}
+
+/// Connection life-cycle events; `Metrics::report_event` maps each variant to
+/// gauge updates as noted below.
+pub enum Event {
+    // Increments the missing-outgoing gauge.
+    NewOutgoing,
+    // Increments the missing-incoming gauge.
+    NewIncoming,
+    // Decrements the missing-outgoing gauge.
+    DelOutgoing,
+    // Decrements the missing-incoming gauge.
+    DelIncoming,
+    // Moves one outgoing connection from missing to present.
+    ConnectedOutgoing,
+    // Moves one incoming connection from missing to present.
+    ConnectedIncoming,
+    // Moves one outgoing connection from present back to missing.
+    DisconnectedOutgoing,
+    // Moves one incoming connection from present back to missing.
+    DisconnectedIncoming,
+}
+
+impl Metrics {
+    /// Create metrics registered in the given registry, or a no-op instance
+    /// when no registry is provided.
+    pub fn new(registry: Option<Registry>) -> Result<Self, PrometheusError> {
+        match registry {
+            Some(registry) => Ok(Metrics::Prometheus {
+                incoming_connections: register(
+                    Gauge::new(
+                        "clique_network_incoming_connections",
+                        "present incoming connections",
+                    )?,
+                    &registry,
+                )?,
+                missing_incoming_connections: register(
+                    Gauge::new(
+                        "clique_network_missing_incoming_connections",
+                        "difference between expected and present incoming connections",
+                    )?,
+                    &registry,
+                )?,
+                outgoing_connections: register(
+                    Gauge::new(
+                        "clique_network_outgoing_connections",
+                        "present outgoing connections",
+                    )?,
+                    &registry,
+                )?,
+                missing_outgoing_connections: register(
+                    Gauge::new(
+                        "clique_network_missing_outgoing_connections",
+                        "difference between expected and present outgoing connections",
+                    )?,
+                    &registry,
+                )?,
+            }),
+            None => Ok(Metrics::Noop),
+        }
+    }
+
+    /// Metrics that discard every event.
+    pub fn noop() -> Self {
+        Metrics::Noop
+    }
+
+    /// Update the gauges according to the event; no-op for `Metrics::Noop`.
+    pub fn report_event(&self, event: Event) {
+        use Event::*;
+        if let Metrics::Prometheus {
+            incoming_connections,
+            outgoing_connections,
+            missing_incoming_connections,
+            missing_outgoing_connections,
+        } = self
+        {
+            match event {
+                NewIncoming => missing_incoming_connections.inc(),
+                NewOutgoing => missing_outgoing_connections.inc(),
+                DelIncoming => missing_incoming_connections.dec(),
+                DelOutgoing => missing_outgoing_connections.dec(),
+                ConnectedIncoming => {
+                    incoming_connections.inc();
+                    missing_incoming_connections.dec();
+                }
+                ConnectedOutgoing => {
+                    outgoing_connections.inc();
+                    missing_outgoing_connections.dec();
+                }
+                DisconnectedIncoming => {
+                    incoming_connections.dec();
+                    missing_incoming_connections.inc();
+                }
+                DisconnectedOutgoing => {
+                    outgoing_connections.dec();
+                    missing_outgoing_connections.inc();
+                }
+            }
+        }
+    }
+}
diff --git a/blockchain/clique/src/mock.rs b/blockchain/clique/src/mock.rs
new file mode 100644
index 000000000..a6d874c46
--- /dev/null
+++ b/blockchain/clique/src/mock.rs
@@ -0,0 +1,693 @@
+// بِسْمِ اللَّهِ الرَّحْمَنِ الرَّحِيم
+
+// This file is part of Setheum.
+
+// Copyright (C) 2019-Present Setheum Labs.
+// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
+
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+use std::{
+ collections::HashMap,
+ fmt::{Display, Error as FmtError, Formatter},
+ io::Result as IoResult,
+ pin::Pin,
+ sync::Arc,
+ task::{Context, Poll},
+ time::Duration,
+};
+
+use futures::{
+ channel::{mpsc, mpsc::UnboundedReceiver, oneshot},
+ Future, StreamExt,
+};
+use log::info;
+use parity_scale_codec::{Decode, Encode, Output};
+use rand::Rng;
+use tokio::{
+ io::{duplex, AsyncRead, AsyncWrite, DuplexStream, ReadBuf},
+ time::timeout,
+};
+
+use crate::{
+ protocols::{ProtocolError, ResultForService},
+ AddressingInformation, ConnectionInfo, Data, Dialer, Listener, Network, NetworkIdentity,
+ PeerAddressInfo, PeerId, PublicKey, SecretKey, Splittable, LOG_TARGET,
+};
+
+/// Test payload: a value, a size-padding filler, and a flag controlling
+/// whether `Decode` succeeds (see the `Decode` impl below).
+#[derive(Hash, Debug, Clone, PartialEq, Eq)]
+pub struct MockData {
+    data: u32,
+    filler: Vec<u8>,
+    decodes: bool,
+}
+
+impl MockData {
+    /// Create data that decodes successfully; `filler_size` pads the encoding.
+    pub fn new(data: u32, filler_size: usize) -> MockData {
+        MockData {
+            data,
+            filler: vec![0; filler_size],
+            decodes: true,
+        }
+    }
+
+    /// Create data whose `Decode` impl will return an error (simulated failure).
+    pub fn new_undecodable(data: u32, filler_size: usize) -> MockData {
+        MockData {
+            data,
+            filler: vec![0; filler_size],
+            decodes: false,
+        }
+    }
+
+    /// The payload value.
+    pub fn data(&self) -> u32 {
+        self.data
+    }
+}
+
+impl Encode for MockData {
+    fn size_hint(&self) -> usize {
+        self.data.size_hint() + self.filler.size_hint() + self.decodes.size_hint()
+    }
+
+    // Generic parameter restored: SCALE's `encode_to` takes any `Output` sink.
+    fn encode_to<T: Output + ?Sized>(&self, dest: &mut T) {
+        // currently this is exactly the default behaviour, but we still
+        // need it here to make sure that decode works in the future
+        self.data.encode_to(dest);
+        self.filler.encode_to(dest);
+        self.decodes.encode_to(dest);
+    }
+}
+
+impl Decode for MockData {
+    // Signature restored: SCALE's `decode` reads from any `Input` and returns
+    // `Result<Self, parity_scale_codec::Error>` (the `Input` trait is not in the
+    // use-list above, hence the fully qualified path).
+    fn decode<I: parity_scale_codec::Input>(
+        value: &mut I,
+    ) -> Result<Self, parity_scale_codec::Error> {
+        let data = u32::decode(value)?;
+        let filler = Vec::<u8>::decode(value)?;
+        let decodes = bool::decode(value)?;
+        if !decodes {
+            return Err("Simulated decode failure.".into());
+        }
+        Ok(Self {
+            data,
+            filler,
+            decodes,
+        })
+    }
+}
+
+/// An unbounded channel whose receiving end is shared behind an async mutex,
+/// so clones of the `Channel` can all send and take turns receiving.
+#[derive(Clone)]
+pub struct Channel<T>(
+    pub mpsc::UnboundedSender<T>,
+    pub Arc<tokio::sync::Mutex<UnboundedReceiver<T>>>,
+);
+
+const TIMEOUT_FAIL: Duration = Duration::from_secs(10);
+
+impl<T> Channel<T> {
+    pub fn new() -> Self {
+        let (tx, rx) = mpsc::unbounded();
+        Channel(tx, Arc::new(tokio::sync::Mutex::new(rx)))
+    }
+
+    /// Send a message; panics if the channel is closed (test helper).
+    pub fn send(&self, msg: T) {
+        self.0.unbounded_send(msg).unwrap();
+    }
+
+    /// Next message, or None if the channel closes or `TIMEOUT_FAIL` elapses.
+    pub async fn next(&mut self) -> Option<T> {
+        timeout(TIMEOUT_FAIL, self.1.lock().await.next())
+            .await
+            .ok()
+            .flatten()
+    }
+
+    /// Take up to `n` messages, or an empty vector on timeout.
+    pub async fn take(&mut self, n: usize) -> Vec<T> {
+        timeout(
+            TIMEOUT_FAIL,
+            self.1.lock().await.by_ref().take(n).collect::<Vec<_>>(),
+        )
+        .await
+        .unwrap_or_default()
+    }
+
+    /// Non-blocking receive: a pending message if one is ready, otherwise None.
+    pub async fn try_next(&self) -> Option<T> {
+        self.1.lock().await.try_next().unwrap_or(None)
+    }
+
+    /// Close the channel and return a message that was still queued, if any.
+    pub async fn close(self) -> Option<T> {
+        self.0.close_channel();
+        self.try_next().await
+    }
+}
+
+impl<T> Default for Channel<T> {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+/// A mock secret key that is able to sign messages.
+#[derive(Debug, PartialEq, Eq, Clone, Hash)]
+pub struct MockSecretKey([u8; 4]);
+
+/// A mock public key for verifying signatures.
+#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Hash, Encode, Decode)]
+pub struct MockPublicKey([u8; 4]);
+
+impl Display for MockPublicKey {
+ fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> {
+ write!(f, "PublicKey({:?})", self.0)
+ }
+}
+
+impl AsRef<[u8]> for MockPublicKey {
+ fn as_ref(&self) -> &[u8] {
+ self.0.as_ref()
+ }
+}
+
+/// A mock signature, able to discern whether the correct key has been used to sign a specific
+/// message.
+#[derive(Debug, PartialEq, Eq, Clone, Hash, Encode, Decode)]
+pub struct MockSignature {
+    message: Vec<u8>,
+    key_id: [u8; 4],
+}
+
+impl PublicKey for MockPublicKey {
+ type Signature = MockSignature;
+
+ fn verify(&self, message: &[u8], signature: &Self::Signature) -> bool {
+ (message == signature.message.as_slice()) && (self.0 == signature.key_id)
+ }
+}
+
+impl PeerId for MockPublicKey {}
+
+impl SecretKey for MockSecretKey {
+ type Signature = MockSignature;
+ type PublicKey = MockPublicKey;
+
+ fn sign(&self, message: &[u8]) -> Self::Signature {
+ MockSignature {
+ message: message.to_vec(),
+ key_id: self.0,
+ }
+ }
+
+ fn public_key(&self) -> Self::PublicKey {
+ MockPublicKey(self.0)
+ }
+}
+
+/// Create a random key pair.
+pub fn key() -> (MockPublicKey, MockSecretKey) {
+ let secret_key = MockSecretKey(rand::random());
+ (secret_key.public_key(), secret_key)
+}
+
+/// Create a HashMap with public keys as keys and secret keys as values.
+pub fn random_keys(n_peers: usize) -> HashMap<MockPublicKey, MockSecretKey> {
+    let mut result = HashMap::with_capacity(n_peers);
+    // Loop (rather than a fixed-count iterator) so key collisions cannot
+    // leave us with fewer than n_peers entries.
+    while result.len() < n_peers {
+        let (pk, sk) = key();
+        result.insert(pk, sk);
+    }
+    result
+}
+
+/// A mock that can be split into two streams.
+pub struct MockSplittable {
+ incoming_data: DuplexStream,
+ outgoing_data: DuplexStream,
+}
+
+impl MockSplittable {
+ /// Create a pair of mock splittables connected to each other.
+ pub fn new(max_buf_size: usize) -> (Self, Self) {
+ let (in_a, out_b) = duplex(max_buf_size);
+ let (in_b, out_a) = duplex(max_buf_size);
+ (
+ MockSplittable {
+ incoming_data: in_a,
+ outgoing_data: out_a,
+ },
+ MockSplittable {
+ incoming_data: in_b,
+ outgoing_data: out_b,
+ },
+ )
+ }
+}
+
+impl AsyncRead for MockSplittable {
+    // Delegates to the incoming half; return type restored to tokio's
+    // `Poll<IoResult<()>>`.
+    fn poll_read(
+        self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+        buf: &mut ReadBuf<'_>,
+    ) -> Poll<IoResult<()>> {
+        Pin::new(&mut self.get_mut().incoming_data).poll_read(cx, buf)
+    }
+}
+
+impl AsyncWrite for MockSplittable {
+    // Delegates to the outgoing half; return types restored to tokio's
+    // `Poll<IoResult<usize>>` / `Poll<IoResult<()>>`.
+    fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<IoResult<usize>> {
+        Pin::new(&mut self.get_mut().outgoing_data).poll_write(cx, buf)
+    }
+
+    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
+        Pin::new(&mut self.get_mut().outgoing_data).poll_flush(cx)
+    }
+
+    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
+        Pin::new(&mut self.get_mut().outgoing_data).poll_shutdown(cx)
+    }
+}
+
+impl ConnectionInfo for MockSplittable {
+ fn peer_address_info(&self) -> PeerAddressInfo {
+ String::from("MOCK_ADDRESS")
+ }
+}
+
+impl ConnectionInfo for DuplexStream {
+ fn peer_address_info(&self) -> PeerAddressInfo {
+ String::from("MOCK_ADDRESS")
+ }
+}
+
+impl Splittable for MockSplittable {
+ type Sender = DuplexStream;
+ type Receiver = DuplexStream;
+
+ fn split(self) -> (Self::Sender, Self::Receiver) {
+ (self.outgoing_data, self.incoming_data)
+ }
+}
+
+#[derive(Clone, Debug, PartialEq, Eq, Hash, Encode, Decode)]
+pub struct MockAddressingInformation {
+ peer_id: MockPublicKey,
+ address: String,
+ valid: bool,
+}
+
+impl AddressingInformation for MockAddressingInformation {
+ type PeerId = MockPublicKey;
+
+ fn peer_id(&self) -> Self::PeerId {
+ self.peer_id.clone()
+ }
+
+ fn verify(&self) -> bool {
+ self.valid
+ }
+
+ fn address(&self) -> String {
+ self.address.clone()
+ }
+}
+
+impl NetworkIdentity for MockAddressingInformation {
+ type PeerId = MockPublicKey;
+ type AddressingInformation = MockAddressingInformation;
+
+ fn identity(&self) -> Self::AddressingInformation {
+ self.clone()
+ }
+}
+
+impl From<MockAddressingInformation> for Vec<MockAddressingInformation> {
+    fn from(address: MockAddressingInformation) -> Self {
+        vec![address]
+    }
+}
+
+impl TryFrom<Vec<MockAddressingInformation>> for MockAddressingInformation {
+    type Error = ();
+
+    // Takes the last address in the list; an empty list is an error.
+    fn try_from(mut addresses: Vec<MockAddressingInformation>) -> Result<Self, Self::Error> {
+        match addresses.pop() {
+            Some(address) => Ok(address),
+            None => Err(()),
+        }
+    }
+}
+
+pub fn random_peer_id() -> MockPublicKey {
+ key().0
+}
+
+pub fn random_address_from(address: String, valid: bool) -> MockAddressingInformation {
+ let peer_id = random_peer_id();
+ MockAddressingInformation {
+ peer_id,
+ address,
+ valid,
+ }
+}
+
+pub fn random_address() -> MockAddressingInformation {
+ random_address_from(
+ rand::thread_rng()
+ .sample_iter(&rand::distributions::Alphanumeric)
+ .map(char::from)
+ .take(43)
+ .collect(),
+ true,
+ )
+}
+
+pub fn random_invalid_address() -> MockAddressingInformation {
+ random_address_from(
+ rand::thread_rng()
+ .sample_iter(&rand::distributions::Alphanumeric)
+ .map(char::from)
+ .take(43)
+ .collect(),
+ false,
+ )
+}
+
+/// Mock network recording all calls in inspectable channels.
+#[derive(Clone)]
+pub struct MockNetwork<D: Data> {
+    pub add_connection: Channel<(MockPublicKey, MockAddressingInformation)>,
+    pub remove_connection: Channel<MockPublicKey>,
+    pub send: Channel<(D, MockPublicKey)>,
+    pub next: Channel<D>,
+}
+
+#[async_trait::async_trait]
+// Trait parameters reconstructed from the method signatures below — the
+// `Network` trait is keyed by peer id, addressing information and data type.
+impl<D: Data> Network<MockPublicKey, MockAddressingInformation, D> for MockNetwork<D> {
+    fn add_connection(&mut self, peer: MockPublicKey, address: MockAddressingInformation) {
+        self.add_connection.send((peer, address));
+    }
+
+    fn remove_connection(&mut self, peer: MockPublicKey) {
+        self.remove_connection.send(peer);
+    }
+
+    fn send(&self, data: D, recipient: MockPublicKey) {
+        self.send.send((data, recipient));
+    }
+
+    async fn next(&mut self) -> Option<D> {
+        self.next.next().await
+    }
+}
+
+impl<D: Data> MockNetwork<D> {
+    pub fn new() -> Self {
+        MockNetwork {
+            add_connection: Channel::new(),
+            remove_connection: Channel::new(),
+            send: Channel::new(),
+            next: Channel::new(),
+        }
+    }
+
+    // Consumes the network asserting there are no unreceived messages in the channels.
+    pub async fn close_channels(self) {
+        assert!(self.add_connection.close().await.is_none());
+        assert!(self.remove_connection.close().await.is_none());
+        assert!(self.send.close().await.is_none());
+        assert!(self.next.close().await.is_none());
+    }
+}
+
+impl<D: Data> Default for MockNetwork<D> {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+/// Bidirectional in-memory stream that closes abruptly after a specified
+/// number of poll_write calls.
+#[derive(Debug)]
+pub struct UnreliableDuplexStream {
+    stream: DuplexStream,
+    // Remaining successful poll_write calls; None means never fail.
+    counter: Option<usize>,
+    peer_address: Address,
+}
+
+impl AsyncWrite for UnreliableDuplexStream {
+    // Counts down the failure counter; once it hits zero the underlying stream
+    // is shut down before the write, simulating an abrupt disconnect.
+    fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<IoResult<usize>> {
+        let this = self.get_mut();
+        if let Some(ref mut c) = this.counter {
+            if c == &0 {
+                if Pin::new(&mut this.stream).poll_shutdown(cx).is_pending() {
+                    return Poll::Pending;
+                }
+            } else {
+                *c -= 1;
+            }
+        }
+        Pin::new(&mut this.stream).poll_write(cx, buf)
+    }
+
+    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
+        Pin::new(&mut self.get_mut().stream).poll_flush(cx)
+    }
+
+    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
+        Pin::new(&mut self.get_mut().stream).poll_shutdown(cx)
+    }
+}
+
+impl AsyncRead for UnreliableDuplexStream {
+    fn poll_read(
+        self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+        buf: &mut ReadBuf<'_>,
+    ) -> Poll<IoResult<()>> {
+        Pin::new(&mut self.get_mut().stream).poll_read(cx, buf)
+    }
+}
+
+/// A stream that can be split into two instances of UnreliableDuplexStream.
+#[derive(Debug)]
+pub struct UnreliableSplittable {
+ incoming_data: UnreliableDuplexStream,
+ outgoing_data: UnreliableDuplexStream,
+ peer_address: Address,
+}
+
+impl UnreliableSplittable {
+    /// Create a pair of mock splittables connected to each other.
+    /// `ends_after` limits how many writes succeed on each half (None = unlimited).
+    pub fn new(
+        max_buf_size: usize,
+        ends_after: Option<usize>,
+        l_address: Address,
+        r_address: Address,
+    ) -> (Self, Self) {
+        let (l_in, r_out) = duplex(max_buf_size);
+        let (r_in, l_out) = duplex(max_buf_size);
+        (
+            UnreliableSplittable {
+                incoming_data: UnreliableDuplexStream {
+                    stream: l_in,
+                    counter: ends_after,
+                    peer_address: r_address,
+                },
+                outgoing_data: UnreliableDuplexStream {
+                    stream: l_out,
+                    counter: ends_after,
+                    peer_address: r_address,
+                },
+                peer_address: r_address,
+            },
+            UnreliableSplittable {
+                incoming_data: UnreliableDuplexStream {
+                    stream: r_in,
+                    counter: ends_after,
+                    peer_address: l_address,
+                },
+                outgoing_data: UnreliableDuplexStream {
+                    stream: r_out,
+                    counter: ends_after,
+                    peer_address: l_address,
+                },
+                peer_address: l_address,
+            },
+        )
+    }
+}
+
+impl AsyncRead for UnreliableSplittable {
+    fn poll_read(
+        self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+        buf: &mut ReadBuf<'_>,
+    ) -> Poll<IoResult<()>> {
+        Pin::new(&mut self.get_mut().incoming_data).poll_read(cx, buf)
+    }
+}
+
+impl AsyncWrite for UnreliableSplittable {
+    fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<IoResult<usize>> {
+        Pin::new(&mut self.get_mut().outgoing_data).poll_write(cx, buf)
+    }
+
+    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
+        Pin::new(&mut self.get_mut().outgoing_data).poll_flush(cx)
+    }
+
+    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
+        Pin::new(&mut self.get_mut().outgoing_data).poll_shutdown(cx)
+    }
+}
+
+impl ConnectionInfo for UnreliableSplittable {
+ fn peer_address_info(&self) -> PeerAddressInfo {
+ self.peer_address.to_string()
+ }
+}
+
+impl ConnectionInfo for UnreliableDuplexStream {
+ fn peer_address_info(&self) -> PeerAddressInfo {
+ self.peer_address.to_string()
+ }
+}
+
+impl Splittable for UnreliableSplittable {
+ type Sender = UnreliableDuplexStream;
+ type Receiver = UnreliableDuplexStream;
+
+ fn split(self) -> (Self::Sender, Self::Receiver) {
+ (self.outgoing_data, self.incoming_data)
+ }
+}
+
+type Address = u32;
+// Maps each peer to its mock address; used by `UnreliableConnectionMaker::new`.
+pub type Addresses = HashMap<MockPublicKey, Address>;
+// Maps each peer to its dialing/listening handles, as filled in by
+// `UnreliableConnectionMaker::new`.
+type Callers = HashMap<MockPublicKey, (MockDialer, MockListener)>;
+type Connection = UnreliableSplittable;
+
+#[derive(Clone)]
+pub struct MockDialer {
+    // used for logging
+    own_address: Address,
+    // Requests a connection from the central UnreliableConnectionMaker and
+    // receives the resulting stream back over the oneshot sender.
+    channel_connect: mpsc::UnboundedSender<(Address, Address, oneshot::Sender<Connection>)>,
+}
+
+#[async_trait::async_trait]
+impl Dialer<Address> for MockDialer {
+    type Connection = Connection;
+    type Error = std::io::Error;
+
+    async fn connect(&mut self, address: Address) -> Result<Self::Connection, Self::Error> {
+        let (tx, rx) = oneshot::channel();
+        self.channel_connect
+            .unbounded_send((self.own_address, address, tx))
+            .expect("should send");
+        Ok(rx.await.expect("should receive"))
+    }
+}
+
+pub struct MockListener {
+    channel_accept: mpsc::UnboundedReceiver<Connection>,
+}
+
+#[async_trait::async_trait]
+impl Listener for MockListener {
+    type Connection = Connection;
+    type Error = std::io::Error;
+
+    async fn accept(&mut self) -> Result<Self::Connection, Self::Error> {
+        Ok(self.channel_accept.next().await.expect("should receive"))
+    }
+}
+
+/// Central hub that pairs dial requests with the matching listener,
+/// producing (optionally failure-limited) in-memory connections.
+pub struct UnreliableConnectionMaker {
+    dialers: mpsc::UnboundedReceiver<(Address, Address, oneshot::Sender<Connection>)>,
+    listeners: Vec<mpsc::UnboundedSender<Connection>>,
+}
+
+impl UnreliableConnectionMaker {
+    /// Set up dialer/listener pairs for each id, returning the maker itself,
+    /// the per-peer callers and the peer-to-address map.
+    pub fn new(ids: Vec<MockPublicKey>) -> (Self, Callers, Addresses) {
+        let mut listeners = Vec::with_capacity(ids.len());
+        let mut callers = HashMap::with_capacity(ids.len());
+        let (tx_dialer, dialers) = mpsc::unbounded();
+        // create peer addresses that will be understood by the main loop in method run
+        // each peer gets a one-element vector containing its index, so we'll be able
+        // to retrieve proper communication channels
+        let addr: Addresses = ids
+            .clone()
+            .into_iter()
+            .zip(0..ids.len())
+            .map(|(id, u)| (id, u as u32))
+            .collect();
+        // create callers for every peer, keep channels for communicating with them
+        for id in ids.into_iter() {
+            let (tx_listener, rx_listener) = mpsc::unbounded();
+            let dialer = MockDialer {
+                own_address: *addr.get(&id).expect("should be there"),
+                channel_connect: tx_dialer.clone(),
+            };
+            let listener = MockListener {
+                channel_accept: rx_listener,
+            };
+            listeners.push(tx_listener);
+            callers.insert(id, (dialer, listener));
+        }
+        (
+            UnreliableConnectionMaker { dialers, listeners },
+            callers,
+            addr,
+        )
+    }
+
+    /// Serve dial requests forever, handing one stream end to the dialer and
+    /// the other to the addressed listener.
+    pub async fn run(&mut self, connections_end_after: Option<usize>) {
+        loop {
+            info!(
+                target: LOG_TARGET,
+                "UnreliableConnectionMaker: waiting for new request..."
+            );
+            let (dialer_address, listener_address, c) =
+                self.dialers.next().await.expect("should receive");
+            info!(
+                target: LOG_TARGET,
+                "UnreliableConnectionMaker: received request"
+            );
+            let (dialer_stream, listener_stream) = Connection::new(
+                4096,
+                connections_end_after,
+                dialer_address,
+                listener_address,
+            );
+            info!(
+                target: LOG_TARGET,
+                "UnreliableConnectionMaker: sending stream"
+            );
+            c.send(dialer_stream).expect("should send");
+            self.listeners[listener_address as usize]
+                .unbounded_send(listener_stream)
+                .expect("should send");
+        }
+    }
+}
+
+/// Bundle of everything needed to drive a mocked incoming/outgoing protocol pair.
+pub struct MockPrelims<D> {
+    pub id_incoming: MockPublicKey,
+    pub pen_incoming: MockSecretKey,
+    pub id_outgoing: MockPublicKey,
+    pub pen_outgoing: MockSecretKey,
+    // NOTE(review): the generic arguments on the two handle types were garbled
+    // in transit; reconstructed from the protocols module's error type — confirm.
+    pub incoming_handle: Pin<Box<dyn Future<Output = Result<(), ProtocolError<MockPublicKey>>>>>,
+    pub outgoing_handle: Pin<Box<dyn Future<Output = Result<(), ProtocolError<MockPublicKey>>>>>,
+    pub data_from_incoming: UnboundedReceiver<D>,
+    pub data_from_outgoing: Option<UnboundedReceiver<D>>,
+    pub result_from_incoming: UnboundedReceiver<ResultForService<MockPublicKey, D>>,
+    pub result_from_outgoing: UnboundedReceiver<ResultForService<MockPublicKey, D>>,
+    pub authorization_requests: mpsc::UnboundedReceiver<(MockPublicKey, oneshot::Sender<bool>)>,
+}
diff --git a/blockchain/clique/src/outgoing.rs b/blockchain/clique/src/outgoing.rs
new file mode 100644
index 000000000..f69d54d22
--- /dev/null
+++ b/blockchain/clique/src/outgoing.rs
@@ -0,0 +1,135 @@
+// بِسْمِ اللَّهِ الرَّحْمَنِ الرَّحِيم
+
+// This file is part of Setheum.
+
+// Copyright (C) 2019-Present Setheum Labs.
+// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
+
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+use std::fmt::{Debug, Display, Error as FmtError, Formatter};
+
+use futures::channel::mpsc;
+use log::{debug, info};
+use tokio::time::{sleep, timeout, Duration};
+
+use crate::{
+ metrics::Metrics,
+ protocols::{protocol, ProtocolError, ProtocolNegotiationError, ResultForService},
+ ConnectionInfo, Data, Dialer, PeerAddressInfo, PublicKey, SecretKey, LOG_TARGET,
+};
+
+enum OutgoingError<A: Data, ND: Dialer<A>> {
+    Dial(ND::Error),
+    ProtocolNegotiation(PeerAddressInfo, ProtocolNegotiationError),
+    Protocol(PeerAddressInfo, ProtocolError),
+    TimedOut,
+}
+
+impl<A: Data, ND: Dialer<A>> Display for OutgoingError<A, ND> {
+ fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> {
+ use OutgoingError::*;
+ match self {
+ Dial(e) => write!(f, "dial error: {e}"),
+ ProtocolNegotiation(addr, e) => write!(
+ f,
+ "communication with {addr} failed, protocol negotiation error: {e}"
+ ),
+ Protocol(addr, e) => write!(f, "communication with {addr} failed, protocol error: {e}"),
+ TimedOut => write!(f, "dial timeout",),
+ }
+ }
+}
+
+/// Arbitrarily chosen timeout, should be more than enough.
+const DIAL_TIMEOUT: Duration = Duration::from_secs(60);
+
+async fn manage_outgoing<SK: SecretKey, D: Data, A: Data + Debug, ND: Dialer<A>>(
+    secret_key: SK,
+    public_key: SK::PublicKey,
+    mut dialer: ND,
+    address: A,
+    result_for_parent: mpsc::UnboundedSender<ResultForService<SK::PublicKey, D>>,
+    data_for_user: mpsc::UnboundedSender<D>,
+    metrics: Metrics,
+) -> Result<(), OutgoingError<A, ND>> {
+ debug!(target: LOG_TARGET, "Trying to connect to {}.", public_key);
+ let stream = timeout(DIAL_TIMEOUT, dialer.connect(address))
+ .await
+ .map_err(|_| OutgoingError::TimedOut)?
+ .map_err(OutgoingError::Dial)?;
+ let peer_address_info = stream.peer_address_info();
+ debug!(
+ target: LOG_TARGET,
+ "Performing outgoing protocol negotiation."
+ );
+ let (stream, protocol) = protocol(stream)
+ .await
+ .map_err(|e| OutgoingError::ProtocolNegotiation(peer_address_info.clone(), e))?;
+ debug!(target: LOG_TARGET, "Negotiated protocol, running.");
+ protocol
+ .manage_outgoing(
+ stream,
+ secret_key,
+ public_key,
+ result_for_parent,
+ data_for_user,
+ metrics,
+ )
+ .await
+ .map_err(|e| OutgoingError::Protocol(peer_address_info.clone(), e))
+}
+
+const RETRY_DELAY: Duration = Duration::from_secs(10);
+
+/// Establish an outgoing connection to the provided peer using the dialer and then manage it.
+/// While this works it will send any data from the user to the peer. Any failures will be reported
+/// to the parent, so that connections can be reestablished if necessary.
+pub async fn outgoing<SK: SecretKey, D: Data, A: Data + Debug, ND: Dialer<A>>(
+    secret_key: SK,
+    public_key: SK::PublicKey,
+    dialer: ND,
+    address: A,
+    result_for_parent: mpsc::UnboundedSender<ResultForService<SK::PublicKey, D>>,
+    data_for_user: mpsc::UnboundedSender<D>,
+    metrics: Metrics,
+) {
+ if let Err(e) = manage_outgoing(
+ secret_key,
+ public_key.clone(),
+ dialer,
+ address.clone(),
+ result_for_parent.clone(),
+ data_for_user,
+ metrics,
+ )
+ .await
+ {
+ info!(
+ target: LOG_TARGET,
+ "Outgoing connection to {} {:?} failed: {}, will retry after {}s.",
+ public_key,
+ address,
+ e,
+ RETRY_DELAY.as_secs()
+ );
+ sleep(RETRY_DELAY).await;
+ if result_for_parent
+ .unbounded_send((public_key, None))
+ .is_err()
+ {
+ debug!(target: LOG_TARGET, "Could not send the closing message, we've probably been terminated by the parent service.");
+ }
+ }
+}
diff --git a/blockchain/clique/src/protocols/handshake.rs b/blockchain/clique/src/protocols/handshake.rs
new file mode 100644
index 000000000..27f0c7a2b
--- /dev/null
+++ b/blockchain/clique/src/protocols/handshake.rs
@@ -0,0 +1,384 @@
+// بِسْمِ اللَّهِ الرَّحْمَنِ الرَّحِيم
+
+// This file is part of Setheum.
+
+// Copyright (C) 2019-Present Setheum Labs.
+// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
+
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+use std::fmt::{Display, Error as FmtError, Formatter};
+
+use parity_scale_codec::{Decode, Encode};
+use rand::Rng;
+use tokio::time::{timeout, Duration};
+
+use crate::{
+ io::{receive_data, send_data, ReceiveError, SendError},
+ PublicKey, SecretKey, Splittable,
+};
+
+pub const HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(10);
+
+/// Handshake error.
+#[derive(Debug)]
+pub enum HandshakeError<PK: PublicKey> {
+ /// Send error.
+ SendError(SendError),
+ /// Receive error.
+ ReceiveError(ReceiveError),
+ /// Signature error.
+ SignatureError,
+ /// Challenge contains invalid peer id.
+ ChallengeError(PK, PK),
+ /// Timeout.
+ TimedOut,
+}
+
+impl<PK: PublicKey> Display for HandshakeError<PK> {
+ fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> {
+ use HandshakeError::*;
+ match self {
+ SendError(e) => write!(f, "send error: {e}"),
+ ReceiveError(e) => write!(f, "receive error: {e}"),
+ SignatureError => write!(f, "signature error"),
+ ChallengeError(expected, got) => write!(
+ f,
+ "challenge error, expected peer {expected}, received from {got}"
+ ),
+ TimedOut => write!(f, "timed out"),
+ }
+ }
+}
+
+impl<PK: PublicKey> From<SendError> for HandshakeError<PK> {
+ fn from(e: SendError) -> Self {
+ HandshakeError::SendError(e)
+ }
+}
+
+impl<PK: PublicKey> From<ReceiveError> for HandshakeError<PK> {
+ fn from(e: ReceiveError) -> Self {
+ HandshakeError::ReceiveError(e)
+ }
+}
+
+/// Handshake challenge. Contains public key of the creator, and a random nonce.
+#[derive(Debug, Clone, Encode, Decode)]
+struct Challenge<PK: PublicKey> {
+ public_key: PK,
+ nonce: [u8; 32],
+}
+
+impl<PK: PublicKey> Challenge<PK> {
+ /// Prepare new challenge that contains ID of the creator.
+ fn new(public_key: PK) -> Self {
+ let nonce = rand::thread_rng().gen::<[u8; 32]>();
+ Self { public_key, nonce }
+ }
+}
+
+/// Handshake response. Contains public key of the creator, and signature
+/// related to the received challenge.
+#[derive(Debug, Clone, Encode, Decode)]
+struct Response<PK: PublicKey> {
+ public_key: PK,
+ signature: PK::Signature,
+}
+
+impl<PK: PublicKey> Response<PK> {
+ // Amusingly the `Signature = PK::Signature` is necessary, the compiler cannot even do this
+ // simple reasoning. :/
+ /// Create a new response by signing the challenge.
+    fn new<SK: SecretKey<PublicKey = PK, Signature = PK::Signature>>(
+        secret_key: &SK,
+        challenge: &Challenge<PK>,
+    ) -> Self {
+ Self {
+ public_key: secret_key.public_key(),
+ signature: secret_key.sign(&challenge.encode()),
+ }
+ }
+
+ /// Verify the Response sent by the peer.
+    fn verify(&self, challenge: &Challenge<PK>) -> bool {
+ self.public_key.verify(&challenge.encode(), &self.signature)
+ }
+}
+
+/// Performs the handshake with a peer that called us.
+/// The goal is to obtain the public key of the peer, and split
+/// the communication stream into two halves.
+/// The peer needs to prove their identity by signing a randomly generated
+/// challenge, but apart from that, the returned communication channels
+/// will NOT be secured in any way. We assume that if the channel is
+/// compromised after the handshake, the peer will establish another connection,
+/// which will replace the current one.
+pub async fn execute_v0_handshake_incoming<S: Splittable, SK: SecretKey>(
+    stream: S,
+    secret_key: SK,
+) -> Result<(S::Sender, S::Receiver, SK::PublicKey), HandshakeError<SK::PublicKey>> {
+ // send challenge
+ let our_challenge = Challenge::new(secret_key.public_key());
+ let stream = send_data(stream, our_challenge.clone()).await?;
+ // receive response
+    let (stream, peer_response) = receive_data::<_, Response<SK::PublicKey>>(stream).await?;
+ // validate response
+ if !peer_response.verify(&our_challenge) {
+ return Err(HandshakeError::SignatureError);
+ }
+ let (sender, receiver) = stream.split();
+ let public_key = peer_response.public_key;
+ Ok((sender, receiver, public_key))
+}
+
+/// Performs the handshake with a peer that we called. We assume that their
+/// public key is known to us.
+/// The goal is to authenticate ourselves, and split the communication stream
+/// into two halves.
+/// We need to prove our identity by signing a randomly generated
+/// challenge, but apart from that, the returned communication channels
+/// will NOT be secured in any way. We assume that if the channel is
+/// compromised after the handshake, we will establish another connection,
+/// which will replace the current one.
+pub async fn execute_v0_handshake_outgoing<S: Splittable, SK: SecretKey>(
+    stream: S,
+    secret_key: SK,
+    public_key: SK::PublicKey,
+) -> Result<(S::Sender, S::Receiver), HandshakeError<SK::PublicKey>> {
+ // receive challenge
+    let (stream, peer_challenge) = receive_data::<_, Challenge<SK::PublicKey>>(stream).await?;
+ if public_key != peer_challenge.public_key {
+ return Err(HandshakeError::ChallengeError(
+ public_key,
+ peer_challenge.public_key,
+ ));
+ }
+ // send response
+ let our_response = Response::new(&secret_key, &peer_challenge);
+ let stream = send_data(stream, our_response).await?;
+ let (sender, receiver) = stream.split();
+ Ok((sender, receiver))
+}
+
+/// Wrapper that adds timeout to the function performing handshake.
+pub async fn v0_handshake_incoming<S: Splittable, SK: SecretKey>(
+    stream: S,
+    secret_key: SK,
+) -> Result<(S::Sender, S::Receiver, SK::PublicKey), HandshakeError<SK::PublicKey>> {
+ timeout(
+ HANDSHAKE_TIMEOUT,
+ execute_v0_handshake_incoming(stream, secret_key),
+ )
+ .await
+ .map_err(|_| HandshakeError::TimedOut)?
+}
+
+/// Wrapper that adds timeout to the function performing handshake.
+pub async fn v0_handshake_outgoing