Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(rooch-da): add repair command for segment issues #3344

Merged
merged 13 commits into from
Feb 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/rooch-da/src/actor/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ impl Submitter {
tx_order_start,
tx_order_end,
&tx_list,
self.sequencer_key.copy(),
&self.sequencer_key,
)?;
let batch_meta = batch.meta.clone();
let meta_signature = batch.meta_signature.clone();
Expand Down
13 changes: 12 additions & 1 deletion crates/rooch-pipeline-processor/src/actor/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,14 @@ impl PipelineProcessorActor {
tx_order,
err
);
// try to revert this tx for DA, error may cause by network or other reasons
let ret = self
.da_server
.revert_tx(RevertTransactionMessage { tx_order })
.await; // if revert public failed, only pause runtime DA state, easy to monitor and restart service will fix it
if let Err(e) = ret {
tracing::error!("Revert public tx failed, error: {:?}", e);
}
self.rooch_db.revert_tx(ledger_tx.tx_hash())?;
Err(err)
}
Expand Down Expand Up @@ -232,10 +240,13 @@ impl PipelineProcessorActor {
return;
}
}
let _ = self
let ret = self
.da_server
.revert_tx(RevertTransactionMessage { tx_order })
.await; // if revert public failed, only pause runtime DA state, easy to monitor and restart service will fix it
if let Err(e) = ret {
tracing::error!("Revert public tx failed, error: {:?}", e);
}
let ret = self.rooch_db.revert_tx(tx_hash);
if let Err(e) = ret {
tracing::error!(
Expand Down
4 changes: 2 additions & 2 deletions crates/rooch-types/src/da/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ impl DABatch {
tx_order_start: u64,
tx_order_end: u64,
tx_list: &Vec<LedgerTransaction>,
sequencer_key: RoochKeyPair,
sequencer_key: &RoochKeyPair,
) -> anyhow::Result<Self> {
// Verify transaction ordering constraints before signing
verify_tx_order(block_number, tx_list, tx_order_start, tx_order_end)?;
Expand All @@ -127,7 +127,7 @@ impl DABatch {
let batch_meta = DABatchMeta::new(block_number, tx_order_start, tx_order_end, tx_list_hash);
let meta_bytes = bcs::to_bytes(&batch_meta).expect("encode batch_meta should success");
let meta_hash = sha2_256_of(&meta_bytes);
let meta_signature = Signature::sign(&meta_hash.0, &sequencer_key)
let meta_signature = Signature::sign(&meta_hash.0, sequencer_key)
.as_ref()
.to_vec();

Expand Down
2 changes: 1 addition & 1 deletion crates/rooch-types/src/da/chunk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ mod tests {
.map(|i| random_ledger_transaction_with_order(i as u64 + 1, &keypair))
.collect::<Vec<_>>();
let batch =
DABatch::new(123, 1, 128, &tx_list, keypair).expect("create batch should success");
DABatch::new(123, 1, 128, &tx_list, &keypair).expect("create batch should success");

let chunk = ChunkV0::from(batch.clone());
let segments = chunk.to_segments(1023);
Expand Down
1 change: 1 addition & 0 deletions crates/rooch/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ ciborium = { workspace = true }
wasmer = { workspace = true }
tiny-keccak = { workspace = true }
reqwest = { workspace = true }
rocksdb = { workspace = true }

move-binary-format = { workspace = true }
move-cli = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion crates/rooch/src/commands/da/commands/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -688,7 +688,7 @@ impl ExecInner {
return Err(error);
}

tracing::warn!(
warn!(
"L2 Tx execution failed with a non-VM panic error. Ignoring and returning Ok; tx_order: {}, error: {:?}",
tx_order,
error
Expand Down
49 changes: 48 additions & 1 deletion crates/rooch/src/commands/da/commands/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ use rooch_config::RoochOpt;
use rooch_db::RoochDB;
use rooch_rpc_client::Client;
use rooch_store::RoochStore;
use rooch_types::da::chunk::chunk_from_segments;
use rooch_types::crypto::RoochKeyPair;
use rooch_types::da::batch::DABatch;
use rooch_types::da::chunk::{chunk_from_segments, Chunk, ChunkV0};
use rooch_types::da::segment::{segment_from_bytes, SegmentID};
use rooch_types::rooch_network::RoochChainID;
use rooch_types::sequencer::SequencerInfo;
Expand All @@ -40,9 +42,12 @@ pub mod exec;
pub mod index;
pub mod namespace;
pub mod pack;
pub mod repair;
pub mod unpack;
pub mod verify;

const DEFAULT_MAX_SEGMENT_SIZE: usize = 4 * 1024 * 1024;

pub(crate) struct SequencedTxStore {
tx_accumulator: MerkleAccumulator,
last_sequenced_tx_order: AtomicU64,
Expand Down Expand Up @@ -544,11 +549,25 @@ impl TxMetaStore {
let mut exp_roots = HashMap::new();
let mut max_verified_tx_order = 0;

// tx orders with the same hash, the tx info stored in the db is the latest one
// need to skip these tx orders
// tx_order, tx_order_with_same_hash, tx_hash, block_number
// 84658122,84658124,0x4854d82441239cda0805b963b404a007ecbc6366ce3fd4c37572cdb692624430,373386
// 84659999,84660000,0x44955c646f81defdcd4db2d038fc52f50949e943b59dc02757587f05e14ef007,373386
// 84706263,84706267,0xbb3bf39a48f85413e43f09de8a16b90237379f33abd77a8e6ea3c19ca96f2bc0,373392
// 84706265,84706267,0xbb3bf39a48f85413e43f09de8a16b90237379f33abd77a8e6ea3c19ca96f2bc0,373392
// 84706266,84706267,0xbb3bf39a48f85413e43f09de8a16b90237379f33abd77a8e6ea3c19ca96f2bc0,373392
let tx_order_with_dup_hash_issue: Vec<u64> =
vec![84658122, 84659999, 84706263, 84706265, 84706266];

let mut reader = BufReader::new(File::open(exp_roots_path)?);
for line in reader.by_ref().lines() {
let line = line?;
let parts: Vec<&str> = line.split(':').collect();
let tx_order = parts[0].parse::<u64>()?;
if tx_order_with_dup_hash_issue.contains(&tx_order) {
continue;
}
let state_root_raw = parts[1];
let state_root = if state_root_raw == "null" {
H256::zero()
Expand Down Expand Up @@ -984,3 +1003,31 @@ impl TxPositionIndexer {
Ok(())
}
}

fn write_down_segments(
chunk_id: u128,
tx_order_start: u64,
tx_order_end: u64,
tx_list: &Vec<LedgerTransaction>,
sequencer_keypair: &RoochKeyPair,
segment_dir: PathBuf,
) -> anyhow::Result<()> {
let batch = DABatch::new(
chunk_id,
tx_order_start,
tx_order_end,
tx_list,
sequencer_keypair,
)?;
// ensure the batch is valid
batch.verify(true)?;

let segments = ChunkV0::from(batch).to_segments(DEFAULT_MAX_SEGMENT_SIZE);
for segment in segments.iter() {
let segment_path = segment_dir.join(segment.get_id().to_string());
let mut writer = File::create(segment_path)?;
writer.write_all(&segment.to_bytes())?;
writer.flush()?;
}
Ok(())
}
22 changes: 5 additions & 17 deletions crates/rooch/src/commands/da/commands/pack.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,15 @@
// SPDX-License-Identifier: Apache-2.0

use crate::cli_types::WalletContextOptions;
use crate::commands::da::commands::write_down_segments;
use crate::utils::get_sequencer_keypair;
use clap::Parser;
use rooch_types::da::batch::DABatch;
use rooch_types::da::chunk::{Chunk, ChunkV0};
use rooch_types::error::RoochResult;
use rooch_types::transaction::LedgerTransaction;
use std::fs::File;
use std::io::{BufRead, BufReader, Read, Write};
use std::io::{BufRead, BufReader, Read};
use std::path::PathBuf;

const DEFAULT_MAX_SEGMENT_SIZE: usize = 4 * 1024 * 1024;

/// Unpack human-readable LedgerTransaction List to segments.
#[derive(Debug, Parser)]
pub struct PackCommand {
Expand Down Expand Up @@ -44,23 +41,14 @@ impl PackCommand {
let tx_order_start = tx_list.first().unwrap().sequence_info.tx_order;
let tx_order_end = tx_list.last().unwrap().sequence_info.tx_order;

let batch = DABatch::new(
write_down_segments(
self.chunk_id,
tx_order_start,
tx_order_end,
&tx_list,
sequencer_keypair,
&sequencer_keypair,
self.segment_dir,
)?;
// ensure the batch is valid
batch.verify(true)?;

let segments = ChunkV0::from(batch).to_segments(DEFAULT_MAX_SEGMENT_SIZE);
for segment in segments.iter() {
let segment_path = self.segment_dir.join(segment.get_id().to_string());
let mut writer = File::create(segment_path)?;
writer.write_all(&segment.to_bytes())?;
writer.flush()?;
}

Ok(())
}
Expand Down
Loading
Loading