Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a minimal but extensible indexer #4573

Closed
wants to merge 18 commits into from
Closed
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
462 changes: 459 additions & 3 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ members = [
"crates/bin/pcli",
"crates/bin/pclientd",
"crates/bin/pd",
"crates/bin/pindexer",
"crates/cnidarium",
"crates/cnidarium-component",
"crates/core/app",
Expand Down Expand Up @@ -51,6 +52,7 @@ members = [
"crates/util/auto-https",
"crates/util/tendermint-proxy",
"crates/util/tower-trace",
"crates/util/cometindex",
"crates/view",
"crates/wallet",
"tools/summonerd",
Expand Down Expand Up @@ -135,6 +137,7 @@ chrono = { default-features = false, version = "0.4" }
clap = { version = "3.2" }
cnidarium = { default-features = false, path = "crates/cnidarium" }
cnidarium-component = { default-features = false, path = "crates/cnidarium-component" }
cometindex = { path = "crates/util/cometindex" }
criterion = { version = "0.4" }
decaf377 = { default-features = false, version = "0.5" }
decaf377-fmd = { path = "crates/crypto/decaf377-fmd" }
Expand Down Expand Up @@ -213,6 +216,7 @@ serde_json = { version = "1.0.96" }
serde_unit_struct = { version = "0.1" }
serde_with = { version = "3.5.1" }
sha2 = { version = "0.10" }
sqlx = { version = "0.7", features = ["postgres", "runtime-tokio", "tls-rustls"] }
tap = "1.0.1"
tempfile = { version = "3.3.0" }
tendermint = { default-features = false, version = "0.34.0" }
Expand Down
18 changes: 18 additions & 0 deletions crates/bin/pindexer/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
[package]
name = "pindexer"
version = {workspace = true}
authors = {workspace = true}
edition = {workspace = true}
repository = {workspace = true}
homepage = {workspace = true}
license = {workspace = true}
publish = false

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
cometindex = {workspace = true}
penumbra-shielded-pool = {workspace = true, default-features = false}
penumbra-proto = {workspace = true, default-features = false}
tokio = {workspace = true, features = ["full"]}
anyhow = {workspace = true}
3 changes: 3 additions & 0 deletions crates/bin/pindexer/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
pub use cometindex::Indexer;
cratelyn marked this conversation as resolved.
Show resolved Hide resolved

pub mod shielded_pool;
13 changes: 13 additions & 0 deletions crates/bin/pindexer/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
use anyhow::Result;
use pindexer::{shielded_pool::fmd::ClueSet, Indexer};

#[tokio::main]
async fn main() -> Result<()> {
Indexer::new()
.with_default_tracing()
.with_index(ClueSet {})
.run()
.await?;

Ok(())
}
1 change: 1 addition & 0 deletions crates/bin/pindexer/src/shielded_pool.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod fmd;
56 changes: 56 additions & 0 deletions crates/bin/pindexer/src/shielded_pool/fmd.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
use cometindex::{async_trait, sqlx, AppView, ContextualizedEvent, PgTransaction};
use penumbra_proto::{core::component::shielded_pool::v1 as pb, event::ProtoEvent};

#[derive(Debug)]
pub struct ClueSet {}

#[async_trait]
impl AppView for ClueSet {
async fn init_chain(&self, dbtx: &mut PgTransaction) -> Result<(), anyhow::Error> {
sqlx::query(
// table name is module path + struct name
"
CREATE TABLE IF NOT EXISTS shielded_pool_fmd_clue_set (
id SERIAL PRIMARY KEY,
clue_bytes BYTEA NOT NULL,
tx_hash BYTEA NOT NULL
);
",
)
.execute(dbtx.as_mut())
.await?;
Ok(())
}

fn is_relevant(&self, type_str: &str) -> bool {
type_str == "penumbra.core.component.shielded_pool.v1.EventBroadcastClue"
}

async fn index_event(
&self,
dbtx: &mut PgTransaction,
event: &ContextualizedEvent,
) -> Result<(), anyhow::Error> {
let pe = pb::EventBroadcastClue::from_event(event.as_ref())?;

let clue_bytes = pe
.clue
.ok_or_else(|| anyhow::anyhow!("clue event missing clue"))?
.inner;

let tx_hash = event.tx_hash.as_ref().expect("tx_hash not found").to_vec();

sqlx::query(
"
INSERT INTO shielded_pool_fmd_clue_set (clue_bytes, tx_hash)
VALUES ($1, $2)
",
)
.bind(&clue_bytes)
.bind(&tx_hash)
.execute(dbtx.as_mut())
.await?;

Ok(())
}
}
24 changes: 24 additions & 0 deletions crates/util/cometindex/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
[package]
name = "cometindex"
version = {workspace = true}
authors = {workspace = true}
edition = {workspace = true}
description = "A library for building indexers for CometBFT events"
repository = {workspace = true}
homepage = {workspace = true}
license = {workspace = true}
publish = false

[dependencies]
tokio = {workspace = true, features = ["full"]}
clap = {workspace = true, features = ["derive", "env"]}
anyhow = {workspace = true}
tracing = {workspace = true}
tracing-subscriber = {workspace = true}
sqlx = {workspace = true, features = ["postgres", "json", "runtime-tokio", "tls-rustls"] }
async-trait = {workspace = true}
tendermint = {workspace = true}
serde_json = {workspace = true}
futures = {workspace = true}
hex = {workspace = true}

69 changes: 69 additions & 0 deletions crates/util/cometindex/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@


# macos

```
brew install postgresql@16
```
start
```
brew services start postgresql@16
```
create database `testnet_raw`
```
$ psql postgres
psql (16.3 (Homebrew))
Type "help" for help.

postgres=# CREATE DATABASE testnet_raw;
CREATE DATABASE
postgres=#
\q
```
test connection
```
psql "postgresql://localhost:5432/testnet_raw?sslmode=disable"
```
put in comet config.toml
```
[tx_index]
indexer = "psql"
psql-conn = "postgresql://localhost:5432/testnet_raw?sslmode=disable"
```

example data
```
testnet_raw=# SELECT * FROM events JOIN blocks ON events.block_id = blocks.rowid WHERE blocks.height = 426939 ;
rowid | block_id | tx_id | type | rowid | height | chain_id | created_at
-------+----------+-------+-------------------------------------------------------------+-------+--------+---------------------------+-------------------------------
99043 | 7534 | | block | 7534 | 426939 | penumbra-testnet-deimos-8 | 2024-06-07 01:38:33.440578-04
99044 | 7534 | | penumbra.core.component.sct.v1.EventAnchor | 7534 | 426939 | penumbra-testnet-deimos-8 | 2024-06-07 01:38:33.440578-04
99045 | 7534 | | penumbra.core.component.sct.v1.EventBlockRoot | 7534 | 426939 | penumbra-testnet-deimos-8 | 2024-06-07 01:38:33.440578-04
99046 | 7534 | 4350 | tx | 7534 | 426939 | penumbra-testnet-deimos-8 | 2024-06-07 01:38:33.440578-04
99047 | 7534 | 4350 | tx | 7534 | 426939 | penumbra-testnet-deimos-8 | 2024-06-07 01:38:33.440578-04
99048 | 7534 | 4350 | penumbra.core.component.shielded_pool.v1.EventSpend | 7534 | 426939 | penumbra-testnet-deimos-8 | 2024-06-07 01:38:33.440578-04
99049 | 7534 | 4350 | penumbra.core.component.sct.v1.EventCommitment | 7534 | 426939 | penumbra-testnet-deimos-8 | 2024-06-07 01:38:33.440578-04
99050 | 7534 | 4350 | penumbra.core.component.shielded_pool.v1.EventOutput | 7534 | 426939 | penumbra-testnet-deimos-8 | 2024-06-07 01:38:33.440578-04
99051 | 7534 | 4350 | penumbra.core.component.sct.v1.EventCommitment | 7534 | 426939 | penumbra-testnet-deimos-8 | 2024-06-07 01:38:33.440578-04
99052 | 7534 | 4350 | penumbra.core.component.shielded_pool.v1.EventOutput | 7534 | 426939 | penumbra-testnet-deimos-8 | 2024-06-07 01:38:33.440578-04
99053 | 7534 | 4350 | penumbra.core.component.shielded_pool.v1.EventBroadcastClue | 7534 | 426939 | penumbra-testnet-deimos-8 | 2024-06-07 01:38:33.440578-04
99054 | 7534 | 4350 | penumbra.core.component.shielded_pool.v1.EventBroadcastClue | 7534 | 426939 | penumbra-testnet-deimos-8 | 2024-06-07 01:38:33.440578-04
99055 | 7534 | 4351 | tx | 7534 | 426939 | penumbra-testnet-deimos-8 | 2024-06-07 01:38:33.440578-04
99056 | 7534 | 4351 | tx | 7534 | 426939 | penumbra-testnet-deimos-8 | 2024-06-07 01:38:33.440578-04
99057 | 7534 | 4351 | penumbra.core.component.shielded_pool.v1.EventSpend | 7534 | 426939 | penumbra-testnet-deimos-8 | 2024-06-07 01:38:33.440578-04
99058 | 7534 | 4351 | penumbra.core.component.sct.v1.EventCommitment | 7534 | 426939 | penumbra-testnet-deimos-8 | 2024-06-07 01:38:33.440578-04
99059 | 7534 | 4351 | penumbra.core.component.shielded_pool.v1.EventOutput | 7534 | 426939 | penumbra-testnet-deimos-8 | 2024-06-07 01:38:33.440578-04
99060 | 7534 | 4351 | penumbra.core.component.sct.v1.EventCommitment | 7534 | 426939 | penumbra-testnet-deimos-8 | 2024-06-07 01:38:33.440578-04
99061 | 7534 | 4351 | penumbra.core.component.shielded_pool.v1.EventOutput | 7534 | 426939 | penumbra-testnet-deimos-8 | 2024-06-07 01:38:33.440578-04
99062 | 7534 | 4351 | penumbra.core.component.shielded_pool.v1.EventBroadcastClue | 7534 | 426939 | penumbra-testnet-deimos-8 | 2024-06-07 01:38:33.440578-04
99063 | 7534 | 4351 | penumbra.core.component.shielded_pool.v1.EventBroadcastClue | 7534 | 426939 | penumbra-testnet-deimos-8 | 2024-06-07 01:38:33.440578-04
(21 rows)
```

example invocations
```
cargo run --bin pindexer -- -s "postgresql://localhost:5432/testnet_raw?sslmode=disable" -d "postgresql://localhost:5432/testnet_compiled?sslmode=disable"
```
```
cargo run --example fmd_clues -- -s "postgresql://localhost:5432/testnet_raw?sslmode=disable" -d "postgresql://localhost:5432/testnet_compiled?sslmode=disable"
```
79 changes: 79 additions & 0 deletions crates/util/cometindex/examples/fmd_clues.rs
cratelyn marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
use anyhow::Result;
use cometindex::{async_trait, AppView, ContextualizedEvent, Indexer, PgTransaction};

// This example is silly because it doesn't do any "compilation" of the raw
// events, so it's only useful as an example of exercising the harness and the
// intended usage: the _downstream_ crate depends on cometindex (generic over
// any event) and has its own app specific logic. But it doesn't have to
// reimplement the binary handling / arg parsing / etc

#[derive(Debug)]
struct FmdCluesExample {}

#[async_trait]
impl AppView for FmdCluesExample {
async fn init_chain(&self, dbtx: &mut PgTransaction) -> Result<(), anyhow::Error> {
sqlx::query(
"
CREATE TABLE IF NOT EXISTS fmd_clues_example (
id SERIAL PRIMARY KEY,
tx_hash BYTEA NOT NULL,
fmd_clue VARCHAR NOT NULL
);
",
)
.execute(dbtx.as_mut())
.await?;
Ok(())
}

fn is_relevant(&self, type_str: &str) -> bool {
type_str == "penumbra.core.component.shielded_pool.v1.EventBroadcastClue"
}

async fn index_event(
&self,
dbtx: &mut PgTransaction,
event: &ContextualizedEvent,
) -> Result<(), anyhow::Error> {
// this is just an example in the integration tests, so we don't want to do any
// - queries against existing table state
// - parsing of the event data into structured data
// - computations of derived data
// but these should all be possible
let clue = event
.event
.attributes
.iter()
.find(|attr| attr.key == "clue")
.expect("fmd_clue attribute not found")
.value
.clone();
let tx_hash = event.tx_hash.as_ref().expect("tx_hash not found").to_vec();

sqlx::query(
"
INSERT INTO fmd_clues (tx_hash, fmd_clue)
VALUES ($1, $2)
",
)
.bind(&tx_hash)
.bind(&clue)
.execute(dbtx.as_mut())
.await?;

Ok(())
}
}

#[tokio::main]
async fn main() -> Result<()> {
Indexer::new()
.with_default_tracing()
// add as many indexers as you want
.with_index(FmdCluesExample {})
.run()
.await?;

Ok(())
}
19 changes: 19 additions & 0 deletions crates/util/cometindex/src/contextualized.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
use tendermint::abci::Event;

#[derive(Debug)]
pub struct ContextualizedEvent {
pub event: Event,
pub block_height: u64,
pub tx_hash: Option<[u8; 32]>,
/// The rowid of the event in the local database.
///
/// Note that this is a purely local identifier and won't be the same across
/// different event databases.
pub local_rowid: i64,
}

impl AsRef<Event> for ContextualizedEvent {
fn as_ref(&self) -> &Event {
&self.event
}
}
1 change: 1 addition & 0 deletions crates/util/cometindex/src/engine.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@

20 changes: 20 additions & 0 deletions crates/util/cometindex/src/index.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
use async_trait::async_trait;
use sqlx::{Postgres, Transaction};

use crate::ContextualizedEvent;

pub type PgTransaction<'a> = Transaction<'a, Postgres>;

/// Represents a specific index of raw event data.
#[async_trait]
pub trait AppView: std::fmt::Debug {
async fn init_chain(&self, dbtx: &mut PgTransaction) -> Result<(), anyhow::Error>;

fn is_relevant(&self, type_str: &str) -> bool;

async fn index_event(
&self,
dbtx: &mut PgTransaction,
event: &ContextualizedEvent,
) -> Result<(), anyhow::Error>;
}
Loading
Loading