Skip to content

Commit

Permalink
Implementing more of the LDK model rpc calls
Browse files Browse the repository at this point in the history
  • Loading branch information
bjohnson5 committed Oct 23, 2024
1 parent e0ed13e commit 5d9aed2
Show file tree
Hide file tree
Showing 3 changed files with 188 additions and 31 deletions.
15 changes: 10 additions & 5 deletions blast_cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -551,7 +551,8 @@ async fn run_command(blast: &mut blast_core::Blast, cmd: String) -> Vec<String>
match blast.open_channel(source, dest, amount, push, chan_id, true).await {
Ok(_) => {},
Err(e) => {
println!("{}", format!("Unable to open channel: {}", e));
let msg = format!("Unable to open channel: {}", e);
output.push(msg);
}
}
},
Expand All @@ -565,7 +566,8 @@ async fn run_command(blast: &mut blast_core::Blast, cmd: String) -> Vec<String>
match blast.close_channel(source, chan_id).await {
Ok(_) => {},
Err(e) => {
println!("{}", format!("Unable to open channel: {}", e));
let msg = format!("Unable to open channel: {}", e);
output.push(msg);
}
}
},
Expand All @@ -575,7 +577,8 @@ async fn run_command(blast: &mut blast_core::Blast, cmd: String) -> Vec<String>
match blast.connect_peer(source, dest).await {
Ok(_) => {},
Err(e) => {
println!("{}", format!("Unable to connect peers: {}", e));
let msg = format!("Unable to connect peers: {}", e);
output.push(msg);
}
}
},
Expand All @@ -585,7 +588,8 @@ async fn run_command(blast: &mut blast_core::Blast, cmd: String) -> Vec<String>
match blast.disconnect_peer(source, dest).await {
Ok(_) => {},
Err(e) => {
println!("{}", format!("Unable to disconnect peers: {}", e));
let msg = format!("Unable to disconnect peers: {}", e);
output.push(msg);
}
}
},
Expand All @@ -594,7 +598,8 @@ async fn run_command(blast: &mut blast_core::Blast, cmd: String) -> Vec<String>
match blast.fund_node(source, true).await {
Ok(_) => {},
Err(e) => {
println!("{}", format!("Unable to fund node: {}", e));
let msg = format!("Unable to fund node: {}", e);
output.push(msg);
}
}
},
Expand Down
4 changes: 4 additions & 0 deletions blast_models/blast_ldk/blast_ldk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ tokio = { version = "1.37.0", features = ["full"] }
prost = "0.12"
serde = { version = "1.0.104", features = ["derive"] }
serde_json = "1.0.104"
secp256k1 = "0.29.1"
hex = "0.4.3"
simplelog = "0.12.2"
log = "0.4.20"

[build-dependencies]
tonic-build = "0.11.0"
200 changes: 174 additions & 26 deletions blast_models/blast_ldk/blast_ldk/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,29 @@
use std::str::FromStr;
use std::time::Duration;
use std::thread;
use std::sync::Arc;
use std::collections::HashMap;
use std::fs;
use std::fs::File;

use ldk_node::bip39::serde::{Deserialize, Serialize};
use ldk_node::{Builder, LogLevel};
use ldk_node::bitcoin::Network;
use ldk_node::config::Config;
use ldk_node::lightning::ln::msgs::SocketAddress;

use secp256k1::PublicKey;
use tonic::{transport::Server, Request, Response, Status};
use tonic::Code;
use tokio::sync::Mutex;
use tokio::sync::oneshot;
use tokio::runtime::Runtime;
use simplelog::WriteLogger;
use simplelog::Config as LogConfig;
use log::LevelFilter;
use std::path::PathBuf;
use std::env;
use std::net::TcpListener;

use blast_rpc_server::BlastRpcServer;
use blast_rpc_server::BlastRpc;
Expand Down Expand Up @@ -58,6 +68,32 @@ struct BlastLdkServer {
runtime: Arc<Runtime>
}

