Skip to content

Commit

Permalink
fix(consensus)!: remove substate value when substate is DOWN
Browse files Browse the repository at this point in the history
  • Loading branch information
sdbondi committed Jan 31, 2025
1 parent ec0eb2e commit 4777a6e
Show file tree
Hide file tree
Showing 32 changed files with 284 additions and 242 deletions.
40 changes: 6 additions & 34 deletions applications/tari_indexer/src/event_scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,15 @@ use tari_bor::decode;
use tari_crypto::tari_utilities::message_format::MessageFormat;
use tari_dan_common_types::{committee::Committee, Epoch, PeerAddress, ShardGroup};
use tari_dan_p2p::proto::rpc::{GetTransactionResultRequest, PayloadResultStatus, SyncBlocksRequest};
use tari_dan_storage::consensus_models::{Block, BlockId, Decision, TransactionRecord};
use tari_dan_storage::consensus_models::{Block, BlockId, Decision};
use tari_engine_types::{
commit_result::{ExecuteResult, TransactionResult},
events::Event,
substate::{Substate, SubstateId, SubstateValue},
};
use tari_epoch_manager::{base_layer::EpochManagerHandle, EpochManagerReader};
use tari_template_lib::models::{EntityId, TemplateAddress};
use tari_transaction::{Transaction, TransactionId};
use tari_transaction::TransactionId;
use tari_validator_node_rpc::client::{TariValidatorNodeRpcClientFactory, ValidatorNodeClientFactory};

