Skip to content

Commit

Permalink
Merge pull request #7 from ourzora/erik/back-2440-mark-mints-as-seen-…
Browse files Browse the repository at this point in the history
…prune

[back-2440] Move to alloy, implement watching chain
  • Loading branch information
erikreppel authored Apr 2, 2024
2 parents 9aee185 + fbeda79 commit 7f4c854
Show file tree
Hide file tree
Showing 14 changed files with 1,450 additions and 98 deletions.
1,001 changes: 979 additions & 22 deletions Cargo.lock

Large diffs are not rendered by default.

43 changes: 40 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,55 @@ edition = "2021"
[dependencies]
tracing = "0.1.40"
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
libp2p = { version = "0.53.2", features = ["tokio", "tcp", "macros", "noise", "yamux", "kad", "gossipsub"] }
libp2p = { version = "0.53.2", features = [
"tokio",
"tcp",
"macros",
"noise",
"yamux",
"kad",
"gossipsub",
] }
tokio = { version = "1.36.0", features = ["full"] }
eyre = "0.6.12"
async-trait = "0.1.77"
serde = { version = "1.0.197", features = ["derive"] }
serde_json = "1.0.114"
alloy-primitives = { version = "0.6.4", features = ["serde"] }
clap = { version = "4.5.2", features = ["derive"] }
envconfig = "0.10.0"
colored = "2.1.0"
sqlx = { version = "0.7.4", features = ["runtime-tokio", "macros", "sqlite", "postgres", "any"] }
sqlx = { version = "0.7.4", features = [
"runtime-tokio",
"macros",
"sqlite",
"postgres",
"any",
] }
once_cell = "1.19.0"
rand = "0.8.5"
futures-util = "0.3"

alloy = { git = "https://github.com/alloy-rs/alloy", rev = "7e39c85f9f51e6449a8b661f54df0ac213f18639", features = [
"sol-types",
"network",
"rpc-types-eth",
"pubsub",
] }

alloy-rpc-client = { git = "https://github.com/alloy-rs/alloy", rev = "7e39c85f9f51e6449a8b661f54df0ac213f18639", features = [
"pubsub",
"ws",
] }
alloy-provider = { git = "https://github.com/alloy-rs/alloy", rev = "7e39c85f9f51e6449a8b661f54df0ac213f18639", features = [
"pubsub",
] }

alloy-sol-types = { git = "https://github.com/alloy-rs/core", rev = "525a233" }
alloy-primitives = { git = "https://github.com/alloy-rs/core", rev = "525a233", features = ["serde"] }


[patch.crates-io]
alloy-primitives = { git = "https://github.com/alloy-rs/core", rev = "525a233" }


[profile.dev.package.sqlx-macros]
Expand Down
69 changes: 69 additions & 0 deletions src/chain.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
use crate::controller::ControllerCommands;
use crate::types::Premint;
use alloy::network::Ethereum;
use alloy::pubsub::PubSubFrontend;
use alloy_provider::{Provider, RootProvider};
use alloy_rpc_client::{RpcClient, WsConnect};
use futures_util::StreamExt;
use tokio::sync::mpsc::Sender;

/// Checks for new premints being brought onchain then sends to controller to handle
struct MintChecker {
chain_id: u64,
rpc_url: String,
channel: Sender<ControllerCommands>,
}

impl MintChecker {
pub async fn poll_for_new_mints<T: Premint>(&self) -> eyre::Result<()> {
let mut highest_block: Option<u64> = None;

let rpc = self.make_provider().await?;
let mut filter = if let Some(filter) = T::check_filter(self.chain_id) {
filter
} else {
let err = eyre::eyre!("No filter for chain / premint type, skipping spawning checker");
tracing::warn!(error = err.to_string(), "checking failed");
return Err(err);
};

loop {
// set start block incase of WS disconnect
if let Some(highest_block) = highest_block {
filter = filter.from_block(highest_block);
}
let mut stream = rpc.subscribe_logs(&filter).await?.into_stream();

while let Some(log) = stream.next().await {
match T::map_claim(self.chain_id, log.clone()) {
Ok(claim) => {
if let Err(err) = self
.channel
.send(ControllerCommands::ResolveOnchainMint(claim))
.await
{
tracing::error!("Error sending claim to controller: {}", err);
}
}
Err(e) => {
tracing::error!("Error processing log while checking premint: {}", e);
}
}
if let Some(block_number) = log.block_number {
highest_block = Some(block_number.to());
}
}
}
}

async fn make_provider(&self) -> eyre::Result<RootProvider<Ethereum, PubSubFrontend>> {
let ws_transport = WsConnect::new(self.rpc_url.clone());

// Connect to the WS client.
let rpc_client = RpcClient::connect_pubsub(ws_transport).await?;

// Create the provider.
let provider = RootProvider::<Ethereum, _>::new(rpc_client);
Ok(provider)
}
}
82 changes: 81 additions & 1 deletion src/config.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
use crate::types::PremintName;
use envconfig::Envconfig;
use std::collections::HashMap;
use std::env;
use std::error::Error;
use std::str::FromStr;

