Skip to content

Commit

Permalink
feat(etl): rework etl args, add accounts selector functionality
Browse files Browse the repository at this point in the history
* Get rid of any required JSON configuration for ETL
* Add accounts selector functionality to ETL
  • Loading branch information
armyhaylenko committed Feb 4, 2025
1 parent b1be37c commit 0b371bd
Show file tree
Hide file tree
Showing 10 changed files with 170 additions and 87 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,5 @@ test-ledger
programs
docker/geyser-outputs
docker/solana-outputs
.env
snapshot
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@ build:
@docker build -f Dockerfile . -t ${IMAGE_NAME}

stream:
for f in $(shell ls ${SNAPSHOTDIR}); do echo $$(realpath $${f}) && docker run --env-file .env -p 3000:3000 --rm -it --mount type=bind,source=$$(realpath $${f}),target=$$(realpath $${f}),readonly --mount type=bind,source=$$(pwd)/etl-config.json,target=/app/etl-config.json,readonly ${IMAGE_NAME} $$(realpath $${f}) --geyser=/app/etl-config.json && date; done
for f in $(shell ls ${SNAPSHOTDIR}); do echo $$(realpath $${f}) && docker run --env-file .env -p 3000:3000 --rm -it --mount type=bind,source=$$(realpath $${f}),target=$$(realpath $${f}),readonly --mount type=bind,source=$$(pwd)/accounts-selector-config.json,target=/app/accounts-selector-config.json,readonly ${IMAGE_NAME} $$(realpath $${f}) --accounts-selector-config=/app/accounts-selector-config.json && date; done
32 changes: 19 additions & 13 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,19 @@ If you are using this plugin for your bespoke use case then the build steps are

### Building Locally

**NOTE -> M1 macs may have issues. Linux is best.**
#### Linux

`cargo build` for debug or
`cargo build --release` for a release build.

You will now have a libplerkle.so file in the target folder. This is the binary that you will pass into the validator using the following option.
You will now have a libplerkle.so file in the target folder.

#### Mac

Building is similar to Linux, except for the extension of the library produced.
Instead of a `.so` file, look for `libplerkle.dylib`. The loader does not really care what extension to link, as long as it's a proper dynamically linked object, such as a `dylib`.

### Configuration

```bash
--geyser-plugin-config plugin-config.json
Expand Down Expand Up @@ -78,10 +85,9 @@ PLUGIN_MESSENGER_CONFIG='{ messenger_type="Redis", connection_config={ redis_con

The PLUGIN_MESSENGER_CONFIG determines which compiled messenger to select and a specific configuration for the messenger.


#### Additional Configuration Examples

***Producer Configuration***
**_Producer Configuration_**

- "pipeline_size_bytes" - Maximum command size, roughly equates to the payload size. This setting locally buffers bytes in a queue to be flushed when the buffer grows past the desired amount. Default is 512MB (max redis command size) / 1000, maximum is 512MB (max redis command size) / 1000. You should test your optimal size to avoid high send latency and avoid RTT.
- "local_buffer_max_window" - Maximum time to wait for the buffer to fill be for flushing. For lower traffic you dont want to be waiting around so set a max window and it will send at a minumum of every X milliseconds . Default 10
Expand All @@ -92,9 +98,8 @@ The PLUGIN_MESSENGER_CONFIG determines which compiled messenger to select and a
- "transaction_stream_size" - default value 10_000_000
- "block_stream_size" - default value 100_000


```
Lower Scale Low network latency
Lower Scale Low network latency
PLUGIN_MESSENGER_CONFIG='{pipeline_size_bytes=1000000,local_buffer_max_window=10, messenger_type="Redis", connection_config={ redis_connection_str="redis://redis" } }'
Expand All @@ -105,11 +110,11 @@ PLUGIN_MESSENGER_CONFIG='{pipeline_size_bytes=50000000,local_buffer_max_window=5
```

***Consumer Configuration***
**_Consumer Configuration_**

- "retries" - Amount of times to deliver the message. If delivered this many times and not ACKed, then it is deleted
- "batch_size" - Max Amout of messages to grab within the wait timeout window.
- "message_wait_timeout" - Amount of time the consumer will keep the stream open and wait for messages
- "message_wait_timeout" - Amount of time the consumer will keep the stream open and wait for messages
- "idle_timeout" - Amount of time a consumer can have the message before it goes back on the queue
- "consumer_id" - VERY important. This is used to scale horizontally so messages arent duplicated over instances.Make sure this is different per instance

Expand All @@ -125,11 +130,12 @@ PLUGIN_BLOCK_STREAM_SIZE=250000

NOTE: in 1.4.0 we are not sending to slot status.


### Metrics

The plugin exposes the following statsd metrics

- count plugin.startup -> times the plugin started
- time message_send_queue_time -> time spent on messenger internal buffer
- time message_send_queue_time -> time spent on messenger internal buffer
- time message_send_latency -> rtt time to messenger bus
- count account_seen_event , tags: owner , is_startup -> number of account events filtered and seen
- time startup.timer -> startup flush timer
Expand Down Expand Up @@ -174,9 +180,9 @@ plerkle_serialization-https://crates.io/crates/plerkle_serialization

## Snapshot ETL

The Plerkle snapshot tool can be used for parsing Solana account snapshots. The repository already includes pre-configured geyser-config.json and etl-config.json files, which are ready to use. The only thing you might want to modify is the list of programs in geyser-config.json; otherwise, you can leave the configurations as they are.
The Plerkle snapshot tool can be used for parsing Solana account snapshots. The repository already includes a pre-configured `accounts-selector-config.json` file, which is ready to use. The only thing you might want to modify is the list of programs in etl-config.json; otherwise, you can leave the configurations as they are.

Before running the tool, it's important to create an .env file, modeled after .env.example. In this file, you should specify the path to the directory containing the snapshots as well as the Plerkle messenger configuration.
Before running the tool, it's important to create an .env file, modeled after .env.example. In this file, you should specify the path to the directory containing the snapshots as well as the snapshot redis connection details.

Once everything is set up, you can build the Docker container for ETL by running:

Expand All @@ -192,4 +198,4 @@ The next step is to run the ETL:
make stream
```

