From 93ef1b23fbb22276d43cee916845edc730b207c3 Mon Sep 17 00:00:00 2001 From: Sabaun Taraki Date: Fri, 15 Nov 2024 11:11:13 +0300 Subject: [PATCH 1/9] Basic tx pool definition --- Cargo.lock | 4 +++ ethexe/tx-pool/Cargo.toml | 11 +++++++ ethexe/tx-pool/src/lib.rs | 62 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 77 insertions(+) create mode 100644 ethexe/tx-pool/Cargo.toml create mode 100644 ethexe/tx-pool/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 42b1e99a4da..03b3aa8e2cc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4729,6 +4729,10 @@ dependencies = [ "tempfile", ] +[[package]] +name = "ethexe-tx-pool" +version = "1.6.2" + [[package]] name = "ethexe-utils" version = "1.6.2" diff --git a/ethexe/tx-pool/Cargo.toml b/ethexe/tx-pool/Cargo.toml new file mode 100644 index 00000000000..83a23032f53 --- /dev/null +++ b/ethexe/tx-pool/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "ethexe-tx-pool" +version.workspace = true +authors.workspace = true +edition.workspace = true +license.workspace = true +homepage.workspace = true +repository.workspace = true +rust-version.workspace = true + +[dependencies] diff --git a/ethexe/tx-pool/src/lib.rs b/ethexe/tx-pool/src/lib.rs new file mode 100644 index 00000000000..d8ddae1957a --- /dev/null +++ b/ethexe/tx-pool/src/lib.rs @@ -0,0 +1,62 @@ +// This file is part of Gear. +// +// Copyright (C) 2024 Gear Technologies Inc. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +//! Ethexe transaction pool. + +pub struct TranscationPool { + transactions: Vec, +} + +impl TranscationPool { + pub fn new() -> Self { + Self { + transactions: Vec::new(), + } + } +} + +impl TranscationPool { + pub fn add_new_transaction(&mut self, tx: Tx) -> Result<(), Tx::Error> { + tx + .validate() + .map(|_| self.transactions.push(tx)) + } +} + +pub trait Transaction { + type Error; + fn validate(&self) -> Result<(), Self::Error>; +} + +pub enum EthexeTransaction { + Message { + pub_key: Vec, + raw_message: Vec, + signed_message: Vec, + } +} + +impl Transaction for EthexeTransaction { + type Error = (); + + fn validate(&self) -> Result<(), Self::Error> { + match self { + EthexeTransaction::Message { pub_key, raw_message, signed_message } => todo!(), + } + } +} From ba8cc2a4ec5ae918dc82ac4023f4ebef75a68112 Mon Sep 17 00:00:00 2001 From: Sabaun Taraki Date: Tue, 19 Nov 2024 16:28:02 +0300 Subject: [PATCH 2/9] Add validation with checking signatures --- Cargo.lock | 4 +++ ethexe/signer/src/digest.rs | 2 +- ethexe/signer/src/lib.rs | 13 +++++++- ethexe/signer/src/signature.rs | 57 +++++++++++++++++++++++++++++----- ethexe/tx-pool/Cargo.toml | 2 ++ ethexe/tx-pool/src/lib.rs | 26 ++++++++++------ 6 files changed, 86 insertions(+), 18 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 03b3aa8e2cc..08346a76a74 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4732,6 +4732,10 @@ dependencies = [ [[package]] name = "ethexe-tx-pool" version = "1.6.2" +dependencies = [ + "anyhow", + "ethexe-signer", +] [[package]] name = "ethexe-utils" diff --git a/ethexe/signer/src/digest.rs b/ethexe/signer/src/digest.rs index 9eee8f61053..17ab724ef9b 100644 --- a/ethexe/signer/src/digest.rs +++ b/ethexe/signer/src/digest.rs @@ -39,7 +39,7 @@ use sha3::Digest as _; derive_more::Into, derive_more::AsRef, )] -pub struct Digest([u8; 32]); +pub struct Digest(pub(crate) [u8; 32]); impl fmt::Debug for Digest { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { diff --git a/ethexe/signer/src/lib.rs b/ethexe/signer/src/lib.rs index f9b57187f64..a6600ed9e23 100644 --- a/ethexe/signer/src/lib.rs +++ b/ethexe/signer/src/lib.rs @@ -21,14 +21,15 @@ mod digest; mod signature; +// Exports pub use digest::{Digest, ToDigest}; -use secp256k1::hashes::hex::{Case, DisplayHex}; pub use sha3; pub use signature::Signature; use anyhow::{anyhow, bail, Result}; use gprimitives::{ActorId, H160}; use parity_scale_codec::{Decode, Encode}; +use secp256k1::hashes::hex::{Case, DisplayHex}; use sha3::Digest as _; use signature::RawSignature; use std::{fmt, fs, path::PathBuf, str::FromStr}; @@ -122,6 +123,16 @@ impl PublicKey { } } +impl TryFrom<&[u8]> for PublicKey { + type Error = anyhow::Error; + + fn try_from(data: &[u8]) -> Result { + let bytes = <[u8; 33]>::try_from(data)?; + + Ok(Self::from_bytes(bytes)) + } +} + impl FromStr for PublicKey { type Err = anyhow::Error; diff --git a/ethexe/signer/src/signature.rs b/ethexe/signer/src/signature.rs index 4a91d51dab4..4b260ccfc38 100644 --- a/ethexe/signer/src/signature.rs +++ b/ethexe/signer/src/signature.rs @@ -19,7 +19,7 @@ //! Secp256k1 signature types and utilities. use crate::{Digest, PrivateKey, PublicKey}; -use anyhow::{Context, Result}; +use anyhow::{Context, Error, Result}; use parity_scale_codec::{Decode, Encode}; use secp256k1::{ ecdsa::{RecoverableSignature, RecoveryId}, @@ -48,6 +48,16 @@ impl RawSignature { } } +impl TryFrom<&[u8]> for RawSignature { + type Error = Error; + + fn try_from(data: &[u8]) -> Result { + let bytes = <[u8; 65]>::try_from(data)?; + + Ok(RawSignature(bytes)) + } +} + impl From for [u8; 65] { fn from(sig: RawSignature) -> [u8; 65] { sig.0 @@ -62,6 +72,7 @@ impl AsRef<[u8]> for RawSignature { impl From for RawSignature { fn from(mut sig: Signature) -> RawSignature { + // TODO: Include chain id, as that's for transaction of pre-EIP-155 (!) sig.0[64] -= 27; RawSignature(sig.0) } @@ -71,25 +82,56 @@ impl From for RawSignature { pub struct Signature([u8; 65]); impl Signature { + pub fn create_for_digest(private_key: PrivateKey, digest: Digest) -> Result { + let raw_signature = RawSignature::create_for_digest(private_key, digest)?; + Ok(raw_signature.into()) + } + pub fn to_hex(&self) -> String { hex::encode(self.0) } + pub fn verify(&self, digest: Digest) -> Result<()> { + let signature = (*self).try_into()?; + let public_key = self.recover_from_digest_with_signature(Some(signature), digest)?; + let secp256k1_pub_key = secp256k1::PublicKey::from_byte_array_compressed(&public_key.0)?; + let message = Message::from_digest(digest.0); + + secp256k1::global::SECP256K1 + .verify_ecdsa(&message, &signature.to_standard(), &secp256k1_pub_key) + .map_err(Into::into) + } + pub fn recover_from_digest(&self, digest: Digest) -> Result { - let sig = (*self).try_into()?; - let public_key = secp256k1::global::SECP256K1 - .recover_ecdsa(&Message::from_digest(digest.into()), &sig)?; - Ok(PublicKey::from_bytes(public_key.serialize())) + self.recover_from_digest_with_signature(None, digest) } - pub fn create_for_digest(private_key: PrivateKey, digest: Digest) -> Result { - let raw_signature = RawSignature::create_for_digest(private_key, digest)?; + fn recover_from_digest_with_signature( + &self, + signature: Option, + digest: Digest, + ) -> Result { + let signature = signature.unwrap_or((*self).try_into()?); + signature + .recover(&Message::from_digest(digest.0)) + .map(|pub_key| PublicKey::from_bytes(pub_key.serialize())) + .map_err(Into::into) + } +} + +impl TryFrom<&[u8]> for Signature { + type Error = Error; + + fn try_from(data: &[u8]) -> Result { + let raw_signature = RawSignature::try_from(data)?; + Ok(raw_signature.into()) } } impl From for Signature { fn from(mut sig: RawSignature) -> Self { + // TODO: Include chain id, as that's for transaction of pre-EIP-155 (!) sig.0[64] += 27; Signature(sig.0) } @@ -125,6 +167,7 @@ impl TryFrom for RecoverableSignature { fn try_from(sig: Signature) -> Result { RecoverableSignature::from_compact( sig.0[..64].as_ref(), + // TODO: Include chain id, as that's for transaction of pre-EIP-155 (!) RecoveryId::try_from((sig.0[64] - 27) as i32)?, ) .map_err(Into::into) diff --git a/ethexe/tx-pool/Cargo.toml b/ethexe/tx-pool/Cargo.toml index 83a23032f53..448f8b5aa97 100644 --- a/ethexe/tx-pool/Cargo.toml +++ b/ethexe/tx-pool/Cargo.toml @@ -9,3 +9,5 @@ repository.workspace = true rust-version.workspace = true [dependencies] +anyhow.workspace = true +ethexe-signer.workspace = true \ No newline at end of file diff --git a/ethexe/tx-pool/src/lib.rs b/ethexe/tx-pool/src/lib.rs index d8ddae1957a..d3780f46c00 100644 --- a/ethexe/tx-pool/src/lib.rs +++ b/ethexe/tx-pool/src/lib.rs @@ -18,6 +18,9 @@ //! Ethexe transaction pool. +use anyhow::Result; +use ethexe_signer::{Signature, ToDigest}; + pub struct TranscationPool { transactions: Vec, } @@ -32,31 +35,36 @@ impl TranscationPool { impl TranscationPool { pub fn add_new_transaction(&mut self, tx: Tx) -> Result<(), Tx::Error> { - tx - .validate() - .map(|_| self.transactions.push(tx)) + tx.validate().map(|_| self.transactions.push(tx)) } } -pub trait Transaction { +pub trait Transaction { type Error; fn validate(&self) -> Result<(), Self::Error>; } pub enum EthexeTransaction { Message { - pub_key: Vec, raw_message: Vec, - signed_message: Vec, - } + signature: Vec, + }, } impl Transaction for EthexeTransaction { - type Error = (); + type Error = anyhow::Error; fn validate(&self) -> Result<(), Self::Error> { match self { - EthexeTransaction::Message { pub_key, raw_message, signed_message } => todo!(), + EthexeTransaction::Message { + raw_message, + signature, + } => { + let message_digest = raw_message.to_digest(); + let signature = Signature::try_from(signature.as_ref())?; + + signature.verify(message_digest) + } } } } From 71fc8ee46735cf2a4ad0a75bb7570932f217d3ed Mon Sep 17 00:00:00 2001 From: Sabaun Taraki Date: Wed, 20 Nov 2024 17:04:29 +0300 Subject: [PATCH 3/9] Refactor signer crate, state important todos --- Cargo.lock | 1 + ethexe/signer/Cargo.toml | 2 +- ethexe/signer/src/address.rs | 100 ++++++++++++++ ethexe/signer/src/digest.rs | 5 +- ethexe/signer/src/lib.rs | 227 ++++++++----------------------- ethexe/signer/src/private_key.rs | 46 +++++++ ethexe/signer/src/public_key.rs | 109 +++++++++++++++ ethexe/signer/src/signature.rs | 90 ++++++------ ethexe/signer/src/utils.rs | 34 +++++ ethexe/tx-pool/src/lib.rs | 3 +- 10 files changed, 400 insertions(+), 217 deletions(-) create mode 100644 ethexe/signer/src/address.rs create mode 100644 ethexe/signer/src/private_key.rs create mode 100644 ethexe/signer/src/public_key.rs create mode 100644 ethexe/signer/src/utils.rs diff --git a/Cargo.lock b/Cargo.lock index 08346a76a74..b28aaa0b502 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -273,6 +273,7 @@ checksum = "ea59dc42102bc9a1905dc57901edc6dd48b9f38115df86c7d252acba70d71d04" dependencies = [ "alloy-primitives", "alloy-rlp", + "k256", "serde", ] diff --git a/ethexe/signer/Cargo.toml b/ethexe/signer/Cargo.toml index 4c4fbdb4681..dcc4548a4aa 100644 --- a/ethexe/signer/Cargo.toml +++ b/ethexe/signer/Cargo.toml @@ -23,4 +23,4 @@ secp256k1 = { version = "0.30", features = ["rand", "global-context", "hashes", sha3 = { version = "0.10", default-features = false } [dev-dependencies] -alloy.workspace = true +alloy = { workspace = true, features = ["k256"] } diff --git a/ethexe/signer/src/address.rs b/ethexe/signer/src/address.rs new file mode 100644 index 00000000000..6ac7dcc0d72 --- /dev/null +++ b/ethexe/signer/src/address.rs @@ -0,0 +1,100 @@ +// This file is part of Gear. +// +// Copyright (C) 2024 Gear Technologies Inc. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +//! Ethereum address. + +use crate::{utils, PublicKey}; +use anyhow::{anyhow, Error, Result}; +use gprimitives::{ActorId, H160}; +use parity_scale_codec::{Decode, Encode}; +use sha3::Digest as _; +use std::{fmt, str::FromStr}; + +/// Ethereum address type. +/// +/// Basically a 20 bytes buffer, which is obtained from the least significant 20 bytes +/// of the hashed with keccak256 public key. +#[derive(Encode, Decode, Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub struct Address(pub [u8; 20]); + +impl Address { + /// Address hex string. + pub fn to_hex(&self) -> String { + hex::encode(self.0) + } +} + +impl From<[u8; 20]> for Address { + fn from(value: [u8; 20]) -> Self { + Self(value) + } +} + +impl From for Address { + fn from(value: H160) -> Self { + Self(value.into()) + } +} + +impl From for Address { + fn from(key: PublicKey) -> Self { + let public_key_uncompressed = secp256k1::PublicKey::from(key).serialize_uncompressed(); + + let mut address = Address::default(); + let hash = sha3::Keccak256::digest(&public_key_uncompressed[1..]); + address.0[..20].copy_from_slice(&hash[12..]); + + address + } +} + +impl FromStr for Address { + type Err = Error; + + fn from_str(s: &str) -> Result { + Ok(Self(utils::decode_to_array(s)?)) + } +} + +/// Tries to convert `ActorId`` into `Address`. +/// +/// Succeeds if first 12 bytes are 0. +impl TryFrom for Address { + type Error = Error; + + fn try_from(id: ActorId) -> Result { + id.as_ref() + .iter() + .take(12) + .all(|&byte| byte == 0) + .then_some(Address(id.to_address_lossy().0)) + .ok_or_else(|| anyhow!("First 12 bytes are not 0, it is not ethereum address")) + } +} + +impl fmt::Debug for Address { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "0x{}", self.to_hex()) + } +} + +impl fmt::Display for Address { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "0x{}", self.to_hex()) + } +} diff --git a/ethexe/signer/src/digest.rs b/ethexe/signer/src/digest.rs index 17ab724ef9b..6b50c76c7aa 100644 --- a/ethexe/signer/src/digest.rs +++ b/ethexe/signer/src/digest.rs @@ -16,7 +16,9 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . -//! Keccak256 digest type. Implements AsDigest hashing for ethexe common types. +//! Keccak256 digest type. +//! +//! Implements `ToDigest` hashing for ethexe common types. use core::fmt; use ethexe_common::router::{ @@ -25,6 +27,7 @@ use ethexe_common::router::{ use parity_scale_codec::{Decode, Encode}; use sha3::Digest as _; +/// Common digest type for the ethexe. #[derive( Clone, Copy, diff --git a/ethexe/signer/src/lib.rs b/ethexe/signer/src/lib.rs index a6600ed9e23..4b474a1df4f 100644 --- a/ethexe/signer/src/lib.rs +++ b/ethexe/signer/src/lib.rs @@ -17,174 +17,49 @@ // along with this program. If not, see . //! Signer library for ethexe. - +//! +//! The crate defines types and related logic for private keys, public keys types, +//! cryptographic signatures and ethereum address. +//! +//! Cryptographic instrumentary of the crate is based on secp256k1 standard +//! using [secp256k1](https://crates.io/crates/secp256k1) crate, but all the +//! machinery used is wrapped in the crate's types. + +mod address; mod digest; +mod private_key; +mod public_key; mod signature; +mod utils; // Exports +pub use address::Address; pub use digest::{Digest, ToDigest}; +pub use private_key::PrivateKey; +pub use public_key::PublicKey; pub use sha3; pub use signature::Signature; -use anyhow::{anyhow, bail, Result}; -use gprimitives::{ActorId, H160}; -use parity_scale_codec::{Decode, Encode}; +use anyhow::{bail, Result}; use secp256k1::hashes::hex::{Case, DisplayHex}; -use sha3::Digest as _; use signature::RawSignature; -use std::{fmt, fs, path::PathBuf, str::FromStr}; - -#[derive(Debug, Clone, Copy, Eq, PartialEq)] -pub struct PublicKey(pub [u8; 33]); - -#[derive(Encode, Decode, Default, Clone, Copy, PartialEq, Eq, Hash)] -pub struct PrivateKey(pub [u8; 32]); - -impl From for PublicKey { - fn from(key: PrivateKey) -> Self { - let secret_key = - secp256k1::SecretKey::from_slice(&key.0[..]).expect("32 bytes, within curve order"); - let public_key = secp256k1::PublicKey::from_secret_key_global(&secret_key); - - PublicKey::from_bytes(public_key.serialize()) - } -} - -#[derive(Encode, Decode, Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] -pub struct Address(pub [u8; 20]); - -impl From<[u8; 20]> for Address { - fn from(value: [u8; 20]) -> Self { - Self(value) - } -} - -impl From for Address { - fn from(value: H160) -> Self { - Self(value.into()) - } -} - -impl TryFrom for Address { - type Error = anyhow::Error; - - fn try_from(id: ActorId) -> std::result::Result { - id.as_ref() - .iter() - .take(12) - .all(|&byte| byte == 0) - .then_some(Address(id.to_address_lossy().0)) - .ok_or_else(|| anyhow!("First 12 bytes are not 0, it is not ethereum address")) - } -} - -fn strip_prefix(s: &str) -> &str { - if let Some(s) = s.strip_prefix("0x") { - s - } else { - s - } -} - -fn decode_to_array(s: &str) -> Result<[u8; N]> { - let mut buf = [0; N]; - hex::decode_to_slice(strip_prefix(s), &mut buf) - .map_err(|_| anyhow!("invalid hex format for {s:?}"))?; - Ok(buf) -} - -impl FromStr for PrivateKey { - type Err = anyhow::Error; - - fn from_str(s: &str) -> Result { - Ok(Self(decode_to_array(s)?)) - } -} - -impl PublicKey { - pub fn from_bytes(bytes: [u8; 33]) -> Self { - Self(bytes) - } - - pub fn to_hex(&self) -> String { - hex::encode(self.0) - } - - pub fn to_address(&self) -> Address { - let public_key_uncompressed = secp256k1::PublicKey::from_slice(&self.0) - .expect("Invalid public key") - .serialize_uncompressed(); - - let mut address = Address::default(); - let hash = sha3::Keccak256::digest(&public_key_uncompressed[1..]); - address.0[..20].copy_from_slice(&hash[12..]); - - address - } -} - -impl TryFrom<&[u8]> for PublicKey { - type Error = anyhow::Error; - - fn try_from(data: &[u8]) -> Result { - let bytes = <[u8; 33]>::try_from(data)?; - - Ok(Self::from_bytes(bytes)) - } -} - -impl FromStr for PublicKey { - type Err = anyhow::Error; - - fn from_str(s: &str) -> Result { - Ok(Self(decode_to_array(s)?)) - } -} - -impl Address { - pub fn to_hex(&self) -> String { - hex::encode(self.0) - } -} - -impl FromStr for Address { - type Err = anyhow::Error; - - fn from_str(s: &str) -> Result { - Ok(Self(decode_to_array(s)?)) - } -} - -impl fmt::Debug for Address { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "0x{}", self.to_hex()) - } -} - -impl fmt::Display for PublicKey { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "{}", self.to_hex()) - } -} - -impl fmt::Display for Address { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "0x{}", self.to_hex()) - } -} +use std::{fs, path::PathBuf, str::FromStr}; +/// Signer which signs data using owned key store. #[derive(Debug, Clone)] pub struct Signer { key_store: PathBuf, } impl Signer { + /// Create a new signer with a key store location. pub fn new(key_store: PathBuf) -> Result { fs::create_dir_all(key_store.as_path())?; Ok(Self { key_store }) } + /// Create a new signer with a key temporary key store location. pub fn tmp() -> Self { let temp_dir = tempfile::tempdir().expect("Cannot create temp dir for keys"); Self { @@ -192,34 +67,40 @@ impl Signer { } } + /// Create a ECDSA recoverable signature with `Electrum` notation for the `v` value. + /// + /// For more info about `v` value read [`RawSignature`] docs. pub fn raw_sign_digest(&self, public_key: PublicKey, digest: Digest) -> Result { let private_key = self.get_private_key(public_key)?; RawSignature::create_for_digest(private_key, digest) } + /// Create a ECDSA recoverable signature with pre or post EIP-155 `v` value notation. + // TODO [sab] change docs after introducing a parity pub fn sign_digest(&self, public_key: PublicKey, digest: Digest) -> Result { let private_key = self.get_private_key(public_key)?; Signature::create_for_digest(private_key, digest) } + /// Create a ECDSA recoverable signature for the raw bytes data. pub fn sign(&self, public_key: PublicKey, data: &[u8]) -> Result { self.sign_digest(public_key, data.to_digest()) } + /// Create a ECDSA recoverable signature for the raw bytes data with + /// an ethereum address provided instead of the public key. + /// + /// If the private key for the ethereum address is stored, the signature will be returned. pub fn sign_with_addr(&self, address: Address, data: &[u8]) -> Result { - let keys = self.list_keys()?; - - for key in keys { - if key.to_address() == address { - return self.sign(key, data); - } + match self.get_key_by_addr(address)? { + Some(public_key) => self.sign(public_key, data), + None => bail!("Address not found: {}", address), } - - bail!("Address not found: {}", address); } + /// Get a public key for the provided ethereum address. If no key found a `None` is returned. pub fn get_key_by_addr(&self, address: Address) -> Result> { let keys = self.list_keys()?; @@ -232,49 +113,55 @@ impl Signer { Ok(None) } + /// Check if key exists for the ethereum address. pub fn has_addr(&self, address: Address) -> Result { Ok(self.get_key_by_addr(address)?.is_some()) } + /// Check if key exists in the key store. pub fn has_key(&self, key: PublicKey) -> Result { let key_path = self.key_store.join(key.to_hex()); let has_key = fs::metadata(key_path).is_ok(); Ok(has_key) } + /// Add a private key to the key store. pub fn add_key(&self, key: PrivateKey) -> Result { - let secret_key = - secp256k1::SecretKey::from_slice(&key.0[..]).expect("32 bytes, within curve order"); - let public_key = secp256k1::PublicKey::from_secret_key_global(&secret_key); + let public_key: PublicKey = key.into(); - let local_public = PublicKey::from_bytes(public_key.serialize()); + let key_file = self.key_store.join(public_key.to_hex()); + fs::write(key_file, key.0)?; - let key_file = self.key_store.join(local_public.to_hex()); - fs::write(key_file, secret_key.secret_bytes())?; - Ok(local_public) + Ok(public_key) } + /// Generate a new private key and return a public key for it. pub fn generate_key(&self) -> Result { - let (secret_key, public_key) = + let (secp256k1_secret_key, secp256k1_public_key) = secp256k1::generate_keypair(&mut secp256k1::rand::thread_rng()); - let local_public = PublicKey::from_bytes(public_key.serialize()); + let public_key: PublicKey = secp256k1_public_key.into(); - let key_file = self.key_store.join(local_public.to_hex()); + let key_file = self.key_store.join(public_key.to_hex()); println!( "Secret key: {}", - secret_key.secret_bytes().to_hex_string(Case::Lower) + secp256k1_secret_key + .secret_bytes() + .to_hex_string(Case::Lower) ); - fs::write(key_file, secret_key.secret_bytes())?; - Ok(local_public) + fs::write(key_file, secp256k1_secret_key.secret_bytes())?; + + Ok(public_key) } + /// Remove all the keys from the key store. pub fn clear_keys(&self) -> Result<()> { fs::remove_dir_all(&self.key_store)?; Ok(()) } + /// Get a list of the stored public keys. pub fn list_keys(&self) -> Result> { let mut keys = vec![]; @@ -288,6 +175,7 @@ impl Signer { Ok(keys) } + /// Get a private key for the public one from the key store. pub fn get_private_key(&self, key: PublicKey) -> Result { let mut buf = [0u8; 32]; @@ -307,7 +195,8 @@ impl Signer { #[cfg(test)] mod tests { use super::*; - use alloy::primitives::{keccak256, Signature}; + use alloy::primitives::{keccak256, Signature as AlloySignature}; + use gprimitives::ActorId; use std::env::temp_dir; #[test] @@ -337,7 +226,7 @@ mod tests { let hash = keccak256(message); // Recover the address using the signature - let alloy_sig = Signature::try_from(signature.as_ref()).expect("failed to parse sig"); + let alloy_sig = AlloySignature::try_from(signature.as_ref()).expect("failed to parse sig"); let recovered_address = alloy_sig .recover_address_from_prehash(&hash) @@ -375,7 +264,7 @@ mod tests { let hash = keccak256(message); // Recover the address using the signature - let alloy_sig = Signature::try_from(signature.as_ref()).expect("failed to parse sig"); + let alloy_sig = AlloySignature::try_from(signature.as_ref()).expect("failed to parse sig"); let recovered_address = alloy_sig .recover_address_from_prehash(&hash) diff --git a/ethexe/signer/src/private_key.rs b/ethexe/signer/src/private_key.rs new file mode 100644 index 00000000000..99785001a62 --- /dev/null +++ b/ethexe/signer/src/private_key.rs @@ -0,0 +1,46 @@ +// This file is part of Gear. +// +// Copyright (C) 2024 Gear Technologies Inc. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +//! Private key type. + +use crate::utils; +use anyhow::{Error, Result}; +use parity_scale_codec::{Decode, Encode}; +use secp256k1::SecretKey as Secp256k1SecretKey; +use std::str::FromStr; + +/// Private key. +/// +/// Private key type used for elliptic curves maths for secp256k1 standard +/// is a 256 bits unsigned integer, which the type stores as a 32 bytes array. +#[derive(Encode, Decode, Default, Clone, Copy, PartialEq, Eq, Hash)] +pub struct PrivateKey(pub [u8; 32]); + +impl From for Secp256k1SecretKey { + fn from(key: PrivateKey) -> Self { + Secp256k1SecretKey::from_byte_array(&key.0).expect("32 bytes; within curve order") + } +} + +impl FromStr for PrivateKey { + type Err = Error; + + fn from_str(s: &str) -> Result { + Ok(Self(utils::decode_to_array(s)?)) + } +} diff --git a/ethexe/signer/src/public_key.rs b/ethexe/signer/src/public_key.rs new file mode 100644 index 00000000000..430a43dc038 --- /dev/null +++ b/ethexe/signer/src/public_key.rs @@ -0,0 +1,109 @@ +// This file is part of Gear. +// +// Copyright (C) 2024 Gear Technologies Inc. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +//! Public key type. + +use crate::{utils, Address, PrivateKey}; +use anyhow::{Error, Result}; +use secp256k1::PublicKey as Secp256k1PublicKey; +use std::{fmt, str::FromStr}; + +/// Public key. +/// +/// Basically, public key is a point on the elliptic curve, which should have +/// two coordinates - `x` and `y`, both 256 bits unsigned integers. But it's possible +/// to store only `x` coordinate, as `y` can be calculated. +/// +/// As the secp256k1 elliptic curve is symmetric, the y can be either positive or +/// negative. To stress the exact position of the `y` the prefix byte is used, so +/// the public key becomes 33 bytes, not 32. +#[derive(Debug, Clone, Copy, Eq, PartialEq)] +pub struct PublicKey(pub [u8; 33]); + +impl PublicKey { + /// Create public key from the private key. + /// + /// Only `ethexe-signer` types are used. + pub fn from_private(private_key: PrivateKey) -> Self { + let secret_key = private_key.into(); + let public_key = Secp256k1PublicKey::from_secret_key_global(&secret_key); + + public_key.into() + } + + pub fn try_from_slice(slice: &[u8]) -> Result { + let bytes = <[u8; 33]>::try_from(slice)?; + + Ok(Self::from_bytes(bytes)) + } + + /// Create public key from compressed public key bytes. + pub fn from_bytes(bytes: [u8; 33]) -> Self { + Self(bytes) + } + + /// Public key hex string. + pub fn to_hex(&self) -> String { + hex::encode(self.0) + } + + /// Convert public key to ethereum address. + pub fn to_address(&self) -> Address { + (*self).into() + } +} + +impl From for PublicKey { + fn from(key: PrivateKey) -> Self { + Self::from_private(key) + } +} + +impl From for PublicKey { + fn from(key: Secp256k1PublicKey) -> Self { + Self(key.serialize()) + } +} + +impl From for Secp256k1PublicKey { + fn from(key: PublicKey) -> Self { + Secp256k1PublicKey::from_byte_array_compressed(&key.0).expect("invalid public key") + } +} + +impl FromStr for PublicKey { + type Err = Error; + + fn from_str(s: &str) -> Result { + Ok(Self(utils::decode_to_array(s)?)) + } +} + +impl TryFrom<&[u8]> for PublicKey { + type Error = Error; + + fn try_from(data: &[u8]) -> Result { + Self::try_from_slice(data) + } +} + +impl fmt::Display for PublicKey { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "{}", self.to_hex()) + } +} diff --git a/ethexe/signer/src/signature.rs b/ethexe/signer/src/signature.rs index 4b260ccfc38..ec3810a5bee 100644 --- a/ethexe/signer/src/signature.rs +++ b/ethexe/signer/src/signature.rs @@ -19,7 +19,7 @@ //! Secp256k1 signature types and utilities. use crate::{Digest, PrivateKey, PublicKey}; -use anyhow::{Context, Error, Result}; +use anyhow::{Error, Result}; use parity_scale_codec::{Decode, Encode}; use secp256k1::{ ecdsa::{RecoverableSignature, RecoveryId}, @@ -27,34 +27,29 @@ use secp256k1::{ }; use std::fmt; +/// A recoverable ECDSA signature with `v` value in an `Electrum` notation. +/// +/// 'Electrum' notation signatures define `v` to be from the `{0; 1}` set. #[derive(Clone, Copy, PartialEq, Eq)] pub struct RawSignature([u8; 65]); impl RawSignature { + /// Create a recoverable signature for the provided digest using the private key. pub fn create_for_digest(private_key: PrivateKey, digest: Digest) -> Result { - let secp_secret_key = secp256k1::SecretKey::from_slice(&private_key.0) - .with_context(|| "Invalid secret key format for {:?}")?; - + let secp_secret_key = private_key.into(); let message = Message::from_digest(digest.into()); let recoverable = secp256k1::global::SECP256K1.sign_ecdsa_recoverable(&message, &secp_secret_key); - let (id, signature) = recoverable.serialize_compact(); - let mut bytes = [0u8; 65]; - bytes[..64].copy_from_slice(signature.as_ref()); - bytes[64] = i32::from(id) as u8; - Ok(RawSignature(bytes)) - } -} -impl TryFrom<&[u8]> for RawSignature { - type Error = Error; - - fn try_from(data: &[u8]) -> Result { - let bytes = <[u8; 65]>::try_from(data)?; + let mut ret = [0u8; 65]; + ret[..64].copy_from_slice(signature.as_ref()); + ret[64] = i32::from(id) + .try_into() + .expect("recovery id is within u8 range"); - Ok(RawSignature(bytes)) + Ok(RawSignature(ret)) } } @@ -72,66 +67,61 @@ impl AsRef<[u8]> for RawSignature { impl From for RawSignature { fn from(mut sig: Signature) -> RawSignature { - // TODO: Include chain id, as that's for transaction of pre-EIP-155 (!) + // TODO [sab]: Define parity in accordance to pre-EIP-155 and post-EIP-155 standards. sig.0[64] -= 27; RawSignature(sig.0) } } +/// A recoverable ECDSA signature type with any possible `v`. +/// +/// The signature can be in 'Electrum' notation, pre- or post- EIP-155 notations. #[derive(Clone, Copy, Encode, Decode, PartialEq, Eq)] pub struct Signature([u8; 65]); impl Signature { + /// Create a recoverable signature for the provided digest using the private key. pub fn create_for_digest(private_key: PrivateKey, digest: Digest) -> Result { let raw_signature = RawSignature::create_for_digest(private_key, digest)?; Ok(raw_signature.into()) } + /// Covert signature to hex string. pub fn to_hex(&self) -> String { hex::encode(self.0) } - pub fn verify(&self, digest: Digest) -> Result<()> { - let signature = (*self).try_into()?; - let public_key = self.recover_from_digest_with_signature(Some(signature), digest)?; - let secp256k1_pub_key = secp256k1::PublicKey::from_byte_array_compressed(&public_key.0)?; - let message = Message::from_digest(digest.0); - - secp256k1::global::SECP256K1 - .verify_ecdsa(&message, &signature.to_standard(), &secp256k1_pub_key) - .map_err(Into::into) + /// Verify the signature with public key recovery from the signature. + pub fn verify_with_public_key_recover(&self, digest: Digest) -> Result<()> { + let public_key = self.recover_from_digest(digest)?; + self.verify(public_key, digest) } + /// Recovers public key which was used to create the signature for the signed digest. pub fn recover_from_digest(&self, digest: Digest) -> Result { - self.recover_from_digest_with_signature(None, digest) - } - - fn recover_from_digest_with_signature( - &self, - signature: Option, - digest: Digest, - ) -> Result { - let signature = signature.unwrap_or((*self).try_into()?); + let signature: RecoverableSignature = (*self).try_into()?; signature .recover(&Message::from_digest(digest.0)) - .map(|pub_key| PublicKey::from_bytes(pub_key.serialize())) + .map(PublicKey::from) .map_err(Into::into) } -} - -impl TryFrom<&[u8]> for Signature { - type Error = Error; - fn try_from(data: &[u8]) -> Result { - let raw_signature = RawSignature::try_from(data)?; + /// Verifies the signature using the public key and digest possibly signed with + /// the public key. + pub fn verify(&self, public_key: PublicKey, digest: Digest) -> Result<()> { + let signature: RecoverableSignature = (*self).try_into()?; + let message = Message::from_digest(digest.0); + let secp256k1_public_key = public_key.into(); - Ok(raw_signature.into()) + secp256k1::global::SECP256K1 + .verify_ecdsa(&message, &signature.to_standard(), &secp256k1_public_key) + .map_err(Into::into) } } impl From for Signature { fn from(mut sig: RawSignature) -> Self { - // TODO: Include chain id, as that's for transaction of pre-EIP-155 (!) + // TODO [sab]: Include chain id, as that's for transaction of pre-EIP-155 (!) sig.0[64] += 27; Signature(sig.0) } @@ -143,6 +133,16 @@ impl From for [u8; 65] { } } +impl TryFrom<&[u8]> for Signature { + type Error = Error; + + fn try_from(data: &[u8]) -> Result { + <[u8; 65]>::try_from(data) + .map(Signature) + .map_err(Into::into) + } +} + impl AsRef<[u8]> for Signature { fn as_ref(&self) -> &[u8] { &self.0 diff --git a/ethexe/signer/src/utils.rs b/ethexe/signer/src/utils.rs new file mode 100644 index 00000000000..5e5e5caa0f6 --- /dev/null +++ b/ethexe/signer/src/utils.rs @@ -0,0 +1,34 @@ +// This file is part of Gear. +// +// Copyright (C) 2024 Gear Technologies Inc. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +use anyhow::{anyhow, Result}; + +pub(crate) fn decode_to_array(s: &str) -> Result<[u8; N]> { + let mut buf = [0; N]; + hex::decode_to_slice(strip_prefix(s), &mut buf) + .map_err(|_| anyhow!("invalid hex format for {s:?}"))?; + Ok(buf) +} + +pub(crate) fn strip_prefix(s: &str) -> &str { + if let Some(s) = s.strip_prefix("0x") { + s + } else { + s + } +} diff --git a/ethexe/tx-pool/src/lib.rs b/ethexe/tx-pool/src/lib.rs index d3780f46c00..587c56cfb5d 100644 --- a/ethexe/tx-pool/src/lib.rs +++ b/ethexe/tx-pool/src/lib.rs @@ -22,6 +22,7 @@ use anyhow::Result; use ethexe_signer::{Signature, ToDigest}; pub struct TranscationPool { + /// Txs store transactions: Vec, } @@ -63,7 +64,7 @@ impl Transaction for EthexeTransaction { let message_digest = raw_message.to_digest(); let signature = Signature::try_from(signature.as_ref())?; - signature.verify(message_digest) + signature.verify_with_public_key_recover(message_digest) } } } From 574983fc7100367bb66ce0a983eb9724522e063e Mon Sep 17 00:00:00 2001 From: Sabaun Taraki Date: Tue, 26 Nov 2024 21:40:50 +0300 Subject: [PATCH 4/9] Introduce initial logic for sending messages via tx pool --- Cargo.lock | 7 ++ Cargo.toml | 1 + ethexe/cli/Cargo.toml | 1 + ethexe/cli/src/service.rs | 40 +++++++++- ethexe/db/src/database.rs | 11 +++ ethexe/rpc/Cargo.toml | 3 +- ethexe/rpc/src/apis/mod.rs | 2 + ethexe/rpc/src/apis/tx_pool.rs | 82 ++++++++++++++++++++ ethexe/rpc/src/errors.rs | 4 + ethexe/rpc/src/lib.rs | 21 ++++- ethexe/signer/src/signature.rs | 6 +- ethexe/tx-pool/Cargo.toml | 7 +- ethexe/tx-pool/src/lib.rs | 97 ++++++++++++++--------- ethexe/tx-pool/src/service.rs | 124 ++++++++++++++++++++++++++++++ ethexe/tx-pool/src/tests.rs | 68 ++++++++++++++++ ethexe/tx-pool/src/transaction.rs | 80 +++++++++++++++++++ 16 files changed, 506 insertions(+), 48 deletions(-) create mode 100644 ethexe/rpc/src/apis/tx_pool.rs create mode 100644 ethexe/tx-pool/src/service.rs create mode 100644 ethexe/tx-pool/src/tests.rs create mode 100644 ethexe/tx-pool/src/transaction.rs diff --git a/Cargo.lock b/Cargo.lock index b28aaa0b502..ec558e7a9c0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4484,6 +4484,7 @@ dependencies = [ "ethexe-runtime-common", "ethexe-sequencer", "ethexe-signer", + "ethexe-tx-pool", "ethexe-utils", "ethexe-validator", "futures", @@ -4648,6 +4649,7 @@ dependencies = [ "ethexe-db", "ethexe-processor", "ethexe-runtime-common", + "ethexe-tx-pool", "futures", "gear-core", "gprimitives", @@ -4735,7 +4737,12 @@ name = "ethexe-tx-pool" version = "1.6.2" dependencies = [ "anyhow", + "ethexe-db", "ethexe-signer", + "gprimitives", + "log", + "parity-scale-codec", + "tokio", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 8ea6b3bf7b8..382dfb4e76b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -306,6 +306,7 @@ ethexe-utils = { path = "ethexe/utils", default-features = false } ethexe-validator = { path = "ethexe/validator", default-features = false } ethexe-rpc = { path = "ethexe/rpc", default-features = false } ethexe-common = { path = "ethexe/common" } +ethexe-tx-pool = { path = "ethexe/tx-pool", default-features = false } # Common executor between `sandbox-host` and `lazy-pages-fuzzer` wasmi = { package = "wasmi", version = "0.38"} diff --git a/ethexe/cli/Cargo.toml b/ethexe/cli/Cargo.toml index dee68571558..23cb1592264 100644 --- a/ethexe/cli/Cargo.toml +++ b/ethexe/cli/Cargo.toml @@ -27,6 +27,7 @@ ethexe-runtime-common.workspace = true ethexe-prometheus-endpoint.workspace = true ethexe-rpc.workspace = true ethexe-utils.workspace = true +ethexe-tx-pool.workspace = true gprimitives.workspace = true clap = { workspace = true, features = ["derive"] } diff --git a/ethexe/cli/src/service.rs b/ethexe/cli/src/service.rs index 2b4d039ed3e..1184c844889 100644 --- a/ethexe/cli/src/service.rs +++ b/ethexe/cli/src/service.rs @@ -36,6 +36,7 @@ use ethexe_observer::{RequestBlockData, RequestEvent}; use ethexe_processor::{LocalOutcome, ProcessorConfig}; use ethexe_sequencer::agro::AggregatedCommitments; use ethexe_signer::{Digest, PublicKey, Signature, Signer}; +use ethexe_tx_pool::{EthexeTransaction, TxPoolCore, TxPoolInputTaskSender, TxPoolService}; use ethexe_validator::BlockCommitmentValidationRequest; use futures::{future, stream::StreamExt, FutureExt}; use gprimitives::H256; @@ -64,6 +65,10 @@ pub struct Service { validator: Option, metrics_service: Option, rpc: Option, + tx_pool_service: Option<( + TxPoolService>, + TxPoolInputTaskSender, + )>, } // TODO: consider to move this to another module #4176 @@ -98,6 +103,8 @@ impl Service { let rocks_db = ethexe_db::RocksDatabase::open(config.database_path.clone())?; let db = ethexe_db::Database::from_one(&rocks_db, ethereum_router_address.0); + let tx_pool_artifacts = TxPoolService::new((db.clone(),)); + let observer = ethexe_observer::Observer::new( &config.ethereum_rpc, ethereum_router_address, @@ -193,10 +200,9 @@ impl Service { }) .transpose()?; - let rpc = config - .rpc_config - .as_ref() - .map(|config| ethexe_rpc::RpcService::new(config.clone(), db.clone())); + let rpc = config.rpc_config.as_ref().map(|config| { + ethexe_rpc::RpcService::new(config.clone(), db.clone(), tx_pool_artifacts.1.clone()) + }); Ok(Self { db, @@ -211,6 +217,7 @@ impl Service { metrics_service, rpc, block_time: config.block_time, + tx_pool_service: Some(tx_pool_artifacts), }) } @@ -237,6 +244,10 @@ impl Service { validator: Option, metrics_service: Option, rpc: Option, + tx_pool_service: Option<( + TxPoolService, + TxPoolInputTaskSender, + )>, ) -> Self { Self { db, @@ -251,6 +262,7 @@ impl Service { validator, metrics_service, rpc, + tx_pool_service, } } @@ -431,6 +443,7 @@ impl Service { mut validator, metrics_service, rpc, + tx_pool_service, block_time, } = self; @@ -465,6 +478,18 @@ impl Service { None }; + let (mut tx_pool_handle, mut tx_pool_input_task_sender) = + if let Some((tx_pool_service, tx_pool_input_task_sender)) = tx_pool_service { + log::info!("🚅 Tx pool service starting..."); + + ( + Some(tokio::spawn(tx_pool_service.run())), + Some(tx_pool_input_task_sender), + ) + } else { + (None, None) + }; + let mut roles = "Observer".to_string(); if let Some(seq) = sequencer.as_ref() { roles.push_str(&format!(", Sequencer ({})", seq.address())); @@ -539,6 +564,7 @@ impl Service { validator.as_mut(), sequencer.as_mut(), network_sender.as_mut(), + tx_pool_input_task_sender.as_mut(), ); if let Err(err) = result { @@ -571,6 +597,10 @@ impl Service { log::info!("`RPCWorker` has terminated, shutting down..."); break; } + _ = maybe_await(tx_pool_handle.as_mut()) => { + log::info!("`TxPoolService` has terminated, shutting down..."); + break; + } } } @@ -761,6 +791,8 @@ impl Service { maybe_validator: Option<&mut ethexe_validator::Validator>, maybe_sequencer: Option<&mut ethexe_sequencer::Sequencer>, maybe_network_sender: Option<&mut ethexe_network::NetworkSender>, + // TODO [sab]: + _maybe_tx_pool_input_task_sender: Option<&mut TxPoolInputTaskSender>, ) -> Result<()> { let message = NetworkMessage::decode(&mut data)?; match message { diff --git a/ethexe/db/src/database.rs b/ethexe/db/src/database.rs index 87ee5631588..00903192822 100644 --- a/ethexe/db/src/database.rs +++ b/ethexe/db/src/database.rs @@ -58,6 +58,7 @@ enum KeyPrefix { CodeValid = 10, BlockStartSchedule = 11, BlockEndSchedule = 12, + Transaction = 13, } impl KeyPrefix { @@ -444,6 +445,16 @@ impl Database { self.cas.write(data) } + // TODO [sab]: + // 1. Maybe concrete transaction type and encode it? + pub fn validated_transaction(&self, tx_hash: H256) -> Option> { + self.kv.get(&KeyPrefix::Transaction.one(tx_hash)) + } + + pub fn set_validated_transaction(&self, tx_hash: H256, tx: Vec) { + self.kv.put(&KeyPrefix::Transaction.one(tx_hash), tx); + } + fn block_small_meta(&self, block_hash: H256) -> Option { self.kv .get(&KeyPrefix::BlockSmallMeta.two(self.router_address, block_hash)) diff --git a/ethexe/rpc/Cargo.toml b/ethexe/rpc/Cargo.toml index 8cf8f0bf0c2..f1f9e2ab0b5 100644 --- a/ethexe/rpc/Cargo.toml +++ b/ethexe/rpc/Cargo.toml @@ -10,12 +10,13 @@ repository.workspace = true # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -tokio = { workspace = true } +tokio = { workspace = true, features = ["sync"] } anyhow.workspace = true futures.workspace = true gprimitives = { workspace = true, features = ["serde"] } ethexe-db.workspace = true ethexe-processor.workspace = true +ethexe-tx-pool.workspace = true jsonrpsee = { version = "0.24", features = ["server", "macros"] } tower = { version = "0.4.13", features = ["full"] } hyper = { version = "1.4.1", features = ["server"] } diff --git a/ethexe/rpc/src/apis/mod.rs b/ethexe/rpc/src/apis/mod.rs index d516efc0d5d..92d9bf8e27a 100644 --- a/ethexe/rpc/src/apis/mod.rs +++ b/ethexe/rpc/src/apis/mod.rs @@ -18,6 +18,8 @@ mod block; mod program; +mod tx_pool; pub use block::{BlockApi, BlockServer}; pub use program::{ProgramApi, ProgramServer}; +pub use tx_pool::{TransactionPoolApi, TransactionPoolServer}; diff --git a/ethexe/rpc/src/apis/tx_pool.rs b/ethexe/rpc/src/apis/tx_pool.rs new file mode 100644 index 00000000000..a6ce471a1de --- /dev/null +++ b/ethexe/rpc/src/apis/tx_pool.rs @@ -0,0 +1,82 @@ +// This file is part of Gear. +// +// Copyright (C) 2024 Gear Technologies Inc. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +//! Transaction pool rpc interface. + +use crate::errors; +use ethexe_tx_pool::{EthexeTransaction, InputTask, TxPoolInputTaskSender}; +use jsonrpsee::{ + core::{async_trait, RpcResult}, + proc_macros::rpc, +}; +use tokio::sync::oneshot; + +#[rpc(server)] +pub trait TransactionPool { + #[method(name = "transactionPool_sendMessage")] + async fn send_message(&self, raw_message: Vec, signature: Vec) -> RpcResult<()>; +} + +#[derive(Clone)] +pub struct TransactionPoolApi { + tx_pool_task_sender: TxPoolInputTaskSender, +} + +impl TransactionPoolApi { + pub fn new(tx_pool_task_sender: TxPoolInputTaskSender) -> Self { + Self { + tx_pool_task_sender, + } + } +} + +/* +Test case for raw rpc call with websocat: +transactionPool_sendMessage {"raw_message": [104, 101, 108, 108, 111, 95, 119, 111, 114, 108, 100], "signature": [89, 105, 98, 73, 40, 32, 213, 19, 244, 171, 227, 144, 199, 56, 94, 1, 223, 229, 208, 245, 103, 132, 35, 75, 99, 195, 70, 169, 1, 48, 250, 219, 100, 79, 7, 240, 253, 122, 22, 12, 87, 45, 173, 191, 36, 72, 136, 222, 57, 6, 55, 244, 196, 125, 135, 250, 237, 70, 116, 65, 46, 175, 75, 37, 27]} +*/ + +#[async_trait] +impl TransactionPoolServer for TransactionPoolApi { + async fn send_message(&self, raw_message: Vec, signature: Vec) -> RpcResult<()> { + log::debug!("Called send_transaction with vars: raw_message - {raw_message:?}, signature - {signature:?}"); + + let (response_sender, response_receiver) = oneshot::channel(); + let input_task = InputTask::AddTransaction { + transaction: EthexeTransaction::Message { + raw_message, + signature, + }, + response_sender, + }; + + self.tx_pool_task_sender.send(input_task).map_err(|e| { + log::error!( + "Failed to send tx pool input task: {e}. \ + The receiving end in the tx pool might have been dropped." + ); + errors::internal() + })?; + + let res = response_receiver.await.map_err(|e| { + log::error!("Failed to receive tx pool response: {e}"); + errors::internal() + })?; + + res.map_err(errors::tx_pool) + } +} diff --git a/ethexe/rpc/src/errors.rs b/ethexe/rpc/src/errors.rs index 49b4e786166..fb46db47deb 100644 --- a/ethexe/rpc/src/errors.rs +++ b/ethexe/rpc/src/errors.rs @@ -29,3 +29,7 @@ pub fn runtime(err: anyhow::Error) -> ErrorObject<'static> { pub fn internal() -> ErrorObject<'static> { ErrorObject::owned(8000, "Internal error", None::<&str>) } + +pub fn tx_pool(err: anyhow::Error) -> ErrorObject<'static> { + ErrorObject::owned(8000, "Transaction pool error", Some(format!("{err}"))) +} diff --git a/ethexe/rpc/src/lib.rs b/ethexe/rpc/src/lib.rs index c26aeb12b29..2c2919010d7 100644 --- a/ethexe/rpc/src/lib.rs +++ b/ethexe/rpc/src/lib.rs @@ -17,8 +17,11 @@ // along with this program. If not, see . use anyhow::anyhow; -use apis::{BlockApi, BlockServer, ProgramApi, ProgramServer}; +use apis::{ + BlockApi, BlockServer, ProgramApi, ProgramServer, TransactionPoolApi, TransactionPoolServer, +}; use ethexe_db::Database; +use ethexe_tx_pool::{EthexeTransaction, TxPoolInputTaskSender}; use futures::FutureExt; use jsonrpsee::{ server::{ @@ -52,11 +55,20 @@ pub struct RpcConfig { pub struct RpcService { config: RpcConfig, db: Database, + tx_pool_task_sender: TxPoolInputTaskSender, } impl RpcService { - pub fn new(config: RpcConfig, db: Database) -> Self { - Self { config, db } + pub fn new( + config: RpcConfig, + db: Database, + tx_pool_task_sender: TxPoolInputTaskSender, + ) -> Self { + Self { + config, + db, + tx_pool_task_sender, + } } pub const fn port(&self) -> u16 { @@ -70,6 +82,9 @@ impl RpcService { let mut module = JsonrpcModule::new(()); module.merge(ProgramServer::into_rpc(ProgramApi::new(self.db.clone())))?; module.merge(BlockServer::into_rpc(BlockApi::new(self.db.clone())))?; + module.merge(TransactionPoolServer::into_rpc(TransactionPoolApi::new( + self.tx_pool_task_sender, + )))?; let (stop_handle, server_handle) = stop_channel(); diff --git a/ethexe/signer/src/signature.rs b/ethexe/signer/src/signature.rs index ec3810a5bee..7297b5de595 100644 --- a/ethexe/signer/src/signature.rs +++ b/ethexe/signer/src/signature.rs @@ -136,10 +136,8 @@ impl From for [u8; 65] { impl TryFrom<&[u8]> for Signature { type Error = Error; - fn try_from(data: &[u8]) -> Result { - <[u8; 65]>::try_from(data) - .map(Signature) - .map_err(Into::into) + fn try_from(mut data: &[u8]) -> Result { + Decode::decode(&mut data).map_err(Into::into) } } diff --git a/ethexe/tx-pool/Cargo.toml b/ethexe/tx-pool/Cargo.toml index 448f8b5aa97..1cef642acdb 100644 --- a/ethexe/tx-pool/Cargo.toml +++ b/ethexe/tx-pool/Cargo.toml @@ -10,4 +10,9 @@ rust-version.workspace = true [dependencies] anyhow.workspace = true -ethexe-signer.workspace = true \ No newline at end of file +ethexe-signer.workspace = true +ethexe-db.workspace = true +log.workspace = true +parity-scale-codec = { workspace = true, features = ["std", "derive"] } +tokio = { workspace = true, features = ["sync"] } +gprimitives = { workspace = true, features = ["codec"] } \ No newline at end of file diff --git a/ethexe/tx-pool/src/lib.rs b/ethexe/tx-pool/src/lib.rs index 587c56cfb5d..88839a29e2c 100644 --- a/ethexe/tx-pool/src/lib.rs +++ b/ethexe/tx-pool/src/lib.rs @@ -18,54 +18,81 @@ //! Ethexe transaction pool. -use anyhow::Result; -use ethexe_signer::{Signature, ToDigest}; +mod service; +mod transaction; -pub struct TranscationPool { - /// Txs store - transactions: Vec, -} +#[cfg(test)] +mod tests; -impl TranscationPool { - pub fn new() -> Self { - Self { - transactions: Vec::new(), - } - } +pub use service::{InputTask, TxPoolInputTaskSender, TxPoolService}; +pub use transaction::{EthexeTransaction, Transaction}; + +// TODO [sab] decide on tx pool channel size + +use anyhow::{anyhow, Result}; +use ethexe_db::{Database, MemDb}; +use ethexe_signer::ToDigest; +use parity_scale_codec::Encode; +use std::{fmt::Debug, marker::PhantomData}; +use tokio::sync::{mpsc, oneshot}; + +/// Transaction pool trait. +// TODO [sab] define type of hashes and signatures for the tx pool +pub trait TxPoolTrait { + /// Transaction type. + type Transaction: Transaction; + + /// Add transaction to the pool. + // TODO [sab] maybe take error from Transaction? + fn add_transaction(&self, transaction: Self::Transaction) -> Result<()>; } -impl TranscationPool { - pub fn add_new_transaction(&mut self, tx: Tx) -> Result<(), Tx::Error> { - tx.validate().map(|_| self.transactions.push(tx)) +impl TxPoolTrait for () { + type Transaction = (); + + fn add_transaction(&self, _transaction: Self::Transaction) -> Result<()> { + Ok(()) } } -pub trait Transaction { - type Error; - fn validate(&self) -> Result<(), Self::Error>; +pub struct TxPoolCore { + // TODO [sab] trait for tx pool db? + db: Database, + _phantom: PhantomData, } -pub enum EthexeTransaction { - Message { - raw_message: Vec, - signature: Vec, - }, +impl TxPoolCore { + pub fn new(db: Database) -> Self { + Self { + db, + _phantom: PhantomData, + } + } } -impl Transaction for EthexeTransaction { - type Error = anyhow::Error; +impl TxPoolTrait for TxPoolCore +where + Tx: Transaction + Encode, + Tx::Error: Into, +{ + type Transaction = Tx; - fn validate(&self) -> Result<(), Self::Error> { - match self { - EthexeTransaction::Message { - raw_message, - signature, - } => { - let message_digest = raw_message.to_digest(); - let signature = Signature::try_from(signature.as_ref())?; + fn add_transaction(&self, transaction: Self::Transaction) -> Result<()> { + let tx_bytes = transaction.encode(); + let tx_hash = transaction.tx_hash(); - signature.verify_with_public_key_recover(message_digest) - } + // TODO [sab] if transactione exists - send info, that already exists to ban the spammer + if self.db.validated_transaction(tx_hash).is_none() { + transaction.validate().map_err(Into::into)?; + self.db.set_validated_transaction(tx_hash, tx_bytes); } + + Ok(()) + } +} + +impl From<(Database,)> for TxPoolCore { + fn from((db,): (Database,)) -> Self { + TxPoolCore::new(db) } } diff --git a/ethexe/tx-pool/src/service.rs b/ethexe/tx-pool/src/service.rs new file mode 100644 index 00000000000..140f8dcf1ab --- /dev/null +++ b/ethexe/tx-pool/src/service.rs @@ -0,0 +1,124 @@ +// This file is part of Gear. +// +// Copyright (C) 2024 Gear Technologies Inc. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +//! Transaction pool io. + +pub use input::{InputTask, TxPoolInputTaskSender}; + +use crate::{Transaction, TxPoolTrait}; +use input::TxPoolInputTaskReceiver; +use tokio::sync::mpsc; + +/// Transaction pool service. +/// +/// Serves as an interface for the transaction pool core. +pub struct TxPoolService> { + core: TxPool, + input_interface: TxPoolInputTaskReceiver, +} + +impl> TxPoolService { + pub fn new(tx_pool_core: impl Into) -> (Self, TxPoolInputTaskSender) { + let tx_pool_core = tx_pool_core.into(); + let (tx, rx) = mpsc::unbounded_channel(); + + let tx_pool_interface = Self { + core: tx_pool_core, + input_interface: TxPoolInputTaskReceiver { receiver: rx }, + }; + + (tx_pool_interface, TxPoolInputTaskSender { sender: tx }) + } + + /// Runs transaction pool service expecting to receive tasks from the + /// tx pool input task sender. + pub async fn run(mut self) { + while let Some(task) = self.input_interface.recv().await { + match task { + InputTask::AddTransaction { + transaction, + response_sender, + } => { + if response_sender + .send(self.core.add_transaction(transaction)) + .is_err() + { + log::debug!("`AddTransaction` task receiver dropped.") + } + } + } + } + } +} + +// TODO [sab] move I\O to the other crate so ethexe-rpc is lighter +mod input { + use anyhow::Result; + use std::ops::{Deref, DerefMut}; + use tokio::sync::{mpsc, oneshot}; + + /// Input task for the transaction pool service. + /// + /// The task is later processed to be executed by + /// the [`crate::TxPool`] implementation. + pub enum InputTask { + /// Request for adding the transaction to the transaction pool. + /// Sends the response back to the task sender. + AddTransaction { + transaction: Tx, + response_sender: oneshot::Sender>, + }, + } + + #[derive(Debug, Clone)] + pub struct TxPoolInputTaskSender { + pub(crate) sender: mpsc::UnboundedSender>, + } + + impl Deref for TxPoolInputTaskSender { + type Target = mpsc::UnboundedSender>; + + fn deref(&self) -> &Self::Target { + &self.sender + } + } + + impl DerefMut for TxPoolInputTaskSender { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.sender + } + } + + pub(crate) struct TxPoolInputTaskReceiver { + pub(crate) receiver: mpsc::UnboundedReceiver>, + } + + impl Deref for TxPoolInputTaskReceiver { + type Target = mpsc::UnboundedReceiver>; + + fn deref(&self) -> &Self::Target { + &self.receiver + } + } + + impl DerefMut for TxPoolInputTaskReceiver { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.receiver + } + } +} diff --git a/ethexe/tx-pool/src/tests.rs b/ethexe/tx-pool/src/tests.rs new file mode 100644 index 00000000000..11dea5bc40c --- /dev/null +++ b/ethexe/tx-pool/src/tests.rs @@ -0,0 +1,68 @@ +// This file is part of Gear. +// +// Copyright (C) 2024 Gear Technologies Inc. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +use crate::{EthexeTransaction, Transaction, TxPoolCore, TxPoolTrait}; +use ethexe_db::{Database, MemDb}; +use ethexe_signer::{PrivateKey, PublicKey, Signature, Signer, ToDigest}; +use parity_scale_codec::{Decode, Encode}; +use std::str::FromStr; + +const PRIVATE_KEY: &str = "4c0883a69102937d6231471b5dbb6204fe51296170827936ea5cce4b76994b0f"; + +fn prepare_keys() -> (Signer, PublicKey) { + let signer = Signer::tmp(); + + let public_key = signer + .add_key(PrivateKey::from_str(PRIVATE_KEY).expect("invalid private key")) + .expect("key addition failed"); + + (signer, public_key) +} + +#[test] +fn test_add_transaction_tx_pool_core() { + let (signer, public_key) = prepare_keys(); + let db = Database::from_one(&MemDb::default(), Default::default()); + let tx_pool = TxPoolCore::::new(db.clone()); + + let message = b"hello_world"; + println!("raw message bytes {message:?}"); + // sha3 hash of the data + let message_digest = message.to_digest(); + let signature = signer + .sign_digest(public_key, message_digest) + .expect("signing failed"); + println!("signature bytes {:?}", signature.encode()); + + let tx = EthexeTransaction::Message { + raw_message: message.to_vec(), + signature: signature.encode(), + }; + let tx_hash = tx.tx_hash(); + + // Check adding doesn't fail + assert!(tx_pool.add_transaction(tx.clone()).is_ok()); + + // Check transaction is in the db + let db_data = db.validated_transaction(tx_hash); + assert!(db_data.is_some()); + + // Check actual db data + let db_tx = EthexeTransaction::decode(&mut db_data.unwrap().as_ref()).expect("decoding failed"); + assert_eq!(db_tx, tx); +} diff --git a/ethexe/tx-pool/src/transaction.rs b/ethexe/tx-pool/src/transaction.rs new file mode 100644 index 00000000000..826d4f749a5 --- /dev/null +++ b/ethexe/tx-pool/src/transaction.rs @@ -0,0 +1,80 @@ +// This file is part of Gear. +// +// Copyright (C) 2024 Gear Technologies Inc. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +//! Tx pool transaction related types. + +use ethexe_signer::{Signature, ToDigest}; +use gprimitives::H256; +use parity_scale_codec::{Decode, Encode}; + +/// Ethexe transaction behaviour. +pub trait Transaction { + /// Error type for the trait operations. + type Error; + + /// Validate transaction. + fn validate(&self) -> Result<(), Self::Error>; + + /// Get transaction hash. + fn tx_hash(&self) -> H256; +} + +impl Transaction for () { + type Error = anyhow::Error; + + fn validate(&self) -> Result<(), Self::Error> { + Ok(()) + } + + fn tx_hash(&self) -> H256 { + H256::zero() + } +} + +/// Main ethexe transaction type. +#[derive(Debug, Clone, Encode, Decode, PartialEq, Eq)] +pub enum EthexeTransaction { + /// Message send transaction + /// **TEMPORARY**. + Message { + raw_message: Vec, + signature: Vec, + }, +} + +impl Transaction for EthexeTransaction { + type Error = anyhow::Error; + + fn validate(&self) -> Result<(), Self::Error> { + match self { + EthexeTransaction::Message { + raw_message, + signature, + } => { + let message_digest = raw_message.to_digest(); + let signature = Signature::try_from(signature.as_ref())?; + + signature.verify_with_public_key_recover(message_digest) + } + } + } + + fn tx_hash(&self) -> H256 { + ethexe_db::hash(&self.encode()) + } +} From 627575b621127667c0cde49596ebd29744a63855 Mon Sep 17 00:00:00 2001 From: Sabaun Taraki Date: Wed, 27 Nov 2024 21:01:21 +0300 Subject: [PATCH 5/9] Introduce receiving and gossiping txs --- Cargo.lock | 2 + ethexe/cli/Cargo.toml | 2 + ethexe/cli/src/service.rs | 74 +++++++++++++++----- ethexe/cli/src/tests.rs | 121 ++++++++++++++++++++++++++++++++- ethexe/network/src/lib.rs | 40 +++++++---- ethexe/rpc/src/apis/tx_pool.rs | 4 +- ethexe/tx-pool/src/lib.rs | 8 ++- ethexe/tx-pool/src/service.rs | 106 ++++++++++++++++++++++++++--- 8 files changed, 308 insertions(+), 49 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ec558e7a9c0..5b6f708dfa8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4497,7 +4497,9 @@ dependencies = [ "ntest", "parity-scale-codec", "rand", + "reqwest", "serde", + "serde_json", "static_init", "tempfile", "tokio", diff --git a/ethexe/cli/Cargo.toml b/ethexe/cli/Cargo.toml index 23cb1592264..66a530aa13a 100644 --- a/ethexe/cli/Cargo.toml +++ b/ethexe/cli/Cargo.toml @@ -71,6 +71,8 @@ alloy = { workspace = true, features = [ ntest = "0.9.3" gear-core.workspace = true gear-utils.workspace = true +reqwest.workspace = true +serde_json.workspace = true demo-ping = { workspace = true, features = ["debug", "ethexe"] } demo-async = { workspace = true, features = ["debug", "ethexe"] } diff --git a/ethexe/cli/src/service.rs b/ethexe/cli/src/service.rs index 1184c844889..8b6d5ce450e 100644 --- a/ethexe/cli/src/service.rs +++ b/ethexe/cli/src/service.rs @@ -36,7 +36,10 @@ use ethexe_observer::{RequestBlockData, RequestEvent}; use ethexe_processor::{LocalOutcome, ProcessorConfig}; use ethexe_sequencer::agro::AggregatedCommitments; use ethexe_signer::{Digest, PublicKey, Signature, Signer}; -use ethexe_tx_pool::{EthexeTransaction, TxPoolCore, TxPoolInputTaskSender, TxPoolService}; +use ethexe_tx_pool::{ + EthexeTransaction, InputTask, OutputTask, TxPoolCore, TxPoolInputTaskSender, TxPoolService, + TxPoolServiceArtifacts, +}; use ethexe_validator::BlockCommitmentValidationRequest; use futures::{future, stream::StreamExt, FutureExt}; use gprimitives::H256; @@ -65,10 +68,8 @@ pub struct Service { validator: Option, metrics_service: Option, rpc: Option, - tx_pool_service: Option<( - TxPoolService>, - TxPoolInputTaskSender, - )>, + tx_pool_service: + Option>>, } // TODO: consider to move this to another module #4176 @@ -86,6 +87,9 @@ pub enum NetworkMessage { codes: Option<(Digest, Signature)>, blocks: Option<(Digest, Signature)>, }, + Transaction { + transaction: EthexeTransaction, + }, } impl Service { @@ -244,10 +248,9 @@ impl Service { validator: Option, metrics_service: Option, rpc: Option, - tx_pool_service: Option<( - TxPoolService, - TxPoolInputTaskSender, - )>, + tx_pool_service: Option< + TxPoolServiceArtifacts>, + >, ) -> Self { Self { db, @@ -478,16 +481,19 @@ impl Service { None }; - let (mut tx_pool_handle, mut tx_pool_input_task_sender) = - if let Some((tx_pool_service, tx_pool_input_task_sender)) = tx_pool_service { + let (mut tx_pool_handle, mut tx_pool_input_task_sender, mut tx_pool_ouput_task_receiver) = + if let Some((tx_pool_service, tx_pool_input_task_sender, tx_pool_ouput_task_receiver)) = + tx_pool_service + { log::info!("🚅 Tx pool service starting..."); ( Some(tokio::spawn(tx_pool_service.run())), Some(tx_pool_input_task_sender), + Some(tx_pool_ouput_task_receiver), ) } else { - (None, None) + (None, None, None) }; let mut roles = "Observer".to_string(); @@ -589,6 +595,9 @@ impl Service { _ => {} } } + Some(task) = maybe_await(tx_pool_ouput_task_receiver.as_mut().map(|rx| rx.recv())) => { + Self::process_tx_pool_output_task(task, network_sender.as_mut()); + } _ = maybe_await(network_handle.as_mut()) => { log::info!("`NetworkWorker` has terminated, shutting down..."); break; @@ -646,7 +655,7 @@ impl Service { if let Some(network_sender) = maybe_network_sender { log::debug!("Publishing commitments to network..."); - network_sender.publish_message( + network_sender.publish_commitment( NetworkMessage::PublishCommitments { codes: aggregated_codes.clone(), blocks: aggregated_blocks.clone(), @@ -735,7 +744,7 @@ impl Service { codes: code_requests.clone(), blocks: block_requests.clone(), }; - network_sender.publish_message(message.encode()); + network_sender.publish_commitment(message.encode()); } if let Some(validator) = maybe_validator { @@ -791,8 +800,7 @@ impl Service { maybe_validator: Option<&mut ethexe_validator::Validator>, maybe_sequencer: Option<&mut ethexe_sequencer::Sequencer>, maybe_network_sender: Option<&mut ethexe_network::NetworkSender>, - // TODO [sab]: - _maybe_tx_pool_input_task_sender: Option<&mut TxPoolInputTaskSender>, + maybe_tx_pool_input_task_sender: Option<&mut TxPoolInputTaskSender>, ) -> Result<()> { let message = NetworkMessage::decode(&mut data)?; match message { @@ -829,7 +837,7 @@ impl Service { .transpose()?; let message = NetworkMessage::ApproveCommitments { codes, blocks }; - network_sender.publish_message(message.encode()); + network_sender.publish_commitment(message.encode()); Ok(()) } @@ -846,6 +854,20 @@ impl Service { sequencer.receive_blocks_signature(digest, signature)?; } + Ok(()) + } + NetworkMessage::Transaction { transaction } => { + // TODO [sab] check if it should be unwrapped + let Some(tx_pool_input_task_sender) = maybe_tx_pool_input_task_sender else { + return Ok(()); + }; + + // TODO [sab] handle result + let _ = tx_pool_input_task_sender.send(InputTask::AddTransaction { + transaction: transaction, + response_sender: None, + }); + Ok(()) } } @@ -874,6 +896,24 @@ impl Service { Ok(true) } + + fn process_tx_pool_output_task( + task: Option>, + mut maybe_network_sender: Option<&mut ethexe_network::NetworkSender>, + ) { + let Some(task) = task else { + + }; + match task { + OutputTask::PropogateTransaction { transaction } => { + if let Some(network_sender) = maybe_network_sender.as_mut() { + log::debug!("Publishing transaction to network..."); + network_sender + .publish_transaction(NetworkMessage::Transaction { transaction }.encode()); + } + } + } + } } mod utils { diff --git a/ethexe/cli/src/tests.rs b/ethexe/cli/src/tests.rs index 8618be28953..538df934cd8 100644 --- a/ethexe/cli/src/tests.rs +++ b/ethexe/cli/src/tests.rs @@ -35,13 +35,14 @@ use ethexe_processor::Processor; use ethexe_runtime_common::state::{Storage, ValueWithExpiry}; use ethexe_sequencer::Sequencer; use ethexe_signer::Signer; +use ethexe_tx_pool::{EthexeTransaction, Transaction}; use ethexe_validator::Validator; use gear_core::{ ids::prelude::*, message::{ReplyCode, SuccessReplyReason}, }; use gprimitives::{ActorId, CodeId, MessageId, H160, H256}; -use parity_scale_codec::Encode; +use parity_scale_codec::{Decode, Encode}; use std::{ collections::{BTreeMap, BTreeSet}, sync::Arc, @@ -789,9 +790,88 @@ async fn multiple_validators() { assert_eq!(res.reply_payload, res.message_id.encode().as_slice()); } +#[tokio::test(flavor = "multi_thread")] +#[ntest::timeout(60_000)] +async fn tx_pool_gossip() { + gear_utils::init_default_logger(); + + // Setup env of 2 nodes, one of them knows about the other one. + let mut env = TestEnv::new(TestEnvConfig::default().validators_amount(3)) + .await + .unwrap(); + + log::info!("📗 Starting node 0"); + let mut node0 = env.new_node( + NodeConfig::default() + .validator(env.validators[0]) + // TODO [sab] run only pool, but not network + .service_rpc(9506) + .network(None, None), + ); + node0.start_service().await; + + log::info!("📗 Starting node 1"); + let mut node1 = env.new_node( + NodeConfig::default() + .validator(env.validators[1]) + .service_rpc(9505) + .network(None, node0.multiaddr.clone()), + ); + node1.start_service().await; + + // Prepare tx data + let raw_message = b"hello world".to_vec(); + let signature = env.signer + .sign(env.validators[2], &raw_message) + .expect("failed signing message"); + let signature_bytes = signature.encode(); + + // Send request + log::info!("Sending tx pool request"); + send_json_request(node1.service_rpc_url().expect("rpc server is set"), || { + serde_json::json!({ + "jsonrpc": "2.0", + "method": "transactionPool_sendMessage", + "params": { + "raw_message": raw_message, + "signature": signature_bytes.clone(), + }, + "id": 1, + }) + }).await; + + tokio::time::sleep(Duration::from_secs(5)).await; + + // Check that node0 received the message + let tx = EthexeTransaction::Message { raw_message, signature: signature_bytes }; + let tx_hash = tx.tx_hash(); + + let tx_data = node0.db.validated_transaction(tx_hash).expect("tx not found"); + let node0_db_tx: EthexeTransaction = Decode::decode(&mut &tx_data[..]).expect("failed to decode tx"); + assert_eq!(node0_db_tx, tx); +} + +async fn send_json_request(rpc_server_url: String, create_request: impl Fn() -> serde_json::Value) -> Result<> { + let client = reqwest::Client::new(); + let req_body = create_request(); + + let resp = client + .post(rpc_server_url) + .json(&req_body) + .send() + .await + .unwrap_or_else(|err| panic!("Failed to send request: {err:?}")); + + assert!(resp.status().is_success()) +} + mod utils { + use std::net::SocketAddr; + use super::*; use ethexe_observer::SimpleBlockData; + use ethexe_rpc::{RpcConfig, RpcService}; + use ethexe_tx_pool::TxPoolService; use futures::StreamExt; use gear_core::message::ReplyCode; use tokio::sync::{broadcast::Sender, Mutex}; @@ -931,6 +1011,7 @@ mod utils { sequencer_public_key, validator_public_key, network, + service_rpc, } = config; let db = @@ -962,6 +1043,7 @@ mod utils { validator_public_key, network_address, network_bootstrap_address, + service_rpc, } } @@ -1093,6 +1175,8 @@ mod utils { pub validator_public_key: Option, /// Network configuration, if provided then new node starts with network. pub network: Option, + /// RPC configuration, if provided then new node starts with RPC service. + pub service_rpc: Option, } impl NodeConfig { @@ -1122,6 +1206,18 @@ mod utils { }); self } + + pub fn service_rpc(mut self, rpc_port: u16) -> Self { + let service_rpc = RpcConfig { + listen_addr: SocketAddr::new( + "127.0.0.1".parse().unwrap(), + rpc_port + ) + }; + self.service_rpc = Some(service_rpc); + + self + } } #[derive(Default)] @@ -1255,6 +1351,7 @@ mod utils { validator_public_key: Option, network_address: Option, network_bootstrap_address: Option, + service_rpc: Option, } impl Node { @@ -1325,6 +1422,19 @@ mod utils { None => None, }; + let (rpc, tx_pool) = { + match self.service_rpc.as_ref() { + Some(rpc_config) => { + let tx_pool_artifacts = TxPoolService::new((self.db.clone(),)); + + let rpc = RpcService::new(rpc_config.clone(), self.db.clone(), tx_pool_artifacts.1.clone()); + + (Some(rpc), Some(tx_pool_artifacts)) + }, + None => (None, None) + } + }; + let service = Service::new_from_parts( self.db.clone(), self.observer.clone(), @@ -1337,7 +1447,8 @@ mod utils { sequencer, validator, None, - None, + rpc, + tx_pool ); let handle = task::spawn(service.run()); @@ -1356,6 +1467,12 @@ mod utils { let _ = handle.await; self.multiaddr = None; } + + pub fn service_rpc_url(&self) -> Option { + self.service_rpc.as_ref().map(|rpc| + format!("http://{}", rpc.listen_addr.to_string()) + ) + } } #[derive(Clone)] diff --git a/ethexe/network/src/lib.rs b/ethexe/network/src/lib.rs index a7d7c39116f..2ae063e51ef 100644 --- a/ethexe/network/src/lib.rs +++ b/ethexe/network/src/lib.rs @@ -33,7 +33,8 @@ use libp2p::{ connection_limits, core::{muxing::StreamMuxerBox, upgrade}, futures::StreamExt, - gossipsub, identify, identity, kad, mdns, + gossipsub::{self, IdentTopic}, + identify, identity, kad, mdns, multiaddr::Protocol, ping, swarm::{ @@ -120,7 +121,7 @@ impl NetworkService { #[derive(Debug)] enum NetworkSenderEvent { - PublishMessage { data: Vec }, + PublishMessage { data: Vec, topic: IdentTopic }, RequestDbData(db_sync::Request), RequestValidated(Result), } @@ -136,13 +137,21 @@ impl NetworkSender { (Self { tx }, rx) } + pub fn publish_transaction(&self, data: impl Into>) { + let _res = self.tx.send(NetworkSenderEvent::PublishMessage { + data: data.into(), + topic: tx_topic(), + }); + } + // TODO: consider to append salt here to be sure that message is unique. // This is important for the cases of malfunctions in ethexe, when the same message // needs to be sent again #4255 - pub fn publish_message(&self, data: impl Into>) { - let _res = self - .tx - .send(NetworkSenderEvent::PublishMessage { data: data.into() }); + pub fn publish_commitment(&self, data: impl Into>) { + let _res = self.tx.send(NetworkSenderEvent::PublishMessage { + data: data.into(), + topic: gpu_commitments_topic(), + }); } pub fn request_db_data(&self, request: db_sync::Request) { @@ -431,7 +440,7 @@ impl NetworkEventLoop { topic, }, .. - }) if gpu_commitments_topic().hash() == topic => { + }) if gpu_commitments_topic().hash() == topic || tx_topic().hash() == topic => { let _res = self .external_tx .send(NetworkReceiverEvent::Message { source, data }); @@ -475,13 +484,8 @@ impl NetworkEventLoop { fn handle_network_rx_event(&mut self, event: NetworkSenderEvent) { match event { - NetworkSenderEvent::PublishMessage { data } => { - if let Err(e) = self - .swarm - .behaviour_mut() - .gossipsub - .publish(gpu_commitments_topic(), data) - { + NetworkSenderEvent::PublishMessage { data, topic } => { + if let Err(e) = self.swarm.behaviour_mut().gossipsub.publish(topic, data) { log::debug!("gossipsub publishing failed: {e}") } } @@ -577,6 +581,7 @@ impl Behaviour { .map_err(|e| anyhow!("`gossipsub` scoring parameters error: {e}"))?; gossipsub.subscribe(&gpu_commitments_topic())?; + gossipsub.subscribe(&tx_topic())?; let db_sync = db_sync::Behaviour::new(db_sync::Config::default(), peer_score_handle, db); @@ -599,6 +604,10 @@ fn gpu_commitments_topic() -> gossipsub::IdentTopic { gossipsub::IdentTopic::new("gpu-commitments") } +fn tx_topic() -> gossipsub::IdentTopic { + gossipsub::IdentTopic::new("tx") +} + #[cfg(test)] mod tests { use super::*; @@ -612,6 +621,7 @@ mod tests { let tmp_dir1 = tempfile::tempdir().unwrap(); let config = NetworkEventLoopConfig::new_memory(tmp_dir1.path().to_path_buf(), "/memory/1"); + let signer1 = ethexe_signer::Signer::new(tmp_dir1.path().join("key")).unwrap(); let db = Database::from_one(&MemDb::default(), [0; 20]); let service1 = NetworkService::new(config.clone(), &signer1, db).unwrap(); @@ -640,7 +650,7 @@ mod tests { // Send a commitment from service1 let commitment_data = b"test commitment".to_vec(); - sender.publish_message(commitment_data.clone()); + sender.publish_commitment(commitment_data.clone()); let mut receiver = service2.receiver; diff --git a/ethexe/rpc/src/apis/tx_pool.rs b/ethexe/rpc/src/apis/tx_pool.rs index a6ce471a1de..1e4112bdc07 100644 --- a/ethexe/rpc/src/apis/tx_pool.rs +++ b/ethexe/rpc/src/apis/tx_pool.rs @@ -53,7 +53,7 @@ transactionPool_sendMessage {"raw_message": [104, 101, 108, 108, 111, 95, 119, 1 #[async_trait] impl TransactionPoolServer for TransactionPoolApi { async fn send_message(&self, raw_message: Vec, signature: Vec) -> RpcResult<()> { - log::debug!("Called send_transaction with vars: raw_message - {raw_message:?}, signature - {signature:?}"); + log::debug!("Called send_message with vars: raw_message - {raw_message:?}, signature - {signature:?}"); let (response_sender, response_receiver) = oneshot::channel(); let input_task = InputTask::AddTransaction { @@ -61,7 +61,7 @@ impl TransactionPoolServer for TransactionPoolApi { raw_message, signature, }, - response_sender, + response_sender: Some(response_sender), }; self.tx_pool_task_sender.send(input_task).map_err(|e| { diff --git a/ethexe/tx-pool/src/lib.rs b/ethexe/tx-pool/src/lib.rs index 88839a29e2c..e6ec1f2d175 100644 --- a/ethexe/tx-pool/src/lib.rs +++ b/ethexe/tx-pool/src/lib.rs @@ -24,7 +24,10 @@ mod transaction; #[cfg(test)] mod tests; -pub use service::{InputTask, TxPoolInputTaskSender, TxPoolService}; +pub use service::{ + InputTask, OutputTask, TxPoolInputTaskSender, TxPoolOutputTaskReceiver, TxPoolService, + TxPoolServiceArtifacts, +}; pub use transaction::{EthexeTransaction, Transaction}; // TODO [sab] decide on tx pool channel size @@ -44,6 +47,7 @@ pub trait TxPoolTrait { /// Add transaction to the pool. // TODO [sab] maybe take error from Transaction? + // TODO [sab] maybe return a "validated transaction"? fn add_transaction(&self, transaction: Self::Transaction) -> Result<()>; } @@ -81,7 +85,7 @@ where let tx_bytes = transaction.encode(); let tx_hash = transaction.tx_hash(); - // TODO [sab] if transactione exists - send info, that already exists to ban the spammer + // TODO [sab] handle duplicates if self.db.validated_transaction(tx_hash).is_none() { transaction.validate().map_err(Into::into)?; self.db.set_validated_transaction(tx_hash, tx_bytes); diff --git a/ethexe/tx-pool/src/service.rs b/ethexe/tx-pool/src/service.rs index 140f8dcf1ab..a6bf6f9e71c 100644 --- a/ethexe/tx-pool/src/service.rs +++ b/ethexe/tx-pool/src/service.rs @@ -19,30 +19,45 @@ //! Transaction pool io. pub use input::{InputTask, TxPoolInputTaskSender}; +pub use output::{OutputTask, TxPoolOutputTaskReceiver}; use crate::{Transaction, TxPoolTrait}; use input::TxPoolInputTaskReceiver; +use output::TxPoolOutputTaskSender; use tokio::sync::mpsc; +pub type TxPoolServiceArtifacts = ( + TxPoolService, + TxPoolInputTaskSender, + TxPoolOutputTaskReceiver, +); + /// Transaction pool service. /// /// Serves as an interface for the transaction pool core. pub struct TxPoolService> { core: TxPool, input_interface: TxPoolInputTaskReceiver, + output_inteface: TxPoolOutputTaskSender, } -impl> TxPoolService { - pub fn new(tx_pool_core: impl Into) -> (Self, TxPoolInputTaskSender) { +impl> TxPoolService { + pub fn new(tx_pool_core: impl Into) -> TxPoolServiceArtifacts { let tx_pool_core = tx_pool_core.into(); - let (tx, rx) = mpsc::unbounded_channel(); + let (tx_in, rx_in) = mpsc::unbounded_channel(); + let (tx_out, rx_out) = mpsc::unbounded_channel(); let tx_pool_interface = Self { core: tx_pool_core, - input_interface: TxPoolInputTaskReceiver { receiver: rx }, + input_interface: TxPoolInputTaskReceiver { receiver: rx_in }, + output_inteface: TxPoolOutputTaskSender { sender: tx_out }, }; - (tx_pool_interface, TxPoolInputTaskSender { sender: tx }) + ( + tx_pool_interface, + TxPoolInputTaskSender { sender: tx_in }, + TxPoolOutputTaskReceiver { receiver: rx_out }, + ) } /// Runs transaction pool service expecting to receive tasks from the @@ -54,11 +69,18 @@ impl> TxPoolService { - if response_sender - .send(self.core.add_transaction(transaction)) - .is_err() + let res = self.core.add_transaction(transaction.clone()); + if let Some(response_sender) = response_sender { + let _ = response_sender.send(res).inspect_err(|err| { + log::error!("`AddTransaction` task receiver dropped - {err:?}") + }); + } + + if let Err(err) = self + .output_inteface + .send(OutputTask::PropogateTransaction { transaction }) { - log::debug!("`AddTransaction` task receiver dropped.") + log::error!("Failed to send `PropogateTransaction` task: {err:?}"); } } } @@ -78,13 +100,18 @@ mod input { /// the [`crate::TxPool`] implementation. pub enum InputTask { /// Request for adding the transaction to the transaction pool. - /// Sends the response back to the task sender. + /// Sends the response back to the task sender, if there's receiver, + /// that expects the response. AddTransaction { transaction: Tx, - response_sender: oneshot::Sender>, + response_sender: Option>>, }, } + /// Transaction pool input task sender. + /// + /// Used as a sending end to communicate with the transaction pool service + /// to run some action on the transaction pool. #[derive(Debug, Clone)] pub struct TxPoolInputTaskSender { pub(crate) sender: mpsc::UnboundedSender>, @@ -104,6 +131,7 @@ mod input { } } + /// Transaction pool input task receiver. pub(crate) struct TxPoolInputTaskReceiver { pub(crate) receiver: mpsc::UnboundedReceiver>, } @@ -122,3 +150,59 @@ mod input { } } } + +mod output { + use anyhow::Result; + use std::ops::{Deref, DerefMut}; + use tokio::sync::mpsc; + + /// Output task sent from the transaction pool service. + /// + /// The task is not obligatory to be anyhow handled, + /// but is a way to communicate with an external service. + pub enum OutputTask { + /// Signals to the external service to propogate the transaction + PropogateTransaction { transaction: Tx }, + } + + /// Transaction pool output task sender. + pub(crate) struct TxPoolOutputTaskSender { + pub(crate) sender: mpsc::UnboundedSender>, + } + + impl Deref for TxPoolOutputTaskSender { + type Target = mpsc::UnboundedSender>; + + fn deref(&self) -> &Self::Target { + &self.sender + } + } + + impl DerefMut for TxPoolOutputTaskSender { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.sender + } + } + + /// Transaction pool output task receiver. + /// + /// Used as a receiving end to transaction pool service + /// external communication channel. + pub struct TxPoolOutputTaskReceiver { + pub(crate) receiver: mpsc::UnboundedReceiver>, + } + + impl Deref for TxPoolOutputTaskReceiver { + type Target = mpsc::UnboundedReceiver>; + + fn deref(&self) -> &Self::Target { + &self.receiver + } + } + + impl DerefMut for TxPoolOutputTaskReceiver { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.receiver + } + } +} From a569591248b51f16fab3e160fc727ee164a8c626 Mon Sep 17 00:00:00 2001 From: Sabaun Taraki Date: Thu, 28 Nov 2024 15:16:57 +0300 Subject: [PATCH 6/9] Resolve some todos --- Cargo.lock | 2 +- ethexe/cli/src/service.rs | 77 +++++++++++++++------------------- ethexe/cli/src/tests.rs | 51 ++++++++++------------ ethexe/db/src/database.rs | 2 - ethexe/rpc/src/apis/tx_pool.rs | 5 --- ethexe/rpc/src/errors.rs | 2 + ethexe/rpc/src/lib.rs | 5 +-- ethexe/tx-pool/Cargo.toml | 2 +- ethexe/tx-pool/src/lib.rs | 29 +++++++------ ethexe/tx-pool/src/service.rs | 28 ++++++------- 10 files changed, 91 insertions(+), 112 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index dbd219c1807..328d0d04db8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4737,7 +4737,7 @@ dependencies = [ [[package]] name = "ethexe-tx-pool" -version = "1.6.2" +version = "1.7.0" dependencies = [ "anyhow", "ethexe-db", diff --git a/ethexe/cli/src/service.rs b/ethexe/cli/src/service.rs index 0860e8f309e..d3927a32c07 100644 --- a/ethexe/cli/src/service.rs +++ b/ethexe/cli/src/service.rs @@ -35,8 +35,7 @@ use ethexe_processor::{LocalOutcome, ProcessorConfig}; use ethexe_sequencer::agro::AggregatedCommitments; use ethexe_signer::{Digest, PublicKey, Signature, Signer}; use ethexe_tx_pool::{ - EthexeTransaction, InputTask, OutputTask, TxPoolCore, TxPoolInputTaskSender, TxPoolService, - TxPoolServiceArtifacts, + EthexeTransaction, InputTask, OutputTask, StandardInputTaskSender, StandardTxPoolInstantiationArtifacts, TxPoolService }; use ethexe_validator::BlockCommitmentValidationRequest; use futures::{future, stream::StreamExt, FutureExt}; @@ -59,6 +58,7 @@ pub struct Service { processor: ethexe_processor::Processor, signer: ethexe_signer::Signer, block_time: Duration, + tx_pool_artifacts: StandardTxPoolInstantiationArtifacts, // Optional services network: Option, @@ -66,8 +66,6 @@ pub struct Service { validator: Option, metrics_service: Option, rpc: Option, - tx_pool_service: - Option>>, } // TODO: consider to move this to another module #4176 @@ -105,8 +103,6 @@ impl Service { let rocks_db = ethexe_db::RocksDatabase::open(config.database_path.clone())?; let db = ethexe_db::Database::from_one(&rocks_db, ethereum_router_address.0); - let tx_pool_artifacts = TxPoolService::new((db.clone(),)); - let observer = ethexe_observer::Observer::new( &config.ethereum_rpc, ethereum_router_address, @@ -211,8 +207,11 @@ impl Service { }) .transpose()?; + log::info!("🚅 Tx pool service starting..."); + let tx_pool_artifacts = TxPoolService::new((db.clone(),)); + let rpc = config.rpc_config.as_ref().map(|config| { - ethexe_rpc::RpcService::new(config.clone(), db.clone(), tx_pool_artifacts.1.clone()) + ethexe_rpc::RpcService::new(config.clone(), db.clone(), tx_pool_artifacts.input_sender.clone()) }); Ok(Self { @@ -228,7 +227,7 @@ impl Service { metrics_service, rpc, block_time: config.block_time, - tx_pool_service: Some(tx_pool_artifacts), + tx_pool_artifacts, }) } @@ -255,9 +254,7 @@ impl Service { validator: Option, metrics_service: Option, rpc: Option, - tx_pool_service: Option< - TxPoolServiceArtifacts>, - >, + tx_pool_artifacts: StandardTxPoolInstantiationArtifacts, ) -> Self { Self { db, @@ -272,7 +269,7 @@ impl Service { validator, metrics_service, rpc, - tx_pool_service, + tx_pool_artifacts, } } @@ -460,7 +457,7 @@ impl Service { mut validator, metrics_service, rpc, - tx_pool_service, + tx_pool_artifacts, block_time, } = self; @@ -495,20 +492,13 @@ impl Service { None }; - let (mut tx_pool_handle, mut tx_pool_input_task_sender, mut tx_pool_ouput_task_receiver) = - if let Some((tx_pool_service, tx_pool_input_task_sender, tx_pool_ouput_task_receiver)) = - tx_pool_service - { - log::info!("🚅 Tx pool service starting..."); - ( - Some(tokio::spawn(tx_pool_service.run())), - Some(tx_pool_input_task_sender), - Some(tx_pool_ouput_task_receiver), - ) - } else { - (None, None, None) - }; + let StandardTxPoolInstantiationArtifacts { + service: tx_pool_service, + input_sender: tx_pool_input_task_sender, + output_receiver: mut tx_pool_ouput_task_receiver, + } = tx_pool_artifacts; + let mut tx_pool_handle = tokio::spawn(tx_pool_service.run()); let mut roles = "Observer".to_string(); if let Some(seq) = sequencer.as_ref() { @@ -584,7 +574,7 @@ impl Service { validator.as_mut(), sequencer.as_mut(), network_sender.as_mut(), - tx_pool_input_task_sender.as_mut(), + &tx_pool_input_task_sender, ); if let Err(err) = result { @@ -609,7 +599,8 @@ impl Service { _ => {} } } - Some(task) = maybe_await(tx_pool_ouput_task_receiver.as_mut().map(|rx| rx.recv())) => { + Some(task) = tx_pool_ouput_task_receiver.recv() => { + log::debug!("Received a task from the tx pool - {task:?}"); Self::process_tx_pool_output_task(task, network_sender.as_mut()); } _ = maybe_await(network_handle.as_mut()) => { @@ -620,7 +611,7 @@ impl Service { log::info!("`RPCWorker` has terminated, shutting down..."); break; } - _ = maybe_await(tx_pool_handle.as_mut()) => { + _ = &mut tx_pool_handle => { log::info!("`TxPoolService` has terminated, shutting down..."); break; } @@ -814,7 +805,7 @@ impl Service { maybe_validator: Option<&mut ethexe_validator::Validator>, maybe_sequencer: Option<&mut ethexe_sequencer::Sequencer>, maybe_network_sender: Option<&mut ethexe_network::NetworkSender>, - maybe_tx_pool_input_task_sender: Option<&mut TxPoolInputTaskSender>, + tx_pool_input_task_sender: &StandardInputTaskSender, ) -> Result<()> { let message = NetworkMessage::decode(&mut data)?; match message { @@ -871,16 +862,17 @@ impl Service { Ok(()) } NetworkMessage::Transaction { transaction } => { - // TODO [sab] check if it should be unwrapped - let Some(tx_pool_input_task_sender) = maybe_tx_pool_input_task_sender else { - return Ok(()); - }; - - // TODO [sab] handle result - let _ = tx_pool_input_task_sender.send(InputTask::AddTransaction { - transaction: transaction, - response_sender: None, - }); + let _ = tx_pool_input_task_sender + .send(InputTask::AddTransaction { + transaction: transaction, + response_sender: None, + }) + .inspect_err(|e| { + log::error!( + "Failed to send tx pool input task: {e}. \ + The receiving end in the tx pool might have been dropped." + ); + }); Ok(()) } @@ -912,12 +904,9 @@ impl Service { } fn process_tx_pool_output_task( - task: Option>, + task: OutputTask, mut maybe_network_sender: Option<&mut ethexe_network::NetworkSender>, ) { - let Some(task) = task else { - - }; match task { OutputTask::PropogateTransaction { transaction } => { if let Some(network_sender) = maybe_network_sender.as_mut() { diff --git a/ethexe/cli/src/tests.rs b/ethexe/cli/src/tests.rs index 808c4cda181..c94712da108 100644 --- a/ethexe/cli/src/tests.rs +++ b/ethexe/cli/src/tests.rs @@ -805,8 +805,6 @@ async fn tx_pool_gossip() { let mut node0 = env.new_node( NodeConfig::default() .validator(env.validators[0]) - // TODO [sab] run only pool, but not network - .service_rpc(9506) .network(None, None), ); node0.start_service().await; @@ -829,7 +827,7 @@ async fn tx_pool_gossip() { // Send request log::info!("Sending tx pool request"); - send_json_request(node1.service_rpc_url().expect("rpc server is set"), || { + let resp = send_json_request(node1.service_rpc_url().expect("rpc server is set"), || { serde_json::json!({ "jsonrpc": "2.0", "method": "transactionPool_sendMessage", @@ -839,7 +837,11 @@ async fn tx_pool_gossip() { }, "id": 1, }) - }).await; + }) + .await + .expect("failed sending request"); + + assert!(resp.status().is_success()); tokio::time::sleep(Duration::from_secs(5)).await; @@ -852,23 +854,19 @@ async fn tx_pool_gossip() { assert_eq!(node0_db_tx, tx); } -async fn send_json_request(rpc_server_url: String, create_request: impl Fn() -> serde_json::Value) -> Result<> { +async fn send_json_request(rpc_server_url: String, create_request: impl Fn() -> serde_json::Value) -> Result { let client = reqwest::Client::new(); let req_body = create_request(); - let resp = client + client .post(rpc_server_url) .json(&req_body) .send() .await - .unwrap_or_else(|err| panic!("Failed to send request: {err:?}")); - - assert!(resp.status().is_success()) } mod utils { use std::net::SocketAddr; - use super::*; use ethexe_observer::SimpleBlockData; use ethexe_rpc::{RpcConfig, RpcService}; @@ -1012,7 +1010,7 @@ mod utils { sequencer_public_key, validator_public_key, network, - service_rpc, + service_rpc_config, } = config; let db = @@ -1044,7 +1042,7 @@ mod utils { validator_public_key, network_address, network_bootstrap_address, - service_rpc, + service_rpc_config, } } @@ -1177,7 +1175,7 @@ mod utils { /// Network configuration, if provided then new node starts with network. pub network: Option, /// RPC configuration, if provided then new node starts with RPC service. - pub service_rpc: Option, + pub service_rpc_config: Option, } impl NodeConfig { @@ -1209,13 +1207,13 @@ mod utils { } pub fn service_rpc(mut self, rpc_port: u16) -> Self { - let service_rpc = RpcConfig { + let service_rpc_config = RpcConfig { listen_addr: SocketAddr::new( "127.0.0.1".parse().unwrap(), rpc_port ) }; - self.service_rpc = Some(service_rpc); + self.service_rpc_config = Some(service_rpc_config); self } @@ -1352,7 +1350,7 @@ mod utils { validator_public_key: Option, network_address: Option, network_bootstrap_address: Option, - service_rpc: Option, + service_rpc_config: Option, } impl Node { @@ -1423,18 +1421,13 @@ mod utils { None => None, }; - let (rpc, tx_pool) = { - match self.service_rpc.as_ref() { - Some(rpc_config) => { - let tx_pool_artifacts = TxPoolService::new((self.db.clone(),)); + let tx_pool_artifacts = TxPoolService::new((self.db.clone(),)); - let rpc = RpcService::new(rpc_config.clone(), self.db.clone(), tx_pool_artifacts.1.clone()); - - (Some(rpc), Some(tx_pool_artifacts)) - }, - None => (None, None) - } - }; + let rpc = self.service_rpc_config + .as_ref() + .map(|service_rpc_config| { + RpcService::new(service_rpc_config.clone(), self.db.clone(), tx_pool_artifacts.input_sender.clone()) + }); let service = Service::new_from_parts( self.db.clone(), @@ -1449,7 +1442,7 @@ mod utils { validator, None, rpc, - tx_pool + tx_pool_artifacts ); let handle = task::spawn(service.run()); @@ -1470,7 +1463,7 @@ mod utils { } pub fn service_rpc_url(&self) -> Option { - self.service_rpc.as_ref().map(|rpc| + self.service_rpc_config.as_ref().map(|rpc| format!("http://{}", rpc.listen_addr.to_string()) ) } diff --git a/ethexe/db/src/database.rs b/ethexe/db/src/database.rs index a37c0102cf3..590e692dbee 100644 --- a/ethexe/db/src/database.rs +++ b/ethexe/db/src/database.rs @@ -445,8 +445,6 @@ impl Database { self.cas.write(data) } - // TODO [sab]: - // 1. Maybe concrete transaction type and encode it? pub fn validated_transaction(&self, tx_hash: H256) -> Option> { self.kv.get(&KeyPrefix::Transaction.one(tx_hash)) } diff --git a/ethexe/rpc/src/apis/tx_pool.rs b/ethexe/rpc/src/apis/tx_pool.rs index 1e4112bdc07..0e5ff83fe4f 100644 --- a/ethexe/rpc/src/apis/tx_pool.rs +++ b/ethexe/rpc/src/apis/tx_pool.rs @@ -45,11 +45,6 @@ impl TransactionPoolApi { } } -/* -Test case for raw rpc call with websocat: -transactionPool_sendMessage {"raw_message": [104, 101, 108, 108, 111, 95, 119, 111, 114, 108, 100], "signature": [89, 105, 98, 73, 40, 32, 213, 19, 244, 171, 227, 144, 199, 56, 94, 1, 223, 229, 208, 245, 103, 132, 35, 75, 99, 195, 70, 169, 1, 48, 250, 219, 100, 79, 7, 240, 253, 122, 22, 12, 87, 45, 173, 191, 36, 72, 136, 222, 57, 6, 55, 244, 196, 125, 135, 250, 237, 70, 116, 65, 46, 175, 75, 37, 27]} -*/ - #[async_trait] impl TransactionPoolServer for TransactionPoolApi { async fn send_message(&self, raw_message: Vec, signature: Vec) -> RpcResult<()> { diff --git a/ethexe/rpc/src/errors.rs b/ethexe/rpc/src/errors.rs index fb46db47deb..0b050954891 100644 --- a/ethexe/rpc/src/errors.rs +++ b/ethexe/rpc/src/errors.rs @@ -18,6 +18,8 @@ use jsonrpsee::types::ErrorObject; +// Todo [sab] + pub fn db(err: &'static str) -> ErrorObject<'static> { ErrorObject::owned(8000, "Database error", Some(err)) } diff --git a/ethexe/rpc/src/lib.rs b/ethexe/rpc/src/lib.rs index 2c2919010d7..e30780fac46 100644 --- a/ethexe/rpc/src/lib.rs +++ b/ethexe/rpc/src/lib.rs @@ -21,7 +21,6 @@ use apis::{ BlockApi, BlockServer, ProgramApi, ProgramServer, TransactionPoolApi, TransactionPoolServer, }; use ethexe_db::Database; -use ethexe_tx_pool::{EthexeTransaction, TxPoolInputTaskSender}; use futures::FutureExt; use jsonrpsee::{ server::{ @@ -55,14 +54,14 @@ pub struct RpcConfig { pub struct RpcService { config: RpcConfig, db: Database, - tx_pool_task_sender: TxPoolInputTaskSender, + tx_pool_task_sender: ethexe_tx_pool::StandardInputTaskSender, } impl RpcService { pub fn new( config: RpcConfig, db: Database, - tx_pool_task_sender: TxPoolInputTaskSender, + tx_pool_task_sender: ethexe_tx_pool::StandardInputTaskSender, ) -> Self { Self { config, diff --git a/ethexe/tx-pool/Cargo.toml b/ethexe/tx-pool/Cargo.toml index 1cef642acdb..8f6d0614550 100644 --- a/ethexe/tx-pool/Cargo.toml +++ b/ethexe/tx-pool/Cargo.toml @@ -15,4 +15,4 @@ ethexe-db.workspace = true log.workspace = true parity-scale-codec = { workspace = true, features = ["std", "derive"] } tokio = { workspace = true, features = ["sync"] } -gprimitives = { workspace = true, features = ["codec"] } \ No newline at end of file +gprimitives = { workspace = true, features = ["codec"] } diff --git a/ethexe/tx-pool/src/lib.rs b/ethexe/tx-pool/src/lib.rs index e6ec1f2d175..9db31ebff2f 100644 --- a/ethexe/tx-pool/src/lib.rs +++ b/ethexe/tx-pool/src/lib.rs @@ -26,28 +26,33 @@ mod tests; pub use service::{ InputTask, OutputTask, TxPoolInputTaskSender, TxPoolOutputTaskReceiver, TxPoolService, - TxPoolServiceArtifacts, + TxPoolInstantiationArtifacts, }; pub use transaction::{EthexeTransaction, Transaction}; -// TODO [sab] decide on tx pool channel size - -use anyhow::{anyhow, Result}; -use ethexe_db::{Database, MemDb}; -use ethexe_signer::ToDigest; +use anyhow::Result; +use ethexe_db::Database; use parity_scale_codec::Encode; -use std::{fmt::Debug, marker::PhantomData}; -use tokio::sync::{mpsc, oneshot}; +use std::marker::PhantomData; + +/// Transaction pool with a [`EthexeTransaction`] transaction type. +pub type StandardTxPool = TxPoolCore; +/// Transaction pool service with a [`EthexeTransaction`] transaction type and a [`StandardTxPool`] as a transaction pool. +pub type StandardTxPoolService = TxPoolService; +/// Transaction pool input task sender with a [`EthexeTransaction`] transaction type. +pub type StandardInputTaskSender = TxPoolInputTaskSender; +/// Transaction pool output task receiver with a [`EthexeTransaction`] transaction type. +pub type StandardOutputTaskReceiver = TxPoolOutputTaskReceiver; +/// Transaction pool instantiation artifacts with a [`EthexeTransaction`] transaction type and a [`StandardTxPool`] as a transaction pool. +pub type StandardTxPoolInstantiationArtifacts = + TxPoolInstantiationArtifacts; /// Transaction pool trait. -// TODO [sab] define type of hashes and signatures for the tx pool pub trait TxPoolTrait { /// Transaction type. type Transaction: Transaction; /// Add transaction to the pool. - // TODO [sab] maybe take error from Transaction? - // TODO [sab] maybe return a "validated transaction"? fn add_transaction(&self, transaction: Self::Transaction) -> Result<()>; } @@ -60,7 +65,6 @@ impl TxPoolTrait for () { } pub struct TxPoolCore { - // TODO [sab] trait for tx pool db? db: Database, _phantom: PhantomData, } @@ -85,7 +89,6 @@ where let tx_bytes = transaction.encode(); let tx_hash = transaction.tx_hash(); - // TODO [sab] handle duplicates if self.db.validated_transaction(tx_hash).is_none() { transaction.validate().map_err(Into::into)?; self.db.set_validated_transaction(tx_hash, tx_bytes); diff --git a/ethexe/tx-pool/src/service.rs b/ethexe/tx-pool/src/service.rs index a6bf6f9e71c..8c95253eaa6 100644 --- a/ethexe/tx-pool/src/service.rs +++ b/ethexe/tx-pool/src/service.rs @@ -26,11 +26,12 @@ use input::TxPoolInputTaskReceiver; use output::TxPoolOutputTaskSender; use tokio::sync::mpsc; -pub type TxPoolServiceArtifacts = ( - TxPoolService, - TxPoolInputTaskSender, - TxPoolOutputTaskReceiver, -); +/// Transaction pool instantiation artifacts carrier. +pub struct TxPoolInstantiationArtifacts> { + pub service: TxPoolService, + pub input_sender: TxPoolInputTaskSender, + pub output_receiver: TxPoolOutputTaskReceiver, +} /// Transaction pool service. /// @@ -42,22 +43,22 @@ pub struct TxPoolService> } impl> TxPoolService { - pub fn new(tx_pool_core: impl Into) -> TxPoolServiceArtifacts { + pub fn new(tx_pool_core: impl Into) -> TxPoolInstantiationArtifacts { let tx_pool_core = tx_pool_core.into(); let (tx_in, rx_in) = mpsc::unbounded_channel(); let (tx_out, rx_out) = mpsc::unbounded_channel(); - let tx_pool_interface = Self { + let service = Self { core: tx_pool_core, input_interface: TxPoolInputTaskReceiver { receiver: rx_in }, output_inteface: TxPoolOutputTaskSender { sender: tx_out }, }; - ( - tx_pool_interface, - TxPoolInputTaskSender { sender: tx_in }, - TxPoolOutputTaskReceiver { receiver: rx_out }, - ) + TxPoolInstantiationArtifacts { + service, + input_sender: TxPoolInputTaskSender { sender: tx_in }, + output_receiver: TxPoolOutputTaskReceiver { receiver: rx_out }, + } } /// Runs transaction pool service expecting to receive tasks from the @@ -88,7 +89,6 @@ impl> TxPoolServi } } -// TODO [sab] move I\O to the other crate so ethexe-rpc is lighter mod input { use anyhow::Result; use std::ops::{Deref, DerefMut}; @@ -152,7 +152,6 @@ mod input { } mod output { - use anyhow::Result; use std::ops::{Deref, DerefMut}; use tokio::sync::mpsc; @@ -160,6 +159,7 @@ mod output { /// /// The task is not obligatory to be anyhow handled, /// but is a way to communicate with an external service. + #[derive(Debug)] pub enum OutputTask { /// Signals to the external service to propogate the transaction PropogateTransaction { transaction: Tx }, From 8b40859d32a21c05d5ccaf0dc4d60a538b381662 Mon Sep 17 00:00:00 2001 From: Sabaun Taraki Date: Thu, 28 Nov 2024 15:27:32 +0300 Subject: [PATCH 7/9] Clippy and fmt --- ethexe/cli/src/service.rs | 18 ++++++---- ethexe/cli/src/tests.rs | 63 +++++++++++++++++++---------------- ethexe/tx-pool/src/lib.rs | 4 +-- ethexe/tx-pool/src/service.rs | 41 +++++++++++++---------- 4 files changed, 70 insertions(+), 56 deletions(-) diff --git a/ethexe/cli/src/service.rs b/ethexe/cli/src/service.rs index d3927a32c07..6f5b577f6dd 100644 --- a/ethexe/cli/src/service.rs +++ b/ethexe/cli/src/service.rs @@ -35,7 +35,8 @@ use ethexe_processor::{LocalOutcome, ProcessorConfig}; use ethexe_sequencer::agro::AggregatedCommitments; use ethexe_signer::{Digest, PublicKey, Signature, Signer}; use ethexe_tx_pool::{ - EthexeTransaction, InputTask, OutputTask, StandardInputTaskSender, StandardTxPoolInstantiationArtifacts, TxPoolService + EthexeTransaction, InputTask, OutputTask, StandardInputTaskSender, + StandardTxPoolInstantiationArtifacts, }; use ethexe_validator::BlockCommitmentValidationRequest; use futures::{future, stream::StreamExt, FutureExt}; @@ -208,10 +209,14 @@ impl Service { .transpose()?; log::info!("🚅 Tx pool service starting..."); - let tx_pool_artifacts = TxPoolService::new((db.clone(),)); + let tx_pool_artifacts = ethexe_tx_pool::new((db.clone(),)); let rpc = config.rpc_config.as_ref().map(|config| { - ethexe_rpc::RpcService::new(config.clone(), db.clone(), tx_pool_artifacts.input_sender.clone()) + ethexe_rpc::RpcService::new( + config.clone(), + db.clone(), + tx_pool_artifacts.input_sender.clone(), + ) }); Ok(Self { @@ -492,9 +497,8 @@ impl Service { None }; - - let StandardTxPoolInstantiationArtifacts { - service: tx_pool_service, + let StandardTxPoolInstantiationArtifacts { + service: tx_pool_service, input_sender: tx_pool_input_task_sender, output_receiver: mut tx_pool_ouput_task_receiver, } = tx_pool_artifacts; @@ -864,7 +868,7 @@ impl Service { NetworkMessage::Transaction { transaction } => { let _ = tx_pool_input_task_sender .send(InputTask::AddTransaction { - transaction: transaction, + transaction, response_sender: None, }) .inspect_err(|e| { diff --git a/ethexe/cli/src/tests.rs b/ethexe/cli/src/tests.rs index c94712da108..f5f7def73cf 100644 --- a/ethexe/cli/src/tests.rs +++ b/ethexe/cli/src/tests.rs @@ -820,7 +820,8 @@ async fn tx_pool_gossip() { // Prepare tx data let raw_message = b"hello world".to_vec(); - let signature = env.signer + let signature = env + .signer .sign(env.validators[2], &raw_message) .expect("failed signing message"); let signature_bytes = signature.encode(); @@ -840,39 +841,44 @@ async fn tx_pool_gossip() { }) .await .expect("failed sending request"); - + assert!(resp.status().is_success()); tokio::time::sleep(Duration::from_secs(5)).await; // Check that node0 received the message - let tx = EthexeTransaction::Message { raw_message, signature: signature_bytes }; + let tx = EthexeTransaction::Message { + raw_message, + signature: signature_bytes, + }; let tx_hash = tx.tx_hash(); - - let tx_data = node0.db.validated_transaction(tx_hash).expect("tx not found"); - let node0_db_tx: EthexeTransaction = Decode::decode(&mut &tx_data[..]).expect("failed to decode tx"); + + let tx_data = node0 + .db + .validated_transaction(tx_hash) + .expect("tx not found"); + let node0_db_tx: EthexeTransaction = + Decode::decode(&mut &tx_data[..]).expect("failed to decode tx"); assert_eq!(node0_db_tx, tx); } -async fn send_json_request(rpc_server_url: String, create_request: impl Fn() -> serde_json::Value) -> Result { +async fn send_json_request( + rpc_server_url: String, + create_request: impl Fn() -> serde_json::Value, +) -> Result { let client = reqwest::Client::new(); let req_body = create_request(); - client - .post(rpc_server_url) - .json(&req_body) - .send() - .await + client.post(rpc_server_url).json(&req_body).send().await } mod utils { - use std::net::SocketAddr; use super::*; use ethexe_observer::SimpleBlockData; use ethexe_rpc::{RpcConfig, RpcService}; - use ethexe_tx_pool::TxPoolService; use futures::StreamExt; use gear_core::message::ReplyCode; + use std::net::SocketAddr; use tokio::sync::{broadcast::Sender, Mutex}; pub struct TestEnv { @@ -1208,10 +1214,7 @@ mod utils { pub fn service_rpc(mut self, rpc_port: u16) -> Self { let service_rpc_config = RpcConfig { - listen_addr: SocketAddr::new( - "127.0.0.1".parse().unwrap(), - rpc_port - ) + listen_addr: SocketAddr::new("127.0.0.1".parse().unwrap(), rpc_port), }; self.service_rpc_config = Some(service_rpc_config); @@ -1421,13 +1424,15 @@ mod utils { None => None, }; - let tx_pool_artifacts = TxPoolService::new((self.db.clone(),)); + let tx_pool_artifacts = ethexe_tx_pool::new((self.db.clone(),)); - let rpc = self.service_rpc_config - .as_ref() - .map(|service_rpc_config| { - RpcService::new(service_rpc_config.clone(), self.db.clone(), tx_pool_artifacts.input_sender.clone()) - }); + let rpc = self.service_rpc_config.as_ref().map(|service_rpc_config| { + RpcService::new( + service_rpc_config.clone(), + self.db.clone(), + tx_pool_artifacts.input_sender.clone(), + ) + }); let service = Service::new_from_parts( self.db.clone(), @@ -1442,7 +1447,7 @@ mod utils { validator, None, rpc, - tx_pool_artifacts + tx_pool_artifacts, ); let handle = task::spawn(service.run()); @@ -1461,11 +1466,11 @@ mod utils { let _ = handle.await; self.multiaddr = None; } - + pub fn service_rpc_url(&self) -> Option { - self.service_rpc_config.as_ref().map(|rpc| - format!("http://{}", rpc.listen_addr.to_string()) - ) + self.service_rpc_config + .as_ref() + .map(|rpc| format!("http://{}", rpc.listen_addr)) } } diff --git a/ethexe/tx-pool/src/lib.rs b/ethexe/tx-pool/src/lib.rs index 9db31ebff2f..8dfa1251cb7 100644 --- a/ethexe/tx-pool/src/lib.rs +++ b/ethexe/tx-pool/src/lib.rs @@ -25,8 +25,8 @@ mod transaction; mod tests; pub use service::{ - InputTask, OutputTask, TxPoolInputTaskSender, TxPoolOutputTaskReceiver, TxPoolService, - TxPoolInstantiationArtifacts, + new, InputTask, OutputTask, TxPoolInputTaskSender, TxPoolInstantiationArtifacts, + TxPoolOutputTaskReceiver, TxPoolService, }; pub use transaction::{EthexeTransaction, Transaction}; diff --git a/ethexe/tx-pool/src/service.rs b/ethexe/tx-pool/src/service.rs index 8c95253eaa6..f49a4c6c1bb 100644 --- a/ethexe/tx-pool/src/service.rs +++ b/ethexe/tx-pool/src/service.rs @@ -26,6 +26,29 @@ use input::TxPoolInputTaskReceiver; use output::TxPoolOutputTaskSender; use tokio::sync::mpsc; +/// Creates a new transaction pool service. +pub fn new(tx_pool_core: impl Into) -> TxPoolInstantiationArtifacts +where + Tx: Transaction + Clone, + TxPool: TxPoolTrait, +{ + let tx_pool_core = tx_pool_core.into(); + let (tx_in, rx_in) = mpsc::unbounded_channel(); + let (tx_out, rx_out) = mpsc::unbounded_channel(); + + let service = TxPoolService { + core: tx_pool_core, + input_interface: TxPoolInputTaskReceiver { receiver: rx_in }, + output_inteface: TxPoolOutputTaskSender { sender: tx_out }, + }; + + TxPoolInstantiationArtifacts { + service, + input_sender: TxPoolInputTaskSender { sender: tx_in }, + output_receiver: TxPoolOutputTaskReceiver { receiver: rx_out }, + } +} + /// Transaction pool instantiation artifacts carrier. pub struct TxPoolInstantiationArtifacts> { pub service: TxPoolService, @@ -43,24 +66,6 @@ pub struct TxPoolService> } impl> TxPoolService { - pub fn new(tx_pool_core: impl Into) -> TxPoolInstantiationArtifacts { - let tx_pool_core = tx_pool_core.into(); - let (tx_in, rx_in) = mpsc::unbounded_channel(); - let (tx_out, rx_out) = mpsc::unbounded_channel(); - - let service = Self { - core: tx_pool_core, - input_interface: TxPoolInputTaskReceiver { receiver: rx_in }, - output_inteface: TxPoolOutputTaskSender { sender: tx_out }, - }; - - TxPoolInstantiationArtifacts { - service, - input_sender: TxPoolInputTaskSender { sender: tx_in }, - output_receiver: TxPoolOutputTaskReceiver { receiver: rx_out }, - } - } - /// Runs transaction pool service expecting to receive tasks from the /// tx pool input task sender. pub async fn run(mut self) { From 5b7b8d10978285ae7890045f4c8ab14db5291e9c Mon Sep 17 00:00:00 2001 From: Sabaun Taraki Date: Thu, 28 Nov 2024 15:37:17 +0300 Subject: [PATCH 8/9] State todos --- ethexe/rpc/src/errors.rs | 2 +- ethexe/signer/src/lib.rs | 2 +- ethexe/signer/src/signature.rs | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/ethexe/rpc/src/errors.rs b/ethexe/rpc/src/errors.rs index 0b050954891..8c45421f50f 100644 --- a/ethexe/rpc/src/errors.rs +++ b/ethexe/rpc/src/errors.rs @@ -18,7 +18,7 @@ use jsonrpsee::types::ErrorObject; -// Todo [sab] +// TODO #4364: https://github.com/gear-tech/gear/issues/4364 pub fn db(err: &'static str) -> ErrorObject<'static> { ErrorObject::owned(8000, "Database error", Some(err)) diff --git a/ethexe/signer/src/lib.rs b/ethexe/signer/src/lib.rs index 4b474a1df4f..e17ad339695 100644 --- a/ethexe/signer/src/lib.rs +++ b/ethexe/signer/src/lib.rs @@ -77,7 +77,7 @@ impl Signer { } /// Create a ECDSA recoverable signature with pre or post EIP-155 `v` value notation. - // TODO [sab] change docs after introducing a parity + // TODO #4365 pub fn sign_digest(&self, public_key: PublicKey, digest: Digest) -> Result { let private_key = self.get_private_key(public_key)?; diff --git a/ethexe/signer/src/signature.rs b/ethexe/signer/src/signature.rs index 7297b5de595..a5d934090b1 100644 --- a/ethexe/signer/src/signature.rs +++ b/ethexe/signer/src/signature.rs @@ -67,7 +67,7 @@ impl AsRef<[u8]> for RawSignature { impl From for RawSignature { fn from(mut sig: Signature) -> RawSignature { - // TODO [sab]: Define parity in accordance to pre-EIP-155 and post-EIP-155 standards. + // TODO #4365: https://github.com/gear-tech/gear/issues/4365 sig.0[64] -= 27; RawSignature(sig.0) } @@ -121,7 +121,7 @@ impl Signature { impl From for Signature { fn from(mut sig: RawSignature) -> Self { - // TODO [sab]: Include chain id, as that's for transaction of pre-EIP-155 (!) + // TODO #4365: https://github.com/gear-tech/gear/issues/4365 sig.0[64] += 27; Signature(sig.0) } From d30c16d53f1dfd041455f55c88629ceaf9303a5f Mon Sep 17 00:00:00 2001 From: Sabaun Taraki Date: Thu, 28 Nov 2024 16:01:04 +0300 Subject: [PATCH 9/9] fix clippy --- ethexe/signer/src/lib.rs | 2 +- ethexe/tx-pool/src/tests.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/ethexe/signer/src/lib.rs b/ethexe/signer/src/lib.rs index e17ad339695..fc07995c872 100644 --- a/ethexe/signer/src/lib.rs +++ b/ethexe/signer/src/lib.rs @@ -76,7 +76,7 @@ impl Signer { RawSignature::create_for_digest(private_key, digest) } - /// Create a ECDSA recoverable signature with pre or post EIP-155 `v` value notation. + /// Create a ECDSA recoverable signature. // TODO #4365 pub fn sign_digest(&self, public_key: PublicKey, digest: Digest) -> Result { let private_key = self.get_private_key(public_key)?; diff --git a/ethexe/tx-pool/src/tests.rs b/ethexe/tx-pool/src/tests.rs index 11dea5bc40c..ad0d0e68eb6 100644 --- a/ethexe/tx-pool/src/tests.rs +++ b/ethexe/tx-pool/src/tests.rs @@ -18,7 +18,7 @@ use crate::{EthexeTransaction, Transaction, TxPoolCore, TxPoolTrait}; use ethexe_db::{Database, MemDb}; -use ethexe_signer::{PrivateKey, PublicKey, Signature, Signer, ToDigest}; +use ethexe_signer::{PrivateKey, PublicKey, Signer, ToDigest}; use parity_scale_codec::{Decode, Encode}; use std::str::FromStr;