#[derive(Envconfig, Debug)]
pub struct Config {
Expand Down Expand Up @@ -27,6 +31,36 @@ pub struct Config {
// Comma separated list of default premint types to process
#[envconfig(from = "PREMINT_TYPES", default = "zora_premint_v2")]
pub premint_types: String,

#[envconfig(from = "CHAIN_INCLUSION_MODE", default = "verify")]
pub chain_inclusion_mode: ChainInclusionMode,

#[envconfig(from = "SUPPORTED_CHAIN_IDS", default = "777777,8423")]
pub supported_chain_ids: String,
// Dynamic configuration: RPC urls take the form of CHAIN_<chain_id>_RPC_WSS
// If not provided in the environment, the default is to use the public node
#[envconfig(from = "TRUSTED_PEERS")]
pub trusted_peers: Option<String>,
}

#[derive(Debug, Clone, Copy, PartialEq)]
pub enum ChainInclusionMode {
Check, // node will check chains for new premints getting included
Verify, // node will verify that premints are included on chain based on messages from other nodes
Trust, // node will trust that premints are included on chain based on messages from other trusted nodes
}

impl FromStr for ChainInclusionMode {
type Err = eyre::Report;

fn from_str(s: &str) -> Result<Self, Self::Err> {
match s.to_lowercase().as_str() {
"check" => Ok(Self::Check),
"verify" => Ok(Self::Verify),
"trust" => Ok(Self::Trust),
_ => Err(eyre::eyre!("Invalid chain inclusion mode")),
}
}
}

impl Config {
Expand All @@ -44,14 +78,54 @@ impl Config {
.map(|s| PremintName(s.to_string()))
.collect()
}

pub fn supported_chains(&self) -> Vec<u64> {
self.supported_chain_ids
.split(',')
.map(|s| s.parse().unwrap())
.collect()
}

pub fn trusted_peers(&self) -> Vec<String> {
match &self.trusted_peers {
None => vec![],
Some(peers) => peers.split(',').map(|s| s.to_string()).collect(),
}
}

pub fn rpc_url(&self, chain_id: u64) -> eyre::Result<String> {
let defaults = HashMap::from([
(7777777, "wss://rpc.zora.co"),
(8423, "wss://base-rpc.publicnode.com"),
]);

match env::var(format!("CHAIN_{}_RPC_WSS", chain_id)) {
Ok(url) => Ok(url),
Err(_) => match defaults.get(&chain_id) {
Some(url) => Ok(url.to_string()),
None => Err(eyre::eyre!("No default RPC URL for chain {}", chain_id)),
},
}
}

pub fn validate(self) -> Self {
for chain_id in self.supported_chains() {
self.rpc_url(chain_id).expect(format!("Failed to get RPC URL for configured chain_id {chain_id}. Set environment variable CHAIN_{chain_id}_RPC_WSS").as_str());
}
self
}
}

pub fn init() -> Config {
Config::init_from_env().expect("Failed to load config")
Config::init_from_env()
.expect("Failed to load config")
.validate()
}

#[cfg(test)]
mod test {
use crate::config::ChainInclusionMode;

#[test]
fn test_premint_names() {
let config = super::Config {
Expand All @@ -63,6 +137,9 @@ mod test {
prune_minted_premints: false,
peer_limit: 1000,
premint_types: "simple,zora_premint_v2".to_string(),
chain_inclusion_mode: ChainInclusionMode::Check,
supported_chain_ids: "7777777".to_string(),
trusted_peers: None,
};

let names = config.premint_names();
Expand All @@ -79,6 +156,9 @@ mod test {
prune_minted_premints: false,
peer_limit: 1000,
premint_types: "zora_premint_v2".to_string(),
chain_inclusion_mode: ChainInclusionMode::Check,
supported_chain_ids: "7777777".to_string(),
trusted_peers: None,
};

let names = config.premint_names();
Expand Down
6 changes: 5 additions & 1 deletion src/controller.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::p2p::NetworkState;
use crate::storage::PremintStorage;
use crate::types::{MintpoolNodeInfo, PremintTypes};
use crate::types::{InclusionClaim, MintpoolNodeInfo, PremintTypes};
use sqlx::SqlitePool;
use tokio::select;
use tokio::sync::{mpsc, oneshot};
Expand Down Expand Up @@ -42,6 +42,7 @@ pub enum ControllerCommands {
channel: oneshot::Sender<MintpoolNodeInfo>,
},
Query(DBQuery),
ResolveOnchainMint(InclusionClaim),
}

pub enum DBQuery {
Expand Down Expand Up @@ -146,6 +147,9 @@ impl Controller {
};
}
},
ControllerCommands::ResolveOnchainMint(_) => {
todo!("prune")
}
}
Ok(())
}
Expand Down
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
pub mod chain;
pub mod config;
pub mod controller;
pub mod p2p;
pub mod premints;
pub mod run;
pub mod stdin;
pub mod storage;
Expand Down
4 changes: 2 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use clap::Parser;
use mintpool::run::start_swarm_and_controller;
use mintpool::run::start_services;
use mintpool::stdin::watch_stdin;
use tracing_subscriber::EnvFilter;

Expand All @@ -14,7 +14,7 @@ async fn main() -> eyre::Result<()> {

tracing::info!("Starting mintpool with config: {:?}", config);

let ctl = start_swarm_and_controller(&config).await?;
let ctl = start_services(&config).await?;
watch_stdin(ctl).await;

Ok(())
Expand Down
1 change: 0 additions & 1 deletion src/p2p.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use crate::config::Config;
use crate::controller::{P2PEvent, SwarmCommand};
use crate::types::{MintpoolNodeInfo, Premint, PremintName, PremintTypes};
use alloy_primitives::private::derive_more::Display;
use eyre::WrapErr;
use libp2p::core::ConnectedPoint;
use libp2p::futures::StreamExt;
Expand Down
1 change: 1 addition & 0 deletions src/premints/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod zora_v2;
Loading

0 comments on commit 7f4c854

Please sign in to comment.