This command will launch the ETL Docker container. It will load the snapshot archives, the Geyser plugin binary, and stream all the accounts from the snapshot to the plugin.
This command will launch the ETL Docker container. It will load the snapshot archives, the Geyser plugin binary, and stream all the accounts from the snapshot to the plugin.
9 changes: 9 additions & 0 deletions accounts-selector-config.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"owners": [
"metaqbxxUerdq28cj1RbAWkYQm3ybzjb6a8bt518x1s",
"TokenzQdBNbLqP5VEhdkAS6EPFLC1PHnBqCXEpPxuEb",
"TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA",
"CoREENxT6tW1HoK8ypY1SxRMZTcVPm7R94rH4PZNhX7d"
],
"select_all_accounts": false
}
5 changes: 0 additions & 5 deletions etl-config.json

This file was deleted.

2 changes: 1 addition & 1 deletion plerkle_snapshot/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
# Renamed from original "solana-snapshot-etl"
name = "plerkle_snapshot"
version = "0.4.0"
version = "0.5.0"
edition = "2021"
license = "Apache-2.0"
documentation = "https://docs.rs/solana-snapshot-etl"
Expand Down
67 changes: 67 additions & 0 deletions plerkle_snapshot/src/bin/solana-snapshot-etl/accounts_selector.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
//! Copied from plerkle/src/accounts_selector.rs with some API-changing improvements.
use std::collections::HashSet;
use tracing::*;

const fn select_all_accounts_by_default() -> bool {
true
}

#[derive(Debug, serde::Deserialize)]
pub(crate) struct AccountsSelectorConfig {
#[serde(default)]
pub accounts: Vec<String>,
#[serde(default)]
pub owners: Vec<String>,
#[serde(default = "select_all_accounts_by_default")]
pub select_all_accounts: bool,
}

impl Default for AccountsSelectorConfig {
fn default() -> Self {
Self {
accounts: vec![],
owners: vec![],
select_all_accounts: select_all_accounts_by_default(),
}
}
}

#[derive(Clone)]
pub(crate) struct AccountsSelector {
pub accounts: HashSet<Vec<u8>>,
pub owners: HashSet<Vec<u8>>,
pub select_all_accounts: bool,
}

impl AccountsSelector {
pub fn new(config: AccountsSelectorConfig) -> Self {
let AccountsSelectorConfig {
accounts,
owners,
select_all_accounts,
} = config;
info!(
"Creating AccountsSelector from accounts: {:?}, owners: {:?}",
accounts, owners
);

let accounts = accounts
.iter()
.map(|key| bs58::decode(key).into_vec().unwrap())
.collect();
let owners = owners
.iter()
.map(|key| bs58::decode(key).into_vec().unwrap())
.collect();
AccountsSelector {
accounts,
owners,
select_all_accounts,
}
}

pub fn is_account_selected(&self, account: &[u8], owner: &[u8]) -> bool {
self.select_all_accounts || self.accounts.contains(account) || self.owners.contains(owner)
}
}
80 changes: 45 additions & 35 deletions plerkle_snapshot/src/bin/solana-snapshot-etl/geyser.rs
Original file line number Diff line number Diff line change
@@ -1,33 +1,34 @@
// TODO add multi-threading

