Skip to content

Commit

Permalink
Refactor events models (#699)
Browse files Browse the repository at this point in the history
rtso authored Feb 1, 2025

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
1 parent 32af360 commit 89e0159
Showing 20 changed files with 186 additions and 493 deletions.
6 changes: 3 additions & 3 deletions rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion rust/Cargo.toml
Original file line number Diff line number Diff line change
@@ -31,7 +31,7 @@ anyhow = "1.0.86"
aptos-indexer-processor-sdk = { git = "https://github.com/aptos-labs/aptos-indexer-processor-sdk.git", rev = "b79ed8b5864b2a12a1f9c5fd01579462e029b2ae" }
aptos-indexer-processor-sdk-server-framework = { git = "https://github.com/aptos-labs/aptos-indexer-processor-sdk.git", rev = "b79ed8b5864b2a12a1f9c5fd01579462e029b2ae" }
aptos-protos = { git = "https://github.com/aptos-labs/aptos-core.git", rev = "1d8460a995503574ec4e9699d3442d0150d7f3b9" }
aptos-indexer-test-transactions = { git = "https://github.com/aptos-labs/aptos-core.git", rev = "63eb8945a86521bc0fb0e26b306693e11327c85d" }
aptos-indexer-test-transactions = { git = "https://github.com/aptos-labs/aptos-core.git", rev = "b91d6bfe955e0df12927ae06f44c9763f5f016d3" }
aptos-indexer-testing-framework = { git = "https://github.com/aptos-labs/aptos-indexer-processor-sdk.git", rev = "b79ed8b5864b2a12a1f9c5fd01579462e029b2ae" }
async-trait = "0.1.53"
backtrace = "0.3.58"
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
[]
3 changes: 1 addition & 2 deletions rust/integration-tests/src/models/events_models.rs
Original file line number Diff line number Diff line change
@@ -10,8 +10,7 @@ use serde::{Deserialize, Serialize};
/**
* Event model
* this is created // b/c there is inserated_at field which isn't defined in the Event struct, we can't just load the events directly without specifying the fields.
// TODO: make this more generic to load all fields, then we should be able to run tests for all processor in one test case.
* TODO: make this more generic to load all fields, then we should be able to run tests for all processor in one test case.
*/
#[derive(Clone, Debug, Deserialize, FieldCount, Identifiable, Insertable, Serialize, Queryable)]
#[diesel(primary_key(transaction_version, event_index))]
18 changes: 18 additions & 0 deletions rust/integration-tests/src/sdk_tests/events_processor_tests.rs
Original file line number Diff line number Diff line change
@@ -52,6 +52,7 @@ mod tests {
IMPORTED_DEVNET_TXNS_78753811_COIN_TRANSFER_WITH_V2_EVENTS,
IMPORTED_DEVNET_TXNS_78753831_TOKEN_V1_MINT_TRANSFER_WITH_V2_EVENTS,
IMPORTED_DEVNET_TXNS_78753832_TOKEN_V2_MINT_TRANSFER_WITH_V2_EVENTS,
IMPORTED_MAINNET_TXNS_554229017_EVENTS_WITH_NO_EVENT_SIZE_INFO,
IMPORTED_TESTNET_TXNS_1255836496_V2_FA_METADATA_, IMPORTED_TESTNET_TXNS_1_GENESIS,
IMPORTED_TESTNET_TXNS_278556781_V1_COIN_REGISTER_FA_METADATA,
IMPORTED_TESTNET_TXNS_2_NEW_BLOCK_EVENT, IMPORTED_TESTNET_TXNS_3_EMPTY_TXN,
@@ -151,6 +152,19 @@ mod tests {
.await;
}

// This is a test for the validator txn with missing events
// This happens because we did not fully backfill validator txn events
// so GRPC can return a txn with event size info but no events
// We expect no events parsed
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn mainnet_events_processor_validator_txn_missing_events() {
process_single_mainnet_event_txn(
IMPORTED_MAINNET_TXNS_554229017_EVENTS_WITH_NO_EVENT_SIZE_INFO, // this is misnamed, but it's the validatortxn with missing events
Some("validator_txn_missing_events".to_string()),
)
.await;
}

// Example 2: Test for multiple transactions handling
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn testnet_events_processor_db_output_scenario_testing() {
@@ -218,6 +232,10 @@ mod tests {
process_single_event_txn(txn, test_case_name, "imported_testnet_txns").await
}

async fn process_single_mainnet_event_txn(txn: &[u8], test_case_name: Option<String>) {
process_single_event_txn(txn, test_case_name, "imported_mainnet_txns").await
}

// Helper function to abstract out the single transaction processing
async fn process_single_event_txn(
txn: &[u8],
72 changes: 64 additions & 8 deletions rust/processor/src/db/common/models/event_models/raw_events.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
use crate::utils::util::{standardize_address, truncate_str};
use aptos_protos::transaction::v1::{Event as EventPB, EventSizeInfo};
use crate::utils::{
counters::PROCESSOR_UNKNOWN_TYPE_COUNT,
util::{parse_timestamp, standardize_address, truncate_str},
};
use aptos_protos::transaction::v1::{
transaction::TxnData, Event as EventPB, EventSizeInfo, Transaction,
};
use serde::{Deserialize, Serialize};

use tracing::warn;
/// P99 currently is 303 so using 300 as a safe max length
pub const EVENT_TYPE_MAX_LENGTH: usize = 300;

@@ -21,12 +26,8 @@ pub struct RawEvent {
pub total_bytes: Option<i64>,
}

pub trait EventConvertible {
fn from_raw(raw_item: &RawEvent) -> Self;
}

impl RawEvent {
pub fn from_raw_event(
fn from_event(
event: &EventPB,
txn_version: i64,
txn_block_height: i64,
@@ -56,3 +57,58 @@ impl RawEvent {
}
}
}

pub fn parse_events(txn: &Transaction, processor_name: &str) -> Vec<RawEvent> {
let txn_version = txn.version as i64;
let block_height = txn.block_height as i64;
let block_timestamp = parse_timestamp(txn.timestamp.as_ref().unwrap(), txn_version);
let size_info = match txn.size_info.as_ref() {
Some(size_info) => Some(size_info),
None => {
warn!(version = txn.version, "Transaction size info not found");
None
},
};
let txn_data = match txn.txn_data.as_ref() {
Some(data) => data,
None => {
warn!(
transaction_version = txn_version,
"Transaction data doesn't exist"
);
PROCESSOR_UNKNOWN_TYPE_COUNT
.with_label_values(&[processor_name])
.inc();
return vec![];
},
};
let default = vec![];
let raw_events = match txn_data {
TxnData::BlockMetadata(tx_inner) => &tx_inner.events,
TxnData::Genesis(tx_inner) => &tx_inner.events,
TxnData::User(tx_inner) => &tx_inner.events,
TxnData::Validator(tx_inner) => &tx_inner.events,
_ => &default,
};

let event_size_info = size_info.map(|info| info.event_size_info.as_slice());

raw_events
.iter()
.enumerate()
.map(|(index, event)| {
// event_size_info will be used for user transactions only, no promises for other transactions.
// If event_size_info is missing due, it defaults to 0.
// No need to backfill as event_size_info is primarily for debugging user transactions.
let size_info = event_size_info.and_then(|infos| infos.get(index));
RawEvent::from_event(
event,
txn_version,
block_height,
index as i64,
size_info,
Some(block_timestamp),
)
})
.collect::<Vec<RawEvent>>()
}
Original file line number Diff line number Diff line change
@@ -183,6 +183,8 @@ impl Transaction {
.expect("Txn Timestamp is invalid!");

let txn_size_info = transaction.size_info.as_ref();
let len_event_size_info =
txn_size_info.map(|size_info| size_info.event_size_info.len() as i64);

match txn_data {
TxnData::User(user_txn) => {
@@ -204,6 +206,10 @@ impl Transaction {
},
None => (None, None),
};
// For some older validator transactions, GRPC stubs out the raw events.
// We use the event size info as the source of truth to determine the number of events in the transaction.
// This inconsistency should only be temporary until all transaction events are backfilled in GRPC.
let num_events = len_event_size_info.unwrap_or(user_txn.events.len() as i64);

let serialized_payload =
payload_cleaned.map(|payload| canonical_json::to_string(&payload).unwrap());
@@ -214,7 +220,7 @@ impl Transaction {
payload_type,
txn_version,
transaction_type,
user_txn.events.len() as i64,
num_events,
block_height,
epoch,
block_timestamp,
@@ -239,14 +245,15 @@ impl Transaction {
payload_cleaned.map(|payload| canonical_json::to_string(&payload).unwrap());

let payload_type = None;
let num_events = len_event_size_info.unwrap_or(genesis_txn.events.len() as i64);
(
Self::from_transaction_info_with_data(
transaction_info,
serialized_payload,
payload_type,
txn_version,
transaction_type,
genesis_txn.events.len() as i64,
num_events,
block_height,
epoch,
block_timestamp,
@@ -263,14 +270,16 @@ impl Transaction {
block_height,
block_timestamp,
);
let num_events =
len_event_size_info.unwrap_or(block_metadata_txn.events.len() as i64);
(
Self::from_transaction_info_with_data(
transaction_info,
None,
None,
txn_version,
transaction_type,
block_metadata_txn.events.len() as i64,
num_events,
block_height,
epoch,
block_timestamp,
@@ -303,14 +312,15 @@ impl Transaction {
block_height,
block_timestamp,
);
let num_events = len_event_size_info.unwrap_or(inner.events.len() as i64);
(
Self::from_transaction_info_with_data(
transaction_info,
None,
None,
txn_version,
transaction_type,
inner.events.len() as i64,
num_events,
block_height,
epoch,
block_timestamp,
Loading

0 comments on commit 89e0159

Please sign in to comment.