diff --git a/Cargo.toml b/Cargo.toml index 187f95b..43dffbc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,30 +5,48 @@ version = "0.1.0" authors = ["Dr. Maxim Orlovsky "] keywords = ["bitcoin", "node"] license = "MIT" -edition = "2018" +edition = "2021" readme = "README.md" +[lib] +name = "bp_node" + +[[bin]] +name = "queryd" +path = "src/bin/queryd.rs" + +[[bin]] +name = "bp-cli" +path = "src/bin/bp-cli.rs" + +[[bin]] +name = "bp-indexer" +path = "src/bin/bp-indexer.rs" + [dependencies] dotenv = "~0.15" -clap = "=3.0.0-beta.1" +clap = "=3.0.0-beta.5" chrono = "~0.4" derive_wrapper = "~0.1" -async-trait = "~0.1" log = { version = "~0.4", features = ["max_level_trace", "release_max_level_debug"] } env_logger = "~0.7" diesel = { version = "~1.4", features = ["postgres", "uuid", "numeric", "chrono"] } -tokio = { version = "~0.2", features = ["full"] } -futures = "~0.3" zmq = "~0.9" tiny_http = "~0.6" prometheus = "~0.8" +amplify = "~3.9.1" +bp-core = "0.5.0-rc.1" +microservices = { version = "0.5.0-beta.1", default-features = false, features = ["peer"] } +internet2 = "0.5.0-alpha.2" +bitcoin = "0.27.1" +bitcoin_hashes = "0.10.0" +miniscript = "6.0.1" [dependencies.lnpbp] git = "https://github.com/lnp-bp/rust-lnpbp" -tag = "v0.1.0-alpha.3" -features = ["tor", "tokio", "log", "daemons", "serde"] +branch = "master" -# lnpbp requires custom version of bitcoin, so to maintain type compatibility -# we have to use library through lnpbp::bitcoin handle and not via direct -# dependency -#bitcoin = "~0.23" +[patch.crates-io] +# Remove this once https://github.com/jean-airoldie/zeromq-src-rs/pull/15 got merged +zeromq-src = { git = "https://github.com/LNP-BP/zeromq-src-rs", branch = "fix/cmake" } +bp-core = { git = "https://github.com/xn3cr0nx/bp-core", branch = "pub-short-id" } diff --git a/README.md b/README.md index b09cb4a..0231cef 100644 --- a/README.md +++ b/README.md @@ -5,7 +5,6 @@ different indexing servers (like Electrum) for efficient and extended queries against bitcoin blockchain (see the drawing below). The daemon is made with focus on: -* non-blocking/async IO and APIs * ZMQ APIs for the clients * efficient indexing with [LNPBP-5 standard](https://github.com/LNP-BP/lnpbps/blob/master/lnpbp-0005.md) diff --git a/src/bin/bp-cli.rs b/src/bin/bp-cli.rs index 2f7eb88..aa4b7b1 100644 --- a/src/bin/bp-cli.rs +++ b/src/bin/bp-cli.rs @@ -11,41 +11,52 @@ // along with this software. // If not, see . -#![feature(never_type)] - -use std::env; +#![recursion_limit = "256"] +// Coding conventions +#![deny( + non_upper_case_globals, + non_camel_case_types, + non_snake_case, + unused_mut, + unused_imports, + dead_code, + // missing_docs +)] + +use clap::Parser; use log::*; -use clap::derive::Clap; - -use bp_node::cli::*; +use std::env; +use bp_node::cli::{Command, Config, Opts, Runtime}; -#[tokio::main] -async fn main() -> Result<(), String> { +fn main() -> Result<(), String> { // TODO: Parse config file as well let opts: Opts = Opts::parse(); let config: Config = opts.clone().into(); if env::var("RUST_LOG").is_err() { - env::set_var("RUST_LOG", match config.verbose { - 0 => "error", - 1 => "warn", - 2 => "info", - 3 => "debug", - 4 => "trace", - _ => "trace", - }); + env::set_var( + "RUST_LOG", + match config.verbose { + 0 => "error", + 1 => "warn", + 2 => "info", + 3 => "debug", + 4 => "trace", + _ => "trace", + }, + ); } env_logger::init(); log::set_max_level(LevelFilter::Trace); - let runtime = Runtime::init(config).await?; + let runtime = Runtime::init(config)?; // Non-interactive command processing: debug!("Parsing and processing a command"); match opts.command { - Command::Query { query } => runtime.command_query(query).await?, - _ => unimplemented!() + Command::Query { query } => runtime.command_query(query)?, + _ => unimplemented!(), } Ok(()) diff --git a/src/bin/bp-indexer.rs b/src/bin/bp-indexer.rs index 39324d9..286e133 100644 --- a/src/bin/bp-indexer.rs +++ b/src/bin/bp-indexer.rs @@ -11,33 +11,41 @@ // along with this software. // If not, see . -#![feature(never_type)] +#![recursion_limit = "256"] +// Coding conventions +#![deny( + non_upper_case_globals, + non_camel_case_types, + non_snake_case, + unused_mut, + unused_imports, + dead_code, + // missing_docs +)] - -use std::env; +use bp_node::indexer::{Command, Config, Error, Opts, Runtime}; +use clap::Parser; use log::*; -use clap::derive::Clap; - -use lnpbp::TryService; - -use bp_node::indexer::*; - +use microservices::node::TryService; +use std::env; -#[tokio::main] -async fn main() -> Result<(), Error> { +fn main() -> Result<(), Error> { // TODO: Parse config file as well let opts: Opts = Opts::parse(); let config: Config = opts.clone().into(); if env::var("RUST_LOG").is_err() { - env::set_var("RUST_LOG", match config.verbose { - 0 => "error", - 1 => "warn", - 2 => "info", - 3 => "debug", - 4 => "trace", - _ => "trace", - }); + env::set_var( + "RUST_LOG", + match config.verbose { + 0 => "error", + 1 => "warn", + 2 => "info", + 3 => "debug", + 4 => "trace", + _ => "trace", + }, + ); } env_logger::init(); log::set_max_level(LevelFilter::Trace); @@ -45,13 +53,14 @@ async fn main() -> Result<(), Error> { let mut runtime = Runtime::init(config)?; match opts.command { - Command::ClearIndex => { - runtime.clear_db() - }, + Command::ClearIndex => runtime.clear_db(), Command::IndexBlockchain { clear, .. } => { - if clear.unwrap_or(false) { runtime.clear_db()?; } - runtime.run_or_panic("Indexer runtime").await - }, - _ => unimplemented!() + if clear.unwrap_or(false) { + runtime.clear_db()?; + } + runtime.run_or_panic("Indexer runtime"); + unreachable!() + } + _ => unimplemented!(), } } diff --git a/src/bin/queryd.rs b/src/bin/queryd.rs index 3a433a3..98ad0ee 100644 --- a/src/bin/queryd.rs +++ b/src/bin/queryd.rs @@ -11,36 +11,50 @@ // along with this software. // If not, see . -#![feature(never_type)] +#![recursion_limit = "256"] +// Coding conventions +#![deny( + non_upper_case_globals, + non_camel_case_types, + non_snake_case, + unused_mut, + unused_imports, + dead_code, + // missing_docs +)] -use std::env; +use clap::Parser; use log::*; -use clap::derive::Clap; - -use lnpbp::TryService; +use microservices::node::TryService; +use std::env; -use bp_node::BootstrapError; -use bp_node::queryd::*; +use bp_node::{ + queryd::{Config, Opts, Runtime}, + BootstrapError, +}; -#[tokio::main] -async fn main() -> Result<(), BootstrapError> { +fn main() -> Result<(), BootstrapError> { // TODO: Parse config file as well let opts: Opts = Opts::parse(); let config: Config = opts.into(); if env::var("RUST_LOG").is_err() { - env::set_var("RUST_LOG", match config.verbose { - 0 => "error", - 1 => "warn", - 2 => "info", - 3 => "debug", - 4 => "trace", - _ => "trace", - }); + env::set_var( + "RUST_LOG", + match config.verbose { + 0 => "error", + 1 => "warn", + 2 => "info", + 3 => "debug", + 4 => "trace", + _ => "trace", + }, + ); } env_logger::init(); log::set_max_level(LevelFilter::Trace); - let runtime = Runtime::init(config).await?; - runtime.run_or_panic("Queryd runtime").await + let runtime = Runtime::init(config)?; + runtime.run_or_panic("Queryd runtime"); + unreachable!() } diff --git a/src/cli/config.rs b/src/cli/config.rs index d588c5b..eaa3f38 100644 --- a/src/cli/config.rs +++ b/src/cli/config.rs @@ -11,52 +11,61 @@ // along with this software. // If not, see . - -use clap::Clap; +use clap::Parser; use crate::msgbus::constants::*; - -#[derive(Clap, Clone, Debug, Display)] +#[derive(Parser, Clone, Debug, Display)] #[display_from(Debug)] #[clap( name = "bp-cli", version = "0.0.1", author = "Dr Maxim Orlovsky ", - about = "BP node command-line interface; part of Bitcoin protocol node" + about = "BP node command-line interface; part of Bitcoin protocol node" )] pub struct Opts { /// Path and name of the configuration file - #[clap(global = true, short = "c", long = "config", default_value = "./cli.toml")] + #[clap( + global = true, + short = 'c', + long = "config", + default_value = "./cli.toml" + )] pub config: String, /// Sets verbosity level; can be used multiple times to increase verbosity - #[clap(global = true, short = "v", long = "verbose", min_values = 0, max_values = 4, parse(from_occurrences))] + #[clap( + global = true, + short = 'v', + long = "verbose", + min_values = 0, + max_values = 4, + parse(from_occurrences) + )] pub verbose: u8, /// IPC connection string for queryd daemon API - #[clap(global = true, short = "w", long = "queryd-api", default_value = MSGBUS_PEER_API_ADDR, env="BP_CLI_QUERYD_API_ADDR")] + #[clap(global = true, short = 'w', long = "queryd-api", default_value = MSGBUS_PEER_API_ADDR, env="BP_CLI_QUERYD_API_ADDR")] pub queryd_api_socket_str: String, /// IPC connection string for queryd daemon push notifications on perr status updates - #[clap(global = true, short = "W", long = "queryd-push", default_value = MSGBUS_PEER_PUSH_ADDR, env="BP_CLI_QUERYD_PUSH_ADDR")] + #[clap(global = true, short = 'W', long = "queryd-push", default_value = MSGBUS_PEER_PUSH_ADDR, env="BP_CLI_QUERYD_PUSH_ADDR")] pub queryd_push_socket_str: String, #[clap(subcommand)] - pub command: Command + pub command: Command, } -#[derive(Clap, Clone, Debug, Display)] +#[derive(Parser, Clone, Debug, Display)] #[display_from(Debug)] pub enum Command { /// Sends command to a wired daemon to connect to the new peer Query { /// Query to run against Bitcoin blockchain & transaction index - query: String + query: String, }, } - // We need config structure since not all of the parameters can be specified // via environment and command-line arguments. Thus we need a config file and // default set of configuration @@ -85,7 +94,7 @@ impl Default for Config { Self { verbose: 0, msgbus_peer_api_addr: MSGBUS_PEER_API_ADDR.to_string(), - msgbus_peer_sub_addr: MSGBUS_PEER_PUSH_ADDR.to_string() + msgbus_peer_sub_addr: MSGBUS_PEER_PUSH_ADDR.to_string(), } } -} \ No newline at end of file +} diff --git a/src/cli/runtime.rs b/src/cli/runtime.rs index 3d3d44e..1a2cbe3 100644 --- a/src/cli/runtime.rs +++ b/src/cli/runtime.rs @@ -11,16 +11,12 @@ // along with this software. // If not, see . - use std::convert::TryFrom; -use lnpbp::TryService; - use super::*; use crate::error::BootstrapError; use crate::msgbus::{self, Multipart}; - pub struct Runtime { config: Config, context: zmq::Context, @@ -29,21 +25,32 @@ pub struct Runtime { } impl Runtime { - pub async fn init(config: Config) -> Result { + pub fn init(config: Config) -> Result { let context = zmq::Context::new(); - debug!("Opening API socket to wired on {} ...", config.msgbus_peer_api_addr); - let api_socket = context.socket(zmq::REQ) + debug!( + "Opening API socket to wired on {} ...", + config.msgbus_peer_api_addr + ); + let api_socket = context + .socket(zmq::REQ) .map_err(|e| BootstrapError::PublishingError(e))?; - api_socket.bind(&config.msgbus_peer_api_addr) + api_socket + .bind(&config.msgbus_peer_api_addr) .map_err(|e| BootstrapError::PublishingError(e))?; - debug!("Opening push notification socket to wired on {} ...", config.msgbus_peer_sub_addr); - let sub_socket = context.socket(zmq::SUB) + debug!( + "Opening push notification socket to wired on {} ...", + config.msgbus_peer_sub_addr + ); + let sub_socket = context + .socket(zmq::SUB) .map_err(|e| BootstrapError::SubscriptionError(e))?; - sub_socket.connect(&config.msgbus_peer_sub_addr) + sub_socket + .connect(&config.msgbus_peer_sub_addr) .map_err(|e| BootstrapError::SubscriptionError(e))?; - sub_socket.set_subscribe("".as_bytes()) + sub_socket + .set_subscribe("".as_bytes()) .map_err(|e| BootstrapError::SubscriptionError(e))?; debug!("Console is launched"); @@ -55,12 +62,14 @@ impl Runtime { }) } - pub async fn command_query(&self, query: String) -> Result<(), msgbus::Error> { + pub fn command_query(&self, query: String) -> Result<(), msgbus::Error> { info!("Performing QUERY command {} ...", query); let multipart: msgbus::Multipart = msgbus::Command::Query(msgbus::Query { query }).into(); self.api_socket.send_multipart(multipart, 0)?; trace!("Request sent, awaiting response ..."); - let rep: Multipart = self.api_socket.recv_multipart(0)? + let rep: Multipart = self + .api_socket + .recv_multipart(0)? .iter() .map(|vec| zmq::Message::from(vec)) .collect(); @@ -69,23 +78,14 @@ impl Runtime { } } -#[async_trait] -impl TryService for Runtime { - type ErrorType = tokio::task::JoinError; - - async fn try_run_loop(self) -> Result { - loop { - - } - } -} - impl Drop for Runtime { fn drop(&mut self) { trace!("Shutting down sockets"); - self.api_socket.disconnect(&self.config.msgbus_peer_api_addr) + self.api_socket + .disconnect(&self.config.msgbus_peer_api_addr) .unwrap_or_else(|err| error!("Error disconnecting message bus API socket: {}", err)); - self.sub_socket.disconnect(&self.config.msgbus_peer_sub_addr) + self.sub_socket + .disconnect(&self.config.msgbus_peer_sub_addr) .unwrap_or_else(|err| error!("Error disconnecting message bus push socket: {}", err)); } } diff --git a/src/db/models.rs b/src/db/models.rs index edb69d9..9618f9d 100644 --- a/src/db/models.rs +++ b/src/db/models.rs @@ -11,16 +11,14 @@ // along with this software. // If not, see . -use chrono::NaiveDateTime; -use lnpbp::{ - bitcoin, - bp::short_id::{self, Descriptor, BlockChecksum} -}; use super::schema::*; +use bitcoin::{Transaction, TxIn, TxOut}; +use chrono::NaiveDateTime; +use bp::short_id::{BlockChecksum, Descriptor, Error}; #[derive(Identifiable, Queryable, Insertable, Clone, Debug, Display)] #[display_from(Debug)] -#[table_name="block"] +#[table_name = "block"] pub struct Block { pub id: i64, pub block_id: Vec, @@ -33,16 +31,19 @@ pub struct Block { } impl Block { - pub fn compose(block: &bitcoin::Block, descriptor: Descriptor) -> Result { + pub fn compose( + block: &bitcoin::Block, + descriptor: Descriptor, + ) -> Result { Ok(Self { - id: descriptor.try_into_u64()? as i64, - block_id: block.block_hash().to_vec(), - merkle_root: block.merkle_root().to_vec(), - ts: NaiveDateTime::from_timestamp(block.header.time as i64, 0), - difficulty: block.header.bits as i64, - nonce: block.header.nonce as i32, - ver: block.header.version as i32, - tx_count: block.txdata.len() as i32 + id: descriptor.try_into_u64()? as i64, + block_id: block.block_hash().to_vec(), + merkle_root: block.merkle_root().to_vec(), + ts: NaiveDateTime::from_timestamp(block.header.time as i64, 0), + difficulty: block.header.bits as i64, + nonce: block.header.nonce as i32, + ver: block.header.version as i32, + tx_count: block.txdata.len() as i32, }) } @@ -50,7 +51,7 @@ impl Block { fn from_blockchain(block: &bitcoin::Block, block_height: u32) -> Self { let descriptor = Descriptor::OnchainBlock { block_height, - block_checksum: BlockChecksum::from(block.block_hash()) + block_checksum: BlockChecksum::from(block.block_hash()), }; Self::compose(block, descriptor).expect("Just generated descriptor can't fail") } @@ -58,32 +59,35 @@ impl Block { #[derive(Identifiable, Queryable, Insertable, Clone, Debug, Display)] #[display_from(Debug)] -#[table_name="tx"] +#[table_name = "tx"] pub struct Tx { pub id: i64, pub ver: i32, pub locktime: i32, pub out_count: i16, pub in_count: i16, - pub fee: Option + pub fee: Option, } impl Tx { - pub fn compose(tx: &bitcoin::Transaction, descriptor: short_id::Descriptor) -> Result { + pub fn compose( + tx: &Transaction, + descriptor: Descriptor, + ) -> Result { Ok(Self { id: descriptor.try_into_u64()? as i64, ver: tx.version as i32, locktime: tx.lock_time as i32, out_count: tx.output.len() as i16, in_count: tx.input.len() as i16, - fee: None + fee: None, }) } } #[derive(Identifiable, Queryable, Insertable, Clone, Debug, Display)] #[display_from(Debug)] -#[table_name="txin"] +#[table_name = "txin"] pub struct Txin { pub id: i64, pub seq: i32, @@ -91,32 +95,37 @@ pub struct Txin { } impl Txin { - pub fn compose(txin: &bitcoin::TxIn, - descriptor: short_id::Descriptor, - txo_descriptor: short_id::Descriptor) -> Result { + pub fn compose( + txin: &TxIn, + descriptor: Descriptor, + txo_descriptor: Descriptor, + ) -> Result { Ok(Self { id: descriptor.try_into_u64()? as i64, seq: txin.sequence as i32, - txout_id: txo_descriptor.try_into_u64()? as i64 + txout_id: txo_descriptor.try_into_u64()? as i64, }) } } #[derive(Identifiable, Queryable, Insertable, Clone, Debug, Display)] #[display_from(Debug)] -#[table_name="txout"] +#[table_name = "txout"] pub struct Txout { pub id: i64, pub amount: i64, - pub script: Vec + pub script: Vec, } impl Txout { - pub fn compose(txout: &bitcoin::TxOut, descriptor: short_id::Descriptor) -> Result { + pub fn compose( + txout: &TxOut, + descriptor: Descriptor, + ) -> Result { Ok(Self { id: descriptor.try_into_u64()? as i64, amount: txout.value as i64, - script: txout.script_pubkey.to_bytes() + script: txout.script_pubkey.to_bytes(), }) } } diff --git a/src/error.rs b/src/error.rs index eb14bbc..ecd9483 100644 --- a/src/error.rs +++ b/src/error.rs @@ -11,10 +11,7 @@ // along with this software. // If not, see . - use std::io; -use tokio::task::JoinError; - #[derive(Debug, Display)] #[display_from(Debug)] @@ -24,14 +21,15 @@ pub enum BootstrapError { ArgParseError(String), SubscriptionError(zmq::Error), PublishingError(zmq::Error), - MultithreadError(JoinError), MonitorSocketError(Box), } -impl std::error::Error for BootstrapError { } +impl std::error::Error for BootstrapError {} impl From for String { - fn from(err: BootstrapError) -> Self { format!("{}", err) } + fn from(err: BootstrapError) -> Self { + format!("{}", err) + } } impl From<&str> for BootstrapError { @@ -51,9 +49,3 @@ impl From for BootstrapError { BootstrapError::IoError(err) } } - -impl From for BootstrapError { - fn from(err: JoinError) -> Self { - BootstrapError::MultithreadError(err) - } -} diff --git a/src/indexer/config.rs b/src/indexer/config.rs index db852f1..8f0ace2 100644 --- a/src/indexer/config.rs +++ b/src/indexer/config.rs @@ -11,44 +11,64 @@ // along with this software. // If not, see . - -use clap::{Clap}; - -use lnpbp::bitcoin::{Block, Transaction, BlockHash, Txid, hashes::hex::FromHex}; - +use bitcoin::hashes::hex::FromHex; +use bitcoin::{Block, BlockHash, Transaction, Txid}; +use clap::{ArgEnum, Parser}; const BITCOIN_DIR: &str = "/var/lib/bitcoin"; -#[derive(Clap, Clone, Debug, Display)] +#[derive(Parser, Clone, Debug, Display)] #[display_from(Debug)] #[clap( name = "bp-indexer", version = "0.0.1", author = "Dr Maxim Orlovsky ", - about = "BP blockchain indexing utility; part of Bitcoin protocol node" + about = "BP blockchain indexing utility; part of Bitcoin protocol node" )] pub struct Opts { /// Path and name of the configuration file - #[clap(global = true, short = "c", long = "config", default_value = "./indexer.toml")] + #[clap( + global = true, + short = 'c', + long = "config", + default_value = "./indexer.toml" + )] pub config: String, /// Sets verbosity level; can be used multiple times to increase verbosity - #[clap(global = true, short = "v", long = "verbose", min_values = 0, max_values = 4, parse(from_occurrences))] + #[clap( + global = true, + short = 'v', + long = "verbose", + min_values = 0, + max_values = 4, + parse(from_occurrences) + )] pub verbose: u8, /// Connection string to index database - #[clap(global = true, short = "i", long = "index-db", default_value = "postgresql://postgres:example@localhost:5432/bp")] + #[clap( + global = true, + short = 'i', + long = "index-db", + default_value = "postgresql://postgres:example@localhost:5432/bp" + )] pub index_db: String, /// Connection string to state storing database - #[clap(global = true, short = "s", long = "state-db", default_value = "postgresql://postgres:example@localhost:5432/bp-indexer")] + #[clap( + global = true, + short = 's', + long = "state-db", + default_value = "postgresql://postgres:example@localhost:5432/bp-indexer" + )] pub state_db: String, #[clap(subcommand)] - pub command: Command + pub command: Command, } -#[derive(Clap, Clone, Debug, Display)] +#[derive(Parser, ArgEnum, Clone, Debug, Display)] #[display_from(Debug)] pub enum Command { /// Clear parsing status and index data @@ -57,7 +77,12 @@ pub enum Command { /// Reports on current Bitcoin blockchain parse status Status { /// Output formatting to use - #[clap(short = "f", long = "formatting", default_value="pretty-print", arg_enum)] + #[clap( + short = 'f', + long = "formatting", + default_value = "pretty-print", + arg_enum + )] formatting: Formatting, }, @@ -65,7 +90,7 @@ pub enum Command { IndexBlockchain { // TODO: Relace string with `PathBuf`; use #[clap(parse(from_os_str))] /// Bitcoin core data directory - #[clap(short = "b", long = "bitcoin-dir", default_value = BITCOIN_DIR)] + #[clap(short = 'b', long = "bitcoin-dir", default_value = BITCOIN_DIR)] bitcoin_dir: String, /// Clears the existing index data and starts parsing from scratch. @@ -78,8 +103,13 @@ pub enum Command { /// Adds custom off-chain block to the index IndexBlock { /// Format of the provided data. - #[clap(short = "f", long = "format", - conflicts_with("block"), default_value = "auto", arg_enum)] + #[clap( + short = 'f', + long = "format", + conflicts_with("block"), + default_value = "auto", + arg_enum + )] format: DataFormat, // TODO: Move `parse_block_str` implementation into `bitcoin::Block::FromStr` @@ -94,8 +124,13 @@ pub enum Command { /// Adds custom off-chain transaction to the index IndexTransaction { /// Format of the provided data. - #[clap(short = "f", long = "format", - conflicts_with("block"), default_value = "auto", arg_enum)] + #[clap( + short = 'f', + long = "format", + conflicts_with("block"), + default_value = "auto", + arg_enum + )] format: DataFormat, // TODO: Move `parse_tx_str` implementation into `bitcoin::Transaction::FromStr` @@ -111,7 +146,7 @@ pub enum Command { RemoveBlock { /// Block hash (block id) to remove from database. If matches on-chain /// block the parameter is ignored and program fails. - #[clap(parse(try_from_str = ::lnpbp::bitcoin::BlockHash::from_hex))] + #[clap(parse(try_from_str = BlockHash::from_hex))] block_hash: BlockHash, }, @@ -119,12 +154,12 @@ pub enum Command { RemoveTransaction { /// Transaction id to remove from database. If matches on-chain /// transaction the parameter is ignored and program fails. - #[clap(parse(try_from_str = ::lnpbp::bitcoin::Txid::from_hex))] + #[clap(parse(try_from_str = Txid::from_hex))] txid: Txid, }, } -#[derive(Clap, Clone, Debug)] +#[derive(Parser, ArgEnum, Clone, Debug)] pub enum Formatting { PrettyPrint, Json, @@ -133,7 +168,7 @@ pub enum Formatting { AwkFriendly, } -#[derive(Clap, Clone, Debug)] +#[derive(Parser, ArgEnum, Clone, Debug)] pub enum DataFormat { Auto, Binary, @@ -173,7 +208,7 @@ impl Default for Config { verbose: 1, bitcoin_dir: BITCOIN_DIR.to_string(), index_db: "".to_string(), - state_db: "".to_string() + state_db: "".to_string(), } } } diff --git a/src/indexer/db/model.rs b/src/indexer/db/model.rs index 5af22c3..1baac69 100644 --- a/src/indexer/db/model.rs +++ b/src/indexer/db/model.rs @@ -11,28 +11,24 @@ // along with this software. // If not, see . - -use lnpbp::{ - bitcoin::{self, consensus::encode::serialize, BlockHash}, - bp::{short_id, BlockChecksum}, - Wrapper -}; +use bitcoin::{consensus::encode::serialize, Block, BlockHash}; +use bp::short_id::{BlockChecksum, Descriptor}; use chrono::{NaiveDateTime, Utc}; use diesel::pg::data_types::PgInterval; +use amplify::Wrapper; pub(in crate::indexer) use crate::indexer::db::schema as state_schema; +pub(in crate::indexer) use state_schema::cached_block::dsl::cached_block as cache_table; pub(in crate::indexer) use state_schema::state::dsl::state as state_table; pub(in crate::indexer) use state_schema::utxo::dsl::utxo as utxo_table; -pub(in crate::indexer) use state_schema::cached_block::dsl::cached_block as cache_table; use state_schema::*; use crate::parser; - #[derive(Identifiable, Queryable, Insertable, AsChangeset, Clone, Debug, Display)] #[display_from(Debug)] -#[table_name="state"] +#[table_name = "state"] pub(in crate::indexer) struct State { pub id: i16, pub started_at: NaiveDateTime, @@ -75,13 +71,13 @@ impl Default for State { processed_time: PgInterval { microseconds: 0, days: 0, - months: 0 + months: 0, }, utxo_size: 0, utxo_volume: 0, utxo_bytes: 0, block_cache_size: 0, - block_cache_bytes: 0 + block_cache_bytes: 0, } } } @@ -93,8 +89,14 @@ impl From for State { id: 0, started_at: Utc::now().naive_utc(), updated_at: Utc::now().naive_utc(), - last_block_hash: state.last_block_hash.unwrap_or(BlockHash::default()).to_vec(), - last_block_time: NaiveDateTime::from_timestamp(state.last_block_time.unwrap_or(0) as i64, 0), + last_block_hash: state + .last_block_hash + .unwrap_or(BlockHash::default()) + .to_vec(), + last_block_time: NaiveDateTime::from_timestamp( + state.last_block_time.unwrap_or(0) as i64, + 0, + ), known_height: state.known_height as i32, processed_height: state.processed_height as i32, processed_txs: state.processed_txs as i64, @@ -113,27 +115,26 @@ impl From for State { } } - #[derive(Queryable, Insertable)] -#[table_name="cached_block"] +#[table_name = "cached_block"] pub(in crate::indexer) struct CachedBlock { pub hash: Vec, pub prev_hash: Vec, pub block: Vec, } -impl From for CachedBlock { - fn from(block: bitcoin::Block) -> Self { +impl From for CachedBlock { + fn from(block: Block) -> Self { Self { hash: block.block_hash().to_vec(), prev_hash: block.header.prev_blockhash.to_vec(), - block: serialize(&block) + block: serialize(&block), } } } #[derive(Queryable, Insertable)] -#[table_name="utxo"] +#[table_name = "utxo"] pub(in crate::indexer) struct Utxo { pub txid: Vec, pub block_height: i32, @@ -142,13 +143,13 @@ pub(in crate::indexer) struct Utxo { pub output_index: i16, } -impl From for short_id::Descriptor { +impl From for Descriptor { fn from(utxo: Utxo) -> Self { - short_id::Descriptor::OnchainTxOutput { + Descriptor::OnchainTxOutput { block_height: utxo.block_height as u32, block_checksum: BlockChecksum::from_inner(utxo.block_checksum as u8), tx_index: utxo.tx_index as u16, - output_index: utxo.output_index as u16 + output_index: utxo.output_index as u16, } } } diff --git a/src/indexer/error.rs b/src/indexer/error.rs index 93be67f..f36da16 100644 --- a/src/indexer/error.rs +++ b/src/indexer/error.rs @@ -16,7 +16,7 @@ use std::io; use diesel::ConnectionError; use diesel::result::Error as DieselError; -use lnpbp::bitcoin; +use bitcoin::{consensus, hashes}; use crate::parser; @@ -49,14 +49,14 @@ impl From for Error { } } -impl From for Error { - fn from(err: bitcoin::consensus::encode::Error) -> Self { +impl From for Error { + fn from(err: consensus::encode::Error) -> Self { Error::CurruptBlockFile } } -impl From for Error { - fn from(_: bitcoin::hashes::Error) -> Self { +impl From for Error { + fn from(_: hashes::Error) -> Self { Error::IndexDBIntegrityError } } diff --git a/src/indexer/runtime.rs b/src/indexer/runtime.rs index 0b42482..f60138a 100644 --- a/src/indexer/runtime.rs +++ b/src/indexer/runtime.rs @@ -11,20 +11,15 @@ // along with this software. // If not, see . - -use std::{io, fs}; -use diesel::prelude::*; +use bitcoin::{network::stream_reader::StreamReader, Block}; use diesel::pg::PgConnection; -use lnpbp::TryService; -use lnpbp::bitcoin::{ - Block, - network::stream_reader::StreamReader -}; +use diesel::prelude::*; +use microservices::node::TryService; +use std::{fs, io}; use super::{Config, Error}; use crate::parser::BulkParser; - pub struct Runtime { config: Config, parser: BulkParser, @@ -49,16 +44,15 @@ impl Runtime { Ok(Self { config, parser, - blckfile_no: 0 + blckfile_no: 0, }) } } -#[async_trait] impl TryService for Runtime { type ErrorType = Error; - async fn try_run_loop(mut self) -> Result { + fn try_run_loop(mut self) -> Result<(), Self::ErrorType> { loop { self.parse_block_file()?; } @@ -73,7 +67,10 @@ impl Runtime { } fn parse_block_file(&mut self) -> Result<(), Error> { - let blckfile_name = format!("{}/blocks/blk{:05}.dat", self.config.bitcoin_dir, self.blckfile_no); + let blckfile_name = format!( + "{}/blocks/blk{:05}.dat", + self.config.bitcoin_dir, self.blckfile_no + ); info!("Reading blocks from {} ...", blckfile_name); let blckfile = fs::File::open(blckfile_name)?; diff --git a/src/indexer/state.rs b/src/indexer/state.rs index efbee47..288dfdb 100644 --- a/src/indexer/state.rs +++ b/src/indexer/state.rs @@ -11,20 +11,12 @@ // along with this software. // If not, see . +use bitcoin::{consensus::deserialize, hashes::Hash, BlockHash, Txid}; +use diesel::{prelude::*, result::Error as DieselError, PgConnection}; -use diesel::{ - prelude::*, - PgConnection, - result::Error as DieselError, -}; -use lnpbp::bitcoin::{ - BlockHash, Txid, hashes::Hash, consensus::deserialize -}; - -use super::Error; use super::db::model::{self, *}; -use crate::parser::{State as ParserState, data::*, BulkParser}; - +use super::Error; +use crate::parser::{data::*, BulkParser, State as ParserState}; impl BulkParser { pub fn restore(state_conn: PgConnection, index_conn: PgConnection) -> Result { @@ -37,31 +29,43 @@ impl BulkParser { } impl ParserState { - pub(super) fn restore(state_conn: &PgConnection, index_conn: &PgConnection) -> Result { - let state_model = state_table.find(0) + pub(super) fn restore( + state_conn: &PgConnection, + index_conn: &PgConnection, + ) -> Result { + let state_model = state_table + .find(0) .first(state_conn) .or::(Ok(model::State::default()))?; - let utxo = utxo_table.load::(state_conn) + let utxo = utxo_table + .load::(state_conn) .or::(Ok(Vec::new()))? .into_iter() .try_fold(UtxoMap::new(), |mut map, utxo| -> Result { - map.entry(Txid::from_slice(&utxo.txid[..]).map_err(|_| Error::IndexDBIntegrityError)?) - .or_insert_with(VoutMap::new) - .insert(utxo.output_index as u16, utxo.into()); + map.entry( + Txid::from_slice(&utxo.txid[..]).map_err(|_| Error::IndexDBIntegrityError)?, + ) + .or_insert_with(VoutMap::new) + .insert(utxo.output_index as u16, utxo.into()); Ok(map) })?; - let block_cache = cache_table.load::(state_conn) + let block_cache = cache_table + .load::(state_conn) .or::(Ok(Vec::new()))? .into_iter() - .try_fold(BlockMap::new(), |mut map, block| -> Result { - map.insert( - BlockHash::from_slice(&block.hash[..]).map_err(|_| Error::IndexDBIntegrityError)?, - deserialize(&block.block[..]).map_err(|_| Error::IndexDBIntegrityError)? - ); - Ok(map) - })?; + .try_fold( + BlockMap::new(), + |mut map, block| -> Result { + map.insert( + BlockHash::from_slice(&block.hash[..]) + .map_err(|_| Error::IndexDBIntegrityError)?, + deserialize(&block.block[..]).map_err(|_| Error::IndexDBIntegrityError)?, + ); + Ok(map) + }, + )?; Ok(Self { utxo, @@ -70,30 +74,33 @@ impl ParserState { block_cache_removal: vec![], last_block_hash: Some(BlockHash::from_slice(&state_model.last_block_hash[..])?), last_block_time: Some(state_model.last_block_time.timestamp() as u32), - known_height: state_model.known_height as u32, - processed_height: state_model.processed_height as u32, - processed_txs: state_model.processed_txs as u64, - processed_txins: state_model.processed_txins as u64, - processed_txouts: state_model.processed_txouts as u64, - processed_blocks: state_model.processed_blocks as u64, - processed_volume: state_model.processed_volume as u64, - processed_bytes: state_model.processed_bytes as u64, - processed_time: state_model.processed_time.microseconds as u64, - utxo_size: state_model.utxo_size as u32, - utxo_volume: state_model.utxo_volume as u64, - utxo_bytes: state_model.utxo_bytes as u32, - block_cache_size: state_model.block_cache_size as u32, - block_cache_bytes: state_model.block_cache_bytes as u32, + known_height: state_model.known_height as u32, + processed_height: state_model.processed_height as u32, + processed_txs: state_model.processed_txs as u64, + processed_txins: state_model.processed_txins as u64, + processed_txouts: state_model.processed_txouts as u64, + processed_blocks: state_model.processed_blocks as u64, + processed_volume: state_model.processed_volume as u64, + processed_bytes: state_model.processed_bytes as u64, + processed_time: state_model.processed_time.microseconds as u64, + utxo_size: state_model.utxo_size as u32, + utxo_volume: state_model.utxo_volume as u64, + utxo_bytes: state_model.utxo_bytes as u32, + block_cache_size: state_model.block_cache_size as u32, + block_cache_bytes: state_model.block_cache_bytes as u32, }) } - pub(super) fn store(&mut self, state_conn: &PgConnection, index_conn: &PgConnection) -> Result<(), Error> { + pub(super) fn store( + &mut self, + state_conn: &PgConnection, + index_conn: &PgConnection, + ) -> Result<(), Error> { // Not doing as a transaction: atomicity is the job of `BulkParser` // also, saving multiple times does not damage state data // Updating state data - diesel::update(state_table - .find(0)) + diesel::update(state_table.find(0)) .set(model::State::from(self.clone())) .execute(state_conn) .map_err(|err| Error::StateDBError(err))?; @@ -117,7 +124,8 @@ impl ParserState { .map_err(|err| Error::StateDBError(err))?; // Saving block cache - let values: Vec = self.block_cache + let values: Vec = self + .block_cache .iter() .map(|(_, block)| model::CachedBlock::from(block.clone())) .collect(); @@ -127,21 +135,27 @@ impl ParserState { .map_err(|err| Error::StateDBError(err))?; // Saving UTXOs - let values: Vec = self.utxo + let values: Vec = self + .utxo .iter() .flat_map(|(txid, vout_map)| -> Vec { - vout_map.iter().map(|(vout, descriptor)| -> model::Utxo { - model::Utxo { - txid: txid.to_vec(), - block_height: descriptor.get_block_height() - .expect("Runtime error 5: parser-generated descriptor fails conversion") as i32, - block_checksum: descriptor.get_block_checksum() - .expect("Runtime error 6: parser-generated descriptor fails conversion") as i16, - tx_index: descriptor.get_tx_index() - .expect("Runtime error 6: parser-generated descriptor fails conversion") as i16, - output_index: *vout as i16 - } - }) + vout_map + .iter() + .map(|(vout, descriptor)| -> model::Utxo { + model::Utxo { + txid: txid.to_vec(), + block_height: descriptor.get_block_height().expect( + "Runtime error 5: parser-generated descriptor fails conversion", + ) as i32, + block_checksum: descriptor.get_block_checksum().expect( + "Runtime error 6: parser-generated descriptor fails conversion", + ) as i16, + tx_index: descriptor.get_tx_index().expect( + "Runtime error 6: parser-generated descriptor fails conversion", + ) as i16, + output_index: *vout as i16, + } + }) .collect() }) .collect(); @@ -152,4 +166,4 @@ impl ParserState { Ok(()) } -} \ No newline at end of file +} diff --git a/src/lib.rs b/src/lib.rs index d7b29dc..a5af2f4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -11,42 +11,34 @@ // along with this software. // If not, see . - // We need this since code is not completed and a lot of it is written // for future functionality // Remove this once the first version will be complete #![allow(dead_code)] #![allow(unused_variables)] +#![allow(non_snake_case)] +#![allow(unreachable_patterns)] // In mutithread environments it's critical to capture all failures #![deny(unused_must_use)] - -#![feature(never_type)] #![feature(unwrap_infallible)] #![feature(in_band_lifetimes)] -#[macro_use] -extern crate tokio; -extern crate futures; extern crate zmq; #[macro_use] extern crate diesel; -extern crate clap; #[macro_use] extern crate derive_wrapper; -#[macro_use] -extern crate async_trait; +extern crate chrono; +extern crate dotenv; +extern crate env_logger; #[macro_use] extern crate log; -extern crate env_logger; -extern crate dotenv; -extern crate chrono; -extern crate tiny_http; extern crate prometheus; -extern crate lnpbp; +extern crate tiny_http; +pub mod error; pub mod msgbus; pub mod queryd; -pub mod error; pub mod util; pub mod cli; diff --git a/src/msgbus/command.rs b/src/msgbus/command.rs index 2510adf..08ec23d 100644 --- a/src/msgbus/command.rs +++ b/src/msgbus/command.rs @@ -43,12 +43,12 @@ impl TryFrom for Command { })?; Ok(match cmd { - MSGID_OKAY => Command::Okay, - MSGID_ACK => Command::Ack, - MSGID_SUCCESS => Command::Success, - MSGID_DONE => Command::Done, - MSGID_FAILURE => Command::Failure, - MSGID_QUERY => Command::Query(args.try_into()?), + REPID_OKAY => Command::Okay, + REPID_ACK => Command::Ack, + REPID_SUCCESS => Command::Success, + REPID_DONE => Command::Done, + REPID_FAILURE => Command::Failure, + REQID_UTXO => Command::Query(args.try_into()?), _ => Err(Error::UnknownCommand)?, }) } @@ -59,13 +59,13 @@ impl From for Multipart { use Command::*; match command { - Okay => vec![zmq::Message::from(&MSGID_OKAY.to_be_bytes()[..])], - Ack => vec![zmq::Message::from(&MSGID_ACK.to_be_bytes()[..])], - Success => vec![zmq::Message::from(&MSGID_SUCCESS.to_be_bytes()[..])], - Done => vec![zmq::Message::from(&MSGID_DONE.to_be_bytes()[..])], - Failure => vec![zmq::Message::from(&MSGID_FAILURE.to_be_bytes()[..])], + Okay => vec![zmq::Message::from(&REPID_OKAY.to_be_bytes()[..])], + Ack => vec![zmq::Message::from(&REPID_ACK.to_be_bytes()[..])], + Success => vec![zmq::Message::from(&REPID_SUCCESS.to_be_bytes()[..])], + Done => vec![zmq::Message::from(&REPID_DONE.to_be_bytes()[..])], + Failure => vec![zmq::Message::from(&REPID_FAILURE.to_be_bytes()[..])], Query(query) => vec![ - zmq::Message::from(&MSGID_QUERY.to_be_bytes()[..]), + zmq::Message::from(&REQID_UTXO.to_be_bytes()[..]), ].into_iter() .chain(Multipart::from(query)) .collect::(), diff --git a/src/msgbus/encode.rs b/src/msgbus/encode.rs new file mode 100644 index 0000000..a6576ec --- /dev/null +++ b/src/msgbus/encode.rs @@ -0,0 +1,208 @@ +// LNP/BP Rust Library +// Written in 2020 by +// Dr. Maxim Orlovsky +// Encoding strategies concept developed by +// Martin Habovštiak +// +// To the extent possible under law, the author(s) have dedicated all +// copyright and related and neighboring rights to this software to +// the public domain worldwide. This software is distributed without +// any warranty. +// +// You should have received a copy of the MIT License +// along with this software. +// If not, see . + +use std::convert::TryFrom; +use std::vec::IntoIter; +use zmq::Message; + +use bitcoin::consensus::encode::{ + deserialize as consensus_deserialize, serialize as consensus_serialize, +}; + +use super::{Error, Multipart}; +#[cfg(feature = "use-rgb")] +use crate::csv::{self, network_deserialize, network_serialize}; +use amplify::strategy::Holder; +use bp::short_id::ShortId; + +// 1. Encoding messages +pub trait MessageEncode +where + Self: Sized, +{ + type Error: std::error::Error; + fn into_message(self) -> Message; + fn try_from_message(message: Message) -> Result; +} + +/// This is a trick for rust compiler helping to distinguish types implementing +/// mutually-exclusive traits (required until negative trait impls will be there) +/// Implemented after concept by Martin Habovštiak +mod strategy { + use std::fmt; + + // Defining strategies: + /// Strategy used for encoding data structures that support encoding with + /// bitcoin consensus rules (`bitcoin::consensus::encode`) + pub enum BitcoinConsensus {} + + /// Strategy used for encoding data structures that support encoding with + /// RGB network serialization rules + #[cfg(feature = "use-rgb")] + pub enum RGBStrategy {} + + /// Strategy used for encoding data structures that can be directly + /// represented as `zmq::Message` with `TryFrom` and + /// `Into` trait implementations + pub enum Native {} + + /// Strategy used for custom implementation of data structure encoding + pub trait Other { + type Strategy: Clone + fmt::Debug + fmt::Display; + } +} + +// 1.1. Auto impl for bitcoin-serialized types +impl MessageEncode for Holder +where + T: bitcoin::consensus::encode::Encodable + bitcoin::consensus::encode::Decodable, +{ + type Error = Error; + fn into_message(self) -> Message { + Message::from(consensus_serialize(&self.into_inner())) + } + fn try_from_message(message: Message) -> Result { + Ok(Self::new(consensus_deserialize(&message)?)) + } +} + +// 1.2. Auto impl for client-validation-serialized types +#[cfg(feature = "use-rgb")] +impl MessageEncode for Holder +where + T: csv::serialize::Network, +{ + type Error = Error; + fn into_message(self) -> Message { + Message::from(network_serialize(&self.into_inner()).expect("Commitment serialize failed")) + } + fn try_from_message(message: Message) -> Result { + Ok(Self::new(network_deserialize(&message)?)) + } +} + +// 1.3. Auto impl for types defining own Message serialization rules with TryFrom/Into +impl MessageEncode for Holder +where + T: TryFrom + Into, +{ + type Error = Error; + fn into_message(self) -> Message { + self.into_inner().into() + } + fn try_from_message(message: Message) -> Result { + Ok(Self::new(T::try_from(message)?)) + } +} + +// 1.4. Blanket impl +impl MessageEncode for T +where + T: strategy::Other, + Holder::Strategy>: MessageEncode, +{ + type Error = ::Strategy> as MessageEncode>::Error; + fn into_message(self) -> Message { + Holder::new(self).into_message() + } + fn try_from_message(message: Message) -> Result { + Ok(Holder::try_from_message(message)?.into_inner()) + } +} + +// 1.5. Impl for bp::ShortId +impl MessageEncode for ShortId { + type Error = Error; + fn into_message(self) -> Message { + Message::from(&self.into_u64().to_be_bytes()[..]) + } + fn try_from_message(message: Message) -> Result { + if message.len() != 8 { + Err(Error::MalformedArgument) + } else { + let mut buf = [0u8; 8]; + buf.clone_from_slice(&message[..]); + Ok(Self::from(u64::from_be_bytes(buf))) + } + } +} + +// 2. Encoding multipart messages +pub trait MultipartEncode: TryFrom + Into { + fn into_multipart(self) -> Multipart { + self.into() + } +} + +// Primitive type implementations +// 1. Vector +// We can't use wrapper; in the current version it does not support generics +//wrapper!(ReqVec<'a, T: ReqArg<'a>>, PhantomData<&'a Vec>, Vec, +// doc="Wrapper around `Vec` supporting `Req` trait"); +#[derive(Clone)] +pub struct VecEncoding(Vec); + +// repr(transparent) is not yet working for generics, so we have to implement manually +impl VecEncoding +where + T: MessageEncode, +{ + pub fn new(vec: Vec) -> Self { + Self(vec) + } + pub fn into_iter(self) -> IntoIter { + self.0.into_iter() + } +} + +// repr(transparent) is not yet working for generics, so we have to implement manually +impl IntoIterator for VecEncoding +where + T: MessageEncode, +{ + type Item = as IntoIterator>::Item; + type IntoIter = as IntoIterator>::IntoIter; + fn into_iter(self) -> Self::IntoIter { + self.0.into_iter() + } +} + +impl MultipartEncode for VecEncoding where T: MessageEncode {} + +impl TryFrom for VecEncoding +where + T: MessageEncode, +{ + type Error = (); + + fn try_from(args: Multipart) -> Result { + Ok(VecEncoding::new(args.into_iter().try_fold( + Vec::::new(), + |mut vec, arg| { + vec.push(T::try_from_message(arg).map_err(|_| ())?); + Ok(vec) + }, + )?)) + } +} + +impl From> for Multipart +where + T: MessageEncode, +{ + fn from(vec: VecEncoding) -> Self { + vec.into_iter().map(T::into_message).collect() + } +} diff --git a/src/msgbus/error.rs b/src/msgbus/error.rs index 8a349e9..2875ddc 100644 --- a/src/msgbus/error.rs +++ b/src/msgbus/error.rs @@ -11,26 +11,34 @@ // along with this software. // If not, see . - -use lnpbp::bitcoin; -use lnpbp::bitcoin::secp256k1; - +use bitcoin::{consensus, secp256k1}; #[derive(Debug, Display)] #[display_from(Debug)] pub enum Error { MessageBusError(zmq::Error), + + // Request-specific errors MalformedRequest, MalformedCommand, - MalformedArgument, UnknownCommand, - WrongNumberOfArguments + + // Reply-specific errors + MalformedReply, + MalformedStatus, + UnknownStatus, + + // General API errors that may happen with both requests and replies + MalformedArgument, + WrongNumberOfArguments, } impl std::error::Error for Error {} impl From for String { - fn from(err: Error) -> Self { format!("{}", err) } + fn from(err: Error) -> Self { + format!("{}", err) + } } impl From for Error { @@ -39,8 +47,8 @@ impl From for Error { } } -impl From for Error { - fn from(_: bitcoin::consensus::encode::Error) -> Self { +impl From for Error { + fn from(_: consensus::encode::Error) -> Self { Error::MalformedArgument } } diff --git a/src/msgbus/mod.rs b/src/msgbus/mod.rs index 81a960e..68ca67c 100644 --- a/src/msgbus/mod.rs +++ b/src/msgbus/mod.rs @@ -11,19 +11,32 @@ // along with this software. // If not, see . - +mod command; pub mod constants; +pub mod encode; mod error; -mod command; pub mod proc; -pub use error::*; pub use command::*; +pub use error::*; pub use proc::*; - +pub use encode::*; use std::convert::{TryFrom, TryInto}; - pub type Multipart = Vec; - +pub type CommandId = u16; + +pub fn split_cmd_args(multipart: &Multipart) -> Result<(CommandId, &[zmq::Message]), Error> { + Ok(multipart + .split_first() + .ok_or(Error::MalformedRequest) + .and_then(|(cmd_data, args)| { + if cmd_data.len() != 2 { + Err(Error::MalformedCommand)? + } + let mut buf = [0u8; 2]; + buf.clone_from_slice(&cmd_data[0..2]); + Ok((u16::from_be_bytes(buf), args)) + })?) +} diff --git a/src/msgbus/proc/connect.rs b/src/msgbus/proc/connect.rs index ead98a9..8123bb5 100644 --- a/src/msgbus/proc/connect.rs +++ b/src/msgbus/proc/connect.rs @@ -11,39 +11,35 @@ // along with this software. // If not, see . - use std::convert::TryFrom; use super::*; - #[derive(Clone, Debug, Display)] #[display_from(Debug)] pub struct Query { pub query: String, } -impl Procedure<'_> for Query { } +impl Procedure<'_> for Query {} impl TryFrom<&[zmq::Message]> for Query { type Error = Error; fn try_from(args: &[zmq::Message]) -> Result { - if args.len() != 1 { Err(Error::WrongNumberOfArguments)? } + if args.len() != 1 { + Err(Error::WrongNumberOfArguments)? + } - let query = String::from_utf8(args[0][..].to_vec()) - .map_err(|_| Error::MalformedArgument)?; + let query = + String::from_utf8(args[0][..].to_vec()).map_err(|_| Error::MalformedArgument)?; - Ok(Self { - query - }) + Ok(Self { query }) } } impl From for Multipart { fn from(proc: Query) -> Self { - vec![ - zmq::Message::from(&proc.query), - ] + vec![zmq::Message::from(&proc.query)] } } diff --git a/src/msgbus/proc/mod.rs b/src/msgbus/proc/mod.rs index 307752b..24a2782 100644 --- a/src/msgbus/proc/mod.rs +++ b/src/msgbus/proc/mod.rs @@ -12,17 +12,15 @@ // If not, see . mod connect; -pub use connect::*; pub(self) use super::*; +pub use connect::*; - -pub(super) const MSGID_OKAY: u16 = 0x0001; -pub(super) const MSGID_ACK: u16 = 0x0002; -pub(super) const MSGID_SUCCESS: u16 = 0x0003; -pub(super) const MSGID_DONE: u16 = 0x0004; -pub(super) const MSGID_FAILURE: u16 = 0x0005; -pub(super) const MSGID_QUERY: u16 = 0x0010; - +pub const REQID_UTXO: u16 = 0x0010; +pub const REPID_OKAY: u16 = 0x0001; +pub const REPID_ACK: u16 = 0x0002; +pub const REPID_SUCCESS: u16 = 0x0003; +pub const REPID_DONE: u16 = 0x0004; +pub const REPID_FAILURE: u16 = 0x0005; pub trait Procedure<'a>: TryFrom<&'a [zmq::Message]> + Into { fn into_multipart(self) -> Multipart { diff --git a/src/parser/block_parser.rs b/src/parser/block_parser.rs index 6566f35..1ddb4a8 100644 --- a/src/parser/block_parser.rs +++ b/src/parser/block_parser.rs @@ -11,20 +11,12 @@ // along with this software. // If not, see . +use bitcoin::{Block, Transaction, TxIn, TxOut, Txid}; +use bp::short_id::{BlockChecksum, Descriptor, Dimension}; +use std::collections::{hash_map::Entry, HashMap}; -use std::collections::{HashMap, hash_map::Entry}; -use lnpbp::{ - bitcoin::{ - Txid, Block, Transaction, TxIn, TxOut - }, - bp::short_id::{ - Descriptor, Dimension, BlockChecksum - } -}; - +use super::{error::Error, *}; use crate::db::models as index_models; -use super::{*, error::Error}; - #[derive(Debug, Display)] #[display_from(Debug)] @@ -39,16 +31,20 @@ pub(super) struct BlockParser<'a> { } impl<'a> BlockParser<'a> { - pub(super) fn parse(block: Block, data: &'a mut ParseData, utxo: &'a UtxoMap) -> Result { + pub(super) fn parse( + block: Block, + data: &'a mut ParseData, + utxo: &'a UtxoMap, + ) -> Result { let block_checksum = BlockChecksum::from(block.block_hash()); let mut parser = Self { coinbase_amount: None, descriptor: Descriptor::OnchainBlock { block_height: data.state.processed_height as u32, - block_checksum + block_checksum, }, data, - base_utxo: utxo + base_utxo: utxo, }; parser.parse_block(&block)?; Ok(parser) @@ -61,17 +57,19 @@ impl BlockParser<'_> { self.descriptor = Descriptor::OnchainBlock { block_height: self.data.state.processed_height as u32, - block_checksum: BlockChecksum::from(block.block_hash()) + block_checksum: BlockChecksum::from(block.block_hash()), }; - block.txdata + block + .txdata .iter() .enumerate() .try_for_each(|(index, tx)| self.parse_tx(index, tx))?; - self.data.blocks - .push(index_models::Block::compose(block, self.descriptor) - .map_err(|_| Error::CorruptedShortId)?); + self.data.blocks.push( + index_models::Block::compose(block, self.descriptor) + .map_err(|_| Error::CorruptedShortId)?, + ); self.data.state.processed_height += 1; // TODO: Update the rest of the state @@ -86,7 +84,8 @@ impl BlockParser<'_> { None }; - self.descriptor = self.descriptor + self.descriptor = self + .descriptor .upgraded(index as u16, None) .expect("Descriptor upgrade for an onchain block does not fail"); @@ -100,12 +99,14 @@ impl BlockParser<'_> { .enumerate() .try_for_each(|(index, txin)| self.parse_txin(index, txin))?; - self.descriptor = self.descriptor + self.descriptor = self + .descriptor .downgraded() .expect("Descriptor downgrade from an onchain transaction can't fail"); - self.data.txs.push(index_models::Tx::compose(tx, self.descriptor) - .map_err(|_| Error::CorruptedShortId)?); + self.data.txs.push( + index_models::Tx::compose(tx, self.descriptor).map_err(|_| Error::CorruptedShortId)?, + ); // TODO: Update state stats @@ -113,7 +114,9 @@ impl BlockParser<'_> { } fn parse_txin(&mut self, index: usize, txin: &TxIn) -> Result<(), Error> { - let block_descriptor = self.descriptor.downgraded() + let block_descriptor = self + .descriptor + .downgraded() .expect("Transaction to block descriptor downgrade can't fail"); let txo_descriptor = if let Some(coinbase_amount) = self.coinbase_amount { @@ -127,22 +130,31 @@ impl BlockParser<'_> { block_descriptor } else { // TODO: Update state stats - self.base_utxo.get_descriptor(&txin.previous_output) + self.base_utxo + .get_descriptor(&txin.previous_output) .map(|d| { self.data.spent.push(txin.previous_output); d.clone() }) - .or_else(|| self.data.state.utxo.extract_descriptor(&txin.previous_output)) + .or_else(|| { + self.data + .state + .utxo + .extract_descriptor(&txin.previous_output) + }) .ok_or(Error::BlockValidationIncosistency)? .clone() }; - let descriptor = self.descriptor + let descriptor = self + .descriptor .upgraded(index as u16, Some(Dimension::Input)) .expect("Descriptor upgrade for an onchain transaction does not fail"); - self.data.txins.push(index_models::Txin::compose(txin, descriptor, txo_descriptor) - .map_err(|_| Error::CorruptedShortId)?); + self.data.txins.push( + index_models::Txin::compose(txin, descriptor, txo_descriptor) + .map_err(|_| Error::CorruptedShortId)?, + ); // TODO: Update state stats @@ -150,7 +162,8 @@ impl BlockParser<'_> { } fn parse_txout(&mut self, index: usize, txid: Txid, txout: &TxOut) -> Result<(), Error> { - let descriptor = self.descriptor + let descriptor = self + .descriptor .upgraded(index as u16, Some(Dimension::Output)) .expect("Descriptor upgrade for an onchain transaction does not fail"); @@ -160,8 +173,9 @@ impl BlockParser<'_> { }; txoset.insert(index as u16, self.descriptor); - self.data.txouts.push(index_models::Txout::compose(txout, descriptor) - .map_err(|_| Error::CorruptedShortId)?); + self.data.txouts.push( + index_models::Txout::compose(txout, descriptor).map_err(|_| Error::CorruptedShortId)?, + ); // TODO: Update state stats diff --git a/src/parser/bulk_parser.rs b/src/parser/bulk_parser.rs index f3d7b86..dacbd60 100644 --- a/src/parser/bulk_parser.rs +++ b/src/parser/bulk_parser.rs @@ -16,7 +16,7 @@ use diesel::{ prelude::*, pg::PgConnection }; -use lnpbp::bitcoin::Block; +use bitcoin::Block; use crate::db::schema; use super::*; diff --git a/src/parser/data.rs b/src/parser/data.rs index ed857b9..bdae427 100644 --- a/src/parser/data.rs +++ b/src/parser/data.rs @@ -17,10 +17,8 @@ use std::{ HashMap, hash_map::Entry } }; -use lnpbp::{ - bitcoin::{Txid, BlockHash, Block, OutPoint}, - bp::short_id::Descriptor -}; +use bitcoin::{Txid, BlockHash, Block, OutPoint}; +use bp::short_id::Descriptor; use crate::db::models; use super::state::State; diff --git a/src/parser/error.rs b/src/parser/error.rs index 4bb7844..04c420e 100644 --- a/src/parser/error.rs +++ b/src/parser/error.rs @@ -13,7 +13,7 @@ use diesel::result::Error as DieselError; -use lnpbp::bitcoin; +use bitcoin::hashes; #[derive(PartialEq, Debug, Display)] @@ -33,8 +33,8 @@ impl From for Error { } } -impl From for Error { - fn from(_: bitcoin::hashes::Error) -> Self { +impl From for Error { + fn from(_: hashes::Error) -> Self { Error::IndexIntegrityError } } diff --git a/src/parser/state.rs b/src/parser/state.rs index 3d9eca9..73c8af2 100644 --- a/src/parser/state.rs +++ b/src/parser/state.rs @@ -16,9 +16,7 @@ use std::{ fmt, ops::AddAssign }; -use lnpbp::bitcoin::{ - Block, BlockHash, Txid, OutPoint -}; +use bitcoin::{Block, BlockHash, Txid, OutPoint}; use super::*; diff --git a/src/queryd/api/config.rs b/src/queryd/api/config.rs index 596f674..7f644a0 100644 --- a/src/queryd/api/config.rs +++ b/src/queryd/api/config.rs @@ -12,7 +12,7 @@ // If not, see . -use lnpbp::internet::InetSocketAddr; +use internet2::addr::InetSocketAddr; use crate::queryd::config::Config as MainConfig; diff --git a/src/queryd/api/mod.rs b/src/queryd/api/mod.rs index 729cc86..4687121 100644 --- a/src/queryd/api/mod.rs +++ b/src/queryd/api/mod.rs @@ -12,16 +12,13 @@ // If not, see . pub mod config; -pub mod service; -mod request; mod reply; -pub mod req; +mod request; +pub mod service; pub use config::*; -pub use service::*; -pub use request::*; pub use reply::*; -pub use req::*; - +pub use request::*; +pub use service::*; -use std::convert::{TryFrom, TryInto}; +use std::convert::TryFrom; diff --git a/src/queryd/api/reply.rs b/src/queryd/api/reply.rs index be0ef53..f06f277 100644 --- a/src/queryd/api/reply.rs +++ b/src/queryd/api/reply.rs @@ -11,11 +11,9 @@ // along with this software. // If not, see . - -use lnpbp::rpc::{Multipart, Error}; - use super::*; +use crate::msgbus::{proc, Error, Multipart}; #[derive(Clone, Debug, Display)] #[display_from(Debug)] @@ -32,7 +30,8 @@ impl TryFrom for Reply { type Error = Error; fn try_from(multipart: Multipart) -> Result { - let (cmd, args) = multipart.split_first() + let (cmd, args) = multipart + .split_first() .ok_or(Error::MalformedReply) .and_then(|(cmd_data, args)| { if cmd_data.len() != 2 { @@ -59,11 +58,11 @@ impl From for Multipart { use Reply::*; match reply { - Okay => vec![zmq::Message::from(&REPID_OKAY.to_be_bytes()[..])], - Ack => vec![zmq::Message::from(&REPID_ACK.to_be_bytes()[..])], - Success => vec![zmq::Message::from(&REPID_SUCCESS.to_be_bytes()[..])], - Done => vec![zmq::Message::from(&REPID_DONE.to_be_bytes()[..])], - Failure => vec![zmq::Message::from(&REPID_FAILURE.to_be_bytes()[..])], + Okay => vec![zmq::Message::from(&proc::REPID_OKAY.to_be_bytes()[..])], + Ack => vec![zmq::Message::from(&proc::REPID_ACK.to_be_bytes()[..])], + Success => vec![zmq::Message::from(&proc::REPID_SUCCESS.to_be_bytes()[..])], + Done => vec![zmq::Message::from(&proc::REPID_DONE.to_be_bytes()[..])], + Failure => vec![zmq::Message::from(&proc::REPID_FAILURE.to_be_bytes()[..])], } } } diff --git a/src/queryd/api/req/mod.rs b/src/queryd/api/req/mod.rs deleted file mode 100644 index 31c74c6..0000000 --- a/src/queryd/api/req/mod.rs +++ /dev/null @@ -1,35 +0,0 @@ -// Bitcoin protocol (BP) daemon node -// Written in 2020 by -// Dr. Maxim Orlovsky -// -// To the extent possible under law, the author(s) have dedicated all -// copyright and related and neighboring rights to this software to -// the public domain worldwide. This software is distributed without -// any warranty. -// -// You should have received a copy of the MIT License -// along with this software. -// If not, see . - -mod query; -pub use query::*; - -pub(self) use super::*; - - -use core::slice::Iter; -use std::marker::PhantomData; - -use lnpbp::wrapper; -use lnpbp::bp::short_id::ShortId; -use lnpbp::bitcoin; -use zmq::Message; - -pub(super) const REQID_UTXO: u16 = 0x0010; - -pub(super) const REPID_OKAY: u16 = 0x0001; -pub(super) const REPID_ACK: u16 = 0x0002; -pub(super) const REPID_SUCCESS: u16 = 0x0003; -pub(super) const REPID_DONE: u16 = 0x0004; -pub(super) const REPID_FAILURE: u16 = 0x0005; - diff --git a/src/queryd/api/req/query.rs b/src/queryd/api/req/query.rs deleted file mode 100644 index a85917c..0000000 --- a/src/queryd/api/req/query.rs +++ /dev/null @@ -1,49 +0,0 @@ -// Bitcoin protocol (BP) daemon node -// Written in 2020 by -// Dr. Maxim Orlovsky -// -// To the extent possible under law, the author(s) have dedicated all -// copyright and related and neighboring rights to this software to -// the public domain worldwide. This software is distributed without -// any warranty. -// -// You should have received a copy of the MIT License -// along with this software. -// If not, see . - - -use std::convert::TryFrom; - -use lnpbp::rpc::{Multipart, Error}; - -use super::*; - - -#[derive(Clone, Debug, Display)] -#[display_from(Debug)] -pub struct Query { - pub query: String, -} - -impl TryFrom<&[zmq::Message]> for Query { - type Error = Error; - - fn try_from(args: &[zmq::Message]) -> Result { - if args.len() != 1 { Err(Error::WrongNumberOfArguments)? } - - let query = String::from_utf8(args[0][..].to_vec()) - .map_err(|_| Error::MalformedArgument)?; - - Ok(Self { - query - }) - } -} - -impl From for Multipart { - fn from(proc: Query) -> Self { - vec![ - zmq::Message::from(&proc.query), - ] - } -} diff --git a/src/queryd/api/request.rs b/src/queryd/api/request.rs index 38830d4..d2ce880 100644 --- a/src/queryd/api/request.rs +++ b/src/queryd/api/request.rs @@ -11,18 +11,18 @@ // along with this software. // If not, see . +use bp::short_id::ShortId; -use lnpbp::rpc::*; -use lnpbp::bp::short_id::ShortId; - -use super::*; +use crate::msgbus::{proc, VecEncoding}; +use std::convert::{TryFrom, TryInto}; +use crate::msgbus::{split_cmd_args, Error, Multipart}; #[non_exhaustive] pub enum Request { Txid(VecEncoding), SpendingTxin(VecEncoding), - Utxo(Query), + Utxo(proc::Query), } impl TryFrom for Request { @@ -43,12 +43,11 @@ impl From for Multipart { use Request::*; match command { - Utxo(query) => vec![ - zmq::Message::from(&REQID_UTXO.to_be_bytes()[..]), - ].into_iter() + Utxo(query) => vec![zmq::Message::from(&proc::REQID_UTXO.to_be_bytes()[..])] + .into_iter() .chain(Multipart::from(query)) .collect::(), - _ => unimplemented!() + _ => unimplemented!(), } } } diff --git a/src/queryd/api/service.rs b/src/queryd/api/service.rs index 502f564..279d6a3 100644 --- a/src/queryd/api/service.rs +++ b/src/queryd/api/service.rs @@ -11,17 +11,13 @@ // along with this software. // If not, see . +use crate::msgbus::{Error, Multipart, Query}; -use std::convert::TryFrom; -use futures::TryFutureExt; +use internet2::addr::InetSocketAddrExt; +use microservices::node::TryService; -use lnpbp::rpc::{Multipart, Error}; -use lnpbp::TryService; -use lnpbp::internet::InetSocketAddrExt; - -use crate::BootstrapError; use super::*; - +use crate::BootstrapError; pub struct ApiService { config: Config, @@ -29,78 +25,81 @@ pub struct ApiService { subscriber: zmq::Socket, } -#[async_trait] impl TryService for ApiService { type ErrorType = Error; - async fn try_run_loop(mut self) -> Result { + fn try_run_loop(mut self) -> Result<(), Error> { loop { - match self.run().await { + match self.run() { Ok(_) => debug!("API request processing complete"), Err(err) => { error!("Error processing API request: {}", err); Err(err)?; - }, + } } } } } impl ApiService { - pub fn init(config: Config, - context: zmq::Context - ) -> Result { + pub fn init(config: Config, context: zmq::Context) -> Result { trace!("Opening API socket on {} ...", config.socket_addr); let addr = InetSocketAddrExt::tcp(config.socket_addr.address, config.socket_addr.port); - let subscriber = context.socket(zmq::REP) + let subscriber = context + .socket(zmq::REP) .map_err(|e| BootstrapError::SubscriptionError(e))?; - subscriber.connect(&addr.to_string()) + subscriber + .connect(&addr.to_string()) .map_err(|e| BootstrapError::SubscriptionError(e))?; //subscriber.set_subscribe("".as_bytes()) // .map_err(|e| BootstrapError::SubscriptionError(e))?; - debug!("API sucket opened"); + debug!("API socket opened"); Ok(Self { config, context, - subscriber + subscriber, }) } - async fn run(&mut self) -> Result<(), Error> { - let req: Multipart = self.subscriber + fn run(&mut self) -> Result<(), Error> { + let req: Multipart = self + .subscriber .recv_multipart(0) - .map_err(|err| Error::SocketError(err))? + .map_err(|err| Error::MessageBusError(err))? .into_iter() .map(zmq::Message::from) .collect(); trace!("New API request"); trace!("Received API request {:x?}, processing ... ", req[0]); - let reply = self.proc_command(req) - .inspect_err(|err| error!("Error processing request: {}", err)) - .await + let reply = self + .proc_command(req) .unwrap_or(Reply::Failure); - trace!("Received response from command processor: `{}`; replying to client", reply); - self.subscriber.send_multipart(Multipart::from(Reply::Success), 0)?; + trace!( + "Received response from command processor: `{}`; replying to client", + reply + ); + self.subscriber + .send_multipart(Multipart::from(Reply::Success), 0)?; debug!("Sent reply {}", Reply::Success); Ok(()) } - async fn proc_command(&mut self, req: Multipart) -> Result { + fn proc_command(&mut self, req: Multipart) -> Result { use Request::*; let command = Request::try_from(req)?; match command { - Utxo(query) => self.command_query(query).await, - _ => Err(Error::UnknownCommand) + Utxo(query) => self.command_query(query), + _ => Err(Error::UnknownCommand), } } - async fn command_query(&mut self, query: Query) -> Result { + fn command_query(&mut self, query: Query) -> Result { debug!("Got QUERY {}", query); // TODO: Do query processing diff --git a/src/queryd/config.rs b/src/queryd/config.rs index 60730b9..593db1d 100644 --- a/src/queryd/config.rs +++ b/src/queryd/config.rs @@ -13,14 +13,14 @@ use std::net::SocketAddr; -use clap::Clap; +use clap::{Parser}; -use lnpbp::internet::{InetSocketAddr, InetAddr}; +use internet2::addr::{InetSocketAddr, InetAddr}; const MONITOR_ADDR_DEFAULT: &str = "0.0.0.0:9665"; -#[derive(Clap)] +#[derive(Parser)] #[clap( name = "queryd", version = "0.0.1", @@ -29,28 +29,28 @@ const MONITOR_ADDR_DEFAULT: &str = "0.0.0.0:9665"; )] pub struct Opts { /// Path and name of the configuration file - #[clap(short = "c", long = "config", default_value = "wired.toml")] + #[clap(short = 'c', long = "config", default_value = "wired.toml")] pub config: String, /// Sets verbosity level; can be used multiple times to increase verbosity - #[clap(global = true, short = "v", long = "verbose", min_values = 0, max_values = 4, parse(from_occurrences))] + #[clap(global = true, short = 'v', long = "verbose", min_values = 0, max_values = 4, parse(from_occurrences))] pub verbose: u8, /// IPv4, IPv6 or Tor address to listen for incoming API requests - #[clap(short = "i", long = "inet-addr", default_value = "0.0.0.0", env="BP_QUERYD_INET_ADDR", + #[clap(short = 'i', long = "inet-addr", default_value = "0.0.0.0", env="BP_QUERYD_INET_ADDR", parse(try_from_str))] address: InetAddr, /// Use custom port to listen for incoming API requests - #[clap(short = "a", long = "api-port", default_value = "9713", env="BP_QUERYD_API_PORT")] + #[clap(short = 'a', long = "api-port", default_value = "9713", env="BP_QUERYD_API_PORT")] api_port: u16, /// Use custom port to listen for incoming API requests - #[clap(short = "p", long = "push-port", default_value = "9716", env="BP_QUERYD_PUSH_PORT")] + #[clap(short = 'p', long = "push-port", default_value = "9716", env="BP_QUERYD_PUSH_PORT")] push_port: u16, /// Address for Prometheus monitoring information exporter - #[clap(short = "m", long = "monitor", default_value = MONITOR_ADDR_DEFAULT, env="BP_QUERYD_MONITOR", + #[clap(short = 'm', long = "monitor", default_value = MONITOR_ADDR_DEFAULT, env="BP_QUERYD_MONITOR", parse(try_from_str))] monitor: SocketAddr, } diff --git a/src/queryd/monitor/service.rs b/src/queryd/monitor/service.rs index 05cbb6b..da257fa 100644 --- a/src/queryd/monitor/service.rs +++ b/src/queryd/monitor/service.rs @@ -11,15 +11,13 @@ // along with this software. // If not, see . - -use tiny_http; use prometheus::Encoder; +use tiny_http; -use lnpbp::Service; +use microservices::node::Service; +use super::{error::Error, *}; use crate::error::*; -use super::{*, error::Error}; - pub struct MonitorService { config: Config, @@ -27,27 +25,24 @@ pub struct MonitorService { http_server: tiny_http::Server, } -#[async_trait] impl Service for MonitorService { - async fn run_loop(mut self) -> ! { + fn run_loop(mut self) -> () { loop { - match self.run().await { + match self.run() { Ok(_) => debug!("Monitoring client request processing completed"), Err(err) => { error!("Error processing monitoring client request: {}", err) - }, + } } } } } impl MonitorService { - pub fn init(config: Config, - context: zmq::Context) -> Result { + pub fn init(config: Config, context: zmq::Context) -> Result { let socket_addr = config.socket_addr.clone(); - let http_server = tiny_http::Server::http( - socket_addr.clone() - ).map_err(|err| BootstrapError::MonitorSocketError(err))?; + let http_server = tiny_http::Server::http(socket_addr.clone()) + .map_err(|err| BootstrapError::MonitorSocketError(err))?; Ok(Self { config, @@ -56,17 +51,18 @@ impl MonitorService { }) } - async fn run(&mut self) -> Result<(), Error> { - let request = self.http_server + fn run(&mut self) -> Result<(), Error> { + let request = self + .http_server .recv() .map_err(|err| Error::APIRequestError(err))?; let mut buffer = vec![]; - prometheus::TextEncoder::new() - .encode(&prometheus::gather(), &mut buffer)?; + prometheus::TextEncoder::new().encode(&prometheus::gather(), &mut buffer)?; let response = tiny_http::Response::from_data(buffer); - request.respond(response) + request + .respond(response) .map_err(|err| Error::APIResponseError(err)) } } diff --git a/src/queryd/runtime.rs b/src/queryd/runtime.rs index 47e7f2a..d5d5259 100644 --- a/src/queryd/runtime.rs +++ b/src/queryd/runtime.rs @@ -11,13 +11,11 @@ // along with this software. // If not, see . - -use lnpbp::{TryService, Service}; - -use crate::BootstrapError; -use super::{Config, ApiService}; +use super::{ApiService, Config}; use crate::queryd::MonitorService; - +use crate::BootstrapError; +use microservices::node::{Service, TryService}; +use std::thread; pub struct Runtime { config: Config, @@ -27,18 +25,12 @@ pub struct Runtime { } impl Runtime { - pub async fn init(config: Config) -> Result { + pub fn init(config: Config) -> Result { let context = zmq::Context::new(); - let api_service = ApiService::init( - config.clone().into(), - context.clone() - )?; + let api_service = ApiService::init(config.clone().into(), context.clone())?; // TODO: Add push notification service - let monitor_service = MonitorService::init( - config.clone().into(), - context.clone() - )?; + let monitor_service = MonitorService::init(config.clone().into(), context.clone())?; Ok(Self { config, @@ -49,29 +41,29 @@ impl Runtime { } } -#[async_trait] impl TryService for Runtime { - type ErrorType = tokio::task::JoinError; + type ErrorType = BootstrapError; - async fn try_run_loop(self) -> Result { + fn try_run_loop(self) -> Result<(), Self::ErrorType> { let api_addr = self.config.msgbus_peer_api_addr.clone(); let monitor_addr = self.config.monitor_addr.clone(); let api_service = self.api_service; let monitor_service = self.monitor_service; - try_join!( - tokio::spawn(async move { - info!("API service is listening on {}", api_addr); - api_service.run_or_panic("API service").await - }), - // TODO: Add push notification service - tokio::spawn(async move { - info!("Monitoring (Prometheus) exporter service is listening on {}", monitor_addr); - monitor_service.run_loop().await - }) - )?; + let handler = thread::spawn(move || { + info!("API service is listening on {}", api_addr); + api_service.run_or_panic("API service") + }); + // TODO: Add push notification service + thread::spawn(move || { + info!( + "Monitoring (Prometheus) exporter service is listening on {}", + monitor_addr + ); + monitor_service.run_loop() + }); - loop { } + loop {} } } diff --git a/src/util/parse.rs b/src/util/parse.rs index 17b9adc..5d61874 100644 --- a/src/util/parse.rs +++ b/src/util/parse.rs @@ -11,22 +11,20 @@ // along with this software. // If not, see . - -use lnpbp::bitcoin::{self, Block, Transaction}; -use lnpbp::bitcoin::hashes::hex::FromHex; +use bitcoin::hashes::hex::FromHex; +use bitcoin::{consensus, Block, Transaction}; // TODO: Move `parse_block_str` implementation into `bitcoin::Block::FromStr` -pub fn parse_block_str(data: &str) -> Result { - // TODO: Fix `itcoin::consensus::encode::Error::ParseFailed` `&str` type to String +pub fn parse_block_str(data: &str) -> Result { + // TODO: Fix `consensus::encode::Error::ParseFailed` `&str` type to String let vec = Vec::from_hex(data) - .map_err(|err| bitcoin::consensus::encode::Error::ParseFailed("Not a hexadecimal string"))?; - bitcoin::consensus::deserialize(&vec) + .map_err(|err| consensus::encode::Error::ParseFailed("Not a hexadecimal string"))?; + consensus::deserialize(&vec) } - // TODO: Move `parse_tx_str` implementation into `bitcoin::Transaction::FromStr` -pub fn parse_tx_str(data: &str) -> Result { +pub fn parse_tx_str(data: &str) -> Result { let vec = Vec::from_hex(data) - .map_err(|err| bitcoin::consensus::encode::Error::ParseFailed("Not a hexadecimal string"))?; - bitcoin::consensus::deserialize(&vec) + .map_err(|err| consensus::encode::Error::ParseFailed("Not a hexadecimal string"))?; + consensus::deserialize(&vec) }