use crate::{
Expand Down Expand Up @@ -563,6 +563,10 @@ impl EventScanner {
.sync_blocks(SyncBlocksRequest {
start_block_id: start_block_id.map(|id| id.as_bytes().to_vec()).unwrap_or_default(),
epoch: Some(up_to_epoch.into()),
// TODO: QC should be validated
stream_qcs: false,
stream_substates: false,
stream_transactions: false,
})
.await?;
while let Some(resp) = stream.next().await {
Expand All @@ -573,38 +577,6 @@ impl EventScanner {
.ok_or_else(|| anyhow::anyhow!("Expected peer to return a newblock"))?;
let block = Block::try_from(new_block)?;

let Some(_) = stream.next().await else {
anyhow::bail!("Peer closed session before sending QC message")
};

let Some(resp) = stream.next().await else {
anyhow::bail!("Peer closed session before sending substate update count message")
};
let msg = resp?;
let num_substates =
msg.substate_count()
.ok_or_else(|| anyhow::anyhow!("Expected peer to return substate count"))? as usize;

for _ in 0..num_substates {
let Some(_) = stream.next().await else {
anyhow::bail!("Peer closed session before sending substate updates message")
};
}

let Some(resp) = stream.next().await else {
anyhow::bail!("Peer closed session before sending transactions message")
};
let msg = resp?;
let transactions = msg
.into_transactions()
.ok_or_else(|| anyhow::anyhow!("Expected peer to return transactions"))?;

let _transactions = transactions
.into_iter()
.map(Transaction::try_from)
.map(|r| r.map(TransactionRecord::new))
.collect::<Result<Vec<_>, _>>()?;

blocks.push(block);
}

Expand Down
2 changes: 1 addition & 1 deletion applications/tari_indexer_web_ui/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 17 additions & 4 deletions applications/tari_validator_node/src/json_rpc/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,9 +231,22 @@ impl JsonRpcHandlers {

let tx = self.state_store.create_read_tx().unwrap();
match SubstateRecord::get(&tx, &request.address).optional() {
Ok(Some(state)) => Ok(JsonRpcResponse::success(answer_id, GetStateResponse {
data: state.into_substate().to_bytes(),
})),
Ok(Some(state)) => {
let Some(substate) = state.into_substate() else {
return Err(JsonRpcResponse::error(
answer_id,
JsonRpcError::new(
JsonRpcErrorReason::ApplicationError(100),
format!("Substate {} is DOWN", request.address),
json::Value::Null,
),
));
};

Ok(JsonRpcResponse::success(answer_id, GetStateResponse {
data: substate.to_bytes(),
}))
},
Ok(None) => Err(JsonRpcResponse::error(
answer_id,
JsonRpcError::new(
Expand Down Expand Up @@ -372,7 +385,7 @@ impl JsonRpcHandlers {
Some(substate) => Ok(JsonRpcResponse::success(answer_id, GetSubstateResponse {
status: SubstateStatus::Up,
created_by_tx: Some(substate.created_by_transaction),
value: Some(substate.into_substate_value()),
value: substate.into_substate_value(),
})),
None => Ok(JsonRpcResponse::success(answer_id, GetSubstateResponse {
status: SubstateStatus::DoesNotExist,
Expand Down
150 changes: 101 additions & 49 deletions applications/tari_validator_node/src/p2p/rpc/block_sync_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ use std::collections::HashSet;

use log::*;
use tari_dan_common_types::{optional::Optional, Epoch};
use tari_dan_p2p::proto::rpc::{sync_blocks_response::SyncData, QuorumCertificates, SyncBlocksResponse, Transactions};
use tari_dan_p2p::{
proto,
proto::rpc::{sync_blocks_response::SyncData, QuorumCertificates, SyncBlocksResponse},
};
use tari_dan_storage::{
consensus_models::{Block, BlockId, QuorumCertificate, SubstateUpdate, TransactionRecord},
StateStore,
Expand All @@ -19,12 +22,12 @@ const LOG_TARGET: &str = "tari::dan::rpc::sync_task";

const BLOCK_BUFFER_SIZE: usize = 15;

type BlockData = (
Block,
Vec<QuorumCertificate>,
Vec<SubstateUpdate>,
Vec<TransactionRecord>,
);
struct BlockData {
block: Block,
qcs: Vec<QuorumCertificate>,
substates: Vec<SubstateUpdate>,
transactions: Vec<TransactionRecord>,
}
type BlockBuffer = Vec<BlockData>;

pub struct BlockSyncTask<TStateStore: StateStore> {
Expand All @@ -49,12 +52,12 @@ impl<TStateStore: StateStore> BlockSyncTask<TStateStore> {
}
}

pub async fn run(mut self) -> Result<(), ()> {
pub async fn run(mut self, req: proto::rpc::SyncBlocksRequest) -> Result<(), ()> {
let mut buffer = Vec::with_capacity(BLOCK_BUFFER_SIZE);
let mut current_block_id = self.start_block_id;
let mut counter = 0;
loop {
match self.fetch_next_batch(&mut buffer, &current_block_id) {
match self.fetch_next_batch(&mut buffer, &current_block_id, &req) {
Ok(last_block) => {
current_block_id = last_block;
},
Expand All @@ -74,7 +77,7 @@ impl<TStateStore: StateStore> BlockSyncTask<TStateStore> {

counter += buffer.len();
for data in buffer.drain(..) {
self.send_block_data(data).await?;
self.send_block_data(&req, data).await?;
}

// If we didn't fill up the buffer, send the final blocks
Expand All @@ -99,13 +102,18 @@ impl<TStateStore: StateStore> BlockSyncTask<TStateStore> {
);

for data in buffer.drain(..) {
self.send_block_data(data).await?;
self.send_block_data(&req, data).await?;
}

Ok(())
}

fn fetch_next_batch(&self, buffer: &mut BlockBuffer, current_block_id: &BlockId) -> Result<BlockId, StorageError> {
fn fetch_next_batch(
&self,
buffer: &mut BlockBuffer,
current_block_id: &BlockId,
req: &proto::rpc::SyncBlocksRequest,
) -> Result<BlockId, StorageError> {
self.store.with_read_tx(|tx| {
let mut current_block_id = *current_block_id;
let mut last_block_id = current_block_id;
Expand Down Expand Up @@ -143,17 +151,35 @@ impl<TStateStore: StateStore> BlockSyncTask<TStateStore> {
}

last_block_id = current_block_id;
let all_qcs = child
.commands()
.iter()
.filter_map(|cmd| cmd.transaction())
.flat_map(|transaction| transaction.evidence.qc_ids_iter())
.collect::<HashSet<_>>();
let all_qcs = req
.stream_qcs
.then(|| {
child
.commands()
.iter()
.filter_map(|cmd| cmd.transaction())
.flat_map(|transaction| transaction.evidence.qc_ids_iter())
.collect::<HashSet<_>>()
})
.unwrap_or_default();
let certificates = QuorumCertificate::get_all(tx, all_qcs)?;
let updates = child.get_substate_updates(tx)?;
let transactions = child.get_transactions(tx)?;
let updates = req
.stream_substates
.then(|| child.get_substate_updates(tx))
.transpose()?
.unwrap_or_default();
let transactions = req
.stream_transactions
.then(|| child.get_transactions(tx))
.transpose()?
.unwrap_or_default();

buffer.push((child, certificates, updates, transactions));
buffer.push(BlockData {
block: child,
qcs: certificates,
substates: updates,
transactions,
});
if buffer.len() == buffer.capacity() {
break;
}
Expand Down Expand Up @@ -226,43 +252,69 @@ impl<TStateStore: StateStore> BlockSyncTask<TStateStore> {
Ok(())
}

async fn send_block_data(&mut self, (block, qcs, updates, transactions): BlockData) -> Result<(), ()> {
async fn send_block_data(&mut self, req: &proto::rpc::SyncBlocksRequest, data: BlockData) -> Result<(), ()> {
let BlockData {
block,
qcs,
substates: updates,
transactions,
} = data;
self.send(Ok(SyncBlocksResponse {
sync_data: Some(SyncData::Block((&block).into())),
}))
.await?;
self.send(Ok(SyncBlocksResponse {
sync_data: Some(SyncData::QuorumCertificates(QuorumCertificates {
quorum_certificates: qcs.iter().map(Into::into).collect(),
})),
}))
.await?;
match u32::try_from(updates.len()) {
Ok(count) => {
self.send(Ok(SyncBlocksResponse {
sync_data: Some(SyncData::SubstateCount(count)),
}))
.await?;
},
Err(_) => {
self.send(Err(RpcStatus::general("number of substates exceeds u32")))
.await?;
return Err(());
},
}
for update in updates {

if req.stream_qcs {
self.send(Ok(SyncBlocksResponse {
sync_data: Some(SyncData::SubstateUpdate(update.into())),
sync_data: Some(SyncData::QuorumCertificates(QuorumCertificates {
quorum_certificates: qcs.iter().map(Into::into).collect(),
})),
}))
.await?;
}
if req.stream_substates {
match u32::try_from(updates.len()) {
Ok(count) => {
self.send(Ok(SyncBlocksResponse {
sync_data: Some(SyncData::SubstateCount(count)),
}))
.await?;
},
Err(_) => {
self.send(Err(RpcStatus::general("number of substates exceeds u32")))
.await?;
return Err(());
},
}
for update in updates {
self.send(Ok(SyncBlocksResponse {
sync_data: Some(SyncData::SubstateUpdate(update.into())),
}))
.await?;
}
}

self.send(Ok(SyncBlocksResponse {
sync_data: Some(SyncData::Transactions(Transactions {
transactions: transactions.iter().map(|t| &t.transaction).map(Into::into).collect(),
})),
}))
.await?;
if req.stream_transactions {
match u32::try_from(transactions.len()) {
Ok(count) => {
self.send(Ok(SyncBlocksResponse {
sync_data: Some(SyncData::TransactionCount(count)),
}))
.await?;
},
Err(_) => {
self.send(Err(RpcStatus::general("number of substates exceeds u32")))
.await?;
return Err(());
},
}
for transaction in transactions {
self.send(Ok(SyncBlocksResponse {
sync_data: Some(SyncData::Transaction(transaction.transaction().into())),
}))
.await?;
}
}

Ok(())
}
Expand Down
10 changes: 7 additions & 3 deletions applications/tari_validator_node/src/p2p/rpc/service_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,10 @@ impl ValidatorNodeRpcService for ValidatorNodeRpcServiceImpl {
status: SubstateStatus::Up as i32,
address: substate.substate_id().to_bytes(),
version: substate.version(),
substate: substate.substate_value().to_bytes(),
substate: substate
.substate_value()
.map(|v| v.to_bytes())
.ok_or_else(|| RpcStatus::general("NEVER HAPPEN: UP substate has no value"))?,
created_transaction_hash: substate.created_by_transaction().into_array().to_vec(),
destroyed_transaction_hash: vec![],
quorum_certificates: vec![(&created_qc).into()],
Expand Down Expand Up @@ -278,7 +281,7 @@ impl ValidatorNodeRpcService for ValidatorNodeRpcServiceImpl {
.await
.map_err(RpcStatus::log_internal_error(LOG_TARGET))?;

let start_block_id = Some(req.start_block_id)
let start_block_id = Some(req.start_block_id.as_slice())
.filter(|i| !i.is_empty())
.map(BlockId::try_from)
.transpose()
Expand All @@ -299,6 +302,7 @@ impl ValidatorNodeRpcService for ValidatorNodeRpcServiceImpl {
None => {
let epoch = req
.epoch
.clone()
.map(Epoch::from)
.map(|end| end.min(current_epoch))
.unwrap_or(current_epoch);
Expand All @@ -324,7 +328,7 @@ impl ValidatorNodeRpcService for ValidatorNodeRpcServiceImpl {
};

let (sender, receiver) = mpsc::channel(10);
task::spawn(BlockSyncTask::new(self.shard_state_store.clone(), start_block_id, None, sender).run());
task::spawn(BlockSyncTask::new(self.shard_state_store.clone(), start_block_id, None, sender).run(req));

Ok(Streaming::new(receiver))
}
Expand Down
2 changes: 1 addition & 1 deletion applications/tari_validator_node/src/state_bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ where
SubstateRecord {
version: id.version(),
substate_id: id.into_substate_id(),
substate_value: value.into(),
substate_value: Some(value.into()),
state_hash: Default::default(),
created_by_transaction: Default::default(),
created_justify: *genesis_block.justify().id(),
Expand Down
Loading

0 comments on commit 4777a6e

Please sign in to comment.