diff --git a/Cargo.lock b/Cargo.lock index 11619bb4d..90ec3ce12 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1072,6 +1072,7 @@ dependencies = [ "serde_yaml", "shellexpand", "slip132", + "socks", "strict_encoding", "strip-ansi-escapes", "sysinfo", @@ -1080,6 +1081,7 @@ dependencies = [ "tonic", "tonic-build", "tonic-web", + "torut", "uuid", "zmq2", ] @@ -3085,6 +3087,17 @@ dependencies = [ "winapi", ] +[[package]] +name = "socks" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f0c3dbbd9ae980613c6dd8e28a9407b50509d3803b57624d5dfe8315218cd58b" +dependencies = [ + "byteorder", + "libc", + "winapi", +] + [[package]] name = "spin" version = "0.5.2" diff --git a/Cargo.toml b/Cargo.toml index 809a5154c..1ac74ff03 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -102,6 +102,8 @@ tonic = "0.7.2" tonic-web = "0.3.0" uuid = { version = "1.1", features = ["v4", "serde"] } zmq = { package = "zmq2", version = "0.5.0" } +socks = "0.3.4" +torut = "0.2.1" [build-dependencies] tonic-build = "0.7" diff --git a/src/bin/peerd.rs b/src/bin/peerd.rs index e394c8e96..7d1e4c4d5 100644 --- a/src/bin/peerd.rs +++ b/src/bin/peerd.rs @@ -211,8 +211,14 @@ fn main() { PeerSocket::Connect(remote_node) => { debug!("Peerd running in CONNECT mode"); debug!("Connecting to {}", &remote_node.addr()); - peerd::run_from_connect(service_config, remote_node, local_socket, local_node) - .expect("Error running peerd runtime"); + peerd::run_from_connect( + service_config, + remote_node, + local_socket, + local_node, + opts.shared.tor_proxy, + ) + .expect("Error running peerd runtime"); unreachable!() } }; diff --git a/src/bus/ctl.rs b/src/bus/ctl.rs index 3d51ab127..ec387eb6e 100644 --- a/src/bus/ctl.rs +++ b/src/bus/ctl.rs @@ -32,6 +32,7 @@ use crate::syncerd::{Health, SweepAddressAddendum}; use crate::{Error, ServiceId}; use super::p2p::Commit; +use super::{HiddenServiceInfo, WrapOnionAddressV3}; #[derive(Clone, Debug, Display, From, NetworkEncode, NetworkDecode)] #[non_exhaustive] @@ -134,6 +135,12 @@ pub enum CtlMsg { #[display("get_balance")] GetBalance(AddressSecretKey), + #[display("set_hidden_service_info")] + SetHiddenServiceInfo(HiddenServiceInfo), + + #[display("delete_hidden_service_info")] + DeleteHiddenServiceInfo(WrapOnionAddressV3), + #[display("funding_updated()")] FundingUpdated, diff --git a/src/bus/info.rs b/src/bus/info.rs index c60e0c510..9c20a1f6d 100644 --- a/src/bus/info.rs +++ b/src/bus/info.rs @@ -28,6 +28,8 @@ use crate::Error; use super::ctl::FundingInfo; use super::StateTransition; +use super::HiddenServiceInfo; + #[derive(Clone, Debug, Display, From, NetworkEncode, NetworkDecode)] #[non_exhaustive] pub enum InfoMsg { @@ -67,6 +69,9 @@ pub enum InfoMsg { #[display("get_checkpoint_entry({0})")] GetCheckpointEntry(SwapId), + #[display("get_all_hidden_service_info")] + GetAllHiddenServiceInfo, + // Progress functionalities // ---------------- // Returns a SwapProgress message @@ -179,6 +184,9 @@ pub enum InfoMsg { #[display("{0}")] AddressBalance(AddressBalance), + + #[display("hidden_service_info_list")] + HiddenServiceInfoList(List), } #[derive(Clone, PartialEq, Eq, Debug, Display, NetworkEncode, NetworkDecode)] diff --git a/src/bus/types.rs b/src/bus/types.rs index 765786c26..a7c3cd503 100644 --- a/src/bus/types.rs +++ b/src/bus/types.rs @@ -5,6 +5,7 @@ // https://opensource.org/licenses/MIT. use std::{ + convert::TryInto, fmt::{self, Debug, Display, Formatter}, str::FromStr, }; @@ -16,10 +17,11 @@ use farcaster_core::{ }; use amplify::{ToYamlString, Wrapper}; -use internet2::addr::NodeId; +use internet2::addr::{InetSocketAddr, NodeId}; use microservices::rpc; use serde_with::DisplayFromStr; -use strict_encoding::{NetworkDecode, NetworkEncode}; +use strict_encoding::{NetworkDecode, NetworkEncode, StrictDecode, StrictEncode}; +use torut::onion::{OnionAddressV3, TorPublicKeyV3}; use crate::swapd::StateReport; use crate::syncerd::Health; @@ -39,6 +41,43 @@ pub struct CheckpointEntry { pub expected_counterparty_node_id: Option, } +#[derive(Clone, Debug, Display, Eq, PartialEq, NetworkDecode, NetworkEncode)] +#[cfg_attr( + feature = "serde", + derive(Serialize, Deserialize), + serde(crate = "serde_crate") +)] +#[display(Debug)] +pub struct HiddenServiceInfo { + pub onion_address: WrapOnionAddressV3, + pub bind_address: InetSocketAddr, +} + +#[derive(Clone, PartialEq, Eq, Debug, Display)] +#[cfg_attr( + feature = "serde", + derive(Serialize, Deserialize), + serde(crate = "serde_crate") +)] +#[display(Debug)] +pub struct WrapOnionAddressV3(pub OnionAddressV3); + +impl StrictEncode for WrapOnionAddressV3 { + fn strict_encode(&self, mut e: E) -> Result { + let address_encoding = self.0.get_public_key().as_bytes().to_vec(); + address_encoding.strict_encode(&mut e) + } +} + +impl StrictDecode for WrapOnionAddressV3 { + fn strict_decode(mut d: D) -> Result { + let onion_address = Vec::::strict_decode(&mut d)?; + Ok(WrapOnionAddressV3(OnionAddressV3::from( + &TorPublicKeyV3::from_bytes(&onion_address.try_into().unwrap()).unwrap(), + ))) + } +} + #[derive(Clone, Debug, Display, Eq, PartialEq, Hash, NetworkDecode, NetworkEncode)] #[cfg_attr( feature = "serde", diff --git a/src/cli/command.rs b/src/cli/command.rs index 84ab8c7c3..69052c3b4 100644 --- a/src/cli/command.rs +++ b/src/cli/command.rs @@ -7,6 +7,7 @@ use farcaster_core::swap::btcxmr::{Deal, DealParameters}; use farcaster_core::Uuid; use std::io::{self, Read}; + use std::str::FromStr; use internet2::addr::{InetSocketAddr, NodeAddr}; diff --git a/src/config.rs b/src/config.rs index 079a654d4..0b7e53532 100644 --- a/src/config.rs +++ b/src/config.rs @@ -7,10 +7,10 @@ use crate::{AccordantBlockchain, ArbitratingBlockchain, Error}; use farcaster_core::blockchain::Network; use internet2::addr::InetSocketAddr; -use std::fs::File; use std::io::prelude::*; use std::path::Path; use std::str::FromStr; +use std::{fs::File, net::SocketAddr}; use serde::{Deserialize, Serialize}; @@ -139,6 +139,32 @@ impl Config { } } + /// Returns the addr of the tor control socket if it is set, or the default + /// socket at localhost:9051 if not. + pub fn get_tor_control_socket(&self) -> Result { + if let Some(FarcasterdConfig { + tor_control_socket: Some(addr), + .. + }) = &self.farcasterd + { + Ok(SocketAddr::from_str(addr)?) + } else { + Ok(SocketAddr::from_str("127.0.0.1:9051")?) + } + } + + pub fn create_hidden_service(&self) -> bool { + if let Some(FarcasterdConfig { + create_hidden_service: Some(create_hidden_service), + .. + }) = &self.farcasterd + { + *create_hidden_service + } else { + false + } + } + /// Returns the swap config for the specified network and arbitrating/accordant blockchains pub fn get_swap_config( &self, @@ -230,6 +256,11 @@ pub struct FarcasterdConfig { pub bind_ip: Option, /// Whether checkpoints should be auto restored at start-up, or not pub auto_restore: Option, + /// Tor control socket for creating the hidden service + pub tor_control_socket: Option, + /// Whether to create a hidden service or not. If set, the node will only + /// run in hidden service mode + pub create_hidden_service: Option, } #[derive(Deserialize, Serialize, Debug, Clone)] @@ -432,6 +463,8 @@ impl Default for FarcasterdConfig { // write the default port and ip in the generated config bind_port: Some(FARCASTER_BIND_PORT), bind_ip: Some(FARCASTER_BIND_IP.to_string()), + tor_control_socket: None, + create_hidden_service: None, } } } diff --git a/src/databased/runtime.rs b/src/databased/runtime.rs index 8fb6ced51..443733732 100644 --- a/src/databased/runtime.rs +++ b/src/databased/runtime.rs @@ -4,13 +4,17 @@ // license that can be found in the LICENSE file or at // https://opensource.org/licenses/MIT. +use crate::bus::{HiddenServiceInfo, WrapOnionAddressV3}; use farcaster_core::swap::btcxmr::Deal; use farcaster_core::swap::SwapId; use farcaster_core::{blockchain::Blockchain, role::TradeRole}; +use internet2::addr::InetSocketAddr; use lmdb::{Cursor, Transaction as LMDBTransaction}; +use std::convert::TryInto; use std::io::Cursor as IoCursor; use std::path::PathBuf; use strict_encoding::{StrictDecode, StrictEncode}; +use torut::onion::{OnionAddressV3, TorPublicKeyV3}; use crate::bus::{ ctl::{Checkpoint, CtlMsg}, @@ -212,6 +216,23 @@ impl Runtime { })?; } + CtlMsg::SetHiddenServiceInfo(HiddenServiceInfo { + onion_address, + bind_address, + }) => { + self.database + .set_hidden_service_info(&onion_address.0, &bind_address)?; + } + + CtlMsg::DeleteHiddenServiceInfo(onion_address) => { + if let Err(err) = self.database.delete_hidden_service_info(&onion_address.0) { + warn!( + "Did not delete hidden service info: {} due to {}", + onion_address, err + ); + } + } + _ => { error!("BusMsg {} is not supported by the CTL interface", request); } @@ -255,6 +276,16 @@ impl Runtime { }; } + InfoMsg::GetAllHiddenServiceInfo => { + let hidden_service_infos = self.database.get_all_hidden_service_info()?.into(); + endpoints.send_to( + ServiceBus::Info, + self.identity(), + source, + BusMsg::Info(InfoMsg::HiddenServiceInfoList(hidden_service_infos)), + )?; + } + InfoMsg::GetCheckpointEntry(swap_id) => { match self.database.get_checkpoint_info(&swap_id) { Ok(entry) => { @@ -400,6 +431,7 @@ const LMDB_CHECKPOINT_INFOS: &str = "checkpoint_infos"; const LMDB_BITCOIN_ADDRESSES: &str = "bitcoin_addresses"; const LMDB_MONERO_ADDRESSES: &str = "monero_addresses"; const LMDB_DEAL_HISTORY: &str = "deal_history"; +const LMDB_HIDDEN_SERVICE_INFO: &str = "hidden_service_info"; impl Database { fn new(path: PathBuf) -> Result { @@ -412,6 +444,7 @@ impl Database { env.create_db(Some(LMDB_BITCOIN_ADDRESSES), lmdb::DatabaseFlags::empty())?; env.create_db(Some(LMDB_DEAL_HISTORY), lmdb::DatabaseFlags::empty())?; env.create_db(Some(LMDB_MONERO_ADDRESSES), lmdb::DatabaseFlags::empty())?; + env.create_db(Some(LMDB_HIDDEN_SERVICE_INFO), lmdb::DatabaseFlags::empty())?; Ok(Database(env)) } @@ -644,6 +677,52 @@ impl Database { res } + fn set_hidden_service_info( + &mut self, + onion_address: &OnionAddressV3, + socket_address: &InetSocketAddr, + ) -> Result<(), Error> { + let db = self.0.open_db(Some(LMDB_HIDDEN_SERVICE_INFO))?; + let mut tx = self.0.begin_rw_txn()?; + let key_bytes = onion_address.get_public_key().to_bytes(); + if tx.get(db, &key_bytes).is_ok() { + tx.del(db, &key_bytes, None)?; + } + let mut val = vec![]; + socket_address.strict_encode(&mut val)?; + tx.put(db, &key_bytes, &val, lmdb::WriteFlags::empty())?; + tx.commit()?; + Ok(()) + } + + fn get_all_hidden_service_info(&mut self) -> Result, Error> { + let db = self.0.open_db(Some(LMDB_HIDDEN_SERVICE_INFO))?; + let tx = self.0.begin_ro_txn()?; + let mut cursor = tx.open_ro_cursor(db)?; + let res = cursor + .iter() + .map(|(key, value)| { + Ok(HiddenServiceInfo { + onion_address: WrapOnionAddressV3(OnionAddressV3::from( + &TorPublicKeyV3::from_bytes(key.try_into().unwrap()).unwrap(), + )), + bind_address: InetSocketAddr::strict_decode(std::io::Cursor::new(value))?, + }) + }) + .collect(); + drop(cursor); + tx.abort(); + res + } + + fn delete_hidden_service_info(&mut self, key: &OnionAddressV3) -> Result<(), lmdb::Error> { + let db = self.0.open_db(Some(LMDB_HIDDEN_SERVICE_INFO))?; + let mut tx = self.0.begin_rw_txn()?; + tx.del(db, &key.get_public_key().as_bytes(), None)?; + tx.commit()?; + Ok(()) + } + fn get_checkpoint_state(&mut self, checkpoint_key: &CheckpointKey) -> Result, Error> { let db = self.0.open_db(Some(LMDB_CHECKPOINTS))?; let tx = self.0.begin_ro_txn()?; diff --git a/src/error.rs b/src/error.rs index 211152acc..f8a775e3e 100644 --- a/src/error.rs +++ b/src/error.rs @@ -139,6 +139,11 @@ pub enum Error { #[from] Inet2AddrParseError(internet2::addr::AddrParseError), + /// Net socket address parsing errors + #[display(inner)] + #[from] + SocketAddrError(std::net::AddrParseError), + /// Tonic gRPC deamon transport errors #[display(inner)] #[from] diff --git a/src/farcasterd/mod.rs b/src/farcasterd/mod.rs index 6877b8842..8a6ac719a 100644 --- a/src/farcasterd/mod.rs +++ b/src/farcasterd/mod.rs @@ -9,6 +9,7 @@ mod opts; mod runtime; pub mod stats; mod syncer_state_machine; +mod tor_control; mod trade_state_machine; #[cfg(feature = "shell")] diff --git a/src/farcasterd/runtime.rs b/src/farcasterd/runtime.rs index 2507df345..bca6f438e 100644 --- a/src/farcasterd/runtime.rs +++ b/src/farcasterd/runtime.rs @@ -8,10 +8,13 @@ use crate::bus::ctl::{CtlMsg, FundingInfo, GetKeys, SwapKeys}; use crate::bus::info::FundingInfos; use crate::bus::p2p::{PeerMsg, TakerCommit}; use crate::bus::sync::SyncMsg; -use crate::bus::{BusMsg, DealInfo, DealStatus, List, ServiceBus}; +use crate::bus::{ + BusMsg, DealInfo, DealStatus, HiddenServiceInfo, List, ServiceBus, WrapOnionAddressV3, +}; use crate::event::StateMachineExecutor; use crate::farcasterd::stats::Stats; use crate::farcasterd::syncer_state_machine::{SyncerStateMachine, SyncerStateMachineExecutor}; +use crate::farcasterd::tor_control::create_v3_onion_service; use crate::farcasterd::trade_state_machine::{TradeStateMachine, TradeStateMachineExecutor}; use crate::farcasterd::Opts; use crate::syncerd::{AddressBalance, TaskAborted}; @@ -50,7 +53,7 @@ use microservices::esb::{self, Handler}; pub fn run( service_config: ServiceConfig, config: Config, - _opts: Opts, + opts: Opts, wallet_token: Token, ) -> Result<(), Error> { let _walletd = launch("walletd", ["--token", &wallet_token.to_string()])?; @@ -91,9 +94,13 @@ pub fn run( progress_subscriptions: none!(), stats: none!(), config, + opts, + hidden_service: None, syncer_task_counter: 0, trade_state_machines: vec![], syncer_state_machines: none!(), + old_hidden_services: vec![], + checked_database_consistency: false, }; let broker = true; @@ -108,16 +115,20 @@ pub struct Runtime { node_secret_key: Option, // Set by Keys request shortly after Hello from walletd node_public_key: Option, // Set by Keys request shortly after Hello from walletd pub listens: HashSet, // Set by MakeDeal, contains unique socket addresses of the binding peerd listeners. + pub hidden_service: Option, // Set By MakeDeal, contains a unique hidden service address. pub spawning_services: HashSet, // Services that have been launched, but have not replied with Hello yet pub registered_services: HashSet, // Services that have announced themselves with Hello pub deals: HashSet, // The set of all known deals. Includes open, consumed and ended deals includes open, consumed and ended deals progress: HashMap>, // A mapping from Swap ServiceId to its sent and received progress messages (Progress, Success, Failure) progress_subscriptions: HashMap>, // A mapping from a Client ServiceId to its subsribed swap progresses - pub stats: Stats, // Some stats about deals and swaps - pub config: Config, // The complete node configuration + pub stats: Stats, // Some stats about deals and swaps + pub config: Config, // Configuration for syncers, auto-funding, and grpc + pub opts: Opts, pub syncer_task_counter: u32, // A strictly incrementing counter of issued syncer tasks pub trade_state_machines: Vec, // New trade state machines are inserted on creation and destroyed upon state machine end transitions syncer_state_machines: HashMap, // New syncer state machines are inserted by their syncer task id when sending a syncer request and destroyed upon matching syncer request receival + pub old_hidden_services: Vec, // Populated at the start once the HiddenServiceInfoList is received + checked_database_consistency: bool, } impl CtlServer for Runtime {} @@ -242,6 +253,12 @@ impl Runtime { BusMsg::Ctl(CtlMsg::CleanDanglingDeals), )?; self.handle_auto_restore(endpoints)?; + endpoints.send_to( + ServiceBus::Info, + self.identity(), + source.clone(), + BusMsg::Info(InfoMsg::GetAllHiddenServiceInfo), + )?; } ServiceId::Wallet => { self.registered_services.insert(source.clone()); @@ -663,6 +680,11 @@ impl Runtime { )?; } + InfoMsg::HiddenServiceInfoList(list) => { + self.old_hidden_services = list.to_vec(); + self.checked_database_consistency = true; + } + req => { warn!("Ignoring request: {}", req.err()); } @@ -724,6 +746,10 @@ impl Runtime { Err(Error::Farcaster( "Farcaster not ready yet, databased still starting".to_string(), )) + } else if !self.checked_database_consistency { + Err(Error::Farcaster( + "Farcaster not ready yet, waiting for database consistency check".to_string(), + )) } else { Ok(()) } @@ -1103,6 +1129,109 @@ impl Runtime { } } + pub fn listen_tor( + &mut self, + endpoints: &mut Endpoints, + bind_addr: InetSocketAddr, + public_port: u16, + ) -> Result<(InetSocketAddr, NodeId), Error> { + self.services_ready()?; + let (peer_secret_key, peer_public_key) = self.peer_keys_ready()?; + let node_id = NodeId::from(peer_public_key); + + if self.hidden_service.is_some() && self.listens.iter().any(|a| a == &bind_addr) { + let hidden_service = self.hidden_service.expect("checked"); + debug!("Already created hidden service: {}", hidden_service); + return Ok((hidden_service, node_id)); + } + + if self.listens.iter().any(|a| a == &bind_addr) { + let msg = format!("Already listening on {}", &bind_addr); + debug!("{}", &msg); + return Err(Error::Farcaster("lmao".to_string())); + } + info!( + "{} for incoming peer connections on {}", + "Starting listener".bright_blue_bold(), + bind_addr.bright_blue_bold() + ); + + let address = bind_addr.address(); + let port = bind_addr + .port() + .ok_or_else(|| Error::Farcaster("listen requires the port to listen on".to_string()))?; + + let public_onion_address = create_v3_onion_service( + bind_addr, + public_port, + self.config.get_tor_control_socket()?, + &self.old_hidden_services, + ) + .unwrap(); + let public_socket_addr = InetSocketAddr::Tor(public_onion_address.get_public_key()); + + // Get rid of the old hidden services in the database, write to current + // hidden service to it, and sync our list of used hidden services with + // the database + for s in self.old_hidden_services.iter() { + endpoints.send_to( + ServiceBus::Ctl, + self.identity(), + ServiceId::Database, + BusMsg::Ctl(CtlMsg::DeleteHiddenServiceInfo(s.onion_address.clone())), + )?; + } + endpoints.send_to( + ServiceBus::Ctl, + self.identity(), + ServiceId::Database, + BusMsg::Ctl(CtlMsg::SetHiddenServiceInfo(HiddenServiceInfo { + onion_address: WrapOnionAddressV3(public_onion_address), + bind_address: bind_addr, + })), + )?; + endpoints.send_to( + ServiceBus::Info, + self.identity(), + ServiceId::Database, + BusMsg::Info(InfoMsg::GetAllHiddenServiceInfo), + )?; + + debug!("Instantiating peerd..."); + let child = launch( + "peerd", + [ + "--listen", + &format!("{}", address), + "--port", + &port.to_string(), + "--peer-secret-key", + &format!("{}", peer_secret_key.display_secret()), + "--token", + &self.wallet_token.clone().to_string(), + ], + ); + + // in case it can't connect wait for it to crash + std::thread::sleep(Duration::from_secs_f32(0.1)); + + // status is Some if peerd returns because it crashed + let (child, status) = child.and_then(|mut c| c.try_wait().map(|s| (c, s)))?; + if status.is_some() { + return Err(Error::Peer(internet2::presentation::Error::InvalidEndpoint)); + } + + self.listens.insert(bind_addr); + self.hidden_service = Some(public_socket_addr); + debug!("New instance of peerd launched with PID {}", child.id()); + info!( + "Connection daemon {} for incoming peer connections on {}", + "listens".bright_green_bold(), + bind_addr + ); + Ok((public_socket_addr, node_id)) + } + pub fn listen(&mut self, bind_addr: InetSocketAddr) -> Result { self.services_ready()?; let (peer_secret_key, peer_public_key) = self.peer_keys_ready()?; diff --git a/src/farcasterd/tor_control.rs b/src/farcasterd/tor_control.rs new file mode 100644 index 000000000..7d2ef6daf --- /dev/null +++ b/src/farcasterd/tor_control.rs @@ -0,0 +1,87 @@ +use std::net::SocketAddr; + +use internet2::addr::InetSocketAddr; +use tokio::net::TcpStream; +use torut::{ + control::{ConnError, UnauthenticatedConn}, + onion::OnionAddressV3, +}; + +use crate::bus::HiddenServiceInfo; + +pub fn create_v3_onion_service( + bind_addr: InetSocketAddr, + public_port: u16, + control_addr: SocketAddr, + existing_hidden_services: &Vec, +) -> Result { + use tokio::runtime::Builder; + let rt = Builder::new_multi_thread() + .worker_threads(1) + .enable_all() + .build() + .unwrap(); + rt.block_on(async { + // Connect to the Tor control socket + let s = TcpStream::connect(control_addr.to_string()).await.unwrap(); + let mut uc = UnauthenticatedConn::new(s); + + // Authenticate to the tor socket + let proto_info = uc.load_protocol_info().await.unwrap(); + if let Some(tor_auth_data) = proto_info.make_auth_data()? { + uc.authenticate(&tor_auth_data).await?; + } + let mut ac = uc.into_authenticated().await; + + // We have to set a dummy event handler, even if we are not reading any + // events + ac.set_async_event_handler(Some(|_| async move { Ok(()) })); + + // Take ownership (meaning control) the Tor instance + ac.take_ownership().await.unwrap(); + + // Check if we previously created this hidden service and used it on the same bind + if !existing_hidden_services.is_empty() { + let res = ac.get_info("onions/detached").await.unwrap(); + for existing_hidden_service in existing_hidden_services.iter() { + for line in res.lines() { + if line.contains( + &existing_hidden_service + .onion_address + .0 + .get_address_without_dot_onion(), + ) && existing_hidden_service.bind_address == bind_addr + { + return Ok(existing_hidden_service.onion_address.0); + } + } + } + } + + let bind_socket_addr = match bind_addr { + InetSocketAddr::IPv4(socket) => SocketAddr::V4(socket), + InetSocketAddr::IPv6(socket) => SocketAddr::V6(socket), + _ => { + return Err(ConnError::InvalidFormat); + } + }; + + // Generate a secret service + let key = torut::onion::TorSecretKeyV3::generate(); + ac.add_onion_v3( + &key, + true, // Detach to ensure the hidden service remains on the tor instance after we disconnect from the control socket + false, + false, + None, + &mut [(public_port, bind_socket_addr)].iter(), + ) + .await?; + + // Rescind ownership of the control port + ac.drop_ownership().await?; + + println!("public key: {}", key.public().get_onion_address()); + Ok(key.public().get_onion_address()) + }) +} diff --git a/src/farcasterd/trade_state_machine.rs b/src/farcasterd/trade_state_machine.rs index 3f0c04e5c..521c50ff5 100644 --- a/src/farcasterd/trade_state_machine.rs +++ b/src/farcasterd/trade_state_machine.rs @@ -459,7 +459,21 @@ fn attempt_transition_to_make_deal( } Ok(bind_addr) => bind_addr, }; - match runtime.listen(bind_addr) { + let res = if runtime.config.create_hidden_service() { + runtime.listen_tor( + event.endpoints, + bind_addr, + public_addr.port().ok_or_else(|| { + Error::Farcaster( + "Cannot combine create hidden service with passed in Tor public address" + .to_string(), + ) + })?, + ) + } else { + runtime.listen(bind_addr).map(|n| (public_addr, n)) + }; + match res { Err(err) => { warn!("Failed to start peerd listen, cannot make deal: {}", err); event.complete_client_ctl(CtlMsg::Failure(Failure { @@ -468,7 +482,7 @@ fn attempt_transition_to_make_deal( }))?; Ok(None) } - Ok(node_id) => { + Ok((public_addr, node_id)) => { let deal = deal_parameters.to_v1(node_id.public_key(), public_addr); let msg = s!("Deal registered, please share with taker."); info!( diff --git a/src/peerd/runtime.rs b/src/peerd/runtime.rs index d6e43e743..55195be6d 100644 --- a/src/peerd/runtime.rs +++ b/src/peerd/runtime.rs @@ -7,6 +7,7 @@ use farcaster_core::swap::SwapId; use internet2::addr::LocalNode; use microservices::peer::RecvMessage; +use std::net::SocketAddr; use std::sync::Arc; use std::thread::spawn; use std::time::{Duration, SystemTime}; @@ -39,7 +40,14 @@ pub fn start_connect_peer_listener_runtime( remote_node_addr: NodeAddr, local_node: LocalNode, peerd_service_id: ServiceId, + tor_proxy: Option, ) -> Result<(PeerSender, std::sync::mpsc::Sender<()>), Error> { + if tor_proxy.is_none() && matches!(remote_node_addr.addr, InetSocketAddr::Tor(_)) { + return Err(Error::Farcaster(format!( + "Can only connect to remote peer on Tor address {} when Tor proxy is set too", + remote_node_addr + ))); + } let connection = PeerConnection::connect_brontozaur(local_node, remote_node_addr)?; debug!("Connected to remote peer: {}", remote_node_addr); @@ -115,6 +123,7 @@ pub fn run_from_connect( remote_node_addr: NodeAddr, local_socket: Option, local_node: LocalNode, + tor_proxy: Option, ) -> Result<(), Error> { debug!("Opening bridge between runtime and peer receiver threads"); let rx = ZMQ_CONTEXT.socket(zmq::PULL)?; @@ -132,6 +141,7 @@ pub fn run_from_connect( remote_node_addr: Some(remote_node_addr), local_socket, local_node, + tor_proxy, peer_sender: None, // As connector we create the sender on is_ready forked_from_listener: false, started: SystemTime::now(), @@ -237,6 +247,7 @@ pub fn run_from_listener( remote_node_addr, local_socket, local_node, + tor_proxy: None, peer_sender: Some(peer_sender), forked_from_listener: true, started: SystemTime::now(), @@ -372,6 +383,7 @@ pub struct Runtime { remote_node_addr: Option, local_socket: Option, local_node: LocalNode, + tor_proxy: Option, peer_sender: Option, // TODO: make this an enum instead with a descriptive distinction of listening and connecting to a listener @@ -404,6 +416,7 @@ impl esb::Handler for Runtime { self.remote_node_addr.expect("Checked for connecter"), self.local_node, self.identity(), + self.tor_proxy, ) { Ok(val) => { debug!( @@ -666,6 +679,7 @@ impl Runtime { self.remote_node_addr.expect("Checked for connnecter"), self.local_node, self.identity(), + self.tor_proxy, ) { Err(err) => { attempt += 1;