Make pindexer more easily embeddable as a library #4712

Merged (7 commits) on Jul 23, 2024
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

23 changes: 12 additions & 11 deletions crates/bin/pindexer/Cargo.toml
@@ -11,15 +11,16 @@ publish = false
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
-anyhow = {workspace = true}
clap = {workspace = true}
-cometindex = {workspace = true}
-penumbra-shielded-pool = {workspace = true, default-features = false}
-penumbra-stake = {workspace = true, default-features = false}
-penumbra-app = {workspace = true, default-features = false}
-penumbra-num = {workspace = true, default-features = false}
-penumbra-asset = {workspace = true, default-features = false}
-penumbra-proto = {workspace = true, default-features = false}
-tokio = {workspace = true, features = ["full"]}
-serde_json = {workspace = true}
-tracing = {workspace = true}
+sqlx = { workspace = true, features = ["chrono"] }
+cometindex = { workspace = true }
+penumbra-shielded-pool = { workspace = true, default-features = false }
+penumbra-stake = { workspace = true, default-features = false }
+penumbra-app = { workspace = true, default-features = false }
+penumbra-num = { workspace = true, default-features = false }
+penumbra-asset = { workspace = true, default-features = false }
+penumbra-proto = { workspace = true, default-features = false }
+tokio = { workspace = true, features = ["full"] }
+anyhow = { workspace = true }
+serde_json = { workspace = true }
+tracing = { workspace = true }
3 changes: 2 additions & 1 deletion crates/bin/pindexer/src/block.rs
@@ -1,6 +1,6 @@
use cometindex::{async_trait, sqlx, AppView, ContextualizedEvent, PgTransaction};
use penumbra_proto::{core::component::sct::v1 as pb, event::ProtoEvent};
-use sqlx::types::chrono::DateTime;
+use sqlx::{types::chrono::DateTime, PgPool};

#[derive(Debug)]
pub struct Block {}
@@ -36,6 +36,7 @@ CREATE TABLE IF NOT EXISTS block_details (
&self,
dbtx: &mut PgTransaction,
event: &ContextualizedEvent,
+_src_db: &PgPool,
) -> Result<(), anyhow::Error> {
let pe = pb::EventBlockRoot::from_event(event.as_ref())?;
let timestamp = pe.timestamp.expect("Block has no timestamp");
2 changes: 1 addition & 1 deletion crates/bin/pindexer/src/lib.rs
@@ -1,4 +1,4 @@
-pub use cometindex::{AppView, Indexer};
+pub use cometindex::{opt::Options, AppView, ContextualizedEvent, Indexer, PgPool, PgTransaction};

mod indexer_ext;
pub use indexer_ext::IndexerExt;
5 changes: 3 additions & 2 deletions crates/bin/pindexer/src/main.rs
@@ -1,10 +1,11 @@
use anyhow::Result;
+use clap::Parser as _;
use pindexer::block::Block;
-use pindexer::{Indexer, IndexerExt as _};
+use pindexer::{Indexer, IndexerExt as _, Options};

#[tokio::main]
async fn main() -> Result<()> {
-Indexer::new()
+Indexer::new(Options::parse())
.with_default_tracing()
.with_default_penumbra_app_views()
.with_index(Block {})
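Taken together with the re-exports added to pindexer's lib.rs above, this is what "embeddable as a library" means in practice. A minimal sketch of an out-of-tree binary crate driving pindexer follows; it is an illustration, not code from this PR, and the final .run().await is assumed, since the tail of main() is not shown in the diff above.

use anyhow::Result;
use clap::Parser as _;
use pindexer::block::Block;
use pindexer::{Indexer, IndexerExt as _, Options};

#[tokio::main]
async fn main() -> Result<()> {
    // The embedding binary owns argument parsing, so it can wrap or extend
    // Options before handing it to the indexer.
    let options = Options::parse();
    Indexer::new(options)
        .with_default_tracing()
        .with_default_penumbra_app_views()
        // Additional AppView implementations can be registered here.
        .with_index(Block {})
        // Assumed: the collapsed tail of pindexer's main() runs the indexer.
        .run()
        .await
}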
3 changes: 2 additions & 1 deletion crates/bin/pindexer/src/shielded_pool/fmd.rs
@@ -1,4 +1,4 @@
-use cometindex::{async_trait, sqlx, AppView, ContextualizedEvent, PgTransaction};
+use cometindex::{async_trait, sqlx, AppView, ContextualizedEvent, PgPool, PgTransaction};
use penumbra_proto::{core::component::shielded_pool::v1 as pb, event::ProtoEvent};

#[derive(Debug)]
@@ -34,6 +34,7 @@ CREATE TABLE IF NOT EXISTS shielded_pool_fmd_clue_set (
&self,
dbtx: &mut PgTransaction,
event: &ContextualizedEvent,
+_src_db: &PgPool,
) -> Result<(), anyhow::Error> {
let pe = pb::EventBroadcastClue::from_event(event.as_ref())?;

3 changes: 2 additions & 1 deletion crates/bin/pindexer/src/stake/delegation_txs.rs
@@ -1,5 +1,5 @@
use anyhow::{anyhow, Result};
-use cometindex::{async_trait, sqlx, AppView, ContextualizedEvent, PgTransaction};
+use cometindex::{async_trait, sqlx, AppView, ContextualizedEvent, PgPool, PgTransaction};
use penumbra_num::Amount;
use penumbra_proto::{core::component::stake::v1 as pb, event::ProtoEvent};

@@ -54,6 +54,7 @@ impl AppView for DelegationTxs {
&self,
dbtx: &mut PgTransaction,
event: &ContextualizedEvent,
+_src_db: &PgPool,
) -> Result<()> {
let pe = pb::EventDelegate::from_event(event.as_ref())?;

3 changes: 2 additions & 1 deletion crates/bin/pindexer/src/stake/missed_blocks.rs
@@ -1,5 +1,5 @@
use anyhow::Result;
-use cometindex::{async_trait, sqlx, AppView, ContextualizedEvent, PgTransaction};
+use cometindex::{async_trait, sqlx, AppView, ContextualizedEvent, PgPool, PgTransaction};

use penumbra_proto::{core::component::stake::v1 as pb, event::ProtoEvent};

@@ -52,6 +52,7 @@ impl AppView for MissedBlocks {
&self,
dbtx: &mut PgTransaction,
event: &ContextualizedEvent,
+_src_db: &PgPool,
) -> Result<(), anyhow::Error> {
let pe = pb::EventValidatorMissedBlock::from_event(event.as_ref())?;
let ik_bytes = pe
3 changes: 2 additions & 1 deletion crates/bin/pindexer/src/stake/slashings.rs
@@ -1,5 +1,5 @@
use anyhow::{anyhow, Result};
-use cometindex::{async_trait, sqlx, AppView, ContextualizedEvent, PgTransaction};
+use cometindex::{async_trait, sqlx, AppView, ContextualizedEvent, PgPool, PgTransaction};

use penumbra_proto::{core::component::stake::v1 as pb, event::ProtoEvent};
use penumbra_stake::IdentityKey;
@@ -52,6 +52,7 @@ impl AppView for Slashings {
&self,
dbtx: &mut PgTransaction,
event: &ContextualizedEvent,
+_src_db: &PgPool,
) -> Result<(), anyhow::Error> {
let pe = pb::EventSlashingPenaltyApplied::from_event(event.as_ref())?;
let ik = IdentityKey::try_from(
3 changes: 2 additions & 1 deletion crates/bin/pindexer/src/stake/undelegation_txs.rs
@@ -1,5 +1,5 @@
use anyhow::{anyhow, Result};
-use cometindex::{async_trait, sqlx, AppView, ContextualizedEvent, PgTransaction};
+use cometindex::{async_trait, sqlx, AppView, ContextualizedEvent, PgPool, PgTransaction};
use penumbra_num::Amount;
use penumbra_proto::{core::component::stake::v1 as pb, event::ProtoEvent};

@@ -54,6 +54,7 @@ impl AppView for UndelegationTxs {
&self,
dbtx: &mut PgTransaction,
event: &ContextualizedEvent,
+_src_db: &PgPool,
) -> Result<()> {
let pe = pb::EventUndelegate::from_event(event.as_ref())?;

3 changes: 2 additions & 1 deletion crates/bin/pindexer/src/stake/validator_set.rs
@@ -1,7 +1,7 @@
use std::collections::BTreeMap;

use anyhow::{anyhow, Context, Result};
-use cometindex::{async_trait, sqlx, AppView, ContextualizedEvent, PgTransaction};
+use cometindex::{async_trait, sqlx, AppView, ContextualizedEvent, PgPool, PgTransaction};

use penumbra_app::genesis::AppState;
use penumbra_asset::asset;
@@ -68,6 +68,7 @@ impl AppView for ValidatorSet {
&self,
dbtx: &mut PgTransaction,
event: &ContextualizedEvent,
+_src_db: &PgPool,
) -> Result<(), anyhow::Error> {
match event.event.kind.as_str() {
"penumbra.core.component.stake.v1.EventValidatorDefinitionUpload" => {
7 changes: 5 additions & 2 deletions crates/util/cometindex/examples/fmd_clues.rs
@@ -1,5 +1,7 @@
use anyhow::Result;
-use cometindex::{async_trait, AppView, ContextualizedEvent, Indexer, PgTransaction};
+use clap::Parser;
+use cometindex::{async_trait, opt::Options, AppView, ContextualizedEvent, Indexer, PgTransaction};
+use sqlx::PgPool;

// This example is silly because it doesn't do any "compilation" of the raw
// events, so it's only useful as an example of exercising the harness and the
@@ -39,6 +41,7 @@ CREATE TABLE IF NOT EXISTS fmd_clues_example (
&self,
dbtx: &mut PgTransaction,
event: &ContextualizedEvent,
+_src_db: &PgPool,
) -> Result<(), anyhow::Error> {
// this is just an example in the integration tests, so we don't want to do any
// - queries against existing table state
@@ -72,7 +75,7 @@ CREATE TABLE IF NOT EXISTS fmd_clues_example (

#[tokio::main]
async fn main() -> Result<()> {
-Indexer::new()
+Indexer::new(Options::parse())
.with_default_tracing()
// add as many indexers as you want
.with_index(FmdCluesExample {})
2 changes: 2 additions & 0 deletions crates/util/cometindex/src/index.rs
@@ -1,4 +1,5 @@
use async_trait::async_trait;
+pub use sqlx::PgPool;
use sqlx::{Postgres, Transaction};

use crate::ContextualizedEvent;
@@ -20,5 +21,6 @@ pub trait AppView: std::fmt::Debug {
&self,
dbtx: &mut PgTransaction,
event: &ContextualizedEvent,
+src_db: &PgPool,
) -> Result<(), anyhow::Error>;
}
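A possible use of the new parameter, shown as a sketch rather than code from this PR: inside an AppView's index_event, a view can now run read-only queries against the raw source database while still writing its derived rows through dbtx. The standalone helper function and the `events` / `example_counts` table names below are hypothetical illustrations.

use anyhow::Result;
use cometindex::{sqlx, ContextualizedEvent, PgPool, PgTransaction};

// Hypothetical helper with the same shape as the new `index_event` signature.
async fn index_event_sketch(
    dbtx: &mut PgTransaction<'_>,
    event: &ContextualizedEvent,
    src_db: &PgPool,
) -> Result<()> {
    // Read-only query against the raw source database the indexer consumes.
    let (total_events,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM events")
        .fetch_one(src_db)
        .await?;

    // Write a derived row into the compiled database through the transaction.
    sqlx::query("INSERT INTO example_counts (event_kind, total_events_seen) VALUES ($1, $2)")
        .bind(event.as_ref().kind.as_str())
        .bind(total_events)
        .execute(dbtx.as_mut())
        .await?;

    Ok(())
}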
26 changes: 20 additions & 6 deletions crates/util/cometindex/src/indexer.rs
@@ -1,9 +1,8 @@
use std::pin::Pin;

use anyhow::{Context as _, Result};
-use clap::Parser;
use futures::{Stream, StreamExt, TryStreamExt};
-use sqlx::PgPool;
+use sqlx::{postgres::PgPoolOptions, PgPool};
use tap::{Tap, TapFallible, TapOptional};
use tendermint::abci;
use tracing::{debug, info};
@@ -16,9 +15,9 @@ pub struct Indexer {
}

impl Indexer {
-pub fn new() -> Self {
+pub fn new(opts: Options) -> Self {
Self {
-opts: Options::parse(),
+opts,
Review thread on this change:

Member: I'm not sure this is desirable -- the original API was structured that way so that downstream consumers didn't have control over the CLI. Everyone gets the same binary; the only difference is which AppViews are hooked into it.

Contributor Author: If you want to hook in different AppViews you have to consume pindexer as a library, in which case having control over your CLI seems much better to me, and doesn't cause odd spooky behavior.

Contributor: I don't fully understand the disagreement about exposing CLI opts: in the interest of making pindexer more accessible, e.g. via importing into other Rust code, I'm biased toward making this change and dealing with the consequences later.

Member: Yes, this change should be rolled back; it's an explicit design goal to not allow control over the CLI, so that every Penumbra indexer has the same interface.

indexes: Vec::new(),
}
}
@@ -60,7 +59,22 @@
indexes,
} = self;

-let src_db = PgPool::connect(&src_database_url).await?;
+// Create a source db, with, for sanity, some read only settings.
+// These will be overrideable by a consumer who knows what they're doing,
+// but prevents basic mistakes.
+// c.f. https://github.com/launchbadge/sqlx/issues/481#issuecomment-727011811
+let src_db = PgPoolOptions::new()
+.after_connect(|conn, _| {
+Box::pin(async move {
+sqlx::query("SET SESSION CHARACTERISTICS AS TRANSACTION READ ONLY;")
+.execute(conn)
+.await?;
+Ok(())
+})
+})
+.connect(&src_database_url)
+.await?;

let dst_db = PgPool::connect(&dst_database_url).await?;

// Check if the destination db is initialized
@@ -169,7 +183,7 @@ impl Indexer {
for index in indexes {
if index.is_relevant(&event.as_ref().kind) {
tracing::debug!(?event, ?index, "relevant to index");
-index.index_event(&mut dbtx, &event).await?;
+index.index_event(&mut dbtx, &event, &src_db).await?;
}
}
// Mark that we got to at least this event
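Regarding the review discussion above about caller-owned parsing versus a uniform CLI: one possible middle ground, sketched here as an illustration rather than anything in this PR, is for a downstream binary to flatten the exported Options into its own clap parser, keeping the standard flags identical while still owning main(). This assumes Options continues to derive clap::Parser (whose derive also provides the Args impl that #[clap(flatten)] needs); the struct and the extra flag below are hypothetical.

use clap::Parser;
use cometindex::opt::Options;

// Hypothetical downstream CLI: the standard cometindex flags, plus one
// consumer-specific flag, parsed by the embedding binary's own main().
#[derive(Debug, Parser)]
struct MyIndexerCli {
    // All of the standard cometindex options, exposed unchanged.
    #[clap(flatten)]
    cometindex: Options,
    // Illustrative extra flag owned by the embedding binary.
    #[clap(long)]
    dry_run: bool,
}

A main() built on this would call MyIndexerCli::parse() and hand cli.cometindex to Indexer::new, which now accepts Options by value.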
2 changes: 1 addition & 1 deletion crates/util/cometindex/src/lib.rs
@@ -5,7 +5,7 @@ pub mod indexer;
pub mod opt;

pub use contextualized::ContextualizedEvent;
-pub use index::{AppView, PgTransaction};
+pub use index::{AppView, PgPool, PgTransaction};
pub use indexer::Indexer;

pub use async_trait::async_trait;
2 changes: 1 addition & 1 deletion crates/util/cometindex/src/opt.rs
@@ -4,7 +4,7 @@ use anyhow::{Error, Result};
use clap::Parser;

/// This struct represents the command-line options
-#[derive(Debug, Parser)]
+#[derive(Clone, Debug, Parser)]
#[clap(
name = "cometindex",
about = "processes raw events emitted by cometbft applications",