diff --git a/blockstore/src/block_stores/postgres/postgres_mappings.rs b/blockstore/src/block_stores/postgres/postgres_mappings.rs index 40c34a11..0b29a24d 100644 --- a/blockstore/src/block_stores/postgres/postgres_mappings.rs +++ b/blockstore/src/block_stores/postgres/postgres_mappings.rs @@ -35,6 +35,7 @@ pub fn build_create_transaction_mapping_table_statement(epoch: EpochRef) -> Stri } // note: sigantures might contain duplicates but that's quite rare and can be ignored for transactions +// TODO return &str pub async fn perform_transaction_mapping(postgres_session: &PostgresSession, epoch: EpochRef, signatures: &[&str]) -> anyhow::Result> { let started_at = Instant::now(); let schema = PostgresEpoch::build_schema_name(epoch); @@ -42,23 +43,25 @@ pub async fn perform_transaction_mapping(postgres_session: &PostgresSession, epo r#" WITH requested_sigs AS ( - SELECT signature from unnest($1::text[]) tab(signature) + SELECT signature, row_number() over()::smallint as idx FROM unnest($1::text[]) tab(signature) ), existed AS ( - SELECT * FROM {schema}.transaction_ids + SELECT signature, transaction_id, idx FROM {schema}.transaction_ids INNER JOIN requested_sigs USING(signature) ), inserted AS ( INSERT INTO {schema}.transaction_ids(signature) SELECT signature FROM requested_sigs - ON CONFLICT DO NOTHING - RETURNING * + EXCEPT + SELECT signature from existed + RETURNING transaction_id, signature ) - SELECT transaction_id, signature FROM inserted + SELECT transaction_id, idx FROM inserted + INNER JOIN requested_sigs USING(signature) UNION ALL - SELECT transaction_id, signature FROM existed + SELECT transaction_id, idx FROM existed "#, schema = schema ); @@ -68,7 +71,9 @@ pub async fn perform_transaction_mapping(postgres_session: &PostgresSession, epo let mapping_pairs = mappings.iter() .map(|row| { let tx_id: i32 = row.get(0); - let tx_sig: String = row.get(1); + let idx: i16 = row.get(1); // 1-based index + assert!(idx < 20000, "idx is 16 bit - might overflow soon"); + let tx_sig = signatures[(idx - 1) as usize].to_string(); (tx_sig, tx_id) }); @@ -116,23 +121,25 @@ pub async fn perform_account_mapping(postgres_session: &PostgresSession, epoch: r#" WITH requested_account_keys AS ( - SELECT account_key from unnest($1::text[]) tab(account_key) + SELECT account_key, row_number() over()::smallint as idx FROM unnest($1::text[]) tab(account_key) ), existed AS ( - SELECT * FROM {schema}.account_ids + SELECT account_key, acc_id, idx FROM {schema}.account_ids INNER JOIN requested_account_keys USING(account_key) ), inserted AS ( INSERT INTO {schema}.account_ids(account_key) SELECT account_key FROM requested_account_keys - ON CONFLICT DO NOTHING - RETURNING * + EXCEPT + SELECT account_key from existed + RETURNING account_key, acc_id ) - SELECT acc_id, account_key FROM inserted + SELECT acc_id, idx FROM inserted + INNER JOIN requested_account_keys USING(account_key) UNION ALL - SELECT acc_id, account_key FROM existed + SELECT acc_id, idx FROM existed "#, schema = schema ); @@ -142,8 +149,10 @@ pub async fn perform_account_mapping(postgres_session: &PostgresSession, epoch: let mapping_pairs = mappings.iter() .map(|row| { let acc_id: i64 = row.get(0); - let account_key: String = row.get(1); - (account_key, acc_id) + let idx: i16 = row.get(1); // 1-based index + assert!(idx < 20000, "idx is 16 bit - might overflow soon"); + let tx_sig = account_keys[(idx - 1) as usize].to_string(); + (tx_sig, acc_id) }); // pubkey <-> acc_id @@ -188,23 +197,25 @@ pub async fn perform_blockhash_mapping(postgres_session: &PostgresSession, epoch r#" WITH requested_blockhashes AS ( - SELECT blockhash from unnest($1::text[]) tab(blockhash) + SELECT blockhash, row_number() over()::smallint as idx FROM unnest($1::text[]) tab(blockhash) ), existed AS ( - SELECT * FROM {schema}.blockhash_ids + SELECT blockhash, blockhash_id, idx FROM {schema}.blockhash_ids INNER JOIN requested_blockhashes USING(blockhash) ), inserted AS ( INSERT INTO {schema}.blockhash_ids(blockhash) - SELECT blockhash from requested_blockhashes - ON CONFLICT DO NOTHING - RETURNING * + SELECT blockhash FROM requested_blockhashes + EXCEPT + SELECT blockhash from existed + RETURNING blockhash_id, blockhash ) - SELECT blockhash_id, blockhash FROM inserted + SELECT blockhash_id, idx FROM inserted + INNER JOIN requested_blockhashes USING(blockhash) UNION ALL - SELECT blockhash_id, blockhash FROM existed + SELECT blockhash_id, idx FROM existed "#, schema = schema ); @@ -214,8 +225,10 @@ pub async fn perform_blockhash_mapping(postgres_session: &PostgresSession, epoch let mapping_pairs = mappings.iter() .map(|row| { let blockhash_id: i32 = row.get(0); - let blockhash: String = row.get(1); - (blockhash, blockhash_id) + let idx: i16 = row.get(1); // 1-based index + assert!(idx < 20000, "idx is 16 bit - might overflow soon"); + let tx_sig = blockhashes[(idx - 1) as usize].to_string(); + (tx_sig, blockhash_id) }); // blockhash <-> blockhash_id