diff --git a/Cargo.lock b/Cargo.lock index 048db25..92deb15 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2426,7 +2426,9 @@ name = "raiden-network-messages" version = "0.1.0" dependencies = [ "derive_more", + "hex", "raiden-blockchain", + "raiden-pathfinding", "raiden-primitives", "raiden-state-machine", "serde", @@ -2444,6 +2446,7 @@ dependencies = [ "chrono", "futures", "matrix-sdk", + "parking_lot 0.11.2", "raiden-blockchain", "raiden-network-messages", "raiden-primitives", diff --git a/bin/raiden/src/main.rs b/bin/raiden/src/main.rs index 1612ab4..926af54 100755 --- a/bin/raiden/src/main.rs +++ b/bin/raiden/src/main.rs @@ -27,6 +27,7 @@ use raiden_client::{ SyncService, }, }; +use raiden_network_messages::decode::MessageDecoder; use raiden_pathfinding::{ self, config::PFSConfig, @@ -222,6 +223,14 @@ async fn main() { }, }; + let message_decoder = MessageDecoder { + private_key: account.private_key(), + our_address: account.address(), + proxy_manager: proxy_manager.clone(), + secret_registry_address: default_addresses.secret_registry, + pathfinding_service_url: cli.services_config.pathfinding_service_address.clone(), + }; + // # // # Initialize Raiden // # @@ -245,7 +254,7 @@ async fn main() { config, contracts_manager, proxy_manager, - state_manager, + state_manager: state_manager.clone(), transport: transport_sender.clone(), }); @@ -280,7 +289,7 @@ async fn main() { futures::join!( block_monitor_service.start(), - transport_service.run(transitioner), + transport_service.run(state_manager, transitioner, message_decoder), http_service.start() ); } diff --git a/raiden/network/messages/Cargo.toml b/raiden/network/messages/Cargo.toml index 60c949b..2f3c8f4 100644 --- a/raiden/network/messages/Cargo.toml +++ b/raiden/network/messages/Cargo.toml @@ -13,6 +13,7 @@ rust-version = "1.59" [dependencies] # 3rd-Party derive_more = { version = "0.99.11", default-features = false } +hex = { version = "0.4.3" } thiserror = { version = "1.0", default-features = false } tiny-keccak = { version = "2.0.0", features = [ "keccak" ] } serde = { version = "1.0.136", default-features = false, features = [ "derive" ] } @@ -21,5 +22,6 @@ web3 = { version = "0.18.0", default-features = false, features = ["signing"] } # Raiden raiden-blockchain = { path = "../../blockchain" } +raiden-pathfinding = { path = "../../pathfinding" } raiden-primitives = { path = "../../primitives" } raiden-state-machine = { path = "../../state-machine" } diff --git a/raiden/network/messages/src/decode.rs b/raiden/network/messages/src/decode.rs index 33fb227..3738be6 100644 --- a/raiden/network/messages/src/decode.rs +++ b/raiden/network/messages/src/decode.rs @@ -1,4 +1,55 @@ -use std::collections::HashMap; +use std::{ + collections::HashMap, + sync::Arc, +}; + +use raiden_blockchain::{ + keys::{ + self, + PrivateKey, + }, + proxies::ProxyManager, +}; +use raiden_primitives::types::{ + Address, + Bytes, + PaymentIdentifier, + Secret, + SecretHash, + SecretRegistryAddress, + Signature, + TokenAmount, +}; +use raiden_state_machine::{ + machine::channel::utils::hash_balance_data, + types::{ + ActionInitMediator, + ActionInitTarget, + AddressMetadata, + BalanceProofState, + CanonicalIdentifier, + ChainState, + DecryptedSecret, + HashTimeLockState, + HopState, + LockedTransferState, + ReceiveLockExpired, + ReceiveProcessed, + ReceiveSecretRequest, + ReceiveSecretReveal, + ReceiveUnlock, + ReceiveWithdrawConfirmation, + ReceiveWithdrawExpired, + ReceiveWithdrawRequest, + RouteState, + StateChange, + }, + views, +}; +use web3::signing::{ + self, + keccak256, +}; use super::messages::{ IncomingMessage, @@ -9,16 +60,295 @@ use crate::messages::{ Processed, SecretRequest, SecretReveal, + SignedEnvelopeMessage, + SignedMessage, Unlock, WithdrawConfirmation, WithdrawExpired, WithdrawRequest, }; -pub struct MessageDecoder {} +#[derive(Clone)] +pub struct MessageDecoder { + pub private_key: PrivateKey, + pub our_address: Address, + pub proxy_manager: Arc, + pub secret_registry_address: SecretRegistryAddress, + pub pathfinding_service_url: String, +} impl MessageDecoder { - pub fn decode(body: serde_json::Value) -> Result { + pub async fn decode( + &self, + chain_state: ChainState, + body: serde_json::Value, + ) -> Result, String> { + let message = self.into_message(body)?; + + match message.inner { + crate::messages::MessageInner::LockedTransfer(message) => { + let sender = self.get_sender(&message.bytes(), &message.signature)?; + let balance_hash = hash_balance_data( + message.transferred_amount, + message.locked_amount, + message.locksroot.clone(), + )?; + let balance_proof = BalanceProofState { + nonce: message.nonce, + transferred_amount: message.transferred_amount, + locked_amount: message.locked_amount, + locksroot: message.locksroot.clone(), + canonical_identifier: CanonicalIdentifier { + chain_identifier: message.chain_id, + token_network_address: message.token_network_address, + channel_identifier: message.channel_identifier, + }, + balance_hash, + message_hash: Some(message.message_hash()), + signature: Some(Signature::from_slice(&message.signature)), + sender: Some(sender), + }; + + let route_states: Vec = message + .metadata + .routes + .iter() + .map(|route| RouteState { + route: route.route.clone(), + address_to_metadata: route.address_metadata.clone(), + swaps: Default::default(), + estimated_fee: Default::default(), + }) + .collect(); + + let transfer = LockedTransferState { + payment_identifier: message.payment_identifier, + token: message.token, + lock: HashTimeLockState::create( + message.lock.amount, + message.lock.expiration, + message.lock.secrethash, + ), + initiator: message.initiator, + target: message.target, + message_identifier: message.message_identifier, + route_states, + balance_proof: balance_proof.clone(), + secret: message.secret, + }; + let from_hop = HopState { + node_address: sender, + channel_identifier: message.channel_identifier, + }; + + if message.target == self.our_address { + let mut init_target = ActionInitTarget { + sender, + balance_proof, + from_hop, + transfer: transfer.clone(), + received_valid_secret: false, + }; + + if let Some(encrypted_secret) = message.metadata.secret { + let decrypted_secret = + decrypt_secret(encrypted_secret.0, &self.private_key)?; + if transfer.lock.amount < decrypted_secret.amount || + transfer.payment_identifier != decrypted_secret.payment_identifier + { + return Err(format!("Invalid Secret")) + } + + init_target.received_valid_secret = true; + return Ok(vec![ + StateChange::ActionInitTarget(init_target), + StateChange::ReceiveSecretReveal(ReceiveSecretReveal { + sender, + secret: decrypted_secret.secret, + secrethash: message.lock.secrethash, + }), + ]) + } + return Ok(vec![StateChange::ActionInitTarget(init_target)]) + } else { + let mut filtered_route_states = vec![]; + for route_state in transfer.route_states.iter() { + if let Some(next_hope_address) = route_state.hop_after(self.our_address) { + if let Some(channel_state) = + views::get_channel_by_token_network_and_partner( + &chain_state, + transfer + .balance_proof + .canonical_identifier + .token_network_address, + next_hope_address, + ) { + filtered_route_states.push(route_state.clone()); + } + } + } + return Ok(vec![StateChange::ActionInitMediator(ActionInitMediator { + sender, + balance_proof, + from_hop, + candidate_route_states: filtered_route_states, + from_transfer: transfer, + })]) + } + }, + crate::messages::MessageInner::LockExpired(message) => { + let sender = self.get_sender(&message.bytes(), &message.signature)?; + let balance_hash = hash_balance_data( + message.transferred_amount, + message.locked_amount, + message.locksroot.clone(), + )?; + let balance_proof = BalanceProofState { + nonce: message.nonce, + transferred_amount: message.transferred_amount, + locked_amount: message.locked_amount, + locksroot: message.locksroot.clone(), + canonical_identifier: CanonicalIdentifier { + chain_identifier: message.chain_id, + token_network_address: message.token_network_address, + channel_identifier: message.channel_identifier, + }, + balance_hash, + message_hash: Some(message.message_hash()), + signature: Some(Signature::from_slice(&message.signature)), + sender: Some(sender), + }; + Ok(vec![StateChange::ReceiveLockExpired(ReceiveLockExpired { + sender, + secrethash: message.secrethash, + message_identifier: message.message_identifier, + balance_proof, + })]) + }, + crate::messages::MessageInner::SecretRequest(message) => { + let sender = self.get_sender(&message.bytes(), &message.signature)?; + Ok(vec![StateChange::ReceiveSecretRequest(ReceiveSecretRequest { + sender, + secrethash: message.secrethash, + payment_identifier: message.payment_identifier, + amount: message.amount, + expiration: message.expiration, + revealsecret: None, + })]) + }, + crate::messages::MessageInner::SecretReveal(message) => { + let sender = self.get_sender(&message.bytes(), &message.signature)?; + let mut secrethash = vec![]; + secrethash.extend_from_slice(&keccak256(&message.secret.0)); + Ok(vec![StateChange::ReceiveSecretReveal(ReceiveSecretReveal { + sender, + secrethash: SecretHash::from_slice(&secrethash), + secret: message.secret, + })]) + }, + crate::messages::MessageInner::Unlock(message) => { + let sender = self.get_sender(&message.bytes(), &message.signature)?; + let balance_hash = hash_balance_data( + message.transferred_amount, + message.locked_amount, + message.locksroot.clone(), + )?; + let balance_proof = BalanceProofState { + nonce: message.nonce, + transferred_amount: message.transferred_amount, + locked_amount: message.locked_amount, + locksroot: message.locksroot.clone(), + canonical_identifier: CanonicalIdentifier { + chain_identifier: message.chain_id, + token_network_address: message.token_network_address, + channel_identifier: message.channel_identifier, + }, + balance_hash, + message_hash: Some(message.message_hash()), + signature: Some(Signature::from_slice(&message.signature)), + sender: Some(sender), + }; + let mut secrethash = vec![]; + secrethash.extend_from_slice(&keccak256(&message.secret.0)); + Ok(vec![StateChange::ReceiveUnlock(ReceiveUnlock { + sender, + balance_proof, + secrethash: SecretHash::from_slice(&secrethash), + message_identifier: message.message_identifier, + secret: message.secret, + })]) + }, + crate::messages::MessageInner::WithdrawRequest(message) => { + let sender = self.get_sender(&message.bytes(), &message.signature)?; + + let sender_metadata = raiden_pathfinding::query_address_metadata( + self.pathfinding_service_url.clone(), + sender, + ) + .await + .map_err(|e| format!("Could not fetch address metadata: {:?}", sender))?; + + Ok(vec![StateChange::ReceiveWithdrawRequest(ReceiveWithdrawRequest { + sender, + message_identifier: message.message_identifier, + canonical_identifier: CanonicalIdentifier { + chain_identifier: message.chain_id, + token_network_address: message.token_network_address, + channel_identifier: message.channel_identifier, + }, + total_withdraw: message.total_withdraw, + nonce: message.nonce, + expiration: message.expiration, + signature: Signature::from_slice(&message.signature), + participant: message.participant, + coop_settle: message.coop_settle, + sender_metadata: Some(sender_metadata), + })]) + }, + crate::messages::MessageInner::WithdrawConfirmation(message) => { + let sender = self.get_sender(&message.bytes(), &message.signature)?; + Ok(vec![StateChange::ReceiveWithdrawConfirmation(ReceiveWithdrawConfirmation { + sender, + message_identifier: message.message_identifier, + canonical_identifier: CanonicalIdentifier { + chain_identifier: message.chain_id, + token_network_address: message.token_network_address, + channel_identifier: message.channel_identifier, + }, + total_withdraw: message.total_withdraw, + nonce: message.nonce, + expiration: message.expiration, + signature: Signature::from_slice(&message.signature), + participant: message.participant, + })]) + }, + crate::messages::MessageInner::WithdrawExpired(message) => { + let sender = self.get_sender(&message.bytes(), &message.signature)?; + Ok(vec![StateChange::ReceiveWithdrawExpired(ReceiveWithdrawExpired { + sender, + message_identifier: message.message_identifier, + canonical_identifier: CanonicalIdentifier { + chain_identifier: message.chain_id, + token_network_address: message.token_network_address, + channel_identifier: message.channel_identifier, + }, + total_withdraw: message.total_withdraw, + nonce: message.nonce, + expiration: message.expiration, + participant: message.participant, + })]) + }, + crate::messages::MessageInner::Processed(message) => { + let sender = self.get_sender(&message.bytes(), &message.signature)?; + Ok(vec![StateChange::ReceiveProcessed(ReceiveProcessed { + sender, + message_identifier: message.message_identifier, + })]) + }, + } + } + + pub fn into_message(&self, body: serde_json::Value) -> Result { let s = body.as_str().ok_or(format!("Could not convert message to string"))?.to_owned(); let map: HashMap = @@ -99,4 +429,43 @@ impl MessageDecoder { _ => return Err(format!("Message type {} is unknown", message_type)), }; } + + fn get_sender(&self, data: &[u8], signature: &[u8]) -> Result { + keys::recover(&data, &signature) + .map_err(|e| format!("Could not recover address from signature: {}", e)) + } +} + +pub fn encrypt_secret( + secret: Secret, + target_metadata: AddressMetadata, + amount: TokenAmount, + payment_identifier: PaymentIdentifier, +) -> Result { + let message = target_metadata.user_id; + let signature = hex::decode(target_metadata.displayname) + .map_err(|e| format!("Could not decode signature: {:?}", e))?; + let public_key = signing::recover(message.as_bytes(), &signature, 0) + .map_err(|e| format!("Could not recover public key: {:?}", e))?; + + let data = DecryptedSecret { secret, amount, payment_identifier }; + + let json = serde_json::to_string(&data) + .map_err(|e| format!("Could not serialize encrypted secret"))?; + + Ok(Bytes( + keys::encrypt(public_key.as_bytes(), json.as_bytes()) + .map_err(|e| format!("Could not encrypt secret: {:?}", e))?, + )) +} + +pub fn decrypt_secret( + encrypted_secret: Vec, + private_key: &PrivateKey, +) -> Result { + let decrypted_secret = keys::decrypt(&private_key, &encrypted_secret) + .map_err(|e| format!("Could not decrypt secret: {:?}", e))?; + let json = std::str::from_utf8(&decrypted_secret) + .map_err(|e| format!("Invalid UTF-8 sequence: {}", e))?; + serde_json::from_str(json).map_err(|e| format!("Could not deserialize secret: {:?}", e)) } diff --git a/raiden/network/transport/Cargo.toml b/raiden/network/transport/Cargo.toml index 6381bde..493a607 100644 --- a/raiden/network/transport/Cargo.toml +++ b/raiden/network/transport/Cargo.toml @@ -16,6 +16,7 @@ async-trait = { version = "0.1.51", default-features = false } chrono = { version = "0.4.19", default-features = false, features = [ "clock" ] } futures = { version = "0.3.21", default-features = false } matrix-sdk = { version = "0.6.1", default-features = false, features = ["native-tls", "e2e-encryption"] } +parking_lot = { version = "0.11.2", default-features = false } thiserror = { version = "1.0", default-features = false } reqwest = { version = "0.11.4", default-features = false, features = ["json"] } serde = { version = "1.0.136", default-features = false, features = [ "derive" ] } diff --git a/raiden/network/transport/src/matrix/service.rs b/raiden/network/transport/src/matrix/service.rs index bc6c8b1..0999992 100644 --- a/raiden/network/transport/src/matrix/service.rs +++ b/raiden/network/transport/src/matrix/service.rs @@ -18,6 +18,7 @@ use matrix_sdk::{ serde::Raw, }, }; +use parking_lot::RwLock; use raiden_network_messages::{ decode::MessageDecoder, messages::{ @@ -25,11 +26,17 @@ use raiden_network_messages::{ TransportServiceMessage, }, }; -use raiden_state_machine::types::{ - QueueIdentifier, - StateChange, +use raiden_state_machine::{ + types::{ + QueueIdentifier, + StateChange, + }, + views, +}; +use raiden_transition::{ + manager::StateManager, + Transitioner, }; -use raiden_transition::Transitioner; use tokio::{ select, sync::mpsc::{ @@ -54,7 +61,6 @@ type BoxFuture<'a, T> = Pin + Send + Sync + 'a>>; pub struct MatrixService { config: TransportConfig, client: MatrixClient, - transition_service: Arc, sender: UnboundedSender, receiver: UnboundedReceiverStream, message_queues: HashMap>, @@ -65,7 +71,6 @@ impl MatrixService { pub fn new( config: TransportConfig, client: MatrixClient, - transition_service: Arc, ) -> (Self, UnboundedSender) { let (sender, receiver) = mpsc::unbounded_channel(); @@ -73,7 +78,6 @@ impl MatrixService { Self { config, client, - transition_service, sender: sender.clone(), receiver: UnboundedReceiverStream::new(receiver), message_queues: HashMap::new(), @@ -96,7 +100,12 @@ impl MatrixService { } } - pub async fn run(mut self) { + pub async fn run( + mut self, + state_manager: Arc>, + transition_service: Arc, + message_decoder: MessageDecoder, + ) { let mut sync_settings = SyncSettings::new().timeout(Duration::from_secs(30)); loop { select! { @@ -105,7 +114,7 @@ impl MatrixService { Ok(response) => { let to_device_events = response.to_device.events; for to_device_event in to_device_events.iter() { - self.process_event(to_device_event).await; + self.process_event(state_manager.clone(), transition_service.clone(), message_decoder.clone(), to_device_event).await; } let sync_token = response.next_batch; sync_settings = SyncSettings::new().timeout(Duration::from_secs(30)).token(sync_token); @@ -134,7 +143,13 @@ impl MatrixService { } } - pub async fn process_event(&self, event: &Raw) { + pub async fn process_event( + &self, + state_manager: Arc>, + transitioner: Arc, + message_decoder: MessageDecoder, + event: &Raw, + ) { match event.get_field::("type") { Ok(Some(message_type)) => { let event_body = event.json().get(); @@ -155,7 +170,10 @@ impl MatrixService { return }, }; - let message = match MessageDecoder::decode(content.clone()) { + + let chain_state = state_manager.read().current_state.clone(); + let state_changes = match message_decoder.decode(chain_state, content.clone()).await + { Ok(message) => message, Err(e) => { error!("Could not decode message: {}", message_type); @@ -163,7 +181,10 @@ impl MatrixService { }, }; - println!("Message received: {:?}", message); + for state_change in state_changes { + debug!("Transition state change: {:?}", state_change); + transitioner.transition(state_change).await; + } }, Ok(None) => { error!("Invalid event. Field 'type' does not exist");