impl BlastLdkServer {
async fn get_node(&self, id: String) -> Result<Arc<ldk_node::Node>, Status> {
let bldk = self.blast_ldk.lock().await;
let node = match bldk.nodes.get(&id) {
Some(n) => n,
None => {
return Err(Status::new(Code::NotFound, "Node not found."))
}
};

Ok(node.clone())
}

fn get_available_port(&self) -> Option<u16> {
(8000..9000)
.find(|port| self.port_is_available(*port))
}

fn port_is_available(&self, port: u16) -> bool {
match TcpListener::bind(("127.0.0.1", port)) {
Ok(_) => true,
Err(_) => false,
}
}
}

#[tonic::async_trait]
impl BlastRpc for BlastLdkServer {
async fn start_nodes(&self, request: Request<BlastStartRequest>) -> Result<Response<BlastStartResponse>,Status> {
Expand All @@ -67,11 +103,21 @@ impl BlastRpc for BlastLdkServer {
data_dir.push_str("/blast_data/");
for i in 0..num_nodes {
let node_id = prepend_and_pad("blast_ldk-", i);
let mut listen_addr: Vec<SocketAddress> = Vec::new();
let port = self.get_available_port().unwrap();
let a = format!("127.0.0.1:{}", port);
let addr = match SocketAddress::from_str(&a) {
Ok(a) => a,
Err(_) => {
return Err(Status::new(Code::InvalidArgument, "Could not create listen address."));
}
};
listen_addr.push(addr);
let config = Config {
storage_dir_path: format!("{}{}", data_dir, node_id),
log_dir_path: None,
network: Network::Regtest,
listening_addresses: None,
listening_addresses: Some(listen_addr),
node_alias: None,
sending_parameters: None,
trusted_peers_0conf: Vec::new(),
Expand All @@ -84,11 +130,22 @@ impl BlastRpc for BlastLdkServer {
builder.set_chain_source_bitcoind_rpc(String::from("127.0.0.1"), 18443, String::from("user"), String::from("pass"));
builder.set_gossip_source_p2p();

let node = Arc::new(builder.build().unwrap());
let ldknode = match builder.build() {
Ok(n) => n,
Err(_) => {
return Err(Status::new(Code::Unknown, "Could not create the ldk node."));
}
};
let node = Arc::new(ldknode);

match node.start_with_runtime(Arc::clone(&self.runtime)) {
Ok(_) => {},
Err(_) => {
return Err(Status::new(Code::Unknown, "Could not start the ldk node."));
}
}

node.start_with_runtime(Arc::clone(&self.runtime)).unwrap();
println!("Node ({:?}) Status: {:?}", node_id, node.status());
thread::sleep(Duration::from_secs(10));
thread::sleep(Duration::from_secs(2));

let mut bldk = self.blast_ldk.lock().await;
bldk.nodes.insert(node_id.clone(), node.clone());
Expand All @@ -111,39 +168,62 @@ impl BlastRpc for BlastLdkServer {
let response = Response::new(start_response);
Ok(response)
}

async fn get_sim_ln(&self, _request: Request<BlastSimlnRequest>) -> Result<Response<BlastSimlnResponse>, Status> {
let bldk = self.blast_ldk.lock().await;
let simln_response = BlastSimlnResponse { simln_data: bldk.simln_data.clone().into() };
let response = Response::new(simln_response);
Ok(response)
}

async fn get_pub_key(&self, request: Request<BlastPubKeyRequest>,) -> Result<Response<BlastPubKeyResponse>, Status> {
let node_id = &request.get_ref().node;
let bldk = self.blast_ldk.lock().await;
let node = bldk.nodes.get(node_id).unwrap();
let node = self.get_node(node_id.to_string()).await?;
let pub_key = node.node_id().to_string();

let key_response = BlastPubKeyResponse { pub_key: pub_key };
let response = Response::new(key_response);
Ok(response)
}

async fn list_peers(&self, _request: Request<BlastPeersRequest>,) -> Result<Response<BlastPeersResponse>, Status> {
Err(Status::new(Code::InvalidArgument, "name is invalid"))
async fn list_peers(&self, request: Request<BlastPeersRequest>,) -> Result<Response<BlastPeersResponse>, Status> {
let node_id = &request.get_ref().node;
let node = self.get_node(node_id.to_string()).await?;
let peers = format!("{:?}", node.list_peers());

let peers_response = BlastPeersResponse { peers: peers };
let response = Response::new(peers_response);
Ok(response)
}

async fn wallet_balance(&self, _request: Request<BlastWalletBalanceRequest>) -> Result<Response<BlastWalletBalanceResponse>, Status> {
Err(Status::new(Code::InvalidArgument, "name is invalid"))
async fn wallet_balance(&self, request: Request<BlastWalletBalanceRequest>) -> Result<Response<BlastWalletBalanceResponse>, Status> {
let node_id = &request.get_ref().node;
let node = self.get_node(node_id.to_string()).await?;
let balance = node.list_balances().total_onchain_balance_sats;

let balance_response = BlastWalletBalanceResponse { balance: balance.to_string() };
let response = Response::new(balance_response);
Ok(response)
}

async fn channel_balance(&self, _request: Request<BlastChannelBalanceRequest>) -> Result<Response<BlastChannelBalanceResponse>, Status> {
Err(Status::new(Code::InvalidArgument, "name is invalid"))
async fn channel_balance(&self, request: Request<BlastChannelBalanceRequest>) -> Result<Response<BlastChannelBalanceResponse>, Status> {
let node_id = &request.get_ref().node;
let node = self.get_node(node_id.to_string()).await?;
let balance = node.list_balances().total_lightning_balance_sats;

let balance_response = BlastChannelBalanceResponse { balance: balance.to_string() };
let response = Response::new(balance_response);
Ok(response)
}

async fn list_channels(&self, _request: Request<BlastListChannelsRequest>) -> Result<Response<BlastListChannelsResponse>, Status> {
Err(Status::new(Code::InvalidArgument, "name is invalid"))
async fn list_channels(&self, request: Request<BlastListChannelsRequest>) -> Result<Response<BlastListChannelsResponse>, Status> {
let node_id = &request.get_ref().node;
let node = self.get_node(node_id.to_string()).await?;
let chans = format!("{:?}", node.list_channels());

let chan_response = BlastListChannelsResponse { channels: chans };
let response = Response::new(chan_response);
Ok(response)
}

async fn open_channel(&self, _request: Request<BlastOpenChannelRequest>) -> Result<Response<BlastOpenChannelResponse>, Status> {
Expand All @@ -158,20 +238,79 @@ impl BlastRpc for BlastLdkServer {
Err(Status::new(Code::InvalidArgument, "name is invalid"))
}

async fn connect_peer(&self, _request: Request<BlastConnectRequest>) -> Result<Response<BlastConnectResponse>, Status> {
Err(Status::new(Code::InvalidArgument, "name is invalid"))
async fn connect_peer(&self, request: Request<BlastConnectRequest>) -> Result<Response<BlastConnectResponse>, Status> {
let req = &request.get_ref();
let node_id = &req.node;
let peer_pub = match PublicKey::from_slice(hex::decode(&req.peer_pub_key).unwrap().as_slice()) {
Ok(k) => { k },
Err(_) => {
return Err(Status::new(Code::InvalidArgument, format!("Could not parse peer pub key: {:?}", req.peer_pub_key)));
}
};
let addr = req.peer_addr.clone();
let converted_addr = addr.replace("localhost", "127.0.0.1");
let peer_addr = match SocketAddress::from_str(&converted_addr) {
Ok(a) => { a },
Err(_) => {
return Err(Status::new(Code::InvalidArgument, format!("Could not parse peer address: {:?}", &req.peer_addr)));
}
};
let node = self.get_node(node_id.to_string()).await?;
match node.connect(peer_pub, peer_addr, true) {
Ok(_) => {
let connect_response = BlastConnectResponse { success: true };
let response = Response::new(connect_response);
Ok(response)
},
Err(_) => {
let connect_response = BlastConnectResponse { success: false };
let response = Response::new(connect_response);
Ok(response)
}
}
}

async fn disconnect_peer(&self, _request: Request<BlastDisconnectRequest>) -> Result<Response<BlastDisconnectResponse>, Status> {
Err(Status::new(Code::InvalidArgument, "name is invalid"))
async fn disconnect_peer(&self, request: Request<BlastDisconnectRequest>) -> Result<Response<BlastDisconnectResponse>, Status> {
let req = &request.get_ref();
let node_id = &req.node;
let peer_pub = match PublicKey::from_slice(hex::decode(&req.peer_pub_key).unwrap().as_slice()) {
Ok(k) => { k },
Err(_) => {
return Err(Status::new(Code::InvalidArgument, format!("Could not parse peer pub key: {:?}", req.peer_pub_key)));
}
};
let node = self.get_node(node_id.to_string()).await?;
match node.disconnect(peer_pub) {
Ok(_) => {
let connect_response = BlastDisconnectResponse { success: true };
let response = Response::new(connect_response);
Ok(response)
},
Err(_) => {
let connect_response = BlastDisconnectResponse { success: false };
let response = Response::new(connect_response);
Ok(response)
}
}
}

async fn get_btc_address(&self, _request: Request<BlastBtcAddressRequest>) -> Result<Response<BlastBtcAddressResponse>, Status> {
Err(Status::new(Code::InvalidArgument, "name is invalid"))
}

async fn get_listen_address(&self, _request: Request<BlastListenAddressRequest>) -> Result<Response<BlastListenAddressResponse>, Status> {
Err(Status::new(Code::InvalidArgument, "name is invalid"))
async fn get_listen_address(&self, request: Request<BlastListenAddressRequest>) -> Result<Response<BlastListenAddressResponse>, Status> {
let node_id = &request.get_ref().node;
let node = self.get_node(node_id.to_string()).await?;
let addr = match node.config().listening_addresses {
Some(a) => { a },
None => {
return Err(Status::new(Code::InvalidArgument, "Could not get listening address."));
}
};

let listen_response = BlastListenAddressResponse { address: addr.clone().get(0).unwrap().clone().to_string() };
let response = Response::new(listen_response);
Ok(response)
}

async fn stop_model(&self, _request: Request<BlastStopModelRequest>) -> Result<Response<BlastStopModelResponse>, Status> {
Expand Down Expand Up @@ -200,6 +339,15 @@ impl BlastRpc for BlastLdkServer {
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
let home = env::var("HOME").expect("HOME environment variable not set");
let folder_path = PathBuf::from(home).join(".blast/blast_ldk.log");
std::fs::create_dir_all(folder_path.parent().unwrap()).unwrap();
let _ = WriteLogger::init(
LevelFilter::Info,
LogConfig::default(),
File::create(folder_path).unwrap(),
);

let rt = Arc::new(tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
Expand All @@ -215,7 +363,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
runtime: Arc::clone(&rt)
};

println!("Starting gRPC server at {}", addr);
log::info!("Starting gRPC server at {}", addr);

let server = rt.spawn(async move {
Server::builder()
Expand All @@ -233,11 +381,11 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
let _ = server.await;
});

println!("Shutting down gRPC server at {}", addr);
log::info!("Shutting down gRPC server at {}", addr);

Ok(())
}

fn prepend_and_pad(input: &str, num: i32) -> String {
format!("{}{:04}", input, num)
}
}

0 comments on commit 5d9aed2

Please sign in to comment.