Skip to content

Commit

Permalink
Add DB Schema Config (#27)
Browse files Browse the repository at this point in the history
  • Loading branch information
bh2smith authored Jan 22, 2024
1 parent 3c90792 commit 10c2a47
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 14 deletions.
4 changes: 4 additions & 0 deletions arak.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ ethrpc = "http://localhost:8545"
[database.sqlite]
connection = "file:arak.db"

#[database.postgres]
#connection = "postgresql://postgres:postgres@localhost:5432/arak"
#schema = "local"

[[event]]
name = "cowprotocol_settlements"
start = 12593265
Expand Down
3 changes: 3 additions & 0 deletions kubernetes/config/configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ metadata:
namespace: {{ .Namespace }}
data:
arak.toml: |
[database.postgres]
schema = "mainnet"
[indexer]
page-size = 200
Expand Down
2 changes: 1 addition & 1 deletion src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ pub struct Config {
#[serde(rename_all = "kebab-case")]
pub enum Database {
Sqlite { connection: String },
Postgres { connection: String },
Postgres { connection: String, schema: String },
}

#[derive(Debug, Deserialize)]
Expand Down
38 changes: 27 additions & 11 deletions src/database/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,18 @@ async fn connect(params: &str) -> Result<tokio_postgres::Client> {
}

impl Postgres {
pub async fn connect(params: &str) -> Result<Self> {
tracing::debug!("opening postgres database");
let client = connect(params).await.context("connect")?;
pub async fn connect(connection_str: &str, schema: &str) -> Result<Self> {
tracing::debug!("opening postgres database with schema {schema}");
let client = connect(connection_str).await.context("connect")?;
// create and set db schema
client
.execute(&format!("CREATE SCHEMA IF NOT EXISTS {schema};"), &[])
.await
.context("create schema")?;
client
.execute(&format!("SET search_path TO {schema};"), &[])
.await
.context("set schema")?;
client
.execute(CREATE_EVENT_BLOCK_TABLE, &[])
.await
Expand Down Expand Up @@ -578,7 +587,6 @@ const INSERT_TRANSACTION: &str = r#"INSERT INTO transactions (block_number, inde
ON CONFLICT DO NOTHING;"#;

const REMOVE_TRANSACTIONS_FROM: &str = "DELETE FROM transactions WHERE block_number >=$1;";

const CREATE_EVENT_BLOCK_TABLE: &str = "CREATE TABLE IF NOT EXISTS _event_block(event TEXT \
PRIMARY KEY NOT NULL, indexed BIGINT NOT NULL, finalized \
BIGINT NOT NULL);";
Expand Down Expand Up @@ -633,12 +641,22 @@ mod tests {
},
};

fn local_postgres_url() -> String {
"postgresql://postgres@localhost".to_string()
fn test_setup() -> (String, String) {
(
"postgresql://postgres@localhost".to_string(),
"local".to_string(),
)
}

async fn empty_db() -> Postgres {
clear_database().await;
let (db_url, schema) = test_setup();
Postgres::connect(&db_url, &schema).await.unwrap()
}

async fn clear_database() {
let client = connect(&local_postgres_url()).await.unwrap();
let (db_url, _) = test_setup();
let client = connect(&db_url).await.unwrap();
// https://stackoverflow.com/a/36023359
let query = r#"
DO $$ DECLARE
Expand All @@ -656,8 +674,7 @@ mod tests {
#[ignore]
#[tokio::test]
async fn large_number() {
clear_database().await;
let mut db = Postgres::connect(&local_postgres_url()).await.unwrap();
let mut db = empty_db().await;
let event = r#"
event Event (
uint256,
Expand All @@ -681,8 +698,7 @@ mod tests {
#[ignore]
#[tokio::test]
async fn boolean_and_text_fields() {
clear_database().await;
let mut db = Postgres::connect(&local_postgres_url()).await.unwrap();
let mut db = empty_db().await;
let event = r#"
event Event (
bool,
Expand Down
8 changes: 6 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,12 @@ async fn main() -> Result<()> {
config::Database::Sqlite { connection } => {
run_indexer(&config, database::Sqlite::open(connection)?).await?;
}
config::Database::Postgres { connection } => {
run_indexer(&config, database::Postgres::connect(connection).await?).await?;
config::Database::Postgres { connection, schema } => {
run_indexer(
&config,
database::Postgres::connect(connection, schema).await?,
)
.await?;
}
}

Expand Down

0 comments on commit 10c2a47

Please sign in to comment.