From 0b371bd0d7274e84597b486beca8e1694bba52c5 Mon Sep 17 00:00:00 2001 From: armyhaylenko Date: Tue, 4 Feb 2025 18:57:31 +0200 Subject: [PATCH] feat(etl): rework etl args, add accounts selector functionality * Get rid of any required JSON configuration for ETL * Add accounts selector functionality to ETL --- .gitignore | 2 + Cargo.lock | 2 +- Makefile | 2 +- README.md | 32 +++++--- accounts-selector-config.json | 9 +++ etl-config.json | 5 -- plerkle_snapshot/Cargo.toml | 2 +- .../solana-snapshot-etl/accounts_selector.rs | 67 ++++++++++++++++ .../src/bin/solana-snapshot-etl/geyser.rs | 80 +++++++++++-------- .../src/bin/solana-snapshot-etl/main.rs | 56 ++++++------- 10 files changed, 170 insertions(+), 87 deletions(-) create mode 100644 accounts-selector-config.json delete mode 100644 etl-config.json create mode 100644 plerkle_snapshot/src/bin/solana-snapshot-etl/accounts_selector.rs diff --git a/.gitignore b/.gitignore index 3a7b6e1d..c21be9f4 100644 --- a/.gitignore +++ b/.gitignore @@ -22,3 +22,5 @@ test-ledger programs docker/geyser-outputs docker/solana-outputs +.env +snapshot diff --git a/Cargo.lock b/Cargo.lock index 261112da..e1bbb8e0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3048,7 +3048,7 @@ dependencies = [ [[package]] name = "plerkle_snapshot" -version = "0.4.0" +version = "0.5.0" dependencies = [ "agave-geyser-plugin-interface", "async-trait", diff --git a/Makefile b/Makefile index de4c05c5..39e62d74 100644 --- a/Makefile +++ b/Makefile @@ -13,4 +13,4 @@ build: @docker build -f Dockerfile . -t ${IMAGE_NAME} stream: - for f in $(shell ls ${SNAPSHOTDIR}); do echo $$(realpath $${f}) && docker run --env-file .env -p 3000:3000 --rm -it --mount type=bind,source=$$(realpath $${f}),target=$$(realpath $${f}),readonly --mount type=bind,source=$$(pwd)/etl-config.json,target=/app/etl-config.json,readonly ${IMAGE_NAME} $$(realpath $${f}) --geyser=/app/etl-config.json && date; done + for f in $(shell ls ${SNAPSHOTDIR}); do echo $$(realpath $${f}) && docker run --env-file .env -p 3000:3000 --rm -it --mount type=bind,source=$$(realpath $${f}),target=$$(realpath $${f}),readonly --mount type=bind,source=$$(pwd)/accounts-selector-config.json,target=/app/accounts-selector-config.json,readonly ${IMAGE_NAME} $$(realpath $${f}) --accounts-selector-config=/app/accounts-selector-config.json && date; done diff --git a/README.md b/README.md index e678cb10..85f3d0f4 100644 --- a/README.md +++ b/README.md @@ -32,12 +32,19 @@ If you are using this plugin for your bespoke use case then the build steps are ### Building Locally -**NOTE -> M1 macs may have issues. Linux is best.** +#### Linux `cargo build` for debug or `cargo build --release` for a release build. -You will now have a libplerkle.so file in the target folder. This is the binary that you will pass into the validator using the following option. +You will now have a libplerkle.so file in the target folder. + +#### Mac + +Building is similar to Linux, except for the extension of the library produced. +Instead of a `.so` file, look for `libplerkle.dylib`. The loader does not really care what extension to link, as long as it's a proper dynamically linked object, such as a `dylib`. + +### Configuration ```bash --geyser-plugin-config plugin-config.json @@ -78,10 +85,9 @@ PLUGIN_MESSENGER_CONFIG='{ messenger_type="Redis", connection_config={ redis_con The PLUGIN_MESSENGER_CONFIG determines which compiled messenger to select and a specific configuration for the messenger. - #### Additional Configuration Examples -***Producer Configuration*** +**_Producer Configuration_** - "pipeline_size_bytes" - Maximum command size, roughly equates to the payload size. This setting locally buffers bytes in a queue to be flushed when the buffer grows past the desired amount. Default is 512MB (max redis command size) / 1000, maximum is 512MB (max redis command size) / 1000. You should test your optimal size to avoid high send latency and avoid RTT. - "local_buffer_max_window" - Maximum time to wait for the buffer to fill be for flushing. For lower traffic you dont want to be waiting around so set a max window and it will send at a minumum of every X milliseconds . Default 10 @@ -92,9 +98,8 @@ The PLUGIN_MESSENGER_CONFIG determines which compiled messenger to select and a - "transaction_stream_size" - default value 10_000_000 - "block_stream_size" - default value 100_000 - ``` -Lower Scale Low network latency +Lower Scale Low network latency PLUGIN_MESSENGER_CONFIG='{pipeline_size_bytes=1000000,local_buffer_max_window=10, messenger_type="Redis", connection_config={ redis_connection_str="redis://redis" } }' @@ -105,11 +110,11 @@ PLUGIN_MESSENGER_CONFIG='{pipeline_size_bytes=50000000,local_buffer_max_window=5 ``` -***Consumer Configuration*** +**_Consumer Configuration_** - "retries" - Amount of times to deliver the message. If delivered this many times and not ACKed, then it is deleted - "batch_size" - Max Amout of messages to grab within the wait timeout window. -- "message_wait_timeout" - Amount of time the consumer will keep the stream open and wait for messages +- "message_wait_timeout" - Amount of time the consumer will keep the stream open and wait for messages - "idle_timeout" - Amount of time a consumer can have the message before it goes back on the queue - "consumer_id" - VERY important. This is used to scale horizontally so messages arent duplicated over instances.Make sure this is different per instance @@ -125,11 +130,12 @@ PLUGIN_BLOCK_STREAM_SIZE=250000 NOTE: in 1.4.0 we are not sending to slot status. - ### Metrics + The plugin exposes the following statsd metrics + - count plugin.startup -> times the plugin started -- time message_send_queue_time -> time spent on messenger internal buffer +- time message_send_queue_time -> time spent on messenger internal buffer - time message_send_latency -> rtt time to messenger bus - count account_seen_event , tags: owner , is_startup -> number of account events filtered and seen - time startup.timer -> startup flush timer @@ -174,9 +180,9 @@ plerkle_serialization-https://crates.io/crates/plerkle_serialization ## Snapshot ETL -The Plerkle snapshot tool can be used for parsing Solana account snapshots. The repository already includes pre-configured geyser-config.json and etl-config.json files, which are ready to use. The only thing you might want to modify is the list of programs in geyser-config.json; otherwise, you can leave the configurations as they are. +The Plerkle snapshot tool can be used for parsing Solana account snapshots. The repository already includes a pre-configured `accounts-selector-config.json` file, which is ready to use. The only thing you might want to modify is the list of programs in etl-config.json; otherwise, you can leave the configurations as they are. -Before running the tool, it's important to create an .env file, modeled after .env.example. In this file, you should specify the path to the directory containing the snapshots as well as the Plerkle messenger configuration. +Before running the tool, it's important to create an .env file, modeled after .env.example. In this file, you should specify the path to the directory containing the snapshots as well as the snapshot redis connection details. Once everything is set up, you can build the Docker container for ETL by running: @@ -192,4 +198,4 @@ The next step is to run the ETL: make stream ``` -This command will launch the ETL Docker container. It will load the snapshot archives, the Geyser plugin binary, and stream all the accounts from the snapshot to the plugin. \ No newline at end of file +This command will launch the ETL Docker container. It will load the snapshot archives, the Geyser plugin binary, and stream all the accounts from the snapshot to the plugin. diff --git a/accounts-selector-config.json b/accounts-selector-config.json new file mode 100644 index 00000000..fa829c4c --- /dev/null +++ b/accounts-selector-config.json @@ -0,0 +1,9 @@ +{ + "owners": [ + "metaqbxxUerdq28cj1RbAWkYQm3ybzjb6a8bt518x1s", + "TokenzQdBNbLqP5VEhdkAS6EPFLC1PHnBqCXEpPxuEb", + "TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA", + "CoREENxT6tW1HoK8ypY1SxRMZTcVPm7R94rH4PZNhX7d" + ], + "select_all_accounts": false +} diff --git a/etl-config.json b/etl-config.json deleted file mode 100644 index fc55004f..00000000 --- a/etl-config.json +++ /dev/null @@ -1,5 +0,0 @@ -{ - "libpath": "/app/target/release/libplerkle.so", - "geyser_conf_path": "/app/geyser-config.json", - "throttle_nanos": 0 -} \ No newline at end of file diff --git a/plerkle_snapshot/Cargo.toml b/plerkle_snapshot/Cargo.toml index 6693faf9..11ab767f 100644 --- a/plerkle_snapshot/Cargo.toml +++ b/plerkle_snapshot/Cargo.toml @@ -1,7 +1,7 @@ [package] # Renamed from original "solana-snapshot-etl" name = "plerkle_snapshot" -version = "0.4.0" +version = "0.5.0" edition = "2021" license = "Apache-2.0" documentation = "https://docs.rs/solana-snapshot-etl" diff --git a/plerkle_snapshot/src/bin/solana-snapshot-etl/accounts_selector.rs b/plerkle_snapshot/src/bin/solana-snapshot-etl/accounts_selector.rs new file mode 100644 index 00000000..dd3312b3 --- /dev/null +++ b/plerkle_snapshot/src/bin/solana-snapshot-etl/accounts_selector.rs @@ -0,0 +1,67 @@ +//! Copied from plerkle/src/accounts_selector.rs with some API-changing improvements. + +use std::collections::HashSet; +use tracing::*; + +const fn select_all_accounts_by_default() -> bool { + true +} + +#[derive(Debug, serde::Deserialize)] +pub(crate) struct AccountsSelectorConfig { + #[serde(default)] + pub accounts: Vec, + #[serde(default)] + pub owners: Vec, + #[serde(default = "select_all_accounts_by_default")] + pub select_all_accounts: bool, +} + +impl Default for AccountsSelectorConfig { + fn default() -> Self { + Self { + accounts: vec![], + owners: vec![], + select_all_accounts: select_all_accounts_by_default(), + } + } +} + +#[derive(Clone)] +pub(crate) struct AccountsSelector { + pub accounts: HashSet>, + pub owners: HashSet>, + pub select_all_accounts: bool, +} + +impl AccountsSelector { + pub fn new(config: AccountsSelectorConfig) -> Self { + let AccountsSelectorConfig { + accounts, + owners, + select_all_accounts, + } = config; + info!( + "Creating AccountsSelector from accounts: {:?}, owners: {:?}", + accounts, owners + ); + + let accounts = accounts + .iter() + .map(|key| bs58::decode(key).into_vec().unwrap()) + .collect(); + let owners = owners + .iter() + .map(|key| bs58::decode(key).into_vec().unwrap()) + .collect(); + AccountsSelector { + accounts, + owners, + select_all_accounts, + } + } + + pub fn is_account_selected(&self, account: &[u8], owner: &[u8]) -> bool { + self.select_all_accounts || self.accounts.contains(account) || self.owners.contains(owner) + } +} diff --git a/plerkle_snapshot/src/bin/solana-snapshot-etl/geyser.rs b/plerkle_snapshot/src/bin/solana-snapshot-etl/geyser.rs index b4295eec..bed148bc 100644 --- a/plerkle_snapshot/src/bin/solana-snapshot-etl/geyser.rs +++ b/plerkle_snapshot/src/bin/solana-snapshot-etl/geyser.rs @@ -1,33 +1,34 @@ // TODO add multi-threading -use agave_geyser_plugin_interface::geyser_plugin_interface::{ - GeyserPlugin, ReplicaAccountInfo, ReplicaAccountInfoVersions, -}; +use agave_geyser_plugin_interface::geyser_plugin_interface::ReplicaAccountInfo; use figment::value::{Map, Tag}; use indicatif::{ProgressBar, ProgressStyle}; use plerkle_messenger::redis_messenger::RedisMessenger; use plerkle_messenger::{MessageStreamer, MessengerConfig}; use plerkle_serialization::serializer::serialize_account; use plerkle_snapshot::append_vec::StoredMeta; -use solana_sdk::account::{Account, AccountSharedData}; +use solana_sdk::account::{Account, AccountSharedData, ReadableAccount}; use std::error::Error; use std::sync::atomic::AtomicU64; use std::sync::atomic::Ordering; use std::sync::Arc; use tokio::sync::Mutex; +use crate::accounts_selector::AccountsSelector; + const ACCOUNT_STREAM_KEY: &str = "ACC"; #[derive(Clone)] pub(crate) struct GeyserDumper { messenger: Arc>, throttle_nanos: u64, + accounts_selector: AccountsSelector, pub accounts_spinner: ProgressBar, pub accounts_count: Arc, } impl GeyserDumper { - pub(crate) async fn new(throttle_nanos: u64) -> Self { + pub(crate) async fn new(throttle_nanos: u64, accounts_selector: AccountsSelector) -> Self { // TODO dedup spinner definitions let spinner_style = ProgressStyle::with_template( "{prefix:>10.bold.dim} {spinner} rate={per_sec} total={human_pos}", @@ -64,6 +65,7 @@ impl GeyserDumper { Self { messenger: Arc::new(Mutex::new(messenger)), accounts_spinner, + accounts_selector, accounts_count: Arc::new(AtomicU64::new(0)), throttle_nanos, } @@ -74,38 +76,46 @@ impl GeyserDumper { (meta, account): (StoredMeta, AccountSharedData), slot: u64, ) -> Result<(), Box> { - let account: Account = account.into(); - // Get runtime and sender channel. - // Serialize data. - let ai = &ReplicaAccountInfo { - pubkey: meta.pubkey.as_ref(), - lamports: account.lamports, - owner: account.owner.as_ref(), - executable: account.executable, - rent_epoch: account.rent_epoch, - data: &account.data, - write_version: meta.write_version, - }; - let account = - plerkle_serialization::solana_geyser_plugin_interface_shims::ReplicaAccountInfoV2 { - pubkey: ai.pubkey, - lamports: ai.lamports, - owner: ai.owner, - executable: ai.executable, - rent_epoch: ai.rent_epoch, - data: ai.data, - write_version: ai.write_version, - txn_signature: None, + if self + .accounts_selector + .is_account_selected(meta.pubkey.as_ref(), account.owner().as_ref()) + { + let account: Account = account.into(); + // Serialize data. + let ai = &ReplicaAccountInfo { + pubkey: meta.pubkey.as_ref(), + lamports: account.lamports, + owner: account.owner.as_ref(), + executable: account.executable, + rent_epoch: account.rent_epoch, + data: &account.data, + write_version: meta.write_version, }; - let builder = flatbuffers::FlatBufferBuilder::new(); - let builder = serialize_account(builder, &account, slot, false); - let data = builder.finished_data(); + let account = + plerkle_serialization::solana_geyser_plugin_interface_shims::ReplicaAccountInfoV2 { + pubkey: ai.pubkey, + lamports: ai.lamports, + owner: ai.owner, + executable: ai.executable, + rent_epoch: ai.rent_epoch, + data: ai.data, + write_version: ai.write_version, + txn_signature: None, + }; + let builder = flatbuffers::FlatBufferBuilder::new(); + let builder = serialize_account(builder, &account, slot, false); + let data = builder.finished_data(); + + self.messenger + .lock() + .await + .send(ACCOUNT_STREAM_KEY, data) + .await?; + } else { + tracing::trace!(?account, ?meta, "Account filtered out by accounts selector"); + return Ok(()); + } - self.messenger - .lock() - .await - .send(ACCOUNT_STREAM_KEY, data) - .await?; let prev = self.accounts_count.fetch_add(1, Ordering::Relaxed); self.accounts_spinner.set_position(prev + 1); diff --git a/plerkle_snapshot/src/bin/solana-snapshot-etl/main.rs b/plerkle_snapshot/src/bin/solana-snapshot-etl/main.rs index 6298cdf7..5a14aa8e 100644 --- a/plerkle_snapshot/src/bin/solana-snapshot-etl/main.rs +++ b/plerkle_snapshot/src/bin/solana-snapshot-etl/main.rs @@ -1,5 +1,8 @@ +mod accounts_selector; +mod geyser; +mod mpl_metadata; + use crate::geyser::GeyserDumper; -use agave_geyser_plugin_interface::geyser_plugin_interface::GeyserPlugin; use clap::Parser; use indicatif::{ProgressBar, ProgressBarIter, ProgressStyle}; use plerkle_snapshot::archived::ArchiveSnapshotExtractor; @@ -8,22 +11,26 @@ use plerkle_snapshot::{ append_vec_iter, AppendVecIterator, ReadProgressTracking, SnapshotExtractor, }; use reqwest::blocking::Response; -use serde::Deserialize; use std::fs::File; use std::io::{IoSliceMut, Read}; -use std::path::Path; +use std::path::{Path, PathBuf}; use tracing::{info, warn}; -mod geyser; -mod mpl_metadata; +use self::accounts_selector::{AccountsSelector, AccountsSelectorConfig}; #[derive(Parser, Debug)] #[clap(author, version, about, long_about = None)] struct Args { #[clap(help = "Snapshot source (unpacked snapshot, archive file, or HTTP link)")] source: String, - #[clap(long, help = "Load Geyser plugin from given config file")] - geyser: String, + #[clap( + long, + default_value_t = 0, + help = "Throttle nanoseconds between the dumping of individual accounts" + )] + throttle_nanos: u64, + #[clap(long, help = "Path to accounts selector config (optional)")] + accounts_selector_config: Option, } #[tokio::main(flavor = "multi_thread")] @@ -38,15 +45,20 @@ async fn main() -> Result<(), Box> { } let args = Args::parse(); + let accounts_selector_config = args + .accounts_selector_config + .and_then(|path| { + std::fs::read(path).ok().map(|slice| { + serde_json::from_slice::(&slice) + .expect("could not decode accounts selector config!") + }) + }) + .unwrap_or_default(); + let accounts_selector = AccountsSelector::new(accounts_selector_config); let mut loader = SupportedLoader::new(&args.source, Box::new(LoadProgressTracking {}))?; - info!("Dumping to Geyser plugin: {}", &args.geyser); - - let cfg = Config::read(&args.geyser) - .map_err(|e| format!("Config error: {}", e.to_string())) - .unwrap(); - let mut dumper = GeyserDumper::new(cfg.throttle_nanos).await; + let mut dumper = GeyserDumper::new(args.throttle_nanos, accounts_selector).await; for append_vec in loader.iter() { let append_vec = append_vec.unwrap(); let slot = append_vec.get_slot(); @@ -71,24 +83,6 @@ async fn main() -> Result<(), Box> { Ok(()) } -#[derive(Deserialize)] -pub struct Config { - // path to the built Geyser binary - pub libpath: String, - // path to the Geyser config file - pub geyser_conf_path: String, - pub throttle_nanos: u64, -} - -impl Config { - pub fn read(path: &str) -> Result { - let data = std::fs::read_to_string(path)?; - let c: Config = serde_json::from_str(data.as_str())?; - - Ok(c) - } -} - struct LoadProgressTracking {} impl ReadProgressTracking for LoadProgressTracking {