Skip to content

Commit

Permalink
feat: add invalid-indexes error checking to the advance-runner
Browse files Browse the repository at this point in the history
renan061 committed Jul 18, 2024
1 parent c83eb1d commit b1a9dd2
Showing 5 changed files with 119 additions and 19 deletions.
109 changes: 103 additions & 6 deletions offchain/advance-runner/src/broker.rs
Original file line number Diff line number Diff line change
@@ -13,6 +13,13 @@ pub enum BrokerFacadeError {
#[snafu(display("broker internal error"))]
BrokerInternalError { source: BrokerError },

#[snafu(display(
"expected first_index from claim to be {}, but got {}",
expected,
got
))]
InvalidIndexes { expected: u128, got: u128 },

#[snafu(display("failed to consume input event"))]
ConsumeError { source: BrokerError },

@@ -101,24 +108,40 @@ impl BrokerFacade {
"producing rollups claim"
);

let result = self
let last_claim_event = self
.client
.peek_latest(&self.claims_stream)
.await
.context(BrokerInternalSnafu)?;

let claim_produced = match result {
let should_enqueue_claim = match last_claim_event {
Some(event) => {
tracing::trace!(?event, "got last claim produced");
rollups_claim.epoch_index <= event.payload.epoch_index
let last_claim = event.payload;
tracing::trace!(?last_claim, "got last claim from Redis");
let should_enqueue_claim =
rollups_claim.epoch_index > last_claim.epoch_index;

// If this happens, then something is wrong with the dispatcher.
let invalid_indexes =
rollups_claim.first_index != last_claim.last_index + 1;
if should_enqueue_claim && invalid_indexes {
tracing::debug!("rollups_claim.first_index = {}, last_claim.last_index = {}",
rollups_claim.first_index, last_claim.last_index);
return Err(BrokerFacadeError::InvalidIndexes {
expected: last_claim.last_index + 1,
got: rollups_claim.first_index,
});
};

should_enqueue_claim
}
None => {
tracing::trace!("no claims in the stream");
false
true
}
};

if !claim_produced {
if should_enqueue_claim {
self.client
.produce(&self.claims_stream, rollups_claim)
.await
@@ -300,4 +323,78 @@ mod tests {
vec![rollups_claim0, rollups_claim1]
);
}

#[test_log::test(tokio::test)]
async fn test_invalid_indexes_overlapping() {
let docker = Cli::default();
let mut state = TestState::setup(&docker).await;
let rollups_claim1 = RollupsClaim {
dapp_address: Address::new([0xa0; ADDRESS_SIZE]),
epoch_index: 0,
epoch_hash: Hash::new([0xb0; HASH_SIZE]),
first_index: 0,
last_index: 6,
};
let rollups_claim2 = RollupsClaim {
dapp_address: Address::new([0xa0; ADDRESS_SIZE]),
epoch_index: 1,
epoch_hash: Hash::new([0xb0; HASH_SIZE]),
first_index: 6,
last_index: 7,
};
state
.fixture
.produce_rollups_claim(rollups_claim1.clone())
.await;
let result = state
.facade
.produce_rollups_claim(rollups_claim2.clone())
.await;
assert!(result.is_err());
assert_eq!(
BrokerFacadeError::InvalidIndexes {
expected: 7,
got: 6
}
.to_string(),
result.unwrap_err().to_string()
)
}

#[test_log::test(tokio::test)]
async fn test_invalid_indexes_nonsequential() {
let docker = Cli::default();
let mut state = TestState::setup(&docker).await;
let rollups_claim1 = RollupsClaim {
dapp_address: Address::new([0xa0; ADDRESS_SIZE]),
epoch_index: 0,
epoch_hash: Hash::new([0xb0; HASH_SIZE]),
first_index: 0,
last_index: 6,
};
let rollups_claim2 = RollupsClaim {
dapp_address: Address::new([0xa0; ADDRESS_SIZE]),
epoch_index: 1,
epoch_hash: Hash::new([0xb0; HASH_SIZE]),
first_index: 11,
last_index: 14,
};
state
.fixture
.produce_rollups_claim(rollups_claim1.clone())
.await;
let result = state
.facade
.produce_rollups_claim(rollups_claim2.clone())
.await;
assert!(result.is_err());
assert_eq!(
BrokerFacadeError::InvalidIndexes {
expected: 7,
got: 11
}
.to_string(),
result.unwrap_err().to_string()
)
}
}
6 changes: 4 additions & 2 deletions offchain/authority-claimer/src/checker.rs
Original file line number Diff line number Diff line change
@@ -127,14 +127,16 @@ impl DuplicateChecker for DefaultDuplicateChecker {
.flatten() // Back to only one Option
.map(|claim| claim.last_index + 1) // Maps to a number
.unwrap_or(0); // If None, unwrap to 0
tracing::debug!("checking duplicate claim: expected_first_index={}, rollups_claim={:?}",
expected_first_index, rollups_claim);
if rollups_claim.first_index == expected_first_index {
// This claim is the one the blockchain expects, so it is not considered duplicate.
// This claim is the one the blockchain expects, so it is not considered a duplicate.
Ok(false)
} else if rollups_claim.last_index < expected_first_index {
// This claim is already on the blockchain.
Ok(true)
} else {
// This claim is not on blockchain, but it isn't the one blockchain expects.
// This claim is not on the blockchain, but it isn't the one the blockchain expects.
// If this happens, there is a bug on the dispatcher.
Err(DuplicateCheckerError::ClaimMismatch {
expected_first_index,
14 changes: 7 additions & 7 deletions offchain/dispatcher/src/drivers/context.rs
Original file line number Diff line number Diff line change
@@ -65,6 +65,13 @@ impl Context {
broker: &impl BrokerSend,
) -> Result<(), BrokerFacadeError> {
let input_block_number = input.block_added.number.as_u64();
let input_epoch = self.calculate_epoch(input_block_number);
self.last_finished_epoch.map(|last_finished_epoch| {
// Asserting that the calculated epoch comes after the last finished epoch.
// (If last_finished_epoch == None then we don't need the assertion.)
assert!(input_epoch > last_finished_epoch)
});

self.finish_epoch_if_needed(input_block_number, broker)
.await?;

@@ -76,13 +83,6 @@ impl Context {
.inc();

self.inputs_sent += 1;

let input_epoch = self.calculate_epoch(input_block_number);
self.last_finished_epoch.map(|last_finished_epoch| {
// Asserting that the calculated epoch comes after the last finished epoch.
// (If last_finished_epoch == None then we don't need the assertion.)
assert!(input_epoch > last_finished_epoch)
});
self.last_input_epoch = Some(input_epoch);

Ok(())
5 changes: 3 additions & 2 deletions offchain/dispatcher/src/drivers/machine.rs
Original file line number Diff line number Diff line change
@@ -36,8 +36,9 @@ impl MachineDriver {
}
};

let block = block.number.as_u64();
context.finish_epoch_if_needed(block, broker).await?;
let block_number = block.number.as_u64();
tracing::debug!("reacting to standalone block {}", block_number);
context.finish_epoch_if_needed(block_number, broker).await?;

Ok(())
}
4 changes: 2 additions & 2 deletions offchain/dispatcher/src/machine/rollups_broker.rs
Original file line number Diff line number Diff line change
@@ -185,10 +185,10 @@ impl BrokerSend for BrokerFacade {
tracing::info!(?inputs_sent_count, "finishing epoch");

let mut broker = self.broker.lock().await;
let status = self.broker_status(&mut broker).await?;
let status = self.broker_status(&mut broker).await?; // Epoch number gets incremented here!

let event = build_next_finish_epoch(&status);
tracing::trace!(?event, "producing finish epoch event");
tracing::info!(?event, "producing finish epoch event");

epoch_sanity_check!(event, inputs_sent_count);

0 comments on commit b1a9dd2

Please sign in to comment.