use agave_geyser_plugin_interface::geyser_plugin_interface::{
GeyserPlugin, ReplicaAccountInfo, ReplicaAccountInfoVersions,
};
use agave_geyser_plugin_interface::geyser_plugin_interface::ReplicaAccountInfo;
use figment::value::{Map, Tag};
use indicatif::{ProgressBar, ProgressStyle};
use plerkle_messenger::redis_messenger::RedisMessenger;
use plerkle_messenger::{MessageStreamer, MessengerConfig};
use plerkle_serialization::serializer::serialize_account;
use plerkle_snapshot::append_vec::StoredMeta;
use solana_sdk::account::{Account, AccountSharedData};
use solana_sdk::account::{Account, AccountSharedData, ReadableAccount};
use std::error::Error;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use tokio::sync::Mutex;

use crate::accounts_selector::AccountsSelector;

const ACCOUNT_STREAM_KEY: &str = "ACC";

#[derive(Clone)]
pub(crate) struct GeyserDumper {
messenger: Arc<Mutex<RedisMessenger>>,
throttle_nanos: u64,
accounts_selector: AccountsSelector,
pub accounts_spinner: ProgressBar,
pub accounts_count: Arc<AtomicU64>,
}

impl GeyserDumper {
pub(crate) async fn new(throttle_nanos: u64) -> Self {
pub(crate) async fn new(throttle_nanos: u64, accounts_selector: AccountsSelector) -> Self {
// TODO dedup spinner definitions
let spinner_style = ProgressStyle::with_template(
"{prefix:>10.bold.dim} {spinner} rate={per_sec} total={human_pos}",
Expand Down Expand Up @@ -64,6 +65,7 @@ impl GeyserDumper {
Self {
messenger: Arc::new(Mutex::new(messenger)),
accounts_spinner,
accounts_selector,
accounts_count: Arc::new(AtomicU64::new(0)),
throttle_nanos,
}
Expand All @@ -74,38 +76,46 @@ impl GeyserDumper {
(meta, account): (StoredMeta, AccountSharedData),
slot: u64,
) -> Result<(), Box<dyn Error>> {
let account: Account = account.into();
// Get runtime and sender channel.
// Serialize data.
let ai = &ReplicaAccountInfo {
pubkey: meta.pubkey.as_ref(),
lamports: account.lamports,
owner: account.owner.as_ref(),
executable: account.executable,
rent_epoch: account.rent_epoch,
data: &account.data,
write_version: meta.write_version,
};
let account =
plerkle_serialization::solana_geyser_plugin_interface_shims::ReplicaAccountInfoV2 {
pubkey: ai.pubkey,
lamports: ai.lamports,
owner: ai.owner,
executable: ai.executable,
rent_epoch: ai.rent_epoch,
data: ai.data,
write_version: ai.write_version,
txn_signature: None,
if self
.accounts_selector
.is_account_selected(meta.pubkey.as_ref(), account.owner().as_ref())
{
let account: Account = account.into();
// Serialize data.
let ai = &ReplicaAccountInfo {
pubkey: meta.pubkey.as_ref(),
lamports: account.lamports,
owner: account.owner.as_ref(),
executable: account.executable,
rent_epoch: account.rent_epoch,
data: &account.data,
write_version: meta.write_version,
};
let builder = flatbuffers::FlatBufferBuilder::new();
let builder = serialize_account(builder, &account, slot, false);
let data = builder.finished_data();
let account =
plerkle_serialization::solana_geyser_plugin_interface_shims::ReplicaAccountInfoV2 {
pubkey: ai.pubkey,
lamports: ai.lamports,
owner: ai.owner,
executable: ai.executable,
rent_epoch: ai.rent_epoch,
data: ai.data,
write_version: ai.write_version,
txn_signature: None,
};
let builder = flatbuffers::FlatBufferBuilder::new();
let builder = serialize_account(builder, &account, slot, false);
let data = builder.finished_data();

self.messenger
.lock()
.await
.send(ACCOUNT_STREAM_KEY, data)
.await?;
} else {
tracing::trace!(?account, ?meta, "Account filtered out by accounts selector");
return Ok(());
}

self.messenger
.lock()
.await
.send(ACCOUNT_STREAM_KEY, data)
.await?;
let prev = self.accounts_count.fetch_add(1, Ordering::Relaxed);
self.accounts_spinner.set_position(prev + 1);

Expand Down
Loading

0 comments on commit 0b371bd

Please sign in to comment.