Skip to content

Commit 304e17c

Browse files
committed
Reactor & update dependencies
1 parent 7e44050 commit 304e17c

File tree

13 files changed

+180
-359
lines changed

13 files changed

+180
-359
lines changed

Cargo.lock

Lines changed: 18 additions & 58 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

client/Cargo.toml

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,21 +18,20 @@ path = "src/lib.rs"
1818
[dependencies]
1919
spaces_wallet = { path = "../wallet" }
2020
spaces_protocol = { path = "../protocol", version = "*", features = ["std"]}
21-
spacedb = { git = "https://github.com/spacesprotocol/spacedb", version = "0.0.5" }
21+
spacedb = { git = "https://github.com/spacesprotocol/spacedb", version = "0.0.6" }
2222

2323
tokio = { version = "1.37.0", features = ["signal"] }
2424
ctrlc = "3.4.4"
2525
anyhow = "1.0.86"
2626
clap = { version = "4.5.6", features = ["derive", "env"] }
2727
log = "0.4.21"
2828
serde = { version = "1.0.200", features = ["derive"] }
29-
toml = "0.8.14"
3029
hex = "0.4.3"
3130
jsonrpsee = { version = "0.22.5", features = ["server", "http-client", "macros"] }
3231
directories = "5.0.1"
3332
env_logger = "0.11.3"
3433
serde_json = "1.0.116"
35-
bincode = {version = "2.0.0-rc.3", features = ["serde", "derive"]}
34+
bincode = {version = "2.0.1", features = ["serde", "derive"]}
3635
base64 = "0.22.1"
3736
futures = "0.3.30"
3837
reqwest = { version = "0.12.5", default-features = false, features = ["json", "blocking", "rustls-tls"] }

client/src/app.rs

Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
use std::path::PathBuf;
2+
use std::sync::Arc;
3+
use anyhow::anyhow;
4+
use tokio::sync::{broadcast, mpsc};
5+
use tokio::task::{JoinHandle, JoinSet};
6+
use crate::config::Args;
7+
use crate::rpc::{AsyncChainState, RpcServerImpl, WalletLoadRequest, WalletManager};
8+
use crate::source::{BitcoinBlockSource, BitcoinRpc};
9+
use crate::spaces::Spaced;
10+
use crate::store::LiveSnapshot;
11+
use crate::wallets::RpcWallet;
12+
13+
pub struct App {
14+
shutdown: broadcast::Sender<()>,
15+
services: JoinSet<anyhow::Result<()>>,
16+
}
17+
18+
impl App {
19+
pub fn new(shutdown: broadcast::Sender<()>) -> Self {
20+
Self {
21+
shutdown,
22+
services: JoinSet::new(),
23+
}
24+
}
25+
26+
async fn setup_rpc_wallet(&mut self, spaced: &Spaced, rx: mpsc::Receiver<WalletLoadRequest>, cbf: bool) {
27+
let wallet_service = RpcWallet::service(
28+
spaced.network,
29+
spaced.rpc.clone(),
30+
spaced.chain.state.clone(),
31+
rx,
32+
self.shutdown.clone(),
33+
spaced.num_workers,
34+
cbf
35+
);
36+
37+
self.services.spawn(async move {
38+
wallet_service
39+
.await
40+
.map_err(|e| anyhow!("Wallet service error: {}", e))
41+
});
42+
}
43+
44+
async fn setup_rpc_services(&mut self, spaced: &Spaced) {
45+
let (wallet_loader_tx, wallet_loader_rx) = mpsc::channel(1);
46+
47+
let wallet_manager = WalletManager {
48+
data_dir: spaced.data_dir.join("wallets"),
49+
network: spaced.network,
50+
rpc: spaced.rpc.clone(),
51+
wallet_loader: wallet_loader_tx,
52+
wallets: Arc::new(Default::default()),
53+
};
54+
55+
let (async_chain_state, async_chain_state_handle) = create_async_store(
56+
spaced.rpc.clone(),
57+
spaced.anchors_path.clone(),
58+
spaced.chain.state.clone(),
59+
spaced.block_index.as_ref().map(|index| index.state.clone()),
60+
self.shutdown.subscribe(),
61+
)
62+
.await;
63+
64+
self.services.spawn(async {
65+
async_chain_state_handle
66+
.await
67+
.map_err(|e| anyhow!("Chain state error: {}", e))
68+
});
69+
let rpc_server = RpcServerImpl::new(async_chain_state.clone(), wallet_manager);
70+
71+
let bind = spaced.bind.clone();
72+
let shutdown = self.shutdown.clone();
73+
74+
self.services.spawn(async move {
75+
rpc_server
76+
.listen(bind, shutdown)
77+
.await
78+
.map_err(|e| anyhow!("RPC Server error: {}", e))
79+
});
80+
81+
self.setup_rpc_wallet(spaced, wallet_loader_rx, spaced.cbf).await;
82+
}
83+
84+
async fn setup_sync_service(&mut self, mut spaced: Spaced) {
85+
let (spaced_sender, spaced_receiver) = tokio::sync::oneshot::channel();
86+
87+
let shutdown = self.shutdown.clone();
88+
let rpc = spaced.rpc.clone();
89+
90+
std::thread::spawn(move || {
91+
let source = BitcoinBlockSource::new(rpc);
92+
_ = spaced_sender.send(spaced.protocol_sync(source, shutdown));
93+
});
94+
95+
self.services.spawn(async move {
96+
spaced_receiver
97+
.await?
98+
.map_err(|e| anyhow!("Protocol sync error: {}", e))
99+
});
100+
}
101+
102+
pub async fn run(&mut self, args: Vec<String>) -> anyhow::Result<()> {
103+
let shutdown_receiver = self.shutdown.subscribe();
104+
let spaced = Args::configure(args, shutdown_receiver).await?;
105+
self.setup_rpc_services(&spaced).await;
106+
self.setup_sync_service(spaced).await;
107+
108+
while let Some(res) = self.services.join_next().await {
109+
res??
110+
}
111+
112+
Ok(())
113+
}
114+
}
115+
116+
async fn create_async_store(
117+
rpc: BitcoinRpc,
118+
anchors: Option<PathBuf>,
119+
chain_state: LiveSnapshot,
120+
block_index: Option<LiveSnapshot>,
121+
shutdown: broadcast::Receiver<()>,
122+
) -> (AsyncChainState, JoinHandle<()>) {
123+
let (tx, rx) = mpsc::channel(32);
124+
let async_store = AsyncChainState::new(tx);
125+
let client = reqwest::Client::new();
126+
let handle = tokio::spawn(async move {
127+
AsyncChainState::handler(
128+
&client,
129+
rpc,
130+
anchors,
131+
chain_state,
132+
block_index,
133+
rx,
134+
shutdown,
135+
)
136+
.await
137+
});
138+
(async_store, handle)
139+
}

0 commit comments

Comments
 (0)