Skip to content

Commit 07c1668

Browse files
authored
feat: optimize insert_batch_consolidation_outcome (#390)
* feat: optimize insert_batch_consolidation_outcome * feat: answer comments
1 parent 0152748 commit 07c1668

File tree

3 files changed

+91
-86
lines changed

3 files changed

+91
-86
lines changed

crates/database/db/src/db.rs

Lines changed: 23 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -320,18 +320,6 @@ impl DatabaseWriteOperations for Database {
320320
)
321321
}
322322

323-
async fn insert_block(
324-
&self,
325-
block_info: BlockInfo,
326-
batch_info: BatchInfo,
327-
) -> Result<(), DatabaseError> {
328-
metered!(
329-
DatabaseOperation::InsertBlock,
330-
self,
331-
tx_mut(move |tx| async move { tx.insert_block(block_info, batch_info).await })
332-
)
333-
}
334-
335323
async fn insert_genesis_block(&self, genesis_hash: B256) -> Result<(), DatabaseError> {
336324
metered!(
337325
DatabaseOperation::InsertGenesisBlock,
@@ -354,16 +342,16 @@ impl DatabaseWriteOperations for Database {
354342
)
355343
}
356344

357-
async fn update_l1_messages_with_l2_block(
345+
async fn update_l1_messages_with_l2_blocks(
358346
&self,
359-
block_info: L2BlockInfoWithL1Messages,
347+
blocks: Vec<L2BlockInfoWithL1Messages>,
360348
) -> Result<(), DatabaseError> {
361349
metered!(
362350
DatabaseOperation::UpdateL1MessagesWithL2Block,
363351
self,
364352
tx_mut(move |tx| {
365-
let block_info = block_info.clone();
366-
async move { tx.update_l1_messages_with_l2_block(block_info).await }
353+
let blocks = blocks.clone();
354+
async move { tx.update_l1_messages_with_l2_blocks(blocks).await }
367355
})
368356
)
369357
}
@@ -854,12 +842,14 @@ mod test {
854842
let batch_info: BatchInfo = data.clone().into();
855843
db.insert_batch(data).await.unwrap();
856844

845+
let mut blocks = Vec::new();
857846
for _ in 0..10 {
858847
let block_info =
859848
BlockInfo { number: block_number, hash: B256::arbitrary(&mut u).unwrap() };
860-
db.insert_block(block_info, batch_info).await.unwrap();
861849
block_number += 1;
850+
blocks.push(block_info);
862851
}
852+
db.insert_blocks(blocks, batch_info).await.unwrap();
863853

864854
// Fetch the highest block for the batch hash and verify number.
865855
let highest_block_info =
@@ -893,12 +883,14 @@ mod test {
893883
db.insert_batch(first_batch).await.unwrap();
894884
db.insert_batch(second_batch).await.unwrap();
895885

886+
let mut blocks = Vec::new();
896887
for _ in 0..10 {
897888
let block_info =
898889
BlockInfo { number: block_number, hash: B256::arbitrary(&mut u).unwrap() };
899-
db.insert_block(block_info, first_batch_info).await.unwrap();
900890
block_number += 1;
891+
blocks.push(block_info);
901892
}
893+
db.insert_blocks(blocks, first_batch_info).await.unwrap();
902894

903895
// Fetch the highest block for the batch hash and verify number.
904896
let highest_block_info =
@@ -1136,10 +1128,9 @@ mod test {
11361128
let mut block_infos = Vec::new();
11371129
for i in 200..205 {
11381130
let block_info = BlockInfo { number: i, hash: B256::arbitrary(&mut u).unwrap() };
1139-
let l2_block = block_info;
11401131
block_infos.push(block_info);
1141-
db.insert_block(l2_block, batch_info).await.unwrap();
11421132
}
1133+
db.insert_blocks(block_infos.clone(), batch_info).await.unwrap();
11431134

11441135
// Test getting existing blocks
11451136
for expected_block in block_infos {
@@ -1177,9 +1168,7 @@ mod test {
11771168
let safe_block_1 = BlockInfo { number: 200, hash: B256::arbitrary(&mut u).unwrap() };
11781169
let safe_block_2 = BlockInfo { number: 201, hash: B256::arbitrary(&mut u).unwrap() };
11791170

1180-
db.insert_block(safe_block_1, batch_info).await.unwrap();
1181-
1182-
db.insert_block(safe_block_2, batch_info).await.unwrap();
1171+
db.insert_blocks(vec![safe_block_1, safe_block_2], batch_info).await.unwrap();
11831172

11841173
// Should return the highest safe block (block 201)
11851174
let latest_safe = db.get_latest_safe_l2_info().await.unwrap();
@@ -1198,11 +1187,12 @@ mod test {
11981187

11991188
// Insert multiple L2 blocks with batch info
12001189
let batch_info = BatchInfo { index: 0, hash: B256::default() };
1190+
let mut blocks = Vec::new();
12011191
for i in 400..410 {
12021192
let block_info = BlockInfo { number: i, hash: B256::arbitrary(&mut u).unwrap() };
1203-
1204-
db.insert_block(block_info, batch_info).await.unwrap();
1193+
blocks.push(block_info);
12051194
}
1195+
db.insert_blocks(blocks, batch_info).await.unwrap();
12061196

12071197
// Delete blocks with number > 405
12081198
let deleted_count = db.delete_l2_blocks_gt_block_number(405).await.unwrap();
@@ -1245,10 +1235,9 @@ mod test {
12451235
for i in 100..110 {
12461236
let batch_data = db.get_batch_by_index(i).await.unwrap().unwrap();
12471237
let batch_info: BatchInfo = batch_data.into();
1248-
12491238
let block_info = BlockInfo { number: 500 + i, hash: B256::arbitrary(&mut u).unwrap() };
12501239

1251-
db.insert_block(block_info, batch_info).await.unwrap();
1240+
db.insert_blocks(vec![block_info], batch_info).await.unwrap();
12521241
}
12531242

12541243
// Delete L2 blocks with batch index > 105
@@ -1304,8 +1293,8 @@ mod test {
13041293
L2BlockInfoWithL1Messages { block_info, l1_messages: l1_message_hashes.clone() };
13051294

13061295
// Insert block
1307-
db.insert_block(l2_block.block_info, batch_info).await.unwrap();
1308-
db.update_l1_messages_with_l2_block(l2_block).await.unwrap();
1296+
db.insert_blocks(vec![l2_block.block_info], batch_info).await.unwrap();
1297+
db.update_l1_messages_with_l2_blocks(vec![l2_block]).await.unwrap();
13091298

13101299
// Verify block was inserted
13111300
let retrieved_block = db.get_l2_block_info_by_number(500).await.unwrap();
@@ -1340,7 +1329,7 @@ mod test {
13401329

13411330
// Insert initial block
13421331
let block_info = BlockInfo { number: 600, hash: B256::arbitrary(&mut u).unwrap() };
1343-
db.insert_block(block_info, batch_info_1).await.unwrap();
1332+
db.insert_blocks(vec![block_info], batch_info_1).await.unwrap();
13441333

13451334
// Verify initial insertion
13461335
let retrieved_block = db.get_l2_block_info_by_number(600).await.unwrap();
@@ -1359,7 +1348,7 @@ mod test {
13591348
assert_eq!(initial_batch_info, batch_info_1);
13601349

13611350
// Update the same block with different batch info (upsert)
1362-
db.insert_block(block_info, batch_info_2).await.unwrap();
1351+
db.insert_blocks(vec![block_info], batch_info_2).await.unwrap();
13631352

13641353
// Verify the block still exists and was updated
13651354
let retrieved_block = db.get_l2_block_info_by_number(600).await.unwrap().unwrap();
@@ -1393,23 +1382,22 @@ mod test {
13931382
let block_1 = BlockInfo { number: 1, hash: B256::arbitrary(&mut u).unwrap() };
13941383
let block_2 = BlockInfo { number: 2, hash: B256::arbitrary(&mut u).unwrap() };
13951384
db.insert_batch(batch_data_1.clone()).await.unwrap();
1396-
db.insert_block(block_1, batch_data_1.clone().into()).await.unwrap();
1397-
db.insert_block(block_2, batch_data_1.clone().into()).await.unwrap();
1385+
db.insert_blocks(vec![block_1, block_2], batch_data_1.clone().into()).await.unwrap();
13981386

13991387
// Insert batch 2 and associate it with one block in the database
14001388
let batch_data_2 =
14011389
BatchCommitData { index: 2, block_number: 20, ..Arbitrary::arbitrary(&mut u).unwrap() };
14021390
let block_3 = BlockInfo { number: 3, hash: B256::arbitrary(&mut u).unwrap() };
14031391
db.insert_batch(batch_data_2.clone()).await.unwrap();
1404-
db.insert_block(block_3, batch_data_2.clone().into()).await.unwrap();
1392+
db.insert_blocks(vec![block_3], batch_data_2.clone().into()).await.unwrap();
14051393

14061394
// Insert batch 3 produced at the same block number as batch 2 and associate it with one
14071395
// block
14081396
let batch_data_3 =
14091397
BatchCommitData { index: 3, block_number: 20, ..Arbitrary::arbitrary(&mut u).unwrap() };
14101398
let block_4 = BlockInfo { number: 4, hash: B256::arbitrary(&mut u).unwrap() };
14111399
db.insert_batch(batch_data_3.clone()).await.unwrap();
1412-
db.insert_block(block_4, batch_data_3.clone().into()).await.unwrap();
1400+
db.insert_blocks(vec![block_4], batch_data_3.clone().into()).await.unwrap();
14131401

14141402
db.set_finalized_l1_block_number(21).await.unwrap();
14151403

crates/database/db/src/operations.rs

Lines changed: 67 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use rollup_node_primitives::{
88
};
99
use scroll_alloy_rpc_types_engine::BlockDataHint;
1010
use sea_orm::{
11-
sea_query::{Expr, OnConflict},
11+
sea_query::{CaseStatement, Expr, OnConflict},
1212
ColumnTrait, Condition, DbErr, EntityTrait, QueryFilter, QueryOrder, QuerySelect,
1313
};
1414
use std::fmt;
@@ -99,13 +99,6 @@ pub trait DatabaseWriteOperations {
9999
batch_info: BatchInfo,
100100
) -> Result<(), DatabaseError>;
101101

102-
/// Insert a new block in the database.
103-
async fn insert_block(
104-
&self,
105-
block_info: BlockInfo,
106-
batch_info: BatchInfo,
107-
) -> Result<(), DatabaseError>;
108-
109102
/// Insert the genesis block into the database.
110103
async fn insert_genesis_block(&self, genesis_hash: B256) -> Result<(), DatabaseError>;
111104

@@ -116,9 +109,9 @@ pub trait DatabaseWriteOperations {
116109
) -> Result<(), DatabaseError>;
117110

118111
/// Update the executed L1 messages with the provided L2 block number in the database.
119-
async fn update_l1_messages_with_l2_block(
112+
async fn update_l1_messages_with_l2_blocks(
120113
&self,
121-
block_info: L2BlockInfoWithL1Messages,
114+
block_info: Vec<L2BlockInfoWithL1Messages>,
122115
) -> Result<(), DatabaseError>;
123116

124117
/// Purge all L1 message to L2 block mappings from the database for blocks greater or equal to
@@ -426,29 +419,18 @@ impl<T: WriteConnectionProvider + ?Sized + Sync> DatabaseWriteOperations for T {
426419
&self,
427420
blocks: Vec<BlockInfo>,
428421
batch_info: BatchInfo,
429-
) -> Result<(), DatabaseError> {
430-
for block in blocks {
431-
self.insert_block(block, batch_info).await?;
432-
}
433-
Ok(())
434-
}
435-
436-
async fn insert_block(
437-
&self,
438-
block_info: BlockInfo,
439-
batch_info: BatchInfo,
440422
) -> Result<(), DatabaseError> {
441423
// We only insert safe blocks into the database, we do not persist unsafe blocks.
442424
tracing::trace!(
443425
target: "scroll::db",
444426
batch_hash = ?batch_info.hash,
445427
batch_index = batch_info.index,
446-
block_number = block_info.number,
447-
block_hash = ?block_info.hash,
448-
"Inserting block into database."
428+
blocks = ?blocks,
429+
"Inserting blocks into database."
449430
);
450-
let l2_block: models::l2_block::ActiveModel = (block_info, batch_info).into();
451-
models::l2_block::Entity::insert(l2_block)
431+
let l2_blocks: Vec<models::l2_block::ActiveModel> =
432+
blocks.into_iter().map(|b| (b, batch_info).into()).collect();
433+
models::l2_block::Entity::insert_many(l2_blocks)
452434
.on_conflict(
453435
OnConflict::column(models::l2_block::Column::BlockNumber)
454436
.update_columns([
@@ -458,6 +440,7 @@ impl<T: WriteConnectionProvider + ?Sized + Sync> DatabaseWriteOperations for T {
458440
])
459441
.to_owned(),
460442
)
443+
.on_empty_do_nothing()
461444
.exec(self.get_connection())
462445
.await?;
463446

@@ -467,7 +450,7 @@ impl<T: WriteConnectionProvider + ?Sized + Sync> DatabaseWriteOperations for T {
467450
async fn insert_genesis_block(&self, genesis_hash: B256) -> Result<(), DatabaseError> {
468451
let genesis_block = BlockInfo::new(0, genesis_hash);
469452
let genesis_batch = BatchInfo::new(0, B256::ZERO);
470-
self.insert_block(genesis_block, genesis_batch).await
453+
self.insert_blocks(vec![genesis_block], genesis_batch).await
471454
}
472455

473456
async fn update_l1_messages_from_l2_blocks(
@@ -481,31 +464,63 @@ impl<T: WriteConnectionProvider + ?Sized + Sync> DatabaseWriteOperations for T {
481464
.await?;
482465

483466
// Then, update the executed L1 messages for each block.
484-
for block in blocks {
485-
self.update_l1_messages_with_l2_block(block).await?;
486-
}
467+
self.update_l1_messages_with_l2_blocks(blocks).await?;
468+
487469
Ok(())
488470
}
489471

490-
async fn update_l1_messages_with_l2_block(
472+
async fn update_l1_messages_with_l2_blocks(
491473
&self,
492-
block_info: L2BlockInfoWithL1Messages,
474+
blocks: Vec<L2BlockInfoWithL1Messages>,
493475
) -> Result<(), DatabaseError> {
494-
tracing::trace!(
495-
target: "scroll::db",
496-
block_number = block_info.block_info.number,
497-
l1_messages = ?block_info.l1_messages,
498-
"Updating executed L1 messages from block with L2 block number in the database."
499-
);
500-
models::l1_message::Entity::update_many()
501-
.col_expr(
502-
models::l1_message::Column::L2BlockNumber,
476+
if blocks.is_empty() {
477+
return Ok(());
478+
}
479+
let start = blocks.first().unwrap().block_info.number;
480+
let end = blocks.last().unwrap().block_info.number;
481+
tracing::trace!(target: "scroll::db", start_block = start, end_block = end, "Updating executed L1 messages from blocks with L2 block number in the database.");
482+
483+
let mut case = CaseStatement::new();
484+
let mut all_hashes = Vec::new();
485+
486+
for block_info in blocks {
487+
if block_info.l1_messages.is_empty() {
488+
continue;
489+
}
490+
491+
tracing::trace!(
492+
target: "scroll::db",
493+
block_number = block_info.block_info.number,
494+
l1_messages = ?block_info.l1_messages,
495+
"Including L1 messages from block in batch update."
496+
);
497+
498+
let hashes: Vec<Vec<u8>> = block_info.l1_messages.iter().map(|x| x.to_vec()).collect();
499+
500+
case = case.case(
501+
models::l1_message::Column::Hash.is_in(hashes.clone()),
503502
Expr::value(block_info.block_info.number as i64),
504-
)
505-
.filter(
506-
models::l1_message::Column::Hash
507-
.is_in(block_info.l1_messages.iter().map(|x| x.to_vec())),
508-
)
503+
);
504+
505+
all_hashes.extend(hashes);
506+
}
507+
508+
if all_hashes.is_empty() {
509+
return Ok(());
510+
}
511+
512+
// query translates to the following sql:
513+
// UPDATE l1_message
514+
// SET l2_block_number = CASE
515+
// WHEN hash IN (block1_hashes) THEN block1_number
516+
// WHEN hash IN (block2_hashes) THEN block2_number
517+
// WHEN hash IN (block3_hashes) THEN block3_number
518+
// ELSE 0
519+
// END
520+
// WHERE hash IN (all_hashes)
521+
models::l1_message::Entity::update_many()
522+
.col_expr(models::l1_message::Column::L2BlockNumber, case.into())
523+
.filter(models::l1_message::Column::Hash.is_in(all_hashes))
509524
.exec(self.get_connection())
510525
.await?;
511526

@@ -539,10 +554,12 @@ impl<T: WriteConnectionProvider + ?Sized + Sync> DatabaseWriteOperations for T {
539554
&self,
540555
outcome: BatchConsolidationOutcome,
541556
) -> Result<(), DatabaseError> {
542-
for block in outcome.blocks {
543-
self.insert_block(block.block_info, outcome.batch_info).await?;
544-
self.update_l1_messages_with_l2_block(block).await?;
545-
}
557+
self.insert_blocks(
558+
outcome.blocks.iter().map(|b| b.block_info).collect(),
559+
outcome.batch_info,
560+
)
561+
.await?;
562+
self.update_l1_messages_with_l2_blocks(outcome.blocks).await?;
546563
self.update_skipped_l1_messages(outcome.skipped_l1_messages).await?;
547564
Ok(())
548565
}

crates/sequencer/tests/e2e.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -439,7 +439,7 @@ async fn can_build_blocks_with_finalized_l1_messages() {
439439
assert!(!block.body.transactions.iter().any(|tx| tx.tx_hash() == &unfinalized_message_hash));
440440

441441
// Handle the build block with the sequencer in order to update L1 message queue index.
442-
database.update_l1_messages_with_l2_block((&block).into()).await.unwrap();
442+
database.update_l1_messages_with_l2_blocks(vec![(&block).into()]).await.unwrap();
443443

444444
// update finalized block number to 3, now both messages should be available
445445
database.set_finalized_l1_block_number(3).await.unwrap();

0 commit comments

Comments
 (0)